Merge branch 'v1.1.x_issue-98' into upstream-master
coreyoconnor committed Feb 17, 2022
2 parents 5b2d487 + c659dfb commit 2aaba98
Showing 12 changed files with 213 additions and 29 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -11,3 +11,7 @@ project/project/target
 .settings/
 .cache
 .aws
+
+
+.bloop
+.metals
18 changes: 9 additions & 9 deletions project/Publish.scala
@@ -12,7 +12,7 @@ object Publish extends AutoPlugin {
 
   val defaultPublishTo = settingKey[File]("Default publish directory")
 
-  override def trigger = allRequirements
+  override def trigger = allRequirements
   override def requires = sbtrelease.ReleasePlugin
 
   override lazy val projectSettings = Seq(
@@ -27,8 +27,7 @@ object Publish extends AutoPlugin {
     homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")),
     publishMavenStyle := true,
     pomIncludeRepository := { x => false },
-    defaultPublishTo := crossTarget.value / "repository",
-  )
+    defaultPublishTo := crossTarget.value / "repository")
 
   def akkaPomExtra = {
     <developers>
@@ -41,15 +40,16 @@ object Publish extends AutoPlugin {
     </developers>
   }
 
-  private def akkaPublishTo = Def.setting {
-    sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value)
-  }
+  private def akkaPublishTo =
+    Def.setting {
+      sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value))
+    }
 
   private def sonatypeRepo(version: String): Option[Resolver] =
-    Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ =>
+    Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ =>
       val nexus = "https://oss.sonatype.org/"
-      if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots"
-      else "releases" at nexus + "service/local/staging/deploy/maven2"
+      if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots")
+      else "releases".at(nexus + "service/local/staging/deploy/maven2")
     }
 
   private def localRepo(repository: File) =
18 changes: 9 additions & 9 deletions project/Whitesource.scala
@@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin {
     whitesourceProduct := "Lightbend Reactive Platform",
     whitesourceAggregateProjectName := {
       val projectName = (moduleName in LocalRootProject).value.replace("-root", "")
-      projectName + "-" + (
-        if (isSnapshot.value)
-          if (gitCurrentBranch.value == "master") "master"
-          else "adhoc"
-        else CrossVersion.partialVersion((version in LocalRootProject).value)
-          .map { case (major,minor) => s"$major.$minor-stable" }
-          .getOrElse("adhoc"))
+      projectName + "-" + (if (isSnapshot.value)
+                             if (gitCurrentBranch.value == "master") "master"
+                             else "adhoc"
+                           else
+                             CrossVersion
+                               .partialVersion((version in LocalRootProject).value)
+                               .map { case (major, minor) => s"$major.$minor-stable" }
+                               .getOrElse("adhoc"))
     },
     whitesourceForceCheckAllDependencies := true,
-    whitesourceFailOnError := true
-  )
+    whitesourceFailOnError := true)
 }
22 changes: 22 additions & 0 deletions src/main/resources/reference.conf
@@ -120,6 +120,28 @@ dynamodb-journal {
       parallelism-max = 8
     }
   }
+
+  fixes {
+    # Bug akka/akka-persistence-dynamodb#98 can leave an event source that was written
+    # successfully and is valid, but whose high sequence number marker is missing. The missing
+    # marker causes incomplete replay of the event source. A root-cause fix prevents this from
+    # happening again, but does not repair existing event sources. Writing new events through an
+    # incompletely replayed persistent actor corrupts the event source; if the new write happens
+    # to line up with an atomic write, the corruption stays hidden.
+    # This fix cannot recover already corrupted event sources, but it tries to keep corruption
+    # from happening where it has not happened yet, by not fully trusting the high mark:
+    # whenever the last event found in a partition sits at sort key 99 (the final slot), recovery
+    # keeps chasing the tail of the event source into the following partition. This protects
+    # valid event sources with hidden tails from incomplete replay. For event sources that do not
+    # suffer from this problem there is a 1% chance that this results in one useless extra query.
+    # This is a performance/consistency trade-off. It should not be needed for event sources
+    # created after the root-cause fix, hence it is off by default.
+    # NOTE: Since the high mark is written after the events, there is a slim chance that a valid
+    # event source is written but a network error occurs before the high mark is written. In that
+    # case the write is reported as failed to the writing party, so whether the event source
+    # should be kept is debatable; this fix would also recover from that situation.
+    high-distrust = false
+  }
 }
 dynamodb-snapshot-store {
 
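To turn the fix on for an existing journal, override the flag in your application configuration — a minimal sketch, where `my-dynamodb-journal` is a hypothetical plugin config name following the same pattern as the test configuration further down:

my-dynamodb-journal = ${dynamodb-journal}
my-dynamodb-journal {
  fixes.high-distrust = true
}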
@@ -25,6 +25,10 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
   val MaxBatchWrite = c.getInt("aws-api-limits.max-batch-write")
   val MaxItemSize = c.getInt("aws-api-limits.max-item-size")
 
+  object Fixes {
+    val HighDistrust = c.getBoolean("fixes.high-distrust")
+  }
+
   val client = new DynamoDBClientConfig(c)
   override def toString: String =
     "DynamoDBJournalConfig(" +
@@ -40,6 +44,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
     ",MaxBatchGet:" + MaxBatchGet +
     ",MaxBatchWrite:" + MaxBatchWrite +
     ",MaxItemSize:" + MaxItemSize +
+    ",Fixes.HighDistrust:" + Fixes.HighDistrust +
     ",client.config:" + client +
     ")"
 }
@@ -96,7 +96,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
         item.put(AtomIndex, N(index))
         item.put(AtomEnd, size)
         putReq(item)
-      } ++ (if (low / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
+      } ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
 
     val futures = writes.grouped(MaxBatchWrite).map { batch =>
       dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r))
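Why `low - 1`: a batch that starts exactly at the first slot of a new partition used to compare equal partition numbers on both sides and skip the high sequence number marker, leaving the previous partition full to sort key 99 with no marker — the hidden-tail case described in reference.conf. A hypothetical worked example (not part of the commit), assuming PartitionSize = 100:

object BoundaryCheck extends App {
  val PartitionSize = 100L
  val (low, high) = (100L, 150L) // sequence number 100 is sort key 0, the first slot of partition 1
  println(low / PartitionSize != high / PartitionSize)       // 1 != 1 -> false: old check skips the marker
  println((low - 1) / PartitionSize != high / PartitionSize) // 0 != 1 -> true:  fixed check writes it
}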
@@ -272,8 +272,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
      * for which it was written, all other entries do not update the highest value. Therefore we
      * must scan the partition of this Sort=0 entry and find the highest occupied number.
      */
-    val request = eventQuery(persistenceId, start)
-    dynamo.query(request).flatMap(getRemainingQueryItems(request, _)).flatMap { result =>
+    getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
       if (result.getItems.isEmpty) {
         /*
          * If this comes back empty then that means that all events have been deleted. The only
@@ -285,6 +284,42 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
           log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
           ret
         }
+      } else if (Fixes.HighDistrust) { // allows recovering from a failed high mark write
+        // keeps chasing the tail of the event source for as long as HighDistrust is
+        // enabled and the partition's highest sort key is PartitionSize - 1
+        def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
+          if (nextResults.getItems.isEmpty) {
+            // the first iteration never lands here, since its query result is non-empty;
+            // if a follow-up query comes back empty, the highest observed sequence number
+            // is the last one before this partition's start
+            Future.successful(partitionStart - 1)
+          } else {
+            /*
+             * `partitionStart` is the Sort=0 entry's sequence number, so add the maximum sort key.
+             */
+            val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
+            val ret = partitionStart + partitionMax
+
+            if (partitionMax == PartitionSize - 1) {
+              val nextStart = ret + 1
+              getAllPartitionSequenceNrs(persistenceId, nextStart)
+                .map { logResult =>
+                  if (!logResult.getItems().isEmpty()) // only log when the follow-up query produced results
+                    log.warning(
+                      "readSequenceNr(highest=true persistenceId={}) tail found after {}",
+                      persistenceId,
+                      ret)
+                  logResult
+                }
+                .flatMap(tailChase(nextStart, _))
+            } else
+              Future.successful(ret)
+          }
+        }
+
+        tailChase(start, result).map { ret =>
+          log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
+          ret
+        }
       } else {
         /*
          * `start` is the Sort=0 entry's sequence number, so add the maximum sort key.
@@ -438,15 +473,17 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
     }
   }
 
-  def getRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
+  private[dynamodb] def getAllRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
     val last = result.getLastEvaluatedKey
     if (last == null || last.isEmpty || last.get(Sort).getN.toLong == 99) Future.successful(result)
     else {
-      dynamo.query(request.withExclusiveStartKey(last)).map { next =>
+      dynamo.query(request.withExclusiveStartKey(last)).flatMap { next =>
         val merged = new ArrayList[Item](result.getItems.size + next.getItems.size)
         merged.addAll(result.getItems)
         merged.addAll(next.getItems)
-        next.withItems(merged)
+
+        // need to keep on reading until there's nothing more to read
+        getAllRemainingQueryItems(request, next.withItems(merged))
       }
     }
   }
@@ -460,6 +497,11 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
       .withProjectionExpression("num")
       .withConsistentRead(true)
 
+  private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = {
+    val request = eventQuery(persistenceId, sequenceNr)
+    dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _))
+  }
+
   def batchGetReq(items: JMap[String, KeysAndAttributes]) =
     new BatchGetItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
 }
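A hypothetical trace (not part of the commit) of how the tail chase recovers the true highest sequence number when the high mark is stale, assuming PartitionSize = 100 and an event source whose events actually reach sequence number 249:

object TailChaseTrace extends App {
  val PartitionSize = 100L
  val highestSeqNr = 249L // actual tail; the stale high mark points at partition start 100

  // max sort key stored in a partition, None when the partition is empty
  def partitionMax(partitionStart: Long): Option[Long] =
    if (partitionStart > highestSeqNr) None
    else Some(math.min(highestSeqNr - partitionStart, PartitionSize - 1))

  def tailChase(partitionStart: Long): Long = partitionMax(partitionStart) match {
    case None                                  => partitionStart - 1 // empty follow-up query: tail was in the previous partition
    case Some(max) if max == PartitionSize - 1 => tailChase(partitionStart + max + 1) // partition full: keep chasing
    case Some(max)                             => partitionStart + max
  }

  println(tailChase(100L)) // 100 -> partition full (max 99) -> chase 200 -> max 49 -> 249
}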
1 change: 1 addition & 0 deletions src/test/resources/application.conf
@@ -6,6 +6,7 @@ my-dynamodb-journal {
   aws-access-key-id = "AWS_ACCESS_KEY_ID"
   aws-secret-access-key = "AWS_SECRET_ACCESS_KEY"
   tracing = off
+  fixes.high-distrust = true
 }
 
 my-dynamodb-snapshot-store = ${dynamodb-snapshot-store}
@@ -36,8 +36,8 @@ class DeletionSpec
      * noisy logging, and I like my build output clean and green.
      */
     Thread.sleep(500)
-    system.terminate().futureValue
     client.shutdown()
+    system.terminate().futureValue
     super.afterAll()
   }
 
@@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike
 class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike {
   implicit val materializer = ActorMaterializer()
 
+  assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100")
+
   "A DynamoPartitionGroup should create the correct PartitionKey outputs" when {
     "events 1 thru 250 are presented" in {
       val sourceUnderTest =
@@ -0,0 +1,107 @@
/**
 * Copyright (C) 2021 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.persistence.dynamodb.journal

import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess
import com.amazonaws.services.dynamodbv2.model._
import java.util.{ HashMap => JHMap }
import akka.persistence.dynamodb._

class PersistAllConsistencySpec
    extends TestKit(ActorSystem("PersistAllConsistencySpec"))
    with ImplicitSender
    with WordSpecLike
    with BeforeAndAfterAll
    with Matchers
    with ScalaFutures
    with ConversionCheckedTripleEquals
    with DynamoDBUtils
    with IntegSpec {

  override def beforeAll(): Unit = {
    super.beforeAll()
    ensureJournalTableExists()
  }

  override def afterAll(): Unit = {
    client.shutdown()
    system.terminate().futureValue
    super.afterAll()
  }

  override val persistenceId = "PersistAllConsistencySpec"
  lazy val journal = Persistence(system).journalFor("")

  import settings._

  "DynamoDB Journal (persistAll)" must {

    "recover correctly if the first write is a batch" in {
      journal ! Purge(persistenceId, testActor)
      expectMsg(Purged(persistenceId))

      val start = nextSeqNr
      val end = 10
      println(s"start: ${start}; end: ${end}")
      val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

      journal ! WriteMessages(padding, testActor, 1)
      expectMsg(WriteMessagesSuccessful)
      (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

      journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
      (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
      expectMsg(RecoverySuccess(end))
    }

    for (t <- Seq(("last", 3), ("middle", 2), ("first", 1)))
      s"correctly cross page boundaries with AtomicWrite position ${t._1}" in {
        val start1 = nextSeqNr
        val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2
        println(s"start: ${start1}; end: ${end1}")
        val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil

        journal ! WriteMessages(padding, testActor, 1)
        expectMsg(WriteMessagesSuccessful)
        (start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

        val start2 = nextSeqNr
        val end2 = start2 + 2
        println(s"start: ${start2}; end: ${end2}")
        val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil

        journal ! WriteMessages(subject, testActor, 1)
        expectMsg(WriteMessagesSuccessful)
        (start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

        journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
        (start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
        expectMsg(RecoverySuccess(end2))
      }

    s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in {
      val start = nextSeqNr
      val end = ((start / PartitionSize) + 1) * PartitionSize - 1
      println(s"start: ${start}; end: ${end}")
      val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

      journal ! WriteMessages(padding, testActor, 1)
      expectMsg(WriteMessagesSuccessful)
      (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

      journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
      (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
      expectMsg(RecoverySuccess(end))
    }

  }

}
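The boundary arithmetic in these tests, worked through on hypothetical numbers (assuming PartitionSize = 100): with start1 = 251 and the "first" case (t._2 = 1), end1 = ((251 / 100) + 1) * 100 - 1 = 299, so the padding fills the partition exactly and the following three-event AtomicWrite begins at sequence number 300 — sort key 0 of a fresh partition, which is precisely the boundary case the `(low - 1)` fix in DynamoDBJournalRequests covers. The "middle" and "last" cases shift the boundary into and to the end of the atomic write, respectively.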
@@ -88,7 +88,7 @@ class RecoveryConsistencySpec
 
     "read correct highest sequence number even if a Sort=0 entry is lost" in {
      val start = messages + 19
-      val end = (start / 100 + 1) * 100
+      val end = (start / PartitionSize + 1) * PartitionSize
       val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i")))
       journal ! WriteMessages(more, testActor, 1)
       expectMsg(WriteMessagesSuccessful)
@@ -98,7 +98,8 @@ class RecoveryConsistencySpec
 
       journal ! ListAll(persistenceId, testActor)
       val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted
-      expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids))
+      expectMsg(
+        ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids))
 
       journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor)
       expectMsg(RecoverySuccess(end))
@@ -157,8 +158,8 @@ class RecoveryConsistencySpec
 
   private def delete(num: Long) = {
     val key: Item = new JHMap
-    key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}"))
-    key.put(Sort, N(num % 100))
+    key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}"))
+    key.put(Sort, N(num % PartitionSize))
     client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue
   }
 }
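For reference, a hypothetical illustration (not from the commit) of the journal key layout that `delete` computes above, assuming PartitionSize = 100 and a journal named "journal": sequence number 250 lives in partition 2 at sort key 50.

val (journalName, persistenceId, partitionSize) = ("journal", "RecoveryConsistencySpec", 100L)
val num  = 250L
val key  = s"$journalName-P-$persistenceId-${num / partitionSize}" // "journal-P-RecoveryConsistencySpec-2"
val sort = num % partitionSize                                     // 50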
