fix: Fix test_network_disconnect_during_migration test #4224

Merged (6 commits, Dec 2, 2024)
3 changes: 2 additions & 1 deletion src/server/cluster/outgoing_slot_migration.cc
```diff
@@ -37,7 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   ~SliceSlotMigration() {
-    streamer_.Cancel();
+    Cancel();
     cntx_.JoinErrorHandler();
   }
 
@@ -81,6 +81,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   void Cancel() {
+    cntx_.Cancel();
     streamer_.Cancel();
   }
 
```
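For orientation, here is a compilable sketch of the teardown ordering these two hunks establish, with `Context` and `Streamer` as simplified stand-ins for the real Dragonfly types (the `IsCancelled()` short-circuit in `Streamer::Cancel()` anticipates the streamer.cc change below): by the time the destructor joins the error handler, every blocking wait has already been short-circuited by the cancelled context.

```cpp
#include <atomic>

// Simplified stand-ins for the real Dragonfly types, to show ordering only.
struct Context {
  std::atomic<bool> cancelled{false};
  void Cancel() { cancelled = true; }
  bool IsCancelled() const { return cancelled; }
  void JoinErrorHandler() { /* join the error-handler fiber */ }
};

struct Streamer {
  explicit Streamer(Context* cntx) : cntx_(cntx) {}
  void Cancel() {
    // Mirrors the streamer.cc change: a cancelled context means in-flight
    // writes can never finish, so don't wait for them.
    if (!cntx_->IsCancelled()) {
      WaitForInflightToComplete();
    }
  }
  void WaitForInflightToComplete() { /* block until writes drain */ }
  Context* cntx_;
};

struct SliceSlotMigration {
  Context cntx_;
  Streamer streamer_{&cntx_};

  void Cancel() {
    cntx_.Cancel();      // first: mark the context cancelled
    streamer_.Cancel();  // then: the streamer observes it and bails out early
  }

  // The destructor change: cancel context and streamer before joining, so
  // nothing can block forever by the time JoinErrorHandler() runs.
  ~SliceSlotMigration() {
    Cancel();
    cntx_.JoinErrorHandler();
  }
};

int main() { SliceSlotMigration m; }
```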
6 changes: 3 additions & 3 deletions src/server/container_utils.cc
```diff
@@ -274,13 +274,13 @@ bool IterateMap(const PrimeValue& pv, const IterateKVFunc& func) {
   bool finished = true;
 
   if (pv.Encoding() == kEncodingListPack) {
-    uint8_t intbuf[LP_INTBUF_SIZE];
+    uint8_t k_intbuf[LP_INTBUF_SIZE], v_intbuf[LP_INTBUF_SIZE];
     uint8_t* lp = (uint8_t*)pv.RObjPtr();
     uint8_t* fptr = lpFirst(lp);
     while (fptr) {
-      string_view key = LpGetView(fptr, intbuf);
+      string_view key = LpGetView(fptr, k_intbuf);
       fptr = lpNext(lp, fptr);
-      string_view val = LpGetView(fptr, intbuf);
+      string_view val = LpGetView(fptr, v_intbuf);
       fptr = lpNext(lp, fptr);
       if (!func(ContainerEntry{key.data(), key.size()}, ContainerEntry{val.data(), val.size()})) {
         finished = false;
```
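The bug fixed here is a buffer-aliasing one: listpack stores small integers in encoded form, and `LpGetView` can materialize such an entry into the scratch buffer it is given and return a view into that buffer. Decoding key and value through the same `intbuf` therefore lets the value decode overwrite the key's bytes before `func` ever sees them. A minimal standalone illustration of the hazard, with `DecodeInto` as a stand-in for `LpGetView` (not the real listpack API):

```cpp
#include <cstddef>
#include <cstdio>
#include <string_view>

// Stand-in for LpGetView: decodes an integer entry into the caller's scratch
// buffer and returns a view into that buffer.
std::string_view DecodeInto(long value, char* buf, std::size_t cap) {
  int n = std::snprintf(buf, cap, "%ld", value);
  return std::string_view(buf, n);
}

int main() {
  char shared[32];
  std::string_view key = DecodeInto(111, shared, sizeof(shared));
  std::string_view val = DecodeInto(222, shared, sizeof(shared));  // clobbers key
  // Prints "key=222 val=222": the key's bytes were overwritten in place.
  std::printf("key=%.*s val=%.*s\n", (int)key.size(), key.data(),
              (int)val.size(), val.data());
  // The fix in the hunk: decode key and value into separate buffers
  // (k_intbuf and v_intbuf) so the two views can never alias.
}
```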
15 changes: 13 additions & 2 deletions src/server/journal/streamer.cc
```diff
@@ -41,7 +41,9 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
 }
 
 JournalStreamer::~JournalStreamer() {
-  DCHECK_EQ(in_flight_bytes_, 0u);
+  if (!cntx_->IsCancelled()) {
+    DCHECK_EQ(in_flight_bytes_, 0u);
+  }
   VLOG(1) << "~JournalStreamer";
 }
```

Contributor: How did we not trigger this before? Or did we just deadlock because WaitForInflightToComplete() would never progress?

Collaborator (author): idk why we didn't trigger this before, but indeed this deadlocks.
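A sketch of why this could hang, under the assumption that `in_flight_bytes_` is only decremented from a write-completion callback (`AsyncWrite` and `Waker` here are illustrative stand-ins, not Dragonfly's actual API): once the peer connection dies, the completion never fires, so a wait for `in_flight_bytes_ == 0` never wakes, and the destructor's DCHECK cannot assume the counter drained either.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Illustrative sketch only, not the real JournalStreamer.
struct Waker {
  void notifyAll() {}
  void await(std::function<bool()> cond) {
    while (!cond()) { /* suspend this fiber until notified */ }
  }
};

struct Streamer {
  std::size_t in_flight_bytes_ = 0;
  Waker waker_;

  void Write(const std::string& buf) {
    in_flight_bytes_ += buf.size();
    AsyncWrite(buf, [this](std::size_t n) {  // completion callback
      in_flight_bytes_ -= n;                 // never runs if the socket died
      if (in_flight_bytes_ == 0) waker_.notifyAll();
    });
  }

  void WaitForInflightToComplete() {
    // With a dead peer, this condition can never become true.
    waker_.await([this] { return in_flight_bytes_ == 0; });
  }

  void AsyncWrite(const std::string&, std::function<void(std::size_t)>) {
    /* hand the buffer to the socket; the callback fires on completion */
  }
};

int main() { Streamer s; s.Write("payload"); }
```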

```diff
@@ -79,7 +81,9 @@ void JournalStreamer::Cancel() {
   VLOG(1) << "JournalStreamer::Cancel";
   waker_.notifyAll();
   journal_->UnregisterOnChange(journal_cb_id_);
-  WaitForInflightToComplete();
+  if (!cntx_->IsCancelled()) {
+    WaitForInflightToComplete();
+  }
 }
 
 size_t JournalStreamer::GetTotalBufferCapacities() const {
```

Contributor: I was about to write that maybe we should move the cntx_->IsCancelled() check into WaitForInflightToComplete(), but then I realized it's only called in this one place, so it's not really needed, I guess.
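For reference, the alternative the reviewer floats would fold the check into the wait itself. Continuing the illustrative `Streamer` sketch above, and assuming it also held a `cntx_` pointer, it would look roughly like:

```cpp
void WaitForInflightToComplete() {
  if (cntx_->IsCancelled())
    return;  // writes over a dead connection will never complete; don't wait
  waker_.await([this] { return in_flight_bytes_ == 0; });
}
```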
```diff
@@ -215,8 +219,15 @@ void RestoreStreamer::Run() {
       return;
 
     cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
+      if (fiber_cancelled_)  // Could be cancelled any time as Traverse may preempt
+        return;
+
       db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
+
+      if (fiber_cancelled_)  // Could have been cancelled in above call too
+        return;
+
       WriteBucket(it);
     });
 
```

Collaborator: how can Traverse preempt if we don't have the big value serialization merged yet?

Contributor: in the callback

Collaborator: which callback can preempt?

Contributor: by the way, I think we can use snapshot_version_ instead of fiber_cancelled_, because we always process them together.

Contributor:

> which callback can preempt?

`db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, DbSlice::Iterator::FromPrime(it), snapshot_version_);`

Contributor: also, `WriteBucket(it);` can yield.

Collaborator: so I think the `if (fiber_cancelled_)` check should also come after the call to FlushChangeToEarlierCallbacks.
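The pattern this thread converges on generalizes: in cooperative fiber code, every call that may suspend the current fiber is a window in which a canceller can run, so the cancellation flag has to be re-checked after each suspension point, not just once at the top. A self-contained sketch of that discipline (names are illustrative, not Dragonfly's API):

```cpp
#include <vector>

// Generic fiber-safe traversal: FlushEarlierCallbacks and WriteBucket stand
// in for calls that may yield to other fibers, so the cancellation flag is
// re-checked after each of them.
struct Traversal {
  bool cancelled = false;

  void Run(const std::vector<int>& buckets) {
    for (int bucket : buckets) {
      if (cancelled)  // the traversal itself may have preempted before here
        return;
      FlushEarlierCallbacks(bucket);  // may yield to other fibers
      if (cancelled)  // could have been cancelled during that yield
        return;
      WriteBucket(bucket);  // may also yield
    }
  }

  void FlushEarlierCallbacks(int) { /* stand-in for a preempting call */ }
  void WriteBucket(int) { /* stand-in for a preempting call */ }
};

int main() {
  Traversal t;
  t.Run({1, 2, 3});
}
```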
2 changes: 2 additions & 0 deletions src/server/multi_command_squasher.cc
```diff
@@ -112,6 +112,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) {
 
   cmd->Fill(&tmp_keylist_);
   auto args = absl::MakeSpan(tmp_keylist_);
+  if (args.empty())
+    return SquashResult::NOT_SQUASHED;
 
   auto keys = DetermineKeys(cmd->Cid(), args);
   if (!keys.ok())
```
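A plausible reading of this guard, though the PR doesn't spell out the failure mode: key determination inspects argument positions, so handing it an empty span risks an out-of-bounds access, and returning NOT_SQUASHED simply routes such commands through the regular, non-squashed path. An illustrative sketch (`FirstKey` and this `TrySquash` are hypothetical, not Dragonfly's implementation):

```cpp
#include <string_view>
#include <vector>

#include "absl/types/span.h"

enum class SquashResult { SQUASHED, NOT_SQUASHED };

// Hypothetical stand-in for key determination: reads the first argument
// position unconditionally, which is out of bounds for an empty span.
std::string_view FirstKey(absl::Span<std::string_view> args) {
  return args[0];  // UB on an empty span; hence the guard below
}

SquashResult TrySquash(absl::Span<std::string_view> args) {
  if (args.empty())
    return SquashResult::NOT_SQUASHED;  // fall back to the regular path
  (void)FirstKey(args);
  return SquashResult::SQUASHED;
}

int main() {
  std::vector<std::string_view> empty;
  return TrySquash(absl::MakeSpan(empty)) == SquashResult::NOT_SQUASHED ? 0 : 1;
}
```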
5 changes: 3 additions & 2 deletions tests/dragonfly/cluster_test.py
```diff
@@ -1290,7 +1290,7 @@ async def test_migration_with_key_ttl(df_factory):
 
 
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
+async def test_network_disconnect_during_migration(df_factory):
     instances = [
         df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
     ]
@@ -1328,14 +1328,15 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory):
 
     await proxy.start()
 
-    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 20)
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 60)
     nodes[0].migrations = []
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     logging.debug("remove finished migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
+    await proxy.close()
 
 
 @pytest.mark.parametrize(
```