fix(cluster): Wait for flow creation before cancelling it (#3159)
* fix(cluster): Wait for flow creation before cancelling it

This subtle bug could appear if a migration fails at exactly the wrong
time:
1. Thread1 creates an outgoing flow and starts the migration
2. Thread1's flow fails for whatever reason
3. Thread1 attempts to cancel all flows
4. Thread2 has *not* yet created its flow
5. Thread1 skips Thread2's non-existent flow
6. Thread2 now creates its flow
7. Upon destruction of the entire migration, Thread2's fiber has not been
   joined, crashing Dragonfly

The fix is quite simple though :)

Fixes #3139

* split creation and invocation
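
The idea of splitting creation from invocation can be sketched outside Dragonfly with plain threads. This is only an illustration under stated assumptions: `Flow`, `RunOnAll` and `MigrateWithFailure` are hypothetical names, and `RunOnAll` merely approximates one `AwaitFiberOnAll` hop by joining every thread before it returns. Because the first hop finishes before the second begins, a failure in the second hop can cancel every flow without a null check.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for a per-shard migration flow.
struct Flow {
  std::atomic<bool> cancelled{false};
  void Cancel() { cancelled.store(true); }
};

// Runs fn(i) on n threads and joins them all before returning,
// a rough analogue of a single AwaitFiberOnAll hop.
template <typename Fn>
void RunOnAll(std::size_t n, Fn fn) {
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < n; ++i) threads.emplace_back(fn, i);
  for (auto& t : threads) t.join();
}

// Returns how many flows end up cancelled when shard `failing` reports an error.
std::size_t MigrateWithFailure(std::size_t n_shards, std::size_t failing) {
  std::vector<std::unique_ptr<Flow>> flows(n_shards);

  // Hop 1: creation only. Each thread fills its own slot.
  RunOnAll(n_shards, [&](std::size_t i) { flows[i] = std::make_unique<Flow>(); });

  // Hop 2: invocation. Every slot is populated by now, so the failing
  // shard can cancel all flows and assert instead of skipping nulls.
  RunOnAll(n_shards, [&](std::size_t i) {
    if (i == failing) {
      for (auto& flow : flows) {
        assert(flow != nullptr);
        flow->Cancel();
      }
    }
  });

  std::size_t cancelled = 0;
  for (auto& flow : flows) cancelled += flow->cancelled.load() ? 1 : 0;
  return cancelled;
}
```

Calling `MigrateWithFailure(4, 1)` returns 4: all four flows exist when shard 1 fails, so none is skipped. If both steps ran in a single hop, the failing thread could observe an empty slot, which is the race described above.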
chakaz authored Jun 13, 2024
1 parent c087191 commit 93b4193
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -108,16 +108,17 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) {
 
 void OutgoingMigration::Finish(bool is_error) {
   const auto new_state = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
-  if (ChangeState(new_state)) {
-    shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
-      if (const auto* shard = EngineShard::tlocal(); shard) {
-        auto& flow = slot_migrations_[shard->shard_id()];
-        if (flow != nullptr) {
-          flow->Cancel();
-        }
-      }
-    });
+  if (!ChangeState(new_state)) {
+    return;
   }
+
+  shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
+    if (const auto* shard = EngineShard::tlocal(); shard) {
+      auto& flow = slot_migrations_[shard->shard_id()];
+      CHECK(flow != nullptr);
+      flow->Cancel();
+    }
+  });
 }
 
 MigrationState OutgoingMigration::GetState() const {
@@ -182,6 +183,13 @@ void OutgoingMigration::SyncFb() {
         server_family_->journal()->StartInThread();
         slot_migrations_[shard->shard_id()] = std::make_unique<SliceSlotMigration>(
             &shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
+      }
+    });
+
+    // Start migrations in a separate hop to make sure that Finish() is called only after all
+    // migrations are created (see #3139)
+    shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
+      if (auto* shard = EngineShard::tlocal(); shard) {
         auto& migration = *slot_migrations_[shard->shard_id()];
         migration.Sync(cf_->MyID(), shard->shard_id());
         if (migration.GetError()) {
