-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use journal plugin-dispatcher for eventsByPersistenceId recovery #889
base: main
Are you sure you want to change the base?
Conversation
@@ -742,12 +747,16 @@ import akka.stream.scaladsl.Source | |||
None, | |||
settings.journalSettings.readProfile, | |||
"asyncReplayMessages", | |||
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization)) | |||
extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization), | |||
ec) | |||
.mapAsync(1)(tr.sendMissingTagWrite(tp)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is likely one of the cases where we also need to set the dispatcher on the inner flow (because of bug where futureSource
does not pass attributes to nested flow before running it correctly)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added my println testing again
a5817a6
When running akka.persistence.cassandra.journal.CassandraIntegrationSpec it's all on journal-dispatcher
.
I have tried with both Akka 2.6.9 and 2.6.14.
It seems to work fine. Do you see any mistakes in my println?
.mapMaterializedValue(_ => NotUsed) | ||
|
||
/** | ||
* INTERNAL API | ||
* | ||
* FIXME This is not used. Was ori |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FIXME This comment was never completed ;)
} | ||
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) | ||
extractor.extract(row, deserializeEventAsync) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dispatcher here removed because done everywhere this is called instead, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is right. The intention is that we shouldn't introduce an async boundary here.
It's typically used like this:
queries.eventsByPersistenceId(...)
.map(...)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
.run()
I see now that we have a slightly different usage with `mapAsync:
queries.eventsByPersistenceId(...)
.mapAsync(...)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(settings.journalSettings.pluginDispatcher))
.run()
For that case I wonder if the dispatcher propagates all the way back to the eventsByPersistenceId
since there is already an async boundary via the mapAsync
?
* When eventsByPersistenceId is used for recovery it should use the journal.plugin-dispatcher * Use plugin-dispatcher all the way for internal eventsByPersistenceId * ExecutionContexts.parasitic for `.map(_ => ())`
* materializer.executionContext can be the default-dispatcher * used by the Retry
* materializer.executionContext can be default-dispatcher
(cherry picked from commit 3c70c6f55b01e59b2c7ee77948de98c4881ce98e)
a5817a6
to
1a03922
Compare
@johanandren Maybe we should swap in this again since you are looking at the dispatcher attribute? If you look at the log of the ci run you see see "Wrong thread", for example https://github.com/akka/akka-persistence-cassandra/actions/runs/3874066766/jobs/6604818927#step:6:5377 |
This is a forward port of akka/akka#30007 and #888, but also a few more dispatcher related things
Use journal plugin-dispatcher for recovery
.map(_ => ())
ExecutionContext parameter to EventsByTagStage and EventsByPersistenceIdStage