Skip to content

Commit

Permalink
Merge pull request #104 from Abc-Arbitrage/Directory.CQLV2
Browse files Browse the repository at this point in the history
Directory: Update Cassandra schema of `Peers` and `DynamicSubscriptions` tables
  • Loading branch information
ocoanet authored Jan 6, 2022
2 parents 965a68d + 34626ab commit a94562f
Show file tree
Hide file tree
Showing 48 changed files with 183 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
using System.Linq;
using Abc.Zebus.Directory.Cassandra.Cql;
using Abc.Zebus.Directory.Cassandra.Storage;
using Abc.Zebus.Directory.Cassandra.Tests.Cql;
using Abc.Zebus.Routing;
using Abc.Zebus.Testing.Extensions;
using Abc.Zebus.Testing.Measurements;
using Cassandra;
using NUnit.Framework;

namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
{
[TestFixture]
[Ignore("Performance tests")]
Expand All @@ -27,7 +26,7 @@ public void insert_30_peers_with_8000_subscriptions_each()
const int numberOfPeersToInsert = 30;
var repo = new CqlPeerRepository(DataContext);
var subscriptionForTypes = Get10MessageTypesWith800BindingKeysEach();

for (var i = 0; i < numberOfPeersToInsert; i++)
{
var stopwatch = Stopwatch.StartNew();
Expand All @@ -50,7 +49,7 @@ public void insert_1_peer_with_100_000_subscriptions()
Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info;
var repo = new CqlPeerRepository(DataContext);
var subscriptionForTypes = Get1MessageTypesWith100000BindingKeys();


var stopwatch = Stopwatch.StartNew();
repo.AddOrUpdatePeer(new PeerDescriptor(new PeerId("Abc.Peer.0"), "tcp://toto:123", true, true, true, DateTime.UtcNow));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using Abc.Zebus.Directory.Tests;
using Abc.Zebus.Routing;
using Abc.Zebus.Testing;
Expand All @@ -7,9 +8,8 @@
using Cassandra;
using Cassandra.Data.Linq;
using NUnit.Framework;
using System;

namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
{
public partial class CqlPeerRepositoryTests
{
Expand Down Expand Up @@ -39,22 +39,6 @@ private SubscriptionsForType CreateSubscriptionsForType<TMessage>(params Binding
return new SubscriptionsForType(MessageUtil.GetTypeId(typeof(TMessage)), bindings.Any() ? bindings : new[] { BindingKey.Empty });
}

[Test]
public void should_not_crash_when_passing_null_subscriptions_array_to_AddDynamicSubscriptions()
{
var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand));

Assert.DoesNotThrow(() => _repository.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null));
}

[Test]
public void should_not_crash_when_passing_null_subscriptions_array_to_RemoveDynamicSubscriptions()
{
var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand));

Assert.DoesNotThrow(() => _repository.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null));
}

[Test]
public void should_remove_dynamic_subscriptions()
{
Expand All @@ -78,9 +62,10 @@ public void should_remove_the_dynamic_subscriptions_of_a_peer_when_removing_it()
_repository.RemovePeer(peerDescriptor.PeerId);

_repository.Get(peerDescriptor.PeerId).ShouldBeNull();

var retrievedSubscriptions = DataContext.DynamicSubscriptions
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(sub => sub.UselessKey == false && sub.PeerId == peerDescriptor.PeerId.ToString())
.Where(s => s.PeerId == peerDescriptor.PeerId.ToString())
.Execute();
retrievedSubscriptions.ShouldBeEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using Abc.Zebus.Directory.Cassandra.Cql;
using System;
using System.Linq;
using System.Threading;
using Abc.Zebus.Directory.Cassandra.Cql;
using Abc.Zebus.Directory.Cassandra.Data;
using Abc.Zebus.Directory.Cassandra.Storage;
using Abc.Zebus.Directory.Cassandra.Tests.Cql;
using Abc.Zebus.Directory.Tests;
using Abc.Zebus.Testing;
using Abc.Zebus.Testing.Extensions;
using Abc.Zebus.Util;
using Cassandra;
using Cassandra.Data.Linq;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading;

namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
namespace Abc.Zebus.Directory.Cassandra.Tests.Cql
{
[TestFixture]
public partial class CqlPeerRepositoryTests : CqlTestFixture<DirectoryDataContext, ICassandraConfiguration>
Expand Down Expand Up @@ -159,11 +159,6 @@ public void should_read_peer_after_removing_it()
fetched.ShouldNotBeNull();
}

private static DateTime GetUnspecifiedKindUtcNow()
{
return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified);
}

[Test]
public void should_insert_a_peer_with_no_timestamp_that_was_previously_deleted()
{
Expand Down Expand Up @@ -206,16 +201,21 @@ public void should_handle_peers_with_null_subscriptions_gracefully()
descriptor.TimestampUtc = DateTime.UtcNow;
_repository.AddOrUpdatePeer(descriptor);

DataContext.StoragePeers
DataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false && peer.PeerId == "Abc.DecommissionnedPeer.0")
.Select(peer => new StoragePeer { StaticSubscriptionsBytes = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false })
.Where(peer => peer.PeerId == "Abc.DecommissionedPeer.0")
.Select(peer => new CassandraPeer { StaticSubscriptionsBytes = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false })
.Update()
.SetTimestamp(DateTime.UtcNow)
.Execute();

_repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue();
_repository.GetPeers().ExpectedSingle().PeerId.ShouldEqual(_peer1.Id);
}

private static DateTime GetUnspecifiedKindUtcNow()
{
return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
using System;
using Abc.Zebus.Directory.Cassandra.Storage;
using Abc.Zebus.Directory.Cassandra.Data;
using Abc.Zebus.Testing.Extensions;
using NUnit.Framework;

namespace Abc.Zebus.Directory.Cassandra.Tests.Storage
namespace Abc.Zebus.Directory.Cassandra.Tests.Data
{
public class StorageConvertionExtensionsTests
public class CassandraExtensionsTests
{
[Test]
public void should_return_a_storage_peer_with_its_timestamp_kind_set_to_utc()
{
var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
var peerDescriptor = new PeerDescriptor(new PeerId("Abc.Titi.0"), "tcp://toto:123", false, true, true, unspecifiedKindUtcNow);

var storagePeer = peerDescriptor.ToStoragePeer();

storagePeer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc);
var peer = peerDescriptor.ToCassandra();
peer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc);
}

[Test]
public void should_return_a_peer_descriptor_with_its_timestamp_kind_set_to_utc()
{
var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified);
var storagePeer = new StoragePeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]};

var peerDescriptor = storagePeer.ToPeerDescriptor(new Subscription[0]);
var peer = new CassandraPeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]};

var peerDescriptor = peer.ToPeerDescriptor(new Subscription[0]);
peerDescriptor.TimestampUtc.Value.Kind.ShouldEqual(DateTimeKind.Utc);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Abc.Zebus.Directory.Cassandra.Data;
using Abc.Zebus.Directory.Storage;
using Cassandra;
using Cassandra.Data.Linq;
Expand All @@ -18,9 +19,9 @@ public CqlPeerRepository(DirectoryDataContext dataContext)

public bool? IsPersistent(PeerId peerId)
{
return _dataContext.StoragePeers
return _dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
.Where(peer => peer.PeerId == peerId.ToString())
.Select(x => (bool?)x.IsPersistent)
.Execute()
.FirstOrDefault();
Expand All @@ -30,13 +31,13 @@ public CqlPeerRepository(DirectoryDataContext dataContext)
{
var peerDynamicSubscriptions = _dataContext.DynamicSubscriptions
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
.Where(s => s.PeerId == peerId.ToString())
.Execute()
.SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions());
.SelectMany(s => s.ToSubscriptionsForType().ToSubscriptions());

return _dataContext.StoragePeers
return _dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
.Where(peer => peer.PeerId == peerId.ToString())
.Execute()
.FirstOrDefault()
.ToPeerDescriptor(peerDynamicSubscriptions);
Expand All @@ -46,25 +47,22 @@ public IEnumerable<PeerDescriptor> GetPeers(bool loadDynamicSubscriptions = true
{
if (!loadDynamicSubscriptions)
{
return _dataContext.StoragePeers
return _dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false)
.Execute()
.Select(peer => peer.ToPeerDescriptor()!)
.ToList();
}

var dynamicSubscriptionsByPeer = _dataContext.DynamicSubscriptions
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(sub => sub.UselessKey == false)
.Execute()
.SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions().Select(s => new { sub.PeerId, Subscription = s }))
.ToLookup(peerSub => peerSub.PeerId, peerSub=> peerSub.Subscription);


return _dataContext.StoragePeers
return _dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false)
.Execute()
.Select(peer => peer.ToPeerDescriptor(dynamicSubscriptionsByPeer[peer.PeerId]))
.Where(descriptor => descriptor != null)
Expand All @@ -73,37 +71,39 @@ public IEnumerable<PeerDescriptor> GetPeers(bool loadDynamicSubscriptions = true

public void AddOrUpdatePeer(PeerDescriptor peerDescriptor)
{
var storagePeer = peerDescriptor.ToStoragePeer();
_dataContext.StoragePeers
.Insert(storagePeer)
var cassandraPeer = peerDescriptor.ToCassandra();
_dataContext.Peers
.Insert(cassandraPeer)
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.SetTimestamp(storagePeer.TimestampUtc)
.SetTimestamp(cassandraPeer.TimestampUtc)
.Execute();
}

public void RemovePeer(PeerId peerId)
{
var now = DateTime.UtcNow;
_dataContext.StoragePeers

_dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
.Where(peer => peer.PeerId == peerId.ToString())
.Delete()
.SetTimestamp(now)
.Execute();

_dataContext.DynamicSubscriptions
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
.Where(s => s.PeerId == peerId.ToString())
.Delete()
.SetTimestamp(now)
.Execute();
}

public void SetPeerResponding(PeerId peerId, bool isResponding)
{
_dataContext.StoragePeers
_dataContext.Peers
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString())
.Select(peer => new StoragePeer { IsResponding = isResponding })
.Where(peer => peer.PeerId == peerId.ToString())
.Select(peer => new CassandraPeer { IsResponding = isResponding })
.Update()
.SetTimestamp(DateTime.UtcNow)
.Execute();
Expand All @@ -112,31 +112,27 @@ public void SetPeerResponding(PeerId peerId, bool isResponding)

public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes)
{
if (subscriptionsForTypes == null)
return;
var batch = _dataContext.Session.CreateBatch();
batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);

foreach (var subscription in subscriptionsForTypes)
{
batch.Append(_dataContext.DynamicSubscriptions
.Insert(subscription.ToStorageSubscription(peerId))
.Insert(subscription.ToCassandra(peerId))
.SetTimestamp(timestampUtc));
}
batch.Execute();
}

public void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds)
{
if (messageTypeIds == null)
return;
var batch = _dataContext.Session.CreateBatch();
batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);

foreach (var messageTypeId in messageTypeIds)
{
var deleteQuery = _dataContext.DynamicSubscriptions
.Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString() && sub.MessageTypeId == messageTypeId.FullName)
.Where(s => s.PeerId == peerId.ToString() && s.MessageTypeId == messageTypeId.FullName)
.Delete()
.SetTimestamp(timestampUtc);
batch.Append(deleteQuery);
Expand All @@ -149,7 +145,7 @@ public void RemoveAllDynamicSubscriptionsForPeer(PeerId peerId, DateTime timesta
{
_dataContext.DynamicSubscriptions
.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString())
.Where(s => s.PeerId == peerId.ToString())
.Delete()
.SetTimestamp(timestampUtc)
.Execute();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Abc.Zebus.Directory.Cassandra.Cql;
using Abc.Zebus.Directory.Cassandra.Data;
using Cassandra.Data.Linq;

namespace Abc.Zebus.Directory.Cassandra.Storage
Expand All @@ -10,7 +11,7 @@ public DirectoryDataContext(CassandraCqlSessionManager sessionManager, ICassandr
{
}

public Table<StorageSubscription> DynamicSubscriptions => new Table<StorageSubscription>(Session);
public Table<StoragePeer> StoragePeers => new Table<StoragePeer>(Session);
public Table<CassandraSubscription> DynamicSubscriptions => new(Session);
public Table<CassandraPeer> Peers => new (Session);
}
}
Loading

0 comments on commit a94562f

Please sign in to comment.