Skip to content

Commit

Permalink
Add missing default config options in plugin's reference.conf (#701)
Browse files Browse the repository at this point in the history
- without it, if clients tried to partially override an option inside a
nested section, it failed at runtime:
```
pureconfig.error.ConfigReaderException: Cannot convert configuration to a scala.runtime.Nothing$. Failures are:
  at 'data-integrity':
    - (deployments/common/persistence.conf @ file:/Users/msokolov/prog/evo/rbow/betting/target/scala-2.13/classes/deployments/common/persistence.conf: 318) Key not found: 'correlate-events-with-meta'.
```
- added docs to config options
- fixed ConsumerPool max size potentially going below 1
  • Loading branch information
migesok authored Nov 29, 2024
1 parent 655dcdd commit f773088
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ private[journal] object ConsumerPool {
for {
cores <- Runtime[F].availableCores.toResource
pool <- consumer.toResourcePool(
(cores.toDouble * poolConfig.multiplier)
.round
.toInt,
poolConfig.idleTimeout,
maxSize = math.max(1, (cores.toDouble * poolConfig.multiplier).intValue),
expireAfter = poolConfig.idleTimeout,
discardTasksOnRelease = true,
)
} yield {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@ object Journal {
implicit val configReaderCallTimeThresholds: ConfigReader[CallTimeThresholds] = deriveReader[CallTimeThresholds]
}

/**
* Configuration for the dynamic pool of Kafka consumers used on recovery in case the data is not yet replicated to
* Cassandra
*
@param multiplier defines max pool size = multiplier x number of cores;
if the calculated max pool size is below 1, a size of 1 is used to avoid starting with
an unusable consumer pool configuration
* @param idleTimeout if idle for this time, Kafka consumers are closed
*/
final case class ConsumerPoolConfig(
multiplier: Double,
idleTimeout: FiniteDuration,
Expand All @@ -339,12 +348,13 @@ object Journal {

final case class DataIntegrityConfig(
/**
* If true then duplicated [[SeqNr]] in events will cause [[JournalError]] `Data integrity violated`
* On recovery, if true, duplicated [[SeqNr]] in events will cause [[JournalError]] `Data integrity violated`
*/
seqNrUniqueness: Boolean,

/**
* If true then events with [[RecordId]] different from one in metadata will be filtered out
* On recovery, if true, events with [[RecordId]] different from the one in the current metadata record
* will be filtered out and logged as an error.
*/
correlateEventsWithMeta: Boolean,
)
Expand Down
22 changes: 22 additions & 0 deletions persistence/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,28 @@ evolutiongaming.kafka-journal.persistence {
}

json-codec = default

# Configuration for the dynamic pool of Kafka consumers used on recovery in case the data is not yet replicated to
# Cassandra
consumer-pool {
# Defines max pool size = multiplier x number of cores.
# If a calculated max pool size is below 1, the size of 1 is used to avoid starting with an unusable consumer pool
# configuration.
# The multiplier value can be fractional.
multiplier = 10

# If Kafka consumers stay idle for this time, they are closed.
idle-timeout = 1min
}

data-integrity {
# On recovery, if true, duplicated sequence numbers in events will cause a JournalError `Data integrity violated`
seq-nr-uniqueness = true

# On recovery, if true, events with record IDs different from the one in the current metadata record
# will be filtered out and logged as an error.
correlate-events-with-meta = false
}
}

dispatcher {
Expand Down

0 comments on commit f773088

Please sign in to comment.