Skip to content

Commit

Permalink
Merge pull request #117 from ahjohannessen/wip-misc-improvements
Browse files Browse the repository at this point in the history
misc: clean ups + improvements
  • Loading branch information
ahjohannessen authored Aug 24, 2020
2 parents 08a0dd9 + c2ac18a commit 1a71f33
Show file tree
Hide file tree
Showing 41 changed files with 919 additions and 348 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ jobs:
EVENTSTORE_GOSSIP_ON_SINGLE_NODE: True
EVENTSTORE_START_STANDARD_PROJECTIONS: True
ports:
- 1113:1113
- 2113:2113

steps:
Expand Down
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,9 @@ tmp
# Metals
.metals/*
.bloop/*
.vscode/*
project/.bloop/*
project/metals.sbt
project/metals.sbt

# Generated Certificates
certs/
5 changes: 2 additions & 3 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ rewrite {
docstrings = JavaDoc

project {

git = true

includeFilters = [
".*\\.scala$"
]

excludeFilters = [
".*\\.sbt$"
]

}
62 changes: 49 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import Dependencies._
lazy val root = project
.in(file("."))
.settings(skip in publish := true)
.aggregate(core)
.aggregate(core, netty, demo)

Global / onLoad ~= (_ andThen ("project core" :: _))

lazy val IntegrationTest = config("it") extend Test

Expand All @@ -13,39 +15,72 @@ lazy val core = project
.settings(inConfig(IntegrationTest)(Defaults.testTasks): _*)
.settings(commonSettings)
.settings(
name := "sec",
Test / testOptions := Seq(Tests.Filter(_ endsWith "Spec")),
name := "sec-core",
Test / testOptions := Seq(Tests.Filter(_ endsWith "Spec")),
IntegrationTest / testOptions := Seq(Tests.Filter(_ endsWith "ITest")),
IntegrationTest / parallelExecution := false,
scalapbCodeGeneratorOptions += CodeGeneratorOption.FlatPackage,
libraryDependencies ++=
compileM(cats, catsEffect, fs2, scodecBits, circe, circeParser, scalaPb) ++
protobufM(scalaPb) ++
testM(specs2Cats, catsEffectTesting, catsEffectLaws, specs2ScalaCheck, circeGeneric, grpcNetty, tcnative)
compileM(cats, catsEffect, fs2, log4cats, scodecBits, circe, circeParser, scalaPb) ++ protobufM(scalaPb) ++
testM(
specs2Cats,
catsEffectTesting,
catsEffectLaws,
specs2ScalaCheck,
circeGeneric,
grpcNetty,
log4catsSlf4j,
log4catsNoop,
logback
)
)

lazy val netty = project
.in(file("netty"))
.dependsOn(core)
.settings(
name := "sec",
libraryDependencies ++= compileM(grpcNetty)
)

lazy val demo = project
.dependsOn(netty)
.settings(
name := "demo",
libraryDependencies ++= compileM(log4catsSlf4j, logback),
skip in publish := true
)

// General Settings

lazy val commonSettings = Seq(
addCompilerPlugin(kindProjector),
Compile / scalacOptions ~= devScalacOptions,
Test / scalacOptions ~= devScalacOptions,
Compile / scalacOptions ~= devScalacOptions,
Test / scalacOptions ~= devScalacOptions,
IntegrationTest / scalacOptions ~= devScalacOptions,
libraryDependencies ++= testM(catsLaws, disciplineSpecs2, specs2, specs2ScalaCheck)
)

lazy val metalsEnabled =
scala.util.Properties.envOrElse("METALS_ENABLED", "false").toBoolean
lazy val metalsEnabled =
scala.util.Properties.envOrElse("METALS_ENABLED", "false").toBoolean

val devScalacOptions = { options: Seq[String] =>
if (metalsEnabled) options.filterNot(Set("-Wunused:locals", "-Wunused:params", "-Wunused:imports", "-Wunused:privates")) else options
if (metalsEnabled)
options.filterNot(Set("-Wunused:locals", "-Wunused:params", "-Wunused:imports", "-Wunused:privates"))
else options
}

inThisBuild(
List(
scalaVersion := "2.13.2",
organization := "io.github.ahjohannessen",
developers := List(
Developer("ahjohannessen", "Alex Henning Johannessen", "[email protected]", url("https://github.com/ahjohannessen"))
Developer(
"ahjohannessen",
"Alex Henning Johannessen",
"[email protected]",
url("https://github.com/ahjohannessen")
)
),
homepage := Some(url("https://github.com/ahjohannessen/sec")),
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
Expand All @@ -58,6 +93,7 @@ inThisBuild(
(baseDirectory in LocalRootProject).value.getAbsolutePath,
"-doc-source-url",
"https://github.com/ahjohannessen/sec/blob/v" + version.value + "€{FILE_PATH}.scala"
)
),
shellPrompt := Prompt.enrichedShellPrompt
)
)
65 changes: 48 additions & 17 deletions core/src/main/scala/sec/EsClient.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package sec

import cats.Endo
import cats.effect.{ConcurrentEffect, Resource, Timer}
import fs2.Stream
import org.lyranthe.fs2_grpc.java_runtime.implicits._
import com.eventstore.client.streams.StreamsFs2Grpc
import com.eventstore.client.gossip.GossipFs2Grpc
import io.grpc.{ManagedChannel, ManagedChannelBuilder}
import fs2.Stream
import io.grpc.{CallOptions, ManagedChannel, ManagedChannelBuilder}
import io.chrisdavenport.log4cats.Logger
import sec.core._
import sec.api._
import sec.api.grpc.implicits._
import sec.api.grpc.metadata._
import sec.api.grpc.convert.convertToEs

trait EsClient[F[_]] {
Expand All @@ -19,35 +22,63 @@ object EsClient {

private[sec] def stream[F[_]: ConcurrentEffect: Timer, MCB <: ManagedChannelBuilder[MCB]](
builder: MCB,
options: Options
options: Options,
logger: Logger[F]
): Stream[F, EsClient[F]] =
Stream.resource(resource[F, MCB](builder, options))
Stream.resource(resource[F, MCB](builder, options, logger))

private[sec] def resource[F[_]: ConcurrentEffect: Timer, MCB <: ManagedChannelBuilder[MCB]](
builder: MCB,
options: Options
options: Options,
logger: Logger[F]
): Resource[F, EsClient[F]] =
builder.resource[F].map(apply[F](_, options))
builder.resource[F].map(apply[F](_, options, logger))

private[sec] def apply[F[_]: ConcurrentEffect: Timer](
mc: ManagedChannel,
options: Options
): EsClient[F] =
new Impl[F](mc, options)

final private class Impl[F[_]: ConcurrentEffect: Timer](mc: ManagedChannel, o: Options) extends EsClient[F] {
options: Options,
l: Logger[F]
): EsClient[F] = new EsClient[F] {

val streams: Streams[F] = Streams(
StreamsFs2Grpc.client[F, Context](mc, _.toMetadata, identity, convertToEs),
o
mkStreamsClient[F](mc),
mkContext(options, options.nodePreference.isLeader),
mkStreamsOpts[F](options.operationOptions, l)
)

val gossip: Gossip[F] = Gossip(
GossipFs2Grpc.client[F, Context](mc, _.toMetadata, identity, convertToEs),
o.defaultCreds,
o.connectionName
mkGossipClient[F](mc),
mkContext(options, options.nodePreference.isLeader)
)
}

///

private[sec] def mkContext(o: Options, requiresLeader: Boolean): Option[UserCredentials] => Context =
uc => Context(o.connectionName, uc.orElse(o.defaultCreds), requiresLeader)

/// Streams

private[sec] val streamsRetryOn: Throwable => Boolean = {
case _: ServerUnavailable | _: NotLeader => true
case _ => false
}

private[sec] def mkStreamsClient[F[_]: ConcurrentEffect](
mc: ManagedChannel,
fn: Endo[CallOptions] = identity
): StreamsFs2Grpc[F, Context] =
StreamsFs2Grpc.client[F, Context](mc, _.toMetadata, fn, convertToEs)

private[sec] def mkStreamsOpts[F[_]](oo: OperationOptions, log: Logger[F]): Streams.Opts[F] =
Streams.Opts[F](oo, streamsRetryOn, log.withModifiedString(s => s"Streams: $s"))

/// Gossip

private[sec] def mkGossipClient[F[_]: ConcurrentEffect](
mc: ManagedChannel,
fn: Endo[CallOptions] = identity
): GossipFs2Grpc[F, Context] =
GossipFs2Grpc.client[F, Context](mc, _.toMetadata, fn, convertToEs)

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package sec
package api
package cluster
package grpc

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package sec
package api
package cluster
package grpc

Expand All @@ -24,13 +25,10 @@ object Notifier {

def apply[F[_]: Concurrent](
seed: Nes[Endpoint],
updates: Stream[F, ClusterInfo]
): F[Notifier[F]] = {
for {
halt <- SignallingRef[F, Boolean](false)
endpoints <- Ref[F].of(seed)
} yield create(seed, defaultSelector, updates, endpoints, halt)
}
updates: Stream[F, ClusterInfo],
halt: SignallingRef[F, Boolean]
): F[Notifier[F]] =
Ref[F].of(seed).map(create(seed, defaultSelector, updates, _, halt))

def create[F[_]](
seed: Nes[Endpoint],
Expand All @@ -51,7 +49,6 @@ object Notifier {
val run = updates.evalMap(update).interruptWhen(halt)

bootstrap *> run.compile.drain.start.void

}

def defaultSelector(ci: ClusterInfo, seed: Nes[Endpoint]): Nes[Endpoint] =
Expand All @@ -69,8 +66,12 @@ object Notifier {

object bestNodes {

def apply[F[_]: Concurrent](np: NodePreference, updates: Stream[F, ClusterInfo]): F[Notifier[F]] =
SignallingRef[F, Boolean](false).map(create(np, defaultSelector[F], updates, _))
def apply[F[_]](
np: NodePreference,
updates: Stream[F, ClusterInfo],
halt: SignallingRef[F, Boolean]
)(implicit F: Concurrent[F]): F[Notifier[F]] =
F.delay(create(np, defaultSelector[F], updates, halt))

def create[F[_]](
np: NodePreference,
Expand All @@ -85,9 +86,7 @@ object Notifier {
case x :: xs => l.onResult(mkResult(x :: xs))
}

val run = updates
.evalMap(update)
.interruptWhen(halt)
val run = updates.evalMap(update).interruptWhen(halt)

run.compile.drain.start.void
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package sec
package api
package cluster
package grpc

Expand All @@ -9,6 +10,7 @@ import cats.effect.implicits._
import io.grpc.NameResolver
import io.grpc.NameResolver.Listener2
import fs2.Stream
import fs2.concurrent.SignallingRef
import sec.api.Gossip._

final private[sec] case class Resolver[F[_]: Effect](
Expand All @@ -26,15 +28,17 @@ private[sec] object Resolver {
def gossip[F[_]: ConcurrentEffect](
authority: String,
seed: NonEmptySet[Endpoint],
updates: Stream[F, ClusterInfo]
updates: Stream[F, ClusterInfo],
halt: SignallingRef[F, Boolean]
): F[Resolver[F]] =
Notifier.gossip[F](seed, updates).map(Resolver[F](authority, _))
Notifier.gossip[F](seed, updates, halt).map(Resolver[F](authority, _))

def bestNodes[F[_]: ConcurrentEffect](
authority: String,
np: NodePreference,
updates: Stream[F, ClusterInfo]
updates: Stream[F, ClusterInfo],
halt: SignallingRef[F, Boolean]
): F[Resolver[F]] =
Notifier.bestNodes[F](np, updates).map(Resolver[F](authority, _))
Notifier.bestNodes[F](np, updates, halt).map(Resolver[F](authority, _))

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package sec
package api
package cluster
package grpc

import java.net.URI
import io.grpc._
import io.grpc.NameResolver._
import cats.data.NonEmptySet
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2.Stream
import fs2.concurrent.SignallingRef
import io.chrisdavenport.log4cats.Logger
import sec.api.Gossip._

final private[sec] case class ResolverProvider[F[_]: Effect](
authority: String,
scheme: String,
resolver: F[Resolver[F]]
) extends NameResolverProvider {
Expand All @@ -29,18 +32,29 @@ object ResolverProvider {
final val gossipScheme: String = "eventstore-gossip"
final val clusterScheme: String = "eventstore-cluster"

private def mkHaltR[F[_]: Concurrent](log: Logger[F]): Resource[F, SignallingRef[F, Boolean]] =
Resource.make(SignallingRef[F, Boolean](false)) { sr =>
sr.set(true) *> log.debug("signalled notifier to halt.")
}

def gossip[F[_]: ConcurrentEffect](
authority: String,
seed: NonEmptySet[Endpoint],
updates: Stream[F, ClusterInfo]
): ResolverProvider[F] =
ResolverProvider(authority, gossipScheme, Resolver.gossip(authority, seed, updates))
updates: Stream[F, ClusterInfo],
log: Logger[F]
): Resource[F, ResolverProvider[F]] =
mkHaltR[F](log.withModifiedString(s => s"Gossip resolver: $s")).map { halt =>
ResolverProvider(gossipScheme, Resolver.gossip(authority, seed, updates, halt))
}

def bestNodes[F[_]: ConcurrentEffect](
authority: String,
np: NodePreference,
updates: Stream[F, ClusterInfo]
): ResolverProvider[F] =
ResolverProvider(authority, clusterScheme, Resolver.bestNodes(authority, np, updates))
updates: Stream[F, ClusterInfo],
log: Logger[F]
): Resource[F, ResolverProvider[F]] =
mkHaltR[F](log.withModifiedString(s => s"BestNodes resolver: $s")).map { halt =>
ResolverProvider(clusterScheme, Resolver.bestNodes(authority, np, updates, halt))
}

}
Loading

0 comments on commit 1a71f33

Please sign in to comment.