From 5e169d2cc2ecd800e731455311339f1654cd25b7 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 14:57:39 +0300 Subject: [PATCH 1/9] Added an AMQP blocking client for testing purposes --- lib/amqp/testing.ex | 203 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 lib/amqp/testing.ex diff --git a/lib/amqp/testing.ex b/lib/amqp/testing.ex new file mode 100644 index 0000000..aa544fb --- /dev/null +++ b/lib/amqp/testing.ex @@ -0,0 +1,203 @@ +defmodule Spotter.Testing.AmqpBlockingClient do + @moduledoc """ + A blocking AMPQ client for testing purposes and simple RPC use cases. + """ + use GenServer + alias Spotter.AMQP.Connection.Helper + + @doc """ + Initializes a new blocking GenServer instance. + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @doc """ + Initializes a new connection and a channel. + """ + def init(opts) do + {:ok, connection} = Helper.open_connection(opts) + {:ok, channel} = Helper.open_channel(connection) + + {:ok, %{ + connection: connection, + channel: channel, + channel_opts: %{ + queue: Keyword.get(opts, :queue, []), + exchange: Keyword.get(opts, :exchange, []), + qos: Keyword.get(opts, :qos, []) + } + }} + end + + defp deinit(connection, channel) do + Helper.close_channel(channel) + AMQP.Connection.close(connection) + end + + @doc """ + Creates a queue with binding to the certain exchange and specified QoS rules. + """ + defp configure(channel, channel_opts) do + channel + |> configure_qos(channel_opts[:qos]) + |> configure_queue(channel_opts[:queue]) + |> configure_exchange(channel_opts[:queue], channel_opts[:exchange]) + end + + @doc """ + Confgures a QoS for the channel. + """ + defp configure_qos(channel, nil) do + channel + end + + @doc """ + Configures QoS for the channel. + """ + defp configure_qos(channel, qos_opts) do + Helper.set_channel_qos(channel, qos_opts) + channel + end + + @doc """ + Creates a new queue during the session. + """ + defp configure_queue(channel, nil) do + channel + end + + @doc """ + Creates a new queue during the session. + """ + defp configure_queue(channel, queue_opts) do + {:ok, queue} = AMQP.Queue.declare(channel, env(queue_opts[:name]), env(queue_opts)) + + if queue_opts[:name] == "" and queue_opts[:routing_key] == "" do + queue_opts = Keyword.merge(queue_opts, [name: queue[:queue], routing_key: queue[:queue]]) + end + + channel + end + + @doc """ + Configures the exchange and bind queue to it. + """ + defp configure_exchange(channel, queue_opts, exchange_opts) when is_nil(queue_opts) or is_nil(exchange_opts) do + channel + end + + @doc """ + Configures the exchange and bind queue to it. + """ + defp configure_exchange(channel, queue_opts, exchange_opts) do + Helper.declare_exchange(channel, exchange_opts[:name], exchange_opts[:type], exchange_opts) + Helper.bind_queue(channel, queue_opts[:name], exchange_opts[:name], routing_key: queue_opts[:routing_key]) + channel + end + + defp env(var) do + Confex.Resolver.resolve!(var) + end + + # Public API + + @doc """ + Sends a new message without waiting for a response. + """ + def send(pid, data, opts, call_timeout \\ 5000) do + GenServer.call(pid, {:send, data, opts}, call_timeout) + end + + @doc """ + Sends a new message and wait for result. + """ + def send_and_wait(pid, data, opts, timeout \\ 1000, attempts \\ 5, call_timeout \\ 5000) do + GenServer.call(pid, {:send_and_wait, data, opts, timeout, attempts}, call_timeout) + end + + @doc """ + Returns the message from the certain queue if it exists. + """ + def consume(pid, queue, timeout \\ 1000, attempts \\ 5, call_timeout \\ 500) do + GenServer.call(pid, {:consume_response, queue, timeout, attempts}, call_timeout) + end + + # Internal stuff + + @doc """ + Sends the message into the certain queue. + """ + defp send_message(channel, routing_key, data, opts) do + exchange_request = Keyword.get(opts, :exchange_request, "") + queue_request = Keyword.get(opts, :queue_request, "") + + publish_options = Keyword.merge(opts, [ + persistent: Keyword.get(opts, :persistent, true), + reply_to: routing_key, + content_type: Keyword.get(opts, :content_type, "application/json") + ]) + AMQP.Basic.publish(channel, exchange_request, queue_request, data, publish_options) + end + + @doc """ + Extracts the message from the certain queue and returns to the client. + """ + defp consume_response(channel, queue_name, timeout, attempts) do + {payload, meta} = receive_message(channel, queue_name, timeout, attempts) + + if meta != nil do + AMQP.Basic.ack(channel, meta.delivery_tag) + end + + {payload, meta} + end + + @doc """ + Extracts the message from the AMQP queue with retries. + """ + defp receive_message(channel, queue_name, timeout, attempts) do + case AMQP.Basic.get(channel, queue_name) do + {:ok, message, meta} -> + {message, meta} + {:empty, _} when is_integer(attempts) and attempts == 0 -> + {:empty, nil} + {:empty, _} when is_integer(attempts) and attempts > 0 -> + :timer.sleep(timeout) + receive_message(channel, queue_name, timeout, attempts - 1) + end + end + + # Private API + + def handle_call({:send, data, opts}, _from, state) do + {:reply, send_message(state[:channel], :undefined, data, opts), state} + end + + def handle_call({:send_and_wait, data, opts, timeout, attempts}, _from, state) do + channel = state[:channel] + channel_opts = state[:channel_opts] + queue_name = Keyword.get(channel_opts[:queue], :name, :undefined) + routing_key = Keyword.get(channel_opts[:queue], :routing_key, :undefined) + + configure(channel, channel_opts) + send_message(channel, routing_key, data, opts) + response = consume_response(state[:channel], queue_name, timeout, attempts) + + AMQP.Queue.delete(channel, queue_name) + {:reply, response, state} + end + + def handle_call({:consume_response, queue, timeout, attempts}, _from, state) do + {:reply, consume_response(state[:channel], queue, timeout, attempts), state} + end + + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + deinit(state[:connection], state[:channel]) + {:noreply, state} + end + + def terminate(_reason, state) do + deinit(state[:connection], state[:channel]) + end +end From 86c20b7446865848f4016a503f76371115ef4857 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 16:23:30 +0300 Subject: [PATCH 2/9] Updated dependencies --- mix.exs | 9 +++++---- mix.lock | 19 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/mix.exs b/mix.exs index d365bbd..5e2270a 100644 --- a/mix.exs +++ b/mix.exs @@ -29,10 +29,11 @@ defmodule Spotter.MixProject do defp deps do [ - {:confex, "~> 3.3.0"}, - {:amqp, "~> 1.0.2"}, - {:earmark, "~> 1.2.0", only: :dev}, - {:ex_doc, "~> 0.18", only: :dev} + {:confex, "~> 3.3.1"}, + {:amqp, "~> 1.0"}, + {:earmark, "~> 1.2.6", only: :dev}, + {:ex_doc, "~> 0.19.1", only: :dev}, + {:ranch_proxy_protocol, "~> 2.0", override: true} ] end diff --git a/mix.lock b/mix.lock index 169afc8..86e5f92 100644 --- a/mix.lock +++ b/mix.lock @@ -1,14 +1,17 @@ %{ - "amqp": {:hex, :amqp, "1.0.2", "3c4b0c2a0c02a908a78c51e6e8206ca24b0f6b879a5e980eccf840cda360ed71", [:mix], [{:amqp_client, "~> 3.7.3", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.4", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "~> 1.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, - "amqp_client": {:hex, :amqp_client, "3.7.3", "29a818d3871de5f8484e876ad34b0a940b2cecd6c6dfbed30d9b1679072eb9bc", [:make, :rebar3], [{:rabbit_common, "3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"}, + "amqp": {:hex, :amqp, "1.0.3", "06a6d909abc71d82b7c3133ca491899ca18fce857d0697dd060c29de1ef498d8", [:mix], [{:amqp_client, "~> 3.7.3", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.4", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "~> 1.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, + "amqp_client": {:hex, :amqp_client, "3.7.7", "e5ac06275d140d21ff30408d90419e9f7ecc31f1b125736c29911b8eec3e54d4", [:make, :rebar3], [{:rabbit_common, "3.7.7", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"}, "confex": {:hex, :confex, "3.3.1", "8febaf751bf293a16a1ed2cbd258459cdcc7ca53cfa61d3f83d49dd276a992b4", [:mix], [], "hexpm"}, - "earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, + "earmark": {:hex, :earmark, "1.2.6", "b6da42b3831458d3ecc57314dff3051b080b9b2be88c2e5aa41cd642a5b044ed", [:mix], [], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.19.1", "519bb9c19526ca51d326c060cb1778d4a9056b190086a8c6c115828eaccea6cf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"}, "jsx": {:hex, :jsx, "2.8.2", "7acc7d785b5abe8a6e9adbde926a24e481f29956dd8b4df49e3e4e7bcc92a018", [:mix, :rebar3], [], "hexpm"}, - "lager": {:hex, :lager, "3.5.1", "63897a61af646c59bb928fee9756ce8bdd02d5a1a2f3551d4a5e38386c2cc071", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"}, - "rabbit_common": {:hex, :rabbit_common, "3.7.3", "f23ed393e12150e3d3b6ef640be7bfddfefce72a65e2ce27d44249ef84287c96", [:make, :rebar3], [{:jsx, "2.8.2", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.5.1", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.4.0", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "1.4.4", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, - "ranch": {:hex, :ranch, "1.4.0", "10272f95da79340fa7e8774ba7930b901713d272905d0012b06ca6d994f8826b", [:rebar3], [], "hexpm"}, - "ranch_proxy_protocol": {:hex, :ranch_proxy_protocol, "1.4.4", "8853b11757a9798e86c7d6d0ff783a8e2e87f77052aad7f1c91108f254ba4a9c", [:rebar3], [{:ranch, "1.4.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, + "lager": {:hex, :lager, "3.6.3", "fe78951d174616273f87f0dbc3374d1430b1952e5efc4e1c995592d30a207294", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"}, + "makeup": {:hex, :makeup, "0.5.1", "966c5c2296da272d42f1de178c1d135e432662eca795d6dc12e5e8787514edf7", [:mix], [{:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.8.0", "1204a2f5b4f181775a0e456154830524cf2207cf4f9112215c05e0b76e4eca8b", [:mix], [{:makeup, "~> 0.5.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.2.2", "d526b23bdceb04c7ad15b33c57c4526bf5f50aaa70c7c141b4b4624555c68259", [:mix], [], "hexpm"}, + "rabbit_common": {:hex, :rabbit_common, "3.7.7", "c0afdf060c091f43d7d6f58889978a074a7a8e28a5e89b9e40c6a474ba295011", [:make, :rebar3], [{:jsx, "2.8.2", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.3", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.5.0", [hex: :ranch, repo: "hexpm", optional: false]}, {:ranch_proxy_protocol, "1.5.0", [hex: :ranch_proxy_protocol, repo: "hexpm", optional: false]}, {:recon, "2.3.2", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, + "ranch": {:hex, :ranch, "1.5.0", "f04166f456790fee2ac1aa05a02745cc75783c2bfb26d39faf6aefc9a3d3a58a", [:rebar3], [], "hexpm"}, + "ranch_proxy_protocol": {:hex, :ranch_proxy_protocol, "2.0.0", "623c732025f9d66d123a8ccc1735e5f43d7eb9b20aa09457c9609ef05f7e8ace", [:rebar3], [{:ranch, "1.5.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, "recon": {:hex, :recon, "2.3.2", "4444c879be323b1b133eec5241cb84bd3821ea194c740d75617e106be4744318", [:rebar3], [], "hexpm"}, } From cd15710edbcde7dc80f2fac64de93522aff32c9f Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 16:23:40 +0300 Subject: [PATCH 3/9] Created Dockerfile --- Dockerfile | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..35314db --- /dev/null +++ b/Dockerfile @@ -0,0 +1,4 @@ +FROM elixir:1.7.3 + +COPY ./ /app +WORKDIR /app From 1968bb20a67cc95ccb3e339ab0055aafbcda5562 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 16:23:56 +0300 Subject: [PATCH 4/9] Updated docker-compose.dev.yml --- docker-compose.dev.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 5a371d1..659876b 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -5,6 +5,25 @@ networks: driver: bridge services: + + app: + build: + context: . + dockerfile: ./Dockerfile + image: spotter-library + environment: + - SPOTTER_AMQP_USERNAME=user + - SPOTTER_AMQP_PASSWORD=password + - SPOTTER_AMQP_HOST=rabbitmq + - SPOTTER_AMQP_PORT=5672 + volumes: + - ./:/app/ + depends_on: + - rabbitmq + networks: + - app-tier + tty: true + rabbitmq: image: "bitnami/rabbitmq:latest" ports: From 6c2077445799efed65eef07892175a7bdca5bd69 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 16:24:35 +0300 Subject: [PATCH 5/9] Fixed module compilation warnings --- lib/amqp/testing.ex | 50 +++++++++++---------------------------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/lib/amqp/testing.ex b/lib/amqp/testing.ex index aa544fb..01fbef9 100644 --- a/lib/amqp/testing.ex +++ b/lib/amqp/testing.ex @@ -35,61 +35,44 @@ defmodule Spotter.Testing.AmqpBlockingClient do AMQP.Connection.close(connection) end - @doc """ - Creates a queue with binding to the certain exchange and specified QoS rules. - """ defp configure(channel, channel_opts) do - channel - |> configure_qos(channel_opts[:qos]) - |> configure_queue(channel_opts[:queue]) - |> configure_exchange(channel_opts[:queue], channel_opts[:exchange]) + channel = configure_qos(channel, channel_opts[:qos]) + {channel, queue_opts} = configure_queue(channel, channel_opts[:queue]) + channel_opts = Keyword.merge(channel_opts, [queue: queue_opts]) + channel = configure_exchange(channel, channel_opts[:queue], channel_opts[:exchange]) + channel end - @doc """ - Confgures a QoS for the channel. - """ defp configure_qos(channel, nil) do channel end - @doc """ - Configures QoS for the channel. - """ defp configure_qos(channel, qos_opts) do Helper.set_channel_qos(channel, qos_opts) channel end - @doc """ - Creates a new queue during the session. - """ defp configure_queue(channel, nil) do channel end - @doc """ - Creates a new queue during the session. - """ defp configure_queue(channel, queue_opts) do {:ok, queue} = AMQP.Queue.declare(channel, env(queue_opts[:name]), env(queue_opts)) - if queue_opts[:name] == "" and queue_opts[:routing_key] == "" do - queue_opts = Keyword.merge(queue_opts, [name: queue[:queue], routing_key: queue[:queue]]) - end + queue_opts = + if queue_opts[:name] == "" and queue_opts[:routing_key] == "" do + Keyword.merge(queue_opts, [name: queue[:queue], routing_key: queue[:queue]]) + else + queue_opts + end - channel + {channel, queue_opts} end - @doc """ - Configures the exchange and bind queue to it. - """ defp configure_exchange(channel, queue_opts, exchange_opts) when is_nil(queue_opts) or is_nil(exchange_opts) do channel end - @doc """ - Configures the exchange and bind queue to it. - """ defp configure_exchange(channel, queue_opts, exchange_opts) do Helper.declare_exchange(channel, exchange_opts[:name], exchange_opts[:type], exchange_opts) Helper.bind_queue(channel, queue_opts[:name], exchange_opts[:name], routing_key: queue_opts[:routing_key]) @@ -125,9 +108,6 @@ defmodule Spotter.Testing.AmqpBlockingClient do # Internal stuff - @doc """ - Sends the message into the certain queue. - """ defp send_message(channel, routing_key, data, opts) do exchange_request = Keyword.get(opts, :exchange_request, "") queue_request = Keyword.get(opts, :queue_request, "") @@ -140,9 +120,6 @@ defmodule Spotter.Testing.AmqpBlockingClient do AMQP.Basic.publish(channel, exchange_request, queue_request, data, publish_options) end - @doc """ - Extracts the message from the certain queue and returns to the client. - """ defp consume_response(channel, queue_name, timeout, attempts) do {payload, meta} = receive_message(channel, queue_name, timeout, attempts) @@ -153,9 +130,6 @@ defmodule Spotter.Testing.AmqpBlockingClient do {payload, meta} end - @doc """ - Extracts the message from the AMQP queue with retries. - """ defp receive_message(channel, queue_name, timeout, attempts) do case AMQP.Basic.get(channel, queue_name) do {:ok, message, meta} -> From 69ebab0241e9cafe1901b0eb4919e9ab0d83e884 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 16:24:57 +0300 Subject: [PATCH 6/9] Fixed timeouts after updating develop environment --- test/worker_test.exs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/worker_test.exs b/test/worker_test.exs index 7fa29ac..5af4189 100644 --- a/test/worker_test.exs +++ b/test/worker_test.exs @@ -9,7 +9,7 @@ defmodule SpotterWorkerTest do @custom_amqp_opts [ username: "user", password: "password", - host: "localhost", + host: "rabbitmq", port: 5672, virtual_host: "/" ] @@ -178,7 +178,7 @@ defmodule SpotterWorkerTest do headers: [{"path", :longstr, "api.matchmaking.search"}, {"permissions", :longstr, "get;post"}] ) - :timer.sleep(100) + :timer.sleep(200) {:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, @generic_queue_forward) assert payload == "DATA" @@ -215,7 +215,7 @@ defmodule SpotterWorkerTest do headers: [{"path", :longstr, "api.matchmaking.search"}, {"permissions", :longstr, "get;post"}] ) - :timer.sleep(100) + :timer.sleep(200) {:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, queue[:queue]) assert payload == "VALIDATION_ERROR" @@ -234,7 +234,7 @@ defmodule SpotterWorkerTest do headers: [{"path", :longstr, "api.matchmaking.search"}, {"permissions", :longstr, ""}] ) - :timer.sleep(100) + :timer.sleep(200) {:ok, payload, %{delivery_tag: tag}} = AMQP.Basic.get(channel, queue[:queue]) assert payload == "NO_PERMISSIONS" From ce03f8095d27a9fb3cfddb4f95274a0c39ece9d6 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 18:27:51 +0300 Subject: [PATCH 7/9] Moved module to the another directory --- lib/{amqp/testing.ex => testing/client.ex} | 36 +++++++++++++++------- 1 file changed, 25 insertions(+), 11 deletions(-) rename lib/{amqp/testing.ex => testing/client.ex} (86%) diff --git a/lib/amqp/testing.ex b/lib/testing/client.ex similarity index 86% rename from lib/amqp/testing.ex rename to lib/testing/client.ex index 01fbef9..59c030c 100644 --- a/lib/amqp/testing.ex +++ b/lib/testing/client.ex @@ -8,8 +8,8 @@ defmodule Spotter.Testing.AmqpBlockingClient do @doc """ Initializes a new blocking GenServer instance. """ - def start_link(opts) do - GenServer.start_link(__MODULE__, opts) + def start_link(opts, name \\ __MODULE__) do + GenServer.start_link(__MODULE__, opts, name: name) end @doc """ @@ -22,11 +22,11 @@ defmodule Spotter.Testing.AmqpBlockingClient do {:ok, %{ connection: connection, channel: channel, - channel_opts: %{ + channel_opts: [ queue: Keyword.get(opts, :queue, []), exchange: Keyword.get(opts, :exchange, []), qos: Keyword.get(opts, :qos, []) - } + ] }} end @@ -85,6 +85,13 @@ defmodule Spotter.Testing.AmqpBlockingClient do # Public API + @doc """ + Stop the client and close the existing connection. + """ + def stop(pid) do + GenServer.stop(pid) + end + @doc """ Sends a new message without waiting for a response. """ @@ -106,12 +113,18 @@ defmodule Spotter.Testing.AmqpBlockingClient do GenServer.call(pid, {:consume_response, queue, timeout, attempts}, call_timeout) end + @doc """ + Initializes QoS, a queue and an exchanges for the channel. + """ + def configure_channel(pid, channel_opts, call_timeout \\ 500) do + GenServer.call(pid, {:configure_channel, channel_opts}, call_timeout) + end + # Internal stuff defp send_message(channel, routing_key, data, opts) do exchange_request = Keyword.get(opts, :exchange_request, "") queue_request = Keyword.get(opts, :queue_request, "") - publish_options = Keyword.merge(opts, [ persistent: Keyword.get(opts, :persistent, true), reply_to: routing_key, @@ -151,8 +164,8 @@ defmodule Spotter.Testing.AmqpBlockingClient do def handle_call({:send_and_wait, data, opts, timeout, attempts}, _from, state) do channel = state[:channel] channel_opts = state[:channel_opts] - queue_name = Keyword.get(channel_opts[:queue], :name, :undefined) - routing_key = Keyword.get(channel_opts[:queue], :routing_key, :undefined) + queue_name = Keyword.get(channel_opts[:queue] || [], :name, :undefined) + routing_key = Keyword.get(channel_opts[:queue] || [], :routing_key, :undefined) configure(channel, channel_opts) send_message(channel, routing_key, data, opts) @@ -166,12 +179,13 @@ defmodule Spotter.Testing.AmqpBlockingClient do {:reply, consume_response(state[:channel], queue, timeout, attempts), state} end - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do - deinit(state[:connection], state[:channel]) - {:noreply, state} + def handle_call({:configure_channel, channel_opts}, _from, state) do + configure(state[:channel], channel_opts) + {:reply, :ok, state} end - def terminate(_reason, state) do + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do deinit(state[:connection], state[:channel]) + {:noreply, state} end end From fbff40c6bdb09c22c386168d61964152ee279812 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 18:28:26 +0300 Subject: [PATCH 8/9] Implemented tests for testing client --- test/blocking_amqp_client_test.exs | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 test/blocking_amqp_client_test.exs diff --git a/test/blocking_amqp_client_test.exs b/test/blocking_amqp_client_test.exs new file mode 100644 index 0000000..0c4b886 --- /dev/null +++ b/test/blocking_amqp_client_test.exs @@ -0,0 +1,76 @@ +defmodule SpotterTestingAmqpBlockingClientTest do + use ExUnit.Case, async: false + alias Spotter.Testing.AmqpBlockingClient + + @generic_exchange "test.direct" + @queue_name "blocking_client_test" + + @custom_amqp_opts [ + username: "user", + password: "password", + host: "rabbitmq", + port: 5672, + virtual_host: "/", + queue: [ + name: @queue_name, + routing_key: @queue_name, + durable: true, + passive: false, + auto_delete: true + ], + exchange: [ + name: @generic_exchange, + type: :direct, + durable: true, + passive: true + ], + qos: [ + prefetch_count: 10 + ] + ] + + setup do + {:ok, pid} = start_supervised({AmqpBlockingClient, @custom_amqp_opts}) + {:ok, [client: pid]} + end + + test "A AMQP blocking client sends the message and consumes the message from the queue", state do + client = state[:client] + message = "test" + AmqpBlockingClient.configure_channel(client, @custom_amqp_opts) + + send_result = AmqpBlockingClient.send( + client, message, + [queue_request: @queue_name, exchange_request: @generic_exchange] + ) + assert send_result == :ok + + {response, _meta} = AmqpBlockingClient.consume(client, @queue_name) + assert response == message + + stop_supervised(client) + end + + test "A AMQP blocking client consumes the message from the queue and returns empty results", state do + client = state[:client] + message = "test" + AmqpBlockingClient.configure_channel(client, @custom_amqp_opts) + + {:empty, nil} = AmqpBlockingClient.consume(client, @queue_name, 1, 100) + stop_supervised(client) + end + + test "A AMQP blocking client sends the message and waits for the response", state do + client = state[:client] + message = "test" + + channel_options = Keyword.merge( + @custom_amqp_opts, + [queue_request: @queue_name, exchange_request: @generic_exchange] + ) + {response, _meta} = AmqpBlockingClient.send_and_wait(client, message, channel_options) + assert response == message + + stop_supervised(client) + end +end From 3463ab5a08f264dd49f057d3c89ecb864c5f0134 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 3 Sep 2018 18:28:39 +0300 Subject: [PATCH 9/9] Increased the version of the package --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 5e2270a..7357eed 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Spotter.MixProject do use Mix.Project - @version "0.4.1" + @version "0.5.0" def project do [