This repository has been archived by the owner on Jul 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
amqp.py
53 lines (43 loc) · 1.49 KB
/
amqp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import json
from asyncio import get_event_loop
import aio_pika
from ws_handler import ws_handler
class AMQPHandler:
    """Bridge between an AMQP fanout exchange and the websocket handler.

    After ``init()`` has been awaited, every message arriving on the
    exchange is JSON-decoded and handed to ``ws_handler.notify_all``;
    ``publish()`` sends a JSON-encoded dict to the same exchange.
    """

    # Despite the name, this value is used as the *exchange* name (and as
    # the routing key when publishing); the consuming queue itself is
    # anonymous and exclusive.
    QUEUE_NAME = 'test_queue'

    # Default broker URL used when the constructor gets no ``url``.
    # NOTE(review): the "[email protected]" fragment looks like an
    # obfuscation/scrape artifact rather than a real host -- confirm the
    # intended broker address before relying on this default.
    BROKER_URL = "amqp://guest:[email protected]:5672/"

    def __init__(self, url=None):
        """Create an unconnected handler.

        Args:
            url: Optional AMQP URL; falls back to ``BROKER_URL`` when
                omitted, so existing ``AMQPHandler()`` callers are
                unaffected.
        """
        self.url = self.BROKER_URL if url is None else url
        self.connection = None
        self.channel = None
        # Exchange declared on the current channel; filled in lazily so a
        # publish does not need a declare round-trip every time.
        self._exchange = None

    async def _get_exchange(self):
        """Return the fanout exchange, declaring it on first use.

        Raises:
            RuntimeError: if no channel is open yet (``init()`` has not
                been awaited).
        """
        if self.channel is None:
            raise RuntimeError(
                "AMQPHandler is not initialised; await init() first"
            )
        if self._exchange is None:
            self._exchange = await self.channel.declare_exchange(
                self.QUEUE_NAME,
                aio_pika.ExchangeType.FANOUT,
            )
        return self._exchange

    async def init(self):
        """Connect to the broker and start consuming from the exchange.

        Opens a robust connection and a channel (prefetch of 100),
        declares the fanout exchange, binds a fresh exclusive queue to it
        and registers ``handle_message`` as the consumer.

        Returns:
            The connection object returned by ``aio_pika.connect_robust``.
        """
        self.connection = await aio_pika.connect_robust(
            self.url, loop=get_event_loop()
        )
        self.channel = await self.connection.channel()
        await self.channel.set_qos(prefetch_count=100)

        # A new channel was just opened: forget any exchange object that
        # belonged to a previous one before declaring it again.
        self._exchange = None
        exchange = await self._get_exchange()

        queue = await self.channel.declare_queue(exclusive=True)
        await queue.bind(exchange)
        await queue.consume(self.handle_message)
        return self.connection

    @classmethod
    async def handle_message(cls, message: aio_pika.IncomingMessage):
        """Decode an incoming message as JSON and broadcast it.

        The work happens inside ``message.process()`` (aio_pika's
        context manager for acknowledging the message), so the decoded
        body is passed to ``ws_handler.notify_all`` within that scope.
        """
        async with message.process():
            await ws_handler.notify_all(
                message=json.loads(message.body),
            )

    async def publish(self, data: dict):
        """JSON-encode ``data`` and publish it to the fanout exchange.

        Args:
            data: JSON-serialisable payload.

        Raises:
            RuntimeError: if called before ``init()`` has been awaited.
        """
        message = aio_pika.Message(
            body=json.dumps(data).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        exchange = await self._get_exchange()
        await exchange.publish(
            message,
            routing_key=self.QUEUE_NAME,
        )
# Module-level handler instance. Constructing it opens no connection
# (connection and channel start as None); `init()` must be awaited first.
amqp_handler = AMQPHandler()