Skip to content

Commit

Permalink
Add docs and non-functional refactoring of Segment & co. (#684)
Browse files Browse the repository at this point in the history
Co-authored-by: Denys Fakhritdinov <[email protected]>
Co-authored-by: Mareks Rampāns <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2024
1 parent 24bd19c commit a1d5805
Show file tree
Hide file tree
Showing 18 changed files with 256 additions and 109 deletions.
16 changes: 9 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ lazy val commonSettings = Seq(

import com.typesafe.tools.mima.core.*
ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("com.evolutiongaming.kafka.journal.Journal$DataIntegrityConfig"),
ProblemFilters.exclude[MissingClassProblem]("com.evolutiongaming.kafka.journal.Journal$DataIntegrityConfig$*"),
ProblemFilters.exclude[IncompatibleSignatureProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalHead.*"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalHead.*"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.replicator.TopicReplicatorMetrics.*"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.kafka.journal.replicator.TopicReplicatorMetrics.*"),
ProblemFilters.exclude[IncompatibleSignatureProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.JournalStatements#*"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#MetaJournalStatements.of",
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#MetaJournalStatements.fromMetaJournal",
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.of",
),
)

val alias: Seq[sbt.Def.Setting[?]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object EventualCassandra {
for {
log <- LogOf[F].apply(EventualCassandra.getClass)
schema <- SetupSchema[F](schemaConfig, origin, consistencyConfig)
segmentNrsOf = SegmentNrsOf[F](first = Segments.default, second = Segments.old)
segmentNrsOf = SegmentNrs.Of[F](first = Segments.default, second = Segments.old)
statements <- Statements.of(schema, segmentNrsOf, Segments.default, consistencyConfig.read)
_ <- log.info(s"kafka-journal version: ${Version.current.value}")
} yield {
Expand Down Expand Up @@ -115,7 +115,7 @@ object EventualCassandra {
}

val records =
read(from, Segment(from, head.segmentSize))
read(from, Segment.journal(from, head.segmentSize))
.chain {
case (record, segment) =>
for {
Expand Down Expand Up @@ -220,19 +220,19 @@ object EventualCassandra {
}
}

final case class Statements[F[_]](
private[journal] final case class Statements[F[_]](
records: JournalStatements.SelectRecords[F],
metaJournal: MetaJournalStatements[F],
selectOffset2: Pointer2Statements.SelectOffset[F],
)

object Statements {
private[journal] object Statements {

def apply[F[_]](implicit F: Statements[F]): Statements[F] = F

def of[F[_]: Concurrent: CassandraSession: ToTry: JsonCodec.Decode](
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segmentNrsOf: SegmentNrs.Of[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read,
): F[Statements[F]] = {
Expand All @@ -246,7 +246,7 @@ object EventualCassandra {
}
}

trait MetaJournalStatements[F[_]] {
private[journal] trait MetaJournalStatements[F[_]] {

def journalHead(key: Key): F[Option[JournalHead]]

Expand All @@ -255,11 +255,11 @@ object EventualCassandra {
def ids(topic: Topic): Stream[F, String]
}

object MetaJournalStatements {
private[journal] object MetaJournalStatements {

def of[F[_]: Concurrent: CassandraSession](
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segmentNrsOf: SegmentNrs.Of[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read,
): F[MetaJournalStatements[F]] = {
Expand All @@ -268,7 +268,7 @@ object EventualCassandra {

def of[F[_]: Concurrent: CassandraSession](
metaJournal: TableName,
segmentNrsOf: SegmentNrsOf[F],
segmentNrsOf: SegmentNrs.Of[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read,
): F[MetaJournalStatements[F]] = {
Expand All @@ -282,7 +282,7 @@ object EventualCassandra {
}

def fromMetaJournal[F[_]: Concurrent](
segmentNrsOf: SegmentNrsOf[F],
segmentNrsOf: SegmentNrs.Of[F],
journalHead: cassandra.MetaJournalStatements.SelectJournalHead[F],
journalPointer: cassandra.MetaJournalStatements.SelectJournalPointer[F],
ids: cassandra.MetaJournalStatements.SelectIds[F],
Expand All @@ -295,7 +295,7 @@ object EventualCassandra {

def firstOrSecond[A](key: Key)(f: SegmentNr => F[Option[A]]): F[Option[A]] = {
for {
segmentNrs <- segmentNrsOf(key)
segmentNrs <- segmentNrsOf.metaJournal(key)
first = f(segmentNrs.first)
result <- segmentNrs
.second
Expand All @@ -319,7 +319,7 @@ object EventualCassandra {

def ids(topic: Topic) = {
for {
segmentNr <- segments.segmentNrs.toStream1[F]
segmentNr <- segments.metaJournalSegmentNrs.toStream1[F]
id <- ids1(topic, segmentNr)
} yield id
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[journal] object ReplicatedCassandra {
expiryService <- ExpiryService.of[F]
_ <- log.info(s"kafka-journal version: ${Version.current.value}")
} yield {
val segmentOf = SegmentNrsOf[F](first = Segments.default, second = Segments.old)
val segmentOf = SegmentNrs.Of[F](first = Segments.default, second = Segments.old)
val journal = apply[F](config.segmentSize, segmentOf, statements, expiryService).withLog(log)
metrics
.fold(journal) { metrics => journal.withMetrics(metrics) }
Expand All @@ -53,7 +53,7 @@ private[journal] object ReplicatedCassandra {

def apply[F[_]: Sync: Parallel: Fail: UUIDGen](
segmentSizeDefault: SegmentSize,
segmentNrsOf: SegmentNrsOf[F],
segmentNrsOf: SegmentNrs.Of[F],
statements: Statements[F],
expiryService: ExpiryService[F],
): ReplicatedJournal[F] = {
Expand Down Expand Up @@ -109,7 +109,7 @@ private[journal] object ReplicatedCassandra {
}

for {
segmentNrs <- segmentNrsOf(key)
segmentNrs <- segmentNrsOf.metaJournal(key)
result <- head(segmentNrs.first).orElse {
segmentNrs
.second
Expand Down Expand Up @@ -166,7 +166,8 @@ private[journal] object ReplicatedCassandra {
case None => loop(tail, (segment, head :: batch).some, result)
case Some(next) => loop(tail, (next, Nel.of(head)).some, insert(segment, batch))
}
case None => loop(tail, (Segment(seqNr, journalHead.segmentSize), Nel.of(head)).some, result)
case None =>
loop(tail, (Segment.journal(seqNr, journalHead.segmentSize), Nel.of(head)).some, result)
}

case Nil => s.fold(result) { case (segment, batch) => insert(segment, batch) }
Expand Down Expand Up @@ -333,8 +334,8 @@ private[journal] object ReplicatedCassandra {
val segmentSize = journalHead.segmentSize
for {
segmentNrs <- from
.toSegmentNr(segmentSize)
.to[F] { to.toSegmentNr(segmentSize) }
.toJournalSegmentNr(segmentSize)
.to[F] { to.toJournalSegmentNr(segmentSize) }
result <- {
if (to >= seqNr) {
segmentNrs.parFoldMapA { segmentNr =>
Expand Down Expand Up @@ -441,12 +442,12 @@ private[journal] object ReplicatedCassandra {
segmentNrs <- from
.prev[Option]
.getOrElse { from }
.toSegmentNr(segmentSize)
.toJournalSegmentNr(segmentSize)
.to[F] {
to
.next[Option]
.getOrElse { journalHead.seqNr }
.toSegmentNr(segmentSize)
.toJournalSegmentNr(segmentSize)
}
_ <- segmentNrs.parFoldMapA { segmentNr =>
statements
Expand Down Expand Up @@ -474,13 +475,13 @@ private[journal] object ReplicatedCassandra {
}
}

trait MetaJournalStatements[F[_]] {
private[journal] trait MetaJournalStatements[F[_]] {
import MetaJournalStatements.*

def apply(key: Key, segment: SegmentNr): ByKey[F]
}

object MetaJournalStatements {
private[journal] object MetaJournalStatements {

def of[F[_]: Monad: CassandraSession](
schema: Schema,
Expand Down Expand Up @@ -612,7 +613,7 @@ private[journal] object ReplicatedCassandra {
}
}

final case class Statements[F[_]](
private[journal] final case class Statements[F[_]](
insertRecords: JournalStatements.InsertRecords[F],
deleteRecordsTo: JournalStatements.DeleteTo[F],
deleteRecords: JournalStatements.Delete[F],
Expand All @@ -626,7 +627,7 @@ private[journal] object ReplicatedCassandra {
selectTopics2: Pointer2Statements.SelectTopics[F],
)

object Statements {
private[journal] object Statements {

def apply[F[_]](implicit F: Statements[F]): Statements[F] = F

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,41 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra
import cats.syntax.all.*
import com.evolutiongaming.kafka.journal.SeqNr

/**
* Represent a segment in the `journal` table.
*
* @param nr segment number
* @param size maximum segment number
*/
private[journal] final case class Segment(nr: SegmentNr, size: SegmentSize)

private[journal] object Segment {

def apply(seqNr: SeqNr, size: SegmentSize): Segment = {
val segmentNr = SegmentNr(seqNr, size)
apply(segmentNr, size)
@deprecated("use `Segment.journal` instead", "4.1.0")
def apply(seqNr: SeqNr, size: SegmentSize): Segment = journal(seqNr, size)

/**
* Calculate segment number for `seqNr` based on `segmentSize` for `journal` table.
*
* @see based on [[SegmentNr#journal]]
*/
def journal(seqNr: SeqNr, size: SegmentSize): Segment = {
val segmentNr = SegmentNr.journal(seqNr, size)
Segment(segmentNr, size)
}

implicit class SegmentOps(val self: Segment) extends AnyVal {

/**
* Get segment for `seqNr` if its different from current segment `self`.
*
* @param seqNr sequence number
* @return [[Some]] segment if `seqNr` is in different segment, [[None]] otherwise
*/
def next(seqNr: SeqNr): Option[Segment] = {
val segmentNr = SegmentNr(seqNr, self.size)
val segmentNr = SegmentNr.journal(seqNr, self.size)
if (segmentNr === self.nr) none
else self.copy(segmentNr).some
else Segment(segmentNr, self.size).some
}
}
}
Loading

0 comments on commit a1d5805

Please sign in to comment.