From 3686f816bf48fc69acff1348e20ab4a29ec5c59b Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 5 Dec 2024 00:45:37 +0200 Subject: [PATCH] chore: update command interface for cluster and search families --- src/server/cluster/cluster_family.cc | 101 ++++++++++++--------------- src/server/cluster/cluster_family.h | 12 ++-- src/server/search/search_family.cc | 85 +++++++++++----------- src/server/search/search_family.h | 21 +++--- 4 files changed, 106 insertions(+), 113 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index dc1ef944e41a..db6d41bd5c03 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -124,6 +124,17 @@ void ClusterFamily::Shutdown() { }); } +std::optional ClusterFamily::GetShardInfos(ConnectionContext* cntx) const { + if (IsClusterEmulated()) { + return {GetEmulatedShardInfo(cntx)}; + } + + if (tl_cluster_config != nullptr) { + return GetConfigForStats(cntx); + } + return nullopt; +} + ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}), .master = {}, @@ -227,13 +238,11 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde } // namespace void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) { - if (IsClusterEmulated()) { - return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder); - } else if (tl_cluster_config != nullptr) { - return ClusterShardsImpl(GetConfigForStats(cntx), builder); - } else { - return builder->SendError(kClusterNotConfigured); + auto shard_infos = GetShardInfos(cntx); + if (shard_infos) { + return ClusterShardsImpl(*shard_infos, builder); } + return builder->SendError(kClusterNotConfigured); } namespace { @@ -272,13 +281,11 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder } // namespace void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx) { - if (IsClusterEmulated()) { - return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder); - } else if (tl_cluster_config != nullptr) { - return ClusterSlotsImpl(GetConfigForStats(cntx), builder); - } else { - return builder->SendError(kClusterNotConfigured); + auto shard_infos = GetShardInfos(cntx); + if (shard_infos) { + return ClusterSlotsImpl(*shard_infos, builder); } + return builder->SendError(kClusterNotConfigured); } namespace { @@ -328,13 +335,11 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, } // namespace void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx) { - if (IsClusterEmulated()) { - return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder); - } else if (tl_cluster_config != nullptr) { - return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder); - } else { - return builder->SendError(kClusterNotConfigured); + auto shard_infos = GetShardInfos(cntx); + if (shard_infos) { + return ClusterNodesImpl(*shard_infos, id_, builder); } + return builder->SendError(kClusterNotConfigured); } namespace { @@ -392,13 +397,8 @@ void ClusterInfoImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) } // namespace void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx) { - if (IsClusterEmulated()) { - return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder); - } else if (tl_cluster_config != nullptr) { - return ClusterInfoImpl(GetConfigForStats(cntx), builder); - } else { - return ClusterInfoImpl({}, builder); - } + auto shard_infos = GetShardInfos(cntx); + return ClusterInfoImpl(shard_infos.value_or(ClusterShardInfos{}), builder); } void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) { @@ -410,11 +410,12 @@ void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) { return builder->SendLong(id); } -void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { +void ClusterFamily::Cluster(CmdArgList args, const CommandContext& cmd_cntx) { // In emulated cluster mode, all slots are mapped to the same host, and number of cluster // instances is thus 1. string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); + auto* builder = cmd_cntx.rb; if (!IsClusterEnabledOrEmulated()) { return builder->SendError(kClusterDisabled); } @@ -432,34 +433,35 @@ void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, Connecti } else if (sub_cmd == "MYID") { return ClusterMyId(builder); } else if (sub_cmd == "SHARDS") { - return ClusterShards(builder, cntx); + return ClusterShards(builder, cmd_cntx.conn_cntx); } else if (sub_cmd == "SLOTS") { - return ClusterSlots(builder, cntx); + return ClusterSlots(builder, cmd_cntx.conn_cntx); } else if (sub_cmd == "NODES") { - return ClusterNodes(builder, cntx); + return ClusterNodes(builder, cmd_cntx.conn_cntx); } else if (sub_cmd == "INFO") { - return ClusterInfo(builder, cntx); + return ClusterInfo(builder, cmd_cntx.conn_cntx); } else { return builder->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType); } } -void ClusterFamily::ReadOnly(CmdArgList args, SinkReplyBuilder* builder) { +void ClusterFamily::ReadOnly(CmdArgList args, const CommandContext& cmd_cntx) { if (!IsClusterEmulated()) { - return builder->SendError(kClusterDisabled); + return cmd_cntx.rb->SendError(kClusterDisabled); } - builder->SendOk(); + cmd_cntx.rb->SendOk(); } -void ClusterFamily::ReadWrite(CmdArgList args, SinkReplyBuilder* builder) { +void ClusterFamily::ReadWrite(CmdArgList args, const CommandContext& cmd_cntx) { if (!IsClusterEmulated()) { - return builder->SendError(kClusterDisabled); + return cmd_cntx.rb->SendError(kClusterDisabled); } - builder->SendOk(); + cmd_cntx.rb->SendOk(); } -void ClusterFamily::DflyCluster(CmdArgList args, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ClusterFamily::DflyCluster(CmdArgList args, const CommandContext& cmd_cntx) { + auto* builder = cmd_cntx.rb; + auto* cntx = cmd_cntx.conn_cntx; if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) { return builder->SendError("Cluster is disabled. Use --cluster_mode=yes to enable."); } @@ -773,15 +775,15 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b } } -void ClusterFamily::DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, - ConnectionContext* cntx) { +void ClusterFamily::DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); args.remove_prefix(1); + auto* builder = cmd_cntx.rb; if (sub_cmd == "INIT") { InitMigration(args, builder); } else if (sub_cmd == "FLOW") { - DflyMigrateFlow(args, builder, cntx); + DflyMigrateFlow(args, builder, cmd_cntx.conn_cntx); } else if (sub_cmd == "ACK") { DflyMigrateAck(args, builder); } else { @@ -1034,21 +1036,10 @@ void ClusterFamily::PauseAllIncomingMigrations(bool pause) { } } -using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder, - ConnectionContext* cntx); - -using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder); - -inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { - return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) { - return (se->*f)(args, builder, cntx); - }; -} +using EngineFunc = void (ClusterFamily::*)(CmdArgList args, const CommandContext& cmd_cntx); -inline CommandId::Handler2 HandlerFunc(ClusterFamily* se, EngineFunc2 f) { - return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder) { - return (se->*f)(args, builder); - }; +inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) { + return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); }; } #define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x)) diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index fbd50f3c20e4..2591c7479d1f 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -51,7 +51,7 @@ class ClusterFamily { using SinkReplyBuilder = facade::SinkReplyBuilder; // Cluster commands compatible with Redis - void Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Cluster(CmdArgList args, const CommandContext& cmd_cntx); void ClusterHelp(SinkReplyBuilder* builder); void ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx); void ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx); @@ -61,11 +61,11 @@ class ClusterFamily { void KeySlot(CmdArgList args, SinkReplyBuilder* builder); - void ReadOnly(CmdArgList args, SinkReplyBuilder* builder); - void ReadWrite(CmdArgList args, SinkReplyBuilder* builder); + void ReadOnly(CmdArgList args, const CommandContext& cmd_cntx); + void ReadWrite(CmdArgList args, const CommandContext& cmd_cntx); // Custom Dragonfly commands for cluster management - void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx); + void DflyCluster(CmdArgList args, const CommandContext& cmd_cntx); void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx); void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder) @@ -77,7 +77,7 @@ class ClusterFamily { ABSL_LOCKS_EXCLUDED(migration_mu_); // DFLYMIGRATE is internal command defines several steps in slots migrations process - void DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx); + void DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx); // DFLYMIGRATE INIT is internal command to create incoming migration object void InitMigration(CmdArgList args, SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(migration_mu_); @@ -122,6 +122,8 @@ class ClusterFamily { ABSL_GUARDED_BY(migration_mu_); private: + std::optional GetShardInfos(ConnectionContext* cntx) const; + ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const; // Guards set configuration, so that we won't handle 2 in parallel. diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index f1151dc60a67..ad21e3d8573a 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -515,9 +515,9 @@ void ReplySorted(search::AggregationInfo agg, const SearchParams& params, } // namespace -void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx) { - if (cntx->conn_state.db_index != 0) { +void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) { + auto* builder = cmd_cntx.rb; + if (cmd_cntx.conn_cntx->conn_state.db_index != 0) { return builder->SendError("Cannot create index on db != 0"sv); } @@ -567,7 +567,7 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* // Check if index already exists atomic_uint exists_cnt = 0; - tx->Execute( + cmd_cntx.tx->Execute( [idx_name, &exists_cnt](auto* tx, auto* es) { if (es->search_indices()->GetIndex(idx_name) != nullptr) exists_cnt.fetch_add(1, std::memory_order_relaxed); @@ -578,12 +578,12 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* DCHECK(exists_cnt == 0u || exists_cnt == shard_set->size()); if (exists_cnt.load(memory_order_relaxed) > 0) { - tx->Conclude(); + cmd_cntx.tx->Conclude(); return builder->SendError("Index already exists"); } auto idx_ptr = make_shared(std::move(index)); - tx->Execute( + cmd_cntx.tx->Execute( [idx_name, idx_ptr](auto* tx, auto* es) { es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, idx_ptr); return OpStatus::OK; @@ -593,12 +593,12 @@ void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder->SendOk(); } -void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtAlter(CmdArgList args, const CommandContext& cmd_cntx) { CmdArgParser parser{args}; string_view idx_name = parser.Next(); parser.ExpectTag("SCHEMA"); parser.ExpectTag("ADD"); - + auto* builder = cmd_cntx.rb; if (auto err = parser.Error(); err) return builder->SendError(err->MakeReply()); @@ -612,17 +612,17 @@ void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* b index_info = make_shared(idx->GetInfo().base_index); return OpStatus::OK; }; - tx->Execute(idx_cb, false); + cmd_cntx.tx->Execute(idx_cb, false); if (!index_info) { - tx->Conclude(); + cmd_cntx.tx->Conclude(); return builder->SendError("Index not found"); } // Parse additional schema optional new_fields = ParseSchemaOrReply(index_info->type, parser, builder); if (!new_fields) { - tx->Conclude(); + cmd_cntx.tx->Conclude(); return; } @@ -641,17 +641,17 @@ void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* b es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, index_info); return OpStatus::OK; }; - tx->Execute(upd_cb, true); + cmd_cntx.tx->Execute(upd_cb, true); builder->SendOk(); } -void SearchFamily::FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx) { string_view idx_name = ArgS(args, 0); // TODO: Handle optional DD param atomic_uint num_deleted{0}; - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (es->search_indices()->DropIndex(idx_name)) num_deleted.fetch_add(1); return OpStatus::OK; @@ -659,17 +659,17 @@ void SearchFamily::FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilde DCHECK(num_deleted == 0u || num_deleted == shard_set->size()); if (num_deleted == 0u) - return builder->SendError("-Unknown Index name"); - return builder->SendOk(); + return cmd_cntx.rb->SendError("-Unknown Index name"); + return cmd_cntx.rb->SendOk(); } -void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtInfo(CmdArgList args, const CommandContext& cmd_cntx) { string_view idx_name = ArgS(args, 0); atomic_uint num_notfound{0}; vector infos(shard_set->size()); - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(idx_name); if (index == nullptr) num_notfound.fetch_add(1); @@ -679,9 +679,10 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu }); DCHECK(num_notfound == 0u || num_notfound == shard_set->size()); + auto* rb = static_cast(cmd_cntx.rb); if (num_notfound > 0u) - return builder->SendError("Unknown Index name"); + return rb->SendError("Unknown Index name"); DCHECK(infos.front().base_index.schema.fields.size() == infos.back().base_index.schema.fields.size()); @@ -693,7 +694,6 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu const auto& info = infos.front(); const auto& schema = info.base_index.schema; - auto* rb = static_cast(builder); rb->StartCollection(4, RedisReplyBuilder::MAP); rb->SendSimpleString("index_name"); @@ -731,25 +731,25 @@ void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu rb->SendLong(total_num_docs); } -void SearchFamily::FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtList(CmdArgList args, const CommandContext& cmd_cntx) { atomic_int first{0}; vector names; - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { // Using `first` to assign `names` only once without a race if (first.fetch_add(1) == 0) names = es->search_indices()->GetIndexNames(); return OpStatus::OK; }); - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); rb->SendBulkStrArr(names); } -void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) { CmdArgParser parser{args}; string_view index_name = parser.Next(); string_view query_str = parser.Next(); - + auto* builder = cmd_cntx.rb; auto params = ParseSearchParamsOrReply(&parser, builder); if (!params.has_value()) return; @@ -763,7 +763,7 @@ void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* atomic index_not_found{false}; vector docs(shard_set->size()); - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(index_name); index) docs[es->shard_id()] = index->Search(t->GetOpArgs(es), *params, &search_algo); else @@ -785,13 +785,14 @@ void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* ReplyWithResults(*params, absl::MakeSpan(docs), builder); } -void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtProfile(CmdArgList args, const CommandContext& cmd_cntx) { CmdArgParser parser{args}; string_view index_name = parser.Next(); + auto* rb = static_cast(cmd_cntx.rb); if (!parser.Check("SEARCH") && !parser.Check("AGGREGATE")) { - return builder->SendError("no `SEARCH` or `AGGREGATE` provided"); + return rb->SendError("no `SEARCH` or `AGGREGATE` provided"); } parser.Check("LIMITED"); // TODO: Implement limited profiling @@ -799,14 +800,14 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* string_view query_str = parser.Next(); - optional params = ParseSearchParamsOrReply(&parser, builder); + optional params = ParseSearchParamsOrReply(&parser, rb); if (!params.has_value()) return; search::SearchAlgorithm search_algo; search::SortOption* sort_opt = params->sort_option.has_value() ? &*params->sort_option : nullptr; if (!search_algo.Init(query_str, ¶ms->query_params, sort_opt)) - return builder->SendError("query syntax error"); + return rb->SendError("query syntax error"); search_algo.EnableProfiling(); @@ -818,7 +819,7 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* std::vector search_results(shards_count); std::vector profile_results(shards_count); - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(index_name); if (!index) { index_not_found.store(true, memory_order_relaxed); @@ -835,7 +836,7 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* }); if (index_not_found.load()) - return builder->SendError(std::string{index_name} + ": no such index"); + return rb->SendError(std::string{index_name} + ": no such index"); auto took = absl::Now() - start; @@ -851,7 +852,6 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* } } - auto* rb = static_cast(builder); // First element -> Result of the search command // Second element -> Profile information rb->StartArray(2); @@ -860,9 +860,9 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* if (!result_is_empty) { auto agg = search_algo.HasAggregation(); if (agg) { - ReplySorted(*agg, *params, absl::MakeSpan(search_results), builder); + ReplySorted(*agg, *params, absl::MakeSpan(search_results), rb); } else { - ReplyWithResults(*params, absl::MakeSpan(search_results), builder); + ReplyWithResults(*params, absl::MakeSpan(search_results), rb); } } else { rb->StartArray(1); @@ -918,14 +918,14 @@ void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* } } -void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtTagVals(CmdArgList args, const CommandContext& cmd_cntx) { string_view index_name = ArgS(args, 0); string_view field_name = ArgS(args, 1); VLOG(1) << "FtTagVals: " << index_name << " " << field_name; vector> shard_results(shard_set->size(), StringVec{}); - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(index_name); index) shard_results[es->shard_id()] = index->GetTagVals(field_name); else @@ -935,6 +935,7 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* }); absl::flat_hash_set result_set; + auto* rb = static_cast(cmd_cntx.rb); // Check first if either shard had errors. Also merge the results into a single set. for (auto& res : shard_results) { @@ -942,18 +943,18 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* result_set.insert(make_move_iterator(res->begin()), make_move_iterator(res->end())); } else { res.error().kind = facade::kSearchErrType; - return builder->SendError(res.error()); + return rb->SendError(res.error()); } } shard_results.clear(); vector vec(result_set.begin(), result_set.end()); - auto* rb = static_cast(builder); rb->SendBulkStrArr(vec, RedisReplyBuilder::SET); } -void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { +void SearchFamily::FtAggregate(CmdArgList args, const CommandContext& cmd_cntx) { + auto* builder = cmd_cntx.rb; const auto params = ParseAggregatorParamsOrReply(args, builder); if (!params) return; @@ -966,7 +967,7 @@ void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilde declval(), params.value(), &search_algo)); vector query_results(shard_set->size()); - tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + cmd_cntx.tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(params->index); index) { query_results[es->shard_id()] = index->SearchForAggregator(t->GetOpArgs(es), params.value(), &search_algo); @@ -985,7 +986,7 @@ void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilde return builder->SendError(agg_results.error()); size_t result_size = agg_results->size(); - auto* rb = static_cast(builder); + auto* rb = static_cast(cmd_cntx.rb); auto sortable_value_sender = SortableValueSender(rb); rb->StartArray(result_size + 1); diff --git a/src/server/search/search_family.h b/src/server/search/search_family.h index db5445eeb2b5..104c9a92076c 100644 --- a/src/server/search/search_family.h +++ b/src/server/search/search_family.h @@ -15,21 +15,20 @@ class SinkReplyBuilder; namespace dfly { class CommandRegistry; -class ConnectionContext; +struct CommandContext; class SearchFamily { using SinkReplyBuilder = facade::SinkReplyBuilder; - static void FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, - ConnectionContext* cntx); - static void FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); - static void FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtCreate(CmdArgList args, const CommandContext& cmd_cntx); + static void FtAlter(CmdArgList args, const CommandContext& cmd_cntx); + static void FtDropIndex(CmdArgList args, const CommandContext& cmd_cntx); + static void FtInfo(CmdArgList args, const CommandContext& cmd_cntx); + static void FtList(CmdArgList args, const CommandContext& cmd_cntx); + static void FtSearch(CmdArgList args, const CommandContext& cmd_cntx); + static void FtProfile(CmdArgList args, const CommandContext& cmd_cntx); + static void FtAggregate(CmdArgList args, const CommandContext& cmd_cntx); + static void FtTagVals(CmdArgList args, const CommandContext& cmd_cntx); public: static void Register(CommandRegistry* registry);