feat: Break serialization of huge values into multiple chunks
Serialization now yields between chunk writes, and a new test verifies that
modifying huge values while migrating slots to other nodes works.

Still TODO: assert on RSS size during migration (will probably need to
create bigger containers)

Fixes #4100
chakaz committed Nov 25, 2024
1 parent 2c663f3 commit 7e01880
Showing 2 changed files with 58 additions and 16 deletions.
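A minimal sketch of the chunking idea, assuming a hypothetical chunk_size threshold and a serialize_list helper (illustrative stand-ins, not Dragonfly's actual C++ API): instead of emitting one giant RPUSH for a huge list, the serializer emits several smaller commands, which gives the caller a chance to yield between them.

    # Hypothetical sketch of chunked serialization; chunk_size and
    # serialize_list are stand-ins, not Dragonfly's actual C++ API.
    from typing import Iterator, List

    def serialize_list(key: str, values: List[str], chunk_size: int) -> Iterator[str]:
        # Emit one RPUSH per chunk instead of a single huge command,
        # so the caller can yield between commands.
        for start in range(0, len(values), chunk_size):
            chunk = values[start : start + chunk_size]
            yield f"RPUSH {key} " + " ".join(chunk)

    # A 2,500-element list with chunk_size=1_000 becomes 3 commands.
    commands = list(serialize_list("l:1", [str(i) for i in range(2500)], 1_000))
    assert len(commands) == 3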
6 changes: 4 additions & 2 deletions src/server/journal/streamer.cc
@@ -280,7 +280,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {

 void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
-    FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
@@ -318,7 +317,10 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(s); });
+  CmdSerializer serializer([&](std::string s) {
+    Write(s);
+    ThrottleIfNeeded();
+  });
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }

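The WriteEntry change moves throttling into the serializer's sink: CmdSerializer invokes the callback once per chunk, so ThrottleIfNeeded() now runs between chunks rather than once per entry, and a huge value can no longer monopolize the shard. A rough asyncio analogue, with a hypothetical buffer limit standing in for the streamer's real backpressure:

    # Toy asyncio model of the new sink; the buffer limit and helpers are
    # hypothetical, not the streamer's real internals.
    import asyncio

    MAX_BUFFERED = 4  # stand-in backpressure threshold

    class Streamer:
        def __init__(self) -> None:
            self.buffered = 0

        def write(self, blob: str) -> None:
            self.buffered += 1  # queue the serialized chunk for the socket

        async def throttle_if_needed(self) -> None:
            if self.buffered >= MAX_BUFFERED:
                await asyncio.sleep(0)  # yield to other tasks (fibers in Dragonfly)
                self.buffered = 0  # pretend the queue drained

        async def write_entry(self, chunks: list) -> None:
            # The serializer invokes the sink once per chunk, so throttling
            # happens between chunks, not once per whole entry.
            for chunk in chunks:
                self.write(chunk)
                await self.throttle_if_needed()

    asyncio.run(Streamer().write_entry([f"chunk-{i}" for i in range(10)]))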
68 changes: 54 additions & 14 deletions tests/dragonfly/cluster_test.py
@@ -1,3 +1,4 @@
+import math
 import pytest
 import re
 import json
@@ -1825,11 +1826,26 @@ async def node1size0():
         assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")


+@pytest.mark.parametrize(
+    "huge_values_threshold",
+    [
+        pytest.param(10),
+        pytest.param(1_000),
+        pytest.param(1_000_000),
+    ],
+)
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
 @pytest.mark.asyncio
-async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
+async def test_cluster_migration_huge_container(
+    df_factory: DflyInstanceFactory, huge_values_threshold: int
+):
     instances = [
-        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+        df_factory.create(
+            port=BASE_PORT + i,
+            admin_port=BASE_PORT + i + 1000,
+            serialization_max_chunk_size=huge_values_threshold,
+        )
+        for i in range(2)
     ]
     df_factory.start_all(instances)

@@ -1840,16 +1856,25 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

     logging.debug("Generating huge containers")
-    seeder = StaticSeeder(
-        key_target=10,
-        data_size=10_000_000,
-        collection_size=10_000,
-        variance=1,
-        samples=1,
-        types=["LIST", "HASH", "SET", "ZSET", "STRING"],
-    )
-    await seeder.run(nodes[0].client)
-    source_data = await StaticSeeder.capture(nodes[0].client)
+    # Insert data into containers with a gaussian key distribution: some will be small, others big
+    stop = False
+
+    async def insert_data(client):
+        nonlocal stop
+        for i in itertools.count(start=1):
+            if stop:
+                return
+            key = math.ceil(random.gauss(mu=50, sigma=100))
+            tasks = []
+            tasks.append(client.rpush(f"l:{key}", i))
+            tasks.append(client.sadd(f"s:{key}", i))
+            tasks.append(client.hset(f"h:{key}", i, i))
+            tasks.append(client.zadd(f"z:{key}", {str(i): i}))
+            await asyncio.gather(*tasks)
+
+    insert_task = asyncio.create_task(insert_data(instances[0].cluster_client()))
+
+    await asyncio.sleep(2)  # give it time to feed some data

     nodes[0].migrations = [
         MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
@@ -1860,8 +1885,23 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
logging.debug("Waiting for migration to finish")
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")

target_data = await StaticSeeder.capture(nodes[1].client)
assert source_data == target_data
stop = True
logging.debug("Waiting for task")
await insert_task
logging.debug("Done waiting for task")
await instances[0].cluster_client().close()

len = await nodes[1].client.dbsize()
logging.debug(f"Inserted {len} keys")

for i in range(-500, 500):
l = await nodes[1].client.lrange(f"l:{i}", 0, -1)
s = await nodes[1].client.smembers(f"s:{i}")
h = await nodes[1].client.hkeys(f"h:{i}")
z = await nodes[1].client.zrange(f"z:{i}", 0, -1)
assert set(l) == s
assert set(h) == s
assert set(z) == s


def parse_lag(replication_info: str):