From 8d343bfd6990beaf9d5efbb5e0d55e805920c932 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Tue, 3 Dec 2024 16:12:54 +0200
Subject: [PATCH] fix: stream bugs (#4240)

This PR syncs some of the improvements that were introduced to streams in
Redis OSS 7.2.3:

1. Verify XSETID against the max deleted entry id in the stream.
2. Implement precise memory measurement of streams for the MEMORY USAGE command.
3. Always compact a stream's listpack nodes before switching to a new node.

Signed-off-by: Roman Gershman
---
 src/core/compact_object.cc       | 91 +++++++++++++++++++++++++++++++-
 src/facade/op_status.h           |  1 -
 src/facade/reply_builder_test.cc |  5 +-
 src/server/stream_family.cc      | 59 ++++++++++++---------
 src/server/stream_family_test.cc | 22 ++++++++
 5 files changed, 148 insertions(+), 30 deletions(-)

diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index 349094770a0a..d6d4f43eaa4b 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -125,6 +125,95 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
   return 0;
 }

+/* This is a helper function with the goal of estimating the memory
+ * size of a radix tree that is used to store Stream IDs.
+ *
+ * Note: to guess the size of the radix tree is not trivial, so we
+ * approximate it considering 16 bytes of data overhead for each
+ * key (the ID), and then adding the number of bare nodes, plus some
+ * overhead due to the data and child pointers. This secret recipe
+ * was obtained by checking the average radix tree created by real
+ * workloads, and then adjusting the constants to get numbers that
+ * more or less match the real memory usage.
+ *
+ * Actually the number of nodes and keys may be different depending
+ * on the insertion speed and thus the ability of the radix tree
+ * to compress prefixes. */
+size_t streamRadixTreeMemoryUsage(rax* rax) {
+  size_t size = sizeof(*rax);
+  size += rax->numele * sizeof(streamID);
+  size += rax->numnodes * sizeof(raxNode);
+  /* Add a fixed overhead due to the aux data pointer, children, ... */
+  size += rax->numnodes * sizeof(long) * 30;
+  return size;
+}
+
+size_t MallocUsedStream(stream* s) {
+  size_t asize = sizeof(*s);
+  asize += streamRadixTreeMemoryUsage(s->rax_tree);
+
+  /* Now we have to add the listpacks. The last listpack is often not
+   * complete, so we estimate the size of the first N listpacks, and
+   * use the average to compute the size of the first N-1 listpacks, and
+   * finally add the real size of the last node. */
+  raxIterator ri;
+  raxStart(&ri, s->rax_tree);
+  raxSeek(&ri, "^", NULL, 0);
+  size_t lpsize = 0, samples = 0;
+  while (raxNext(&ri)) {
+    uint8_t* lp = (uint8_t*)ri.data;
+    /* Use the allocated size, since we overprovision the node initially. */
+    lpsize += zmalloc_size(lp);
+    samples++;
+  }
+  if (s->rax_tree->numele <= samples) {
+    asize += lpsize;
+  } else {
+    if (samples)
+      lpsize /= samples; /* Compute the average. */
+    asize += lpsize * (s->rax_tree->numele - 1);
+    /* No need to check if seek succeeded, we enter this branch only
+     * if there are a few elements in the radix tree. */
+    raxSeek(&ri, "$", NULL, 0);
+    raxNext(&ri);
+    /* Use the allocated size, since we overprovision the node initially. */
+    asize += zmalloc_size(ri.data);
+  }
+  raxStop(&ri);
+
+  /* Consumer groups also have a non-trivial memory overhead if there
+   * are many consumers and many groups, let's count at least the
+   * overhead of the pending entries in the group and consumer
+   * PELs. */
+  if (s->cgroups) {
+    raxStart(&ri, s->cgroups);
+    raxSeek(&ri, "^", NULL, 0);
+    while (raxNext(&ri)) {
+      streamCG* cg = (streamCG*)ri.data;
+      asize += sizeof(*cg);
+      asize += streamRadixTreeMemoryUsage(cg->pel);
+      asize += sizeof(streamNACK) * raxSize(cg->pel);
+
+      /* For each consumer we also need to add the basic data
+       * structures and the PEL memory usage. */
+      raxIterator cri;
+      raxStart(&cri, cg->consumers);
+      raxSeek(&cri, "^", NULL, 0);
+      while (raxNext(&cri)) {
+        const streamConsumer* consumer = (const streamConsumer*)cri.data;
+        asize += sizeof(*consumer);
+        asize += sdslen(consumer->name);
+        asize += streamRadixTreeMemoryUsage(consumer->pel);
+        /* Don't count NACKs again, they are shared with the
+         * consumer group PEL. */
+      }
+      raxStop(&cri);
+    }
+    raxStop(&ri);
+  }
+  return asize;
+}
+
 inline void FreeObjHash(unsigned encoding, void* ptr) {
   switch (encoding) {
     case kEncodingStrMap2:
@@ -311,7 +400,7 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
     case OBJ_ZSET:
       return MallocUsedZSet(encoding_, inner_obj_);
     case OBJ_STREAM:
-      return sz_;
+      return slow ? MallocUsedStream((stream*)inner_obj_) : sz_;

     default:
       LOG(FATAL) << "Not supported " << type_;
diff --git a/src/facade/op_status.h b/src/facade/op_status.h
index 9749b9d5fd43..892ee255a123 100644
--- a/src/facade/op_status.h
+++ b/src/facade/op_status.h
@@ -26,7 +26,6 @@ enum class OpStatus : uint16_t {
   SYNTAX_ERR,
   BUSY_GROUP,
   STREAM_ID_SMALL,
-  ENTRIES_ADDED_SMALL,
   INVALID_NUMERIC_RESULT,
   CANCELLED,
   AT_LEAST_ONE_KEY,
diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc
index dda3f5992283..028d594eef9f 100644
--- a/src/facade/reply_builder_test.cc
+++ b/src/facade/reply_builder_test.cc
@@ -300,9 +300,8 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {

 TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
   // All these op codes creating the same error message
-  OpStatus none_unique_codes[] = {OpStatus::ENTRIES_ADDED_SMALL, OpStatus::SKIPPED,
-                                  OpStatus::KEY_EXISTS,          OpStatus::INVALID_VALUE,
-                                  OpStatus::TIMED_OUT,           OpStatus::STREAM_ID_SMALL};
+  OpStatus none_unique_codes[] = {OpStatus::SKIPPED, OpStatus::KEY_EXISTS, OpStatus::INVALID_VALUE,
+                                  OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
   uint64_t error_count = 0;
   for (const auto& err : none_unique_codes) {
     const std::string_view error_name = StatusToMsg(err);
diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc
index 04608d4fa902..f4c790d1f659 100644
--- a/src/server/stream_family.cc
+++ b/src/server/stream_family.cc
@@ -184,6 +184,11 @@ string NoGroupOrKey(string_view key, string_view cgroup, string_view suffix = ""
                       suffix);
 }

+string LeqTopIdError(string_view cmd_name) {
+  return absl::StrCat("The ID specified in ", cmd_name,
+                      " is equal or smaller than the target stream top item");
+}
+
 inline const uint8_t* SafePtr(MutableSlice field) {
   return field.empty() ? reinterpret_cast<const uint8_t*>("")
                        : reinterpret_cast<const uint8_t*>(field.data());
@@ -424,23 +429,28 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
    * if we need to switch to the next one. 'lp' will be set to NULL if
    * the current node is full. */
   if (lp != NULL) {
+    int new_node = 0;
     size_t node_max_bytes = kStreamNodeMaxBytes;
     if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
       node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
     if (lp_bytes + totelelen >= node_max_bytes) {
-      lp = NULL;
+      new_node = 1;
     } else if (kStreamNodeMaxEntries) {
       unsigned char* lp_ele = lpFirst(lp);
       /* Count both live entries and deleted ones. */
       int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp, lp_ele));
       if (count >= kStreamNodeMaxEntries) {
-        /* Shrink extra pre-allocated memory */
-        lp = lpShrinkToFit(lp);
-        if (ri.data != lp)
-          raxInsert(s->rax_tree, ri.key, ri.key_len, lp, NULL);
-        lp = NULL;
+        new_node = 1;
       }
     }
+
+    if (new_node) {
+      /* Shrink extra pre-allocated memory */
+      lp = lpShrinkToFit(lp);
+      if (ri.data != lp)
+        raxInsert(s->rax_tree, ri.key, ri.key_len, lp, NULL);
+      lp = NULL;
+    }
   }

   int flags = 0;
@@ -1403,7 +1413,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri
   return OpStatus::OK;
 }

-OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
+ErrorReply OpXSetId(const OpArgs& op_args, string_view key, const streamID& sid) {
   auto& db_slice = op_args.GetDbSlice();
   auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
   if (!res_it)
@@ -1415,13 +1425,18 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {
   stream* stream_inst = (stream*)cobj.RObjPtr();
   long long entries_added = -1;
   streamID max_xdel_id{0, 0};
+  streamID id = sid;
+
+  if (streamCompareID(&id, &stream_inst->max_deleted_entry_id) < 0) {
+    return ErrorReply{"The ID specified in XSETID is smaller than current max_deleted_entry_id",
+                      "stream_smaller_deleted"};
+  }

   /* If the stream has at least one item, we want to check that the user
    * is setting a last ID that is equal or greater than the current top
    * item, otherwise the fundamental ID monotonicity assumption is violated. */
   if (stream_inst->length > 0) {
     streamID maxid;
-    streamID id = sid;
     streamLastValidID(stream_inst, &maxid);

     if (streamCompareID(&id, &maxid) < 0) {
@@ -1430,7 +1445,9 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) {

     /* If an entries_added was provided, it can't be lower than the length. */
     if (entries_added != -1 && stream_inst->length > uint64_t(entries_added)) {
-      return OpStatus::ENTRIES_ADDED_SMALL;
+      return ErrorReply{
+          "The entries_added specified in XSETID is smaller than the target stream length",
+          "stream_added_small"};
     }
   }

@@ -2557,9 +2574,7 @@ void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) {
   }

   if (add_result.status() == OpStatus::STREAM_ID_SMALL) {
-    return cmd_cntx.rb->SendError(
-        "The ID specified in XADD is equal or smaller than "
-        "the target stream top item");
+    return cmd_cntx.rb->SendError(LeqTopIdError("XADD"));
   }

   return cmd_cntx.rb->SendError(add_result.status());
@@ -3133,23 +3148,17 @@ void StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) {
     return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
   }

+  facade::ErrorReply reply(OpStatus::OK);
   auto cb = [&](Transaction* t, EngineShard* shard) {
-    return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val);
+    reply = OpXSetId(t->GetOpArgs(shard), key, parsed_id.val);
+    return OpStatus::OK;
   };

-  OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
-  switch (result) {
-    case OpStatus::STREAM_ID_SMALL:
-      return cmd_cntx.rb->SendError(
-          "The ID specified in XSETID is smaller than the "
-          "target stream top item");
-    case OpStatus::ENTRIES_ADDED_SMALL:
-      return cmd_cntx.rb->SendError(
-          "The entries_added specified in XSETID is smaller than "
-          "the target stream length");
-    default:
-      return cmd_cntx.rb->SendError(result);
+  cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
+  if (reply.status == OpStatus::STREAM_ID_SMALL) {
+    return cmd_cntx.rb->SendError(LeqTopIdError("XSETID"));
   }
+  return cmd_cntx.rb->SendError(reply);
 }

 void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) {
diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc
index 3bd43ea876dc..d91e947b7c0e 100644
--- a/src/server/stream_family_test.cc
+++ b/src/server/stream_family_test.cc
@@ -1168,4 +1168,26 @@ TEST_F(StreamFamilyTest, XAddMaxSeq) {
   EXPECT_THAT(resp, ErrArg("The ID specified in XADD is equal or smaller"));
 }

+TEST_F(StreamFamilyTest, XsetIdSmallerMaxDeleted) {
+  Run({"XADD", "x", "1-1", "a", "1"});
+  Run({"XADD", "x", "1-2", "b", "2"});
+  Run({"XADD", "x", "1-3", "c", "3"});
+  Run({"XDEL", "x", "1-2"});
+  Run({"XDEL", "x", "1-3"});
+  auto resp = Run({"XINFO", "stream", "x"});
+  ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
+  auto vec = resp.GetVec();
+  string max_del_id;
+  for (unsigned i = 0; i < vec.size(); i += 2) {
+    if (vec[i] == "max-deleted-entry-id") {
+      max_del_id = vec[i + 1].GetString();
+      break;
+    }
+  }
+  EXPECT_EQ(max_del_id, "1-3");
+
+  resp = Run({"XSETID", "x", "1-2"});
+  ASSERT_THAT(resp, ErrArg("smaller"));
+}
+
 } // namespace dfly
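
For reference, the radix-tree part of the new stream accounting boils down to a
closed-form estimate: roughly 16 bytes per stored ID plus a fixed per-node
overhead. The sketch below is not part of the patch; it is a minimal,
self-contained illustration of that formula, where RaxCounters,
EstimateRadixTreeBytes, the assumed raxNode header size, and the sample numbers
are hypothetical stand-ins for the real rax bookkeeping (the small fixed rax
header itself is ignored).

#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for the bookkeeping counters kept by a rax radix tree.
struct RaxCounters {
  uint64_t numele;    // number of stored stream IDs (keys)
  uint64_t numnodes;  // number of radix-tree nodes
};

// Mirrors the shape of the streamRadixTreeMemoryUsage() estimate: ~16 bytes per
// key (a streamID holds two uint64_t fields) plus a per-node overhead covering
// the node header, data pointer and child pointers. The node-header size below
// is an assumption for illustration, not a measured value.
uint64_t EstimateRadixTreeBytes(const RaxCounters& c) {
  const uint64_t kStreamIdBytes = 16;                          // sizeof(streamID)
  const uint64_t kAssumedNodeHeaderBytes = 4;                  // assumed raxNode size
  const uint64_t kPerNodePointerOverhead = sizeof(long) * 30;  // fixed fudge factor
  return c.numele * kStreamIdBytes +
         c.numnodes * (kAssumedNodeHeaderBytes + kPerNodePointerOverhead);
}

int main() {
  // Example: a stream whose 1000 entry IDs ended up spread over 120 rax nodes.
  RaxCounters counters{1000, 120};
  std::printf("estimated rax memory: %llu bytes\n",
              static_cast<unsigned long long>(EstimateRadixTreeBytes(counters)));
  return 0;
}

On an LP64 platform (sizeof(long) == 8) the sample numbers yield
1000 * 16 + 120 * (4 + 240) = 45280 bytes, i.e. roughly 44 KiB; the point of the
heuristic is a cheap approximation of the allocator footprint for MEMORY USAGE,
not an exact measurement.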