Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

temp: introduce NetMQChannel #4013

Draft
wants to merge 5 commits into
base: public-validator-test-2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions src/Libplanet.Net/Protocols/KademliaProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ await PingAsync(peer, dialTimeout, cancellationToken)
dialTimeout,
cancellationToken));
}
catch (PingTimeoutException)
catch (PingFailedException)
{
_logger.Warning("A timeout exception occurred connecting to seed peer");
RemovePeer(peer);
Expand Down Expand Up @@ -140,19 +140,25 @@ public async Task AddPeersAsync(
var tasks = new List<Task>();
foreach (BoundPeer peer in peers)
{
tasks.Add(PingAsync(
peer,
timeout: timeout,
cancellationToken: cancellationToken));
tasks.Add(
PingAsync(
peer,
timeout: timeout,
cancellationToken: cancellationToken));
}

_logger.Verbose("Trying to ping {PeerCount} peers", tasks.Count);
await Task.WhenAll(tasks).ConfigureAwait(false);
_logger.Verbose("Update complete");
}
catch (PingTimeoutException e)
catch (PingFailedException pfe)
{
_logger.Debug(e, "Ping timed out");
if (pfe.InnerException is { } e)
{
throw e;
}

throw;
}
catch (TaskCanceledException e)
{
Expand Down Expand Up @@ -284,7 +290,7 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken
await PingAsync(replacement, _requestTimeout, cancellationToken)
.ConfigureAwait(false);
}
catch (PingTimeoutException)
catch (PingFailedException)
{
_logger.Verbose(
"Removed stale peer {Peer} from replacement cache",
Expand Down Expand Up @@ -327,7 +333,7 @@ await PingAsync(replacement, _requestTimeout, cancellationToken)
await PingAsync(boundPeer, _requestTimeout, cancellationToken)
.ConfigureAwait(false);
}
catch (PingTimeoutException)
catch (PingFailedException)
{
var msg =
"{BoundPeer}, a target peer, is in the routing table does not respond";
Expand Down Expand Up @@ -394,7 +400,7 @@ await PingAsync(found, _requestTimeout, cancellationToken)
throw new TaskCanceledException(
$"Task is cancelled during {nameof(FindSpecificPeerAsync)}()");
}
catch (PingTimeoutException)
catch (PingFailedException)
{
// Ignore peer not responding
}
Expand Down Expand Up @@ -439,11 +445,9 @@ internal async Task PingAsync(

AddPeer(peer);
}
catch (CommunicationFailException)
catch (Exception e)
{
throw new PingTimeoutException(
$"Failed to send Ping to {peer}.",
peer);
throw new PingFailedException(peer, e);
}
}

Expand Down Expand Up @@ -497,7 +501,7 @@ private async Task ValidateAsync(
await PingAsync(peer, timeout, cancellationToken).ConfigureAwait(false);
_table.Check(peer, check, DateTimeOffset.UtcNow);
}
catch (PingTimeoutException)
catch (PingFailedException)
{
_logger.Verbose("Removing invalid peer {Peer}...", peer);
RemovePeer(peer);
Expand Down Expand Up @@ -711,7 +715,7 @@ private async Task ProcessFoundAsync(
AggregateException aggregateException = aggregateTask.Exception!;
foreach (Exception e in aggregateException.InnerExceptions)
{
if (e is PingTimeoutException pte)
if (e is PingFailedException pte)
{
peers.Remove(pte.Target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@
namespace Libplanet.Net.Protocols
{
[Serializable]
public class PingTimeoutException : TimeoutException
public class PingFailedException : Exception
{
public PingTimeoutException(BoundPeer target)
: base()
public PingFailedException(BoundPeer target, Exception innerException)
: base($"Failed to send ping to target peer {target}", innerException)
{
Target = target;
}

public PingTimeoutException(string message, BoundPeer target)
: base(message)
{
Target = target;
}

public PingTimeoutException(string message, BoundPeer target, Exception innerException)
public PingFailedException(string message, BoundPeer target, Exception innerException)
: base(message, innerException)
{
Target = target;
}

protected PingTimeoutException(SerializationInfo info, StreamingContext context)
protected PingFailedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
Target = info.GetValue(nameof(Target), typeof(BoundPeer)) is BoundPeer target
Expand Down
205 changes: 205 additions & 0 deletions src/Libplanet.Net/Transports/NetMQChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
using Serilog;

namespace Libplanet.Net.Transports
{
public class NetMQChannel
{
private readonly BoundPeer _peer;
private readonly Channel<MessageRequest> _requests;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ILogger _logger;

private DateTimeOffset _lastUpdated;
private bool _opened;

public NetMQChannel(BoundPeer peer)
{
_peer = peer;
_requests = Channel.CreateUnbounded<MessageRequest>();
_cancellationTokenSource = new CancellationTokenSource();
_logger = Log.Logger
.ForContext<NetMQChannel>()
.ForContext("Source", nameof(NetMQTransport));
}

public event EventHandler? Closed;

public event EventHandler<Exception>? Faulted;

public event EventHandler? Opened;

public void Abort()
{
if (!_opened)
{
throw new InvalidOperationException("Cannot abort an unopened channel.");
}

_opened = false;
_cancellationTokenSource.Cancel();
}

public void Close()
{
if (!_opened)
{
throw new InvalidOperationException("Cannot close an unopened channel.");
}

_opened = false;
_cancellationTokenSource.Cancel();
Closed?.Invoke(this, EventArgs.Empty);
}

public void Open()
{
_opened = true;
Opened?.Invoke(this, EventArgs.Empty);
DoOpen();
}

public async IAsyncEnumerable<NetMQMessage> SendMessageAsync(
NetMQMessage message,
TimeSpan? timeout,
int expectedResponses,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!_opened)
{
throw new InvalidOperationException(
"Cannot send message with an unopened channel.");
}

var channel = Channel.CreateUnbounded<NetMQMessage>();
await _requests.Writer.WriteAsync(
new MessageRequest(
message,
timeout,
expectedResponses,
channel,
cancellationToken),
cancellationToken);

foreach (var unused in Enumerable.Range(0, expectedResponses))
{
// FIXME: Can be replaced with Channel.Reader.Completion?
yield return await channel.Reader.ReadAsync(cancellationToken);
}
}

private void DoOpen()
{
TaskCreationOptions taskCreationOptions =
TaskCreationOptions.DenyChildAttach |
TaskCreationOptions.LongRunning |
TaskCreationOptions.HideScheduler;
Task.Factory.StartNew(
ProcessRuntime,
_cancellationTokenSource.Token,
taskCreationOptions,
TaskScheduler.Default);
}

private async Task ProcessRuntime()
{
var ct = _cancellationTokenSource.Token;
using var dealer = new DealerSocket();
dealer.Options.DisableTimeWait = true;
dealer.Options.Identity = Guid.NewGuid().ToByteArray();
var address = await _peer.ResolveNetMQAddressAsync();
try
{
dealer.Connect(address);
}
catch (Exception e)
{
Faulted?.Invoke(this, e);
Close();
}

while (!ct.IsCancellationRequested)
{
MessageRequest req = await _requests.Reader.ReadAsync(ct);
try
{
_lastUpdated = DateTimeOffset.UtcNow;
CancellationTokenSource linked =
CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken);
if (!dealer.TrySendMultipartMessage(req.Message))
{
_requests.Writer.Complete();
dealer.Close();
DoOpen();
break;
}

foreach (var i in Enumerable.Range(0, req.ExpectedResponses))
{
var raw = new NetMQMessage();
if (!dealer.TryReceiveMultipartMessage(
req.Timeout ?? TimeSpan.FromSeconds(1),
ref raw))
{
break;
}

_lastUpdated = DateTimeOffset.UtcNow;

await req.Channel.Writer.WriteAsync(raw, linked.Token);
}

req.Channel.Writer.Complete();
}
catch (Exception)
{
req.Channel.Writer.Complete();
dealer.Close();
DoOpen();
break;
}
}
}

private bool HandShake(DealerSocket dealerSocket)
{
var msg = default(Msg);
return dealerSocket.TrySend(ref msg, TimeSpan.Zero, false);
}

private readonly struct MessageRequest
{
public MessageRequest(
NetMQMessage message,
TimeSpan? timeout,
in int expectedResponses,
Channel<NetMQMessage> channel,
CancellationToken cancellationToken)
{
Message = message;
Timeout = timeout;
ExpectedResponses = expectedResponses;
Channel = channel;
CancellationToken = cancellationToken;
}

public NetMQMessage Message { get; }

public TimeSpan? Timeout { get; }

public int ExpectedResponses { get; }

public Channel<NetMQMessage> Channel { get; }

public CancellationToken CancellationToken { get; }
}
}
}
Loading
Loading