diff --git a/build.sbt b/build.sbt index 4ace64611..826257920 100644 --- a/build.sbt +++ b/build.sbt @@ -37,13 +37,15 @@ lazy val commonSettings = Seq( import com.typesafe.tools.mima.core.* ThisBuild / mimaBinaryIssueFilters ++= Seq( - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("com.evolutiongaming.kafka.journal.Journal$DataIntegrityConfig"), - ProblemFilters.exclude[MissingClassProblem]("com.evolutiongaming.kafka.journal.Journal$DataIntegrityConfig$*"), - ProblemFilters.exclude[IncompatibleSignatureProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalHead.*"), - ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalHead.*"), - ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.replicator.TopicReplicatorMetrics.*"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.kafka.journal.replicator.TopicReplicatorMetrics.*"), - ProblemFilters.exclude[IncompatibleSignatureProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalStatements#*"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#MetaJournalStatements.of", + ), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#MetaJournalStatements.fromMetaJournal", + ), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.of", + ), ) val alias: Seq[sbt.Def.Setting[?]] = diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandra.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandra.scala index d9ea2465e..9dc363d9f 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandra.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandra.scala @@ -74,7 +74,7 @@ object EventualCassandra { for { log <- LogOf[F].apply(EventualCassandra.getClass) schema <- SetupSchema[F](schemaConfig, origin, consistencyConfig) - segmentNrsOf = SegmentNrsOf[F](first = Segments.default, second = Segments.old) + segmentNrsOf = SegmentNrs.Of[F](first = Segments.default, second = Segments.old) statements <- Statements.of(schema, segmentNrsOf, Segments.default, consistencyConfig.read) _ <- log.info(s"kafka-journal version: ${Version.current.value}") } yield { @@ -115,7 +115,7 @@ object EventualCassandra { } val records = - read(from, Segment(from, head.segmentSize)) + read(from, Segment.journal(from, head.segmentSize)) .chain { case (record, segment) => for { @@ -220,19 +220,19 @@ object EventualCassandra { } } - final case class Statements[F[_]]( + private[journal] final case class Statements[F[_]]( records: JournalStatements.SelectRecords[F], metaJournal: MetaJournalStatements[F], selectOffset2: Pointer2Statements.SelectOffset[F], ) - object Statements { + private[journal] object Statements { def apply[F[_]](implicit F: Statements[F]): Statements[F] = F def of[F[_]: Concurrent: CassandraSession: ToTry: JsonCodec.Decode]( schema: Schema, - segmentNrsOf: SegmentNrsOf[F], + segmentNrsOf: SegmentNrs.Of[F], segments: Segments, consistencyConfig: CassandraConsistencyConfig.Read, ): F[Statements[F]] = { @@ -246,7 +246,7 @@ object EventualCassandra { } } - trait 
MetaJournalStatements[F[_]] { + private[journal] trait MetaJournalStatements[F[_]] { def journalHead(key: Key): F[Option[JournalHead]] @@ -255,11 +255,11 @@ object EventualCassandra { def ids(topic: Topic): Stream[F, String] } - object MetaJournalStatements { + private[journal] object MetaJournalStatements { def of[F[_]: Concurrent: CassandraSession]( schema: Schema, - segmentNrsOf: SegmentNrsOf[F], + segmentNrsOf: SegmentNrs.Of[F], segments: Segments, consistencyConfig: CassandraConsistencyConfig.Read, ): F[MetaJournalStatements[F]] = { @@ -268,7 +268,7 @@ object EventualCassandra { def of[F[_]: Concurrent: CassandraSession]( metaJournal: TableName, - segmentNrsOf: SegmentNrsOf[F], + segmentNrsOf: SegmentNrs.Of[F], segments: Segments, consistencyConfig: CassandraConsistencyConfig.Read, ): F[MetaJournalStatements[F]] = { @@ -282,7 +282,7 @@ object EventualCassandra { } def fromMetaJournal[F[_]: Concurrent]( - segmentNrsOf: SegmentNrsOf[F], + segmentNrsOf: SegmentNrs.Of[F], journalHead: cassandra.MetaJournalStatements.SelectJournalHead[F], journalPointer: cassandra.MetaJournalStatements.SelectJournalPointer[F], ids: cassandra.MetaJournalStatements.SelectIds[F], @@ -295,7 +295,7 @@ object EventualCassandra { def firstOrSecond[A](key: Key)(f: SegmentNr => F[Option[A]]): F[Option[A]] = { for { - segmentNrs <- segmentNrsOf(key) + segmentNrs <- segmentNrsOf.metaJournal(key) first = f(segmentNrs.first) result <- segmentNrs .second @@ -319,7 +319,7 @@ object EventualCassandra { def ids(topic: Topic) = { for { - segmentNr <- segments.segmentNrs.toStream1[F] + segmentNr <- segments.metaJournalSegmentNrs.toStream1[F] id <- ids1(topic, segmentNr) } yield id } diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandra.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandra.scala index 882e10be5..e6de9bd1b 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandra.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandra.scala @@ -43,7 +43,7 @@ private[journal] object ReplicatedCassandra { expiryService <- ExpiryService.of[F] _ <- log.info(s"kafka-journal version: ${Version.current.value}") } yield { - val segmentOf = SegmentNrsOf[F](first = Segments.default, second = Segments.old) + val segmentOf = SegmentNrs.Of[F](first = Segments.default, second = Segments.old) val journal = apply[F](config.segmentSize, segmentOf, statements, expiryService).withLog(log) metrics .fold(journal) { metrics => journal.withMetrics(metrics) } @@ -53,7 +53,7 @@ private[journal] object ReplicatedCassandra { def apply[F[_]: Sync: Parallel: Fail: UUIDGen]( segmentSizeDefault: SegmentSize, - segmentNrsOf: SegmentNrsOf[F], + segmentNrsOf: SegmentNrs.Of[F], statements: Statements[F], expiryService: ExpiryService[F], ): ReplicatedJournal[F] = { @@ -109,7 +109,7 @@ private[journal] object ReplicatedCassandra { } for { - segmentNrs <- segmentNrsOf(key) + segmentNrs <- segmentNrsOf.metaJournal(key) result <- head(segmentNrs.first).orElse { segmentNrs .second @@ -166,7 +166,8 @@ private[journal] object ReplicatedCassandra { case None => loop(tail, (segment, head :: batch).some, result) case Some(next) => loop(tail, (next, Nel.of(head)).some, insert(segment, batch)) } - case None => loop(tail, (Segment(seqNr, journalHead.segmentSize), Nel.of(head)).some, result) + case None => + loop(tail, 
(Segment.journal(seqNr, journalHead.segmentSize), Nel.of(head)).some, result) } case Nil => s.fold(result) { case (segment, batch) => insert(segment, batch) } @@ -333,8 +334,8 @@ private[journal] object ReplicatedCassandra { val segmentSize = journalHead.segmentSize for { segmentNrs <- from - .toSegmentNr(segmentSize) - .to[F] { to.toSegmentNr(segmentSize) } + .toJournalSegmentNr(segmentSize) + .to[F] { to.toJournalSegmentNr(segmentSize) } result <- { if (to >= seqNr) { segmentNrs.parFoldMapA { segmentNr => @@ -441,12 +442,12 @@ private[journal] object ReplicatedCassandra { segmentNrs <- from .prev[Option] .getOrElse { from } - .toSegmentNr(segmentSize) + .toJournalSegmentNr(segmentSize) .to[F] { to .next[Option] .getOrElse { journalHead.seqNr } - .toSegmentNr(segmentSize) + .toJournalSegmentNr(segmentSize) } _ <- segmentNrs.parFoldMapA { segmentNr => statements @@ -474,13 +475,13 @@ private[journal] object ReplicatedCassandra { } } - trait MetaJournalStatements[F[_]] { + private[journal] trait MetaJournalStatements[F[_]] { import MetaJournalStatements.* def apply(key: Key, segment: SegmentNr): ByKey[F] } - object MetaJournalStatements { + private[journal] object MetaJournalStatements { def of[F[_]: Monad: CassandraSession]( schema: Schema, @@ -612,7 +613,7 @@ private[journal] object ReplicatedCassandra { } } - final case class Statements[F[_]]( + private[journal] final case class Statements[F[_]]( insertRecords: JournalStatements.InsertRecords[F], deleteRecordsTo: JournalStatements.DeleteTo[F], deleteRecords: JournalStatements.Delete[F], @@ -626,7 +627,7 @@ private[journal] object ReplicatedCassandra { selectTopics2: Pointer2Statements.SelectTopics[F], ) - object Statements { + private[journal] object Statements { def apply[F[_]](implicit F: Statements[F]): Statements[F] = F diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segment.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segment.scala index c4d041b4c..bf2857980 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segment.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segment.scala @@ -3,21 +3,41 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra import cats.syntax.all.* import com.evolutiongaming.kafka.journal.SeqNr +/** + * Represents a segment in the `journal` table. + * + * @param nr segment number + * @param size maximum number of events stored in the segment + */ private[journal] final case class Segment(nr: SegmentNr, size: SegmentSize) private[journal] object Segment { - def apply(seqNr: SeqNr, size: SegmentSize): Segment = { - val segmentNr = SegmentNr(seqNr, size) - apply(segmentNr, size) + @deprecated("use `Segment.journal` instead", "4.1.0") + def apply(seqNr: SeqNr, size: SegmentSize): Segment = journal(seqNr, size) + + /** + * Calculate the segment for `seqNr` based on `size` for the `journal` table. + * + * @see [[SegmentNr#journal]] for the underlying calculation + */ + def journal(seqNr: SeqNr, size: SegmentSize): Segment = { + val segmentNr = SegmentNr.journal(seqNr, size) + Segment(segmentNr, size) } implicit class SegmentOps(val self: Segment) extends AnyVal { + /** + * Get the segment for `seqNr` if it is different from the current segment `self`. 
+ * + * @param seqNr sequence number + * @return [[Some]] segment if `seqNr` is in a different segment, [[None]] otherwise + */ def next(seqNr: SeqNr): Option[Segment] = { - val segmentNr = SegmentNr(seqNr, self.size) + val segmentNr = SegmentNr.journal(seqNr, self.size) if (segmentNr === self.nr) none - else self.copy(segmentNr).some + else Segment(segmentNr, self.size).some } } } diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNr.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNr.scala index 0e00c1b5d..86f349ab6 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNr.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNr.scala @@ -3,11 +3,52 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra import cats.kernel.Eq import cats.syntax.all.* import cats.{Applicative, Id, Monad, Order, Show} -import com.evolutiongaming.kafka.journal.SeqNr import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.kafka.journal.util.Fail.implicits.* +import com.evolutiongaming.kafka.journal.{Key, SeqNr} import com.evolutiongaming.scassandra.{DecodeByName, DecodeRow, EncodeByName, EncodeRow} +/** + * Segment fields in the `journal` and `metajournal` tables. Part of the clustering key in both tables. + * + * [[SegmentNr]] can be limited: in the `metajournal` table it cannot be more than [[Segments]], thus: + * ``` + * val metaJournalSegmentNr: SegmentNr = ??? + * val metaJournalSegments: Segments = ??? + * metaJournalSegmentNr.value <= metaJournalSegments.value // has to be true + * ``` + * + * In the `journal` table [[SegmentNr]] is not limited; instead, each segment contains no more than [[SegmentSize]] events: + * ``` + * val journalSegmentNr: SegmentNr = ??? + * val journalSegmentSize: SegmentSize = ??? + * journalSegmentNr.value > journalSegmentSize.value // can be true, but does not have to be + * ``` + * + * Used to partition the `journal` table into segments, thus distributing data between Cassandra partitions. + * For the `journal` table [[SegmentNr]] is calculated from [[SeqNr]] and [[SegmentSize]] in [[SegmentNr.journal]]: + * ``` + * val seqNr: SeqNr = SeqNr.unsafe(45629) + * val segmentSize: SegmentSize = SegmentSize.default // 10000 + * val segmentNr: SegmentNr = SegmentNr.journal(seqNr, segmentSize) // (45629 - 1) / 10000 = 4 where `1` is SeqNr.min + * ``` + * + * Used to partition the `metajournal` table into segments, thus simplifying iteration over all records in the table. + * For the `metajournal` table [[SegmentNr]] is calculated from [[Key]] and [[Segments]] in [[SegmentNr.metaJournal]]: + * ``` + * val key: Key = Key("journal ID", "journal topic") // key.id.toLowerCase.hashCode = 969045668 + * val segments: Segments = Segments.default // 10000 + * val segmentNr: SegmentNr = SegmentNr.metaJournal(key, segments) // Math.abs("journal ID".toLowerCase.hashCode) % 10000 = 5668 + * ``` + * + * @see [[Segments]] for `metajournal` table + * @see [[SegmentNrs]] for `metajournal` table + * @see [[SegmentOf]] for `metajournal` table + * @see [[SegmentNrsOf]] for `metajournal` table + * + * @see [[SegmentSize]] for `journal` table + * @see [[Segment]] for `journal` table + */ sealed abstract case class SegmentNr(value: Long) { override def toString: String = value.toString @@ -15,6 +56,37 @@ sealed abstract case class SegmentNr(value: Long) { object SegmentNr { + /** + * [[SegmentNr]] factory. 
+ */ + sealed trait Of[F[_]] { + + /** + * Calculate [[SegmentNr]] for `metajournal` table. + * + * @see [[SegmentNr.metaJournal]] for the actual algorithm. + */ + def metaJournal(key: Key): F[SegmentNr] + } + + object Of { + def const[F[_]: Applicative](nr: SegmentNr): Of[F] = + new Of[F] { + def metaJournal(key: Key): F[SegmentNr] = nr.pure[F] + } + + /** + * Use [[Segments]] to calculate [[SegmentNr]] for `metajournal` table. + * + * @param segments total number of segments in `metajournal` table + * @return [[SegmentNr.Of]] factory + */ + def apply[F[_]: Applicative](segments: Segments): Of[F] = + new Of[F] { + def metaJournal(key: Key): F[SegmentNr] = SegmentNr.metaJournal(key, segments).pure[F] + } + } + val min: SegmentNr = new SegmentNr(0L) {} val max: SegmentNr = new SegmentNr(Long.MaxValue) {} @@ -49,21 +121,45 @@ object SegmentNr { } } - def apply(seqNr: SeqNr, segmentSize: SegmentSize): SegmentNr = { + @deprecated("use `journal` instead", "4.1.0") + def apply(seqNr: SeqNr, segmentSize: SegmentSize): SegmentNr = journal(seqNr, segmentSize) + + /** + * Calculate segment number for `journal` table + */ + def journal(seqNr: SeqNr, segmentSize: SegmentSize): SegmentNr = { val segmentNr = (seqNr.value - SeqNr.min.value) / segmentSize.value new SegmentNr(segmentNr) {} } + @deprecated("use `metaJournal` instead", "4.1.0") def apply(hashCode: Int, segments: Segments): SegmentNr = { val segmentNr = math.abs(hashCode.toLong % segments.value) new SegmentNr(segmentNr) {} } + /** + * Calculate segment number for `metajournal` table + */ + def metaJournal(key: Key, segments: Segments): SegmentNr = { + val hashCode = key.id.toLowerCase.hashCode + val segmentNr = math.abs(hashCode.toLong % segments.value) + new SegmentNr(segmentNr) {} + } + def opt(value: Long): Option[SegmentNr] = of[Option](value) def unsafe[A](value: A)(implicit numeric: Numeric[A]): SegmentNr = of[Id](numeric.toLong(value)) + @deprecated("use `allForSegmentSize` instead", "4.1.0") def fromSegments(segments: Segments): List[SegmentNr] = { + allForSegmentSize(segments) + } + + /** + * All possible [[SegmentNr]] values for the given [[Segments]]. + */ + def allForSegmentSize(segments: Segments): List[SegmentNr] = { min .value .until(segments.value.toLong) @@ -101,7 +197,13 @@ object SegmentNr { object implicits { implicit class SeqNrOpsSegmentNr(val self: SeqNr) extends AnyVal { - def toSegmentNr(segmentSize: SegmentSize): SegmentNr = SegmentNr(self, segmentSize) + + @deprecated("use `seqNr.toJournalSegmentNr` instead", "4.1.0") + def toSegmentNr(segmentSize: SegmentSize): SegmentNr = { + toJournalSegmentNr(segmentSize) + } + + def toJournalSegmentNr(segmentSize: SegmentSize): SegmentNr = SegmentNr.journal(self, segmentSize) } } } diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrs.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrs.scala index 9c08092e9..8ee46382f 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrs.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrs.scala @@ -1,6 +1,8 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra +import cats.Applicative import cats.syntax.all.* +import com.evolutiongaming.kafka.journal.Key /** Contains segments that query should be performed in. 
* @@ -17,6 +19,26 @@ private[journal] sealed abstract case class SegmentNrs(first: SegmentNr, second: private[journal] object SegmentNrs { + sealed trait Of[F[_]] { + def metaJournal(key: Key): F[SegmentNrs] + } + + object Of { + + def const[F[_]: Applicative](nrs: SegmentNrs): Of[F] = new Of[F] { + def metaJournal(key: Key) = nrs.pure[F] + } + + def apply[F[_]: Applicative](first: Segments, second: Segments): Of[F] = new Of[F] { + def metaJournal(key: Key) = + SegmentNrs( + first = SegmentNr.metaJournal(key, first), + second = SegmentNr.metaJournal(key, second), + ).pure[F] + } + + } + def apply(first: SegmentNr, second: SegmentNr): SegmentNrs = { new SegmentNrs(first, if (first == second) none else second.some) {} } diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOf.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOf.scala index 392a3451f..94b8f1dda 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOf.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOf.scala @@ -4,7 +4,7 @@ import cats.syntax.all.* import cats.{Applicative, ~>} import com.evolutiongaming.kafka.journal.Key -/** Calculate [[SegementNrs]] using the passed journal key. +/** Calculate [[SegmentNrs]] from passed journal key for use in `metajournal` table. * * It is expected that for the same key the same [[SegmentNrs]] will be * returned. @@ -16,11 +16,13 @@ import com.evolutiongaming.kafka.journal.Key * [[SegmentOf]] for a more simple implementation of this factory without * support of backwards compatible change of the segmenting algorithm. */ +@deprecated("use `SegmentNrs.Of` instead", "4.1.0") private[journal] trait SegmentNrsOf[F[_]] { def apply(key: Key): F[SegmentNrs] } +@deprecated("use `SegmentNrs.Of` instead", "4.1.0") private[journal] object SegmentNrsOf { /** Always return one and the same [[SegmentNrs]] instance. 
@@ -30,14 +32,16 @@ private[journal] object SegmentNrsOf { */ def const[F[_]: Applicative](segmentNrs: SegmentNrs): SegmentNrsOf[F] = (_: Key) => segmentNrs.pure[F] - /** Calculate both [[SegmentNrs]] values by a key using a hashing alorithm */ + /** Calculate both [[SegmentNrs]] values by a key using a hashing algorithm */ def apply[F[_]: Applicative](first: Segments, second: Segments): SegmentNrsOf[F] = { key => - val hashCode = key.id.toLowerCase.hashCode - val segmentNrs = SegmentNrs(first = SegmentNr(hashCode, first), second = SegmentNr(hashCode, second)) - segmentNrs.pure[F] + SegmentNrs( + first = SegmentNr.metaJournal(key, first), + second = SegmentNr.metaJournal(key, second), + ).pure[F] } /** Calculate [[SegmentNrs#first]] value only and ignore [[SegmentNrs#second]] */ + @deprecated("use `apply(first, second)` instead", "4.1.0") def apply[F[_]: Applicative](segmentOf: SegmentOf[F]): SegmentNrsOf[F] = { key => segmentOf(key).map { segmentNr => SegmentNrs(segmentNr) } } diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOf.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOf.scala index a80e9823a..c4d4899c7 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOf.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOf.scala @@ -4,7 +4,7 @@ import cats.syntax.all.* import cats.{Applicative, ~>} import com.evolutiongaming.kafka.journal.Key -/** Calculate [[SegementNr]] using the passed journal key. +/** Calculate [[SegmentNr]] from passed journal key for use in `metajournal` table. * * It is expected that for the same key the same [[SegmentNr]] will be * returned. @@ -14,12 +14,15 @@ import com.evolutiongaming.kafka.journal.Key * * @see [[SegmentNrsOf]] for an implementation supporting backwards compatible * change of the segmenting algorithm. + * @see [[SegmentNr.metaJournal]] for the actual algorithm. */ +@deprecated("use `SegmentNr.Of` instead", "4.1.0") private[journal] trait SegmentOf[F[_]] { def apply(key: Key): F[SegmentNr] } +@deprecated("use `SegmentNr.Of` instead", "4.1.0") private[journal] object SegmentOf { /** Always return one and the same [[SegmentNrs]] instance. 
@@ -29,13 +32,9 @@ private[journal] object SegmentOf { */ def const[F[_]: Applicative](segmentNr: SegmentNr): SegmentOf[F] = (_: Key) => segmentNr.pure[F] - /** Calculate [[SegmentNr]] value by key using a hashing alorithm */ + /** Calculate [[SegmentNr]] value by key using a hashing algorithm */ def apply[F[_]: Applicative](segments: Segments): SegmentOf[F] = { (key: Key) => - { - val hashCode = key.id.toLowerCase.hashCode - val segmentNr = SegmentNr(hashCode, segments) - segmentNr.pure[F] - } + SegmentNr.metaJournal(key, segments).pure[F] } implicit class SegmentOfOps[F[_]](val self: SegmentOf[F]) extends AnyVal { diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentSize.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentSize.scala index 643f9668c..57a70d871 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentSize.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentSize.scala @@ -8,24 +8,20 @@ import com.evolutiongaming.scassandra.* import pureconfig.error.{CannotParse, ConfigReaderFailures} import pureconfig.{ConfigCursor, ConfigReader} -/** The size of a segment in Cassandra table. +/** The size of a segment in the `journal` table. * * When [[SegmentSize]] is used then the segment column is used akin to a page * number. I.e. segment number increments as soon as more than [[SegmentSize#value]] - * rows accumulate. + * rows accumulate. This allows nearby journal events to reside mostly in the same + * partitions in Cassandra, making recovery quicker and less resource consuming. * - * The logic itself could be found in [[SegmentNr]] class constructors - * (apply methods). + * The logic itself can be found in the [[SegmentNr#journal]] factory method. * - * The only place where such approach is used right now is a journal table - * specified by [[SchemaConfig#journalTable]]. This allows the nearby journal - * events reside mostly in the same partitions in Cassandra making recovery - * quicker and less resource consuming. - - * The value is configured per journal in [[EventualCassandraConfig#segmentSize]] + * The value is configured per journal in the `segment_size` column of the `metajournal` table * and stays the same during the life of the persistent journal. * - * @see [[Segments]] for alternative way used for some other tables. + * @see [[SegmentNr]] for usage in the `journal` table. + * @see [[Segments]] for an alternative way, used in the `metajournal` table. */ private[journal] sealed abstract case class SegmentSize(value: Int) { diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segments.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segments.scala index 8ffc773ca..7ea2b2da3 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segments.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/Segments.scala @@ -6,23 +6,20 @@ import cats.{Applicative, Id, Order, Show} import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.kafka.journal.util.Fail.implicits.* -/** The maximum number of segments in Cassandra table. +/** The maximum number of segments in the `metajournal` table. * * When [[Segments]] is used then the segment column value is determined by * consistent hashing of the key column. 
I.e. there always no more than - [[Segments#value]] different values. + [[Segments#value]] different values. This allows selecting all records + * in the table by iterating over all segments. * - * The logic itself could be found in [[SegmentNr]] class constructors - * (apply methods). - - * The only place where such approach is used right now is a metajournal - * specified by [[SchemaConfig#metaJournalTable]]. This allows the fair - * distribution of the journal keys between the Cassandra partitions. + * The logic itself can be found in [[SegmentNr#metaJournal]]. * * The value is not end-user configurable and, currently, set in * [[EventualCassandra]]. * - * @see [[SegmentSize]] for alternative way used for some other tables. + * @see [[SegmentNr]] for usage in the `metajournal` table. + * @see [[SegmentSize]] for an alternative way, used in the `journal` table. */ private[journal] sealed abstract case class Segments(value: Int) { @@ -66,6 +63,10 @@ private[journal] object Segments { def unsafe[A](value: A)(implicit numeric: Numeric[A]): Segments = of[Id](numeric.toInt(value)) implicit class SegmentsOps(val self: Segments) extends AnyVal { - def segmentNrs: List[SegmentNr] = SegmentNr.fromSegments(self) + + @deprecated("use `metaJournalSegmentNrs` instead", "4.1.0") + def segmentNrs: List[SegmentNr] = metaJournalSegmentNrs + + def metaJournalSegmentNrs: List[SegmentNr] = SegmentNr.allForSegmentSize(self) } } diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala index dfd841297..d0c8a1ac2 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala @@ -108,7 +108,7 @@ object EventualCassandraSpec { implicit val log: Log[StateT] = Log.empty[StateT] - def eventualJournalOf(segmentNrsOf: SegmentNrsOf[StateT], segments: Segments): EventualJournal[StateT] = { + def eventualJournalOf(segmentNrsOf: SegmentNrs.Of[StateT], segments: Segments): EventualJournal[StateT] = { val selectRecords = new JournalStatements.SelectRecords[StateT] { @@ -146,7 +146,7 @@ object EventualCassandraSpec { def replicatedJournalOf( segmentSize: SegmentSize, delete: Boolean, - segmentNrsOf: SegmentNrsOf[StateT], + segmentNrsOf: SegmentNrs.Of[StateT], ): ReplicatedJournal[StateT] = { val insertRecords: JournalStatements.InsertRecords[StateT] = { (key, segment, insert) => @@ -386,7 +386,7 @@ object EventualCassandraSpec { segmentsFirst: Segments, segmentsSecond: Segments, ): EventualAndReplicated[StateT] = { - val segmentNrsOf = SegmentNrsOf[StateT](first = segmentsFirst, second = segmentsSecond) + val segmentNrsOf = SegmentNrs.Of[StateT](first = segmentsFirst, second = segmentsSecond) val replicatedJournal = replicatedJournalOf(segmentSize, delete, segmentNrsOf) val eventualJournal = eventualJournalOf(segmentNrsOf, segmentsFirst max segmentsSecond) EventualAndReplicated(eventualJournal, replicatedJournal) diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala index 95cbeda74..d7ea344bf 100--- 
a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala @@ -60,8 +60,8 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { correlateEventsWithMeta = true, ) - val segmentOf = SegmentOf[Id](segments) - val segmentNrsOf = SegmentNrsOf[StateT](first = segments, Segments.default) + val segmentOf = SegmentNr.Of[Id](segments) + val segmentNrsOf = SegmentNrs.Of[StateT](first = segments, Segments.default) val statements = statementsOf(segmentNrsOf, Segments.default) val journal = EventualCassandra(statements, config) @@ -94,7 +94,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { test(s"pointer, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOf(key) + val segment = segmentOf.metaJournal(key) val seqNr = SeqNr.min val stateT = for { pointer <- journal.pointer(key) @@ -140,7 +140,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOf(key) + val segment = segmentOf.metaJournal(key) val record1 = { record.copy( event = record @@ -209,7 +209,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { test(s"read duplicated seqNr, $suffix") { val seqNr = SeqNr.min val key = Key(id = "id", topic = topic0) - val segment = segmentOf(key) + val segment = segmentOf.metaJournal(key) val record1 = record.copy(event = record.event.copy(timestamp = timestamp1)) val stateT = for { @@ -253,7 +253,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { test(s"read only events that corelate with meta, $suffix") { val seqNr = SeqNr.min val key = Key(id = "id", topic = topic0) - val segment = segmentOf(key) + val segment = segmentOf.metaJournal(key) val actual = RecordId.unsafe val legacy = RecordId.unsafe @@ -292,7 +292,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { test(s"read events with duplicated seqNr if only one 'branch' correlate with meta, $suffix") { val seqNr = SeqNr.min val key = Key(id = "id", topic = topic0) - val segment = segmentOf(key) + val segment = segmentOf.metaJournal(key) val recordId = RecordId.unsafe val stateT = journal.read(key, seqNr).toList @@ -526,7 +526,7 @@ object EventualCassandraTest { } } - def statementsOf(segmentNrsOf: SegmentNrsOf[StateT], segments: Segments): EventualCassandra.Statements[StateT] = { + def statementsOf(segmentNrsOf: SegmentNrs.Of[StateT], segments: Segments): EventualCassandra.Statements[StateT] = { val concurrentStateT: Concurrent[StateT] = ConcurrentOf.fromMonad[StateT] val metaJournalStatements = EventualCassandra diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala index d2d486686..4cb0d796d 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala @@ -59,8 +59,8 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { segments <- List((Segments.min, Segments.old), (Segments.old, Segments.default)) } { val (segmentsFirst, segmentsSecond) = segments - val segmentNrsOf = 
SegmentNrsOf[StateT](first = segmentsFirst, second = segmentsSecond) - val segmentOfId = SegmentOf[Id](segmentsFirst) + val segmentNrsOf = SegmentNrs.Of[StateT](first = segmentsFirst, second = segmentsSecond) + val segmentOfId = SegmentNr.Of[Id](segmentsFirst) val journal = { implicit val parallel = Parallel.identity[StateT] implicit val uuidGen = new UUIDGen[StateT] { @@ -79,7 +79,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"topics, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val offset1 = partitionOffset.offset.inc[Try].get val stateT = for { @@ -149,7 +149,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"offset, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { offset <- journal.offset(topic0, Partition.min) _ = offset shouldEqual none @@ -222,8 +222,8 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { val id1 = "id1" val key0 = Key(id0, topic0) val key1 = Key(id1, topic1) - val segment0 = segmentOfId(key0) - val segment1 = segmentOfId(key1) + val segment0 = segmentOfId.metaJournal(key0) + val segment1 = segmentOfId.metaJournal(key1) val expiry = Expiry(1.minute.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) val stateT = for { _ <- journal.append( @@ -401,7 +401,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"append & override expireAfter, $suffix") { val id = "id" val key = Key(id, topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val expiry0 = Expiry(1.minute.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) val expiry1 = Expiry(2.minute.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) val stateT = for { @@ -518,7 +518,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { val expiry1 = Expiry(2.minutes.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -613,7 +613,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { val expiry = Expiry(1.minute.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -707,7 +707,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { val expiry = Expiry(1.minute.toExpireAfter, LocalDate.of(2019, 12, 12).toExpireOn) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -797,7 +797,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"not repeat appends, $suffix") { val id = "id" val key = Key(id, topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = journal.append( key = key, Partition.min, @@ -882,7 +882,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"delete, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -966,7 +966,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"not repeat deletions, $suffix") { val id = "id" val key = Key(id 
= id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = journal.delete( key = key, Partition.min, @@ -1039,7 +1039,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"purge, $suffix") { val id = "id" val key = Key(id, topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -1088,7 +1088,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"repeat purge again for the same offset, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append(key, partitionOffset.partition, partitionOffset.offset, timestamp0, none, Nel.of(record.event)) _ <- journal.purge(key, partitionOffset.partition, partitionOffset.offset, timestamp0) @@ -1124,7 +1124,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"purge meta journal only, $suffix") { val id = "id" val key = Key(id, topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = for { _ <- journal.append( key = key, @@ -1174,7 +1174,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"not set correlation ID, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) def partitionOffset(offset: Long) = PartitionOffset(Partition.min, Offset.unsafe(offset)) @@ -1269,7 +1269,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"not update correlation ID, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val rid0 = RecordId.unsafe def partitionOffset(offset: Long) = PartitionOffset(Partition.min, Offset.unsafe(offset)) @@ -1365,7 +1365,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { test(s"optimization: delete action advances head's seq_nr, $suffix") { val id = "id" val key = Key(id = id, topic = topic0) - val segment = segmentOfId(key) + val segment = segmentOfId.metaJournal(key) val stateT = journal.delete( key = key, Partition.min, diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOfTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOfTest.scala index aa81cc42f..453709700 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOfTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentNrsOfTest.scala @@ -18,9 +18,9 @@ class SegmentNrsOfTest extends AnyFunSuite with Matchers { ) } yield { test(s"id: $id, first: $first, second: $second, segmentNrs: $segmentNrs") { - val segmentNrsOf = SegmentNrsOf[Id](first, second) + val segmentNrsOf = SegmentNrs.Of[Id](first, second) val key = Key(id = id, topic = "topic") - segmentNrsOf(key) shouldEqual segmentNrs + segmentNrsOf.metaJournal(key) shouldEqual segmentNrs } } } diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOfTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOfTest.scala index e4d1d910b..588fce4f5 100644 --- 
a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOfTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentOfTest.scala @@ -16,9 +16,9 @@ class SegmentOfTest extends AnyFunSuite with Matchers { ) } yield { test(s"id: $id, segments: $segments, segmentNr: $segmentNr") { - val segmentOf = SegmentOf[Id](segments) + val segmentOf = SegmentNr.Of[Id](segments) val key = Key(id = id, topic = "topic") - segmentOf(key) shouldEqual segmentNr + segmentOf.metaJournal(key) shouldEqual segmentNr } } } diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentsTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentsTest.scala index f6e92f9e8..5746387c9 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentsTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/SegmentsTest.scala @@ -12,7 +12,7 @@ class SegmentsTest extends AnyFunSuite with Matchers { ) } { test(s"$segments.segmentNrs") { - segments.segmentNrs.map { _.value } shouldEqual expected + segments.metaJournalSegmentNrs.map { _.value } shouldEqual expected } } } diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/PartitionsToSegments.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/PartitionsToSegments.scala index 40ef4f561..452083b15 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/PartitionsToSegments.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/PartitionsToSegments.scala @@ -17,7 +17,7 @@ object PartitionsToSegments { def apply(partitions: Int, segments: Segments = Segments.default): PartitionsToSegments = { val segmentNrs = segments - .segmentNrs + .metaJournalSegmentNrs .toSortedSet val filter = { if (partitions >= segments.value) { (a: Partition, b: SegmentNr) => diff --git a/version.sbt b/version.sbt index 9afbb0e1e..7443b0369 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "4.0.3-SNAPSHOT" +ThisBuild / version := "4.1.0"
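The scaladoc added to `SegmentNr.scala` above documents two different segmenting schemes. As a quick reference, here is a minimal, self-contained sketch of that arithmetic; the `SegmentingSketch` object and its helpers are illustrative stand-ins (plain `Long`/`Int`/`String` instead of `SeqNr`, `SegmentSize`, `Segments` and `Key`), not library code, while the real implementations are `SegmentNr.journal` and `SegmentNr.metaJournal`.

```scala
// Illustrative sketch of the two segmenting algorithms described in the SegmentNr scaladoc.
object SegmentingSketch {

  // `journal` table: a new segment starts every `segmentSize` events,
  // so consecutive events of one journal land in the same Cassandra partition
  def journalSegmentNr(seqNr: Long, segmentSize: Int): Long =
    (seqNr - 1L) / segmentSize // SeqNr.min is 1

  // `metajournal` table: consistent hashing of the key id, bounded by `segments`,
  // so every record can be reached by iterating over segments 0 until `segments`
  def metaJournalSegmentNr(keyId: String, segments: Int): Long =
    math.abs(keyId.toLowerCase.hashCode.toLong % segments)

  def main(args: Array[String]): Unit = {
    println(journalSegmentNr(45629L, 10000))           // 4, i.e. (45629 - 1) / 10000
    println(metaJournalSegmentNr("journal ID", 10000)) // a value in [0, 9999]; 5668 per the scaladoc example
  }
}
```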
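Call sites migrate from the deprecated `SegmentNrsOf` and `SegmentOf` factories to the new `SegmentNrs.Of` and `SegmentNr.Of` companions, as the test changes above show. Below is a hypothetical before/after sketch; it assumes compilation inside the `com.evolutiongaming.kafka.journal.eventual.cassandra` package (most of these types are `private[journal]`) and that deprecation warnings are not treated as errors.

```scala
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.Id
import com.evolutiongaming.kafka.journal.Key

// illustrative sketch, not part of the library
object SegmentNrsOfMigration {

  val key: Key = Key(id = "id", topic = "topic")

  // before 4.1.0: the key-based factories are applied directly
  val oldNrs: SegmentNrs = SegmentNrsOf[Id](first = Segments.default, second = Segments.old).apply(key)
  val oldNr: SegmentNr = SegmentOf[Id](Segments.default).apply(key)

  // since 4.1.0: the factories live on the companion objects and name the target table explicitly
  val newNrs: SegmentNrs = SegmentNrs.Of[Id](first = Segments.default, second = Segments.old).metaJournal(key)
  val newNr: SegmentNr = SegmentNr.Of[Id](Segments.default).metaJournal(key)
}
```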
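`Segment.journal` and `Segment#next` are what `ReplicatedCassandra` uses to decide when a batch of events crosses into a new `journal`-table segment. A small illustrative sketch follows, again assuming it lives inside the journal package; the `SegmentRolloverSketch` object and its `rollover` helper are hypothetical, not part of the library.

```scala
package com.evolutiongaming.kafka.journal.eventual.cassandra

import com.evolutiongaming.kafka.journal.SeqNr

// illustrative sketch, not part of the library
object SegmentRolloverSketch {

  // in the real code the segment size comes from the journal head / configuration
  val size: SegmentSize = SegmentSize.default

  // the very first event of a journal lands in segment number 0
  val segment0: Segment = Segment.journal(SeqNr.min, size)

  // `next` returns Some only when `seqNr` falls outside the current segment,
  // which is the signal in ReplicatedCassandra to insert the accumulated batch and start a new one
  def rollover(segment: Segment, seqNr: SeqNr): Segment =
    segment.next(seqNr).getOrElse(segment)
}
```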