def deps do
[{:subscribex, "~> 0.8"}]
end
A lightweight wrapper around pma's Elixir AMQP library. Compared to the AMQP library, this package adds:
- Auto-start a global connection to your RabbitMQ server on app startup
- Auto-reconnect to your RabbitMQ server (at 30 second intervals)
- A configurable subscriber abstraction
- Simplified channel creation, with the ability to automatically link or monitor the channel process.
NOTE: master
is usually in "beta" status. Make sure you pull an actual release.
Special thanks to:
- cavneb
- justgage
- vorce
- telnicky
- Cohedrin
- ssnickolay
- dconger
Your contributions are greatly appreciated!
- Add
subscribex
to your list of dependencies inmix.exs
:
def deps do
[{:subscribex, "~> 0.8"}]
end
- Create a broker:
defmodule MyApp.Broker do
use Subscribex.Broker, otp_app: :my_app
end
- Configure your broker's amqp server information:
config :my_app, MyApp.Broker,
rabbit_host: [
username: "guest",
password: "guest",
host: "localhost",
port: 5672
]
- Ensure your broker is started before your application, but probably not in test:
defmodule MyApp.Application do
use Application
import Supervisor.Spec
def start(_type, _args) do
children =
case Mix.env() do
:test -> []
_ -> [supervisor(MyApp.Broker, [])]
end
Supervisor.start_link(children, [strategy: :one_for_one])
end
end
After creating, configuring, and starting a broker, you can add subscribers:
defmodule MyApp.Subscribers.ActivityCreated do
use Subscribex.Subscriber
require Logger
def init do
config =
%Config{
queue: "my_queue",
exchange: "my_exchange",
exchange_type: :topic,
binding_opts: [routing_key: "my_key"]
}
{:ok, config}
end
def handle_payload(payload, _channel, _delivery_tag, _redelivered) do
Logger.info(payload)
end
# handle_error/4 is an optional callback for handling when an exception is
# raised in handle_payload/4
#
# In this example, we reject the message, telling Rabbit not to retry.
def handle_error(payload, channel, delivery_tag, error) do
Logger.error("Raised #{inspect error} handling #{inspect payload}")
reject(channel, delivery_tag, requeue: false)
end
end
You'll then want to configure your application to start your subscriber with your broker:
defmodule MyApp.Application do
use Application
import Supervisor.Spec
def start(_type, _args) do
children =
case Mix.env() do
:test -> []
_ -> [supervisor(MyApp.Broker, [[MyApp.Subscribers.ActivityCreated]])]
end
Supervisor.start_link(children, [strategy: :one_for_one])
end
end
If you would like to start multiple subscriber processes for the same queue you can do that as well:
defmodule MyApp.Application do
use Application
import Supervisor.Spec
def start(_type, _args) do
children =
case Mix.env() do
:test -> []
_ -> [supervisor(MyApp.Broker, [[{3, MyApp.Subscribers.ActivityCreated}]])]
end
Supervisor.start_link(children, [strategy: :one_for_one])
end
end
First, configure your broker's amqp server information in the appropriate config.exs file:
config :my_app, MyApp.Broker,
rabbit_host: [
username: "guest",
password: "guest",
host: "localhost",
port: 5672]
Alternatively, you can use a well-formed RabbitMQ URI:
config :my_app, MyApp.Broker, rabbit_host: "amqp://guest:guest@localhost"
Once configured, you can start making subscribers. Subscribex.Subscriber
is
a behavior which requires three callbacks:
@type redelivered :: boolean()
@callback init()
:: {:ok, %Subscribex.Subscriber.Config{}}
@callback handle_payload(payload, channel, Subscribex.delivery_tag, redelivered)
:: ignored
@callback handle_error(payload, channel, Subscribex.delivery_tag, RuntimeError.t)
:: ignored
The Config struct is defined as:
defmodule Config do
defstruct [
queue: nil,
dead_letter_queue: nil,
dead_letter_exchange: nil,
exchange: nil,
exchange_type: nil,
dead_letter_exchange_type: nil,
auto_ack: true,
prefetch_count: 10,
queue_opts: [],
dead_letter_queue_opts: [],
dead_letter_exchange_opts: [],
exchange_opts: [],
binding_opts: [],
dl_binding_opts: []
]
end
Assume a queue called "my_queue" is bound to a "my_exchange" exchange using the routing key "my_key".
An example of a subscriber that pops items off this queue and simply logs the payloads using Logger is as follows:
defmodule MyApp.Subscribers.ActivityCreated do
use Subscribex.Subscriber
require Logger
def init do
config =
%Config{
queue: "my_queue",
exchange: "my_exchange",
exchange_type: :topic,
binding_opts: [routing_key: "my_key"]
}
{:ok, config}
end
def handle_payload(payload, _channel, _delivery_tag, _redelivered) do
Logger.info(payload)
end
def handle_error(payload, channel, delivery_tag, error) do
Logger.error("Raised #{inspect error} handling #{inspect payload}")
Subscribex.reject(channel, delivery_tag, requeue: false)
end
end
Instead of just logging the payload, let's deserialize it from a JSON string, transform the result, and publish it to a different queue.
Let's say the payload is of the format:
{
"email": "[email protected]",
"username": "abc123"
}
Let's say our goal is to send the user a welcome email and publish to the "my_app.new_user.emailed" routing key a result of the format:
{
"email": "[email protected]",
"username": "abc123",
"welcome_email_delivered": true
}
defmodule MyApp.Subscribers.ActivityCreated do
use Subscribex.Subscriber
preprocess &__MODULE__.deserialize/1
require Logger
@exchange "my_exchange"
def init do
config =
%Config{
queue: "my_queue",
exchange: "my_exchange",
exchange_type: :topic,
binding_opts: [routing_key: "my_key"]
}
{:ok, config}
end
def deserialize(payload) do
Poison.decode!(payload)
end
def handle_payload(%{"email" => email, "username" => username} = payload, channel, _delivery_tag, _redelivered) do
:ok = MyApp.Email.send_welcome_email(email, username)
{:ok, publishing_payload} =
payload
|> Map.put_new("welcome_email_delivered", true)
|> Poison.encode
routing_key = "my_app.new_user.emailed"
Subscribex.publish(channel, @exchange, routing_key, publishing_payload)
end
def handle_error(payload, channel, delivery_tag, error) do
Logger.error("Raised #{inspect error} handling #{inspect payload}")
Subscribex.reject(channel, delivery_tag, requeue: false)
end
end
Using the preprocess/1
macro, we can setup plug-like pipelines of functions to
execute before arriving at handle_payload/4
.
Let's take the email example and modify it a bit. It still has the same requirements, but the email sending is handled by another process, and we don't want to block the subscriber while it's working.
defmodule MyApp.Subscribers.UserRegistered do
use Subscribex.Subscriber
preprocess &__MODULE__.deserialize/1
require Logger
@exchange "my_exchange"
def init do
config =
%Config{
queue: "my_queue",
exchange: "my_exchange",
exchange_type: :topic,
binding_opts: [routing_key: "my_key"],
auto_ack: false # Specify that we want to manually ack these jobs
}
{:ok, config}
end
def deserialize(payload) do
Poison.decode!(payload)
end
def handle_payload(%{"email" => email, "username" => username}, channel, delivery_tag, _redelivered) do
# hands off the job to another process, which will be responsible form
# acking. It must ack the job on the same channel used to receive it.
:ok = MyApp.Email.send_welcome_email(email, username, channel, delivery_tag)
end
def handle_error(payload, channel, delivery_tag, error) do
Logger.error("Raised #{inspect error} handling #{inspect payload}")
Subscribex.reject(channel, delivery_tag, requeue: false)
end
end
Because we've told the subscriber we intend to manually ack, we need the channel and delivery_tag for the payload, which is provided by RabbitMQ. Now the process in charge of sending the email is responsible for acking the message to RabbitMQ when it's done processing the message.
In this example, we need a channel to publish the message; we can use the
MyApp.Broker.channel/1
function, which is spec-ed as:
@spec channel(:link | :no_link | :monitor | function | (channel -> term))
:: %AMQP.Channel{} | {%AMQP.Channel{}, monitor} | :ok
It expects you to pass in :link, :no_link, or :monitor. There is no default value - you must explicitly pass in the appropriate monitoring scheme for your use case.
We can then use this channel to publish on. When we use
MyApp.Broker.channel/1
, we must remember to MyApp.Broker.close/1
the channel
when we're done with it - otherwise, we'll experience a memory leak as channel
processes are created and never stopped.
If the channel is needed long-term, it's best to either link or monitor it, and handle the case where a connection failure occurs.
In this case, we only need the process for a short duration. This is why
Subscribex.channel/1
also accepts a function which takes the channel as an
argument. By doing this, we can do either of the following examples:
defmodule MyApp.Email do
# Code for delivering the email here
def handle_email_sent(channel, delivery_tag) do
publish_channel = MyApp.Broker.channel(:link)
MyApp.Broker.publish(publish_channel, ...other args)
MyApp.Broker.close(publish_channel)
MyApp.Broker.ack(channel, delivery_tag)
end
end
Now the process of channel creation and closing are managed for us, and we only have to code what we need to do with the channel.
Note that acking a message on a different channel than what it was received on is not allowed by RabbitMQ. So in the above example we should not ack on the publish_channel. Similarly we should not publish on the channel we receive messages on as it is not recommended.
Copyright (c) 2018 Cody J. Poll
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.