Skip to content

Commit

Permalink
chore: update command interface for cluster and search families (#4258)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Dec 5, 2024
1 parent 81079df commit c2f8349
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 113 deletions.
101 changes: 46 additions & 55 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ void ClusterFamily::Shutdown() {
});
}

std::optional<ClusterShardInfos> ClusterFamily::GetShardInfos(ConnectionContext* cntx) const {
if (IsClusterEmulated()) {
return {GetEmulatedShardInfo(cntx)};
}

if (tl_cluster_config != nullptr) {
return GetConfigForStats(cntx);
}
return nullopt;
}

ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
.master = {},
Expand Down Expand Up @@ -227,13 +238,11 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
} // namespace

void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterShardsImpl(GetConfigForStats(cntx), builder);
} else {
return builder->SendError(kClusterNotConfigured);
auto shard_infos = GetShardInfos(cntx);
if (shard_infos) {
return ClusterShardsImpl(*shard_infos, builder);
}
return builder->SendError(kClusterNotConfigured);
}

namespace {
Expand Down Expand Up @@ -272,13 +281,11 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
} // namespace

void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterSlotsImpl(GetConfigForStats(cntx), builder);
} else {
return builder->SendError(kClusterNotConfigured);
auto shard_infos = GetShardInfos(cntx);
if (shard_infos) {
return ClusterSlotsImpl(*shard_infos, builder);
}
return builder->SendError(kClusterNotConfigured);
}

namespace {
Expand Down Expand Up @@ -328,13 +335,11 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
} // namespace

void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterNodesImpl(GetConfigForStats(cntx), id_, builder);
} else {
return builder->SendError(kClusterNotConfigured);
auto shard_infos = GetShardInfos(cntx);
if (shard_infos) {
return ClusterNodesImpl(*shard_infos, id_, builder);
}
return builder->SendError(kClusterNotConfigured);
}

namespace {
Expand Down Expand Up @@ -392,13 +397,8 @@ void ClusterInfoImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder)
} // namespace

void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterInfoImpl(GetConfigForStats(cntx), builder);
} else {
return ClusterInfoImpl({}, builder);
}
auto shard_infos = GetShardInfos(cntx);
return ClusterInfoImpl(shard_infos.value_or(ClusterShardInfos{}), builder);
}

void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) {
Expand All @@ -410,11 +410,12 @@ void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) {
return builder->SendLong(id);
}

void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
void ClusterFamily::Cluster(CmdArgList args, const CommandContext& cmd_cntx) {
// In emulated cluster mode, all slots are mapped to the same host, and number of cluster
// instances is thus 1.
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));

auto* builder = cmd_cntx.rb;
if (!IsClusterEnabledOrEmulated()) {
return builder->SendError(kClusterDisabled);
}
Expand All @@ -432,34 +433,35 @@ void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, Connecti
} else if (sub_cmd == "MYID") {
return ClusterMyId(builder);
} else if (sub_cmd == "SHARDS") {
return ClusterShards(builder, cntx);
return ClusterShards(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "SLOTS") {
return ClusterSlots(builder, cntx);
return ClusterSlots(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "NODES") {
return ClusterNodes(builder, cntx);
return ClusterNodes(builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "INFO") {
return ClusterInfo(builder, cntx);
return ClusterInfo(builder, cmd_cntx.conn_cntx);
} else {
return builder->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
}
}

void ClusterFamily::ReadOnly(CmdArgList args, SinkReplyBuilder* builder) {
void ClusterFamily::ReadOnly(CmdArgList args, const CommandContext& cmd_cntx) {
if (!IsClusterEmulated()) {
return builder->SendError(kClusterDisabled);
return cmd_cntx.rb->SendError(kClusterDisabled);
}
builder->SendOk();
cmd_cntx.rb->SendOk();
}

void ClusterFamily::ReadWrite(CmdArgList args, SinkReplyBuilder* builder) {
void ClusterFamily::ReadWrite(CmdArgList args, const CommandContext& cmd_cntx) {
if (!IsClusterEmulated()) {
return builder->SendError(kClusterDisabled);
return cmd_cntx.rb->SendError(kClusterDisabled);
}
builder->SendOk();
cmd_cntx.rb->SendOk();
}

void ClusterFamily::DflyCluster(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ClusterFamily::DflyCluster(CmdArgList args, const CommandContext& cmd_cntx) {
auto* builder = cmd_cntx.rb;
auto* cntx = cmd_cntx.conn_cntx;
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) {
return builder->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
}
Expand Down Expand Up @@ -773,15 +775,15 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
}
}

void ClusterFamily::DflyMigrate(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
void ClusterFamily::DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));

args.remove_prefix(1);
auto* builder = cmd_cntx.rb;
if (sub_cmd == "INIT") {
InitMigration(args, builder);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, builder, cntx);
DflyMigrateFlow(args, builder, cmd_cntx.conn_cntx);
} else if (sub_cmd == "ACK") {
DflyMigrateAck(args, builder);
} else {
Expand Down Expand Up @@ -1034,21 +1036,10 @@ void ClusterFamily::PauseAllIncomingMigrations(bool pause) {
}
}

using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx);

using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder);

inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, builder, cntx);
};
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, const CommandContext& cmd_cntx);

inline CommandId::Handler2 HandlerFunc(ClusterFamily* se, EngineFunc2 f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder) {
return (se->*f)(args, builder);
};
inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, const CommandContext& cmd_cntx) { return (se->*f)(args, cmd_cntx); };
}

#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x))
Expand Down
12 changes: 7 additions & 5 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ClusterFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;

// Cluster commands compatible with Redis
void Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Cluster(CmdArgList args, const CommandContext& cmd_cntx);
void ClusterHelp(SinkReplyBuilder* builder);
void ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx);
Expand All @@ -61,11 +61,11 @@ class ClusterFamily {

void KeySlot(CmdArgList args, SinkReplyBuilder* builder);

void ReadOnly(CmdArgList args, SinkReplyBuilder* builder);
void ReadWrite(CmdArgList args, SinkReplyBuilder* builder);
void ReadOnly(CmdArgList args, const CommandContext& cmd_cntx);
void ReadWrite(CmdArgList args, const CommandContext& cmd_cntx);

// Custom Dragonfly commands for cluster management
void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyCluster(CmdArgList args, const CommandContext& cmd_cntx);
void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);

void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
Expand All @@ -77,7 +77,7 @@ class ClusterFamily {
ABSL_LOCKS_EXCLUDED(migration_mu_);

// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyMigrate(CmdArgList args, const CommandContext& cmd_cntx);

// DFLYMIGRATE INIT is internal command to create incoming migration object
void InitMigration(CmdArgList args, SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(migration_mu_);
Expand Down Expand Up @@ -122,6 +122,8 @@ class ClusterFamily {
ABSL_GUARDED_BY(migration_mu_);

private:
std::optional<ClusterShardInfos> GetShardInfos(ConnectionContext* cntx) const;

ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;

// Guards set configuration, so that we won't handle 2 in parallel.
Expand Down
Loading

0 comments on commit c2f8349

Please sign in to comment.