Skip to content

Commit

Permalink
Merge pull request #10 from OpenMatchmaking/hotfix-send
Browse files Browse the repository at this point in the history
Refactoring send/send_and_wait/consume GenServer calls
  • Loading branch information
Relrin authored Aug 7, 2019
2 parents ce1d4d5 + d162062 commit bb26abf
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The package can be installed via adding the `spotter` dependency to your list of

```elixir
def deps do
[{:spotter, "~> 0.5.0"}]
[{:spotter, "~> 0.6.0"}]
end
```

Expand Down
97 changes: 84 additions & 13 deletions lib/testing/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,43 +94,112 @@ defmodule Spotter.Testing.AmqpBlockingClient do

@doc """
Sends a new message without waiting for a response.
The `data` parameter represents a payload, added to the message body.
The `ops` parameter represented as a keyword, that can contain keys:
* `:request_exchange` - Exchange key, through which will be published message.
* `:request_routing_key` - Routing key, used for pushing message to the certain queue.
* `:mandatory` - If set, returns an error if the broker can't route the message to a queue (default `false`);
* `:immediate` - If set, returns an error if the broker can't deliver te message to a consumer immediately (default `false`);
* `:content_type` - MIME Content type;
* `:content_encoding` - MIME Content encoding;
* `:headers` - Custom message headers;
* `:persistent` - Determines delivery mode. Messages marked as `persistent` and delivered to `durable` \
queues will be logged to disk;
* `:correlation_id` - Application correlation identifier;
* `:priority` - Message priority, ranging from 0 to 9;
* `:reply_to` - Name of the reply queue;
* `:expiration` - How long the message is valid (in milliseconds);
* `:message_id` - Message identifier;
* `:timestamp` - Timestamp associated with this message (epoch time);
* `:type` - Message type (as a string);
* `:user_id` - User ID. Validated by RabbitMQ against the active connection user;
* `:app_id` - Publishing application ID.
The `call_timeout` parameter determines maximum amount time in milliseconds before exit from the method by timeout.
"""
def send(pid, data, opts, call_timeout \\ 5000) do
GenServer.call(pid, {:send, data, opts}, call_timeout)
end

@doc """
Sends a new message and wait for result.
The `data` parameter represents a payload, added to the message body.
The `ops` parameter represented as a keyword, that can contain keys:
* `:request_exchange` - Exchange key, through which will be published message. Required.
* `:request_routing_key` - Routing key, used for pushing message to the certain queue. Required.
* `:response_queue` - The name of the queue which will be used for tracking responses. Required.
* `:channel_opts` - Keyword list which is used for creating response queue and linking it with the exchange. Required.
* `:mandatory` - If set, returns an error if the broker can't route the message to a queue (default `false`);
* `:immediate` - If set, returns an error if the broker can't deliver te message to a consumer immediately (default `false`);
* `:content_type` - MIME Content type;
* `:content_encoding` - MIME Content encoding;
* `:headers` - Custom message headers;
* `:persistent` - Determines delivery mode. Messages marked as `persistent` and delivered to `durable` \
queues will be logged to disk;
* `:correlation_id` - Application correlation identifier;
* `:priority` - Message priority, ranging from 0 to 9;
* `:reply_to` - Name of the reply queue;
* `:expiration` - How long the message is valid (in milliseconds);
* `:message_id` - Message identifier;
* `:timestamp` - Timestamp associated with this message (epoch time);
* `:type` - Message type (as a string);
* `:user_id` - User ID. Validated by RabbitMQ against the active connection user;
* `:app_id` - Publishing application ID.
The `timeout` parameter determines amout of time before doing next attempt to extract the message.
The `attemps` parameter determines the general amount of attempts to extract the message.
The `call_timeout` parameter determines maximum amount time in milliseconds before exit from the method by timeout.
"""
def send_and_wait(pid, data, opts, timeout \\ 1000, attempts \\ 5, call_timeout \\ 5000) do
GenServer.call(pid, {:send_and_wait, data, opts, timeout, attempts}, call_timeout)
try do
GenServer.call(pid, {:send_and_wait, data, opts, timeout, attempts}, call_timeout)
catch
:exit, _reason -> {:empty, nil}
end
end

@doc """
Returns the message from the certain queue if it exists.
The `queue` parameter represents the name of the queue which will be used for tracking responses.
The `timeout` parameter determines amout of time before doing next attempt to extract the message.
The `attemps` parameter determines the general amount of attempts to extract the message.
The `call_timeout` parameter determines maximum amount time in milliseconds before exit from the method by timeout.
"""
def consume(pid, queue, timeout \\ 1000, attempts \\ 5, call_timeout \\ 500) do
GenServer.call(pid, {:consume_response, queue, timeout, attempts}, call_timeout)
try do
GenServer.call(pid, {:consume_response, queue, timeout, attempts}, call_timeout)
catch
:exit, _reason -> {:empty, nil}
end
end

@doc """
Initializes QoS, a queue and an exchanges for the channel.
The `channel_opts` parameters stores generic information about the response queue and the linked exchange.
The `call_timeout` parameter determines maximum amount time in milliseconds before exit from the method by timeout.
"""
def configure_channel(pid, channel_opts, call_timeout \\ 500) do
GenServer.call(pid, {:configure_channel, channel_opts}, call_timeout)
end

# Internal stuff

defp send_message(channel, routing_key, data, opts) do
exchange_request = Keyword.get(opts, :exchange_request, "")
queue_request = Keyword.get(opts, :queue_request, "")
defp send_message(channel, data, opts) do
request_exchange = Keyword.get(opts, :request_exchange, "")
request_routing_key = Keyword.get(opts, :request_routing_key, "")
response_queue = Keyword.get(opts, :response_queue, "")
publish_options = Keyword.merge(opts, [
persistent: Keyword.get(opts, :persistent, true),
reply_to: routing_key,
reply_to: response_queue,
content_type: Keyword.get(opts, :content_type, "application/json")
])
AMQP.Basic.publish(channel, exchange_request, queue_request, data, publish_options)
AMQP.Basic.publish(channel, request_exchange, request_routing_key, data, publish_options)
end

defp consume_response(channel, queue_name, timeout, attempts) do
Expand Down Expand Up @@ -158,20 +227,22 @@ defmodule Spotter.Testing.AmqpBlockingClient do
# Private API

def handle_call({:send, data, opts}, _from, state) do
{:reply, send_message(state[:channel], :undefined, data, opts), state}
{:reply, send_message(state[:channel], data, opts), state}
end

def handle_call({:send_and_wait, data, opts, timeout, attempts}, _from, state) do
channel = state[:channel]
channel_opts = state[:channel_opts]
queue_name = Keyword.get(channel_opts[:queue] || [], :name, :undefined)
routing_key = Keyword.get(channel_opts[:queue] || [], :routing_key, :undefined)
response_queue = Keyword.get(opts, :response_queue, "")

configure(channel, channel_opts)
send_message(channel, routing_key, data, opts)
response = consume_response(state[:channel], queue_name, timeout, attempts)
send_message(channel, data, opts)
response = consume_response(state[:channel], response_queue, timeout, attempts)

if response_queue != "" do
AMQP.Queue.delete(channel, response_queue)
end

AMQP.Queue.delete(channel, queue_name)
{:reply, response, state}
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Spotter.MixProject do
use Mix.Project

@version "0.5.1"
@version "0.6.0"

def project do
[
Expand Down
111 changes: 100 additions & 11 deletions test/blocking_amqp_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule SpotterTestingAmqpBlockingClientTest do

@generic_exchange "test.direct"
@queue_name "blocking_client_test"
@secondary_queue_name "secondary_queue"

@custom_amqp_opts [
username: "user",
Expand All @@ -29,19 +30,47 @@ defmodule SpotterTestingAmqpBlockingClientTest do
]
]

@secondary_amqp_opts [
username: "user",
password: "password",
host: "rabbitmq",
port: 5672,
virtual_host: "/",
queue: [
name: @secondary_queue_name,
routing_key: @secondary_queue_name,
durable: true,
passive: false,
auto_delete: true
],
exchange: [
name: @generic_exchange,
type: :direct,
durable: true,
passive: true
],
qos: [
prefetch_count: 10
]
]

setup do
{:ok, pid} = start_supervised({AmqpBlockingClient, @custom_amqp_opts})
{:ok, [client: pid]}
{:ok, client} = start_supervised({AmqpBlockingClient, @custom_amqp_opts})
AmqpBlockingClient.configure_channel(client, @custom_amqp_opts)
AmqpBlockingClient.configure_channel(client, @secondary_amqp_opts)
{:ok, [client: client]}
end

test "A AMQP blocking client sends the message and consumes the message from the queue", state do
test "AMQP blocking client sends the message and consumes the message from the queue", state do
client = state[:client]
message = "test"
AmqpBlockingClient.configure_channel(client, @custom_amqp_opts)

send_result = AmqpBlockingClient.send(
client, message,
[queue_request: @queue_name, exchange_request: @generic_exchange]
[
request_exchange: @generic_exchange,
request_routing_key: @queue_name
]
)
assert send_result == :ok

Expand All @@ -51,25 +80,85 @@ defmodule SpotterTestingAmqpBlockingClientTest do
stop_supervised(client)
end

test "A AMQP blocking client consumes the message from the queue and returns empty results", state do
test "AMQP blocking client consumes the message from the queue and returns empty results", state do
client = state[:client]
message = "test"
AmqpBlockingClient.configure_channel(client, @custom_amqp_opts)

{:empty, nil} = AmqpBlockingClient.consume(client, @queue_name, 1, 100)
stop_supervised(client)
end

test "A AMQP blocking client sends the message and waits for the response", state do
test "AMQP blocking client sends the message and waits for the response", state do
client = state[:client]
message = "test"

opts = Keyword.merge(
@custom_amqp_opts,
[
request_exchange: @generic_exchange,
request_routing_key: @queue_name,
response_queue: @queue_name
]
)
{response, _meta} = AmqpBlockingClient.send_and_wait(client, message, opts)
assert response == message

stop_supervised(client)
end

test "AMQP blocking client receive {:empty, nil} by timeout for consuming messages", state do
client = state[:client]

{response, meta} = AmqpBlockingClient.consume(client, @queue_name, 100, 1, 1)
assert response == :empty
assert meta == nil

stop_supervised(client)
end

test "AMQP blocking client receive {:empty, nil} by timeout for sent_and_wait call", state do
client = state[:client]
message = "test"

opts = Keyword.merge(
@custom_amqp_opts,
[
request_exchange: @generic_exchange,
request_routing_key: @queue_name,
response_queue: @secondary_queue_name
]
)
{response, meta} = AmqpBlockingClient.send_and_wait(client, message, opts, 100)
assert response == :empty
assert meta == nil

{response_2, _meta_2} = AmqpBlockingClient.consume(client, @queue_name, 100)
assert response_2 == message
stop_supervised(client)
end

test "AMQP blocking client receive message with custom headers", state do
client = state[:client]
message = "test"

channel_options = Keyword.merge(
opts = Keyword.merge(
@custom_amqp_opts,
[queue_request: @queue_name, exchange_request: @generic_exchange]
[
request_exchange: @generic_exchange,
request_routing_key: @queue_name,
response_queue: @queue_name,
headers: [
om_permissions: "matchmaking.test.test; matchmaking.test.api",
om_request_url: "/api/v1/matchmaking/search",
]
]
)
{response, _meta} = AmqpBlockingClient.send_and_wait(client, message, channel_options)
{response, meta} = AmqpBlockingClient.send_and_wait(client, message, opts, 100)
assert response == message
assert meta[:headers] == [
{"om_permissions", :longstr, "matchmaking.test.test; matchmaking.test.api"},
{"om_request_url", :longstr, "/api/v1/matchmaking/search"}
]

stop_supervised(client)
end
Expand Down

0 comments on commit bb26abf

Please sign in to comment.