diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumerPool.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumerPool.scala
index 0ddc5214d..711a7fa26 100644
--- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumerPool.scala
+++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ConsumerPool.scala
@@ -26,10 +26,8 @@ private[journal] object ConsumerPool {
     for {
       cores <- Runtime[F].availableCores.toResource
       pool <- consumer.toResourcePool(
-        (cores.toDouble * poolConfig.multiplier)
-          .round
-          .toInt,
-        poolConfig.idleTimeout,
+        maxSize = math.max(1, (cores.toDouble * poolConfig.multiplier).intValue),
+        expireAfter = poolConfig.idleTimeout,
         discardTasksOnRelease = true,
       )
     } yield {
diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala
index fa150950a..f714d0716 100644
--- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala
+++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Journal.scala
@@ -325,6 +325,15 @@ object Journal {
     implicit val configReaderCallTimeThresholds: ConfigReader[CallTimeThresholds] = deriveReader[CallTimeThresholds]
   }
 
+  /**
+   * Configuration for the dynamic pool of Kafka consumers used on recovery in case the data is not yet replicated to
+   * Cassandra
+   *
+   * @param multiplier defines max pool size = multiplier x number of cores,
+   *                   if a calculated max pool size is below 1, the size of 1 is used to avoid starting with
+   *                   an unusable consumer pool configuration
+   * @param idleTimeout if idle for this time, Kafka consumers are closed
+   */
   final case class ConsumerPoolConfig(
     multiplier: Double,
     idleTimeout: FiniteDuration,
@@ -339,12 +348,13 @@ object Journal {
 
   final case class DataIntegrityConfig(
     /**
-     * If true then duplicated [[SeqNr]] in events will cause [[JournalError]] `Data integrity violated`
+     * On recovery, if true, duplicated [[SeqNr]] in events will cause [[JournalError]] `Data integrity violated`
      */
    seqNrUniqueness: Boolean,
 
     /**
-     * If true then events with [[RecordId]] different from one in metadata will be filtered out
+     * On recovery, if true, events with [[RecordId]] different from the one in the current metadata record
+     * will be filtered out and logged as an error.
      */
     correlateEventsWithMeta: Boolean,
   )
diff --git a/persistence/src/main/resources/reference.conf b/persistence/src/main/resources/reference.conf
index 2d9a16805..f5babccd0 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -53,6 +53,28 @@ evolutiongaming.kafka-journal.persistence {
     }
 
     json-codec = default
+
+    # Configuration for the dynamic pool of Kafka consumers used on recovery in case the data is not yet replicated to
+    # Cassandra
+    consumer-pool {
+      # Defines max pool size = multiplier x number of cores.
+      # If a calculated max pool size is below 1, the size of 1 is used to avoid starting with an unusable consumer pool
+      # configuration.
+      # Multiplier value can be fractional.
+      multiplier = 10
+
+      # if idle for this time, Kafka consumers are closed
+      idle-timeout = 1min
+    }
+
+    data-integrity {
+      # On recovery, if true, duplicated sequence numbers in events will cause a JournalError `Data integrity violated`
+      seq-nr-uniqueness = true
+
+      # On recovery, if true, events with record IDs different from the one in the current metadata record
+      # will be filtered out and logged as an error
+      correlate-events-with-meta = false
+    }
   }
 
   dispatcher {
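
Not part of the patch: a minimal standalone sketch of the sizing rule described by the ConsumerPool change and the `multiplier` comment above. The object name, the `maxSize` helper, and the sample core counts are hypothetical; only the formula `math.max(1, (cores.toDouble * multiplier).intValue)` comes from the diff.

// Illustration only: how the new pool sizing behaves, including the clamp to a minimum of 1.
object ConsumerPoolSizingSketch extends App {
  def maxSize(cores: Int, multiplier: Double): Int =
    math.max(1, (cores.toDouble * multiplier).intValue)

  println(maxSize(cores = 8, multiplier = 10))  // 80: default multiplier on an 8-core machine
  println(maxSize(cores = 4, multiplier = 0.1)) // 1: 0.4 truncates to 0 and is clamped to 1
}

The second call shows why the clamp matters: with a fractional multiplier on a small machine the computed size could otherwise be 0, leaving the consumer pool unusable.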