diff --git a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala index 05ee3e3..49c1ca9 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala @@ -269,7 +269,8 @@ import org.slf4j.Logger def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = { val aheadOfInitial = - initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp) + initialOffset == TimestampOffset.Zero || + state.latestBacktracking.timestamp.compareTo(initialOffset.timestamp) >= 0 val previousTimestamp = if (state.previous == TimestampOffset.Zero) state.latest.timestamp diff --git a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySliceBacktrackingSpec.scala index 3ed22a4..1d60a99 100644 --- a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySliceBacktrackingSpec.scala @@ -258,7 +258,7 @@ class EventsBySliceBacktrackingSpec result1.cancel() } - "still make initial backtracking until ahead of start offset" in { + "still make initial backtracking until caught up to start offset, then skip backtracking" in { val entityType = nextEntityType() val pid1 = nextPersistenceId(entityType) val slice = query.sliceForPersistenceId(pid1.id) @@ -275,7 +275,11 @@ class EventsBySliceBacktrackingSpec writeEvent(slice, pid, seqNr, startTime.plusMillis(n), s"e${mod}-${seqNr}") } - (3 to 10).foreach { n => + // will start query at next event + val startOffset = TimestampOffset(startTime.plusSeconds(23).plusMillis(1), Map.empty) + + // go past switch-to-backtracking trigger of 3 * buffer size (of 10) + (3 to 30).foreach { n => writeEvent(slice, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n") writeEvent(slice, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n") } @@ -298,15 +302,16 @@ class EventsBySliceBacktrackingSpec env.offset } - val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty)) + val result1 = startQuery(startOffset) // from backtracking expect(result1.expectNext(), pid1, 1, None) expect(result1.expectNext(), pid2, 1, None) expect(result1.expectNext(), pid1, 2, None) expect(result1.expectNext(), pid2, 2, None) + expect(result1.expectNext(), pid1, 3, None) // start offset // from normal - (3 to 10).foreach { n => + (3 to 30).foreach { n => expect(result1.expectNext(), pid1, n, Some(s"e1-$n")) expect(result1.expectNext(), pid2, n, Some(s"e2-$n")) }