Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Commit

Permalink
improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Nov 3, 2016
1 parent 9c502bb commit 35f1444
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 98 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
</tr>
<tr>
<td><a href="http://akka.io">Akka</a> </td>
<td>2.4.11</td>
<td>2.4.12</td>
</tr>
<tr>
<td><a href="https://github.com/EventStore/EventStore.JVM">EventStore client</a> </td>
<td>2.4.1</td>
<td>3.0.0</td>
</tr>
</table>

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ scalacOptions in(Compile, doc) ++= Seq("-groups", "-implicits", "-no-link-warnin

resolvers += "spray" at "http://repo.spray.io/"

val AkkaVersion = "2.4.11"
val AkkaVersion = "2.4.12"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
Expand All @@ -44,7 +44,7 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-testkit" % AkkaVersion % IntegrationTest,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % IntegrationTest,
"com.geteventstore" %% "eventstore-client" % "2.4.1",
"com.geteventstore" %% "eventstore-client" % "3.0.0",
"org.specs2" %% "specs2-core" % "3.8.5.1" % IntegrationTest,
"org.json4s" %% "json4s-native" % "3.4.2" % IntegrationTest,
"io.spray" %% "spray-json" % "1.3.2" % IntegrationTest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class EventStoreReadJournalIntegrationSpec extends ActorSpec with Matchers {

val src = queries.allPersistenceIds().filter { x => persistenceIds contains x }
val probe = src.runWith(TestSink.probe[String])
.request(persistenceIds.size)
.request(persistenceIds.size.toLong)
.expectNextUnorderedN(persistenceIds)
}
}
Expand All @@ -43,7 +43,7 @@ class EventStoreReadJournalIntegrationSpec extends ActorSpec with Matchers {

val src = queries.currentPersistenceIds().filter { x => persistenceIds contains x }
src.runWith(TestSink.probe[String])
.request(persistenceIds.size)
.request(persistenceIds.size.toLong)
.expectNextUnorderedN(persistenceIds)
.expectComplete()
}
Expand Down Expand Up @@ -130,9 +130,9 @@ class EventStoreReadJournalIntegrationSpec extends ActorSpec with Matchers {
for { (event, idx) <- events.zipWithIndex } yield {
val seqNr = idx + 1
EventEnvelope(
offset = seqNr,
offset = seqNr.toLong,
persistenceId = persistenceId,
sequenceNr = seqNr,
sequenceNr = seqNr.toLong,
event = event
)
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ eventstore.persistence {
class = akka.persistence.eventstore.journal.EventStoreJournal

# Dispatcher for the plugin actor.
plugin-dispatcher = akka.actor.default-dispatcher
plugin-dispatcher = akka.persistence.dispatchers.default-plugin-dispatcher

# Prefix prepended to persistenceId
# streamId = stream-prefix + persistenceId
Expand Down
16 changes: 14 additions & 2 deletions src/main/scala/akka/persistence/eventstore/EventStorePlugin.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package akka.persistence.eventstore

import akka.actor.{ Actor, ActorLogging }
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import eventstore._
import eventstore.tcp.ConnectionActor

import scala.concurrent.Future
import scala.util.control.NonFatal

trait EventStorePlugin extends ActorLogging { self: Actor =>
val connection: EsConnection = EventStoreExtension(context.system).connection
val settings = Settings(context.system.settings.config)

val connection: EsConnection = {
val dispatcher = config.getString("plugin-dispatcher")
val props = ConnectionActor.props(settings).withDispatcher(dispatcher)
val ref = context.actorOf(props, "eventstore")
new EsConnection(ref, context, settings)
}

val serialization = EventStoreSerialization(context.system)
implicit val materializer = ActorMaterializer()(context)
val prefix: String = config.getString("stream-prefix")
import context.dispatcher

def config: Config

def asyncUnit(x: => Future[_]): Future[Unit] = async(x).map[Unit](_ => ())
def asyncUnit(x: => Future[_]): Future[Unit] = async(x).map(_ => ())

def async[T](x: => Future[T]): Future[T] = try x catch {
case NonFatal(f) => Future.failed(f)
Expand Down
17 changes: 10 additions & 7 deletions src/main/scala/akka/persistence/eventstore/Helpers.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package akka.persistence.eventstore

import scala.concurrent.{ ExecutionContext, Future }
import eventstore._

import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future }

object Helpers {
type Timestamp = Long
type PersistenceId = String
Expand Down Expand Up @@ -42,14 +44,15 @@ object Helpers {
import Batch._

def readBatch(req: ReadStreamEvents): Future[Batch] = {
self.future(req).map(Batch(_)).recover {
case _: StreamNotFoundException => Batch.Empty
}
self(req) map { Batch.apply } recover { case _: StreamNotFoundException => Batch.Empty }
}

def loop(events: List[Event], t: T, quit: T => Future[T]): Future[T] = events match {
case Nil => quit(t)
case x :: xs => pf.lift(t -> x).fold(Future.successful(t))(loop(xs, _, quit))
@tailrec def loop(events: List[Event], t: T, quit: T => Future[T]): Future[T] = events match {
case Nil => quit(t)
case x :: xs => pf.lift(t -> x) match {
case None => Future successful t
case Some(x) => loop(xs, x, quit)
}
}

def foldLeft(from: EventNumber, t: T): Future[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import akka.persistence.eventstore.EventStorePlugin
import akka.persistence.eventstore.Helpers._
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.stream.scaladsl.Source
import eventstore._
import play.api.libs.json._

import scala.annotation.tailrec
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {
import EventStoreJournal._
Expand All @@ -22,68 +22,52 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {

def asyncWriteMessages(messages: Seq[AtomicWrite]) = {

@tailrec def writeAtomicWrites(
messages: Iterator[AtomicWrite],
progress: Map[String, Future[Try[Unit]]],
results: List[Future[Try[Unit]]]
): Seq[Future[Try[Unit]]] = {

if (messages.isEmpty) results.reverse
else {
val message = messages.next()
val payload = message.payload

val persistenceId = message.persistenceId
def write: Future[Try[Unit]] = {
Try {
payload.map(x => serialization.serialize(x, Some(x.payload)))
} map { events =>

@tailrec def writePayloads(
events: Iterator[Seq[EventData]],
previous: Future[Try[Unit]],
sequenceNr: Long
): Future[Try[Unit]] = {

if (events.isEmpty) previous
else {
val batch = events.next()

def write = {
val expVer = {
val expVer = sequenceNr - 1
if (expVer == 0L) ExpectedVersion.NoStream else ExpectedVersion.Exact(eventNumber(expVer))
}
val req = WriteEvents(eventStream(persistenceId), batch.toList, expVer)
(connection future req) map { _ => Success(()) } recover { case e => Failure(e) }
}

val result = for {
previous <- previous
result <- if (previous.isFailure) Future successful previous else write
} yield result

writePayloads(events, result, sequenceNr + batch.size)
}
}

writePayloads(events grouped writeBatchSize, Future successful Success(()), payload.head.sequenceNr)
} recover {
case e => Future successful Failure(e)
if (messages.isEmpty) Future successful Nil
else Future {
val atomicWrite = messages.head
val persistenceId = atomicWrite.persistenceId
val seqNr = atomicWrite.lowestSequenceNr
val events = for {
atomicWrite <- messages
persistentRepr <- atomicWrite.payload
} yield serialization.serialize(persistentRepr, Some(persistentRepr.payload))

def writeEvents(events: Seq[EventData], seqNr: Long): Future[Nil.type] = {
if (events.isEmpty) Future successful Nil
else {
val expVer = {
val expVer = seqNr - 1
if (expVer == 0L) ExpectedVersion.NoStream else ExpectedVersion.Exact(eventNumber(expVer))
}
}.get

val result = for {
previous <- progress.getOrElse(persistenceId, Future successful Success(()))
result <- if (previous.isFailure) Future successful previous else write
} yield result
val req = WriteEvents(eventStream(persistenceId), events.toList, expVer)
for { _ <- connection(req) } yield Nil
}
}

writeAtomicWrites(messages, progress + (persistenceId -> result), result :: results)
@tailrec def loop(
future: Future[Nil.type],
batches: Traversable[Seq[EventData]],
seqNr: Long
): Future[Nil.type] = {

if (batches.isEmpty) future
else {
val events = batches.head
val result = for {
future <- future
result <- writeEvents(events, seqNr)
} yield result
loop(result, batches.tail, seqNr + events.size)
}
}
}

val futures = writeAtomicWrites(messages.toIterator, Map(), Nil)
Future sequence futures
if (events.size <= writeBatchSize) {
writeEvents(events, seqNr)
} else {
val batches = events grouped writeBatchSize
loop(Future successful Nil, batches.toTraversable, seqNr)
}
} flatMap { identity }
}

def asyncDeleteMessagesTo(persistenceId: PersistenceId, to: SequenceNr) = {
Expand All @@ -93,7 +77,7 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {
val eventData = EventData.StreamMetadata(Content.Json(json.toString()))
val streamId = eventStream(persistenceId).metadata
val req = WriteEvents(streamId, List(eventData))
connection future req
connection(req)
}

if (to != Long.MaxValue) delete(to)
Expand All @@ -105,8 +89,8 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {

def asyncReadHighestSequenceNr(persistenceId: PersistenceId, from: SequenceNr) = async {
val stream = eventStream(persistenceId)
val req = ReadEvent(eventStream(persistenceId), EventNumber.Last)
(connection future req).map {
val req = ReadEvent(stream, EventNumber.Last)
connection(req) map {
case ReadEventCompleted(event) => sequenceNumber(event.number)
} recoverWith {
case _: StreamNotFoundException => Future successful 0L
Expand All @@ -132,23 +116,24 @@ class EventStoreJournal extends AsyncWriteJournal with EventStorePlugin {
max: Long
)(recoveryCallback: (PersistentRepr) => Unit) = asyncUnit {

def asyncReplayMessages(from: EventNumber.Exact, to: EventNumber.Exact, max: Int) = {
val req = ReadStreamEvents(eventStream(persistenceId), from)
connection.foldLeft(req, max) {
case (left, event) if event.number <= to && left > 0 =>
val repr = serialization.deserialize[PersistentRepr](event)
recoveryCallback(repr)
left - 1
}
def asyncReplayMessages(from: Option[EventNumber.Exact], to: EventNumber.Exact) = {
val streamId = eventStream(persistenceId)
val publisher = connection.streamPublisher(streamId, from, infinite = false)
val source = Source.fromPublisher(publisher)
source
.takeWhile { event => event.number <= to }
.take(max)
.runForeach { event => recoveryCallback(serialization.deserialize[PersistentRepr](event)) }
.map { _ => Unit }
}

if (to == 0L) Future(())
else asyncReplayMessages(eventNumber(from max 1), eventNumber(to), max.toIntOrError)
else asyncReplayMessages(if (from <= 1) None else Some(eventNumber(from - 1)), eventNumber(to))
}

def eventStream(x: PersistenceId): EventStream.Id = EventStream(prefix + x) match {
case id: EventStream.Id => id
case other => sys.error(s"Cannot create id event stream for $x")
case other => sys.error(s"Cannot create EventStream.Id for $x")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package akka.persistence.eventstore.snapshot

import akka.persistence.eventstore.Helpers._
import akka.persistence.eventstore.EventStorePlugin
import akka.persistence.eventstore.Helpers._
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import eventstore.ReadDirection.Backward
Expand Down Expand Up @@ -35,9 +35,12 @@ class EventStoreSnapshotStore extends SnapshotStore with EventStorePlugin {
connection.foldLeft(req, Empty) { case (deletes: Deletes, event) => fold(deletes, event) }.map(_.selected)
}

def saveAsync(metadata: SnapshotMetadata, snapshot: Any) = asyncUnit {
val streamId = eventStream(metadata.persistenceId)
connection.future(WriteEvents(streamId, List(serialization.serialize(Snapshot(snapshot, metadata), Some(snapshot)))))
def saveAsync(metadata: SnapshotMetadata, snapshot: Any) = async {
for {
event <- Future { serialization.serialize(Snapshot(snapshot, metadata), Some(snapshot)) }
streamId = eventStream(metadata.persistenceId)
_ <- connection(WriteEvents(streamId, List(event)))
} yield ()
}

def deleteAsync(metadata: SnapshotMetadata) = {
Expand All @@ -56,9 +59,12 @@ class EventStoreSnapshotStore extends SnapshotStore with EventStorePlugin {

def eventStream(x: PersistenceId): EventStream.Id = EventStream.Id(prefix + x + "-snapshots")

def delete(persistenceId: PersistenceId, se: DeleteEvent): Future[Unit] = asyncUnit {
val streamId = eventStream(persistenceId)
connection.future(WriteEvents(streamId, List(serialization.serialize(se, None))))
def delete(persistenceId: PersistenceId, se: DeleteEvent): Future[Unit] = async {
for {
event <- Future { serialization.serialize(se, None) }
streamId = eventStream(persistenceId)
_ <- connection(WriteEvents(streamId, List(event)))
} yield ()
}
}

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "2.3.1-SNAPSHOT"
version in ThisBuild := "3.0.0-SNAPSHOT"

0 comments on commit 35f1444

Please sign in to comment.