Skip to content

Commit

Permalink
Merge pull request #4 from OpenMatchmaking/feature-configurable_worker
Browse files Browse the repository at this point in the history
Feature configurable worker
  • Loading branch information
Relrin authored Mar 26, 2018
2 parents 3d0fc35 + c60732d commit 5a0ad6a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed via adding the `spotter` dependency to your list of

```elixir
def deps do
[{:spotter, "~> 0.2.2"}]
[{:spotter, "~> 0.3.0"}]
end
```

Expand Down Expand Up @@ -75,7 +75,8 @@ Any of those arguments (that were mentioned in the documentation) can be specifi
])

# Specify here the queue that you want to use
def configure(channel, _config) do
# `opts` will contain options (as a Map) that were specified in child_spec for supervisor
def configure(channel, _opts) do
:ok = AMQP.Exchange.direct(channel, @exchange, durable: true)

# An initial point where the worker do required stuff
Expand All @@ -89,13 +90,16 @@ Any of those arguments (that were mentioned in the documentation) can be specifi
# Specify a consumer here
{:ok, _} = AMQP.Basic.consume(channel, @queue_validate)

# And dont forget to return the channel
{:ok, channel}
# You must return here the tuple, where the first element is `:ok` atom, and the
# second element whill be any type what you would like. The second element will
# be set for the GenServer state, so that you can get an access to it via the
# `state[:meta]` expression
{:ok, [queue_request: queue_request, queue_forward: queue_forward]}
end

# Invoked when a message successfully consumed
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, state) do
channel = state[:meta][:channel]
channel = state[:channel]
spawn fn -> consume(channel, tag, reply_to, headers, payload) end
{:noreply, state}
end
Expand Down
20 changes: 11 additions & 9 deletions lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ defmodule Spotter.Worker do
end

def start_link() do
GenServer.start_link(__MODULE__, config())
GenServer.start_link(__MODULE__, %{config: config()})
end

def start_link(opts) do
GenServer.start_link(__MODULE__, %{config: config(), opts: opts})
end

def init(opts) do
Expand All @@ -37,17 +41,15 @@ defmodule Spotter.Worker do
{:error, :noconn}
_ ->
@connection.spawn_channel(@channel_name)
@connection.configure_channel(@channel_name, opts)
@connection.configure_channel(@channel_name, opts[:config])

channel = get_channel()
|> configure(opts)

{:ok, channel}
{:ok, [channel: channel, meta: configure(channel, opts[:opts])]}
end
end

def configure(channel, _opts) do
channel
def configure(_channel, _opts) do
{:ok, []}
end

def validate_config!(config) do
Expand All @@ -67,9 +69,9 @@ defmodule Spotter.Worker do
Spotter.AMQP.Connection.Channel.get_config(@channel_name)
end

def handle_call(:status, _from, channel) do
def handle_call(:status, _from, state) do
safe_run fn(_) ->
{:reply, AMQP.Queue.status(channel, channel_config()[:queue][:name]), channel}
{:reply, AMQP.Queue.status(state[:channel], channel_config()[:queue][:name]), state}
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Spotter.MixProject do
use Mix.Project

@version "0.2.2"
@version "0.3.0"

def project do
[
Expand Down
9 changes: 5 additions & 4 deletions test/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ defmodule SpotterWorkerTest do
:ok = AMQP.Basic.qos(channel, prefetch_count: 1)
{:ok, _} = AMQP.Basic.consume(channel, @queue_request)

channel
{:ok, []}
end

# Handle the trapped exit call
def handle_info({:EXIT, _from, reason}, state) do
cleanup(reason, state)
cleanup(reason, state[:channel])
{:stop, reason, state}
end

Expand All @@ -97,10 +97,11 @@ defmodule SpotterWorkerTest do
end

# Invoked when a message successfully consumed
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, channel) do
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, state) do
channel = state[:channel]
message_headers = Enum.into(Enum.map(headers, fn({key, _, value}) -> {key, value} end), %{})
spawn fn -> consume(channel, tag, reply_to, message_headers, payload) end
{:noreply, channel}
{:noreply, state}
end

def terminate(reason, state) do
Expand Down

0 comments on commit 5a0ad6a

Please sign in to comment.