Skip to content

Commit

Permalink
chore: fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Dec 10, 2024
1 parent f3593e3 commit f9d995c
Showing 1 changed file with 20 additions and 29 deletions.
49 changes: 20 additions & 29 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ struct NACKInfo {

struct ConsumerInfo {
string name;
uint64_t seen_time;
int64_t active_time;
mstime_t seen_time;
mstime_t active_time;
size_t pel_count;
vector<NACKInfo> pending;
size_t idle;
Expand Down Expand Up @@ -1185,6 +1185,18 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
return FindGroupResult{s, cg, std::move(res_it->post_updater), res_it->it};
}

// Try to get the consumer. If not found, create a new one.
streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) {
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(name);
streamConsumer* consumer = streamLookupConsumer(cg, cname);
if (consumer)
consumer->seen_time = now_ms;
else // TODO: notify xgroup-createconsumer event once we support stream events.
consumer = StreamCreateConsumer(cg, name, now_ms, SCC_DEFAULT);
return consumer;
}

constexpr uint8_t kClaimForce = 1 << 0;
constexpr uint8_t kClaimJustID = 1 << 1;
constexpr uint8_t kClaimLastID = 1 << 2;
Expand Down Expand Up @@ -1257,13 +1269,7 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO

StreamMemTracker tracker;

// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(cgr_res->cg, cname);
if (consumer == nullptr)
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, SCC_DEFAULT);
else
consumer->seen_time = now_ms;
streamConsumer* consumer = FindOrAddConsumer(opts.consumer, cgr_res->cg, now_ms);

for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf;
Expand Down Expand Up @@ -1576,13 +1582,8 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
uint64_t now_ms = op_args.db_cntx.time_now_ms;
int count = opts.count;

auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(group, cname);
if (consumer == nullptr) {
consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT);
// TODO: notify xgroup-createconsumer event once we support stream events.
}
consumer->seen_time = now_ms;
streamConsumer* consumer = FindOrAddConsumer(opts.consumer, group, now_ms);

while (attempts-- && count && raxNext(&ri)) {
streamNACK* nack = (streamNACK*)ri.data;

Expand Down Expand Up @@ -2334,12 +2335,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,

// Update consumer
if (sitem.group) {
auto cname = WrapSds(opts->consumer_name);
range_opts.consumer = streamLookupConsumer(sitem.group, cname);
if (!range_opts.consumer) {
range_opts.consumer = StreamCreateConsumer(
sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
range_opts.consumer =
FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs());
}

range_opts.noack = opts->noack;
Expand Down Expand Up @@ -3113,13 +3110,7 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
return facade::ErrorReply{
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};

sds cname = WrapSds(opts->consumer_name);
consumer = streamLookupConsumer(group, cname);
uint64_t now_ms = op_args.db_cntx.time_now_ms;
if (consumer)
consumer->seen_time = now_ms;
else
consumer = StreamCreateConsumer(group, opts->consumer_name, now_ms, SCC_DEFAULT);
consumer = FindOrAddConsumer(opts->consumer_name, group, op_args.db_cntx.time_now_ms);

requested_sitem.group = group;
requested_sitem.consumer = consumer;
Expand Down

0 comments on commit f9d995c

Please sign in to comment.