Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reproduce list bug #4300

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tests/dragonfly/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,7 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
args.setdefault("log_dir", self.params.log_dir)

if version >= 1.21 and "serialization_max_chunk_size" not in args:
# Add 1 byte limit for big values
args.setdefault("serialization_max_chunk_size", 1)
args.setdefault("serialization_max_chunk_size", 16384)

for k, v in args.items():
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
Expand Down
54 changes: 34 additions & 20 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,21 @@ async def wait_for_replicas_state(*clients, state="online", node_role="slave", t


@pytest.mark.parametrize(
"t_master, t_replicas, seeder_config, stream_target, big_value",
"t_master, t_replicas, seeder_config, stream_target",
[
# Quick general test that replication is working
(1, 3 * [1], dict(key_target=1_000), 500, False),
(4, [4, 4], dict(key_target=10_000), 1_000, False),
pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, False, marks=M_OPT),
# Skewed tests with different thread ratio
pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, False, marks=M_SLOW),
pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, False, marks=M_SLOW),
# Test with big value size
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, False, marks=M_SLOW),
# Test with big value and big value serialization
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, True, marks=M_SLOW),
# Stress test
pytest.param(
8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, False, marks=M_STRESS
),
# # Quick general test that replication is working
# (1, 3 * [1], dict(key_target=1_000), 500),
# # A lot of huge values
(2, 2 * [1], dict(key_target=1_000, huge_value_percentage=2), 500),
# (4, [4, 4], dict(key_target=10_000), 1_000),
# pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT),
# # Skewed tests with different thread ratio
# pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW),
# pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW),
# # Everything is big because data size is 10k
# pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW),
# # Stress test
# pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
],
)
@pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})])
Expand All @@ -70,17 +68,13 @@ async def test_replication_all(
t_replicas,
seeder_config,
stream_target,
big_value,
mode,
):
args = {}
if mode:
args["cache_mode"] = "true"
args["maxmemory"] = str(t_master * 256) + "mb"

if big_value:
args["serialization_max_chunk_size"] = 4096

master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **args)
replicas = [
df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
Expand Down Expand Up @@ -132,6 +126,26 @@ async def check():
# Check data after stable state stream
await check()

info = await c_master.info()
preemptions = info["big_value_preemptions"]
key_target = seeder_config["key_target"]
# Rough estimate
estimated_preemptions = key_target * (0.01)
assert preemptions > estimated_preemptions

# Because data size could be 10k and for that case there will be almost a preemption
# per bucket.
if "data_size" not in seeder_config.keys():
total_buckets = info["num_buckets"]
# We care that we preempt fewer times than the total number of buckets so that we can be
# sure we test both flows (with and without preemptions). Preemptions on 30%
# of buckets seems like a big number, but that depends on a few parameters, such as
# the size of the huge value and the serialization max chunk size. For the test cases here,
# it's usually close to 10%, but some are close to 30%.
total_buckets = info["num_buckets"]
logging.debug(f"Buckets {total_buckets}. Preemptions {preemptions}")
assert preemptions <= (total_buckets * 0.3)


async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
role = await c_replica.role()
Expand Down
10 changes: 5 additions & 5 deletions tests/dragonfly/seeder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ def __init__(
data_size=100,
collection_size=None,
types: typing.Optional[typing.List[str]] = None,
huge_value_percentage=1,
huge_value_size=1024,
# 1 huge entries per container/key as default
huge_value_csize=1,
huge_value_percentage=0,
huge_value_size=10000,
# 2 huge entries per container/key as default
huge_value_csize=2,
):
SeederBase.__init__(self, types)
self.key_target = key_target
Expand Down Expand Up @@ -216,6 +216,6 @@ async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey,

msg = f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}"
if huge_keys > 0:
msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total extra modified huge entries {huge_entries}."
msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total huge entries {huge_entries}."

logging.debug(msg)
24 changes: 13 additions & 11 deletions tests/dragonfly/seeder/script-genlib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,23 @@ end

-- Seed one list key. A key flagged in `keys` gets a huge-value sequence.
function LG_funcs.add_list(key, keys)
    --- TODO -- investigate why second case of replication_test_all fails
    --- we somehow create a quicklist that is circular and we deadlock
    local huge = keys[key]
    local elements = randstr_sequence(huge)
    redis.apcall('LPUSH', key, unpack(elements))
end

-- Randomly mutate one list key: pops and pushes are equally likely, and we
-- rely on the list size being large enough to "highly likely" not get
-- emptied out by consecutive pops.
-- NOTE(review): pushes deliberately use randstr(false) (never huge values)
-- as part of this bug-reproduction change, so the huge flag in `keys` is
-- intentionally not consulted here.
function LG_funcs.mod_list(key, keys)
    local action = math.random(1, 4)
    if action == 1 then
        redis.apcall('RPOP', key)
    elseif action == 2 then
        redis.apcall('LPOP', key)
    elseif action == 3 then
        redis.apcall('LPUSH', key, randstr(false))
    else
        redis.apcall('RPUSH', key, randstr(false))
    end
end

Expand Down Expand Up @@ -101,7 +102,7 @@ function LG_funcs.mod_set(key, keys)
redis.apcall('SPOP', key)
else
local is_huge = keys[key]
redis.apcall('SADD', key, randstr(is_huge))
redis.apcall('SADD', key, randstr(false))
end
end

Expand All @@ -113,19 +114,21 @@ end
function LG_funcs.add_hash(key, keys)
local blobs
local is_huge = keys[key]
local limit = LG_funcs.csize
if is_huge then
blobs = dragonfly.randstr(LG_funcs.huge_value_size, LG_funcs.csize / 2)
limit = LG_funcs.huge_value_csize
blobs = dragonfly.randstr(LG_funcs.huge_value_size, limit)
huge_entries = huge_entries + 1
else
blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize / 2)
end

local htable = {}
for i = 1, LG_funcs.csize, 2 do
for i = 1, limit, 2 do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = math.random(0, 1000)
end
for i = 2, LG_funcs.csize, 2 do
for i = 2, limit, 2 do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = blobs[i // 2]
end
Expand All @@ -137,8 +140,7 @@ function LG_funcs.mod_hash(key, keys)
if idx % 2 == 1 then
redis.apcall('HINCRBY', key, tostring(idx), 1)
else
local is_huge = keys[key]
redis.apcall('HSET', key, tostring(idx), randstr(is_huge))
redis.apcall('HSET', key, tostring(idx), randstr(false))
end
end

Expand Down Expand Up @@ -166,8 +168,8 @@ end
function LG_funcs.mod_zset(key, keys)
local action = math.random(1, 4)
if action <= 2 then
local is_huge = keys[key]
redis.apcall('ZADD', key, math.random(0, LG_funcs.csize * 2), randstr(is_huge))
local size = LG_funcs.csize * 2
redis.apcall('ZADD', key, math.random(0, size), randstr(false))
elseif action == 3 then
redis.apcall('ZPOPMAX', key)
else
Expand Down
Loading