Skip to content

Commit

Permalink
Merge pull request #14 from danizord/feature/handler-pipeline
Browse files Browse the repository at this point in the history
Support a pipeline of middlewares to handle a message
  • Loading branch information
bakura10 committed Jan 26, 2016
2 parents 1d8d57c + 04bd91e commit a8c9abf
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 57 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
},
"require-dev": {
"phpunit/phpunit": "~5.1",
"squizlabs/php_codesniffer": "^2.3"
"squizlabs/php_codesniffer": "^2.3",
"zendframework/zend-diactoros": "^1.3"
},
"autoload": {
"psr-4": {
Expand Down
25 changes: 25 additions & 0 deletions src/Exception/InvalidArgumentException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php
/*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* This software consists of voluntary contributions made by many individuals
* and is licensed under the MIT license.
*/

namespace ZfrEbWorker\Exception;

use InvalidArgumentException as BaseInvalidArgumentException;

class InvalidArgumentException extends BaseInvalidArgumentException implements ExceptionInterface
{
}
74 changes: 55 additions & 19 deletions src/Middleware/WorkerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
use Interop\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use ZfrEbWorker\Exception\InvalidArgumentException;
use ZfrEbWorker\Exception\RuntimeException;

/**
* Worker middleware
*
* What this thing does is extracting the message from the request, and dispatching to the proper middleware. Because
* Zend Expressive does not have a simple way of redirecting, the simplest way is simply to fetch the corresponding
* middleware, and do the routing here.
*
* You can find a complete reference of what Elastic Beanstalk set here:
* What this thing does is extracting the message from the request, and dispatching a pipeline of the mapped
* middlewares. You can find a complete reference of what Elastic Beanstalk set here:
* http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html
*
* @author Michaël Gallego
Expand All @@ -48,10 +45,12 @@ class WorkerMiddleware
private $container;

/**
* Map messages names to a middleware. For instance:
*
* Map message names to a list middleware names. For instance:
* [
* 'image.saved' => ProcessImageMiddleware::class
* 'image.saved' => [
* WorkerAuthenticationMiddleware::class,
* ProcessImageMiddleware::class,
* ],
* ]
*
* @var array
Expand All @@ -72,16 +71,21 @@ public function __construct(array $messagesMapping, ContainerInterface $containe
* @param ServerRequestInterface $request
* @param ResponseInterface $response
* @param callable|null $out
*
* @return ResponseInterface
*/
public function __invoke(ServerRequestInterface $request, ResponseInterface $response, callable $out = null)
{
public function __invoke(
ServerRequestInterface $request,
ResponseInterface $response,
callable $out = null
): ResponseInterface {
// The full message is set as part of the body
$body = json_decode($request->getBody(), true);
$name = $body['name'];
$payload = $body['payload'];

// Let's retrieve the correct middleware by using the mapping
$middleware = $this->getMiddlewareForMessage($name);
// Let's create a middleware pipeline of mapped middlewares
$pipeline = $this->createMiddlewarePipeline($name, $out);

// Elastic Beanstalk set several headers. We will extract some of them and add them as part of the request
// attributes so they can be easier to process, and set the message attributes
Expand All @@ -90,23 +94,55 @@ public function __invoke(ServerRequestInterface $request, ResponseInterface $res
->withAttribute(self::MESSAGE_PAYLOAD_ATTRIBUTE, $payload)
->withAttribute(self::MESSAGE_NAME_ATTRIBUTE, $name);

return $middleware($request, $response, $out);
return $pipeline($request, $response);
}

/**
* @param string $messageName
* @param string $messageName
* @param callable|null $out
*
* @return callable
*/
private function getMiddlewareForMessage(string $messageName): callable
private function createMiddlewarePipeline(string $messageName, callable $out = null): callable
{
if (!isset($this->messagesMapping[$messageName])) {
throw new RuntimeException(sprintf(
'No middleware could be found for message "%s". Did you have properly fill
the "zfr_eb_worker" configuration?',
'No middleware was mapped for message "%s". Did you fill the "zfr_eb_worker" configuration?',
$messageName
));
}

return $this->container->get($this->messagesMapping[$messageName]);
$mappedMiddlewares = $this->messagesMapping[$messageName];

if (is_string($mappedMiddlewares)) {
$mappedMiddlewares = (array) $mappedMiddlewares;
}

if (!is_array($mappedMiddlewares)) {
throw new InvalidArgumentException(sprintf(
'Mapped middleware must be either a string or an array of strings, %s given.',
is_object($mappedMiddlewares) ? get_class($mappedMiddlewares) : gettype($mappedMiddlewares)
));
}

$pipeline = function (
ServerRequestInterface $request,
ResponseInterface $response
) use (
&$pipeline,
&$mappedMiddlewares,
$out
) {
if (empty($mappedMiddlewares)) {
return is_callable($out) ? $out($request, $response) : $response;
}

/** @var callable $middleware */
$middleware = $this->container->get(array_shift($mappedMiddlewares));

return $middleware($request, $response, $pipeline);
};

return $pipeline;
}
}
150 changes: 113 additions & 37 deletions test/Middleware/WorkerMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,140 @@
use Interop\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Zend\Diactoros\Response;
use Zend\Diactoros\ServerRequest;
use Zend\Diactoros\Stream;
use ZfrEbWorker\Exception\InvalidArgumentException;
use ZfrEbWorker\Exception\RuntimeException;
use ZfrEbWorker\Middleware\WorkerMiddleware;

class WorkerMiddlewareTest extends \PHPUnit_Framework_TestCase
{
/**
* @var \Prophecy\Prophecy\ObjectProphecy
*/
private $request;
public function testThrowsExceptionIfNoMappedMiddleware()
{
$middleware = new WorkerMiddleware([], $this->prophesize(ContainerInterface::class)->reveal());

/**
* @var \Prophecy\Prophecy\ObjectProphecy
*/
private $response;
$this->setExpectedException(
RuntimeException::class,
'No middleware was mapped for message "message-name". Did you fill the "zfr_eb_worker" configuration?'
);

/**
* @var \Prophecy\Prophecy\ObjectProphecy
*/
private $container;
$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
});
}

public function setUp()
public function testThrowsExceptionIfInvalidMappedMiddlewareType()
{
$this->container = $this->prophesize(ContainerInterface::class);
$this->request = $this->prophesize(ServerRequestInterface::class);
$this->response = $this->prophesize(ResponseInterface::class);
$middleware = new WorkerMiddleware(['message-name' => 10], $this->prophesize(ContainerInterface::class)->reveal());

$this->setExpectedException(
InvalidArgumentException::class,
'Mapped middleware must be either a string or an array of strings, integer given.'
);

$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
});
}

public function testThrowExceptionIfNoTaskMapping()
public function testThrowsExceptionIfInvalidMappedMiddlewareClass()
{
$this->setExpectedException(RuntimeException::class);
$middleware = new WorkerMiddleware(['message-name' => new \stdClass()], $this->prophesize(ContainerInterface::class)->reveal());

$body = json_encode(['name' => 'message-name', 'payload' => []]);
$this->request->getBody()->shouldBeCalled()->willReturn($body);
$this->setExpectedException(
InvalidArgumentException::class,
'Mapped middleware must be either a string or an array of strings, stdClass given.'
);

$middleware = new WorkerMiddleware([], $this->container->reveal());
$middleware->__invoke($this->request->reveal(), $this->response->reveal(), function() {});
$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
});
}

public function testCanDispatchToMiddleware()
/**
* @dataProvider mappedMiddlewaresProvider
*
* @param array|string $mappedMiddlewares
* @param int $expectedCounter
*/
public function testDispatchesMappedMiddlewares($mappedMiddlewares, int $expectedCounter)
{
$body = json_encode(['name' => 'message-name', 'payload' => ['id' => 123]]);
$this->request->getBody()->shouldBeCalled()->willReturn($body);
$container = $this->prophesize(ContainerInterface::class);
$middleware = new WorkerMiddleware(['message-name' => $mappedMiddlewares], $container->reveal());
$request = $this->createRequest();
$response = new Response();

if (is_string($mappedMiddlewares)) {
$mappedMiddlewares = (array) $mappedMiddlewares;
}

foreach ($mappedMiddlewares as $mappedMiddleware) {
$container->get($mappedMiddleware)->shouldBeCalled()->willReturn([$this, 'incrementMiddleware']);
}

$outWasCalled = false;
$responseFromOut = new Response();

$out = function ($request, ResponseInterface $response) use (&$outWasCalled, $expectedCounter, $responseFromOut) {
$outWasCalled = true;

$middleware = new WorkerMiddleware(['message-name' => 'MyMiddleware'], $this->container->reveal());
$this->assertEquals('default-queue', $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE));
$this->assertEquals('123abc', $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE));
$this->assertEquals('message-name', $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE));
$this->assertEquals(['id' => 123], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
$this->assertEquals($expectedCounter, $request->getAttribute('counter', 0));
$this->assertEquals($expectedCounter, $response->hasHeader('counter') ? $response->getHeaderLine('counter') : 0);

$messageMiddleware = function($request, $response) {
$this->assertSame($request, $this->request->reveal());
return $responseFromOut;
};

$this->container->get('MyMiddleware')->shouldBeCalled()->willReturn($messageMiddleware);
$returnedResponse = $middleware($request, $response, $out);

$this->assertTrue($outWasCalled, 'Make sure that $out middleware was called');
$this->assertSame($responseFromOut, $returnedResponse, 'Make sure that it returns response from $out');
}

public function testReturnsResponseIfNoOutMiddlewareIsProvided()
{
$middleware = new WorkerMiddleware(['message-name' => []], $this->prophesize(ContainerInterface::class)->reveal());
$request = $this->createRequest();
$response = new Response();

$returnedResponse = $middleware($request, $response);

$this->assertSame($response, $returnedResponse);
}

public function incrementMiddleware(ServerRequestInterface $request, ResponseInterface $response, callable $next): ResponseInterface
{
$counter = $request->getAttribute('counter', 0) + 1;
$request = $request->withAttribute('counter', $counter);
$response = $response->withHeader('counter', (string) $counter);

return $next($request, $response);
}

public function mappedMiddlewaresProvider(): array
{
return [
[[], 0],
['FooMiddleware', 1],
[['FooMiddleware'], 1],
[['FooMiddleware', 'BarMiddleware'], 2],
[['FooMiddleware', 'BarMiddleware', 'BazMiddleware'], 3],
];
}

private function createRequest(): ServerRequestInterface
{
$request = new ServerRequest();
$request = $request->withHeader('X-Aws-Sqsd-Queue', 'default-queue');
$request = $request->withHeader('X-Aws-Sqsd-Msgid', '123abc');
$request = $request->withBody(new Stream('php://temp', 'w'));

$this->request->getHeaderLine('X-Aws-Sqsd-Queue')->shouldBeCalled()->willReturn('default-queue');
$this->request->withAttribute('worker.matched_queue', 'default-queue')->shouldBeCalled()->willReturn($this->request->reveal());
$this->request->getHeaderLine('X-Aws-Sqsd-Msgid')->shouldBeCalled()->willReturn('123abc');
$this->request->withAttribute('worker.message_id', '123abc')->shouldBeCalled()->willReturn($this->request->reveal());
$this->request->withAttribute('worker.message_payload', ['id' => 123])->shouldBeCalled()->willReturn($this->request->reveal());
$this->request->withAttribute('worker.message_name', 'message-name')->shouldBeCalled()->willReturn($this->request->reveal());
$request->getBody()->write(json_encode(['name' => 'message-name', 'payload' => ['id' => 123]]));

$middleware->__invoke($this->request->reveal(), $this->response->reveal());
return $request;
}
}
}

0 comments on commit a8c9abf

Please sign in to comment.