Skip to content

Commit

Permalink
PHP Tutorial sources conform to PSR-2 standard
Browse files Browse the repository at this point in the history
The code in the PHP tutorials was sometimes a bit messy.
With these changes the code conforms to standards outlined here:
https://www.php-fig.org/psr/psr-2/
  • Loading branch information
Brian Short committed May 28, 2018
1 parent ea4dd6a commit 0ea9699
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 120 deletions.
7 changes: 3 additions & 4 deletions php/emit_log.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "info: Hello World!";
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');
Expand All @@ -20,5 +21,3 @@

$channel->close();
$connection->close();

?>
6 changes: 3 additions & 3 deletions php/emit_log_direct.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
if (empty($data)) {
$data = "Hello World!";
}

$msg = new AMQPMessage($data);

Expand All @@ -22,5 +24,3 @@

$channel->close();
$connection->close();

?>
7 changes: 3 additions & 4 deletions php/emit_log_topic.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";
if (empty($data)) {
$data = "Hello World!";
}

$msg = new AMQPMessage($data);

Expand All @@ -22,5 +23,3 @@

$channel->close();
$connection->close();

?>
14 changes: 7 additions & 7 deletions php/new_task.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>
9 changes: 3 additions & 6 deletions php/receive.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,19 @@
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
9 changes: 3 additions & 6 deletions php/receive_logs.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
Expand All @@ -15,17 +14,15 @@

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
16 changes: 7 additions & 9 deletions php/receive_logs_direct.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if(empty($severities )) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}

foreach($severities as $severity) {
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
echo ' [x] '.$msg->delivery_info['routing_key'].':'.$msg->body."\n";
$callback = function ($msg) {
echo ' [x] '.$msg->delivery_info['routing_key'].':'.$msg->body."\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
18 changes: 8 additions & 10 deletions php/receive_logs_topic.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}

foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
$callback = function ($msg) {
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
99 changes: 61 additions & 38 deletions php/rpc_client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,71 @@
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
class FibonacciRpcClient
{
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;

public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
array($this, 'on_response'));
}
public function on_response($rep) {
if($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"",
false,
false,
true,
false
);
$this->channel->basic_consume(
$this->callback_queue,
'',
false,
false,
false,
false,
array(
$this,
'onResponse'
)
);
}

public function call($n) {
$this->response = null;
$this->corr_id = uniqid();
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}

$msg = new AMQPMessage(
(string) $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
};
public function call($n)
{
$this->response = null;
$this->corr_id = uniqid();

$msg = new AMQPMessage(
(string) $n,
array(
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

?>
49 changes: 27 additions & 22 deletions php/rpc_server.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,43 @@

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";

$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);

$req->delivery_info['channel']->basic_publish(
$msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
$callback = function ($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";

$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);

$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')
);
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']
);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
Loading

0 comments on commit 0ea9699

Please sign in to comment.