Django Propaganda is a simple pub/sub utility for Kombu.
Define your broker's URL in your django settings:
PROPAGANDA_BROKER_URL = "amqp://guest:guest@localhost:5672//"
Give Propaganda a topic exchange name and it provides a nice interface for pub/sub.
pip install django-microservice-propaganda
from django_propaganda import Propaganda
propaganda = Propaganda("exchange_topic_name")
Publish method takes routing key and payload parameters:
propaganda.publish('my.routing.key', {'hello': 'world'})
You can omit the payload which defaults to an empty dictionary.
propaganda.publish('my.routing.key')
publish
method by default blocks the execution until message is sent. You can add block=False
parameter to make
publish return immediately, and message will be sent in a background thread.
# this will return after creating a thread that will send the message
propaganda.publish('my.routing.key', {'hello': 'world'}, block=False)
Propaganda object has a dictionary property named payload
. Content of payload
will be added
to the payload given to the publish method.
propaganda.payload['sender'] = 'Yigit Ozen'
# this will publish {'sender': 'Yigit Ozen', 'hello': 'world'}
propaganda.publish('my.routing.key', {'hello': 'world'})
# sender will still be added if you omit the payload parameter.
# this will publish {'sender': 'Yigit Ozen'}
propaganda.publish({'my.routing.key')
# use del to stop adding a key-value pair to the payloads
del propaganda.payload['sender']
Propaganda's subscribe
method creates and returns a Subscription object. Subscription creates a
queue and starts consuming it on a separate thread.
on
method registers a callback for a certain routing key.
all
method registers a callback for all routing keys.
The signature of the callbacks must take two arguments: (body, message), which is the decoded message body and Kombu Message instance.
wait
method blocks until a message arrives with the given routing key.
wait_any
method blocks until any message arrives.
def on_rabbit(body, message):
print('Look, a rabbit!')
print(body)
sub = propaganda.subscribe('animals.#')
# from now on when a animals.rabbit message arrives, on_rabbit will be called
sub.on('animals.rabbit', on_rabbit)
# when any message matching animals.# arrives, on_animal will be called
sub.all(on_animal)
# this will block the program until an animals.rabbit message arrives
sub.wait('animals.rabbit')
# wait has the same semantic with Python's multithreading wait. so you can pass a timeout.
# return value will be true if a message arrives before the timeout, false otherwise
arrived = sub.wait('animals.rabbit', timeout=10)
# blocks until any message arrives
sub.wait_any()
on
and all
methods return the Subscription object, so what you can chain the calls.
sub = propaganda.subscribe('animals.#')
.on('animals.rabbit', on_rabbit)
.on('animals.turtle', on_turtle)
.all(on_animal)
You can pass an exception handler to on
and all
methods. Propaganda will call your handler with the exception
when an exception is raised from on of your callbacks instead of raising it.
For example you can use this to register loggers.
def log_mq_exception(exception):
logger.error('Exception occurred when handling MQ message: {0}'.format(exception))
sub = propaganda.subscribe('animals.#')
sub.on('animals.rabbit', on_rabbit, on_exception=log_mq_exception)
sub.all(on_animal, on_exception=log_mq_exception)
You can also pass an exception handler to sub
method in the same way.
It applies to all callbacks under that subscription.
It is called after the handler given to on
or all
method.
The following code works exactly like the one above.
def log_mq_exception(exception):
logger.error('Exception occurred when handling MQ message: {0}'.format(exception))
sub = propaganda.subscribe('animals.#', on_exception=log_mq_exception)
sub.on('animals.rabbit', on_rabbit)
sub.all(on_animal)
Subscription creates a queue and starts a thread that consumes it upon creation. You can destroy the queue and the thread without destroying the Subscription, and you can recreate them later.
# starts listening for the events
sub = propaganda.subscribe('animals.#')
.on('animals.rabbit', on_rabbit)
.on('animals.turtle', on_turtle)
# you don't want to receive messages for a while
sub.stop()
# the registered callbacks are still there.
# you can recreate the queue and start receiving messages again.
sub.start()
If you don't keep the subscription in a variable, you can use unsubscribe
method of Propaganda object by binding key
to stop consuming.
propaganda.unsubscribe('animals.#')
You can pass a key prefix when initializing Propaganda object, which will be automatically prefixed all routing keys when subscribing and publishing. If you use Propaganda object to pub/sub in a specific namespace, this will save you from adding the prefix manually for all pub/sub calls.
propaganda = Propaganda(connection, exchange, key_prefix='my.namespace.')
# this will actually subscribe to 'my.namespace.animals.#'
sub = propaganda.subscribe('animals.#')
# on_rabbit will be called for messages with the key 'my.namespace.animals.rabbit'
sub.on('animals.rabbit', on_rabbit)
# this will publish the message with the key 'my.namespace.my.routing.key'
propaganda.publish({'my.routing.key', 'hello': 'world'})