diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs index c3165a2ad97..42fd5fbc380 100644 --- a/src/Libplanet.Net/Transports/NetMQChannel.cs +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -19,6 +19,7 @@ public class NetMQChannel private readonly ILogger _logger; private DateTimeOffset _lastUpdated; + private bool _opened; public NetMQChannel(BoundPeer peer) { @@ -32,35 +33,38 @@ public NetMQChannel(BoundPeer peer) public event EventHandler? Closed; -#pragma warning disable SA1005, SA1515, S125 - //public event EventHandler? Faulted; -#pragma warning restore SA1005, SA1515, S125 + public event EventHandler? Faulted; public event EventHandler? Opened; public void Abort() { + if (!_opened) + { + throw new InvalidOperationException("Cannot abort an unopened channel."); + } + + _opened = false; _cancellationTokenSource.Cancel(); } public void Close() { + if (!_opened) + { + throw new InvalidOperationException("Cannot close an unopened channel."); + } + + _opened = false; _cancellationTokenSource.Cancel(); Closed?.Invoke(this, EventArgs.Empty); } public void Open() { - TaskCreationOptions taskCreationOptions = - TaskCreationOptions.DenyChildAttach | - TaskCreationOptions.LongRunning | - TaskCreationOptions.HideScheduler; - Task.Factory.StartNew( - () => ProcessRuntime(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, - taskCreationOptions, - TaskScheduler.Default); + _opened = true; Opened?.Invoke(this, EventArgs.Empty); + DoOpen(); } public async IAsyncEnumerable SendMessageAsync( @@ -69,6 +73,12 @@ public async IAsyncEnumerable SendMessageAsync( int expectedResponses, [EnumeratorCancellation] CancellationToken cancellationToken) { + if (!_opened) + { + throw new InvalidOperationException( + "Cannot send message with an unopened channel."); + } + var channel = Channel.CreateUnbounded(); await _requests.Writer.WriteAsync( new MessageRequest( @@ -86,57 +96,84 @@ await _requests.Writer.WriteAsync( } } - private async Task ProcessRuntime(CancellationToken ct) + private void DoOpen() { + TaskCreationOptions taskCreationOptions = + TaskCreationOptions.DenyChildAttach | + TaskCreationOptions.LongRunning | + TaskCreationOptions.HideScheduler; + Task.Factory.StartNew( + ProcessRuntime, + _cancellationTokenSource.Token, + taskCreationOptions, + TaskScheduler.Default); + } + + private async Task ProcessRuntime() + { + var ct = _cancellationTokenSource.Token; using var dealer = new DealerSocket(); dealer.Options.DisableTimeWait = true; var address = await _peer.ResolveNetMQAddressAsync(); - _logger.Debug("[NetMQChannel] Connecting {Address}", address); - dealer.Connect(address); + try + { + dealer.Connect(address); + } + catch (Exception e) + { + Faulted?.Invoke(this, e); + Close(); + } + while (!ct.IsCancellationRequested) { MessageRequest req = await _requests.Reader.ReadAsync(ct); - _lastUpdated = DateTimeOffset.UtcNow; - CancellationTokenSource linked = - CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); - _logger.Debug( - "[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})", - req.Message, - req.ExpectedResponses); - if (!dealer.TrySendMultipartMessage(req.Message)) - { - _logger.Debug( - "[NetMQChannel] Failed to send {Message} to {Peer}", - req.Message, - _peer); - continue; - } - - _logger.Debug("[NetMQChannel] Message {Message} successfully sent.", req.Message); - - foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + try { - _logger.Debug( - "[NetMQChannel] Waiting for replies... (#{Index})", i); - var raw = new NetMQMessage(); - if (!dealer.TryReceiveMultipartMessage( - req.Timeout ?? TimeSpan.FromSeconds(1), - ref raw)) + _lastUpdated = DateTimeOffset.UtcNow; + CancellationTokenSource linked = + CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); + if (!dealer.TrySendMultipartMessage(req.Message)) { + _requests.Writer.Complete(); + dealer.Close(); + DoOpen(); break; } - _logger.Debug( - "[NetMQChannel] Successfully received replies #{Index}", i); - _lastUpdated = DateTimeOffset.UtcNow; + foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + { + var raw = new NetMQMessage(); + if (!dealer.TryReceiveMultipartMessage( + req.Timeout ?? TimeSpan.FromSeconds(1), + ref raw)) + { + break; + } - await req.Channel.Writer.WriteAsync(raw, linked.Token); - } + _lastUpdated = DateTimeOffset.UtcNow; + + await req.Channel.Writer.WriteAsync(raw, linked.Token); + } - req.Channel.Writer.Complete(); + req.Channel.Writer.Complete(); + } + catch (Exception) + { + req.Channel.Writer.Complete(); + dealer.Close(); + DoOpen(); + break; + } } } + private bool HandShake(DealerSocket dealerSocket) + { + var msg = default(Msg); + return dealerSocket.TrySend(ref msg, TimeSpan.Zero, false); + } + private readonly struct MessageRequest { public MessageRequest( diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 3f6159f3172..28a0e5ec2cf 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -328,6 +328,8 @@ CancellationToken cancellationToken AsPeer, DateTimeOffset.UtcNow ); + var stopwatch = new Stopwatch(); + stopwatch.Start(); NetMQChannel channel; if (_channels.TryGetValue(peer, out var c)) @@ -341,11 +343,12 @@ CancellationToken cancellationToken _channels[peer] = channel; } - await foreach (var raw in channel.SendMessageAsync( - rawMessage, - timeout, - expectedResponses, - linkedCt)) + await foreach ( + var raw in channel.SendMessageAsync( + rawMessage, + timeout, + expectedResponses, + linkedCt)) { Message reply = _messageCodec.Decode(raw, true); @@ -400,6 +403,18 @@ CancellationToken cancellationToken reqId, peer, replies.Select(reply => reply.Content.Type)); + _logger + .ForContext("Tag", "Metric") + .ForContext("Subtag", "OutboundMessageReport") + .Information( + "Request {RequestId} {Message} " + + "processed in {DurationMs} ms with {ReceivedCount} replies received " + + "out of {ExpectedCount} expected replies", + reqId, + content.Type, + stopwatch.ElapsedMilliseconds, + replies.Count, + expectedResponses); a?.SetStatus(ActivityStatusCode.Ok); return replies; } diff --git a/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs new file mode 100644 index 00000000000..0065a0e3759 --- /dev/null +++ b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs @@ -0,0 +1,121 @@ +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Libplanet.Crypto; +using Libplanet.Net.Messages; +using Libplanet.Net.Options; +using Libplanet.Net.Transports; +using Xunit; + +namespace Libplanet.Net.Tests.Transports +{ + // Test uses NetMQTransport as BoundPeer, which can be replaced with simple RouterSocket. + public class NetMQChannelTest + { + private const int Timeout = 60 * 1000; + + [Fact] + public void Abort() + { + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + Assert.Throws(() => channel.Abort()); + channel.Open(); + channel.Abort(); + Assert.True(true); + } + + [Fact] + public void Close() + { + var closed = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Closed += (_, _) => closed = true; + Assert.Throws(() => channel.Close()); + channel.Open(); + channel.Close(); + Assert.True(closed); + } + + [Fact] + public void Open() + { + var opened = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Opened += (_, _) => opened = true; + channel.Open(); + Assert.True(opened); + } + + [Fact] + public async Task Faulted() + { + var faulted = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Faulted += (_, _) => faulted = true; + channel.Open(); + await Task.Delay(100); + Assert.True(faulted); + } + + [Fact(Timeout = Timeout)] + public async Task SendMessageAsync() + { + var received = false; + var receiverKey = new PrivateKey(); + var transport = await NetMQTransport.Create( + receiverKey, + new AppProtocolVersionOptions(), + new HostOptions( + IPAddress.Loopback.ToString(), + new IceServer[] { })); + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + transport.ProcessMessageHandler.Register( + async msg => + { + received = msg.Content is PingMsg; + await transport.ReplyMessageAsync( + new PongMsg(), + msg.Identity!, + CancellationToken.None); + }); + var channel = new NetMQChannel(transport.AsPeer); + channel.Open(); + var key = new PrivateKey(); + await foreach ( + var reply in channel.SendMessageAsync( + new NetMQMessageCodec().Encode( + new PingMsg(), + key, + default, + new BoundPeer( + key.PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0)), + DateTimeOffset.UtcNow), + null, + 1, + CancellationToken.None)) + { + if (new NetMQMessageCodec().Decode(reply, true).Content is PongMsg) + { + received = true; + } + } + + Assert.True(received); + } + } +}