Skip to content

Commit

Permalink
Merge pull request #846 from ahjohannessen/wip-23.10.x-compat
Browse files Browse the repository at this point in the history
streams: add support for esdb 23.10.x
  • Loading branch information
ahjohannessen authored Oct 17, 2023
2 parents 21a9859 + 31d9c21 commit 738e6f3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .docker/images.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
EVENTSTORE_CLI_IMAGE="${EVENTSTORE_CLI_IMAGE:-ghcr.io/eventstore/es-gencert-cli/es-gencert-cli:1.1.0}"
EVENTSTORE_DB_IMAGE="${EVENTSTORE_DB_IMAGE:-ghcr.io/eventstore/eventstore:23.6.0-focal}"
EVENTSTORE_DB_IMAGE="${EVENTSTORE_DB_IMAGE:-ghcr.io/eventstore/eventstore:23.10.0-focal}"
24 changes: 17 additions & 7 deletions fs2/src/main/scala/sec/api/streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -401,24 +401,27 @@ object Streams:
private[sec] def subscriptionAllPipe[F[_]: MonadThrow](
log: Logger[F]
): Pipe[F, ReadResp, Event] =
_.filterNot(_.content.isCheckpoint)
_.filterNot(_.isCheckpoint)
.through(subConfirmationPipe(log))
.through(_.filter(_.content.isEvent).evalMap(reqReadEvent[F]).unNone)
.through(_.filter(_.isEvent).evalMap(reqReadEvent[F]).unNone)

private[sec] def subscriptionStreamPipe[F[_]: MonadThrow](
log: Logger[F]
): Pipe[F, ReadResp, Event] =
_.through(subConfirmationPipe(log)).evalMap(reqReadEvent[F]).unNone
_.through(subConfirmationPipe(log))
.through(_.filter(_.isEvent).evalMap(reqReadEvent[F]).unNone)

private[sec] def subAllFilteredPipe[F[_]: MonadThrow](
log: Logger[F]
): Pipe[F, ReadResp, Either[Checkpoint, Event]] =
// Defensive filtering to guard against ESDB emitting checkpoints
// with `LogPosition.Exact.MaxValue`, i.e. end of stream.
_.through(subConfirmationPipe(log)).through(_.evalMap(mkCheckpointOrEvent[F]).unNone).collect {
case r @ Right(_) => r
case l @ Left(v) if v != Checkpoint.endOfStream => l
}
_.through(subConfirmationPipe(log))
.through(_.filter(_.isCheckpointOrEvent).evalMap(mkCheckpointOrEvent[F]).unNone)
.collect {
case r @ Right(_) => r
case l @ Left(v) if v != Checkpoint.endOfStream => l
}

private[sec] def withRetry[F[_]: Temporal, T: Order, O](
from: T,
Expand Down Expand Up @@ -468,3 +471,10 @@ object Streams:
run(from, 1, o.retryConfig.delay)
}
else streamFn(from)

//======================================================================================================================

extension (rr: ReadResp)
private[sec] def isEvent: Boolean = rr.content.isEvent
private[sec] def isCheckpoint: Boolean = rr.content.isCheckpoint
private[sec] def isCheckpointOrEvent: Boolean = rr.isEvent || rr.isCheckpoint
6 changes: 6 additions & 0 deletions protobuf/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ message ReadResp {
uint64 first_stream_position = 5;
uint64 last_stream_position = 6;
AllStreamPosition last_all_stream_position = 7;
CaughtUp caught_up = 8;
FellBehind fell_behind = 9;
}

message CaughtUp {}

message FellBehind {}

message ReadEvent {
RecordedEvent event = 1;
RecordedEvent link = 2;
Expand Down

0 comments on commit 738e6f3

Please sign in to comment.