-
Notifications
You must be signed in to change notification settings - Fork 968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: update command interface for cluster and search families #4258
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,6 +124,17 @@ void ClusterFamily::Shutdown() { | |
}); | ||
} | ||
|
||
std::optional<ClusterShardInfos> ClusterFamily::GetShardInfos(ConnectionContext* cntx) const { | ||
if (IsClusterEmulated()) { | ||
return {GetEmulatedShardInfo(cntx)}; | ||
} | ||
|
||
if (tl_cluster_config != nullptr) { | ||
return GetConfigForStats(cntx); | ||
} | ||
return nullopt; | ||
} | ||
|
||
ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { | ||
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}), | ||
.master = {}, | ||
|
@@ -227,13 +238,11 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde | |
} // namespace | ||
|
||
void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
if (IsClusterEmulated()) { | ||
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder); | ||
} else if (tl_cluster_config != nullptr) { | ||
return ClusterShardsImpl(GetConfigForStats(cntx), builder); | ||
} else { | ||
return builder->SendError(kClusterNotConfigured); | ||
auto shard_infos = GetShardInfos(cntx); | ||
if (shard_infos) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Super nit, but I think it'd be slightly more elegant to first return on errors (i.e. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i agree, i just automatically followed the previous flow. |
||
return ClusterShardsImpl(*shard_infos, builder); | ||
} | ||
return builder->SendError(kClusterNotConfigured); | ||
} | ||
|
||
namespace { | ||
|
@@ -272,13 +281,11 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder | |
} // namespace | ||
|
||
void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
if (IsClusterEmulated()) { | ||
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder); | ||
} else if (tl_cluster_config != nullptr) { | ||
return ClusterSlotsImpl(GetConfigForStats(cntx), builder); | ||
} else { | ||
return builder->SendError(kClusterNotConfigured); | ||
auto shard_infos = GetShardInfos(cntx); | ||
if (shard_infos) { | ||
return ClusterSlotsImpl(*shard_infos, builder); | ||
} | ||
return builder->SendError(kClusterNotConfigured); | ||
} | ||
|
||
namespace { | ||
|
@@ -328,13 +335,11 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, | |
} // namespace | ||
|
||
void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
if (IsClusterEmulated()) { | ||
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder); | ||
} else if (tl_cluster_config != nullptr) { | ||
return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder); | ||
} else { | ||
return builder->SendError(kClusterNotConfigured); | ||
auto shard_infos = GetShardInfos(cntx); | ||
if (shard_infos) { | ||
return ClusterNodesImpl(*shard_infos, id_, builder); | ||
} | ||
return builder->SendError(kClusterNotConfigured); | ||
} | ||
|
||
namespace { | ||
|
@@ -392,13 +397,8 @@ void ClusterInfoImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) | |
} // namespace | ||
|
||
void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
if (IsClusterEmulated()) { | ||
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder); | ||
} else if (tl_cluster_config != nullptr) { | ||
return ClusterInfoImpl(GetConfigForStats(cntx), builder); | ||
} else { | ||
return ClusterInfoImpl({}, builder); | ||
} | ||
auto shard_infos = GetShardInfos(cntx); | ||
return ClusterInfoImpl(shard_infos.value_or(ClusterShardInfos{}), builder); | ||
} | ||
|
||
void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) { | ||
|
@@ -410,11 +410,12 @@ void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) { | |
return builder->SendLong(id); | ||
} | ||
|
||
void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
void ClusterFamily::Cluster(CmdArgList args, const CommandContext& cmd_cntx) { | ||
// In emulated cluster mode, all slots are mapped to the same host, and number of cluster | ||
// instances is thus 1. | ||
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); | ||
|
||
auto* builder = cmd_cntx.rb; | ||
if (!IsClusterEnabledOrEmulated()) { | ||
return builder->SendError(kClusterDisabled); | ||
} | ||
|
@@ -432,34 +433,35 @@ void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, Connecti | |
} else if (sub_cmd == "MYID") { | ||
return ClusterMyId(builder); | ||
} else if (sub_cmd == "SHARDS") { | ||
return ClusterShards(builder, cntx); | ||
return ClusterShards(builder, cmd_cntx.conn_cntx); | ||
} else if (sub_cmd == "SLOTS") { | ||
return ClusterSlots(builder, cntx); | ||
return ClusterSlots(builder, cmd_cntx.conn_cntx); | ||
} else if (sub_cmd == "NODES") { | ||
return ClusterNodes(builder, cntx); | ||
return ClusterNodes(builder, cmd_cntx.conn_cntx); | ||
} else if (sub_cmd == "INFO") { | ||
return ClusterInfo(builder, cntx); | ||
return ClusterInfo(builder, cmd_cntx.conn_cntx); | ||
} else { | ||
return builder->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType); | ||
} | ||
} | ||
|
||
void ClusterFamily::ReadOnly(CmdArgList args, SinkReplyBuilder* builder) { | ||
void ClusterFamily::ReadOnly(CmdArgList args, const CommandContext& cmd_cntx) { | ||
if (!IsClusterEmulated()) { | ||
return builder->SendError(kClusterDisabled); | ||
return cmd_cntx.rb->SendError(kClusterDisabled); | ||
} | ||
builder->SendOk(); | ||
cmd_cntx.rb->SendOk(); | ||
} | ||
|
||
void ClusterFamily::ReadWrite(CmdArgList args, SinkReplyBuilder* builder) { | ||
void ClusterFamily::ReadWrite(CmdArgList args, const CommandContext& cmd_cntx) { | ||
if (!IsClusterEmulated()) { | ||
return builder->SendError(kClusterDisabled); | ||
return cmd_cntx.rb->SendError(kClusterDisabled); | ||
} | ||
builder->SendOk(); | ||
cmd_cntx.rb->SendOk(); | ||
} | ||
|
||
void ClusterFamily::DflyCluster(CmdArgList args, SinkReplyBuilder* builder, | ||
ConnectionContext* cntx) { | ||
void ClusterFamily::DflyCluster(CmdArgList args, const CommandContext& cmd_cntx) { | ||
auto* builder = cmd_cntx.rb; | ||
auto* cntx = cmd_cntx.conn_cntx; | ||
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) { | ||
return builder->SendError("Cluster is disabled. Use --cluster_mode=yes to enable."); | ||
} | ||
|
@@ -773,15 +775,15 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b | |
} | ||
} | ||
|
||
void ClusterFamily::DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, | ||
ConnectionContext* cntx) { | ||
void ClusterFamily::DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx) { | ||
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); | ||
|
||
args.remove_prefix(1); | ||
auto* builder = cmd_cntx.rb; | ||
if (sub_cmd == "INIT") { | ||
InitMigration(args, builder); | ||
} else if (sub_cmd == "FLOW") { | ||
DflyMigrateFlow(args, builder, cntx); | ||
DflyMigrateFlow(args, builder, cmd_cntx.conn_cntx); | ||
} else if (sub_cmd == "ACK") { | ||
DflyMigrateAck(args, builder); | ||
} else { | ||
|
@@ -1034,21 +1036,10 @@ void ClusterFamily::PauseAllIncomingMigrations(bool pause) { | |
} | ||
} | ||
|
||
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder, | ||
ConnectionContext* cntx); | ||
|
||
using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder); | ||
|
||
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { | ||
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) { | ||
return (se->*f)(args, builder, cntx); | ||
}; | ||
} | ||
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, const CommandContext& cmd_cntx); | ||
|
||
inline CommandId::Handler2 HandlerFunc(ClusterFamily* se, EngineFunc2 f) { | ||
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder) { | ||
return (se->*f)(args, builder); | ||
}; | ||
inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) { | ||
return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); }; | ||
} | ||
|
||
#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! :)