Skip to content

Commit

Permalink
refactor: clean cluster slot migration code (#2848)
Browse files Browse the repository at this point in the history
* refactor: clean cluster slot migration code
  • Loading branch information
BorysTheDev authored Apr 5, 2024
1 parent 5fcd64a commit b994f93
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 218 deletions.
4 changes: 2 additions & 2 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
cluster/cluster_family.cc cluster/incoming_slot_migration.cc
cluster/outgoing_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)

Expand Down
1 change: 0 additions & 1 deletion src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include <array>
#include <memory>
#include <string_view>
#include <vector>
Expand Down
14 changes: 7 additions & 7 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflyClusterSlotMigrationStatus(args, cntx);
return DflyIncomingSlotMigrationStatus(args, cntx);
}

return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
Expand Down Expand Up @@ -626,7 +626,7 @@ static std::string_view state_to_str(MigrationState state) {
return "UNDEFINED_STATE"sv;
}

void ClusterFamily::DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser(args);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());

Expand Down Expand Up @@ -681,22 +681,22 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
}
}

ClusterSlotMigration* ClusterFamily::CreateIncomingMigration(std::string source_id,
SlotRanges slots,
uint32_t shards_num) {
IncomingSlotMigration* ClusterFamily::CreateIncomingMigration(std::string source_id,
SlotRanges slots,
uint32_t shards_num) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
if (mj->GetSourceID() == source_id) {
return nullptr;
}
}
return incoming_migrations_jobs_
.emplace_back(make_shared<ClusterSlotMigration>(
.emplace_back(make_shared<IncomingSlotMigration>(
std::move(source_id), &server_family_->service(), std::move(slots), shards_num))
.get();
}

std::shared_ptr<ClusterSlotMigration> ClusterFamily::GetIncomingMigration(
std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
std::string_view source_id) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
Expand Down
14 changes: 7 additions & 7 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#include "facade/conn_context.h"
#include "server/cluster/cluster_config.h"
#include "server/cluster/cluster_slot_migration.h"
#include "server/cluster/incoming_slot_migration.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/common.h"

Expand Down Expand Up @@ -56,7 +56,7 @@ class ClusterFamily {
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);

private: // Slots migration section
void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);
void DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx);

// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -72,11 +72,11 @@ class ClusterFamily {

void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);

// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* CreateIncomingMigration(std::string source_id, SlotRanges slots,
uint32_t shards_num);
// create a IncomingSlotMigration entity which will execute migration
IncomingSlotMigration* CreateIncomingMigration(std::string source_id, SlotRanges slots,
uint32_t shards_num);

std::shared_ptr<ClusterSlotMigration> GetIncomingMigration(std::string_view source_id);
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id);

bool StartSlotMigrations(std::vector<MigrationInfo> migrations, ConnectionContext* cntx);
void RemoveOutgoingMigrations(const std::vector<MigrationInfo>& migrations);
Expand All @@ -87,7 +87,7 @@ class ClusterFamily {

mutable util::fb2::Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress.
std::vector<std::shared_ptr<ClusterSlotMigration>> incoming_migrations_jobs_
std::vector<std::shared_ptr<IncomingSlotMigration>> incoming_migrations_jobs_
ABSL_GUARDED_BY(migration_mu_);

// holds all outgoing slots migrations that are currently in progress
Expand Down
75 changes: 0 additions & 75 deletions src/server/cluster/cluster_shard_migration.cc

This file was deleted.

36 changes: 0 additions & 36 deletions src/server/cluster/cluster_shard_migration.h

This file was deleted.

74 changes: 0 additions & 74 deletions src/server/cluster/cluster_slot_migration.cc

This file was deleted.

103 changes: 103 additions & 0 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/cluster/incoming_slot_migration.h"

#include "base/logging.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/tx_executor.h"
#include "server/main_service.h"

namespace dfly {

using namespace std;
using namespace util;
using namespace facade;
using absl::GetFlag;

// ClusterShardMigration manage data receiving in slots migration process.
// It is created per shard on the target node to initiate FLOW step.
class ClusterShardMigration {
public:
ClusterShardMigration(uint32_t shard_id, Service* service) : source_shard_id_(shard_id) {
executor_ = std::make_unique<JournalExecutor>(service);
}

void Start(Context* cntx, io::Source* source) {
JournalReader reader{source, 0};
TransactionReader tx_reader{false};

while (!cntx->IsCancelled()) {
if (cntx->IsCancelled())
break;

auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data) {
VLOG(1) << "No tx data";
break;
}

if (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Flow " << source_shard_id_ << " is finalized";
break;
} else if (tx_data->opcode == journal::Op::PING) {
// TODO check about ping logic
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
}
}

private:
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
} else {
// TODO check which global commands should be supported
CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0])
<< "in cluster migration process.";
}
}

private:
uint32_t source_shard_id_;
std::unique_ptr<JournalExecutor> executor_;
};

IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
uint32_t shards_num)
: source_id_(std::move(source_id)),
service_(*se),
slots_(std::move(slots)),
state_(MigrationState::C_CONNECTING),
bc_(shards_num) {
shard_flows_.resize(shards_num);
for (unsigned i = 0; i < shards_num; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(i, &service_));
}
}

IncomingSlotMigration::~IncomingSlotMigration() {
sync_fb_.JoinIfNeeded();
}

void IncomingSlotMigration::Join() {
bc_->Wait();
state_ = MigrationState::C_FINISHED;
}

void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) {
VLOG(1) << "Start flow for shard: " << shard;

shard_flows_[shard]->Start(&cntx_, source);
bc_->Dec();
}

} // namespace dfly
Loading

0 comments on commit b994f93

Please sign in to comment.