Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore binary backwards compatibility of classes relying on consistency level #593

Merged
merged 3 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object EventualCassandra {
): Resource[F, EventualJournal[F]] = {

def journal(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = {
of(config.schema, origin, metrics, config.consistencyConfig.toCassandraConsistencyConfig)
of(config.schema, origin, metrics, config.consistencyConfig)
}

for {
Expand All @@ -75,7 +75,7 @@ object EventualCassandra {
schemaConfig: SchemaConfig,
origin: Option[Origin],
metrics: Option[EventualJournal.Metrics[F]],
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[EventualJournal[F]] = {

for {
Expand Down Expand Up @@ -193,7 +193,7 @@ object EventualCassandra {
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[Statements[F]] = {
for {
selectRecords <- JournalStatements.SelectRecords.of[F](schema.journal, consistencyConfig)
Expand Down Expand Up @@ -222,7 +222,7 @@ object EventualCassandra {
schema: Schema,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[MetaJournalStatements[F]] = {
of(schema.metaJournal, segmentNrsOf, segments, consistencyConfig)
}
Expand All @@ -231,7 +231,7 @@ object EventualCassandra {
metaJournal: TableName,
segmentNrsOf: SegmentNrsOf[F],
segments: Segments,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[MetaJournalStatements[F]] = {
for {
selectJournalHead <- cassandra.MetaJournalStatements.SelectJournalHead.of[F](metaJournal, consistencyConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Encode](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[InsertRecords[F]] = {

implicit val encodeTry: JsonCodec.Encode[Try] = JsonCodec.Encode.summon[F].mapK(ToTry.functionK)
Expand Down Expand Up @@ -148,7 +148,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession : ToTry : JsonCodec.Decode](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read): F[SelectRecords[F]] = {
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectRecords[F]] = {

implicit val encodeTry: JsonCodec.Decode[Try] = JsonCodec.Decode.summon[F].mapK(ToTry.functionK)
implicit val decodeByNameByteVector: DecodeByName[ByteVector] = DecodeByName[Array[Byte]]
Expand Down Expand Up @@ -242,7 +242,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[DeleteTo[F]] = {

val query =
Expand Down Expand Up @@ -280,7 +280,7 @@ object JournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Delete[F]] = {

val query =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -122,7 +122,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectJournalHead[F]] = {

val query =
Expand Down Expand Up @@ -164,7 +164,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectJournalPointer[F]] = {

val query =
Expand Down Expand Up @@ -207,7 +207,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndExpireOn[F]] = {

val query =
Expand Down Expand Up @@ -244,7 +244,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndCreated[F]] = {

val query =
Expand Down Expand Up @@ -281,7 +281,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[IdByTopicAndSegment[F]] = {

val query =
Expand Down Expand Up @@ -321,7 +321,7 @@ object MetaJournalStatements {

object Update {

def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write
def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Update[F]] = {

val query =
Expand Down Expand Up @@ -367,7 +367,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateSeqNr[F]] = {

val query =
Expand Down Expand Up @@ -414,7 +414,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateExpiry[F]] = {

val query =
Expand Down Expand Up @@ -455,7 +455,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdateDeleteTo[F]] = {

val query =
Expand Down Expand Up @@ -494,7 +494,7 @@ object MetaJournalStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[UpdatePartitionOffset[F]] = {
s"""
|UPDATE ${ name.toCql }
Expand Down Expand Up @@ -528,7 +528,7 @@ object MetaJournalStatements {

object Delete {

def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Delete[F]] = {
def of[F[_] : Monad : CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Delete[F]] = {

val query =
s"""
Expand Down Expand Up @@ -562,7 +562,7 @@ object MetaJournalStatements {

def of[F[_] : Monad : CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[DeleteExpiry[F]] = {

val query =
Expand Down Expand Up @@ -598,7 +598,7 @@ object MetaJournalStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectIds[F]] = {
for {
prepared <- s"SELECT id FROM ${ name.toCql } WHERE topic = ? AND segment = ?".prepare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Pointer2Statements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectTopics[F]] = {

val query = s"""SELECT DISTINCT topic, partition FROM ${ name.toCql }""".stripMargin
Expand Down Expand Up @@ -76,7 +76,7 @@ object Pointer2Statements {
}
}

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[Select[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[Select[F]] = {
s"""
|SELECT created FROM ${ name.toCql }
|WHERE topic = ?
Expand Down Expand Up @@ -105,7 +105,7 @@ object Pointer2Statements {

object SelectOffset {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[SelectOffset[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectOffset[F]] = {

val query =
s"""
Expand Down Expand Up @@ -138,7 +138,7 @@ object Pointer2Statements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -173,7 +173,7 @@ object Pointer2Statements {

object Update {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Update[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Update[F]] = {

val query =
s"""
Expand Down Expand Up @@ -207,7 +207,7 @@ object Pointer2Statements {

object UpdateCreated {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[UpdateCreated[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[UpdateCreated[F]] = {
s"""
|UPDATE ${ name.toCql }
|SET offset = ?, created = ?, updated = ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object PointerStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Write
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write
): F[Insert[F]] = {

val query =
Expand Down Expand Up @@ -73,7 +73,7 @@ object PointerStatements {

object Update {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Write): F[Update[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write): F[Update[F]] = {

val query =
s"""
Expand Down Expand Up @@ -118,7 +118,7 @@ object PointerStatements {
}
}

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[Select[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[Select[F]] = {
s"""
|SELECT created FROM ${ name.toCql }
|WHERE topic = ?
Expand Down Expand Up @@ -147,7 +147,7 @@ object PointerStatements {

object SelectOffset {

def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: CassandraConsistencyConfig.Read): F[SelectOffset[F]] = {
def of[F[_]: Monad: CassandraSession](name: TableName, consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read): F[SelectOffset[F]] = {

val query =
s"""
Expand Down Expand Up @@ -180,7 +180,7 @@ object PointerStatements {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): F[SelectTopics[F]] = {

val query = s"""SELECT DISTINCT topic FROM ${ name.toCql }""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object ReplicatedCassandra {
): F[ReplicatedJournal[F]] = {

for {
schema <- SetupSchema[F](config.schema, origin, config.consistencyConfig.toCassandraConsistencyConfig)
statements <- Statements.of[F](schema, config.consistencyConfig.toCassandraConsistencyConfig)
schema <- SetupSchema[F](config.schema, origin, config.consistencyConfig)
statements <- Statements.of[F](schema, config.consistencyConfig)
log <- LogOf[F].apply(ReplicatedCassandra.getClass)
expiryService <- ExpiryService.of[F]
} yield {
Expand Down Expand Up @@ -540,15 +540,15 @@ object ReplicatedCassandra {

def of[F[_]: Monad: CassandraSession](
schema: Schema,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[MetaJournalStatements[F]] = {
of[F](schema.metaJournal, consistencyConfig)
}


def of[F[_]: Monad: CassandraSession](
metaJournal: TableName,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[MetaJournalStatements[F]] = {

for {
Expand Down Expand Up @@ -694,7 +694,7 @@ object ReplicatedCassandra {

def of[F[_]: Monad: CassandraSession: ToTry: JsonCodec.Encode](
schema: Schema,
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[Statements[F]] = {
for {
insertRecords <- JournalStatements.InsertRecords.of[F](schema.journal, consistencyConfig.write)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object SetupSchema {
def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
config: SchemaConfig,
origin: Option[Origin],
consistencyConfig: CassandraConsistencyConfig
consistencyConfig: EventualCassandraConfig.ConsistencyConfig
): F[Schema] = {

def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSchema(config)
Expand All @@ -53,7 +53,7 @@ object SetupSchema {
cassandraSync <- CassandraSync.of[F](config.keyspace, config.locksTable, origin)
ab <- createSchema(cassandraSync)
(schema, fresh) = ab
settings <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig)
settings <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig.toCassandraConsistencyConfig)
_ <- migrate(schema, fresh, settings, cassandraSync)
} yield schema
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.evolutiongaming.kafka.journal.replicator

import cats.data.{NonEmptySet => Nes}
import cats.effect.{Clock, Resource}
import cats.effect.syntax.resource._
import cats.effect.{Clock, Resource}
import cats.syntax.all._
import cats.{Applicative, Monad, ~>}
import com.evolutiongaming.catshelper.DataHelper._
import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, Log, MeasureDuration, MonadThrowable}
import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraConsistencyConfig
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession, ExpireOn, MetaJournalStatements, SegmentNr}
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession, EventualCassandraConfig, ExpireOn, MetaJournalStatements, SegmentNr}
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.StreamHelper._
import com.evolutiongaming.scassandra.TableName
Expand All @@ -33,7 +32,7 @@ object PurgeExpired {
producerConfig: ProducerConfig,
tableName: TableName,
metrics: Option[Metrics[F]],
consistencyConfig: CassandraConsistencyConfig.Read
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read
): Resource[F, PurgeExpired[F]] = {

implicit val fromAttempt = FromAttempt.lift[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SettingsIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers
cassandraSession: CassandraSession[F]) = {

for {
schema <- SetupSchema[F](config, origin, CassandraConsistencyConfig.default)
schema <- SetupSchema[F](config, origin, EventualCassandraConfig.ConsistencyConfig.default)
settings <- SettingsCassandra.of[F](schema.setting, origin, CassandraConsistencyConfig.default)
} yield settings
}
Expand Down
Loading