Skip to content

Commit

Permalink
feat: Implement "true" pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
achimnol committed Feb 17, 2024
1 parent b8af802 commit 1cf13a4
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions examples/proof-of-concept.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
class Waiter:
future: asyncio.Future
parser: hiredis.Reader
reply_queue: list[Any] | None
expected_reply_count: int = 1


class RedisClientProtocol(QuicConnectionProtocol):
Expand All @@ -36,6 +38,7 @@ async def query(
waiter = Waiter(
future=self._loop.create_future(),
parser=hiredis.Reader(notEnoughData=Ellipsis),
reply_queue=None,
)
self._waiters[stream_id] = waiter
data = hiredis.pack_command(command) # type: ignore
Expand All @@ -51,17 +54,21 @@ async def pipeline(
) -> list[Any]:
stream_id = self._quic.get_next_available_stream_id()
replies = []
waiter = Waiter(
future=self._loop.create_future(),
parser=hiredis.Reader(notEnoughData=Ellipsis),
reply_queue=[],
expected_reply_count=len(commands),
)
assert waiter.reply_queue is not None
self._waiters[stream_id] = waiter
for command in commands:
waiter = Waiter(
future=self._loop.create_future(),
parser=hiredis.Reader(notEnoughData=Ellipsis),
)
self._waiters[stream_id] = waiter
data = hiredis.pack_command(command) # type: ignore
self._quic.send_stream_data(stream_id, data)
logger.info("request (stream_id=%d): %r", stream_id, command)
self.transmit()
reply = await waiter.future
self.transmit()
await waiter.future
for reply in waiter.reply_queue:
logger.info("reply (stream_id=%d): %r", stream_id, reply)
replies.append(reply)
return replies
Expand All @@ -85,16 +92,23 @@ def quic_event_received(self, event: QuicEvent) -> None:
)
return
waiter.parser.feed(event.data)
msg = waiter.parser.gets()
if msg is Ellipsis:
# wait for more data
logger.debug(
"Protocol: data-recv (stream_id=%d): waiting for more data",
event.stream_id,
)
return
logger.debug("Protocol: parsed-msg: %r", msg)
waiter.future.set_result(msg)
while waiter.parser.has_data():
msg = waiter.parser.gets()
if msg is Ellipsis:
# Need to receive more data
return
logger.debug("Protocol: parsed-msg: %r", msg)
if waiter.reply_queue is None:
waiter.future.set_result(msg)
else:
waiter.reply_queue.append(msg)
logger.debug(
"Protocol: enqueue-pipelined-reply (count=%d / expected=%d)",
len(waiter.reply_queue),
waiter.expected_reply_count,
)
if len(waiter.reply_queue) == waiter.expected_reply_count:
waiter.future.set_result(True)


async def main(
Expand Down

0 comments on commit 1cf13a4

Please sign in to comment.