Skip to content

Commit

Permalink
Restore binary backwards compatibility for CassandraHealthCheck (#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar authored Mar 12, 2024
1 parent 05b30e2 commit 8cd160c
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra
package com.evolutiongaming.kafka.journal.cassandra

import cats.Monad
import cats.effect._
Expand All @@ -7,16 +7,40 @@ import cats.syntax.all._
import com.evolutiongaming.catshelper.{Log, LogOf, Schedule}
import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession
import com.evolutiongaming.kafka.journal.util.CatsHelper._

import scala.concurrent.duration._

/** Performs a check if Cassandra is alive.
*
* The common implementation is to periodically do a simple query and check
* if it returns an error.
*/
trait CassandraHealthCheck[F[_]] {

/** @return `None` if Cassandra healthy, and `Some(error)` otherwise */
def error: F[Option[Throwable]]

}

object CassandraHealthCheck {

/** Checks if Cassandra is alive by requesting a current timestamp from Cassandra.
*
* I.e., it does the following query every second after initial ramp-up delay of 10 seconds.
* ```sql
* SELECT now() FROM system.local
* ```
*
* @param session
* Cassandra session factory to use to perform queries with.
* @param consistencyConfig
* Read consistency level to use for a query.
*
* @return
* Factory for `CassandraHealthCheck` instances.
*/
def of[F[_] : Temporal : LogOf](
session: Resource[F, CassandraSession[F]],
consistencyConfig: CassandraConsistencyConfig.Read
Expand All @@ -36,6 +60,21 @@ object CassandraHealthCheck {
} yield result
}

/** Checks if server is alive by doing a custom `F[Unit]` call.
*
* @param initial
* Initial ramp-up delay before health checks are started.
* @param interval
* How often the provided function should be called.
* @param statement
* The function to call to check if server is alive. The function is expected to throw an error if server is not
* healthy.
* @param log
* The log to write an error to, in addition to throwing an error in [[CassandraHealthCheck#error]] call.
*
* @return
* Factory for `CassandraHealthCheck` instances.
*/
def of[F[_] : Temporal](
initial: FiniteDuration,
interval: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.evolutiongaming.kafka.journal.cassandra

import cats.effect.IO
import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.journal.IOSuite._
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.duration._
import scala.util.control.NoStackTrace

class CassandraHealthCheckSpec extends AsyncFunSuite {

test("CassandraHealthCheck#of(statement)") {

val expectedError = new RuntimeException with NoStackTrace

val healthCheck = CassandraHealthCheck.of[IO](
initial = 0.seconds,
interval = 1.second,
statement = expectedError.raiseError[IO, Unit].pure[IO].toResource,
log = Log.empty[IO]
)

val actualError = healthCheck.use(_.error.untilDefinedM)

val program = actualError.map { actualError =>
assert(actualError == expectedError)
}

program.run()
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.Monad
import cats.effect._
import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.journal.cassandra.{CassandraHealthCheck => CassandraHealthCheck2}
import com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandraConfig.ConsistencyConfig

import scala.concurrent.duration._

@deprecated(since = "3.3.9", message = "Use a class from `com.evolutiongaming.kafka.journal.cassandra` (without `eventual` part) package instead")
trait CassandraHealthCheck[F[_]] {
def error: F[Option[Throwable]]
}

@deprecated(since = "3.3.9", message = "Use a class from `com.evolutiongaming.kafka.journal.cassandra` (without `eventual` part) package instead")
object CassandraHealthCheck {

private[cassandra] def apply[F[_]](cassandraHealthCheck2: CassandraHealthCheck2[F]): CassandraHealthCheck[F] =
new CassandraHealthCheck[F] {
def error: F[Option[Throwable]] = cassandraHealthCheck2.error
}

def of[F[_] : Temporal : LogOf](
session: Resource[F, CassandraSession[F]],
consistencyConfig: ConsistencyConfig.Read
): Resource[F, CassandraHealthCheck[F]] =
CassandraHealthCheck2
.of(session, consistencyConfig.toCassandraConsistencyConfig)
.map(CassandraHealthCheck(_))

def of[F[_] : Temporal](
initial: FiniteDuration,
interval: FiniteDuration,
statement: Resource[F, Statement[F]],
log: Log[F]
): Resource[F, CassandraHealthCheck[F]] =
CassandraHealthCheck2
.of(initial = initial, interval = interval, statement = statement, log = log)
.map(CassandraHealthCheck(_))


type Statement[F[_]] = CassandraHealthCheck2.Statement[F]

object Statement {

def of[F[_] : Monad : CassandraSession](consistency: ConsistencyConfig.Read): F[Statement[F]] =
CassandraHealthCheck2.Statement.of(consistency.toCassandraConsistencyConfig)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.effect.IO
import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.journal.IOSuite._
import org.scalatest.funsuite.AsyncFunSuite

import scala.concurrent.duration._
import scala.util.control.NoStackTrace

class CassandraHealthCheckSpec extends AsyncFunSuite {

test("CassandraHealthCheck#of(statement)") {

val expectedError = new RuntimeException with NoStackTrace

val healthCheck = CassandraHealthCheck.of[IO](
initial = 0.seconds,
interval = 1.second,
statement = expectedError.raiseError[IO, Unit].pure[IO].toResource,
log = Log.empty[IO]
)

val actualError = healthCheck.use(_.error.untilDefinedM)

val program = actualError.map { actualError =>
assert(actualError == expectedError)
}

program.run()
}

}

0 comments on commit 8cd160c

Please sign in to comment.