This repository has been archived by the owner on Sep 12, 2020. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channels-two-way.php
105 lines (88 loc) · 2.76 KB
/
channels-two-way.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<?php
require 'vendor/autoload.php';
use \parallel\Runtime;
use \parallel\Channel;
use \parallel\Future;
use \parallel\Events;
use \parallel\Events\Event;
use \parallel\Events\Input;
use \parallel\Events\Timeout;
use React\EventLoop\Factory;
// Build the pool of 16 parallel runtimes that will execute the worker task.
$parallels = [];
while (count($parallels) < 16) {
    $parallels[] = new Runtime();
}
// Event poller used by the main thread to pick up worker results.
// NOTE(review): the timeout of 0 presumably makes poll() give up immediately
// when nothing is ready, so the event loop is never blocked — confirm.
$events = new Events();
$events->setTimeout(0);

// Two unbounded, named channels:
//  - "channelTwo": main thread -> workers (integers to process)
//  - "channel":    workers -> main thread (processed results), watched by $events
$channelTwo = Channel::make("channelTwo", Channel::Infinite);
$channel = Channel::make("channel", Channel::Infinite);
$events->addChannel($channel);
/*
 * Worker task scheduled on every runtime.
 *
 * Both parameters are channel names (strings); the channels are re-opened
 * inside the task. Every value received on $channelTwo is multiplied by 13
 * and sent back on $channel.
 *
 * The loop ends when a falsy value is received or when one of the channels
 * has been closed. In both cases the task prints "true" and returns true.
 */
$f = function($channel, $channelTwo) {
    // Random tag so the output lines of different workers can be told apart
    $ri = random_int(1, 1000);
    $results = Channel::open($channel);
    $jobs = Channel::open($channelTwo);

    try {
        while ($int = $jobs->recv()) {
            sleep(1); // presumably simulates a slow unit of work
            echo $ri . ':' . $int . PHP_EOL;
            $results->send(($int * 13));
        }
    } catch (\parallel\Channel\Error\Closed $closed) {
        // recv()/send() raise this once a channel has been closed. Treat it
        // as the shutdown signal so the task finishes normally instead of
        // ending with an uncaught error.
    }

    echo 'true', PHP_EOL;

    return true;
};
// Schedule the worker task on every runtime. The channels are handed over by
// name (string cast) and re-opened inside the task.
$futures = [];
$channelNames = [(string)$channel, (string)$channelTwo];
foreach ($parallels as $runtime) {
    $futures[] = $runtime->run($f, $channelNames);
}
// Create the event loop and, on a future tick, queue the integers 1..500 on
// $channelTwo for the workers to consume.
$loop = Factory::create();
$loop->futureTick(function () use ($channelTwo) {
    for ($job = 1; $job <= 500; $job++) {
        $channelTwo->send($job);
    }
});
// The documented ext-parallel API declares the event type constants on
// Event\Type; fall back to the Event::Read constant this script originally
// used when that class is not available.
$readType = defined(Event\Type::class . '::Read')
    ? constant(Event\Type::class . '::Read')
    : Event::Read;

// Call 1K times per second: drain every event that is currently available
// and print the values the workers sent back.
$loop->addPeriodicTimer(0.001, function () use ($events, $readType) {
    echo 'tick';
    try {
        while ($event = $events->poll()) {
            if ($event->type !== $readType) {
                // Not a read: the object is not put back into $events, so
                // it is no longer polled.
                continue;
            }
            echo microtime(true) . ':read:' . var_export($event->value, true) . "\n";
            /* $event->object removed
            from Events when Event emitted */
            $events->addChannel($event->object);
        }
    } catch (Timeout | \parallel\Events\Error\Timeout $timeout) {
        // Nothing was ready within the configured timeout. Both class names
        // are listed because the imported \parallel\Events\Timeout does not
        // match the documented \parallel\Events\Error\Timeout; a catch for a
        // class that does not exist simply never matches.
        return;
    } catch (Throwable $throwable) {
        echo (string)$throwable;
    }
});
// Terminate on SIGINT
$loop->addSignal(SIGINT, function () use ($parallels, $loop, $channel, $channelTwo) {
    $loop->stop();

    // Close the channels BEFORE the runtimes. Runtime::close() lets the
    // scheduled task finish before shutting down, and the worker tasks sit
    // in recv() on $channelTwo; closing the channels is what makes that call
    // fail so the tasks can end. Closing the runtimes first can leave this
    // handler waiting on workers that never finish.
    $channelTwo->close();
    $channel->close();

    foreach ($parallels as $parallel) {
        $parallel->close();
    }
});

// Run until the SIGINT handler stops the loop
$loop->run();