Skip to content

Commit

Permalink
add test and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Dec 24, 2024
1 parent eec8d82 commit 9f747b2
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 51 deletions.
129 changes: 83 additions & 46 deletions src/Libplanet.Net/Transports/NetMQChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class NetMQChannel
private readonly ILogger _logger;

private DateTimeOffset _lastUpdated;
private bool _opened;

public NetMQChannel(BoundPeer peer)
{
Expand All @@ -32,35 +33,38 @@ public NetMQChannel(BoundPeer peer)

public event EventHandler? Closed;

#pragma warning disable SA1005, SA1515, S125
//public event EventHandler? Faulted;
#pragma warning restore SA1005, SA1515, S125
public event EventHandler<Exception>? Faulted;

public event EventHandler? Opened;

public void Abort()
{
if (!_opened)
{
throw new InvalidOperationException("Cannot abort an unopened channel.");
}

_opened = false;
_cancellationTokenSource.Cancel();
}

public void Close()
{
if (!_opened)
{
throw new InvalidOperationException("Cannot close an unopened channel.");
}

_opened = false;
_cancellationTokenSource.Cancel();
Closed?.Invoke(this, EventArgs.Empty);
}

public void Open()
{
TaskCreationOptions taskCreationOptions =
TaskCreationOptions.DenyChildAttach |
TaskCreationOptions.LongRunning |
TaskCreationOptions.HideScheduler;
Task.Factory.StartNew(
() => ProcessRuntime(_cancellationTokenSource.Token),
_cancellationTokenSource.Token,
taskCreationOptions,
TaskScheduler.Default);
_opened = true;
Opened?.Invoke(this, EventArgs.Empty);
DoOpen();
}

public async IAsyncEnumerable<NetMQMessage> SendMessageAsync(
Expand All @@ -69,6 +73,12 @@ public async IAsyncEnumerable<NetMQMessage> SendMessageAsync(
int expectedResponses,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!_opened)
{
throw new InvalidOperationException(
"Cannot send message with an unopened channel.");
}

var channel = Channel.CreateUnbounded<NetMQMessage>();
await _requests.Writer.WriteAsync(
new MessageRequest(
Expand All @@ -86,57 +96,84 @@ await _requests.Writer.WriteAsync(
}
}

private async Task ProcessRuntime(CancellationToken ct)
private void DoOpen()
{
TaskCreationOptions taskCreationOptions =
TaskCreationOptions.DenyChildAttach |
TaskCreationOptions.LongRunning |
TaskCreationOptions.HideScheduler;
Task.Factory.StartNew(
ProcessRuntime,
_cancellationTokenSource.Token,
taskCreationOptions,
TaskScheduler.Default);
}

private async Task ProcessRuntime()
{
var ct = _cancellationTokenSource.Token;
using var dealer = new DealerSocket();
dealer.Options.DisableTimeWait = true;
var address = await _peer.ResolveNetMQAddressAsync();
_logger.Debug("[NetMQChannel] Connecting {Address}", address);
dealer.Connect(address);
try
{
dealer.Connect(address);
}
catch (Exception e)
{
Faulted?.Invoke(this, e);
Close();
}

while (!ct.IsCancellationRequested)
{
MessageRequest req = await _requests.Reader.ReadAsync(ct);
_lastUpdated = DateTimeOffset.UtcNow;
CancellationTokenSource linked =
CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken);
_logger.Debug(
"[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})",
req.Message,
req.ExpectedResponses);
if (!dealer.TrySendMultipartMessage(req.Message))
{
_logger.Debug(
"[NetMQChannel] Failed to send {Message} to {Peer}",
req.Message,
_peer);
continue;
}

_logger.Debug("[NetMQChannel] Message {Message} successfully sent.", req.Message);

foreach (var i in Enumerable.Range(0, req.ExpectedResponses))
try
{
_logger.Debug(
"[NetMQChannel] Waiting for replies... (#{Index})", i);
var raw = new NetMQMessage();
if (!dealer.TryReceiveMultipartMessage(
req.Timeout ?? TimeSpan.FromSeconds(1),
ref raw))
_lastUpdated = DateTimeOffset.UtcNow;
CancellationTokenSource linked =
CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken);
if (!dealer.TrySendMultipartMessage(req.Message))
{
_requests.Writer.Complete();
dealer.Close();
DoOpen();
break;
}

_logger.Debug(
"[NetMQChannel] Successfully received replies #{Index}", i);
_lastUpdated = DateTimeOffset.UtcNow;
foreach (var i in Enumerable.Range(0, req.ExpectedResponses))
{
var raw = new NetMQMessage();
if (!dealer.TryReceiveMultipartMessage(
req.Timeout ?? TimeSpan.FromSeconds(1),
ref raw))
{
break;
}

await req.Channel.Writer.WriteAsync(raw, linked.Token);
}
_lastUpdated = DateTimeOffset.UtcNow;

await req.Channel.Writer.WriteAsync(raw, linked.Token);
}

req.Channel.Writer.Complete();
req.Channel.Writer.Complete();
}
catch (Exception)
{
req.Channel.Writer.Complete();
dealer.Close();
DoOpen();
break;
}
}
}

private bool HandShake(DealerSocket dealerSocket)
{
var msg = default(Msg);
return dealerSocket.TrySend(ref msg, TimeSpan.Zero, false);
}

private readonly struct MessageRequest
{
public MessageRequest(
Expand Down
25 changes: 20 additions & 5 deletions src/Libplanet.Net/Transports/NetMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ CancellationToken cancellationToken
AsPeer,
DateTimeOffset.UtcNow
);
var stopwatch = new Stopwatch();
stopwatch.Start();

NetMQChannel channel;
if (_channels.TryGetValue(peer, out var c))
Expand All @@ -341,11 +343,12 @@ CancellationToken cancellationToken
_channels[peer] = channel;
}

await foreach (var raw in channel.SendMessageAsync(
rawMessage,
timeout,
expectedResponses,
linkedCt))
await foreach (
var raw in channel.SendMessageAsync(
rawMessage,
timeout,
expectedResponses,
linkedCt))
{
Message reply = _messageCodec.Decode(raw, true);

Expand Down Expand Up @@ -400,6 +403,18 @@ CancellationToken cancellationToken
reqId,
peer,
replies.Select(reply => reply.Content.Type));
_logger
.ForContext("Tag", "Metric")
.ForContext("Subtag", "OutboundMessageReport")
.Information(
"Request {RequestId} {Message} " +
"processed in {DurationMs} ms with {ReceivedCount} replies received " +
"out of {ExpectedCount} expected replies",
reqId,
content.Type,
stopwatch.ElapsedMilliseconds,
replies.Count,
expectedResponses);
a?.SetStatus(ActivityStatusCode.Ok);
return replies;
}
Expand Down
121 changes: 121 additions & 0 deletions test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Crypto;
using Libplanet.Net.Messages;
using Libplanet.Net.Options;
using Libplanet.Net.Transports;
using Xunit;

namespace Libplanet.Net.Tests.Transports
{
// Test uses NetMQTransport as BoundPeer, which can be replaced with simple RouterSocket.
public class NetMQChannelTest
{
private const int Timeout = 60 * 1000;

[Fact]
public void Abort()
{
var channel = new NetMQChannel(
new BoundPeer(
new PrivateKey().PublicKey,
new DnsEndPoint(IPAddress.Loopback.ToString(), 0)));
Assert.Throws<InvalidOperationException>(() => channel.Abort());
channel.Open();
channel.Abort();
Assert.True(true);
}

[Fact]
public void Close()
{
var closed = false;
var channel = new NetMQChannel(
new BoundPeer(
new PrivateKey().PublicKey,
new DnsEndPoint(IPAddress.Loopback.ToString(), 0)));
channel.Closed += (_, _) => closed = true;
Assert.Throws<InvalidOperationException>(() => channel.Close());
channel.Open();
channel.Close();
Assert.True(closed);
}

[Fact]
public void Open()
{
var opened = false;
var channel = new NetMQChannel(
new BoundPeer(
new PrivateKey().PublicKey,
new DnsEndPoint(IPAddress.Loopback.ToString(), 0)));
channel.Opened += (_, _) => opened = true;
channel.Open();
Assert.True(opened);
}

[Fact]
public async Task Faulted()
{
var faulted = false;
var channel = new NetMQChannel(
new BoundPeer(
new PrivateKey().PublicKey,
new DnsEndPoint(IPAddress.Loopback.ToString(), 0)));
channel.Faulted += (_, _) => faulted = true;
channel.Open();
await Task.Delay(100);
Assert.True(faulted);
}

[Fact(Timeout = Timeout)]
public async Task SendMessageAsync()
{
var received = false;
var receiverKey = new PrivateKey();
var transport = await NetMQTransport.Create(
receiverKey,
new AppProtocolVersionOptions(),
new HostOptions(
IPAddress.Loopback.ToString(),
new IceServer[] { }));
_ = transport.StartAsync();
await transport.WaitForRunningAsync();
transport.ProcessMessageHandler.Register(
async msg =>
{
received = msg.Content is PingMsg;
await transport.ReplyMessageAsync(
new PongMsg(),
msg.Identity!,
CancellationToken.None);
});
var channel = new NetMQChannel(transport.AsPeer);
channel.Open();
var key = new PrivateKey();
await foreach (
var reply in channel.SendMessageAsync(
new NetMQMessageCodec().Encode(
new PingMsg(),
key,
default,
new BoundPeer(
key.PublicKey,
new DnsEndPoint(IPAddress.Loopback.ToString(), 0)),
DateTimeOffset.UtcNow),
null,
1,
CancellationToken.None))
{
if (new NetMQMessageCodec().Decode(reply, true).Content is PongMsg)
{
received = true;
}
}

Assert.True(received);
}
}
}

0 comments on commit 9f747b2

Please sign in to comment.