Skip to content

Commit

Permalink
Switch to Akka HTTP based AWS connector
Browse files Browse the repository at this point in the history
- Removes AWS SDK
- Adds Alpakka as new Client
- To perform a clean change we refactored the DynamoDBJournal so that the Helper can be replaced in isolation.
- Introduces settings for Backoff and Retries instead of magic numbers.
- Refactored Describe from Type Class to pattern matching for better testability and simpler code
- Removed no longer used AWS Client Settings
- Removed Documentation of Threading Issue from Readme
- Bump Version to 1.1.0

This fixes issue akka#3
  • Loading branch information
markus1189 authored and christianuhl committed Mar 24, 2017
1 parent df3e737 commit 0de0819
Show file tree
Hide file tree
Showing 22 changed files with 340 additions and 366 deletions.
31 changes: 11 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ Configuration
-------------

~~~
akka.persistence.journal.plugin = "my-dynamodb-journal"
my-dynamodb-journal = ${dynamodb-journal} # include the default settings
my-dynamodb-journal { # and add some overrides
journal-table = <the name of the table to be used>
journal-name = <prefix to be used for all keys stored by this plugin>
aws-access-key-id = <your key>
aws-secret-access-key = <your secret>
endpoint = "https://dynamodb.us-east-1.amazonaws.com" # or where your deployment is
dynamodb-journal {
journal-table = <your table name>
akka.stream.alpakka.dynamodb {
port = <default for HTTP/HTTPS>
host = <check https://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region>
region = "<your region>"
parallelism = <a sensible number>
}
}
akka.persistence.journal.plugin = "dynamodb-journal"
~~~

For details on the endpoint URL please refer to the [DynamoDB documentation](http://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region). There are many more settings that can be used for fine-tuning and adapting this journal plugin to your use-case, please refer to the [reference.conf](https://github.com/akka/akka-persistence-dynamodb/blob/master/src/main/resources/reference.conf) file.
Expand Down Expand Up @@ -73,17 +75,6 @@ In the first case a recovery will only ever see all of the events or none of the

In the second case each event is treated in isolation and may or may not be replayed depending on whether it was persisted successfully or not.

Performance Considerations
--------------------------

This plugin uses the AWS Java SDK which means that the number of requests that can be made concurrently is limited by the number of connections to DynamoDB and by the number of threads in the thread-pool that is used by the AWS HTTP client. The default setting is 50 connections which for a deployment that is used from the same EC2 region allows roughly 5000 requests per second (where every persisted event batch is roughly one request). If a single ActorSystem needs to persist more than this number of events per second then you may want to tune the parameter

~~~
my-dynamodb-journal.aws-client-config.max-connections = <your value here>
~~~

Changing this number changes both the number of concurrent connections and the used thread-pool size.

Compatibility with pre-1.0 versions
-----------------------------------

Expand Down
18 changes: 9 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ val akkaVersion = "2.4.14"
val amzVersion = "1.11.66"

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-core" % amzVersion,
"com.amazonaws" % "aws-java-sdk-dynamodb" % amzVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-tck" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"commons-io" % "commons-io" % "2.4" % "test",
"org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test"
"com.lightbend.akka" %% "akka-stream-alpakka-dynamodb" % "0.6",
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-tck" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"commons-io" % "commons-io" % "2.4" % "test",
"org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test",
"org.mockito" % "mockito-core" % "2.7.19" % "test"
)

parallelExecution in Test := false
Expand Down
2 changes: 1 addition & 1 deletion scripts/dev-setup.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

cd $(dirname $0)/..

Expand Down
2 changes: 1 addition & 1 deletion scripts/get-dynamodb-local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

ROOT=$(cd $(dirname $0)/..; pwd)

Expand Down
2 changes: 1 addition & 1 deletion scripts/kill-dynamoDB.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

cd $(dirname $0)/..

Expand Down
14 changes: 12 additions & 2 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ dynamodb-journal {
# empty in order to use the default credentials provider chain, see
# http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#using-the-default-credential-provider-chain
aws-secret-access-key = ""

# number of concurrently running replay prefetch operations for a
# single PersistentActor; this prefetch means that during a replay
# more events might be retrieved than specified with the `max`
Expand All @@ -77,6 +77,8 @@ dynamodb-journal {
max-batch-get = 100
max-batch-write = 25
max-item-size = 400000
max-retries = 10
initial-backoff-ms = 100 // milliseconds
}

# AWS client configuration settings, see
Expand All @@ -90,7 +92,6 @@ dynamodb-journal {
connection-timeout = default # int
connection-ttl = default # long
local-address = default # InetAddress
max-connections = default # int
max-error-retry = default # int
preemptive-basic-proxy-auth = default # boolean
protocol = default # HTTP or HTTPS
Expand Down Expand Up @@ -120,4 +121,13 @@ dynamodb-journal {
parallelism-max = 8
}
}

akka.stream.alpakka.dynamodb {
port = 8000
host = "localhost"
region = us-east-1
parallelism = 5
}
}


219 changes: 82 additions & 137 deletions src/main/scala/akka/persistence/dynamodb/journal/DynamoDBHelper.scala
Original file line number Diff line number Diff line change
@@ -1,134 +1,108 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence.dynamodb.journal

import com.amazonaws.{ AmazonServiceException, AmazonWebServiceRequest }
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.model._
import akka.actor.Scheduler
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{ ActorRef, Scheduler }
import akka.event.LoggingAdapter
import akka.pattern.after
import java.util.{ concurrent => juc }
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits.{ BatchGetItem, BatchWriteItem, DescribeTable, PutItem, Query }
import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.services.dynamodbv2.model._

import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import scala.concurrent.{ ExecutionContext, Future }

/**
 * Message sent to the registered reporter actor when a request completes.
 *
 * @param nanos   elapsed time of the request, measured with System.nanoTime
 * @param retries number of retries that were used for the request
 */
case class LatencyReport(nanos: Long, retries: Int)
private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis)

trait DynamoDBHelper {

implicit val ec: ExecutionContext
val scheduler: Scheduler
val dynamoDB: AmazonDynamoDBAsyncClient

val client: DynamoClient
val log: LoggingAdapter
val settings: DynamoDBJournalConfig
import settings._
var reporter: Option[ActorRef] = None

def shutdown(): Unit = dynamoDB.shutdown()
/** Wraps `request` in an Alpakka `Query` operation and sends it through `sendSingleRequest`. */
def query(request: QueryRequest): Future[QueryResult] = sendSingleRequest(new Query(request))

private var reporter: ActorRef = _
def setReporter(ref: ActorRef): Unit = reporter = ref
/** Wraps `request` in an Alpakka `BatchGetItem` operation and sends it through `sendSingleRequest`. */
def batchGetItem(request: BatchGetItemRequest): Future[BatchGetItemResult] =
sendSingleRequest(new BatchGetItem(request))

private def send[In <: AmazonWebServiceRequest, Out](aws: In, func: AsyncHandler[In, Out] => juc.Future[Out])(implicit d: Describe[_ >: In]): Future[Out] = {
/** Wraps `request` in an Alpakka `PutItem` operation and sends it through `sendSingleRequest`. */
def putItem(request: PutItemRequest): Future[PutItemResult] =
sendSingleRequest(new PutItem(request))

def name = d.desc(aws)
/** Wraps `request` in an Alpakka `BatchWriteItem` operation and sends it through `sendSingleRequest`. */
def batchWriteItem(request: BatchWriteItemRequest): Future[BatchWriteItemResult] =
sendSingleRequest(new BatchWriteItem(request))

def sendSingle(): Future[Out] = {
val p = Promise[Out]
/** Registers `ref` as the reporter; it is sent a `LatencyReport` whenever a request issued via `sendSingleRequest` completes. */
def setReporter(ref: ActorRef): Unit = this.reporter = Some(ref)

val handler = new AsyncHandler[In, Out] {
override def onError(ex: Exception) = ex match {
case e: ProvisionedThroughputExceededException =>
p.tryFailure(ex)
case _ =>
val n = name
log.error(ex, "failure while executing {}", n)
p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, ex))
}
override def onSuccess(req: In, resp: Out) = p.trySuccess(resp)
}
/** Wraps `request` in an Alpakka `DescribeTable` operation and sends it through `sendSingleRequest`. */
def describeTable(request: DescribeTableRequest): Future[DescribeTableResult] =
sendSingleRequest(new DescribeTable(request))

try {
func(handler)
} catch {
case ex: Throwable =>
log.error(ex, "failure while preparing {}", name)
p.tryFailure(ex)
}
private[journal] def sendSingleRequest(
awsOp: AwsOp
): Future[awsOp.B] = {
val remainingRetries = new AtomicInteger(settings.ApiRequestMaxRetries)
val initialBackoff = settings.ApiRequestInitialBackoff.millis
val start = System.nanoTime()
val operationName = Describe.describe(awsOp.request)
val f = retry(initialBackoff, remainingRetries)(() => client.single(awsOp))

p.future
}

val state = new RetryStateHolder

lazy val retry: PartialFunction[Throwable, Future[Out]] = {
case _: ProvisionedThroughputExceededException if state.retries > 0 =>
val backoff = state.backoff
state.retries -= 1
state.backoff *= 2
after(backoff, scheduler)(sendSingle().recoverWith(retry))
case other => Future.failed(other)
}
reporter.foreach(r => f.onComplete(_ => r ! LatencyReport(System.nanoTime - start, settings.ApiRequestMaxRetries - remainingRetries.get())))

if (Tracing) log.debug("{}", name)
val start = if (reporter ne null) System.nanoTime else 0L
// Wrap the resulting future and log a message on failure. The tests made me do it:
// this preserves the log ordering expected by the original tests.

// backoff retries when sending too fast
val f = sendSingle().recoverWith(retry)

if (reporter ne null) f.onComplete(_ => reporter ! LatencyReport(System.nanoTime - start, 10 - state.retries))

f
}

trait Describe[T] {
def desc(t: T): String
protected def formatKey(i: Item): String = {
val key = i.get(Key) match { case null => "<none>" case x => x.getS }
val sort = i.get(Sort) match { case null => "<none>" case x => x.getN }
s"[$Key=$key,$Sort=$sort]"
}
f.asInstanceOf[Future[awsOp.B]]
.transform(
identity, {
case pe: ProvisionedThroughputExceededException => pe
case e =>
log.error(
e,
"failure while executing {}",
operationName
)
new DynamoDBJournalFailure("failure while executing " + operationName, e)
}
)
}

object Describe {
implicit object GenericDescribe extends Describe[AmazonWebServiceRequest] {
def desc(aws: AmazonWebServiceRequest): String = aws.getClass.getSimpleName
/**
 * Runs `f` and, while it fails with a ProvisionedThroughputExceededException and the
 * retry budget is not exhausted, runs it again after an exponentially growing delay.
 *
 * The delay is scheduled on `scheduler` via `akka.pattern.after` instead of sleeping
 * inside a Future, so no thread of `ec` is parked while backing off.
 *
 * @param backoff delay before the next attempt; doubled after every retry
 * @param retries remaining retry budget; decremented once per retry that is actually
 *                performed and never driven below zero, so that
 *                `maxRetries - retries.get()` is the number of retries used
 * @param f       thunk that starts one attempt of the operation
 * @return the result of the first successful attempt, or the failure of the last attempt;
 *         failures other than ProvisionedThroughputExceededException are not retried
 */
def retry[A](backoff: FiniteDuration, retries: AtomicInteger)(f: () => Future[A]): Future[A] =
  f().recoverWith {
    case e: ProvisionedThroughputExceededException =>
      // sendSingleRequest hands every request its own counter and the attempts of one
      // request run one after another, so check-then-decrement is sufficient here.
      if (retries.get() > 0) {
        val left = retries.decrementAndGet()
        log.error(s"ProvisionedThroughputExceededException, backing of $backoff ms, retries left $left")
        // Fully qualified so the block does not depend on the file's import list.
        akka.pattern.after(backoff, scheduler)(retry(backoff * 2, retries)(f))
      } else Future.failed(e)
  }
}

implicit object DescribeDescribe extends Describe[DescribeTableRequest] {
def desc(aws: DescribeTableRequest): String = s"DescribeTableRequest(${aws.getTableName})"
}

implicit object QueryDescribe extends Describe[QueryRequest] {
def desc(aws: QueryRequest): String = s"QueryRequest(${aws.getTableName},${aws.getExpressionAttributeValues})"
}

implicit object PutItemDescribe extends Describe[PutItemRequest] {
def desc(aws: PutItemRequest): String = s"PutItemRequest(${aws.getTableName},${formatKey(aws.getItem)})"
}
object Describe {

implicit object DeleteDescribe extends Describe[DeleteItemRequest] {
def desc(aws: DeleteItemRequest): String = s"DeleteItemRequest(${aws.getTableName},${formatKey(aws.getKey)})"
}

implicit object BatchGetItemDescribe extends Describe[BatchGetItemRequest] {
def desc(aws: BatchGetItemRequest): String = {
val entry = aws.getRequestItems.entrySet.iterator.next()
val table = entry.getKey
val keys = entry.getValue.getKeys.asScala.map(formatKey)
s"BatchGetItemRequest($table, ${keys.mkString("(", ",", ")")})"
/**
 * Renders the `Key` and `Sort` attributes of `i` as "[<Key>=<value>,<Sort>=<value>]".
 *
 * The `Key` attribute is read via `getS` and the `Sort` attribute via `getN`;
 * an attribute that is absent from the item is rendered as "<none>".
 */
private def formatKey(i: Item): String = {
  // Option(...) maps a missing (null) attribute to None, which folds to "<none>".
  val keyText = Option(i.get(Key)).fold("<none>")(_.getS)
  val sortText = Option(i.get(Sort)).fold("<none>")(_.getN)
  s"[$Key=$keyText,$Sort=$sortText]"
}

implicit object BatchWriteItemDescribe extends Describe[BatchWriteItemRequest] {
def desc(aws: BatchWriteItemRequest): String = {
def describe(request: AmazonWebServiceRequest): String = request match {

case aws: BatchWriteItemRequest =>
val entry = aws.getRequestItems.entrySet.iterator.next()
val table = entry.getKey
val keys = entry.getValue.asScala.map { write =>
Expand All @@ -138,46 +112,17 @@ trait DynamoDBHelper {
}
}
s"BatchWriteItemRequest($table, ${keys.mkString("(", ",", ")")})"
}
}

def listTables(aws: ListTablesRequest): Future[ListTablesResult] =
send[ListTablesRequest, ListTablesResult](aws, dynamoDB.listTablesAsync(aws, _))

def describeTable(aws: DescribeTableRequest): Future[DescribeTableResult] =
send[DescribeTableRequest, DescribeTableResult](aws, dynamoDB.describeTableAsync(aws, _))

def createTable(aws: CreateTableRequest): Future[CreateTableResult] =
send[CreateTableRequest, CreateTableResult](aws, dynamoDB.createTableAsync(aws, _))

def updateTable(aws: UpdateTableRequest): Future[UpdateTableResult] =
send[UpdateTableRequest, UpdateTableResult](aws, dynamoDB.updateTableAsync(aws, _))

def deleteTable(aws: DeleteTableRequest): Future[DeleteTableResult] =
send[DeleteTableRequest, DeleteTableResult](aws, dynamoDB.deleteTableAsync(aws, _))

def query(aws: QueryRequest): Future[QueryResult] =
send[QueryRequest, QueryResult](aws, dynamoDB.queryAsync(aws, _))

def scan(aws: ScanRequest): Future[ScanResult] =
send[ScanRequest, ScanResult](aws, dynamoDB.scanAsync(aws, _))

def putItem(aws: PutItemRequest): Future[PutItemResult] =
send[PutItemRequest, PutItemResult](aws, dynamoDB.putItemAsync(aws, _))

def getItem(aws: GetItemRequest): Future[GetItemResult] =
send[GetItemRequest, GetItemResult](aws, dynamoDB.getItemAsync(aws, _))

def updateItem(aws: UpdateItemRequest): Future[UpdateItemResult] =
send[UpdateItemRequest, UpdateItemResult](aws, dynamoDB.updateItemAsync(aws, _))

def deleteItem(aws: DeleteItemRequest): Future[DeleteItemResult] =
send[DeleteItemRequest, DeleteItemResult](aws, dynamoDB.deleteItemAsync(aws, _))

def batchWriteItem(aws: BatchWriteItemRequest): Future[BatchWriteItemResult] =
send[BatchWriteItemRequest, BatchWriteItemResult](aws, dynamoDB.batchWriteItemAsync(aws, _))
case aws: BatchGetItemRequest =>
val entry = aws.getRequestItems.entrySet.iterator.next()
val table = entry.getKey
val keys = entry.getValue.getKeys.asScala.map(formatKey)
s"BatchGetItemRequest($table, ${keys.mkString("(", ",", ")")})"

def batchGetItem(aws: BatchGetItemRequest): Future[BatchGetItemResult] =
send[BatchGetItemRequest, BatchGetItemResult](aws, dynamoDB.batchGetItemAsync(aws, _))
case aws: DeleteItemRequest => s"DeleteItemRequest(${aws.getTableName},${formatKey(aws.getKey)})"
case aws: PutItemRequest => s"PutItemRequest(${aws.getTableName},${formatKey(aws.getItem)})"
case aws: QueryRequest => s"QueryRequest(${aws.getTableName},${aws.getExpressionAttributeValues})"
case aws: DescribeTableRequest => s"DescribeTableRequest(${aws.getTableName})"
case aws => aws.getClass.getSimpleName
}

}
Loading

0 comments on commit 0de0819

Please sign in to comment.