diff --git a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryPerformanceTests.cs b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryPerformanceTests.cs similarity index 97% rename from src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryPerformanceTests.cs rename to src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryPerformanceTests.cs index 11548d05..56219212 100644 --- a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryPerformanceTests.cs +++ b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryPerformanceTests.cs @@ -4,14 +4,13 @@ using System.Linq; using Abc.Zebus.Directory.Cassandra.Cql; using Abc.Zebus.Directory.Cassandra.Storage; -using Abc.Zebus.Directory.Cassandra.Tests.Cql; using Abc.Zebus.Routing; using Abc.Zebus.Testing.Extensions; using Abc.Zebus.Testing.Measurements; using Cassandra; using NUnit.Framework; -namespace Abc.Zebus.Directory.Cassandra.Tests.Storage +namespace Abc.Zebus.Directory.Cassandra.Tests.Cql { [TestFixture] [Ignore("Performance tests")] @@ -27,7 +26,7 @@ public void insert_30_peers_with_8000_subscriptions_each() const int numberOfPeersToInsert = 30; var repo = new CqlPeerRepository(DataContext); var subscriptionForTypes = Get10MessageTypesWith800BindingKeysEach(); - + for (var i = 0; i < numberOfPeersToInsert; i++) { var stopwatch = Stopwatch.StartNew(); @@ -50,7 +49,7 @@ public void insert_1_peer_with_100_000_subscriptions() Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info; var repo = new CqlPeerRepository(DataContext); var subscriptionForTypes = Get1MessageTypesWith100000BindingKeys(); - + var stopwatch = Stopwatch.StartNew(); repo.AddOrUpdatePeer(new PeerDescriptor(new PeerId("Abc.Peer.0"), "tcp://toto:123", true, true, true, DateTime.UtcNow)); diff --git a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.DynamicSubscriptions.cs b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.DynamicSubscriptions.cs similarity index 93% rename from src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.DynamicSubscriptions.cs rename to src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.DynamicSubscriptions.cs index 2a967e69..fce974b5 100644 --- a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.DynamicSubscriptions.cs +++ b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.DynamicSubscriptions.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using Abc.Zebus.Directory.Tests; using Abc.Zebus.Routing; using Abc.Zebus.Testing; @@ -7,9 +8,8 @@ using Cassandra; using Cassandra.Data.Linq; using NUnit.Framework; -using System; -namespace Abc.Zebus.Directory.Cassandra.Tests.Storage +namespace Abc.Zebus.Directory.Cassandra.Tests.Cql { public partial class CqlPeerRepositoryTests { @@ -39,22 +39,6 @@ private SubscriptionsForType CreateSubscriptionsForType(params Binding return new SubscriptionsForType(MessageUtil.GetTypeId(typeof(TMessage)), bindings.Any() ? bindings : new[] { BindingKey.Empty }); } - [Test] - public void should_not_crash_when_passing_null_subscriptions_array_to_AddDynamicSubscriptions() - { - var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand)); - - Assert.DoesNotThrow(() => _repository.AddDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null)); - } - - [Test] - public void should_not_crash_when_passing_null_subscriptions_array_to_RemoveDynamicSubscriptions() - { - var peerDescriptor = _peer1.ToPeerDescriptorWithRoundedTime(true, typeof(FakeCommand)); - - Assert.DoesNotThrow(() => _repository.RemoveDynamicSubscriptionsForTypes(peerDescriptor.PeerId, peerDescriptor.TimestampUtc.Value, null)); - } - [Test] public void should_remove_dynamic_subscriptions() { @@ -78,9 +62,10 @@ public void should_remove_the_dynamic_subscriptions_of_a_peer_when_removing_it() _repository.RemovePeer(peerDescriptor.PeerId); _repository.Get(peerDescriptor.PeerId).ShouldBeNull(); + var retrievedSubscriptions = DataContext.DynamicSubscriptions .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(sub => sub.UselessKey == false && sub.PeerId == peerDescriptor.PeerId.ToString()) + .Where(s => s.PeerId == peerDescriptor.PeerId.ToString()) .Execute(); retrievedSubscriptions.ShouldBeEmpty(); } diff --git a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.cs b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.cs similarity index 94% rename from src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.cs rename to src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.cs index f45979dd..5d1d3a86 100644 --- a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/CqlPeerRepositoryTests.cs +++ b/src/Abc.Zebus.Directory.Cassandra.Tests/Cql/CqlPeerRepositoryTests.cs @@ -1,6 +1,9 @@ -using Abc.Zebus.Directory.Cassandra.Cql; +using System; +using System.Linq; +using System.Threading; +using Abc.Zebus.Directory.Cassandra.Cql; +using Abc.Zebus.Directory.Cassandra.Data; using Abc.Zebus.Directory.Cassandra.Storage; -using Abc.Zebus.Directory.Cassandra.Tests.Cql; using Abc.Zebus.Directory.Tests; using Abc.Zebus.Testing; using Abc.Zebus.Testing.Extensions; @@ -8,11 +11,8 @@ using Cassandra; using Cassandra.Data.Linq; using NUnit.Framework; -using System; -using System.Linq; -using System.Threading; -namespace Abc.Zebus.Directory.Cassandra.Tests.Storage +namespace Abc.Zebus.Directory.Cassandra.Tests.Cql { [TestFixture] public partial class CqlPeerRepositoryTests : CqlTestFixture @@ -159,11 +159,6 @@ public void should_read_peer_after_removing_it() fetched.ShouldNotBeNull(); } - private static DateTime GetUnspecifiedKindUtcNow() - { - return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified); - } - [Test] public void should_insert_a_peer_with_no_timestamp_that_was_previously_deleted() { @@ -206,10 +201,10 @@ public void should_handle_peers_with_null_subscriptions_gracefully() descriptor.TimestampUtc = DateTime.UtcNow; _repository.AddOrUpdatePeer(descriptor); - DataContext.StoragePeers + DataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false && peer.PeerId == "Abc.DecommissionnedPeer.0") - .Select(peer => new StoragePeer { StaticSubscriptionsBytes = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false }) + .Where(peer => peer.PeerId == "Abc.DecommissionedPeer.0") + .Select(peer => new CassandraPeer { StaticSubscriptionsBytes = null, IsResponding = false, IsPersistent = false, HasDebuggerAttached = false, IsUp = false }) .Update() .SetTimestamp(DateTime.UtcNow) .Execute(); @@ -217,5 +212,10 @@ public void should_handle_peers_with_null_subscriptions_gracefully() _repository.Get(_peer1.Id).Peer.IsResponding.ShouldBeTrue(); _repository.GetPeers().ExpectedSingle().PeerId.ShouldEqual(_peer1.Id); } + + private static DateTime GetUnspecifiedKindUtcNow() + { + return new DateTime(SystemDateTime.UtcNow.RoundToMillisecond().Ticks, DateTimeKind.Unspecified); + } } } diff --git a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/StorageConvertionExtensionsTests.cs b/src/Abc.Zebus.Directory.Cassandra.Tests/Data/CassandraExtensionsTests.cs similarity index 60% rename from src/Abc.Zebus.Directory.Cassandra.Tests/Storage/StorageConvertionExtensionsTests.cs rename to src/Abc.Zebus.Directory.Cassandra.Tests/Data/CassandraExtensionsTests.cs index b0c7252d..0814b503 100644 --- a/src/Abc.Zebus.Directory.Cassandra.Tests/Storage/StorageConvertionExtensionsTests.cs +++ b/src/Abc.Zebus.Directory.Cassandra.Tests/Data/CassandraExtensionsTests.cs @@ -1,11 +1,11 @@ using System; -using Abc.Zebus.Directory.Cassandra.Storage; +using Abc.Zebus.Directory.Cassandra.Data; using Abc.Zebus.Testing.Extensions; using NUnit.Framework; -namespace Abc.Zebus.Directory.Cassandra.Tests.Storage +namespace Abc.Zebus.Directory.Cassandra.Tests.Data { - public class StorageConvertionExtensionsTests + public class CassandraExtensionsTests { [Test] public void should_return_a_storage_peer_with_its_timestamp_kind_set_to_utc() @@ -13,20 +13,18 @@ public void should_return_a_storage_peer_with_its_timestamp_kind_set_to_utc() var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified); var peerDescriptor = new PeerDescriptor(new PeerId("Abc.Titi.0"), "tcp://toto:123", false, true, true, unspecifiedKindUtcNow); - var storagePeer = peerDescriptor.ToStoragePeer(); - - storagePeer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc); + var peer = peerDescriptor.ToCassandra(); + peer.TimestampUtc.Kind.ShouldEqual(DateTimeKind.Utc); } [Test] public void should_return_a_peer_descriptor_with_its_timestamp_kind_set_to_utc() { var unspecifiedKindUtcNow = new DateTime(DateTime.UtcNow.Ticks, DateTimeKind.Unspecified); - var storagePeer = new StoragePeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]}; - - var peerDescriptor = storagePeer.ToPeerDescriptor(new Subscription[0]); + var peer = new CassandraPeer { TimestampUtc = unspecifiedKindUtcNow, StaticSubscriptionsBytes = new byte[0]}; + var peerDescriptor = peer.ToPeerDescriptor(new Subscription[0]); peerDescriptor.TimestampUtc.Value.Kind.ShouldEqual(DateTimeKind.Utc); } } -} \ No newline at end of file +} diff --git a/src/Abc.Zebus.Directory.Cassandra/Storage/CqlPeerRepository.cs b/src/Abc.Zebus.Directory.Cassandra/Cql/CqlPeerRepository.cs similarity index 73% rename from src/Abc.Zebus.Directory.Cassandra/Storage/CqlPeerRepository.cs rename to src/Abc.Zebus.Directory.Cassandra/Cql/CqlPeerRepository.cs index fdf9403e..8c46c88a 100644 --- a/src/Abc.Zebus.Directory.Cassandra/Storage/CqlPeerRepository.cs +++ b/src/Abc.Zebus.Directory.Cassandra/Cql/CqlPeerRepository.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using Abc.Zebus.Directory.Cassandra.Data; using Abc.Zebus.Directory.Storage; using Cassandra; using Cassandra.Data.Linq; @@ -18,9 +19,9 @@ public CqlPeerRepository(DirectoryDataContext dataContext) public bool? IsPersistent(PeerId peerId) { - return _dataContext.StoragePeers + return _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString()) + .Where(peer => peer.PeerId == peerId.ToString()) .Select(x => (bool?)x.IsPersistent) .Execute() .FirstOrDefault(); @@ -30,13 +31,13 @@ public CqlPeerRepository(DirectoryDataContext dataContext) { var peerDynamicSubscriptions = _dataContext.DynamicSubscriptions .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString()) + .Where(s => s.PeerId == peerId.ToString()) .Execute() - .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions()); + .SelectMany(s => s.ToSubscriptionsForType().ToSubscriptions()); - return _dataContext.StoragePeers + return _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString()) + .Where(peer => peer.PeerId == peerId.ToString()) .Execute() .FirstOrDefault() .ToPeerDescriptor(peerDynamicSubscriptions); @@ -46,9 +47,8 @@ public IEnumerable GetPeers(bool loadDynamicSubscriptions = true { if (!loadDynamicSubscriptions) { - return _dataContext.StoragePeers + return _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false) .Execute() .Select(peer => peer.ToPeerDescriptor()!) .ToList(); @@ -56,15 +56,13 @@ public IEnumerable GetPeers(bool loadDynamicSubscriptions = true var dynamicSubscriptionsByPeer = _dataContext.DynamicSubscriptions .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(sub => sub.UselessKey == false) .Execute() .SelectMany(sub => sub.ToSubscriptionsForType().ToSubscriptions().Select(s => new { sub.PeerId, Subscription = s })) .ToLookup(peerSub => peerSub.PeerId, peerSub=> peerSub.Subscription); - return _dataContext.StoragePeers + return _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false) .Execute() .Select(peer => peer.ToPeerDescriptor(dynamicSubscriptionsByPeer[peer.PeerId])) .Where(descriptor => descriptor != null) @@ -73,26 +71,28 @@ public IEnumerable GetPeers(bool loadDynamicSubscriptions = true public void AddOrUpdatePeer(PeerDescriptor peerDescriptor) { - var storagePeer = peerDescriptor.ToStoragePeer(); - _dataContext.StoragePeers - .Insert(storagePeer) + var cassandraPeer = peerDescriptor.ToCassandra(); + _dataContext.Peers + .Insert(cassandraPeer) .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .SetTimestamp(storagePeer.TimestampUtc) + .SetTimestamp(cassandraPeer.TimestampUtc) .Execute(); } public void RemovePeer(PeerId peerId) { var now = DateTime.UtcNow; - _dataContext.StoragePeers + + _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString()) + .Where(peer => peer.PeerId == peerId.ToString()) .Delete() .SetTimestamp(now) .Execute(); + _dataContext.DynamicSubscriptions .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString()) + .Where(s => s.PeerId == peerId.ToString()) .Delete() .SetTimestamp(now) .Execute(); @@ -100,10 +100,10 @@ public void RemovePeer(PeerId peerId) public void SetPeerResponding(PeerId peerId, bool isResponding) { - _dataContext.StoragePeers + _dataContext.Peers .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(peer => peer.UselessKey == false && peer.PeerId == peerId.ToString()) - .Select(peer => new StoragePeer { IsResponding = isResponding }) + .Where(peer => peer.PeerId == peerId.ToString()) + .Select(peer => new CassandraPeer { IsResponding = isResponding }) .Update() .SetTimestamp(DateTime.UtcNow) .Execute(); @@ -112,15 +112,13 @@ public void SetPeerResponding(PeerId peerId, bool isResponding) public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, SubscriptionsForType[] subscriptionsForTypes) { - if (subscriptionsForTypes == null) - return; var batch = _dataContext.Session.CreateBatch(); batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); foreach (var subscription in subscriptionsForTypes) { batch.Append(_dataContext.DynamicSubscriptions - .Insert(subscription.ToStorageSubscription(peerId)) + .Insert(subscription.ToCassandra(peerId)) .SetTimestamp(timestampUtc)); } batch.Execute(); @@ -128,15 +126,13 @@ public void AddDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc public void RemoveDynamicSubscriptionsForTypes(PeerId peerId, DateTime timestampUtc, MessageTypeId[] messageTypeIds) { - if (messageTypeIds == null) - return; var batch = _dataContext.Session.CreateBatch(); batch.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); foreach (var messageTypeId in messageTypeIds) { var deleteQuery = _dataContext.DynamicSubscriptions - .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString() && sub.MessageTypeId == messageTypeId.FullName) + .Where(s => s.PeerId == peerId.ToString() && s.MessageTypeId == messageTypeId.FullName) .Delete() .SetTimestamp(timestampUtc); batch.Append(deleteQuery); @@ -149,7 +145,7 @@ public void RemoveAllDynamicSubscriptionsForPeer(PeerId peerId, DateTime timesta { _dataContext.DynamicSubscriptions .SetConsistencyLevel(ConsistencyLevel.LocalQuorum) - .Where(sub => sub.UselessKey == false && sub.PeerId == peerId.ToString()) + .Where(s => s.PeerId == peerId.ToString()) .Delete() .SetTimestamp(timestampUtc) .Execute(); diff --git a/src/Abc.Zebus.Directory.Cassandra/Storage/DirectoryDataContext.cs b/src/Abc.Zebus.Directory.Cassandra/Cql/DirectoryDataContext.cs similarity index 69% rename from src/Abc.Zebus.Directory.Cassandra/Storage/DirectoryDataContext.cs rename to src/Abc.Zebus.Directory.Cassandra/Cql/DirectoryDataContext.cs index 15cd4f6a..e31cb95e 100644 --- a/src/Abc.Zebus.Directory.Cassandra/Storage/DirectoryDataContext.cs +++ b/src/Abc.Zebus.Directory.Cassandra/Cql/DirectoryDataContext.cs @@ -1,4 +1,5 @@ using Abc.Zebus.Directory.Cassandra.Cql; +using Abc.Zebus.Directory.Cassandra.Data; using Cassandra.Data.Linq; namespace Abc.Zebus.Directory.Cassandra.Storage @@ -10,7 +11,7 @@ public DirectoryDataContext(CassandraCqlSessionManager sessionManager, ICassandr { } - public Table DynamicSubscriptions => new Table(Session); - public Table StoragePeers => new Table(Session); + public Table DynamicSubscriptions => new(Session); + public Table Peers => new (Session); } } diff --git a/src/Abc.Zebus.Directory.Cassandra/Storage/StorageConvertionExtensions.cs b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraExtensions.cs similarity index 65% rename from src/Abc.Zebus.Directory.Cassandra/Storage/StorageConvertionExtensions.cs rename to src/Abc.Zebus.Directory.Cassandra/Data/CassandraExtensions.cs index 4e221a0d..acaa56e5 100644 --- a/src/Abc.Zebus.Directory.Cassandra/Storage/StorageConvertionExtensions.cs +++ b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraExtensions.cs @@ -2,17 +2,18 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using Abc.Zebus.Directory.Cassandra.Storage; using Abc.Zebus.Routing; using ProtoBuf; -namespace Abc.Zebus.Directory.Cassandra.Storage +namespace Abc.Zebus.Directory.Cassandra.Data { - public static class StorageConversionExtensions + public static class CassandraExtensions { - public static StoragePeer ToStoragePeer(this PeerDescriptor peerDescriptor) + public static CassandraPeer ToCassandra(this PeerDescriptor peerDescriptor) { var timestamp = peerDescriptor.TimestampUtc.HasValue ? new DateTime(peerDescriptor.TimestampUtc.Value.Ticks, DateTimeKind.Utc) : DateTime.UtcNow; - return new StoragePeer + return new CassandraPeer { PeerId = peerDescriptor.PeerId.ToString(), EndPoint = peerDescriptor.Peer.EndPoint, @@ -25,9 +26,9 @@ public static StoragePeer ToStoragePeer(this PeerDescriptor peerDescriptor) }; } - public static StorageSubscription ToStorageSubscription(this SubscriptionsForType subscriptionFortype, PeerId peerId) + public static CassandraSubscription ToCassandra(this SubscriptionsForType subscriptionFortype, PeerId peerId) { - return new StorageSubscription + return new CassandraSubscription { PeerId = peerId.ToString(), MessageTypeId = subscriptionFortype.MessageTypeId.FullName!, @@ -35,40 +36,43 @@ public static StorageSubscription ToStorageSubscription(this SubscriptionsForTyp }; } - public static SubscriptionsForType ToSubscriptionsForType(this StorageSubscription storageSubscription) + public static SubscriptionsForType ToSubscriptionsForType(this CassandraSubscription subscription) { - return new SubscriptionsForType(new MessageTypeId(storageSubscription.MessageTypeId), DeserializeBindingKeys(storageSubscription.SubscriptionBindings)); + return new SubscriptionsForType( + new MessageTypeId(subscription.MessageTypeId), + DeserializeBindingKeys(subscription.SubscriptionBindings) + ); } - public static PeerDescriptor? ToPeerDescriptor(this StoragePeer? storagePeer, IEnumerable peerDynamicSubscriptions) + public static PeerDescriptor? ToPeerDescriptor(this CassandraPeer? peer, IEnumerable peerDynamicSubscriptions) { - if (storagePeer?.StaticSubscriptionsBytes == null) + if (peer?.StaticSubscriptionsBytes == null) return null; - var staticSubscriptions = DeserializeSubscriptions(storagePeer.StaticSubscriptionsBytes); + var staticSubscriptions = DeserializeSubscriptions(peer.StaticSubscriptionsBytes); var allSubscriptions = staticSubscriptions.Concat(peerDynamicSubscriptions).Distinct().ToArray(); - return new PeerDescriptor(new PeerId(storagePeer.PeerId), - storagePeer.EndPoint, - storagePeer.IsPersistent, - storagePeer.IsUp, - storagePeer.IsResponding, - new DateTime(storagePeer.TimestampUtc.Ticks, DateTimeKind.Utc), - allSubscriptions) { HasDebuggerAttached = storagePeer.HasDebuggerAttached }; + return new PeerDescriptor(new PeerId(peer.PeerId), + peer.EndPoint, + peer.IsPersistent, + peer.IsUp, + peer.IsResponding, + new DateTime(peer.TimestampUtc.Ticks, DateTimeKind.Utc), + allSubscriptions) { HasDebuggerAttached = peer.HasDebuggerAttached }; } - public static PeerDescriptor? ToPeerDescriptor(this StoragePeer? storagePeer) + public static PeerDescriptor? ToPeerDescriptor(this CassandraPeer? peer) { - if (storagePeer == null) + if (peer == null) return null; - var staticSubscriptions = DeserializeSubscriptions(storagePeer.StaticSubscriptionsBytes); - return new PeerDescriptor(new PeerId(storagePeer.PeerId), - storagePeer.EndPoint, - storagePeer.IsPersistent, - storagePeer.IsUp, - storagePeer.IsResponding, - new DateTime(storagePeer.TimestampUtc.Ticks, DateTimeKind.Utc), - staticSubscriptions) { HasDebuggerAttached = storagePeer.HasDebuggerAttached }; + var staticSubscriptions = DeserializeSubscriptions(peer.StaticSubscriptionsBytes); + return new PeerDescriptor(new PeerId(peer.PeerId), + peer.EndPoint, + peer.IsPersistent, + peer.IsUp, + peer.IsResponding, + new DateTime(peer.TimestampUtc.Ticks, DateTimeKind.Utc), + staticSubscriptions) { HasDebuggerAttached = peer.HasDebuggerAttached }; } private static byte[] SerializeSubscriptions(Subscription[] subscriptions) diff --git a/src/Abc.Zebus.Directory.Cassandra/Storage/StoragePeer.cs b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraPeer.cs similarity index 75% rename from src/Abc.Zebus.Directory.Cassandra/Storage/StoragePeer.cs rename to src/Abc.Zebus.Directory.Cassandra/Data/CassandraPeer.cs index 9ce7b4e4..4bd4690b 100644 --- a/src/Abc.Zebus.Directory.Cassandra/Storage/StoragePeer.cs +++ b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraPeer.cs @@ -1,16 +1,12 @@ using System; using Cassandra.Mapping.Attributes; -namespace Abc.Zebus.Directory.Cassandra.Storage +namespace Abc.Zebus.Directory.Cassandra.Data { - [Table("Peers", CaseSensitive = true)] - public class StoragePeer + [Table("Peers_2", CaseSensitive = true)] + public class CassandraPeer { - [PartitionKey] - public bool UselessKey { get; set; } - - [ClusteringKey(0)] - [Column("PeerId")] + [PartitionKey, Column("PeerId")] public string PeerId { get; set; } = default!; [Column("EndPoint")] @@ -33,5 +29,6 @@ public class StoragePeer [Column("StaticSubscriptions")] public byte[] StaticSubscriptionsBytes { get; set; } = default!; + } } diff --git a/src/Abc.Zebus.Directory.Cassandra/Data/CassandraSubscription.cs b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraSubscription.cs new file mode 100644 index 00000000..bc328dce --- /dev/null +++ b/src/Abc.Zebus.Directory.Cassandra/Data/CassandraSubscription.cs @@ -0,0 +1,17 @@ +using Cassandra.Mapping.Attributes; + +namespace Abc.Zebus.Directory.Cassandra.Data +{ + [Table("DynamicSubscriptions_2", CaseSensitive = true)] + public class CassandraSubscription + { + [PartitionKey, Column("PeerId")] + public string PeerId { get; set; } = default!; + + [ClusteringKey(0), Column("MessageTypeId")] + public string MessageTypeId { get; set; } = default!; + + [Column("SubscriptionBindings")] + public byte[] SubscriptionBindings { get; set; } = default!; + } +} diff --git a/src/Abc.Zebus.Directory.Cassandra/Storage/StorageSubscription.cs b/src/Abc.Zebus.Directory.Cassandra/Storage/StorageSubscription.cs deleted file mode 100644 index f9cd4ade..00000000 --- a/src/Abc.Zebus.Directory.Cassandra/Storage/StorageSubscription.cs +++ /dev/null @@ -1,22 +0,0 @@ -using Cassandra.Mapping.Attributes; - -namespace Abc.Zebus.Directory.Cassandra.Storage -{ - [Table("DynamicSubscriptions", CaseSensitive = true)] - public class StorageSubscription - { - [PartitionKey] - public bool UselessKey { get; set; } - - [ClusteringKey(0)] - [Column("PeerId")] - public string PeerId { get; set; } = default!; - - [ClusteringKey(1)] - [Column("MessageTypeId")] - public string MessageTypeId { get; set; } = default!; - - [Column("SubscriptionBindings")] - public byte[] SubscriptionBindings { get; set; } = default!; - } -} diff --git a/src/Abc.Zebus.Directory.Cassandra/schema_creation.cql b/src/Abc.Zebus.Directory.Cassandra/schema_creation.cql index 7a29f8e1..1c98559c 100644 --- a/src/Abc.Zebus.Directory.Cassandra/schema_creation.cql +++ b/src/Abc.Zebus.Directory.Cassandra/schema_creation.cql @@ -1,7 +1,6 @@ -- First create your Keyspace with the replication factor you want -create table IF NOT EXISTS "Peers" ( - "UselessKey" boolean, +create table IF NOT EXISTS "Peers_2" ( "PeerId" text, "EndPoint" text, "IsUp" boolean, @@ -10,13 +9,12 @@ create table IF NOT EXISTS "Peers" ( "TimestampUtc" timestamp, "HasDebuggerAttached" boolean, "StaticSubscriptions" blob, - PRIMARY KEY("UselessKey", "PeerId") + PRIMARY KEY("PeerId") ); -create table IF NOT EXISTS "DynamicSubscriptions" ( - "UselessKey" boolean, +create table IF NOT EXISTS "DynamicSubscriptions_2" ( "PeerId" text, - "MessageTypeId" text, + "MessageTypeId" text, "SubscriptionBindings" blob, - PRIMARY KEY("UselessKey", "PeerId", "MessageTypeId") -); \ No newline at end of file + PRIMARY KEY("PeerId", "MessageTypeId") +); diff --git a/src/Abc.Zebus.Persistence.CQL.Testing/Abc.Zebus.Persistence.CQL.Testing.csproj b/src/Abc.Zebus.Persistence.Cassandra.Testing/Abc.Zebus.Persistence.Cassandra.Testing.csproj similarity index 71% rename from src/Abc.Zebus.Persistence.CQL.Testing/Abc.Zebus.Persistence.CQL.Testing.csproj rename to src/Abc.Zebus.Persistence.Cassandra.Testing/Abc.Zebus.Persistence.Cassandra.Testing.csproj index ef1d92a2..85b6a024 100644 --- a/src/Abc.Zebus.Persistence.CQL.Testing/Abc.Zebus.Persistence.CQL.Testing.csproj +++ b/src/Abc.Zebus.Persistence.Cassandra.Testing/Abc.Zebus.Persistence.Cassandra.Testing.csproj @@ -2,12 +2,12 @@ netstandard2.0 - Zebus.Persistence.CQL.Testing + Zebus.Persistence.Cassandra.Testing $(ZebusPersistenceVersion) - + diff --git a/src/Abc.Zebus.Persistence.CQL.Testing/CassandraConfigurationMock.cs b/src/Abc.Zebus.Persistence.Cassandra.Testing/CassandraConfigurationMock.cs similarity index 90% rename from src/Abc.Zebus.Persistence.CQL.Testing/CassandraConfigurationMock.cs rename to src/Abc.Zebus.Persistence.Cassandra.Testing/CassandraConfigurationMock.cs index 4db7972a..910841a1 100644 --- a/src/Abc.Zebus.Persistence.CQL.Testing/CassandraConfigurationMock.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Testing/CassandraConfigurationMock.cs @@ -1,8 +1,8 @@ using System; -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra.Cql; using Moq; -namespace Abc.Zebus.Persistence.CQL.Testing +namespace Abc.Zebus.Persistence.Cassandra.Testing { // TODO: This entire namespace belongs in Zebus.Testing but it would require the creation of a Zebus.Shared, and we are not ready for that just yet public class CassandraConfigurationMock : Mock where TConfig : class, ICassandraConfiguration @@ -15,4 +15,4 @@ public CassandraConfigurationMock(string host, string keySpace, string localData As().SetupGet(config => config.LocalDataCenter).Returns(localDataCenter); } } -} \ No newline at end of file +} diff --git a/src/Abc.Zebus.Persistence.CQL.Testing/Properties/AssemblyInfo.cs b/src/Abc.Zebus.Persistence.Cassandra.Testing/Properties/AssemblyInfo.cs similarity index 100% rename from src/Abc.Zebus.Persistence.CQL.Testing/Properties/AssemblyInfo.cs rename to src/Abc.Zebus.Persistence.Cassandra.Testing/Properties/AssemblyInfo.cs diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/Abc.Zebus.Persistence.CQL.Tests.csproj b/src/Abc.Zebus.Persistence.Cassandra.Tests/Abc.Zebus.Persistence.Cassandra.Tests.csproj similarity index 70% rename from src/Abc.Zebus.Persistence.CQL.Tests/Abc.Zebus.Persistence.CQL.Tests.csproj rename to src/Abc.Zebus.Persistence.Cassandra.Tests/Abc.Zebus.Persistence.Cassandra.Tests.csproj index f5e8beb3..a4c4915a 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/Abc.Zebus.Persistence.CQL.Tests.csproj +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/Abc.Zebus.Persistence.Cassandra.Tests.csproj @@ -3,12 +3,13 @@ net48;net5.0 $(ZebusPersistenceVersion) + Abc.Zebus.Persistence.Cassandra.Tests - + diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/BucketIdHelperTests.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/BucketIdHelperTests.cs similarity index 97% rename from src/Abc.Zebus.Persistence.CQL.Tests/BucketIdHelperTests.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/BucketIdHelperTests.cs index 15a32fe6..573526ff 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/BucketIdHelperTests.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/BucketIdHelperTests.cs @@ -3,7 +3,7 @@ using Abc.Zebus.Testing.Extensions; using NUnit.Framework; -namespace Abc.Zebus.Persistence.CQL.Tests +namespace Abc.Zebus.Persistence.Cassandra.Tests { public class BucketIdHelperTests { diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/Cql/CqlTestFixture.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/Cql/CqlTestFixture.cs similarity index 95% rename from src/Abc.Zebus.Persistence.CQL.Tests/Cql/CqlTestFixture.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/Cql/CqlTestFixture.cs index 750fd4c0..6ac650ff 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/Cql/CqlTestFixture.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/Cql/CqlTestFixture.cs @@ -2,14 +2,14 @@ using System.Collections.Generic; using System.Diagnostics; using System.Reflection; -using Abc.Zebus.Persistence.CQL.Testing; -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra.Cql; +using Abc.Zebus.Persistence.Cassandra.Testing; using Cassandra; using Cassandra.Mapping; using Moq; using NUnit.Framework; -namespace Abc.Zebus.Persistence.CQL.Tests.Cql +namespace Abc.Zebus.Persistence.Cassandra.Tests.Cql { // TODO: This entire namespace belongs in Zebus.Testing but it would require the creation of a Zebus.Shared, and we are not ready for that just yet public abstract class CqlTestFixture diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/CqlMessageReaderTests.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs similarity index 96% rename from src/Abc.Zebus.Persistence.CQL.Tests/CqlMessageReaderTests.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs index 4a1ba870..91733605 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/CqlMessageReaderTests.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs @@ -1,9 +1,9 @@ using System; using System.IO; using System.Linq; -using Abc.Zebus.Persistence.CQL.Data; -using Abc.Zebus.Persistence.CQL.Storage; -using Abc.Zebus.Persistence.CQL.Tests.Cql; +using Abc.Zebus.Persistence.Cassandra.Cql; +using Abc.Zebus.Persistence.Cassandra.Data; +using Abc.Zebus.Persistence.Cassandra.Tests.Cql; using Abc.Zebus.Persistence.Messages; using Abc.Zebus.Persistence.Storage; using Abc.Zebus.Serialization; @@ -12,7 +12,7 @@ using Abc.Zebus.Transport; using NUnit.Framework; -namespace Abc.Zebus.Persistence.CQL.Tests +namespace Abc.Zebus.Persistence.Cassandra.Tests { public class CqlMessageReaderTests : CqlTestFixture { diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/CqlStorageTests.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs similarity index 99% rename from src/Abc.Zebus.Persistence.CQL.Tests/CqlStorageTests.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs index 1a924d1a..369e7568 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/CqlStorageTests.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/CqlStorageTests.cs @@ -2,9 +2,9 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Abc.Zebus.Persistence.CQL.Data; -using Abc.Zebus.Persistence.CQL.Storage; -using Abc.Zebus.Persistence.CQL.Tests.Cql; +using Abc.Zebus.Persistence.Cassandra.Cql; +using Abc.Zebus.Persistence.Cassandra.Data; +using Abc.Zebus.Persistence.Cassandra.Tests.Cql; using Abc.Zebus.Persistence.Matching; using Abc.Zebus.Persistence.Messages; using Abc.Zebus.Persistence.Reporter; @@ -17,7 +17,7 @@ using NUnit.Framework; using ProtoBuf; -namespace Abc.Zebus.Persistence.CQL.Tests +namespace Abc.Zebus.Persistence.Cassandra.Tests { public class CqlStorageTests : CqlTestFixture { diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/MessageIdV2.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/MessageIdV2.cs similarity index 97% rename from src/Abc.Zebus.Persistence.CQL.Tests/MessageIdV2.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/MessageIdV2.cs index 5125ca14..61d0637e 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/MessageIdV2.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/MessageIdV2.cs @@ -1,7 +1,7 @@ using System; using System.Security.Cryptography; -namespace Abc.Zebus.Persistence.CQL.Tests +namespace Abc.Zebus.Persistence.Cassandra.Tests { public static class MessageIdV2 { diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs similarity index 93% rename from src/Abc.Zebus.Persistence.CQL.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs index 7f663535..5f3a5ad6 100644 --- a/src/Abc.Zebus.Persistence.CQL.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs +++ b/src/Abc.Zebus.Persistence.Cassandra.Tests/OldestNonAckedMessageUpdaterPeriodicActionTests.cs @@ -2,17 +2,17 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Abc.Zebus.Persistence.CQL.Data; -using Abc.Zebus.Persistence.CQL.PeriodicAction; -using Abc.Zebus.Persistence.CQL.Storage; -using Abc.Zebus.Persistence.CQL.Tests.Cql; +using Abc.Zebus.Persistence.Cassandra.Cql; +using Abc.Zebus.Persistence.Cassandra.Data; +using Abc.Zebus.Persistence.Cassandra.PeriodicAction; +using Abc.Zebus.Persistence.Cassandra.Tests.Cql; using Abc.Zebus.Testing; using Abc.Zebus.Testing.Extensions; using Abc.Zebus.Util; using Moq; using NUnit.Framework; -namespace Abc.Zebus.Persistence.CQL.Tests +namespace Abc.Zebus.Persistence.Cassandra.Tests { public class OldestNonAckedMessageUpdaterPeriodicActionTests : CqlTestFixture { diff --git a/src/Abc.Zebus.Persistence.CQL.Tests/Properties/AssemblyInfo.cs b/src/Abc.Zebus.Persistence.Cassandra.Tests/Properties/AssemblyInfo.cs similarity index 100% rename from src/Abc.Zebus.Persistence.CQL.Tests/Properties/AssemblyInfo.cs rename to src/Abc.Zebus.Persistence.Cassandra.Tests/Properties/AssemblyInfo.cs diff --git a/src/Abc.Zebus.Persistence.CQL/Abc.Zebus.Persistence.CQL.csproj b/src/Abc.Zebus.Persistence.Cassandra/Abc.Zebus.Persistence.Cassandra.csproj similarity index 90% rename from src/Abc.Zebus.Persistence.CQL/Abc.Zebus.Persistence.CQL.csproj rename to src/Abc.Zebus.Persistence.Cassandra/Abc.Zebus.Persistence.Cassandra.csproj index 06ff41e7..9c93ad1f 100644 --- a/src/Abc.Zebus.Persistence.CQL/Abc.Zebus.Persistence.CQL.csproj +++ b/src/Abc.Zebus.Persistence.Cassandra/Abc.Zebus.Persistence.Cassandra.csproj @@ -2,7 +2,7 @@ netstandard2.0 - Zebus.Persistence.CQL + Zebus.Persistence.Cassandra $(ZebusPersistenceVersion) diff --git a/src/Abc.Zebus.Persistence.CQL/Util/CassandraCqlSessionManager.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/CassandraCqlSessionManager.cs similarity index 98% rename from src/Abc.Zebus.Persistence.CQL/Util/CassandraCqlSessionManager.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/CassandraCqlSessionManager.cs index e448af48..5cab679f 100644 --- a/src/Abc.Zebus.Persistence.CQL/Util/CassandraCqlSessionManager.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/CassandraCqlSessionManager.cs @@ -3,7 +3,7 @@ using System.Linq; using Cassandra; -namespace Abc.Zebus.Persistence.CQL.Util +namespace Abc.Zebus.Persistence.Cassandra.Cql { public class CassandraCqlSessionManager : IDisposable { diff --git a/src/Abc.Zebus.Persistence.CQL/Util/CqlDataContext.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlDataContext.cs similarity index 98% rename from src/Abc.Zebus.Persistence.CQL/Util/CqlDataContext.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/CqlDataContext.cs index 7c5a3789..1aaf29e0 100644 --- a/src/Abc.Zebus.Persistence.CQL/Util/CqlDataContext.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlDataContext.cs @@ -5,7 +5,7 @@ using Cassandra.Data.Linq; using TableAttribute = Cassandra.Mapping.Attributes.TableAttribute; -namespace Abc.Zebus.Persistence.CQL.Util +namespace Abc.Zebus.Persistence.Cassandra.Cql { public abstract class CqlDataContext where TConfig : ICassandraConfiguration { diff --git a/src/Abc.Zebus.Persistence.CQL/Storage/CqlMessageReader.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlMessageReader.cs similarity index 97% rename from src/Abc.Zebus.Persistence.CQL/Storage/CqlMessageReader.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/CqlMessageReader.cs index ee826a0f..6ee2d34c 100644 --- a/src/Abc.Zebus.Persistence.CQL/Storage/CqlMessageReader.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlMessageReader.cs @@ -1,14 +1,14 @@ using System; using System.Collections.Generic; using System.Linq; -using Abc.Zebus.Persistence.CQL.Data; +using Abc.Zebus.Persistence.Cassandra.Data; using Abc.Zebus.Persistence.Messages; using Abc.Zebus.Persistence.Storage; using Cassandra; using Cassandra.Data.Linq; using log4net; -namespace Abc.Zebus.Persistence.CQL.Storage +namespace Abc.Zebus.Persistence.Cassandra.Cql { public class CqlMessageReader : IMessageReader { diff --git a/src/Abc.Zebus.Persistence.CQL/Storage/CqlStorage.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs similarity index 98% rename from src/Abc.Zebus.Persistence.CQL/Storage/CqlStorage.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs index cb4ba0a0..a92d5302 100644 --- a/src/Abc.Zebus.Persistence.CQL/Storage/CqlStorage.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/CqlStorage.cs @@ -3,8 +3,8 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Abc.Zebus.Persistence.CQL.Data; -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra.Data; +using Abc.Zebus.Persistence.Cassandra.Util; using Abc.Zebus.Persistence.Matching; using Abc.Zebus.Persistence.Messages; using Abc.Zebus.Persistence.Reporter; @@ -15,7 +15,7 @@ using Cassandra.Data.Linq; using log4net; -namespace Abc.Zebus.Persistence.CQL.Storage +namespace Abc.Zebus.Persistence.Cassandra.Cql { public class CqlStorage : ICqlStorage, IDisposable { diff --git a/src/Abc.Zebus.Persistence.CQL/Util/ICassandraConfiguration.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/ICassandraConfiguration.cs similarity index 94% rename from src/Abc.Zebus.Persistence.CQL/Util/ICassandraConfiguration.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/ICassandraConfiguration.cs index d1ac41a9..18b387f3 100644 --- a/src/Abc.Zebus.Persistence.CQL/Util/ICassandraConfiguration.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/ICassandraConfiguration.cs @@ -1,6 +1,6 @@ using System; -namespace Abc.Zebus.Persistence.CQL.Util +namespace Abc.Zebus.Persistence.Cassandra.Cql { public interface ICassandraConfiguration { @@ -26,4 +26,4 @@ public interface ICassandraConfiguration /// string LocalDataCenter { get; } } -} \ No newline at end of file +} diff --git a/src/Abc.Zebus.Persistence.CQL/Storage/ICqlStorage.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/ICqlStorage.cs similarity index 85% rename from src/Abc.Zebus.Persistence.CQL/Storage/ICqlStorage.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/ICqlStorage.cs index 6416d678..23c68685 100644 --- a/src/Abc.Zebus.Persistence.CQL/Storage/ICqlStorage.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/ICqlStorage.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Abc.Zebus.Persistence.Storage; -namespace Abc.Zebus.Persistence.CQL.Storage +namespace Abc.Zebus.Persistence.Cassandra.Cql { public interface ICqlStorage : IStorage { diff --git a/src/Abc.Zebus.Persistence.CQL/Storage/PeerState.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/PeerState.cs similarity index 97% rename from src/Abc.Zebus.Persistence.CQL/Storage/PeerState.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/PeerState.cs index d4837760..8499708e 100644 --- a/src/Abc.Zebus.Persistence.CQL/Storage/PeerState.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/PeerState.cs @@ -1,7 +1,7 @@ using System; using Abc.Zebus.Util; -namespace Abc.Zebus.Persistence.CQL.Storage +namespace Abc.Zebus.Persistence.Cassandra.Cql { public class PeerState { @@ -31,7 +31,7 @@ public PeerState(PeerId peerId, int nonAckMessageCount = 0, long oldestNonAckedM /// /// Provides a timestamp that is lower than or equal to the timestamp of the last unacked message. - /// + /// /// The goal of this value is to make replay efficient by using a recent starting point, thus reducing /// the number of messages that the replay will scan. /// diff --git a/src/Abc.Zebus.Persistence.CQL/Storage/PeerStateRepository.cs b/src/Abc.Zebus.Persistence.Cassandra/Cql/PeerStateRepository.cs similarity index 97% rename from src/Abc.Zebus.Persistence.CQL/Storage/PeerStateRepository.cs rename to src/Abc.Zebus.Persistence.Cassandra/Cql/PeerStateRepository.cs index d3161891..88d4e48d 100644 --- a/src/Abc.Zebus.Persistence.CQL/Storage/PeerStateRepository.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Cql/PeerStateRepository.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Abc.Zebus.Persistence.CQL.Data; +using Abc.Zebus.Persistence.Cassandra.Data; using Abc.Zebus.Persistence.Messages; using Abc.Zebus.Persistence.Util; using Abc.Zebus.Util; @@ -10,7 +10,7 @@ using Cassandra.Data.Linq; using log4net; -namespace Abc.Zebus.Persistence.CQL.Storage +namespace Abc.Zebus.Persistence.Cassandra.Cql { public class PeerStateRepository { diff --git a/src/Abc.Zebus.Persistence.CQL/Data/CassandraPeerState.cs b/src/Abc.Zebus.Persistence.Cassandra/Data/CassandraPeerState.cs similarity index 89% rename from src/Abc.Zebus.Persistence.CQL/Data/CassandraPeerState.cs rename to src/Abc.Zebus.Persistence.Cassandra/Data/CassandraPeerState.cs index e874b040..3d1b020e 100644 --- a/src/Abc.Zebus.Persistence.CQL/Data/CassandraPeerState.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Data/CassandraPeerState.cs @@ -1,7 +1,7 @@ -using Abc.Zebus.Persistence.CQL.Storage; +using Abc.Zebus.Persistence.Cassandra.Cql; using Cassandra.Mapping.Attributes; -namespace Abc.Zebus.Persistence.CQL.Data +namespace Abc.Zebus.Persistence.Cassandra.Data { [Table("PeerState", CaseSensitive = true)] public class CassandraPeerState diff --git a/src/Abc.Zebus.Persistence.CQL/Data/PersistenceCqlDataContext.cs b/src/Abc.Zebus.Persistence.Cassandra/Data/PersistenceCqlDataContext.cs similarity index 84% rename from src/Abc.Zebus.Persistence.CQL/Data/PersistenceCqlDataContext.cs rename to src/Abc.Zebus.Persistence.Cassandra/Data/PersistenceCqlDataContext.cs index 2c96a0b1..cad90e2c 100644 --- a/src/Abc.Zebus.Persistence.CQL/Data/PersistenceCqlDataContext.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Data/PersistenceCqlDataContext.cs @@ -1,7 +1,7 @@ -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra.Cql; using Cassandra.Data.Linq; -namespace Abc.Zebus.Persistence.CQL.Data +namespace Abc.Zebus.Persistence.Cassandra.Data { public class PersistenceCqlDataContext : CqlDataContext { @@ -12,4 +12,4 @@ public PersistenceCqlDataContext(CassandraCqlSessionManager sessionManager, ICql public Table PersistentMessages => new Table(Session); public Table PeerStates => new Table(Session); } -} \ No newline at end of file +} diff --git a/src/Abc.Zebus.Persistence.CQL/Data/PersistentMessage.cs b/src/Abc.Zebus.Persistence.Cassandra/Data/PersistentMessage.cs similarity index 94% rename from src/Abc.Zebus.Persistence.CQL/Data/PersistentMessage.cs rename to src/Abc.Zebus.Persistence.Cassandra/Data/PersistentMessage.cs index 4dfa1a7c..f4835b4c 100644 --- a/src/Abc.Zebus.Persistence.CQL/Data/PersistentMessage.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Data/PersistentMessage.cs @@ -1,7 +1,7 @@ using System; using Cassandra.Mapping.Attributes; -namespace Abc.Zebus.Persistence.CQL.Data +namespace Abc.Zebus.Persistence.Cassandra.Data { [Table("PersistentMessage", CaseSensitive = true)] public class PersistentMessage diff --git a/src/Abc.Zebus.Persistence.CQL/ICqlPersistenceConfiguration.cs b/src/Abc.Zebus.Persistence.Cassandra/ICqlPersistenceConfiguration.cs similarity index 74% rename from src/Abc.Zebus.Persistence.CQL/ICqlPersistenceConfiguration.cs rename to src/Abc.Zebus.Persistence.Cassandra/ICqlPersistenceConfiguration.cs index a0849271..57d367c7 100644 --- a/src/Abc.Zebus.Persistence.CQL/ICqlPersistenceConfiguration.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/ICqlPersistenceConfiguration.cs @@ -1,7 +1,7 @@ using System; -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra.Cql; -namespace Abc.Zebus.Persistence.CQL +namespace Abc.Zebus.Persistence.Cassandra { public interface ICqlPersistenceConfiguration : ICassandraConfiguration { diff --git a/src/Abc.Zebus.Persistence.CQL/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs b/src/Abc.Zebus.Persistence.Cassandra/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs similarity index 95% rename from src/Abc.Zebus.Persistence.CQL/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs rename to src/Abc.Zebus.Persistence.Cassandra/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs index fb6fdca5..616dc774 100644 --- a/src/Abc.Zebus.Persistence.CQL/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/PeriodicAction/OldestNonAckedMessageUpdaterPeriodicAction.cs @@ -2,12 +2,12 @@ using System.Linq; using System.Threading.Tasks; using Abc.Zebus.Hosting; -using Abc.Zebus.Persistence.CQL.Storage; +using Abc.Zebus.Persistence.Cassandra.Cql; using Abc.Zebus.Persistence.Storage; using Abc.Zebus.Util; using Abc.Zebus.Util.Extensions; -namespace Abc.Zebus.Persistence.CQL.PeriodicAction +namespace Abc.Zebus.Persistence.Cassandra.PeriodicAction { public class OldestNonAckedMessageUpdaterPeriodicAction : PeriodicActionHostInitializer { diff --git a/src/Abc.Zebus.Persistence.CQL/Properties/AssemblyInfo.cs b/src/Abc.Zebus.Persistence.Cassandra/Properties/AssemblyInfo.cs similarity index 68% rename from src/Abc.Zebus.Persistence.CQL/Properties/AssemblyInfo.cs rename to src/Abc.Zebus.Persistence.Cassandra/Properties/AssemblyInfo.cs index 83b76494..0b2cc2d2 100644 --- a/src/Abc.Zebus.Persistence.CQL/Properties/AssemblyInfo.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Properties/AssemblyInfo.cs @@ -6,7 +6,7 @@ [assembly: InternalsVisibleTo("Abc.Zebus.Integration")] [assembly: InternalsVisibleTo("Abc.Zebus.PersistenceService.Tests")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL.Testing")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL.Tests")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra.Testing")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra.Tests")] [module: SkipLocalsInit] diff --git a/src/Abc.Zebus.Persistence.CQL/Util/MessageIdExtensions.cs b/src/Abc.Zebus.Persistence.Cassandra/Util/MessageIdExtensions.cs similarity index 96% rename from src/Abc.Zebus.Persistence.CQL/Util/MessageIdExtensions.cs rename to src/Abc.Zebus.Persistence.Cassandra/Util/MessageIdExtensions.cs index d270f621..9c6e2e93 100644 --- a/src/Abc.Zebus.Persistence.CQL/Util/MessageIdExtensions.cs +++ b/src/Abc.Zebus.Persistence.Cassandra/Util/MessageIdExtensions.cs @@ -1,6 +1,6 @@ using System; -namespace Abc.Zebus.Persistence.CQL.Util +namespace Abc.Zebus.Persistence.Cassandra.Util { public static class MessageIdExtensions { diff --git a/src/Abc.Zebus.Persistence.CQL/schema/1.0/01_Create_PersistentMessage.cql b/src/Abc.Zebus.Persistence.Cassandra/schema/1.0/01_Create_PersistentMessage.cql similarity index 100% rename from src/Abc.Zebus.Persistence.CQL/schema/1.0/01_Create_PersistentMessage.cql rename to src/Abc.Zebus.Persistence.Cassandra/schema/1.0/01_Create_PersistentMessage.cql diff --git a/src/Abc.Zebus.Persistence.CQL/schema/1.0/02_Create_PeerState.cql b/src/Abc.Zebus.Persistence.Cassandra/schema/1.0/02_Create_PeerState.cql similarity index 100% rename from src/Abc.Zebus.Persistence.CQL/schema/1.0/02_Create_PeerState.cql rename to src/Abc.Zebus.Persistence.Cassandra/schema/1.0/02_Create_PeerState.cql diff --git a/src/Abc.Zebus.Persistence.CQL/schema/1.1/01_Create_PersistentMessage.cql b/src/Abc.Zebus.Persistence.Cassandra/schema/1.1/01_Create_PersistentMessage.cql similarity index 100% rename from src/Abc.Zebus.Persistence.CQL/schema/1.1/01_Create_PersistentMessage.cql rename to src/Abc.Zebus.Persistence.Cassandra/schema/1.1/01_Create_PersistentMessage.cql diff --git a/src/Abc.Zebus.Persistence.Runner/Abc.Zebus.Persistence.Runner.csproj b/src/Abc.Zebus.Persistence.Runner/Abc.Zebus.Persistence.Runner.csproj index 0134ecce..35fe1878 100644 --- a/src/Abc.Zebus.Persistence.Runner/Abc.Zebus.Persistence.Runner.csproj +++ b/src/Abc.Zebus.Persistence.Runner/Abc.Zebus.Persistence.Runner.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Abc.Zebus.Persistence.Runner/CassandraAppSettingsConfiguration.cs b/src/Abc.Zebus.Persistence.Runner/CassandraAppSettingsConfiguration.cs index 5c884e97..5fe33f84 100644 --- a/src/Abc.Zebus.Persistence.Runner/CassandraAppSettingsConfiguration.cs +++ b/src/Abc.Zebus.Persistence.Runner/CassandraAppSettingsConfiguration.cs @@ -1,5 +1,5 @@ using System; -using Abc.Zebus.Persistence.CQL; +using Abc.Zebus.Persistence.Cassandra; using FluentDate; namespace Abc.Zebus.Persistence.Runner diff --git a/src/Abc.Zebus.Persistence.Runner/Program.cs b/src/Abc.Zebus.Persistence.Runner/Program.cs index a29c6345..49bca474 100644 --- a/src/Abc.Zebus.Persistence.Runner/Program.cs +++ b/src/Abc.Zebus.Persistence.Runner/Program.cs @@ -6,10 +6,9 @@ using Abc.Zebus.Directory; using Abc.Zebus.Dispatch; using Abc.Zebus.Monitoring; -using Abc.Zebus.Persistence.CQL; -using Abc.Zebus.Persistence.CQL.PeriodicAction; -using Abc.Zebus.Persistence.CQL.Storage; -using Abc.Zebus.Persistence.CQL.Util; +using Abc.Zebus.Persistence.Cassandra; +using Abc.Zebus.Persistence.Cassandra.Cql; +using Abc.Zebus.Persistence.Cassandra.PeriodicAction; using Abc.Zebus.Persistence.Initialization; using Abc.Zebus.Persistence.Matching; using Abc.Zebus.Persistence.Reporter; diff --git a/src/Abc.Zebus.Persistence/Properties/AssemblyInfo.cs b/src/Abc.Zebus.Persistence/Properties/AssemblyInfo.cs index 096a0cf8..0dabc45c 100644 --- a/src/Abc.Zebus.Persistence/Properties/AssemblyInfo.cs +++ b/src/Abc.Zebus.Persistence/Properties/AssemblyInfo.cs @@ -5,7 +5,7 @@ [assembly: Guid("acc56b25-51e9-4864-8913-d36ba6d8ffab")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Tests")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL.Tests")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra.Tests")] [module: SkipLocalsInit] diff --git a/src/Abc.Zebus.Testing/Properties/AssemblyInfo.cs b/src/Abc.Zebus.Testing/Properties/AssemblyInfo.cs index 06c137a6..e12bdba2 100644 --- a/src/Abc.Zebus.Testing/Properties/AssemblyInfo.cs +++ b/src/Abc.Zebus.Testing/Properties/AssemblyInfo.cs @@ -12,5 +12,5 @@ [assembly: InternalsVisibleTo("Abc.Zebus.Directory.Cassandra.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Directory.RocksDb.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Tests")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL.Tests")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.RocksDb.Tests")] diff --git a/src/Abc.Zebus.sln b/src/Abc.Zebus.sln index 12502a21..05612468 100644 --- a/src/Abc.Zebus.sln +++ b/src/Abc.Zebus.sln @@ -33,11 +33,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Directory.Cassand EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Directory.Tests", "Abc.Zebus.Directory.Tests\Abc.Zebus.Directory.Tests.csproj", "{7C004E59-3B24-4359-8E45-1CD93F03FD80}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.CQL", "Abc.Zebus.Persistence.CQL\Abc.Zebus.Persistence.CQL.csproj", "{C59CFDDA-987B-4948-B016-95982BBE2CFA}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.Cassandra", "Abc.Zebus.Persistence.Cassandra\Abc.Zebus.Persistence.Cassandra.csproj", "{C59CFDDA-987B-4948-B016-95982BBE2CFA}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.CQL.Testing", "Abc.Zebus.Persistence.CQL.Testing\Abc.Zebus.Persistence.CQL.Testing.csproj", "{1059B7D5-1C8F-4702-A6DA-58986FB042F6}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.Cassandra.Testing", "Abc.Zebus.Persistence.Cassandra.Testing\Abc.Zebus.Persistence.Cassandra.Testing.csproj", "{1059B7D5-1C8F-4702-A6DA-58986FB042F6}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.CQL.Tests", "Abc.Zebus.Persistence.CQL.Tests\Abc.Zebus.Persistence.CQL.Tests.csproj", "{A9B86B14-A1D5-46F8-9F2D-777C204D7C0B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.Cassandra.Tests", "Abc.Zebus.Persistence.Cassandra.Tests\Abc.Zebus.Persistence.Cassandra.Tests.csproj", "{A9B86B14-A1D5-46F8-9F2D-777C204D7C0B}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abc.Zebus.Persistence.Messages", "Abc.Zebus.Persistence.Messages\Abc.Zebus.Persistence.Messages.csproj", "{8FC5C46E-9C7F-4905-B8A8-89E154F576CF}" EndProject diff --git a/src/Abc.Zebus/Properties/AssemblyInfo.cs b/src/Abc.Zebus/Properties/AssemblyInfo.cs index cf4d5f0a..c919212d 100644 --- a/src/Abc.Zebus/Properties/AssemblyInfo.cs +++ b/src/Abc.Zebus/Properties/AssemblyInfo.cs @@ -21,8 +21,8 @@ [assembly: InternalsVisibleTo("Abc.Zebus.DirectoryService.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL")] -[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.CQL.Tests")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra")] +[assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Cassandra.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.RocksDb")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.RocksDb.Tests")] [assembly: InternalsVisibleTo("Abc.Zebus.Persistence.Tests")]