Skip to content

Commit

Permalink
Merge pull request #203 from ahjohannessen/wip-make-optional-user-cre…
Browse files Browse the repository at this point in the history
…ds-a-separate-operation

api: make creds a method + meta on client
  • Loading branch information
ahjohannessen authored Oct 23, 2020
2 parents 7cce567 + f747be0 commit 5d717ed
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 534 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v5
uses: olafurpg/setup-scala@v10
with:
java-version: ${{ matrix.java }}

Expand Down Expand Up @@ -142,7 +142,7 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v5
uses: olafurpg/setup-scala@v10
with:
java-version: ${{ matrix.java }}

Expand Down
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ inThisBuild(
githubWorkflowJavaVersions := Seq("[email protected]"),
githubWorkflowTargetTags += "v*",
githubWorkflowTargetBranches := Seq("master"),
githubWorkflowJobSetup := {
githubWorkflowJobSetup.value.map {
case use @ WorkflowStep.Use("olafurpg", "setup-scala", _, _, _, _, _, _) => use.copy(ref = "v10")
case ws => ws
}
},
githubWorkflowBuildPreamble += WorkflowStep.Run(
name = Some("Start Single Node"),
commands = List(".docker/single-node.sh up -d"),
Expand Down
98 changes: 4 additions & 94 deletions fs2/src/main/scala-2.13/sec/syntax/metastreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,116 +20,26 @@ package syntax
import scala.concurrent.duration.FiniteDuration
import cats.syntax.all._
import sec.api._
import sec.api.MetaStreams._
import StreamPosition.Exact
import StreamId.Id

//====================================================================================================================

trait MetaStreamsSyntax {

implicit final def syntaxForMetaStreams[F[_]: ErrorM](ms: MetaStreams[F]): MetaStreamsOps[F] =
new MetaStreamsOps[F](ms)
}

//====================================================================================================================

final class MetaStreamsOps[F[_]: ErrorM](val ms: MetaStreams[F]) {

def getMaxAge(id: Id): F[Option[ReadResult[MaxAge]]] =
ms.getMaxAge(id, None)

def setMaxAge(id: Id, expectedState: StreamState, age: FiniteDuration): F[WriteResult] =
setMaxAgeF(id, expectedState, age, None)

def setMaxAge(id: Id, expectedState: StreamState, age: FiniteDuration, uc: UserCredentials): F[WriteResult] =
setMaxAgeF(id, expectedState, age, uc.some)

private def setMaxAgeF(id: Id, er: StreamState, age: FiniteDuration, uc: Option[UserCredentials]): F[WriteResult] =
MaxAge(age).liftTo[F] >>= (ms.setMaxAge(id, er, _, uc))

def unsetMaxAge(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMaxAge(id, expectedState, None)

def getMaxCount(id: Id): F[Option[ReadResult[MaxCount]]] =
ms.getMaxCount(id, None)
MaxAge(age).liftTo[F] >>= (ms.setMaxAge(id, expectedState, _))

def setMaxCount(id: Id, expectedState: StreamState, count: Int): F[WriteResult] =
setMaxCountF(id, expectedState, count, None)

def setMaxCount(id: Id, expectedState: StreamState, count: Int, uc: UserCredentials): F[WriteResult] =
setMaxCountF(id, expectedState, count, uc.some)

private def setMaxCountF(id: Id, er: StreamState, count: Int, uc: Option[UserCredentials]): F[WriteResult] =
MaxCount(count).liftTo[F] >>= (ms.setMaxCount(id, er, _, uc))

def unsetMaxCount(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMaxCount(id, expectedState, None)

def getCacheControl(id: Id): F[Option[ReadResult[CacheControl]]] =
ms.getCacheControl(id, None)
MaxCount(count).liftTo[F] >>= (ms.setMaxCount(id, expectedState, _))

def setCacheControl(id: Id, expectedState: StreamState, cacheControl: FiniteDuration): F[WriteResult] =
setCacheControlF(id, expectedState, cacheControl, None)

def setCacheControl(
id: Id,
expectedState: StreamState,
cacheControl: FiniteDuration,
uc: UserCredentials
): F[WriteResult] =
setCacheControlF(id, expectedState, cacheControl, uc.some)

private def setCacheControlF(
id: Id,
er: StreamState,
cc: FiniteDuration,
uc: Option[UserCredentials]
): F[WriteResult] =
CacheControl(cc).liftTo[F] >>= (ms.setCacheControl(id, er, _, uc))

def unsetCacheControl(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetCacheControl(id, expectedState, None)

def getAcl(id: Id): F[Option[ReadResult[StreamAcl]]] =
ms.getAcl(id, None)

def setAcl(id: Id, expectedState: StreamState, acl: StreamAcl): F[WriteResult] =
ms.setAcl(id, expectedState, acl, None)

def unsetAcl(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetAcl(id, expectedState, None)

def getTruncateBefore(id: Id): F[Option[ReadResult[Exact]]] =
ms.getTruncateBefore(id, None)
CacheControl(cacheControl).liftTo[F] >>= (ms.setCacheControl(id, expectedState, _))

def setTruncateBefore(id: Id, expectedState: StreamState, truncateBefore: Long): F[WriteResult] =
setTruncateBeforeF(id, expectedState, truncateBefore, None)

def setTruncateBefore(
id: Id,
expectedState: StreamState,
truncateBefore: Long,
uc: UserCredentials
): F[WriteResult] =
setTruncateBeforeF(id, expectedState, truncateBefore, uc.some)
StreamPosition(truncateBefore).liftTo[F] >>= (ms.setTruncateBefore(id, expectedState, _))

private def setTruncateBeforeF(id: Id, er: StreamState, tb: Long, uc: Option[UserCredentials]): F[WriteResult] =
StreamPosition(tb).liftTo[F] >>= (ms.setTruncateBefore(id, er, _, uc))

def unsetTruncateBefore(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetTruncateBefore(id, expectedState, None)

///

private[sec] def getMetadata(id: Id): F[Option[MetaResult]] =
ms.getMetadata(id, None)

private[sec] def setMetadata(id: Id, expectedState: StreamState, data: StreamMetadata): F[WriteResult] =
ms.setMetadata(id, expectedState, data, None)

private[sec] def unsetMetadata(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMetadata(id, expectedState, None)
}

//====================================================================================================================
89 changes: 9 additions & 80 deletions fs2/src/main/scala-3/sec/syntax/metastreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,95 +20,24 @@ package syntax
import scala.concurrent.duration.FiniteDuration
import cats.syntax.all._
import sec.api._
import sec.api.MetaStreams._
import StreamPosition.Exact
import StreamId.Id

//====================================================================================================================

trait MetaStreamsSyntax {

extension [F[_]: ErrorM](ms: MetaStreams[F]) {

def getMaxAge(id: Id): F[Option[ReadResult[MaxAge]]] =
ms.getMaxAge(id, None)

def setMaxAge(id: Id, expectedState: StreamState, age: FiniteDuration): F[WriteResult] =
setMaxAgeF(id, expectedState, age, None)

def setMaxAge(id: Id, expectedState: StreamState, age: FiniteDuration, uc: UserCredentials): F[WriteResult] =
setMaxAgeF(id, expectedState, age, uc.some)

private def setMaxAgeF(id: Id, er: StreamState, m: FiniteDuration, uc: Option[UserCredentials]): F[WriteResult] =
MaxAge(m).liftTo[F] >>= (ms.setMaxAge(id, er, _, uc))

def unsetMaxAge(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMaxAge(id, expectedState, None)

def getMaxCount(id: Id): F[Option[ReadResult[MaxCount]]] =
ms.getMaxCount(id, None)

def setMaxCount(id: Id, expectedState: StreamState, count: Int): F[WriteResult] =
setMaxCountF(id, expectedState, count, None)

def setMaxCount(id: Id, expectedState: StreamState, count: Int, uc: UserCredentials): F[WriteResult] =
setMaxCountF(id, expectedState, count, uc.some)

private def setMaxCountF(id: Id, er: StreamState, count: Int, uc: Option[UserCredentials]): F[WriteResult] =
MaxCount(count).liftTo[F] >>= (ms.setMaxCount(id, er, _, uc))

def unsetMaxCount(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMaxCount(id, expectedState, None)

def getCacheControl(id: Id): F[Option[ReadResult[CacheControl]]] =
ms.getCacheControl(id, None)

def setCacheControl(id: Id, expectedState: StreamState, cacheControl: FiniteDuration): F[WriteResult] =
setCacheControlF(id, expectedState, cacheControl, None)
def setMaxAge(id: Id, expectedState: StreamState, age: FiniteDuration): F[WriteResult] =
MaxAge(age).liftTo[F] >>= (ms.setMaxAge(id, expectedState, _))

def setCacheControl(id: Id, expectedState: StreamState, cacheControl: FiniteDuration, uc: UserCredentials): F[WriteResult] =
setCacheControlF(id, expectedState, cacheControl, uc.some)

private def setCacheControlF(id: Id, er: StreamState, cacheControl: FiniteDuration, uc: Option[UserCredentials]): F[WriteResult] =
CacheControl(cacheControl).liftTo[F] >>= (ms.setCacheControl(id, er, _, uc))
def setMaxCount(id: Id, expectedState: StreamState, count: Int): F[WriteResult] =
MaxCount(count).liftTo[F] >>= (ms.setMaxCount(id, expectedState, _))

def unsetCacheControl(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetCacheControl(id, expectedState, None)
def setCacheControl(id: Id, expectedState: StreamState, cacheControl: FiniteDuration): F[WriteResult] =
CacheControl(cacheControl).liftTo[F] >>= (ms.setCacheControl(id, expectedState, _))

def getAcl(id: Id): F[Option[ReadResult[StreamAcl]]] =
ms.getAcl(id, None)
def setTruncateBefore(id: Id, expectedState: StreamState, truncateBefore: Long): F[WriteResult] =
StreamPosition(truncateBefore).liftTo[F] >>= (ms.setTruncateBefore(id, expectedState, _))

def setAcl(id: Id, expectedState: StreamState, acl: StreamAcl): F[WriteResult] =
ms.setAcl(id, expectedState, acl, None)

def unsetAcl(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetAcl(id, expectedState, None)

def getTruncateBefore(id: Id): F[Option[ReadResult[Exact]]] =
ms.getTruncateBefore(id, None)

def setTruncateBefore(id: Id, expectedState: StreamState, truncateBefore: Long): F[WriteResult] =
setTruncateBeforeF(id, expectedState, truncateBefore, None)

def setTruncateBefore(id: Id, expectedState: StreamState, truncateBefore: Long, uc: UserCredentials): F[WriteResult] =
setTruncateBeforeF(id, expectedState, truncateBefore, uc.some)

private def setTruncateBeforeF(id: Id, er: StreamState, tb: Long, uc: Option[UserCredentials]): F[WriteResult] =
StreamPosition(tb).liftTo[F] >>= (ms.setTruncateBefore(id, er, _, uc))

def unsetTruncateBefore(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetTruncateBefore(id, expectedState, None)

private[sec] def getMetadata(id: Id): F[Option[MetaResult]] =
ms.getMetadata(id, None)

private[sec] def setMetadata(id: Id, expectedState: StreamState, data: StreamMetadata): F[WriteResult] =
ms.setMetadata(id, expectedState, data, None)

private[sec] def removeMetadata(id: Id, expectedState: StreamState): F[WriteResult] =
ms.unsetMetadata(id, expectedState, None)
}

}

//====================================================================================================================
}
3 changes: 3 additions & 0 deletions fs2/src/main/scala/sec/api/client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import scala.concurrent.duration._

trait EsClient[F[_]] {
def streams: Streams[F]
def metaStreams: MetaStreams[F]
def gossip: Gossip[F]
}

Expand Down Expand Up @@ -63,6 +64,8 @@ object EsClient {
mkOpts[F](options.operationOptions, logger, "Streams")
)

val metaStreams: MetaStreams[F] = MetaStreams[F](streams)

val gossip: Gossip[F] = Gossip(
mkGossipFs2Grpc[F](mc),
mkContext(options, requiresLeader),
Expand Down
2 changes: 1 addition & 1 deletion fs2/src/main/scala/sec/api/cluster/watch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private[sec] object ClusterWatch {
updates = mkWatch(store.get, options.notificationInterval).subscribe
provider <- mkProvider(updates)
channel <- mkChannel(provider)
watch <- create[F](gossipFn(channel).read(None), options, store, logger)
watch <- create[F](gossipFn(channel).read, options, store, logger)
} yield watch

}
Expand Down
7 changes: 4 additions & 3 deletions fs2/src/main/scala/sec/api/gossip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import com.eventstore.dbclient.proto.shared.Empty
import sec.api.mapping.gossip.mkClusterInfo

trait Gossip[F[_]] {
final def read: F[ClusterInfo] = read(None)
def read(creds: Option[UserCredentials]): F[ClusterInfo]
def read: F[ClusterInfo]
def withCredentials(creds: UserCredentials): Gossip[F]
}

object Gossip {
Expand All @@ -35,7 +35,8 @@ object Gossip {
mkCtx: Option[UserCredentials] => C,
opts: Opts[F]
): Gossip[F] = new Gossip[F] {
def read(creds: Option[UserCredentials]): F[ClusterInfo] = read0(opts)(client.read(Empty(), mkCtx(creds)))
val read: F[ClusterInfo] = read0(opts)(client.read(Empty(), mkCtx(None)))
def withCredentials(creds: UserCredentials): Gossip[F] = Gossip[F, C](client, _ => mkCtx(creds.some), opts)
}

private[sec] def read0[F[_]: Concurrent: Timer](o: Opts[F])(f: F[PClusterInfo]): F[ClusterInfo] =
Expand Down
Loading

0 comments on commit 5d717ed

Please sign in to comment.