Skip to content

Commit

Permalink
tmp thread println
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Jan 9, 2023
1 parent 8d66434 commit 1a03922
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
48 changes: 44 additions & 4 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ akka.persistence.cassandra {
}

# The ExecutionContext to use for the session tasks and future composition.
session-dispatcher = "akka.persistence.cassandra.default-dispatcher"
session-dispatcher = "akka.persistence.cassandra.the-session-dispatcher"

# Full config path to the Datastax Java driver's configuration section.
# When connecting to more than one Cassandra cluster different session configuration can be
Expand Down Expand Up @@ -81,7 +81,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.journal.CassandraJournal"

# Dispatcher for the plugin actor
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.journal-dispatcher"

# Parameter indicating whether the journal keyspace should be auto created.
# Not all Cassandra settings are configurable when using autocreate and for
Expand Down Expand Up @@ -205,7 +205,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"

# Dispatcher for the plugin actors.
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.query-dispatcher"

read-profile = "akka-persistence-cassandra-profile"

Expand Down Expand Up @@ -458,7 +458,7 @@ akka.persistence.cassandra {
class = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"

# Dispatcher for the plugin actor
plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"
plugin-dispatcher = "akka.persistence.cassandra.snapshot-dispatcher"

write-profile = "akka-persistence-cassandra-snapshot-profile"
read-profile = "akka-persistence-cassandra-snapshot-profile"
Expand Down Expand Up @@ -576,6 +576,46 @@ akka.persistence.cassandra {
parallelism-max = 6
}
}

journal-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

query-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

the-session-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}

snapshot-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 6
parallelism-factor = 1
parallelism-max = 6
}
}
}

//#profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ import akka.stream.scaladsl.Source
fromSequenceNr,
toSequenceNr)

println(s"# asyncReplayMessages-1 Thread: ${Thread.currentThread().getName}") // FIXME
queries
.eventsByPersistenceId(
persistenceId,
Expand All @@ -749,9 +750,13 @@ import akka.stream.scaladsl.Source
"asyncReplayMessages",
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization),
ec)
.mapAsync(1)(tr.sendMissingTagWrite(tp))
.mapAsync(1) { x =>
println(s"# asyncReplayMessages-2 Thread: ${Thread.currentThread().getName}") // FIXME
tr.sendMissingTagWrite(tp)(x)
}
}))
.map { te =>
println(s"# asyncReplayMessages-3 Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import akka.persistence.cassandra.PluginSettings
}

def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: ExecutionContext): Future[Option[Row]] = {
println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
Expand All @@ -85,6 +86,7 @@ import akka.persistence.cassandra.PluginSettings
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] = {
println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
Expand Down Expand Up @@ -260,6 +262,7 @@ import akka.persistence.cassandra.PluginSettings
queryState = QueryInProgress(switchPartition = false, fetchMore = false, System.nanoTime())
session.highestDeletedSequenceNumber(persistenceId).onComplete(highestDeletedSequenceNrCb.invoke)

println(s"# EventsByPersistenceIdStage Thread: ${Thread.currentThread().getName}") // FIXME
if (Thread.currentThread().getName.contains("-akka.actor.default-dispatcher-")) {
println(s"# Thread: ${Thread.currentThread().getName}") // FIXME
new RuntimeException("Wrong thread").printStackTrace()
Expand Down

0 comments on commit 1a03922

Please sign in to comment.