Skip to content

Commit

Permalink
Merge pull request #113 from Abc-Arbitrage/improve-storage-report
Browse files Browse the repository at this point in the history
Improve storage report
  • Loading branch information
MendelMonteiro authored Sep 27, 2022
2 parents 09fe755 + 2c543fd commit 29044ad
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 83 deletions.
33 changes: 19 additions & 14 deletions src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
using Abc.Zebus.Persistence.Matching;
using Abc.Zebus.Persistence.Messages;
using Abc.Zebus.Persistence.Reporter;
using Abc.Zebus.Persistence.Storage;
using Abc.Zebus.Testing;
using Abc.Zebus.Testing.Comparison;
using Abc.Zebus.Testing.Extensions;
using Abc.Zebus.Transport;
using Abc.Zebus.Util;
using Cassandra.Data.Linq;
using Moq;
using NUnit.Framework;
Expand Down Expand Up @@ -123,7 +122,7 @@ public async Task should_not_overwrite_messages_with_same_time_component_and_dif
{
var messageBytes = new byte[512];
new Random().NextBytes(messageBytes);
var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365")); // Time component @2016-01-01 00:00:00Z
var messageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9706-ae1ea5dcf365")); // Time component @2016-01-01 00:00:00Z
var otherMessageId = new MessageId(Guid.Parse("0000c399-1ab0-e511-9806-f1ef55aac8e9")); // Time component @2016-01-01 00:00:00Z
var peerId = "Abc.Peer.0";

Expand Down Expand Up @@ -334,15 +333,15 @@ public async Task should_persist_messages_in_order()
{
var transportMessages = Enumerable.Range(1, 100).Select(CreateTestTransportMessage).ToList();
var messages = transportMessages.SelectMany(x =>
{
var transportMessageBytes = TransportMessage.Serialize(x);
return new[]
{
MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
};
})
.ToList();
{
var transportMessageBytes = TransportMessage.Serialize(x);
return new[]
{
MatcherEntry.Message(firstPeer, x.Id, x.MessageTypeId, transportMessageBytes),
MatcherEntry.Message(secondPeer, x.Id, x.MessageTypeId, transportMessageBytes),
};
})
.ToList();

await _storage.Write(messages);

Expand All @@ -360,7 +359,7 @@ public async Task should_persist_messages_in_order()
}

[Test]
public void should_report_storage_informations()
public void should_report_storage_information()
{
var peer = new PeerId("peer");

Expand All @@ -370,7 +369,13 @@ public void should_report_storage_informations()
MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
});

_reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
var entryTypeStatistics = new Dictionary<string, MessageTypeStorageReport> { ["Abc.Message"] = new(1, 3), ["Abc.Message.Fat"] = new(1, 4) };
var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
_reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
&& x.BatchSizeInBytes == storageReport.BatchSizeInBytes
&& x.FattestMessageTypeId == storageReport.FattestMessageTypeId
&& x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
&& x.MessageTypeStorageReports.DeepCompare(storageReport.MessageTypeStorageReports))));
}

[Test]
Expand Down
29 changes: 18 additions & 11 deletions src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Abc.Zebus.Persistence.Reporter;
using Abc.Zebus.Persistence.Storage;
using Abc.Zebus.Persistence.Util;
using Abc.Zebus.Util;
using Cassandra;
using Cassandra.Data.Linq;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -68,8 +67,7 @@ public Task Write(IList<MatcherEntry> entriesToPersist)
if (entriesToPersist.Count == 0)
return Task.CompletedTask;

var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageBytes?.Length ?? 0).First();
_reporter.AddStorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageBytes?.Length ?? 0), fattestMessage.MessageBytes?.Length ?? 0, fattestMessage.MessageTypeName);
_reporter.AddStorageReport(ToStorageReport(entriesToPersist));

var countByPeer = new Dictionary<PeerId, int>();
foreach (var matcherEntry in entriesToPersist)
Expand Down Expand Up @@ -114,16 +112,17 @@ public Task Write(IList<MatcherEntry> entriesToPersist)
var insertTask = _dataContext.Session.ExecuteAsync(boundStatement);
insertTasks.Add(insertTask);
insertTask.ContinueWith(t =>
{
var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
if (shouldInvestigatePeer)
_log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");
{
var shouldInvestigatePeer = _configuration.PeerIdsToInvestigate != null && _configuration.PeerIdsToInvestigate.Contains(matcherEntry.PeerId.ToString());
if (shouldInvestigatePeer)
_log.LogInformation($"Storage done for peer {matcherEntry.PeerId}, Type: {matcherEntry.Type}, Message Id: {matcherEntry.MessageId}, TaskResult: {t.Status}");

if (t.IsFaulted)
_log.LogError(t.Exception, "Error while inserting to Cassandra");
if (t.IsFaulted)
_log.LogError(t.Exception, "Error while inserting to Cassandra");

remaining.Release();
}, TaskContinuationOptions.ExecuteSynchronously);
remaining.Release();
},
TaskContinuationOptions.ExecuteSynchronously);
}

var updateNonAckedCountTasks = new List<Task>();
Expand All @@ -135,6 +134,14 @@ public Task Write(IList<MatcherEntry> entriesToPersist)
return Task.WhenAll(insertTasks.Concat(updateNonAckedCountTasks));
}

private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
{
var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageLength).First();
var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
.ToDictionary(xs => xs.Key, xs => new MessageTypeStorageReport(xs.Count(), xs.Sum(x => x.MessageLength)));
return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageLength), fattestMessage.MessageLength, fattestMessage.MessageTypeName, entriesByTypeName);
}

public Task RemovePeer(PeerId peerId)
{
return _peerStateRepository.RemovePeer(peerId);
Expand Down
3 changes: 2 additions & 1 deletion src/Abc.Zebus.Persistence.RocksDb.Tests/PerformanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Abc.Zebus.Persistence.Matching;
using Abc.Zebus.Persistence.Reporter;
using Abc.Zebus.Persistence.Storage;
using Abc.Zebus.Testing;
using Abc.Zebus.Transport;
Expand All @@ -23,7 +24,7 @@ public class PerformanceTests
[SetUp]
public void SetUp()
{
_storage = new RocksDbStorage(Guid.NewGuid().ToString());
_storage = new RocksDbStorage(Guid.NewGuid().ToString(), new NoopReporter());
_storage.Start();
}

Expand Down
17 changes: 12 additions & 5 deletions src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Abc.Zebus.Persistence.Matching;
using Abc.Zebus.Persistence.Reporter;
using Abc.Zebus.Testing;
using Abc.Zebus.Testing.Comparison;
using Abc.Zebus.Testing.Extensions;
using Abc.Zebus.Transport;
using Moq;
Expand All @@ -27,7 +28,7 @@ public void SetUp()
_databaseDirectoryPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());

_reporterMock = new Mock<IReporter>();
_storage = new RocksDbStorage(_databaseDirectoryPath);
_storage = new RocksDbStorage(_databaseDirectoryPath, _reporterMock.Object);
_storage.Start();
}

Expand Down Expand Up @@ -210,7 +211,7 @@ public async Task should_load_previous_out_of_order_acks()
await _storage.Write(new[] { MatcherEntry.Ack(peer, messageId) });
_storage.Stop();

_storage = new RocksDbStorage(_databaseDirectoryPath);
_storage = new RocksDbStorage(_databaseDirectoryPath, _reporterMock.Object);
_storage.Start();

var message = MatcherEntry.Message(peer, messageId, MessageUtil.TypeId<Message1>(), Array.Empty<byte>());
Expand All @@ -224,8 +225,8 @@ public async Task should_load_previous_out_of_order_acks()
}
}

[Test, Explicit]
public void should_report_storage_informations()
[Test]
public void should_report_storage_information()
{
var peer = new PeerId("peer");

Expand All @@ -235,7 +236,13 @@ public void should_report_storage_informations()
MatcherEntry.Message(peer, MessageId.NextId(), new MessageTypeId("Abc.Message.Fat"), new byte[] { 0x01, 0x02, 0x03, 0x04 }),
});

_reporterMock.Verify(r => r.AddStorageReport(2, 7, 4, "Abc.Message.Fat"));
var entryTypeStatistics = new Dictionary<string, MessageTypeStorageReport> { ["Abc.Message"] = new(1, 3), ["Abc.Message.Fat"] = new(1, 4) };
var storageReport = new StorageReport(2, 7, 4, "Abc.Message.Fat", entryTypeStatistics);
_reporterMock.Verify(r => r.AddStorageReport(It.Is<StorageReport>(x => x.MessageCount == storageReport.MessageCount
&& x.BatchSizeInBytes == storageReport.BatchSizeInBytes
&& x.FattestMessageTypeId == storageReport.FattestMessageTypeId
&& x.FattestMessageSizeInBytes == storageReport.FattestMessageSizeInBytes
&& x.MessageTypeStorageReports.DeepCompare(storageReport.MessageTypeStorageReports))));
}

private TransportMessage CreateTestTransportMessage(int i)
Expand Down
19 changes: 16 additions & 3 deletions src/Abc.Zebus.Persistence.RocksDb/RocksDbStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Text;
using System.Threading.Tasks;
using Abc.Zebus.Persistence.Matching;
using Abc.Zebus.Persistence.Reporter;
using Abc.Zebus.Persistence.Storage;
using RocksDbSharp;
using StructureMap;
Expand All @@ -20,6 +21,7 @@ namespace Abc.Zebus.Persistence.RocksDb
/// </summary>
public class RocksDbStorage : IStorage, IDisposable
{
private readonly IReporter _reporter;
private static readonly int _guidLength = Guid.Empty.ToByteArray().Length;

private readonly ConcurrentDictionary<MessageId, bool> _outOfOrderAcks = new ConcurrentDictionary<MessageId, bool>();
Expand All @@ -31,14 +33,15 @@ public class RocksDbStorage : IStorage, IDisposable
private ColumnFamilyHandle _acksColumnFamily = default!;

[DefaultConstructor]
public RocksDbStorage()
: this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"))
public RocksDbStorage(IReporter reporter)
: this(Path.Combine(AppDomain.CurrentDomain.BaseDirectory!, "database"), reporter)
{
}

public RocksDbStorage(string databaseDirectoryPath)
public RocksDbStorage(string databaseDirectoryPath, IReporter reporter)
{
_databaseDirectoryPath = databaseDirectoryPath;
_reporter = reporter;
}

public void Start()
Expand Down Expand Up @@ -75,6 +78,8 @@ public void Start()

public Task Write(IList<MatcherEntry> entriesToPersist)
{
_reporter.AddStorageReport(ToStorageReport(entriesToPersist));

foreach (var entry in entriesToPersist)
{
var key = CreateKeyBuffer(entry.PeerId);
Expand Down Expand Up @@ -115,6 +120,14 @@ public Task Write(IList<MatcherEntry> entriesToPersist)
return Task.CompletedTask;
}

private static StorageReport ToStorageReport(IList<MatcherEntry> entriesToPersist)
{
var fattestMessage = entriesToPersist.OrderByDescending(msg => msg.MessageLength).First();
var entriesByTypeName = entriesToPersist.ToLookup(x => x.MessageTypeName)
.ToDictionary(xs => xs.Key, xs => new MessageTypeStorageReport(xs.Count(), xs.Sum(x => x.MessageLength)));
return new StorageReport(entriesToPersist.Count, entriesToPersist.Sum(msg => msg.MessageLength), fattestMessage.MessageLength, fattestMessage.MessageTypeName, entriesByTypeName);
}

private void UpdateNonAckedCounts(IGrouping<PeerId, MatcherEntry> entry)
{
var nonAcked = entry.Aggregate(0, (s, e) => s + (e.IsAck ? -1 : 1));
Expand Down
2 changes: 2 additions & 0 deletions src/Abc.Zebus.Persistence/Matching/MatcherEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ public bool CanBeProcessed(TimeSpan delay)
{
return IsEventWaitHandle || SystemDateTime.UtcNow - TimestampUtc >= delay;
}

public int MessageLength => MessageBytes?.Length ?? 0;
}
}
46 changes: 22 additions & 24 deletions src/Abc.Zebus.Persistence/MessageReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,35 +153,33 @@ public void Run(CancellationToken cancellationToken)

private int ReplayUnackedMessages(CancellationToken cancellationToken)
{
using (var reader = _storage.CreateMessageReader(_peer.Id))
{
if (reader == null)
return 0;
var totalMessageCount = 0;
using var reader = _storage.CreateMessageReader(_peer.Id);
if (reader == null)
return 0;
var totalMessageCount = 0;

foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
foreach (var partition in reader.GetUnackedMessages().TakeWhile(m => !cancellationToken.IsCancellationRequested).Partition(_replayBatchSize, true))
{
var messageSentCount = 0;
var batchDuration = MeasureDuration();
var readAndSendDuration = MeasureDuration();
foreach (var message in partition.Select(DeserializeTransportMessage))
{
var messageSentCount = 0;
var batchDuration = MeasureDuration();
var readAndSendDuration = MeasureDuration();
foreach (var message in partition.Select(DeserializeTransportMessage))
{
_unackedIds.Add(message.Id);
ReplayMessage(message);
messageSentCount++;
}

totalMessageCount += messageSentCount;

_logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
WaitForAcks(cancellationToken);
_logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
_reporter.AddReplaySpeedReport(messageSentCount, readAndSendDuration.Value.TotalSeconds, batchDuration.Value.TotalSeconds);
_unackedIds.Add(message.Id);
ReplayMessage(message);
messageSentCount++;
}

_logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
return totalMessageCount;
totalMessageCount += messageSentCount;

_logger.LogInformation($"Read and send for last batch of {messageSentCount} msgs for {_peer.Id} took {readAndSendDuration.Value}. ({messageSentCount / readAndSendDuration.Value.TotalSeconds} msg/s)");
WaitForAcks(cancellationToken);
_logger.LogInformation($"Last batch for {_peer.Id} took {batchDuration.Value} to be totally replayed ({messageSentCount / batchDuration.Value.TotalSeconds} msg/s)");
_reporter.AddReplaySpeedReport(new ReplaySpeedReport(messageSentCount, readAndSendDuration.Value, batchDuration.Value));
}

_logger.LogInformation($"Replay finished for peer {_peer.Id}. Disposing the reader");
return totalMessageCount;
}

private static TransportMessage DeserializeTransportMessage(byte[] row) => TransportMessage.Deserialize(row);
Expand Down
12 changes: 3 additions & 9 deletions src/Abc.Zebus.Persistence/Reporter/IReporter.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
using System.Collections.Generic;

namespace Abc.Zebus.Persistence.Reporter
{
public interface IReporter
{
void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds);
IList<ReplaySpeedReport> TakeAndResetReplaySpeedReports();

void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId);
IList<StorageReport> TakeAndResetStorageReports();

void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport);
void AddStorageReport(StorageReport storageReport);
}
}
}
10 changes: 5 additions & 5 deletions src/Abc.Zebus.Persistence/Reporter/NoopReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ namespace Abc.Zebus.Persistence.Reporter
{
public class NoopReporter : IReporter
{
private static readonly List<ReplaySpeedReport> _emptyReplayReports = new List<ReplaySpeedReport>(0);
private static readonly List<StorageReport> _emptyStorageReports = new List<StorageReport>(0);
private static readonly List<ReplaySpeedReport> _emptyReplayReports = new(0);
private static readonly List<StorageReport> _emptyStorageReports = new(0);

public void AddReplaySpeedReport(int messagesReplayedCount, double sendDurationInSeconds, double ackDurationInSeconds)
public void AddReplaySpeedReport(ReplaySpeedReport replaySpeedReport)
{
}

Expand All @@ -16,7 +16,7 @@ public IList<ReplaySpeedReport> TakeAndResetReplaySpeedReports()
return _emptyReplayReports;
}

public void AddStorageReport(int messageCount, int batchSizeInBytes, int fattestMessageSizeInBytes, string fattestMessageTypeId)
public void AddStorageReport(StorageReport storageReport)
{
}

Expand All @@ -25,4 +25,4 @@ public IList<StorageReport> TakeAndResetStorageReports()
return _emptyStorageReports;
}
}
}
}
Loading

0 comments on commit 29044ad

Please sign in to comment.