Skip to content

Commit

Permalink
fix(pytest): timed ticker for simpler conditions (#3242)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg authored Jun 29, 2024
1 parent b34d3ba commit 4cc9834
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 26 deletions.
7 changes: 4 additions & 3 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dataclasses import dataclass
from aiohttp import ClientSession

from .utility import tick_timer
from . import dfly_args
from .instance import DflyInstance, DflyInstanceFactory

Expand Down Expand Up @@ -309,9 +310,9 @@ async def resub(s: "aioredis.PubSub", sub: bool, chan: str):
await asyncio.gather(*(resub(s, False, "chan1") for s in subs1))

# Make sure numsub drops to 0
with async_timeout.timeout(1):
while (await async_client.pubsub_numsub("chan1"))[0][1] > 0:
await asyncio.sleep(0.05)
async for numsub, breaker in tick_timer(lambda: async_client.pubsub_numsub("chan1")):
with breaker:
assert numsub[0][1] == 0

# Check empty numsub
assert await async_client.pubsub_numsub() == []
Expand Down
24 changes: 7 additions & 17 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2114,13 +2114,6 @@ async def save_replica():
await disconnect_clients(c_master, *[c_replica])


async def is_replicaiton_conn_down(conn):
role = await conn.execute_command("INFO REPLICATION")
# fancy of way of extracting the field master_link_status
is_down = role.split("\r\n")[4].split(":")[1]
return is_down == "down"


@pytest.mark.asyncio
async def test_user_acl_replication(df_local_factory):
master = df_local_factory.create(proactor_threads=4)
Expand All @@ -2142,11 +2135,9 @@ async def test_user_acl_replication(df_local_factory):

# revoke acl's from tmp
await c_master.execute_command("ACL SETUSER tmp -replconf")
async with async_timeout.timeout(5):
while True:
if await is_replicaiton_conn_down(c_replica):
break
await asyncio.sleep(1)
async for info, breaker in info_tick_timer(c_replica, section="REPLICATION"):
with breaker:
assert info["master_link_status"] == "down"

await c_master.execute_command("SET bar foo")

Expand Down Expand Up @@ -2180,13 +2171,12 @@ async def test_replica_reconnect(df_local_factory, break_conn):
await c_master.execute_command("set k 12345")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)

assert not await is_replicaiton_conn_down(c_replica)
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "up"

# kill existing master, create master with different repl_id but same port
master_port = master.port
master.stop()
assert await is_replicaiton_conn_down(c_replica)
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "down"

master = df_local_factory.create(proactor_threads=1, port=master_port)
df_local_factory.start_all([master])
Expand All @@ -2198,14 +2188,14 @@ async def test_replica_reconnect(df_local_factory, break_conn):
assert await c_replica.execute_command("get k") == "12345"
assert await c_master.execute_command("set k 6789")
assert await c_replica.execute_command("get k") == "12345"
assert await is_replicaiton_conn_down(c_replica)
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "down"
else:
assert await c_master.execute_command("get k") == None
assert await c_replica.execute_command("get k") == None
assert await c_master.execute_command("set k 6789")
await check_all_replicas_finished([c_replica], c_master)
assert await c_replica.execute_command("get k") == "6789"
assert not await is_replicaiton_conn_down(c_replica)
assert (await c_replica.info("REPLICATION"))["master_link_status"] == "up"

# Force re-replication, assert that it worked
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
Expand Down
10 changes: 4 additions & 6 deletions tests/dragonfly/tiering_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from . import dfly_args
from .seeder import StaticSeeder
from .utility import info_tick_timer


BASIC_ARGS = {"port": 6379, "proactor_threads": 4, "tiered_prefix": "/tmp/tiering_test_backing"}
Expand All @@ -26,12 +27,9 @@ async def test_basic_memory_usage(async_client: aioredis.Redis):
await seeder.run(async_client)

# Wait for tiering stashes
with async_timeout.timeout(5):
while True:
info = await async_client.info("ALL")
if info["tiered_entries"] > 195_000:
break
await asyncio.sleep(0.2)
async for info, breaker in info_tick_timer(async_client, section="TIERED"):
with breaker:
assert info["tiered_entries"] > 195_000

info = await async_client.info("ALL")
assert info["num_entries"] == 200_000
Expand Down
45 changes: 45 additions & 0 deletions tests/dragonfly/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,51 @@ def batch_fill_data(client, gen, batch_size=100):
client.mset({k: v for k, v, in group})


async def tick_timer(func, timeout=5, step=0.1):
"""
Async generator with automatic break when all asserts pass
for object, breaker in tick_timer():
with breaker:
assert conditions on object
If the generator times out, the last failed assert is raised
"""

class ticker_breaker:
def __init__(self):
self.exc = None
self.entered = False

def __enter__(self):
self.entered = True

def __exit__(self, exc_type, exc_value, trace):
if exc_value:
self.exc = exc_value
return True

last_error = None
start = time.time()
while time.time() - start < timeout:
breaker = ticker_breaker()
yield (await func(), breaker)
if breaker.entered and not breaker.exc:
return

last_error = breaker.exc
await asyncio.sleep(step)

if last_error:
raise RuntimeError("Timed out!") from last_error
raise RuntimeError("Timed out!")


async def info_tick_timer(client: aioredis.Redis, section=None, **kwargs):
async for x in tick_timer(lambda: client.info(section), **kwargs):
yield x


async def wait_available_async(client: aioredis.Redis, timeout=10):
"""Block until instance exits loading phase"""
its = 0
Expand Down

0 comments on commit 4cc9834

Please sign in to comment.