Skip to content

Commit

Permalink
Add example of reopening channels that have been closed
Browse files Browse the repository at this point in the history
  • Loading branch information
saville committed Feb 16, 2021
1 parent 5af608d commit 38893c7
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
78 changes: 78 additions & 0 deletions docs/source/examples/pooling-reopen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from collections import defaultdict

import asyncio
import aio_pika
from aio_pika.pool import Pool


class NonRestoringRobustChannel(aio_pika.RobustChannel):
"""
Custom robust channel that when reopened, does not restore any
queues or exchanges.
"""
async def reopen(self) -> None:
# Clear out exchanges and queues when reopened
self._exchanges = defaultdict(set)
self._queues = defaultdict(set)
await super().reopen()


class NonRestoringRobustConnection(aio_pika.RobustConnection):
"""
Robust connection that uses a custom channel class
"""
CHANNEL_CLASS = NonRestoringRobustChannel


async def main():
loop = asyncio.get_event_loop()

async def get_connection():
return await aio_pika.connect_robust(
"amqp://guest:guest@localhost/",
# Use the connection class that does not restore connections
connection_class=NonRestoringRobustConnection,
)

connection_pool = Pool(get_connection, max_size=2, loop=loop)

async def get_channel() -> aio_pika.Channel:
async with connection_pool.acquire() as connection:
return await connection.channel()

channel_pool = Pool(get_channel, max_size=10, loop=loop)
queue_name = "pool_queue"

async def consume():
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
await channel.set_qos(10)

queue = await channel.declare_queue(
queue_name, durable=False, auto_delete=False
)

async with queue.iterator() as queue_iter:
async for message in queue_iter:
print(message)
await message.ack()

async def publish():
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
# Reopen channels that have been closed previously
if channel.is_closed:
await channel.reopen()
await channel.default_exchange.publish(
aio_pika.Message(("Channel: %r" % channel).encode()),
queue_name,
)

async with connection_pool, channel_pool:
task = loop.create_task(consume())
await asyncio.wait([publish() for _ in range(10000)])
await task


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
6 changes: 6 additions & 0 deletions docs/source/quick-start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,9 @@ Connection pooling

.. literalinclude:: examples/pooling.py
:language: python

Connection pooling with reopening connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. literalinclude:: examples/pooling-reopen.py
:language: python

0 comments on commit 38893c7

Please sign in to comment.