Skip to content

Commit

Permalink
Merge pull request #121 from ahjohannessen/wip-builders-and-options
Browse files Browse the repository at this point in the history
core: initial work of making user friendly api
  • Loading branch information
ahjohannessen authored Aug 27, 2020
2 parents a486ccd + d5bbde4 commit ed03055
Show file tree
Hide file tree
Showing 21 changed files with 478 additions and 266 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ lazy val core = project
IntegrationTest / parallelExecution := false,
scalapbCodeGeneratorOptions += CodeGeneratorOption.FlatPackage,
libraryDependencies ++=
compileM(cats, catsEffect, fs2, log4cats, scodecBits, circe, circeParser, scalaPb) ++ protobufM(scalaPb) ++
compileM(cats, catsEffect, fs2, log4cats, log4catsNoop, scodecBits, circe, circeParser, scalaPb) ++
protobufM(scalaPb) ++
testM(
specs2Cats,
catsEffectTesting,
Expand All @@ -33,7 +34,6 @@ lazy val core = project
circeGeneric,
grpcNetty,
log4catsSlf4j,
log4catsNoop,
logback
)
)
Expand Down
84 changes: 0 additions & 84 deletions core/src/main/scala/sec/EsClient.scala

This file was deleted.

16 changes: 12 additions & 4 deletions core/src/main/scala/sec/api/cluster/settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package cluster

import scala.concurrent.duration._

final private[sec] case class Settings(
final private[sec] case class ClusterSettings(
maxDiscoverAttempts: Option[Int],
retryDelay: FiniteDuration,
retryStrategy: RetryStrategy,
Expand All @@ -13,15 +13,23 @@ final private[sec] case class Settings(
preference: NodePreference
)

private[sec] object Settings {
private[sec] object ClusterSettings {

val default: Settings = Settings(
val default: ClusterSettings = ClusterSettings(
maxDiscoverAttempts = None,
retryDelay = 200.millis,
retryStrategy = RetryStrategy.Identity,
readTimeout = 2.seconds,
readTimeout = 5.seconds,
notificationInterval = 100.millis,
preference = NodePreference.Leader
)

implicit final class SettingsOps(val s: ClusterSettings) extends AnyVal {
def withMaxDiscoverAttempts(max: Option[Int]): ClusterSettings = s.copy(maxDiscoverAttempts = max)
def withRetryDelay(delay: FiniteDuration): ClusterSettings = s.copy(retryDelay = delay)
def withReadTimeout(timeout: FiniteDuration): ClusterSettings = s.copy(readTimeout = timeout)
def withNotificationInterval(interval: FiniteDuration): ClusterSettings = s.copy(notificationInterval = interval)
def withNodePreference(np: NodePreference): ClusterSettings = s.copy(preference = np)
}

}
8 changes: 4 additions & 4 deletions core/src/main/scala/sec/api/cluster/watch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private[sec] object ClusterWatch {

def apply[F[_]: ConcurrentEffect: Timer, MCB <: ManagedChannelBuilder[MCB]](
builderFromTarget: String => F[MCB],
settings: Settings,
settings: ClusterSettings,
gossipFn: ManagedChannel => Gossip[F],
seed: NonEmptySet[Endpoint],
authority: String,
Expand Down Expand Up @@ -54,7 +54,7 @@ private[sec] object ClusterWatch {

def create[F[_]: ConcurrentEffect: Timer](
readFn: F[ClusterInfo],
settings: Settings,
settings: ClusterSettings,
store: Cache[F],
log: Logger[F]
): Resource[F, ClusterWatch[F]] = {
Expand All @@ -74,7 +74,7 @@ private[sec] object ClusterWatch {

def mkFetcher[F[_]: ConcurrentEffect: Timer](
readFn: F[ClusterInfo],
settings: Settings,
settings: ClusterSettings,
setInfo: ClusterInfo => F[Unit],
log: Logger[F]
): Stream[F, Unit] = {
Expand All @@ -84,7 +84,7 @@ private[sec] object ClusterWatch {
val nextDelay = retryStrategy.nextDelay _
val maxAttempts = maxDiscoverAttempts.getOrElse(Int.MaxValue)

val action = retryF(readFn, "gossip", delay, nextDelay, maxAttempts, readTimeout.some, log) {
val action = retry(readFn, "gossip", delay, nextDelay, maxAttempts, readTimeout.some, log) {
case _: TimeoutException | _: ServerUnavailable | _: NotLeader => true
case _ => false
}
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/scala/sec/api/gossip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit.SECONDS
import cats._
import cats.implicits._
import cats.effect.Sync
import cats.effect.{Concurrent, Timer}
import com.eventstore.client.gossip.{GossipFs2Grpc, ClusterInfo => PClusterInfo}
import com.eventstore.client.Empty
import mapping.gossip.mkClusterInfo
Expand All @@ -17,14 +17,16 @@ trait Gossip[F[_]] {

object Gossip {

private[sec] def apply[F[_]: Sync](
client: GossipFs2Grpc[F, Context],
mkCtx: Option[UserCredentials] => Context
private[sec] def apply[F[_]: Concurrent: Timer, C](
client: GossipFs2Grpc[F, C],
mkCtx: Option[UserCredentials] => C,
opts: Opts[F]
): Gossip[F] = new Gossip[F] {
def read(creds: Option[UserCredentials]): F[ClusterInfo] = read0(client.read(Empty(), mkCtx(creds)))
def read(creds: Option[UserCredentials]): F[ClusterInfo] = read0(opts)(client.read(Empty(), mkCtx(creds)))
}

private[sec] def read0[F[_]: ErrorM](f: F[PClusterInfo]): F[ClusterInfo] = f >>= mkClusterInfo[F]
private[sec] def read0[F[_]: Concurrent: Timer](o: Opts[F])(f: F[PClusterInfo]): F[ClusterInfo] =
o.run(f, "read") >>= mkClusterInfo[F]

//======================================================================================================================

Expand Down
21 changes: 15 additions & 6 deletions core/src/main/scala/sec/api/grpc/convert.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package grpc

import cats.implicits._
import io.grpc.{Metadata, Status, StatusRuntimeException}
import java.nio.channels.ClosedChannelException
import grpc.constants.{Exceptions => ce}
import sec.core._

Expand Down Expand Up @@ -33,7 +34,6 @@ private[sec] object convert {

val unknown = "<unknown>"
val md = ex.getTrailers
val status = ex.getStatus
val exception = md.getOpt(keys.exception)
def streamName = md.getOpt(keys.streamName).getOrElse(unknown)
def groupName = md.getOpt(keys.groupName).getOrElse(unknown)
Expand All @@ -59,12 +59,21 @@ private[sec] object convert {
case unknown => UnknownError(s"Exception key: $unknown")
}

def serverUnavailable: Option[EsException] =
Option.when(status.getCode == Status.Code.UNAVAILABLE)(
ServerUnavailable(Option(ex.getMessage).getOrElse("Server unavailable"))
)
reified orElse serverUnavailable(ex)
}

private val serverUnavailable: StatusRuntimeException => Option[ServerUnavailable] = { ex =>

val status = ex.getStatus
val code = status.getCode
val cause = status.getCause

val unavailable = code == Status.Code.UNAVAILABLE
def channelClosed = code == Status.Code.UNKNOWN && cause.isInstanceOf[ClosedChannelException]

val msg = if (unavailable) Option(ex.getMessage).getOrElse("Server unavailable") else "Channel closed."

reified orElse serverUnavailable
Option.when(unavailable || channelClosed)(ServerUnavailable(msg))
}

//======================================================================================================================
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,43 @@
package sec
package api

import scala.concurrent.duration._
import cats.implicits._
import api.cluster.NodePreference
import api.UserCredentials
import cats.effect.{Concurrent, Timer}
import io.chrisdavenport.log4cats.Logger

//======================================================================================================================

// TODO: Redo this wrt. cluster / operation / log / ... settings

final private[sec] case class Options(
connectionName: String,
nodePreference: NodePreference,
defaultCreds: Option[UserCredentials],
operationOptions: OperationOptions
final private[sec] case class Opts[F[_]](
oo: OperationOptions,
retryOn: Throwable => Boolean,
log: Logger[F]
)

private[sec] object Options {
val default: Options = Options(
connectionName = "sec",
nodePreference = NodePreference.Leader,
defaultCreds = UserCredentials.unsafe("admin", "changeit").some,
operationOptions = OperationOptions.default
)
private[sec] object Opts {

implicit final class OptsOps[F[_]](val opts: Opts[F]) extends AnyVal {
import opts._
import opts.oo._

def run[A](fa: F[A], opName: String)(implicit F: Concurrent[F], T: Timer[F]): F[A] = retryEnabled.fold(
retry[F, A](fa, opName, retryDelay, retryStrategy.nextDelay, retryMaxAttempts, None, log)(retryOn),
fa
)
}
}

//======================================================================================================================

final private[sec] case class OperationOptions(
retryEnabled: Boolean,
retryDelay: FiniteDuration,
retryStrategy: RetryStrategy,
retryMaxAttempts: Int
)

private[sec] object OperationOptions {

val default: OperationOptions = OperationOptions(
retryEnabled = true,
retryDelay = 200.millis,
retryStrategy = RetryStrategy.Identity,
retryMaxAttempts = 100
Expand All @@ -55,3 +59,5 @@ private[sec] object RetryStrategy {
}
}
}

//======================================================================================================================
24 changes: 6 additions & 18 deletions core/src/main/scala/sec/api/streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import cats.effect._
import cats.effect.concurrent.Ref
import fs2.{Pipe, Pull, Stream}
import com.eventstore.client.streams._
import io.chrisdavenport.log4cats.Logger
import sec.core._
import sec.syntax.StreamsSyntax
import mapping.shared._
Expand Down Expand Up @@ -89,15 +88,9 @@ object Streams {

//======================================================================================================================

final private[sec] case class Opts[F[_]](
oo: OperationOptions,
retryOn: Throwable => Boolean,
log: Logger[F]
)

private[sec] def apply[F[_]: Concurrent: Timer](
client: StreamsFs2Grpc[F, Context],
mkCtx: Option[UserCredentials] => Context,
private[sec] def apply[F[_]: Concurrent: Timer, C](
client: StreamsFs2Grpc[F, C],
mkCtx: Option[UserCredentials] => C,
opts: Opts[F]
): Streams[F] = new Streams[F] {

Expand Down Expand Up @@ -257,22 +250,22 @@ object Streams {
val events: List[AppendReq] = mkAppendProposalsReq(eventsNel).toList
val operation: F[AppendResp] = f(Stream.emit(header) ++ Stream.emits(events))

withRetryF(operation, opts, "appendToStream") >>= { ar => mkWriteResult[F](streamId, ar) }
opts.run(operation, "appendToStream") >>= { ar => mkWriteResult[F](streamId, ar) }
}

private[sec] def softDelete0[F[_]: Concurrent: Timer](
streamId: StreamId,
expectedRevision: StreamRevision,
opts: Opts[F]
)(f: DeleteReq => F[DeleteResp]): F[DeleteResult] =
withRetryF(f(mkSoftDeleteReq(streamId, expectedRevision)), opts, "softDelete") >>= mkDeleteResult[F]
opts.run(f(mkSoftDeleteReq(streamId, expectedRevision)), "softDelete") >>= mkDeleteResult[F]

private[sec] def hardDelete0[F[_]: Concurrent: Timer](
streamId: StreamId,
expectedRevision: StreamRevision,
opts: Opts[F]
)(f: TombstoneReq => F[TombstoneResp]): F[DeleteResult] =
withRetryF(f(mkHardDeleteReq(streamId, expectedRevision)), opts, "hardDelete") >>= mkDeleteResult[F]
opts.run(f(mkHardDeleteReq(streamId, expectedRevision)), "hardDelete") >>= mkDeleteResult[F]

//======================================================================================================================

Expand Down Expand Up @@ -355,9 +348,4 @@ object Streams {
run(from, 0, oo.retryDelay)
}

private[sec] def withRetryF[F[_]: Concurrent: Timer, A](fa: F[A], opts: Opts[F], opName: String): F[A] = {
import opts._
retryF[F, A](fa, opName, oo.retryDelay, oo.retryStrategy.nextDelay, oo.retryMaxAttempts, None, log)(retryOn)
}

}
Loading

0 comments on commit ed03055

Please sign in to comment.