Skip to content

Commit

Permalink
Use persistent connection #100
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor authored Dec 31, 2022
1 parent aefdc21 commit 2b4e5ee
Showing 1 changed file with 60 additions and 12 deletions.
72 changes: 60 additions & 12 deletions src/Lib/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,20 @@ class Gateway
* 与Gateway是否是长链接
* @var bool
*/
public static $persistentConnection = false;
public static $persistentConnection = true;

/**
* 是否清除注册地址缓存
* @var bool
*/
public static $addressesCacheDisable = false;


/**
* 与gateway建立的连接
* @var array
*/
protected static $gatewayConnections = [];

/**
* 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息
*
Expand Down Expand Up @@ -758,8 +764,8 @@ protected static function getBufferFromGateway($gateway_buffer_array)
$client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array();
// 批量向所有gateway进程发送请求数据
foreach ($gateway_buffer_array as $address => $gateway_buffer) {
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
$client = static::getGatewayConnection("tcp://$address");
if (strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
$socket_id = (int)$client;
$client_array[$socket_id] = $client;
$client_address_map[$socket_id] = explode(':', $address);
Expand Down Expand Up @@ -792,6 +798,7 @@ protected static function getBufferFromGateway($gateway_buffer_array)
}
}
if (microtime(true) - $time_start > $timeout) {
static::$gatewayConnections = [];
break;
}
}
Expand Down Expand Up @@ -1154,10 +1161,8 @@ protected static function sendAndRecv($address, $data)
{
$buffer = GatewayProtocol::encode($data);
$buffer = static::$secretKey ? static::generateAuthBuffer() . $buffer : $buffer;
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
if (!$client) {
throw new Exception("can not connect to tcp://$address $errmsg");
}
$address = "tcp://$address";
$client = static::getGatewayConnection($address);
if (strlen($buffer) === stream_socket_sendto($client, $buffer)) {
$timeout = 5;
// 阻塞读
Expand All @@ -1173,8 +1178,10 @@ protected static function sendAndRecv($address, $data)
$all_buffer .= $buf;
} else {
if (feof($client)) {
throw new Exception("connection close tcp://$address");
unset(static::$gatewayConnections[$address]);
throw new Exception("connection close $address");
} elseif (microtime(true) - $time_start > $timeout) {
unset(static::$gatewayConnections[$address]);
break;
}
continue;
Expand All @@ -1183,8 +1190,12 @@ protected static function sendAndRecv($address, $data)
if (!$pack_len && $recv_len >= 4) {
$pack_len= current(unpack('N', $all_buffer));
}
if (microtime(true) - $time_start > $timeout) {
unset(static::$gatewayConnections[$address]);
break;
}
// 回复的数据都是以\n结尾
if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) {
if (($pack_len && $recv_len >= $pack_len + 4)) {
break;
}
}
Expand Down Expand Up @@ -1224,8 +1235,7 @@ protected static function sendBufferToGateway($address, $gateway_buffer)
}
// 非workerman环境
$gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
$flag = static::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT;
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout, $flag);
$client = static::getGatewayConnection("tcp://$address");
return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
}

Expand Down Expand Up @@ -1373,6 +1383,44 @@ protected static function isValidGroupId($group)
}
return true;
}

/**
* 获取与gateway的连接,用于数据返回
*
* @param $address
* @return mixed
* @throws Exception
*/
protected static function getGatewayConnection($address)
{
$ttl = 50;
$time = time();
if (isset(static::$gatewayConnections[$address])) {
$created_time = static::$gatewayConnections[$address]['created_time'];
$connection = static::$gatewayConnections[$address]['connection'];
if ($time - $created_time > $ttl || !is_resource($connection) || feof($connection)) {
\set_error_handler(function () {});
fclose($connection);
\restore_error_handler();
unset(static::$gatewayConnections[$address]);
}
}
if (!isset(static::$gatewayConnections[$address])) {
$client = stream_socket_client($address, $errno, $errmsg, static::$connectTimeout);
if (!$client) {
throw new Exception("can not connect to $address $errmsg");
}
static::$gatewayConnections[$address] = [
'created_time' => $time,
'connection' => $client
];
}
$client = static::$gatewayConnections[$address]['connection'];
if (!static::$persistentConnection) {
static::$gatewayConnections = [];
}
return $client;
}
}

if (!class_exists('\Protocols\GatewayProtocol')) {
Expand Down

0 comments on commit 2b4e5ee

Please sign in to comment.