diff --git a/src/Lib/Gateway.php b/src/Lib/Gateway.php index 45ecd55..a027bc4 100644 --- a/src/Lib/Gateway.php +++ b/src/Lib/Gateway.php @@ -52,14 +52,20 @@ class Gateway * 与Gateway是否是长链接 * @var bool */ - public static $persistentConnection = false; + public static $persistentConnection = true; /** * 是否清除注册地址缓存 * @var bool */ public static $addressesCacheDisable = false; - + + /** + * 与gateway建立的连接 + * @var array + */ + protected static $gatewayConnections = []; + /** * 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息 * @@ -758,8 +764,8 @@ protected static function getBufferFromGateway($gateway_buffer_array) $client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array(); // 批量向所有gateway进程发送请求数据 foreach ($gateway_buffer_array as $address => $gateway_buffer) { - $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout); - if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) { + $client = static::getGatewayConnection("tcp://$address"); + if (strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) { $socket_id = (int)$client; $client_array[$socket_id] = $client; $client_address_map[$socket_id] = explode(':', $address); @@ -792,6 +798,7 @@ protected static function getBufferFromGateway($gateway_buffer_array) } } if (microtime(true) - $time_start > $timeout) { + static::$gatewayConnections = []; break; } } @@ -1154,10 +1161,8 @@ protected static function sendAndRecv($address, $data) { $buffer = GatewayProtocol::encode($data); $buffer = static::$secretKey ? static::generateAuthBuffer() . $buffer : $buffer; - $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout); - if (!$client) { - throw new Exception("can not connect to tcp://$address $errmsg"); - } + $address = "tcp://$address"; + $client = static::getGatewayConnection($address); if (strlen($buffer) === stream_socket_sendto($client, $buffer)) { $timeout = 5; // 阻塞读 @@ -1173,8 +1178,10 @@ protected static function sendAndRecv($address, $data) $all_buffer .= $buf; } else { if (feof($client)) { - throw new Exception("connection close tcp://$address"); + unset(static::$gatewayConnections[$address]); + throw new Exception("connection close $address"); } elseif (microtime(true) - $time_start > $timeout) { + unset(static::$gatewayConnections[$address]); break; } continue; @@ -1183,8 +1190,12 @@ protected static function sendAndRecv($address, $data) if (!$pack_len && $recv_len >= 4) { $pack_len= current(unpack('N', $all_buffer)); } + if (microtime(true) - $time_start > $timeout) { + unset(static::$gatewayConnections[$address]); + break; + } // 回复的数据都是以\n结尾 - if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) { + if (($pack_len && $recv_len >= $pack_len + 4)) { break; } } @@ -1224,8 +1235,7 @@ protected static function sendBufferToGateway($address, $gateway_buffer) } // 非workerman环境 $gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer; - $flag = static::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT; - $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout, $flag); + $client = static::getGatewayConnection("tcp://$address"); return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer); } @@ -1373,6 +1383,44 @@ protected static function isValidGroupId($group) } return true; } + + /** + * 获取与gateway的连接,用于数据返回 + * + * @param $address + * @return mixed + * @throws Exception + */ + protected static function getGatewayConnection($address) + { + $ttl = 50; + $time = time(); + if (isset(static::$gatewayConnections[$address])) { + $created_time = static::$gatewayConnections[$address]['created_time']; + $connection = static::$gatewayConnections[$address]['connection']; + if ($time - $created_time > $ttl || !is_resource($connection) || feof($connection)) { + \set_error_handler(function () {}); + fclose($connection); + \restore_error_handler(); + unset(static::$gatewayConnections[$address]); + } + } + if (!isset(static::$gatewayConnections[$address])) { + $client = stream_socket_client($address, $errno, $errmsg, static::$connectTimeout); + if (!$client) { + throw new Exception("can not connect to $address $errmsg"); + } + static::$gatewayConnections[$address] = [ + 'created_time' => $time, + 'connection' => $client + ]; + } + $client = static::$gatewayConnections[$address]['connection']; + if (!static::$persistentConnection) { + static::$gatewayConnections = []; + } + return $client; + } } if (!class_exists('\Protocols\GatewayProtocol')) {