Skip to content

Commit

Permalink
chore: simplify master replication cancelation interface (#3439)
Browse files Browse the repository at this point in the history
* chore: simplify master replication cancelation interface

Previously, CancelReplication did too many things; moreover,
we also had StopReplication, which did the same.

This PR moves CancelReplication under ReplicaInfo struct,
and reduces code duplication around this change.

Signed-off-by: Roman Gershman <[email protected]>

* Update src/server/dflycmd.cc

Co-authored-by: Shahar Mike <[email protected]>
Signed-off-by: Roman Gershman <[email protected]>

---------

Signed-off-by: Roman Gershman <[email protected]>
Signed-off-by: Roman Gershman <[email protected]>
Co-authored-by: Shahar Mike <[email protected]>
  • Loading branch information
romange and chakaz authored Aug 4, 2024
1 parent 55d39b6 commit 9eacedf
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 60 deletions.
85 changes: 37 additions & 48 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r

} // namespace

void DflyCmd::ReplicaInfo::Cancel() {
lock_guard lk = GetExclusiveLock();
if (replica_state == SyncState::CANCELLED) {
return;
}

LOG(INFO) << "Disconnecting from replica " << address << ":" << listening_port;

// Update state and cancel context.
replica_state = SyncState::CANCELLED;
cntx.Cancel();

// Wait for tasks to finish.
shard_set->RunBlockingInParallel([this](EngineShard* shard) {
FlowInfo* flow = &flows[shard->shard_id()];
if (flow->cleanup) {
flow->cleanup();
}

flow->full_sync_fb.JoinIfNeeded();
});

// Wait for error handler to quit.
cntx.JoinErrorHandler();
}

// Builds a DflyCmd that keeps the supplied ServerFamily pointer in sf_.
DflyCmd::DflyCmd(ServerFamily* server_family)
    : sf_(server_family) {}

Expand Down Expand Up @@ -605,61 +631,24 @@ auto DflyCmd::GetReplicaInfoFromConnection(ConnectionContext* cntx)
}

void DflyCmd::OnClose(ConnectionContext* cntx) {
unsigned session_id = cntx->conn_state.replication_info.repl_session_id;
if (!session_id)
return;

auto replica_ptr = GetReplicaInfo(session_id);
if (!replica_ptr)
unsigned sync_id = cntx->conn_state.replication_info.repl_session_id;
if (!sync_id)
return;

// Because CancelReplication holds the per-replica mutex,
// aborting connection will block here until cancellation finishes.
// This allows keeping resources alive during the cleanup phase.
CancelReplication(session_id, replica_ptr);
StopReplication(sync_id);
}

void DflyCmd::StopReplication(uint32_t sync_id) {
auto replica_ptr = GetReplicaInfo(sync_id);
if (!replica_ptr)
return;

CancelReplication(sync_id, replica_ptr);
}

void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replica_ptr) {
{
lock_guard lk = replica_ptr->GetExclusiveLock();
if (replica_ptr->replica_state == SyncState::CANCELLED) {
return;
}

LOG(INFO) << "Disconnecting from replica " << replica_ptr->address << ":"
<< replica_ptr->listening_port;

// Update replica_ptr state and cancel context.
replica_ptr->replica_state = SyncState::CANCELLED;
replica_ptr->cntx.Cancel();

// Wait for tasks to finish.
shard_set->RunBlockingInParallel([replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
if (flow->cleanup) {
flow->cleanup();
}

flow->full_sync_fb.JoinIfNeeded();
});
}

// Remove ReplicaInfo from global map
{
lock_guard lk(mu_);
replica_infos_.erase(sync_id);
}
// Because ReplicaInfo::Cancel holds the per-replica mutex,
// aborting connection will block here until cancellation finishes.
// This allows keeping resources alive during the cleanup phase.
replica_ptr->Cancel();

// Wait for error handler to quit.
replica_ptr->cntx.JoinErrorHandler();
lock_guard lk(mu_);
replica_infos_.erase(sync_id);
}

shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
Expand Down Expand Up @@ -810,8 +799,8 @@ void DflyCmd::Shutdown() {
pending = std::move(replica_infos_);
}

for (auto [sync_id, replica_ptr] : pending) {
CancelReplication(sync_id, replica_ptr);
for (auto& [_, replica_ptr] : pending) {
replica_ptr->Cancel();
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class DflyCmd {
return std::shared_lock{shared_mu};
}

// Transition into cancelled state, run cleanup.
void Cancel();

SyncState replica_state; // always guarded by shared_mu
Context cntx;

Expand Down Expand Up @@ -157,9 +160,6 @@ class DflyCmd {
// Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

private:
// JOURNAL [START/STOP]
// Start or stop journaling.
Expand Down Expand Up @@ -208,9 +208,6 @@ class DflyCmd {
// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow, Context* cntx);

// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id);

Expand All @@ -223,6 +220,9 @@ class DflyCmd {
facade::RedisReplyBuilder* rb);

private:
// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Return a map between replication ID to lag. lag is defined as the maximum of difference
// between the master's LSN and the last acknowledged LSN in over all shards.
std::map<uint32_t, LSN> ReplicationLagsLocked() const;
Expand Down
7 changes: 1 addition & 6 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1186,12 +1186,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Bonus points because this allows to continue replication with ACL users who got
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
auto info_ptr = server_family_.GetReplicaInfoFromConnection(dfly_cntx);
if (info_ptr) {
unsigned session_id = dfly_cntx->conn_state.replication_info.repl_session_id;
DCHECK(session_id);
server_family_.GetDflyCmd()->CancelReplication(session_id, std::move(info_ptr));
}
server_family_.GetDflyCmd()->OnClose(dfly_cntx);
return;
}
dfly_cntx->SendError(std::move(*err));
Expand Down

0 comments on commit 9eacedf

Please sign in to comment.