Skip to content

Commit

Permalink
Merge pull request #6 from OpenMatchmaking/feature-testing-client
Browse files Browse the repository at this point in the history
AMQP blocking client for testing purposes
  • Loading branch information
Relrin authored Sep 3, 2018
2 parents bda5214 + 3463ab5 commit a7c015e
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 17 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM elixir:1.7.3

COPY ./ /app
WORKDIR /app
19 changes: 19 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ networks:
driver: bridge

services:

app:
build:
context: .
dockerfile: ./Dockerfile
image: spotter-library
environment:
- SPOTTER_AMQP_USERNAME=user
- SPOTTER_AMQP_PASSWORD=password
- SPOTTER_AMQP_HOST=rabbitmq
- SPOTTER_AMQP_PORT=5672
volumes:
- ./:/app/
depends_on:
- rabbitmq
networks:
- app-tier
tty: true

rabbitmq:
image: "bitnami/rabbitmq:latest"
ports:
Expand Down
191 changes: 191 additions & 0 deletions lib/testing/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
defmodule Spotter.Testing.AmqpBlockingClient do
@moduledoc """
A blocking AMPQ client for testing purposes and simple RPC use cases.
"""
use GenServer
alias Spotter.AMQP.Connection.Helper

@doc """
Initializes a new blocking GenServer instance.
"""
def start_link(opts, name \\ __MODULE__) do
GenServer.start_link(__MODULE__, opts, name: name)
end

@doc """
Initializes a new connection and a channel.
"""
def init(opts) do
{:ok, connection} = Helper.open_connection(opts)
{:ok, channel} = Helper.open_channel(connection)

{:ok, %{
connection: connection,
channel: channel,
channel_opts: [
queue: Keyword.get(opts, :queue, []),
exchange: Keyword.get(opts, :exchange, []),
qos: Keyword.get(opts, :qos, [])
]
}}
end

defp deinit(connection, channel) do
Helper.close_channel(channel)
AMQP.Connection.close(connection)
end

defp configure(channel, channel_opts) do
channel = configure_qos(channel, channel_opts[:qos])
{channel, queue_opts} = configure_queue(channel, channel_opts[:queue])
channel_opts = Keyword.merge(channel_opts, [queue: queue_opts])
channel = configure_exchange(channel, channel_opts[:queue], channel_opts[:exchange])
channel
end

defp configure_qos(channel, nil) do
channel
end

defp configure_qos(channel, qos_opts) do
Helper.set_channel_qos(channel, qos_opts)
channel
end

defp configure_queue(channel, nil) do
channel
end

defp configure_queue(channel, queue_opts) do
{:ok, queue} = AMQP.Queue.declare(channel, env(queue_opts[:name]), env(queue_opts))

queue_opts =
if queue_opts[:name] == "" and queue_opts[:routing_key] == "" do
Keyword.merge(queue_opts, [name: queue[:queue], routing_key: queue[:queue]])
else
queue_opts
end

{channel, queue_opts}
end

defp configure_exchange(channel, queue_opts, exchange_opts) when is_nil(queue_opts) or is_nil(exchange_opts) do
channel
end

defp configure_exchange(channel, queue_opts, exchange_opts) do
Helper.declare_exchange(channel, exchange_opts[:name], exchange_opts[:type], exchange_opts)
Helper.bind_queue(channel, queue_opts[:name], exchange_opts[:name], routing_key: queue_opts[:routing_key])
channel
end

defp env(var) do
Confex.Resolver.resolve!(var)
end

# Public API

@doc """
Stop the client and close the existing connection.
"""
def stop(pid) do
GenServer.stop(pid)
end

@doc """
Sends a new message without waiting for a response.
"""
def send(pid, data, opts, call_timeout \\ 5000) do
GenServer.call(pid, {:send, data, opts}, call_timeout)
end

@doc """
Sends a new message and wait for result.
"""
def send_and_wait(pid, data, opts, timeout \\ 1000, attempts \\ 5, call_timeout \\ 5000) do
GenServer.call(pid, {:send_and_wait, data, opts, timeout, attempts}, call_timeout)
end

@doc """
Returns the message from the certain queue if it exists.
"""
def consume(pid, queue, timeout \\ 1000, attempts \\ 5, call_timeout \\ 500) do
GenServer.call(pid, {:consume_response, queue, timeout, attempts}, call_timeout)
end

@doc """
Initializes QoS, a queue and an exchanges for the channel.
"""
def configure_channel(pid, channel_opts, call_timeout \\ 500) do
GenServer.call(pid, {:configure_channel, channel_opts}, call_timeout)
end

# Internal stuff

defp send_message(channel, routing_key, data, opts) do
exchange_request = Keyword.get(opts, :exchange_request, "")
queue_request = Keyword.get(opts, :queue_request, "")
publish_options = Keyword.merge(opts, [
persistent: Keyword.get(opts, :persistent, true),
reply_to: routing_key,
content_type: Keyword.get(opts, :content_type, "application/json")
])
AMQP.Basic.publish(channel, exchange_request, queue_request, data, publish_options)
end

defp consume_response(channel, queue_name, timeout, attempts) do
{payload, meta} = receive_message(channel, queue_name, timeout, attempts)

if meta != nil do
AMQP.Basic.ack(channel, meta.delivery_tag)
end

{payload, meta}
end

defp receive_message(channel, queue_name, timeout, attempts) do
case AMQP.Basic.get(channel, queue_name) do
{:ok, message, meta} ->
{message, meta}
{:empty, _} when is_integer(attempts) and attempts == 0 ->
{:empty, nil}
{:empty, _} when is_integer(attempts) and attempts > 0 ->
:timer.sleep(timeout)
receive_message(channel, queue_name, timeout, attempts - 1)
end
end

# Private API

def handle_call({:send, data, opts}, _from, state) do
{:reply, send_message(state[:channel], :undefined, data, opts), state}
end

def handle_call({:send_and_wait, data, opts, timeout, attempts}, _from, state) do
channel = state[:channel]
channel_opts = state[:channel_opts]
queue_name = Keyword.get(channel_opts[:queue] || [], :name, :undefined)
routing_key = Keyword.get(channel_opts[:queue] || [], :routing_key, :undefined)

configure(channel, channel_opts)
send_message(channel, routing_key, data, opts)
response = consume_response(state[:channel], queue_name, timeout, attempts)

AMQP.Queue.delete(channel, queue_name)
{:reply, response, state}
end

def handle_call({:consume_response, queue, timeout, attempts}, _from, state) do
{:reply, consume_response(state[:channel], queue, timeout, attempts), state}
end

def handle_call({:configure_channel, channel_opts}, _from, state) do
configure(state[:channel], channel_opts)
{:reply, :ok, state}
end

def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
deinit(state[:connection], state[:channel])
{:noreply, state}
end
end
11 changes: 6 additions & 5 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Spotter.MixProject do
use Mix.Project

@version "0.4.1"
@version "0.5.0"

def project do
[
Expand Down Expand Up @@ -29,10 +29,11 @@ defmodule Spotter.MixProject do

defp deps do
[
{:confex, "~> 3.3.0"},
{:amqp, "~> 1.0.2"},
{:earmark, "~> 1.2.0", only: :dev},
{:ex_doc, "~> 0.18", only: :dev}
{:confex, "~> 3.3.1"},
{:amqp, "~> 1.0"},
{:earmark, "~> 1.2.6", only: :dev},
{:ex_doc, "~> 0.19.1", only: :dev},
{:ranch_proxy_protocol, "~> 2.0", override: true}
]
end

Expand Down
19 changes: 11 additions & 8 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
%{
"amqp": {:hex, :amqp, "1.0.2", "3c4b0c2a0c02a908a78c51e6e8206ca24b0f6b879a5e980eccf840cda360ed71", [:mix], [{:amqp_client, "~> 3.7.3", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.4", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "~> 1.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"amqp_client": {:hex, :amqp_client, "3.7.3", "29a818d3871de5f8484e876ad34b0a940b2cecd6c6dfbed30d9b1679072eb9bc", [:make, :rebar3], [{:rabbit_common, "3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"},
"amqp": {:hex, :amqp, "1.0.3", "06a6d909abc71d82b7c3133ca491899ca18fce857d0697dd060c29de1ef498d8", [:mix], [{:amqp_client, "~> 3.7.3", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.4", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "~> 1.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"amqp_client": {:hex, :amqp_client, "3.7.7", "e5ac06275d140d21ff30408d90419e9f7ecc31f1b125736c29911b8eec3e54d4", [:make, :rebar3], [{:rabbit_common, "3.7.7", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"},
"confex": {:hex, :confex, "3.3.1", "8febaf751bf293a16a1ed2cbd258459cdcc7ca53cfa61d3f83d49dd276a992b4", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"},
"jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"},
"lager": {:hex, :lager, "3.5.1", "63897a61af646c59bb928fee9756ce8bdd02d5a1a2f3551d4a5e38386c2cc071", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"},
"rabbit_common": {:hex, :rabbit_common, "3.7.3", "f23ed393e12150e3d3b6ef640be7bfddfefce72a65e2ce27d44249ef84287c96", [:make, :rebar3], [{:jsx, "2.8.2", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.5.1", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.4.0", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "1.4.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"ranch": {:hex, :ranch, "1.4.0", "10272f95da79340fa7e8774ba7930b901713d272905d0012b06ca6d994f8826b", [:rebar3], [], "hexpm"},
"ranch_proxy_protocol": {:hex, :ranch_proxy_protocol, "1.4.4", "8853b11757a9798e86c7d6d0ff783a8e2e87f77052aad7f1c91108f254ba4a9c", [:rebar3], [{:ranch, "1.4.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"},
"lager": {:hex, :lager, "3.6.3", "fe78951d174616273f87f0dbc3374d1430b1952e5efc4e1c995592d30a207294", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "0.5.1", "966c5c2296da272d42f1de178c1d135e432662eca795d6dc12e5e8787514edf7", [:mix], [{:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.8.0", "1204a2f5b4f181775a0e456154830524cf2207cf4f9112215c05e0b76e4eca8b", [:mix], [{:makeup, "~> 0.5.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.2.2", "d526b23bdceb04c7ad15b33c57c4526bf5f50aaa70c7c141b4b4624555c68259", [:mix], [], "hexpm"},
"rabbit_common": {:hex, :rabbit_common, "3.7.7", "c0afdf060c091f43d7d6f58889978a074a7a8e28a5e89b9e40c6a474ba295011", [:make, :rebar3], [{:jsx, "2.8.2", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.3", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.5.0", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "1.5.0", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"ranch": {:hex, :ranch, "1.5.0", "f04166f456790fee2ac1aa05a02745cc75783c2bfb26d39faf6aefc9a3d3a58a", [:rebar3], [], "hexpm"},
"ranch_proxy_protocol": {:hex, :ranch_proxy_protocol, "2.0.0", "623c732025f9d66d123a8ccc1735e5f43d7eb9b20aa09457c9609ef05f7e8ace", [:rebar3], [{:ranch, "1.5.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"},
"recon": {:hex, :recon, "2.3.2", "4444c879be323b1b133eec5241cb84bd3821ea194c740d75617e106be4744318", [:rebar3], [], "hexpm"},
}
76 changes: 76 additions & 0 deletions test/blocking_amqp_client_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule SpotterTestingAmqpBlockingClientTest do
use ExUnit.Case, async: false
alias Spotter.Testing.AmqpBlockingClient

@generic_exchange "test.direct"
@queue_name "blocking_client_test"

@custom_amqp_opts [
username: "user",
password: "password",
host: "rabbitmq",
port: 5672,
virtual_host: "/",
queue: [
name: @queue_name,
routing_key: @queue_name,
durable: true,
passive: false,
auto_delete: true
],
exchange: [
name: @generic_exchange,
type: :direct,
durable: true,
passive: true
],
qos: [
prefetch_count: 10
]
]

setup do
{:ok, pid} = start_supervised({AmqpBlockingClient, @custom_amqp_opts})
{:ok, [client: pid]}
end

test "A AMQP blocking client sends the message and consumes the message from the queue", state do
client = state[:client]
message = "test"
AmqpBlockingClient.configure_channel(client, @custom_amqp_opts)

send_result = AmqpBlockingClient.send(
client, message,
[queue_request: @queue_name, exchange_request: @generic_exchange]
)
assert send_result == :ok

{response, _meta} = AmqpBlockingClient.consume(client, @queue_name)
assert response == message

stop_supervised(client)
end

test "A AMQP blocking client consumes the message from the queue and returns empty results", state do
client = state[:client]
message = "test"
AmqpBlockingClient.configure_channel(client, @custom_amqp_opts)

{:empty, nil} = AmqpBlockingClient.consume(client, @queue_name, 1, 100)
stop_supervised(client)
end

test "A AMQP blocking client sends the message and waits for the response", state do
client = state[:client]
message = "test"

channel_options = Keyword.merge(
@custom_amqp_opts,
[queue_request: @queue_name, exchange_request: @generic_exchange]
)
{response, _meta} = AmqpBlockingClient.send_and_wait(client, message, channel_options)
assert response == message

stop_supervised(client)
end
end
8 changes: 4 additions & 4 deletions test/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule SpotterWorkerTest do
@custom_amqp_opts [
username: "user",
password: "password",
host: "localhost",
host: "rabbitmq",
port: 5672,
virtual_host: "/"
]
Expand Down Expand Up @@ -178,7 +178,7 @@ defmodule SpotterWorkerTest do
headers: [{"path", :longstr, "api.matchmaking.search"},
{"permissions", :longstr, "get;post"}]
)
:timer.sleep(100)
:timer.sleep(200)
{:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, @generic_queue_forward)
assert payload == "DATA"

Expand Down Expand Up @@ -215,7 +215,7 @@ defmodule SpotterWorkerTest do
headers: [{"path", :longstr, "api.matchmaking.search"},
{"permissions", :longstr, "get;post"}]
)
:timer.sleep(100)
:timer.sleep(200)
{:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, queue[:queue])
assert payload == "VALIDATION_ERROR"

Expand All @@ -234,7 +234,7 @@ defmodule SpotterWorkerTest do
headers: [{"path", :longstr, "api.matchmaking.search"},
{"permissions", :longstr, ""}]
)
:timer.sleep(100)
:timer.sleep(200)
{:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, queue[:queue])
assert payload == "NO_PERMISSIONS"

Expand Down

0 comments on commit a7c015e

Please sign in to comment.