Skip to content

Commit

Permalink
feat(server): don't use channel for replication / save df (#4041)
Browse files Browse the repository at this point in the history
* feat server: don't use channel for replication / save df

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Nov 5, 2024
1 parent 7df8c26 commit ae3faf5
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 175 deletions.
32 changes: 27 additions & 5 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,22 @@ GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
}

// Runs the body phase of the save by delegating to saver_->SaveBody and returns
// the resulting error_code.
// NOTE(review): this span comes from a rendered diff without +/- markers, so two
// return statements appear back to back. Presumably the first one (which also
// passes &freq_map_) is the removed line and the second is its replacement —
// confirm against the repository before relying on either form.
error_code RdbSnapshot::SaveBody() {
return saver_->SaveBody(&cntx_, &freq_map_);
return saver_->SaveBody(&cntx_);
}

// Forwards to the underlying saver's WaitSnapshotInShard for `shard` and hands its
// error_code back to the caller unchanged.
error_code RdbSnapshot::WaitSnapshotInShard(EngineShard* shard) {
  error_code wait_ec = saver_->WaitSnapshotInShard(shard);
  return wait_ec;
}

// Reports the saver's total buffer size. A saver must already exist; the CHECK
// aborts otherwise.
size_t RdbSnapshot::GetSaveBuffersSize() {
  CHECK(saver_);
  size_t total_buffers = saver_->GetTotalBuffersSize();
  return total_buffers;
}

void RdbSnapshot::FillFreqMap() {
saver_->FillFreqMap(&freq_map_);
}

RdbSaver::SnapshotStats RdbSnapshot::GetCurrentSnapshotProgress() const {
CHECK(saver_);
return saver_->GetCurrentSnapshotProgress();
Expand All @@ -147,7 +155,7 @@ error_code RdbSnapshot::Close() {
}

// Starts the snapshot for `shard` through saver_->StartSnapshotInShard and then
// increments the started_shards_ counter.
// NOTE(review): this span comes from a rendered diff without +/- markers, so the
// StartSnapshotInShard call appears twice. Presumably the first call (passing
// cntx_.GetCancellation()) is the removed line and the second (passing &cntx_) is
// its replacement — confirm against the repository.
void RdbSnapshot::StartInShard(EngineShard* shard) {
saver_->StartSnapshotInShard(false, cntx_.GetCancellation(), shard);
saver_->StartSnapshotInShard(false, &cntx_, shard);
started_shards_.fetch_add(1, memory_order_relaxed);
}

Expand Down Expand Up @@ -176,7 +184,12 @@ std::optional<SaveInfo> SaveStagesController::InitResourcesAndStart() {
}

// Drives the save to completion.
// When use_dfs_format_ is set, WaitSnapshotInShard is run for every shard through
// shard_set->RunBlockingInParallel, and afterwards SaveBody is called with index
// shard_set->size(). Otherwise SaveBody is called with index 0.
// NOTE(review): this span comes from a rendered diff without +/- markers;
// presumably the leading RunStage(&SaveStagesController::SaveCb) line is the
// removed implementation and the if/else is its replacement — confirm.
// NOTE(review): the per-shard callbacks assign shared_err_ while running in
// parallel; presumably that assignment is safe for concurrent use — verify
// against the type of shared_err_.
void SaveStagesController::WaitAllSnapshots() {
RunStage(&SaveStagesController::SaveCb);
if (use_dfs_format_) {
shard_set->RunBlockingInParallel([&](EngineShard* shard) { WaitSnapshotInShard(shard); });
SaveBody(shard_set->size());
} else {
SaveBody(0);
}
}

SaveInfo SaveStagesController::Finalize() {
Expand Down Expand Up @@ -395,13 +408,22 @@ GenericError SaveStagesController::BuildFullPath() {
return {};
}

// Saves the body of snapshots_[index]: if that snapshot exists and has started,
// the result of its SaveBody() is stored in shared_err_.
// The CHECK requires that, when use_dfs_format_ is set, index equals
// shard_set->size().
// NOTE(review): this span comes from a rendered diff without +/- markers;
// presumably the first two lines (the SaveCb signature and its brace-less if) are
// the removed definition header and the SaveBody definition is the replacement —
// confirm against the repository.
void SaveStagesController::SaveCb(unsigned index) {
if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted())
void SaveStagesController::SaveBody(unsigned index) {
CHECK(!use_dfs_format_ || index == shard_set->size()); // used in rdb and df summary file
if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted()) {
shared_err_ = snapshot->SaveBody();
}
}

// Waits for the snapshot that belongs to `shard`. Nothing happens when the slot
// for this shard holds no snapshot or the snapshot has not started; otherwise the
// outcome of the wait is recorded in shared_err_.
void SaveStagesController::WaitSnapshotInShard(EngineShard* shard) {
  auto& shard_snapshot = snapshots_[shard->shard_id()].first;
  if (!shard_snapshot || !shard_snapshot->HasStarted()) {
    return;
  }
  shared_err_ = shard_snapshot->WaitSnapshotInShard(shard);
}

void SaveStagesController::CloseCb(unsigned index) {
if (auto& snapshot = snapshots_[index].first; snapshot) {
snapshot->FillFreqMap();
shared_err_ = snapshot->Close();

unique_lock lk{rdb_name_map_mu_};
Expand Down
6 changes: 5 additions & 1 deletion src/server/detail/save_stages_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RdbSnapshot {
void StartInShard(EngineShard* shard);

// Runs the saver's body phase and returns its error_code.
error_code SaveBody();
// Forwards to the saver's WaitSnapshotInShard for `shard` and returns its error_code.
error_code WaitSnapshotInShard(EngineShard* shard);
// Has the saver fill this snapshot's frequency map member.
void FillFreqMap();
// NOTE(review): the body of Close() is not visible in this diff; presumably it
// closes the snapshot's output — confirm.
error_code Close();
// Returns the saver's total buffer size; CHECK-fails if no saver exists.
size_t GetSaveBuffersSize();

Expand Down Expand Up @@ -101,6 +103,8 @@ struct SaveStagesController : public SaveStagesInputs {

// Start saving a dfs file on shard
void SaveDfsSingle(EngineShard* shard);
// NOTE(review): "SaveSnashot" looks like a misspelling of "SaveSnapshot", and no
// definition or caller of it is visible in this diff — confirm whether it is
// needed, then rename or remove it.
void SaveSnashot(EngineShard* shard);
// Waits for the snapshot of `shard`: if the snapshot stored for shard->shard_id()
// exists and has started, the result of its WaitSnapshotInShard is stored in
// shared_err_.
void WaitSnapshotInShard(EngineShard* shard);

// Save a single rdb file
void SaveRdb();
Expand All @@ -115,7 +119,7 @@ struct SaveStagesController : public SaveStagesInputs {
// Build full path: get dir, try creating dirs, get filename with placeholder
GenericError BuildFullPath();

void SaveCb(unsigned index);
void SaveBody(unsigned index);

void CloseCb(unsigned index);

Expand Down
40 changes: 13 additions & 27 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ void DflyCmd::ReplicaInfo::Cancel() {
flow->cleanup();
}
VLOG(2) << "After flow cleanup " << shard->shard_id();
flow->full_sync_fb.JoinIfNeeded();
flow->conn = nullptr;
});
// Wait for error handler to quit.
Expand Down Expand Up @@ -371,7 +370,7 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];

StopFullSyncInThread(flow, shard);
StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
};
shard_set->RunBlockingInParallel(std::move(cb));
Expand Down Expand Up @@ -551,7 +550,6 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
}

OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(!flow->full_sync_fb.IsJoinable());
DCHECK(shard);
DCHECK(flow->conn);

Expand All @@ -569,7 +567,6 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// callbacks are blocked on trying to insert to channel.
flow->TryShutdownSocket();
flow->saver->CancelInShard(shard); // stops writing to journal stream to channel
flow->full_sync_fb.JoinIfNeeded(); // finishes poping data from channel
flow->saver.reset();
};

Expand All @@ -588,18 +585,24 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
if (flow->start_partial_sync_at.has_value())
saver->StartIncrementalSnapshotInShard(cntx, shard, *flow->start_partial_sync_at);
else
saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
saver->StartSnapshotInShard(true, cntx, shard);

flow->full_sync_fb = fb2::Fiber("full_sync", &DflyCmd::FullSyncFb, this, flow, cntx);
return OpStatus::OK;
}

void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(shard);
flow->saver->StopFullSyncInShard(shard);
error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
return;
}

// Wait for full sync to finish.
flow->full_sync_fb.JoinIfNeeded();
ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
return;
}

// Reset cleanup and saver
flow->cleanup = []() {};
Expand All @@ -626,23 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
return OpStatus::OK;
}

// Fiber body for the full-sync phase of a single flow.
// Runs the saver's body phase and, on success, writes the flow's EOF token to the
// connection socket. Any failure is reported through cntx->ReportError.
void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
  error_code save_ec = flow->saver->SaveBody(cntx, nullptr);
  if (save_ec) {
    const bool socket_open = flow->conn->socket()->IsOpen();
    if (!socket_open) {
      save_ec = make_error_code(errc::operation_canceled);  // we cancelled the operation.
    }
    cntx->ReportError(save_ec);
    return;
  }

  error_code write_ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
  if (write_ec) {
    cntx->ReportError(write_ec);
  }
}

auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> {
util::fb2::LockGuard lk(mu_);
unsigned sync_id = next_sync_id_++;
Expand Down
6 changes: 1 addition & 5 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ struct FlowInfo {

facade::Connection* conn = nullptr;

util::fb2::Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver for full sync phase.
std::unique_ptr<JournalStreamer> streamer; // Streamer for stable sync phase
std::string eof_token;
Expand Down Expand Up @@ -210,14 +209,11 @@ class DflyCmd {
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Stop full sync in thread. Run state switch cleanup.
void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard);
void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow, Context* cntx);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_);

Expand Down
Loading

0 comments on commit ae3faf5

Please sign in to comment.