Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/osstotalsoft/nbb
Browse files Browse the repository at this point in the history
  • Loading branch information
fraliv13 committed Apr 25, 2019
2 parents 8d1087e + ab62b23 commit 8e9ff6c
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 80 deletions.
146 changes: 146 additions & 0 deletions src/Messaging/NBB.Messaging.Nats/Internal/StanConnectionManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NATS.Client;
using STAN.Client;

namespace NBB.Messaging.Nats.Internal
{
public class StanConnectionProvider : IDisposable
{
private readonly IConfiguration _configuration;
private readonly ILogger<StanConnectionProvider> _logger;
private readonly IApplicationLifetime _applicationLifetime;
private IStanConnection _connection;
private Exception _unrecoverableException;
private readonly Lazy<IStanConnection> _lazyConnection;

public StanConnectionProvider(IConfiguration configuration, ILogger<StanConnectionProvider> logger,
IApplicationLifetime applicationLifetime)
{
_configuration = configuration;
_logger = logger;
_applicationLifetime = applicationLifetime;
_lazyConnection = new Lazy<IStanConnection>(GetConnection);
}

public async Task ExecuteAsync(Func<IStanConnection, Task> action)
{
var connection = GetAndCheckConnection();
try
{
await action(connection);
}
catch (Exception ex)
when (IsUnrecoverableException(ex))
{
SetUnrecoverableState(ex);
throw;
}
}

public void Execute(Action<IStanConnection> action)
{
var connection = GetAndCheckConnection();
try
{
action(connection);
}
catch (Exception ex)
when (IsUnrecoverableException(ex))
{
SetUnrecoverableState(ex);
throw;
}
}

private IStanConnection GetAndCheckConnection()
{
ThrowIfUnrecoverableState();
try
{
// Exception from the lazy factory method is cached
return _lazyConnection.Value;
}
catch (Exception ex)
{
SetUnrecoverableState(ex);
throw;
}
}

private IStanConnection GetConnection()
{
var natsUrl = _configuration.GetSection("Messaging").GetSection("Nats")["natsUrl"];
var cluster = _configuration.GetSection("Messaging").GetSection("Nats")["cluster"];
var clientId = _configuration.GetSection("Messaging").GetSection("Nats")["clientId"]
?.Replace(".", "_");
var options = StanOptions.GetDefaultOptions();
options.NatsURL = natsUrl;

options.ConnectionLostEventHandler = (obj, args) =>
{
SetUnrecoverableState(args.ConnectionException ?? new Exception("NATS connection was lost"));
};

//fix https://github.com/nats-io/csharp-nats-streaming/issues/28
options.PubAckWait = 30000;

var cf = new StanConnectionFactory();
_connection = cf.CreateConnection(cluster, clientId + Guid.NewGuid(), options);

return _connection;
}

private void SetUnrecoverableState(Exception exception)
{
// Set the field to the current exception if not already set
var existingException =
Interlocked.CompareExchange(ref _unrecoverableException, exception, null);

// Send the application stop signal only once
if (existingException != null)
return;

_logger.LogCritical(exception, "NATS connection unrecoverable");
_applicationLifetime.StopApplication();

Dispose();
}

private void ThrowIfUnrecoverableState()
{
// For consistency, read the field using the same primitive used for writing instead of using Thread.VolatileRead
var exception = Interlocked.CompareExchange(ref _unrecoverableException, null, null);
if (exception != null)
{
throw new Exception("NATS connection encountered an unrecoverable exception", exception);
}
}

private static bool IsUnrecoverableException(Exception ex)
{
return
ex is NATSConnectionClosedException ||
ex is StanConnectionClosedException ||
ex is NATSConnectionException ||
ex is StanConnectionException ||
ex is NATSBadSubscriptionException ||
ex is StanBadSubscriptionException ||
ex is StanTimeoutException ||
ex is NATSTimeoutException ||
ex is NATSStaleConnectionException ||
ex is NATSNoServersException ||
ex is StanConnectRequestException ||
ex is StanMaxPingsException;
}

public void Dispose()
{
_connection?.Dispose();
}
}
}

This file was deleted.

4 changes: 2 additions & 2 deletions src/Messaging/NBB.Messaging.Nats/NBB.Messaging.Nats.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="NATS.Client" Version="0.8.1" />
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonPackageVersion)" />
<PackageReference Include="STAN.Client" Version="0.1.4" />
<PackageReference Include="STAN.Client" Version="0.1.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\NBB.Messaging.Abstractions\NBB.Messaging.Abstractions.csproj" />
</ItemGroup>

</Project>
23 changes: 12 additions & 11 deletions src/Messaging/NBB.Messaging.Nats/NatsMessagingTopicPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
using System;
using System.Diagnostics;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NBB.Messaging.Abstractions;
using NBB.Messaging.DataContracts;
using NBB.Messaging.Nats.Internal;

namespace NBB.Messaging.Nats
{
public class NatsMessagingTopicPublisher : IMessagingTopicPublisher
{
private readonly StanConnectionProvider _stanConnectionProvider;
private readonly StanConnectionProvider _stanConnectionManager;
private readonly ILogger<NatsMessagingTopicPublisher> _logger;

public NatsMessagingTopicPublisher(StanConnectionProvider stanConnectionProvider, ILogger<NatsMessagingTopicPublisher> logger)
public NatsMessagingTopicPublisher(StanConnectionProvider stanConnectionManager,
ILogger<NatsMessagingTopicPublisher> logger)
{
_stanConnectionProvider = stanConnectionProvider;
_stanConnectionManager = stanConnectionManager;
_logger = logger;
}

public async Task PublishAsync(string topic, string key, string message, CancellationToken cancellationToken = default(CancellationToken))
{
public async Task PublishAsync(string topic, string key, string message,
CancellationToken cancellationToken = default(CancellationToken))
{
var stopWatch = new Stopwatch();
stopWatch.Start();
var connection = _stanConnectionProvider.GetConnection();
var result = await connection.PublishAsync(topic, System.Text.Encoding.UTF8.GetBytes(message));
await _stanConnectionManager.ExecuteAsync(async connection =>
await connection.PublishAsync(topic, System.Text.Encoding.UTF8.GetBytes(message)));
stopWatch.Stop();

_logger.LogDebug("Nats message published to subject {Subject} in {ElapsedMilliseconds} ms", topic, stopWatch.ElapsedMilliseconds);
_logger.LogDebug("Nats message published to subject {Subject} in {ElapsedMilliseconds} ms", topic,
stopWatch.ElapsedMilliseconds);
}
}
}
33 changes: 15 additions & 18 deletions src/Messaging/NBB.Messaging.Nats/NatsMessagingTopicSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@

namespace NBB.Messaging.Nats
{
public class NatsMessagingTopicSubscriber : IMessagingTopicSubscriber, IDisposable
public class NatsMessagingTopicSubscriber : IMessagingTopicSubscriber
{
private readonly StanConnectionProvider _stanConnectionProvider;
private readonly StanConnectionProvider _stanConnectionManager;
private readonly IConfiguration _configuration;
private readonly ILogger<NatsMessagingTopicSubscriber> _logger;

private IStanConnection _stanConnection;
private bool _subscribedToTopic;

public NatsMessagingTopicSubscriber(StanConnectionProvider stanConnectionProvider, IConfiguration configuration,
public NatsMessagingTopicSubscriber(StanConnectionProvider stanConnectionManager, IConfiguration configuration,
ILogger<NatsMessagingTopicSubscriber> logger)
{
_stanConnectionProvider = stanConnectionProvider;
_stanConnectionManager = stanConnectionManager;
_configuration = configuration;
_logger = logger;
}
Expand Down Expand Up @@ -52,10 +51,9 @@ private Task SubscribeToTopicAsync(string subject, Func<string, Task> handler, C
{
var opts = StanSubscriptionOptions.GetDefaultOptions();
opts.DurableName = _configuration.GetSection("Messaging").GetSection("Nats")["durableName"];
_stanConnection = _stanConnectionProvider.GetConnection();
var qGroup = _configuration.GetSection("Messaging").GetSection("Nats")["qGroup"];
var _subscriberOptions = options ?? new MessagingSubscriberOptions();
opts.ManualAcks = _subscriberOptions.AcknowledgeStrategy != MessagingAcknowledgeStrategy.Auto;
var subscriberOptions = options ?? new MessagingSubscriberOptions();
opts.ManualAcks = subscriberOptions.AcknowledgeStrategy != MessagingAcknowledgeStrategy.Auto;

//https://github.com/nats-io/go-nats-streaming#subscriber-rate-limiting
opts.MaxInflight = 1;
Expand All @@ -70,7 +68,7 @@ void StanMsgHandler(object obj, StanMsgHandlerArgs args)

try
{
if (_subscriberOptions.HandlerStrategy == MessagingHandlerStrategy.Serial)
if (subscriberOptions.HandlerStrategy == MessagingHandlerStrategy.Serial)
{
handler(json).Wait(token);
}
Expand All @@ -83,22 +81,21 @@ void StanMsgHandler(object obj, StanMsgHandlerArgs args)
//TODO: push to DLQ
}

if (_subscriberOptions.AcknowledgeStrategy == MessagingAcknowledgeStrategy.Serial)
if (subscriberOptions.AcknowledgeStrategy == MessagingAcknowledgeStrategy.Serial)
{
args.Message.Ack();
}
}

var s = _subscriberOptions.ConsumerType == MessagingConsumerType.CollaborativeConsumer
? _stanConnection.Subscribe(subject, opts, StanMsgHandler)
: _stanConnection.Subscribe(subject, qGroup, opts, StanMsgHandler);
_stanConnectionManager.Execute(stanConnection =>
{
var _ = subscriberOptions.ConsumerType == MessagingConsumerType.CollaborativeConsumer
? stanConnection.Subscribe(subject, opts, StanMsgHandler)
: stanConnection.Subscribe(subject, qGroup, opts, StanMsgHandler);
});


return Task.CompletedTask;
}

public void Dispose()
{
_stanConnection.Dispose();
}
}
}

0 comments on commit 8e9ff6c

Please sign in to comment.