From 88762f6d5dee78e8388958e733ce2ab2c84478e8 Mon Sep 17 00:00:00 2001 From: Alex Henning Johannessen Date: Sat, 20 Jun 2020 16:10:11 +0100 Subject: [PATCH] misc: adjust to upstream wrt. protobuf - remove protobuf defintions that are not relevant currently. - adjust code to changes in protobuf and headers. --- core/src/main/protobuf/gossip.proto | 44 +++++ core/src/main/protobuf/operations.proto | 34 ---- core/src/main/protobuf/persistent.proto | 168 ----------------- core/src/main/protobuf/projections.proto | 173 ------------------ core/src/main/protobuf/shared.proto | 11 +- core/src/main/protobuf/streams.proto | 91 +++++---- core/src/main/protobuf/users.proto | 119 ------------ core/src/main/scala/sec/api/callcontext.scala | 3 +- .../main/scala/sec/api/grpc/constants.scala | 17 +- .../src/main/scala/sec/api/grpc/convert.scala | 48 ++--- .../main/scala/sec/api/grpc/implicits.scala | 12 +- .../main/scala/sec/api/grpc/marshalling.scala | 15 +- .../main/scala/sec/api/mapping/shared.scala | 29 ++- .../main/scala/sec/api/mapping/streams.scala | 76 +++++--- core/src/main/scala/sec/api/streams.scala | 37 ++-- core/src/main/scala/sec/core/exceptions.scala | 23 ++- core/src/main/scala/sec/options.scala | 27 ++- core/src/test/resources/dev-cert.pem | 37 ++-- .../src/test/scala/sec/api/grpc/convert.scala | 12 ++ .../test/scala/sec/api/grpc/implicits.scala | 12 +- .../test/scala/sec/api/mapping/shared.scala | 13 +- .../test/scala/sec/api/mapping/streams.scala | 52 +++--- 22 files changed, 381 insertions(+), 672 deletions(-) create mode 100644 core/src/main/protobuf/gossip.proto delete mode 100644 core/src/main/protobuf/operations.proto delete mode 100644 core/src/main/protobuf/persistent.proto delete mode 100644 core/src/main/protobuf/projections.proto delete mode 100644 core/src/main/protobuf/users.proto diff --git a/core/src/main/protobuf/gossip.proto b/core/src/main/protobuf/gossip.proto new file mode 100644 index 00000000..192b3b0a --- /dev/null +++ b/core/src/main/protobuf/gossip.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +package event_store.client.gossip; +option java_package = "com.eventstore.client.gossip"; + +import "shared.proto"; + +service Gossip { + rpc Read (event_store.client.Empty) returns (ClusterInfo); +} + +message ClusterInfo { + repeated MemberInfo members = 1; +} + +message EndPoint { + string address = 1; + uint32 port = 2; +} + +message MemberInfo { + enum VNodeState { + Initializing = 0; + DiscoverLeader = 1; + Unknown = 2; + PreReplica = 3; + CatchingUp = 4; + Clone = 5; + Follower = 6; + PreLeader = 7; + Leader = 8; + Manager = 9; + ShuttingDown = 10; + Shutdown = 11; + ReadOnlyLeaderless = 12; + PreReadOnlyReplica = 13; + ReadOnlyReplica = 14; + ResigningLeader = 15; + } + event_store.client.UUID instance_id = 1; + int64 time_stamp = 2; + VNodeState state = 3; + bool is_alive = 4; + EndPoint http_end_point = 5; +} diff --git a/core/src/main/protobuf/operations.proto b/core/src/main/protobuf/operations.proto deleted file mode 100644 index ac6eb488..00000000 --- a/core/src/main/protobuf/operations.proto +++ /dev/null @@ -1,34 +0,0 @@ -syntax = "proto3"; -package event_store.client.operations; -option java_package = "com.eventstore.client.operations"; - -service Operations { - rpc StartScavenge (StartScavengeReq) returns (ScavengeResp); - rpc StopScavenge (StopScavengeReq) returns (ScavengeResp); -} - -message StartScavengeReq { - Options options = 1; - message Options { - int32 thread_count = 1; - int32 start_from_chunk = 2; - } -} - -message StopScavengeReq { - Options options = 1; - message Options { - string scavenge_id = 1; - } -} - -message ScavengeResp { - string scavenge_id = 1; - ScavengeResult scavenge_result = 2; - - enum ScavengeResult { - Started = 0; - InProgress = 1; - Stopped = 2; - } -} \ No newline at end of file diff --git a/core/src/main/protobuf/persistent.proto b/core/src/main/protobuf/persistent.proto deleted file mode 100644 index 1aba521c..00000000 --- a/core/src/main/protobuf/persistent.proto +++ /dev/null @@ -1,168 +0,0 @@ -syntax = "proto3"; -package event_store.client.persistent_subscriptions; -option java_package = "com.eventstore.client.persistentsubscriptions"; - -import "shared.proto"; - -service PersistentSubscriptions { - rpc Create (CreateReq) returns (CreateResp); - rpc Update (UpdateReq) returns (UpdateResp); - rpc Delete (DeleteReq) returns (DeleteResp); - rpc Read (stream ReadReq) returns (stream ReadResp); -} - -message ReadReq { - oneof content { - Options options = 1; - Ack ack = 2; - Nack nack = 3; - } - - message Options { - string stream_name = 1; - string group_name = 2; - int32 buffer_size = 3; - UUIDOption uuid_option = 4; - - message UUIDOption { - oneof content { - event_store.client.shared.Empty structured = 1; - event_store.client.shared.Empty string = 2; - } - } - } - - message Ack { - bytes id = 1; - repeated event_store.client.shared.UUID ids = 2; - } - - message Nack { - bytes id = 1; - repeated event_store.client.shared.UUID ids = 2; - Action action = 3; - string reason = 4; - - enum Action { - Unknown = 0; - Park = 1; - Retry = 2; - Skip = 3; - Stop = 4; - } - } -} - -message ReadResp { - oneof content { - ReadEvent event = 1; - SubscriptionConfirmation subscription_confirmation = 2; - } - message ReadEvent { - RecordedEvent event = 1; - RecordedEvent link = 2; - oneof position { - uint64 commit_position = 3; - event_store.client.shared.Empty no_position = 4; - } - oneof count { - int32 retry_count = 5; - event_store.client.shared.Empty no_retry_count = 6; - } - message RecordedEvent { - event_store.client.shared.UUID id = 1; - string stream_name = 2; - uint64 stream_revision = 3; - uint64 prepare_position = 4; - uint64 commit_position = 5; - map metadata = 6; - bytes custom_metadata = 7; - bytes data = 8; - } - } - message SubscriptionConfirmation { - string subscription_id = 1; - } -} - -message CreateReq { - Options options = 1; - - message Options { - string stream_name = 1; - string group_name = 2; - Settings settings = 3; - } - - message Settings { - bool resolve_links = 1; - uint64 revision = 2; - bool extra_statistics = 3; - int64 message_timeout = 4; - int32 max_retry_count = 5; - int64 checkpoint_after = 6; - int32 min_checkpoint_count = 7; - int32 max_checkpoint_count = 8; - int32 max_subscriber_count = 9; - int32 live_buffer_size = 10; - int32 read_batch_size = 11; - int32 history_buffer_size = 12; - ConsumerStrategy named_consumer_strategy = 13; - } - - enum ConsumerStrategy { - DispatchToSingle = 0; - RoundRobin = 1; - Pinned = 2; - } -} - -message CreateResp { -} - -message UpdateReq { - Options options = 1; - - message Options { - string stream_name = 1; - string group_name = 2; - Settings settings = 3; - } - - message Settings { - bool resolve_links = 1; - uint64 revision = 2; - bool extra_statistics = 3; - int64 message_timeout = 4; - int32 max_retry_count = 5; - int64 checkpoint_after = 6; - int32 min_checkpoint_count = 7; - int32 max_checkpoint_count = 8; - int32 max_subscriber_count = 9; - int32 live_buffer_size = 10; - int32 read_batch_size = 11; - int32 history_buffer_size = 12; - ConsumerStrategy named_consumer_strategy = 13; - } - - enum ConsumerStrategy { - DispatchToSingle = 0; - RoundRobin = 1; - Pinned = 2; - } -} - -message UpdateResp { -} - -message DeleteReq { - Options options = 1; - - message Options { - string stream_name = 1; - string group_name = 2; - } -} - -message DeleteResp { -} \ No newline at end of file diff --git a/core/src/main/protobuf/projections.proto b/core/src/main/protobuf/projections.proto deleted file mode 100644 index 73a41540..00000000 --- a/core/src/main/protobuf/projections.proto +++ /dev/null @@ -1,173 +0,0 @@ -syntax = "proto3"; -package event_store.client.projections; -option java_package = "com.eventstore.client.projections"; - -import "google/protobuf/struct.proto"; -import "shared.proto"; - -service Projections { - rpc Create (CreateReq) returns (CreateResp); - rpc Update (UpdateReq) returns (UpdateResp); - rpc Delete (DeleteReq) returns (DeleteResp); - rpc Statistics (StatisticsReq) returns (stream StatisticsResp); - rpc Disable (DisableReq) returns (DisableResp); - rpc Enable (EnableReq) returns (EnableResp); - rpc Reset (ResetReq) returns (ResetResp); - rpc State (StateReq) returns (StateResp); - rpc Result (ResultReq) returns (ResultResp); -} - -message CreateReq { - Options options = 1; - - message Options { - oneof mode { - event_store.client.shared.Empty one_time = 1; - Transient transient = 2; - Continuous continuous = 3; - } - string query = 4; - - message Transient { - string name = 1; - } - message Continuous { - string name = 1; - bool track_emitted_streams = 2; - } - } -} - -message CreateResp { -} - -message UpdateReq { - Options options = 1; - - message Options { - string name = 1; - string query = 2; - oneof emit_option { - bool emit_enabled = 3; - event_store.client.shared.Empty no_emit_options = 4; - } - } -} - -message UpdateResp { -} - -message DeleteReq { - Options options = 1; - - message Options { - string name = 1; - bool delete_emitted_streams = 2; - bool delete_state_stream = 3; - bool delete_checkpoint_stream = 4; - } -} - -message DeleteResp { -} - -message StatisticsReq { - Options options = 1; - message Options { - oneof mode { - string name = 1; - event_store.client.shared.Empty all = 2; - event_store.client.shared.Empty transient = 3; - event_store.client.shared.Empty continuous = 4; - event_store.client.shared.Empty one_time = 5; - } - } -} - -message StatisticsResp { - Details details = 1; - - message Details { - int64 coreProcessingTime = 1; - int64 version = 2; - int64 epoch = 3; - string effectiveName = 4; - int32 writesInProgress = 5; - int32 readsInProgress = 6; - int32 partitionsCached = 7; - string status = 8; - string stateReason = 9; - string name = 10; - string mode = 11; - string position = 12; - float progress = 13; - string lastCheckpoint = 14; - int64 eventsProcessedAfterRestart = 15; - string checkpointStatus = 16; - int64 bufferedEvents = 17; - int32 writePendingEventsBeforeCheckpoint = 18; - int32 writePendingEventsAfterCheckpoint = 19; - } -} - -message StateReq { - Options options = 1; - - message Options { - string name = 1; - string partition = 2; - } -} - -message StateResp { - google.protobuf.Value state = 1; -} - -message ResultReq { - Options options = 1; - - message Options { - string name = 1; - string partition = 2; - } -} - -message ResultResp { - google.protobuf.Value result = 1; -} - -message ResetReq { - Options options = 1; - - message Options { - string name = 1; - bool write_checkpoint = 2; - } -} - -message ResetResp { -} - - -message EnableReq { - Options options = 1; - - message Options { - string name = 1; - } -} - -message EnableResp { -} - -message DisableReq { - Options options = 1; - - message Options { - string name = 1; - bool write_checkpoint = 2; - } -} - -message DisableResp { -} \ No newline at end of file diff --git a/core/src/main/protobuf/shared.proto b/core/src/main/protobuf/shared.proto index 1abc0098..6b18dd5b 100644 --- a/core/src/main/protobuf/shared.proto +++ b/core/src/main/protobuf/shared.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package event_store.client.shared; -option java_package = "com.eventstore.client.shared"; +package event_store.client; +option java_package = "com.eventstore.client"; message UUID { oneof value { @@ -14,4 +14,9 @@ message UUID { } } message Empty { -} \ No newline at end of file +} + +message StreamIdentifier { + reserved 1 to 2; + bytes streamName = 3; +} diff --git a/core/src/main/protobuf/streams.proto b/core/src/main/protobuf/streams.proto index 1c2b7ebd..dc43d059 100644 --- a/core/src/main/protobuf/streams.proto +++ b/core/src/main/protobuf/streams.proto @@ -27,7 +27,7 @@ message ReadReq { } oneof filter_option { FilterOptions filter = 7; - event_store.client.shared.Empty no_filter = 8; + event_store.client.Empty no_filter = 8; } UUIDOption uuid_option = 9; @@ -36,18 +36,18 @@ message ReadReq { Backwards = 1; } message StreamOptions { - string stream_name = 1; + event_store.client.StreamIdentifier stream_identifier = 1; oneof revision_option { uint64 revision = 2; - event_store.client.shared.Empty start = 3; - event_store.client.shared.Empty end = 4; + event_store.client.Empty start = 3; + event_store.client.Empty end = 4; } } message AllOptions { oneof all_option { Position position = 1; - event_store.client.shared.Empty start = 2; - event_store.client.shared.Empty end = 3; + event_store.client.Empty start = 2; + event_store.client.Empty end = 3; } } message SubscriptionOptions { @@ -63,7 +63,7 @@ message ReadReq { } oneof window { uint32 max = 3; - event_store.client.shared.Empty count = 4; + event_store.client.Empty count = 4; } uint32 checkpointIntervalMultiplier = 5; @@ -74,8 +74,8 @@ message ReadReq { } message UUIDOption { oneof content { - event_store.client.shared.Empty structured = 1; - event_store.client.shared.Empty string = 2; + event_store.client.Empty structured = 1; + event_store.client.Empty string = 2; } } } @@ -86,6 +86,7 @@ message ReadResp { ReadEvent event = 1; SubscriptionConfirmation confirmation = 2; Checkpoint checkpoint = 3; + StreamNotFound stream_not_found = 4; } message ReadEvent { @@ -93,12 +94,12 @@ message ReadResp { RecordedEvent link = 2; oneof position { uint64 commit_position = 3; - event_store.client.shared.Empty no_position = 4; + event_store.client.Empty no_position = 4; } message RecordedEvent { - event_store.client.shared.UUID id = 1; - string stream_name = 2; + event_store.client.UUID id = 1; + event_store.client.StreamIdentifier stream_identifier = 2; uint64 stream_revision = 3; uint64 prepare_position = 4; uint64 commit_position = 5; @@ -114,6 +115,9 @@ message ReadResp { uint64 commit_position = 1; uint64 prepare_position = 2; } + message StreamNotFound { + event_store.client.StreamIdentifier stream_identifier = 1; + } } message AppendReq { @@ -123,16 +127,16 @@ message AppendReq { } message Options { - string stream_name = 1; + event_store.client.StreamIdentifier stream_identifier = 1; oneof expected_stream_revision { uint64 revision = 2; - event_store.client.shared.Empty no_stream = 3; - event_store.client.shared.Empty any = 4; - event_store.client.shared.Empty stream_exists = 5; + event_store.client.Empty no_stream = 3; + event_store.client.Empty any = 4; + event_store.client.Empty stream_exists = 5; } } message ProposedMessage { - event_store.client.shared.UUID id = 1; + event_store.client.UUID id = 1; map metadata = 2; bytes custom_metadata = 3; bytes data = 4; @@ -140,31 +144,50 @@ message AppendReq { } message AppendResp { - oneof current_revision_option { - uint64 current_revision = 1; - event_store.client.shared.Empty no_stream = 2; - } - oneof position_option { - Position position = 3; - event_store.client.shared.Empty no_position = 4; + oneof result { + Success success = 1; + WrongExpectedVersion wrong_expected_version = 2; } message Position { uint64 commit_position = 1; uint64 prepare_position = 2; } + + message Success { + oneof current_revision_option { + uint64 current_revision = 1; + event_store.client.Empty no_stream = 2; + } + oneof position_option { + Position position = 3; + event_store.client.Empty no_position = 4; + } + } + + message WrongExpectedVersion { + oneof current_revision_option { + uint64 current_revision = 1; + event_store.client.Empty no_stream = 2; + } + oneof expected_revision_option { + uint64 expected_revision = 3; + event_store.client.Empty any = 4; + event_store.client.Empty stream_exists = 5; + } + } } message DeleteReq { Options options = 1; message Options { - string stream_name = 1; + event_store.client.StreamIdentifier stream_identifier = 1; oneof expected_stream_revision { uint64 revision = 2; - event_store.client.shared.Empty no_stream = 3; - event_store.client.shared.Empty any = 4; - event_store.client.shared.Empty stream_exists = 5; + event_store.client.Empty no_stream = 3; + event_store.client.Empty any = 4; + event_store.client.Empty stream_exists = 5; } } } @@ -172,7 +195,7 @@ message DeleteReq { message DeleteResp { oneof position_option { Position position = 1; - event_store.client.shared.Empty no_position = 2; + event_store.client.Empty no_position = 2; } message Position { @@ -185,12 +208,12 @@ message TombstoneReq { Options options = 1; message Options { - string stream_name = 1; + event_store.client.StreamIdentifier stream_identifier = 1; oneof expected_stream_revision { uint64 revision = 2; - event_store.client.shared.Empty no_stream = 3; - event_store.client.shared.Empty any = 4; - event_store.client.shared.Empty stream_exists = 5; + event_store.client.Empty no_stream = 3; + event_store.client.Empty any = 4; + event_store.client.Empty stream_exists = 5; } } } @@ -198,7 +221,7 @@ message TombstoneReq { message TombstoneResp { oneof position_option { Position position = 1; - event_store.client.shared.Empty no_position = 2; + event_store.client.Empty no_position = 2; } message Position { diff --git a/core/src/main/protobuf/users.proto b/core/src/main/protobuf/users.proto deleted file mode 100644 index a74741cb..00000000 --- a/core/src/main/protobuf/users.proto +++ /dev/null @@ -1,119 +0,0 @@ -syntax = "proto3"; -package event_store.client.users; -option java_package = "com.eventstore.client.users"; - -service Users { - rpc Create (CreateReq) returns (CreateResp); - rpc Update (UpdateReq) returns (UpdateResp); - rpc Delete (DeleteReq) returns (DeleteResp); - rpc Disable (DisableReq) returns (DisableResp); - rpc Enable (EnableReq) returns (EnableResp); - rpc Details (DetailsReq) returns (stream DetailsResp); - rpc ChangePassword (ChangePasswordReq) returns (ChangePasswordResp); - rpc ResetPassword (ResetPasswordReq) returns (ResetPasswordResp); -} - -message CreateReq { - Options options = 1; - message Options { - string login_name = 1; - string password = 2; - string full_name = 3; - repeated string groups = 4; - } -} - -message CreateResp { - -} - -message UpdateReq { - Options options = 1; - message Options { - string login_name = 1; - string password = 2; - string full_name = 3; - repeated string groups = 4; - } -} - -message UpdateResp { - -} - -message DeleteReq { - Options options = 1; - message Options { - string login_name = 1; - } -} - -message DeleteResp { - -} - -message EnableReq { - Options options = 1; - message Options { - string login_name = 1; - } -} - -message EnableResp { - -} - -message DisableReq { - Options options = 1; - message Options { - string login_name = 1; - } -} - -message DisableResp { -} - -message DetailsReq { - Options options = 1; - message Options { - string login_name = 1; - } -} - -message DetailsResp { - UserDetails user_details = 1; - message UserDetails { - string login_name = 1; - string full_name = 2; - repeated string groups = 3; - DateTime last_updated = 4; - bool disabled = 5; - - message DateTime { - int64 ticks_since_epoch = 1; - } - } -} - -message ChangePasswordReq { - Options options = 1; - message Options { - string login_name = 1; - string current_password = 2; - string new_password = 3; - } -} - -message ChangePasswordResp { -} - -message ResetPasswordReq { - Options options = 1; - message Options { - string login_name = 1; - string new_password = 2; - } -} - -message ResetPasswordResp { -} \ No newline at end of file diff --git a/core/src/main/scala/sec/api/callcontext.scala b/core/src/main/scala/sec/api/callcontext.scala index 61920afa..a66bef85 100644 --- a/core/src/main/scala/sec/api/callcontext.scala +++ b/core/src/main/scala/sec/api/callcontext.scala @@ -7,7 +7,8 @@ import cats.implicits._ private[sec] final case class Context( userCreds: Option[UserCredentials], - connectionName: String + connectionName: String, + requiresLeader: Boolean ) //====================================================================================================================== diff --git a/core/src/main/scala/sec/api/grpc/constants.scala b/core/src/main/scala/sec/api/grpc/constants.scala index e32d1220..83b0b684 100644 --- a/core/src/main/scala/sec/api/grpc/constants.scala +++ b/core/src/main/scala/sec/api/grpc/constants.scala @@ -25,14 +25,15 @@ private[api] object constants { val UserConflict: String = "user-conflict" val ScavengeNotFound: String = "scavenge-not-found" - val ExpectedVersion: String = "expected-version" - val ActualVersion: String = "actual-version" - val StreamName: String = "stream-name" - val GroupName: String = "group-name" - val Reason: String = "reason" - val MaximumAppendSize: String = "maximum-append-size" - val ScavengeId: String = "scavenge-id" - val LeaderEndpoint: String = "leader-endpoint" + val ExpectedVersion: String = "expected-version" + val ActualVersion: String = "actual-version" + val StreamName: String = "stream-name" + val GroupName: String = "group-name" + val Reason: String = "reason" + val MaximumAppendSize: String = "maximum-append-size" + val ScavengeId: String = "scavenge-id" + val LeaderEndpointHost: String = "leader-endpoint-host" + val LeaderEndpointPort: String = "leader-endpoint-port" val LoginName = "login-name" } diff --git a/core/src/main/scala/sec/api/grpc/convert.scala b/core/src/main/scala/sec/api/grpc/convert.scala index f2e4b999..52ce7020 100644 --- a/core/src/main/scala/sec/api/grpc/convert.scala +++ b/core/src/main/scala/sec/api/grpc/convert.scala @@ -12,13 +12,15 @@ private[sec] object convert { //====================================================================================================================== private[grpc] object keys { - val exception: Metadata.Key[String] = Metadata.Key.of(ce.ExceptionKey, StringMarshaller) - val streamName: Metadata.Key[String] = Metadata.Key.of(ce.StreamName, StringMarshaller) - val groupName: Metadata.Key[String] = Metadata.Key.of(ce.GroupName, StringMarshaller) - val loginName: Metadata.Key[String] = Metadata.Key.of(ce.LoginName, StringMarshaller) - val expectedVersion: Metadata.Key[Long] = Metadata.Key.of(ce.ExpectedVersion, LongMarshaller) - val actualVersion: Metadata.Key[Long] = Metadata.Key.of(ce.ActualVersion, LongMarshaller) - val maximumAppendSize: Metadata.Key[Int] = Metadata.Key.of(ce.MaximumAppendSize, IntMarshaller) + val exception: Metadata.Key[String] = Metadata.Key.of(ce.ExceptionKey, StringMarshaller) + val streamName: Metadata.Key[String] = Metadata.Key.of(ce.StreamName, StringMarshaller) + val groupName: Metadata.Key[String] = Metadata.Key.of(ce.GroupName, StringMarshaller) + val loginName: Metadata.Key[String] = Metadata.Key.of(ce.LoginName, StringMarshaller) + val expectedVersion: Metadata.Key[Long] = Metadata.Key.of(ce.ExpectedVersion, LongMarshaller) + val actualVersion: Metadata.Key[Long] = Metadata.Key.of(ce.ActualVersion, LongMarshaller) + val maximumAppendSize: Metadata.Key[Int] = Metadata.Key.of(ce.MaximumAppendSize, IntMarshaller) + val leaderEndpointHost: Metadata.Key[String] = Metadata.Key.of(ce.LeaderEndpointHost, StringMarshaller) + val leaderEndpointPort: Metadata.Key[Int] = Metadata.Key.of(ce.LeaderEndpointPort, IntMarshaller) } //====================================================================================================================== @@ -29,24 +31,27 @@ private[sec] object convert { val convertToEs: StatusRuntimeException => Option[EsException] = ex => { - val unknown = "" - val md = ex.getTrailers - val status = ex.getStatus() - val exception = md.getOpt(keys.exception) - def streamName = md.getOpt(keys.streamName).getOrElse(unknown) - def groupName = md.getOpt(keys.groupName).getOrElse(unknown) - def expected = md.getOpt(keys.expectedVersion) - def actual = md.getOpt(keys.actualVersion) - def loginName = md.getOpt(keys.loginName).getOrElse(unknown) - def maximumAppendSize = md.getOpt(keys.maximumAppendSize) + val unknown = "" + val md = ex.getTrailers + val status = ex.getStatus() + val exception = md.getOpt(keys.exception) + def streamName = md.getOpt(keys.streamName).getOrElse(unknown) + def groupName = md.getOpt(keys.groupName).getOrElse(unknown) + def expected = md.getOpt(keys.expectedVersion) + def actual = md.getOpt(keys.actualVersion) + def loginName = md.getOpt(keys.loginName).getOrElse(unknown) + def maximumAppendSize = md.getOpt(keys.maximumAppendSize) + def leaderEndpointHost = md.getOpt(keys.leaderEndpointHost) + def leaderEndpointPort = md.getOpt(keys.leaderEndpointPort) val reified: Option[EsException] = exception.map { case ce.AccessDenied => AccessDenied case ce.InvalidTransaction => InvalidTransaction - case ce.MaximumAppendSizeExceeded => MaximumAppendSizeExceeded(maximumAppendSize) case ce.StreamDeleted => StreamDeleted(streamName) case ce.WrongExpectedVersion => WrongExpectedVersion(streamName, expected, actual) case ce.StreamNotFound => StreamNotFound(streamName) + case ce.MaximumAppendSizeExceeded => MaximumAppendSizeExceeded(maximumAppendSize) + case ce.NotLeader => NotLeader(leaderEndpointHost, leaderEndpointPort) case ce.PersistentSubscriptionDoesNotExist => PersistentSubscriptionNotFound(streamName, groupName) case ce.MaximumSubscribersReached => PersistentSubscriptionMaximumSubscribersReached(streamName, groupName) case ce.PersistentSubscriptionDropped => PersistentSubscriptionDroppedByServer(streamName, groupName) @@ -54,9 +59,10 @@ private[sec] object convert { case unknown => UnknownError(s"Exception key: $unknown") } - def serverUnavailable: Option[EsException] = Option.when(status.getCode == Status.Code.UNAVAILABLE)( - ServerUnavailable(Option(ex.getMessage()).getOrElse("Server unavailable")) - ) + def serverUnavailable: Option[EsException] = + Option.when(status.getCode == Status.Code.UNAVAILABLE)( + ServerUnavailable(Option(ex.getMessage()).getOrElse("Server unavailable")) + ) reified orElse serverUnavailable } diff --git a/core/src/main/scala/sec/api/grpc/implicits.scala b/core/src/main/scala/sec/api/grpc/implicits.scala index 4336ff3a..d563580c 100644 --- a/core/src/main/scala/sec/api/grpc/implicits.scala +++ b/core/src/main/scala/sec/api/grpc/implicits.scala @@ -3,20 +3,22 @@ package api package grpc import io.grpc.Metadata -import grpc.constants.Headers.{Authorization, ConnectionName} +import grpc.constants.Headers.{Authorization, ConnectionName, RequiresLeader} object implicits { private[grpc] object keys { - val authKey: Metadata.Key[UserCredentials] = Metadata.Key.of(Authorization, UserCredentialsMarshaller) - val cnKey: Metadata.Key[String] = Metadata.Key.of(ConnectionName, StringMarshaller) + val authorization: Metadata.Key[UserCredentials] = Metadata.Key.of(Authorization, UserCredentialsMarshaller) + val connectionName: Metadata.Key[String] = Metadata.Key.of(ConnectionName, StringMarshaller) + val requiresLeader: Metadata.Key[Boolean] = Metadata.Key.of(RequiresLeader, BooleanMarshaller) } implicit final class ContextOps(val ctx: Context) extends AnyVal { def toMetadata: Metadata = { val md = new Metadata() - ctx.userCreds.foreach(md.put(keys.authKey, _)) - md.put(keys.cnKey, ctx.connectionName) + ctx.userCreds.foreach(md.put(keys.authorization, _)) + md.put(keys.connectionName, ctx.connectionName) + md.put(keys.requiresLeader, ctx.requiresLeader) md } } diff --git a/core/src/main/scala/sec/api/grpc/marshalling.scala b/core/src/main/scala/sec/api/grpc/marshalling.scala index 4df1d020..21a0ca21 100644 --- a/core/src/main/scala/sec/api/grpc/marshalling.scala +++ b/core/src/main/scala/sec/api/grpc/marshalling.scala @@ -9,6 +9,11 @@ import constants.Headers.BasicScheme //====================================================================================================================== +private[grpc] final case class InvalidInput(input: String, tpe: String) + extends RuntimeException(s"Could not parse $input to $tpe") + +//====================================================================================================================== + private[grpc] object IntMarshaller extends NumericAsciiMarshaller[Int]("Int") private[grpc] object LongMarshaller extends NumericAsciiMarshaller[Long]("Long") @@ -17,9 +22,6 @@ private[grpc] abstract sealed class NumericAsciiMarshaller[T: Numeric](tpe: Stri final def parseAsciiString(s: String): T = Numeric[T].parseString(s).getOrElse(throw InvalidInput(s, tpe)) } -private[grpc] final case class InvalidInput(input: String, tpe: String) - extends RuntimeException(s"Could not parse $input to $tpe") - //====================================================================================================================== private[grpc] object StringMarshaller extends AsciiMarshaller[String] { @@ -29,6 +31,13 @@ private[grpc] object StringMarshaller extends AsciiMarshaller[String] { //====================================================================================================================== +private[grpc] object BooleanMarshaller extends AsciiMarshaller[Boolean] { + def toAsciiString(v: Boolean): String = v.toString() + def parseAsciiString(s: String): Boolean = s.toBooleanOption.getOrElse(throw InvalidInput(s, "Boolean")) +} + +//====================================================================================================================== + private[grpc] object UserCredentialsMarshaller extends AsciiMarshaller[UserCredentials] { val decodingNotSupported = UserCredentials.unsafe("decoding-not-supported", "n/a") diff --git a/core/src/main/scala/sec/api/mapping/shared.scala b/core/src/main/scala/sec/api/mapping/shared.scala index a062bf94..193e733f 100644 --- a/core/src/main/scala/sec/api/mapping/shared.scala +++ b/core/src/main/scala/sec/api/mapping/shared.scala @@ -4,7 +4,9 @@ package mapping import java.util.{UUID => JUUID} import cats.implicits._ -import com.eventstore.client.shared.UUID +import com.google.protobuf.ByteString +import com.eventstore.client.{StreamIdentifier, UUID} +import sec.core.StreamId object shared { @@ -22,4 +24,29 @@ object shared { juuid.leftMap(ProtoResultError).liftTo[F] } + // + + def mkStreamId[F[_]: ErrorM](sid: Option[StreamIdentifier]): F[StreamId] = + mkStreamId[F](sid.getOrElse(StreamIdentifier())) + + def mkStreamId[F[_]: ErrorM](sid: StreamIdentifier): F[StreamId] = + sid.utf8[F] >>= { sidStr => + StreamId.stringToStreamId(sidStr).leftMap(ProtoResultError).liftTo[F] + } + + final implicit class StreamIdOps(val v: StreamId) extends AnyVal { + def esSid: StreamIdentifier = v.stringValue.toStreamIdentifer + } + + final implicit class StringOps(val v: String) extends AnyVal { + def toStreamIdentifer: StreamIdentifier = StreamIdentifier(ByteString.copyFromUtf8(v)) + } + + final implicit class StreamIdentifierOps(val v: StreamIdentifier) extends AnyVal { + def utf8[F[_]](implicit F: ErrorA[F]): F[String] = + F.catchNonFatal(Option(v.streamName.toStringUtf8()).getOrElse("")) + } + + // + } diff --git a/core/src/main/scala/sec/api/mapping/streams.scala b/core/src/main/scala/sec/api/mapping/streams.scala index 89c2da8a..c2e43a82 100644 --- a/core/src/main/scala/sec/api/mapping/streams.scala +++ b/core/src/main/scala/sec/api/mapping/streams.scala @@ -4,7 +4,7 @@ package mapping import cats.data.NonEmptyList import cats.implicits._ -import com.eventstore.client.shared.Empty +import com.eventstore.client._ import com.eventstore.client.streams._ import sec.core._ import sec.api.Streams._ @@ -86,7 +86,7 @@ private[sec] object streams { val options = ReadReq .Options() - .withStream(ReadReq.Options.StreamOptions(streamId.stringValue, mapEventNumberOpt(exclusiveFrom))) + .withStream(ReadReq.Options.StreamOptions(streamId.esSid.some, mapEventNumberOpt(exclusiveFrom))) .withSubscription(ReadReq.Options.SubscriptionOptions()) .withReadDirection(mapDirection(Direction.Forwards)) .withResolveLinks(resolveLinkTos) @@ -124,7 +124,7 @@ private[sec] object streams { val options = ReadReq .Options() - .withStream(ReadReq.Options.StreamOptions(streamId.stringValue, mapEventNumber(from))) + .withStream(ReadReq.Options.StreamOptions(streamId.esSid.some, mapEventNumber(from))) .withCount(count) .withReadDirection(mapDirection(direction)) .withResolveLinks(resolveLinkTos) @@ -161,7 +161,7 @@ private[sec] object streams { case StreamRevision.StreamExists => DeleteReq.Options.ExpectedStreamRevision.StreamExists(empty) case StreamRevision.Any => DeleteReq.Options.ExpectedStreamRevision.Any(empty) } - DeleteReq().withOptions(DeleteReq.Options(streamId.stringValue, mapDeleteRevision(expectedRevision))) + DeleteReq().withOptions(DeleteReq.Options(streamId.esSid.some, mapDeleteRevision(expectedRevision))) } def mkHardDeleteReq(streamId: StreamId, expectedRevision: StreamRevision): TombstoneReq = { @@ -172,7 +172,7 @@ private[sec] object streams { case StreamRevision.StreamExists => TombstoneReq.Options.ExpectedStreamRevision.StreamExists(empty) case StreamRevision.Any => TombstoneReq.Options.ExpectedStreamRevision.Any(empty) } - TombstoneReq().withOptions(TombstoneReq.Options(streamId.stringValue, mapTombstoneRevision(expectedRevision))) + TombstoneReq().withOptions(TombstoneReq.Options(streamId.esSid.some, mapTombstoneRevision(expectedRevision))) } def mkAppendHeaderReq(streamId: StreamId, expectedRevision: StreamRevision): AppendReq = { @@ -183,17 +183,18 @@ private[sec] object streams { case StreamRevision.StreamExists => AppendReq.Options.ExpectedStreamRevision.StreamExists(empty) case StreamRevision.Any => AppendReq.Options.ExpectedStreamRevision.Any(empty) } - AppendReq().withOptions(AppendReq.Options(streamId.stringValue, mapAppendRevision(expectedRevision))) + AppendReq().withOptions(AppendReq.Options(streamId.esSid.some, mapAppendRevision(expectedRevision))) } - def mkAppendProposalsReq(events: NonEmptyList[EventData]): NonEmptyList[AppendReq] = events.map { e => - val id = mkUuid(e.eventId) - val customMeta = e.metadata.bytes.toByteString - val data = e.data.bytes.toByteString - val ct = e.contentType.fold(ContentTypes.ApplicationOctetStream, ContentTypes.ApplicationJson) - val meta = Map(Type -> EventType.eventTypeToString(e.eventType), ContentType -> ct) - AppendReq().withProposedMessage(AppendReq.ProposedMessage(id.some, meta, customMeta, data)) - } + def mkAppendProposalsReq(events: NonEmptyList[EventData]): NonEmptyList[AppendReq] = + events.map { e => + val id = mkUuid(e.eventId) + val customMeta = e.metadata.bytes.toByteString + val data = e.data.bytes.toByteString + val ct = e.contentType.fold(ContentTypes.ApplicationOctetStream, ContentTypes.ApplicationJson) + val meta = Map(Type -> EventType.eventTypeToString(e.eventType), ContentType -> ct) + AppendReq().withProposedMessage(AppendReq.ProposedMessage(id.some, meta, customMeta, data)) + } } //====================================================================================================================== @@ -211,7 +212,7 @@ private[sec] object streams { import EventData.{binary, json} - val streamId = mkStreamId[F](e.streamName) + val streamId = mkStreamId[F](e.streamIdentifier) val eventNumber = EventNumber.exact(e.streamRevision) val position = Position.exact(e.commitPosition, e.preparePosition) val data = e.data.toByteVector @@ -229,27 +230,44 @@ private[sec] object streams { } - def mkContentType[F[_]](ct: String)(implicit F: ErrorA[F]): F[Content.Type] = ct match { - case ContentTypes.ApplicationOctetStream => F.pure(Content.Type.Binary) - case ContentTypes.ApplicationJson => F.pure(Content.Type.Json) - case unknown => F.raiseError(ProtoResultError(s"Required value $ContentType missing or invalid: $unknown")) - } - - def mkStreamId[F[_]: ErrorA](name: String): F[StreamId] = - StreamId.stringToStreamId(Option(name).getOrElse("")).leftMap(ProtoResultError).liftTo[F] + def mkContentType[F[_]](ct: String)(implicit F: ErrorA[F]): F[Content.Type] = + ct match { + case ContentTypes.ApplicationOctetStream => F.pure(Content.Type.Binary) + case ContentTypes.ApplicationJson => F.pure(Content.Type.Json) + case unknown => F.raiseError(ProtoResultError(s"Required value $ContentType missing or invalid: $unknown")) + } def mkEventType[F[_]: ErrorA](name: String): F[EventType] = EventType.stringToEventType(Option(name).getOrElse("")).leftMap(ProtoResultError).liftTo[F] - def mkWriteResult[F[_]: ErrorA](ar: AppendResp): F[WriteResult] = { + def mkWriteResult[F[_]: ErrorA](sid: StreamId, ar: AppendResp): F[WriteResult] = { + + import com.eventstore.client.streams.AppendResp.{Result, Success} + + def error(msg: String): Either[Throwable, WriteResult] = + ProtoResultError(msg).asLeft + + def success(s: Result.Success) = + s.value.currentRevisionOption match { + case Success.CurrentRevisionOption.CurrentRevision(v) => WriteResult(EventNumber.exact(v)).asRight + case Success.CurrentRevisionOption.NoStream(_) => error("Did not expect NoStream when using NonEmptyList") + case Success.CurrentRevisionOption.Empty => error("CurrentRevisionOptions is missing") + } + + def wrongExpectedVersion(w: Result.WrongExpectedVersion) = { + // TODO: Decide what to do with other cases + val expected = w.value.expectedRevisionOption.expectedRevision + val actual = w.value.currentRevisionOption.currentRevision + WrongExpectedVersion(sid.stringValue, expected, actual).asLeft + } - val currentRevision = ar.currentRevisionOption match { - case AppendResp.CurrentRevisionOption.CurrentRevision(v) => EventNumber.exact(v).asRight - case AppendResp.CurrentRevisionOption.NoStream(_) => "Did not expect NoStream when using NonEmptyList".asLeft - case AppendResp.CurrentRevisionOption.Empty => "CurrentRevisionOptions is missing".asLeft + val result = ar.result match { + case s: Result.Success => success(s) + case w: Result.WrongExpectedVersion => wrongExpectedVersion(w) + case Result.Empty => error("Result is missing") } - currentRevision.map(WriteResult).leftMap(ProtoResultError).liftTo[F] + result.liftTo[F] } def mkDeleteResult[F[_]: ErrorA](dr: DeleteResp): F[DeleteResult] = diff --git a/core/src/main/scala/sec/api/streams.scala b/core/src/main/scala/sec/api/streams.scala index c174954e..a6938614 100644 --- a/core/src/main/scala/sec/api/streams.scala +++ b/core/src/main/scala/sec/api/streams.scala @@ -8,10 +8,11 @@ import cats.data.NonEmptyList import cats.implicits._ import cats.effect._ import cats.effect.concurrent.Ref -import fs2.{Pull, Stream} +import fs2.{Pipe, Pull, Stream} import com.eventstore.client.streams._ import sec.core._ import sec.syntax.StreamsSyntax +import mapping.shared._ import mapping.streams.outgoing._ import mapping.streams.incoming._ import mapping.implicits._ @@ -103,12 +104,21 @@ object Streams { extends Streams[F] { val ctx: Option[UserCredentials] => Context = uc => { - Context(uc.orElse(options.defaultCreds), options.connectionName) + Context(uc.orElse(options.defaultCreds), options.connectionName, options.nodePreference.isLeader) } - private val readPipe: Stream[F, ReadResp] => Stream[F, Event] = + private val readEventPipe: Stream[F, ReadResp] => Stream[F, Event] = _.evalMap(_.content.event.require[F]("ReadEvent expected!").flatMap(mkEvent[F])).unNone + private val failStreamNotFound: Pipe[F, ReadResp, ReadResp] = + _.evalMap(r => + r.content.streamNotFound.fold(r.pure[F])( + _.streamIdentifier + .require[F]("StreamIdentifer expected!") + .flatMap(_.utf8[F].map(StreamNotFound(_))) + .flatMap(F.raiseError) + )) + private val subConfirmationPipe: Stream[F, ReadResp] => Stream[F, ReadResp] = in => { val extractConfirmation: ReadResp => F[String] = @@ -123,7 +133,7 @@ object Streams { } private val subscriptionPipe: Stream[F, ReadResp] => Stream[F, Event] = - _.through(subConfirmationPipe).through(readPipe) + _.through(subConfirmationPipe).through(readEventPipe) private val subAllFilteredPipe: Stream[F, ReadResp] => Stream[F, Either[Position, Event]] = { @@ -193,11 +203,9 @@ object Streams { creds: Option[UserCredentials] ): Stream[F, Event] = { - if (maxCount > 0) { - - client.read(mkReadAllReq(from, direction, maxCount, resolveLinkTos), ctx(creds)).through(readPipe) - - } else Stream.empty + if (maxCount > 0) + client.read(mkReadAllReq(from, direction, maxCount, resolveLinkTos), ctx(creds)).through(readEventPipe) + else Stream.empty } def readStream( @@ -210,12 +218,11 @@ object Streams { ): Stream[F, Event] = { val valid = direction.fold(from =!= EventNumber.End, true) + def req = mkReadStreamReq(streamId, from, direction, maxCount, resolveLinkTos) - if (valid && maxCount > 0) { - - client.read(mkReadStreamReq(streamId, from, direction, maxCount, resolveLinkTos), ctx(creds)).through(readPipe) - - } else Stream.empty + if (valid && maxCount > 0) + client.read(req, ctx(creds)).through(failStreamNotFound).through(readEventPipe) + else Stream.empty } def appendToStream( @@ -227,7 +234,7 @@ object Streams { client.append( Stream.emit(mkAppendHeaderReq(streamId, expectedRevision)) ++ Stream.emits(mkAppendProposalsReq(events).toList), ctx(creds) - ) >>= mkWriteResult[F] + ) >>= { ar => mkWriteResult[F](streamId, ar) } def softDelete( streamId: StreamId, diff --git a/core/src/main/scala/sec/core/exceptions.scala b/core/src/main/scala/sec/core/exceptions.scala index d4d5437d..b52b802a 100644 --- a/core/src/main/scala/sec/core/exceptions.scala +++ b/core/src/main/scala/sec/core/exceptions.scala @@ -5,23 +5,32 @@ import EventNumber.Exact sealed abstract class EsException(msg: String) extends RuntimeException(msg) -case object AccessDenied extends EsException("Access Denied.") // All -case object InvalidTransaction extends EsException("Invalid Transaction.") // Streams.Delete + Streams.Append -final case class UserNotFound(loginName: String) extends EsException(s"User '$loginName' was not found.") // Users -final case class StreamDeleted(streamId: String) extends EsException(s"Event stream '$streamId' is deleted.") // Streams -final case class StreamNotFound(streamId: String) extends EsException(s"Event stream '$streamId' was not found.") // Streams.Read/Subscribe +case object AccessDenied extends EsException("Access Denied.") +case object InvalidTransaction extends EsException("Invalid Transaction.") +final case class UserNotFound(loginName: String) extends EsException(s"User '$loginName' was not found.") +final case class StreamDeleted(streamId: String) extends EsException(s"Event stream '$streamId' is deleted.") +final case class StreamNotFound(streamId: String) extends EsException(s"Event stream '$streamId' was not found.") final case class UnknownError(msg: String) extends EsException(msg) final case class ServerUnavailable(msg: String) extends EsException(msg) final case class ValidationError(msg: String) extends EsException(msg) -final case class MaximumAppendSizeExceeded(size: Option[Int]) extends EsException(MaximumAppendSizeExceeded.msg(size)) // Streams.Append +final case class NotLeader(host: Option[String], port: Option[Int]) extends EsException(NotLeader.msg(host, port)) + +object NotLeader { + def msg(host: Option[String], port: Option[Int]): String = + s"Not leader. New leader at ${host.getOrElse("")}:${port.getOrElse("")}." +} + +final case class MaximumAppendSizeExceeded(size: Option[Int]) extends EsException(MaximumAppendSizeExceeded.msg(size)) object MaximumAppendSizeExceeded { def msg(maxSize: Option[Int]): String = s"Maximum append size ${maxSize.map(max => s"of $max bytes ").getOrElse("")}exceeded." } -final case class WrongExpectedVersion(streamId: String, expected: Option[Long], actual: Option[Long]) // Streams.Delete + Streams.Append +final case class WrongExpectedVersion(streamId: String, + expected: Option[Long], + actual: Option[Long]) // Streams.Delete + Streams.Append extends EsException(WrongExpectedVersion.msg(streamId, expected, actual)) object WrongExpectedVersion { diff --git a/core/src/main/scala/sec/options.scala b/core/src/main/scala/sec/options.scala index 228ae62a..e0717e5d 100644 --- a/core/src/main/scala/sec/options.scala +++ b/core/src/main/scala/sec/options.scala @@ -7,11 +7,36 @@ import api._ final case class Options( connectionName: String, + nodePreference: NodePreference, defaultCreds: Option[UserCredentials] ) object Options { - val default = Options("sec", UserCredentials.unsafe("admin", "changeit").some) + val default = Options("sec", NodePreference.Leader, UserCredentials.unsafe("admin", "changeit").some) } //====================================================================================================================== + +//====================================================================================================================== + +sealed trait NodePreference +object NodePreference { + case object Leader extends NodePreference + case object Follower extends NodePreference + case object Random extends NodePreference + case object ReadOnlyReplica extends NodePreference + + implicit final class NodePreferenceOps(val np: NodePreference) extends AnyVal { + + private def fold[A](leader: => A, follower: => A, random: => A, readOnlyReplica: => A): A = + np match { + case Leader => leader + case Follower => follower + case Random => random + case ReadOnlyReplica => readOnlyReplica + } + + def isLeader: Boolean = fold(true, false, false, false) + } + +} diff --git a/core/src/test/resources/dev-cert.pem b/core/src/test/resources/dev-cert.pem index 16aefc04..b4e3c975 100644 --- a/core/src/test/resources/dev-cert.pem +++ b/core/src/test/resources/dev-cert.pem @@ -1,21 +1,20 @@ -----BEGIN CERTIFICATE----- -MIIDZTCCAk2gAwIBAgIRAPAIW2Vjo9IsDXwK3QHvH3IwDQYJKoZIhvcNAQELBQAw -UTEYMBYGA1UEChMPRXZlbnQgU3RvcmUgTHRkMRAwDgYDVQQLEwdUZXN0aW5nMSMw -IQYDVQQDExpFdmVudCBTdG9yZSBEZXZlbG9wbWVudCBDQTAeFw0xOTExMjAxMzM1 -MDRaFw0yNDExMTgxMzM1MDRaMCcxJTAjBgNVBAMTHEV2ZW50IFN0b3JlIFRlc3Qg -U2VydmVyIENlcnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDdHLlO -CcUoGdZnHCYRZ6GSgrbhejdpBt3Q/KVCzaksejBJksF8m2761MHG9DNp/FMhCObR -EIs+JHhzALbtC67vLoinB+xUb3xpUxUmLyZxSZXB4Ik0TI/34eCayVW4qZ5G5hWf -ZC7BPdbKnqD3zYlQxUXBl2yFvPhCyvmXx3rQQ9fslwNeCcxBUWBlI4TZPiSuPnFo -RmSEzitf99XofwxUmRaY9K2xdXWXRKEmyIY8cSanqZIS5Lob/cTpySDHrvuP3rWe -+NfFsdrKUNEL12aBbDfkKemXvKN2z2CSdrWqc/q8NGhS/QFIWucPSPAqqmaiPQOR -4aFlkMsclY0vOtRJAgMBAAGjYjBgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMAwGA1Ud -EwEB/wQCMAAwHwYDVR0jBBgwFoAU3XcY6I8mjPaifcFoziQf91Z8+iEwGgYDVR0R -BBMwEYIJbG9jYWxob3N0hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQAC4ATxb7+W -rn6L4g4LfubsFbUtrKq06pJE2x78+2eD7mukszvuRcJLxTilpJS2un35F8SEinPe -Y9nvTTV0oxLSS7bXoHNntel2X4wiPmF2C8t52BlcBJlv/dgsYJUBjyi47qjHn/Bl -1xLyNOWyOwPoPf4xDESe3osHE00y172xRBIxDCQ5MDv6ZktwTvw1XDRvqy84IvV0 -ScuWzeSNtrJEo+yyvwJsS08GlKh6HmmWJ5QzrIWpM4rFg18y3F4njCSh+FNVecAF -0cip2gtvUv1pGH8lVjDBZkYRW3ZN76c05CKQmlftKpT+V7vCdG3m1s0/yGzBhWeB -/h5Yv/fI4M++ +MIIDWTCCAkGgAwIBAgIQPtmScuYJa9Xi5F5tw+np6zANBgkqhkiG9w0BAQsFADBR +MRgwFgYDVQQKEw9FdmVudCBTdG9yZSBMdGQxEDAOBgNVBAsTB1Rlc3RpbmcxIzAh +BgNVBAMTGkV2ZW50IFN0b3JlIERldmVsb3BtZW50IENBMB4XDTIwMDUyOTA5MTky +NVoXDTI1MDUyODA5MTkyNVowHDEaMBgGA1UEAxMRZXZlbnRzdG9yZWRiLW5vZGUw +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQD9i74StwJY2HTtF7dV5wB0 +/nyPGN7ge2ceW8sz2+EakyKoidlf7JFqvpTd0YlJzDWA4F9kE8uand5TDirY6br2 +OoHEtxSGPddaf+lbQdD3/l6Uml0ok120Y12GvXFSEMkLWWD82eyrU6xjJT7xy5l0 +pRt+zWHr89jKiHcXnCKYDFNCckILttlKW/i8CDTYq4UF+P8b8mAMhuXKWprbTuKV +rSpp4FuGlm0hl+aYbP4I+HWwfCNWEkVvlyv8RPCS0hjI0Yk2ExqdxhybwiY/M1HG +KpcBgTVNSepfMnWoNIFms5K4j0DzKuCqir5+H3bYE/z2WR0qxL81m7L8N00FCl3d +AgMBAAGjYjBgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMAwGA1UdEwEB/wQCMAAwHwYD +VR0jBBgwFoAULx3XTI1s1IKOgOJSSQKDnBRIykAwGgYDVR0RBBMwEYIJbG9jYWxo +b3N0hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQCT21dyT7A1xVemyDUY48dWKhgY +QeBl8S2rFpDEYx54wfodZ4qFSIwIxiZ6lKudH/a4Itps2DQ0JJmqkpLxrG34THMJ +4BhL4NbbecCk6f3RSZ76Ajifdt2LcYN1aamKgft2ohN2RdeeSpgaDWJPw1Bmi9Oc +WJNrMGFy5E4IVoTrqIw4PKc5L7ZC2bftsqo4HA/l8JOP9OiOztlm0bKH1zBOnvyZ +HoAyO95aajii+6/IWuwwT94kQbAdPiCh5fV7P0KIpjEhRuMbwB1wUtjbx6oYs+Fb +7co9VTz1s6VV6NMzttWYginKbmM11JDAF+2b7Km+IrlOeG67PRONfgjfvoy1 -----END CERTIFICATE----- \ No newline at end of file diff --git a/core/src/test/scala/sec/api/grpc/convert.scala b/core/src/test/scala/sec/api/grpc/convert.scala index da93b79b..25660af6 100644 --- a/core/src/test/scala/sec/api/grpc/convert.scala +++ b/core/src/test/scala/sec/api/grpc/convert.scala @@ -80,6 +80,18 @@ class ConvertSpec extends mutable.Specification { m.put(k.expectedVersion, 2L) } should beSome(WrongExpectedVersion(unknown, 2L.some, 1L.some)) + convert { m => + m.put(ek, ce.NotLeader) + m.put(k.leaderEndpointHost, "127.0.0.1") + m.put(k.leaderEndpointPort, 2113) + } should beSome(NotLeader("127.0.0.1".some, 2113.some)) + + convert { m => + m.put(ek, ce.NotLeader) + m.put(k.leaderEndpointHost, "127.0.0.1") + m.put(Metadata.Key.of(ce.LeaderEndpointPort, StringMarshaller), "b") + } should beSome(NotLeader("127.0.0.1".some, None)) + convert { m => m.put(ek, ce.StreamNotFound) m.put(k.streamName, streamId) diff --git a/core/src/test/scala/sec/api/grpc/implicits.scala b/core/src/test/scala/sec/api/grpc/implicits.scala index a334922b..0be86c08 100644 --- a/core/src/test/scala/sec/api/grpc/implicits.scala +++ b/core/src/test/scala/sec/api/grpc/implicits.scala @@ -16,14 +16,16 @@ class ImplicitsSpec extends mutable.Specification { val creds = UserCredentials.unsafe("hello", "world") val name = "abc" - val md1 = Context(None, name).toMetadata - val md2 = Context(creds.some, name).toMetadata + val md1 = Context(None, name, false).toMetadata + val md2 = Context(creds.some, name, true).toMetadata - Option(md1.get(cnKey)) should beSome(name) - Option(md1.get(authKey)) should beNone + Option(md1.get(connectionName)) should beSome(name) + Option(md1.get(authorization)) should beNone + Option(md1.get(requiresLeader)) should beSome(false) - Option(md2.get(cnKey)) should beSome(name) + Option(md2.get(connectionName)) should beSome(name) Option(md2.get(Metadata.Key.of(Authorization, StringMarshaller))) should beSome("Basic aGVsbG86d29ybGQ=") + Option(md2.get(requiresLeader)) should beSome(true) } diff --git a/core/src/test/scala/sec/api/mapping/shared.scala b/core/src/test/scala/sec/api/mapping/shared.scala index 23a42961..b35bd6c6 100644 --- a/core/src/test/scala/sec/api/mapping/shared.scala +++ b/core/src/test/scala/sec/api/mapping/shared.scala @@ -5,8 +5,9 @@ package mapping import java.util.{UUID => JUUID} import cats.implicits._ import org.specs2._ -import com.eventstore.client.shared._ +import com.eventstore.client._ import sec.api.mapping.shared._ +import sec.core.StreamId class SharedMappingSpec extends mutable.Specification { @@ -28,6 +29,16 @@ class SharedMappingSpec extends mutable.Specification { mkUuid(juuid) shouldEqual UUID().withStructured(uuidStructured) } + "mkStreamId" >> { + mkStreamId[ErrorOr](StreamIdentifier()) shouldEqual ProtoResultError("name cannot be empty").asLeft + mkStreamId[ErrorOr]("".toStreamIdentifer) shouldEqual ProtoResultError("name cannot be empty").asLeft + mkStreamId[ErrorOr]("abc".toStreamIdentifer) shouldEqual StreamId.normal("abc").unsafe.asRight + mkStreamId[ErrorOr]("$abc".toStreamIdentifer) shouldEqual StreamId.system("abc").unsafe.asRight + mkStreamId[ErrorOr]("$all".toStreamIdentifer) shouldEqual StreamId.All.asRight + mkStreamId[ErrorOr]("$$abc".toStreamIdentifer) shouldEqual StreamId.MetaId(StreamId.normal("abc").unsafe).asRight + mkStreamId[ErrorOr]("$$$streams".toStreamIdentifer) shouldEqual StreamId.MetaId(StreamId.Streams).asRight + } + } } diff --git a/core/src/test/scala/sec/api/mapping/streams.scala b/core/src/test/scala/sec/api/mapping/streams.scala index 2250d015..bc18aef1 100644 --- a/core/src/test/scala/sec/api/mapping/streams.scala +++ b/core/src/test/scala/sec/api/mapping/streams.scala @@ -8,7 +8,7 @@ import cats.implicits._ import cats.data.NonEmptyList import scodec.bits.ByteVector import org.specs2._ -import com.eventstore.client.shared._ +import com.eventstore.client._ import com.eventstore.client.streams._ import sec.{core => c} import sec.api.mapping.shared._ @@ -24,7 +24,7 @@ class StreamsMappingSpec extends mutable.Specification { import ReadReq.Options.AllOptions.AllOption import ReadReq.Options.StreamOptions.RevisionOption - val empty = com.eventstore.client.shared.Empty() + val empty = com.eventstore.client.Empty() /// @@ -95,7 +95,7 @@ class StreamsMappingSpec extends mutable.Specification { mkSubscribeToStreamReq(sid, exclusiveFrom, resolveLinkTos) shouldEqual ReadReq().withOptions( ReadReq .Options() - .withStream(ReadReq.Options.StreamOptions(sid.stringValue, mapEventNumberOpt(exclusiveFrom))) + .withStream(ReadReq.Options.StreamOptions(sid.esSid.some, mapEventNumberOpt(exclusiveFrom))) .withSubscription(ReadReq.Options.SubscriptionOptions()) .withReadDirection(ReadReq.Options.ReadDirection.Forwards) .withResolveLinks(resolveLinkTos) @@ -141,7 +141,7 @@ class StreamsMappingSpec extends mutable.Specification { mkReadStreamReq(sid, from, rd, count, rlt) shouldEqual ReadReq().withOptions( ReadReq .Options() - .withStream(ReadReq.Options.StreamOptions(sid.stringValue, mapEventNumber(from))) + .withStream(ReadReq.Options.StreamOptions(sid.esSid.some, mapEventNumber(from))) .withCount(count) .withReadDirection(mapDirection(rd)) .withResolveLinks(rlt) @@ -187,7 +187,7 @@ class StreamsMappingSpec extends mutable.Specification { def test(sr: c.StreamRevision, esr: ExpectedStreamRevision) = mkSoftDeleteReq(sid, sr) shouldEqual - DeleteReq().withOptions(DeleteReq.Options(sid.stringValue, esr)) + DeleteReq().withOptions(DeleteReq.Options(sid.esSid.some, esr)) test(c.EventNumber.exact(0L), ExpectedStreamRevision.Revision(0L)) test(c.StreamRevision.NoStream, ExpectedStreamRevision.NoStream(empty)) @@ -202,7 +202,7 @@ class StreamsMappingSpec extends mutable.Specification { def test(sr: c.StreamRevision, esr: ExpectedStreamRevision) = mkHardDeleteReq(sid, sr) shouldEqual - TombstoneReq().withOptions(TombstoneReq.Options(sid.stringValue, esr)) + TombstoneReq().withOptions(TombstoneReq.Options(sid.esSid.some, esr)) test(c.EventNumber.exact(0L), ExpectedStreamRevision.Revision(0L)) test(c.StreamRevision.NoStream, ExpectedStreamRevision.NoStream(empty)) @@ -217,7 +217,7 @@ class StreamsMappingSpec extends mutable.Specification { def test(sr: c.StreamRevision, esr: ExpectedStreamRevision) = mkAppendHeaderReq(sid, sr) shouldEqual - AppendReq().withOptions(AppendReq.Options(sid.stringValue, esr)) + AppendReq().withOptions(AppendReq.Options(sid.esSid.some, esr)) test(c.EventNumber.exact(0L), ExpectedStreamRevision.Revision(0L)) test(c.StreamRevision.NoStream, ExpectedStreamRevision.NoStream(empty)) @@ -275,7 +275,7 @@ class StreamsMappingSpec extends mutable.Specification { import grpc.constants.Metadata.{ContentType, ContentTypes, Created, Type} import ContentTypes.{ApplicationJson => Json, ApplicationOctetStream => Binary} - val empty = com.eventstore.client.shared.Empty() + val empty = com.eventstore.client.Empty() type ErrorOr[A] = Either[Throwable, A] @@ -295,7 +295,7 @@ class StreamsMappingSpec extends mutable.Specification { val eventProto = ReadResp.ReadEvent .RecordedEvent() - .withStreamName(streamId) + .withStreamIdentifier(streamId.toStreamIdentifer) .withStreamRevision(revision) .withCommitPosition(commit) .withPreparePosition(prepare) @@ -317,7 +317,7 @@ class StreamsMappingSpec extends mutable.Specification { val linkProto = ReadResp.ReadEvent .RecordedEvent() - .withStreamName(linkStreamId) + .withStreamIdentifier(linkStreamId.toStreamIdentifer) .withStreamRevision(linkRevision) .withCommitPosition(linkCommit) .withPreparePosition(linkPrepare) @@ -378,7 +378,7 @@ class StreamsMappingSpec extends mutable.Specification { val recordedEvent = ReadResp.ReadEvent .RecordedEvent() - .withStreamName(streamId) + .withStreamIdentifier(streamId.toStreamIdentifer) .withStreamRevision(revision) .withCommitPosition(commit) .withPreparePosition(prepare) @@ -399,7 +399,7 @@ class StreamsMappingSpec extends mutable.Specification { mkEventRecord[ErrorOr](recordedEvent) shouldEqual eventRecord.asRight // Bad StreamId - mkEventRecord[ErrorOr](recordedEvent.withStreamName("")) shouldEqual + mkEventRecord[ErrorOr](recordedEvent.withStreamIdentifier("".toStreamIdentifer)) shouldEqual ProtoResultError("name cannot be empty").asLeft // Missing UUID @@ -431,16 +431,6 @@ class StreamsMappingSpec extends mutable.Specification { ProtoResultError(s"Required value $Created missing or invalid.").asLeft } - "mkStreamId" >> { - mkStreamId[ErrorOr](null) shouldEqual ProtoResultError("name cannot be empty").asLeft - mkStreamId[ErrorOr]("") shouldEqual ProtoResultError("name cannot be empty").asLeft - mkStreamId[ErrorOr]("abc") shouldEqual c.StreamId.normal("abc").unsafe.asRight - mkStreamId[ErrorOr]("$abc") shouldEqual c.StreamId.system("abc").unsafe.asRight - mkStreamId[ErrorOr]("$all") shouldEqual c.StreamId.All.asRight - mkStreamId[ErrorOr]("$$abc") shouldEqual c.StreamId.MetaId(c.StreamId.normal("abc").unsafe).asRight - mkStreamId[ErrorOr]("$$$streams") shouldEqual c.StreamId.MetaId(c.StreamId.Streams).asRight - } - "mkEventType" >> { mkEventType[ErrorOr](null) shouldEqual ProtoResultError("Event type name cannot be empty").asLeft mkEventType[ErrorOr]("") shouldEqual ProtoResultError("Event type name cannot be empty").asLeft @@ -450,14 +440,26 @@ class StreamsMappingSpec extends mutable.Specification { } "mkWriteResult" >> { - mkWriteResult[ErrorOr](AppendResp().withCurrentRevision(1L)) shouldEqual + + import AppendResp.{Result, Success} + + val test: AppendResp => ErrorOr[WriteResult] = mkWriteResult[ErrorOr](c.StreamId.from("abc").unsafe, _) + + test(AppendResp().withSuccess(Success().withCurrentRevision(1L))) shouldEqual WriteResult(c.EventNumber.exact(1L)).asRight - mkWriteResult[ErrorOr](AppendResp().withNoStream(empty)) shouldEqual + test(AppendResp().withSuccess(Success().withNoStream(empty))) shouldEqual ProtoResultError("Did not expect NoStream when using NonEmptyList").asLeft - mkWriteResult[ErrorOr](AppendResp().withNoPosition(empty)) shouldEqual + test( + AppendResp().withSuccess(Success().withCurrentRevisionOption(Success.CurrentRevisionOption.Empty))) shouldEqual ProtoResultError("CurrentRevisionOptions is missing").asLeft + + // TODO: WrongExpectedVersion + + test(AppendResp().withResult(Result.Empty)) shouldEqual + ProtoResultError("Result is missing").asLeft + } "mkDeleteResult" >> {