Skip to content

Commit

Permalink
Merge pull request #209 from ahjohannessen/wip-use-position-info
Browse files Browse the repository at this point in the history
all: account for esdb missing log pos on streams / rm show
  • Loading branch information
ahjohannessen committed Oct 30, 2020
2 parents ae7eafc + be0e8c9 commit 89ea3b0
Show file tree
Hide file tree
Showing 28 changed files with 664 additions and 400 deletions.
13 changes: 8 additions & 5 deletions core/src/main/scala/sec/api/endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package api

import java.net.InetSocketAddress

import cats.{Order, Show}
import cats.Order
import io.grpc.{Attributes, EquivalentAddressGroup}

/**
Expand All @@ -34,13 +34,16 @@ final case class Endpoint(

object Endpoint {

final private val ae: Attributes = Attributes.EMPTY

implicit val orderForEndpoint: Order[Endpoint] = Order.by(ep => (ep.address, ep.port))
implicit val orderingForEndpoint: Ordering[Endpoint] = orderForEndpoint.toOrdering
implicit val showForEndpoint: Show[Endpoint] = Show.show(ep => s"${ep.address}:${ep.port}")

def render(ep: Endpoint): String = s"${ep.address}:${ep.port}"

implicit final private[sec] class EndpointOps(val ep: Endpoint) extends AnyVal {
def toInetSocketAddress: InetSocketAddress = new InetSocketAddress(ep.address, ep.port)
def toEquivalentAddressGroup: EquivalentAddressGroup =
new EquivalentAddressGroup(ep.toInetSocketAddress, Attributes.EMPTY)
def render: String = Endpoint.render(ep)
def toInetSocketAddress: InetSocketAddress = new InetSocketAddress(ep.address, ep.port)
def toEquivalentAddressGroup: EquivalentAddressGroup = new EquivalentAddressGroup(ep.toInetSocketAddress, ae)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/sec/api/exceptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object exceptions {
object WrongExpectedState {

def msg(sid: StreamId, expected: StreamState, actual: StreamState): String =
s"Wrong expected state for stream: ${sid.show}, expected: ${expected.show}, actual: ${actual.show}"
s"Wrong expected state for stream: ${sid.render}, expected: ${expected.render}, actual: ${actual.render}"

}

Expand Down
31 changes: 22 additions & 9 deletions core/src/main/scala/sec/api/gossip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit.SECONDS
import java.{util => ju}

import cats._
import cats.Order
import cats.implicits._

/**
Expand All @@ -32,12 +32,19 @@ final case class ClusterInfo(
)

object ClusterInfo {

implicit val orderForClusterInfo: Order[ClusterInfo] = Order.by(_.members.toList.sorted)
implicit val showForClusterInfo: Show[ClusterInfo] = Show.show { ci =>
val padTo = ci.members.map(_.state.show).map(_.length).maxOption.getOrElse(0)

def render(ci: ClusterInfo): String = {
val padTo = ci.members.map(_.state.render).map(_.length).maxOption.getOrElse(0)
val members = ci.members.toList.sorted
s"ClusterInfo:\n${members.map(mi => s" ${MemberInfo.mkShow(padTo).show(mi)}").mkString("\n")}"
s"ClusterInfo:\n${members.map(mi => s" ${MemberInfo.render(mi, padTo)}").mkString("\n")}"
}

implicit final class ClusterInfoOps(val ci: ClusterInfo) extends AnyVal {
def render: String = ClusterInfo.render(ci)
}

}

final case class MemberInfo(
Expand All @@ -53,19 +60,23 @@ object MemberInfo {
implicit val orderForMemberInfo: Order[MemberInfo] =
Order.by(mi => (mi.httpEndpoint, mi.state, mi.isAlive, mi.instanceId))

implicit val showForMemberInfo: Show[MemberInfo] = mkShow(0)
def render(mi: MemberInfo): String = render(mi, 0)

private[sec] def mkShow(padTo: Int): Show[MemberInfo] = Show.show { mi =>
private[sec] def render(mi: MemberInfo, padTo: Int): String = {

val alive = s"${if (mi.isAlive) "" else ""}"
val state = s"${mi.state.show.padTo(padTo, ' ')}"
val endpoint = s"${mi.httpEndpoint.show}"
val state = s"${mi.state.render.padTo(padTo, ' ')}"
val endpoint = s"${mi.httpEndpoint.render}"
val ts = s"${mi.timestamp.truncatedTo(SECONDS)}"
val id = s"${mi.instanceId}"

s"$alive $state $endpoint $ts $id"
}

implicit final class MemberInfoOps(val mi: MemberInfo) extends AnyVal {
def render: String = MemberInfo.render(mi)
}

}

sealed trait VNodeState
Expand Down Expand Up @@ -127,6 +138,8 @@ object VNodeState {
case ResigningLeader => 15
}

implicit val showForVNodeState: Show[VNodeState] = Show.fromToString[VNodeState]
implicit final class VNodeStateOps(val vns: VNodeState) extends AnyVal {
def render: String = vns.toString
}

}
68 changes: 46 additions & 22 deletions core/src/main/scala/sec/api/mapping/streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,26 @@ private[sec] object streams {

object incoming {

def mkPositionAll[F[_]: ErrorA](e: ReadResp.ReadEvent.RecordedEvent): F[Position.All] =
(mkStreamPosition[F](e), mkLogPosition[F](e)).mapN(Position.All)

def mkLogPosition[F[_]: ErrorA](e: ReadResp.ReadEvent.RecordedEvent): F[LogPosition.Exact] =
LogPosition(e.commitPosition, e.preparePosition).liftTo[F]

def mkStreamPosition[F[_]: ErrorA](e: ReadResp.ReadEvent.RecordedEvent): F[StreamPosition.Exact] =
StreamPosition(e.streamRevision).liftTo[F]

def mkCheckpoint[F[_]: ErrorA](c: ReadResp.Checkpoint): F[Checkpoint] =
LogPosition(c.commitPosition, c.preparePosition)
.map(Checkpoint)
.leftMap(error => ProtoResultError(s"Invalid position for Checkpoint: ${error.msg}"))
.liftTo[F]

def mkCheckpointOrEvent[F[_]: ErrorM](re: ReadResp): F[Option[Either[Checkpoint, Event]]] = {
def mkCheckpointOrEvent[F[_]: ErrorM](re: ReadResp): F[Option[Either[Checkpoint, AllEvent]]] = {

val event = OptionT(re.content.event.flatTraverse(mkEvent[F]).nested.map(_.asRight[Checkpoint]).value)
val checkpoint = OptionT(re.content.checkpoint.traverse(mkCheckpoint[F]).nested.map(_.asLeft[Event]).value)
val mkEvt = mkEvent[F, Position.All](_, mkPositionAll[F])
val event = OptionT(re.content.event.flatTraverse(mkEvt).nested.map(_.asRight[Checkpoint]).value)
val checkpoint = OptionT(re.content.checkpoint.traverse(mkCheckpoint[F]).nested.map(_.asLeft[AllEvent]).value)

(event <+> checkpoint).value
}
Expand All @@ -243,34 +253,48 @@ private[sec] object streams {
def failStreamNotFound[F[_]: ErrorM](rr: ReadResp): F[ReadResp] =
rr.content.streamNotFound.fold(rr.pure[F])(mkStreamNotFound[F](_) >>= (_.raiseError[F, ReadResp]))

def reqReadEvent[F[_]: ErrorM](rr: ReadResp): F[Option[Event]] =
rr.content.event.require[F]("ReadEvent") >>= mkEvent[F]
def reqReadAll[F[_]: ErrorM](rr: ReadResp): F[Option[AllEvent]] =
reqReadEvent(rr, mkPositionAll[F])

def reqReadStream[F[_]: ErrorM](rr: ReadResp): F[Option[StreamEvent]] =
reqReadEvent(rr, mkStreamPosition[F])

def reqReadEvent[F[_]: ErrorM, P <: Position](
rr: ReadResp,
mkPos: ReadResp.ReadEvent.RecordedEvent => F[P]): F[Option[Event[P]]] =
rr.content.event.require[F]("ReadEvent") >>= (mkEvent[F, P](_, mkPos))

def reqConfirmation[F[_]: ErrorA](rr: ReadResp): F[SubscriptionConfirmation] =
rr.content.confirmation
.map(_.subscriptionId)
.require[F]("SubscriptionConfirmation", details = s"Got ${rr.content} instead".some)
.map(SubscriptionConfirmation)

def mkEvent[F[_]: ErrorM](re: ReadResp.ReadEvent): F[Option[Event]] =
re.event.traverse(mkEventRecord[F]) >>= { eOpt =>
re.link.traverse(mkEventRecord[F]).map(lOpt => eOpt.map(er => lOpt.fold[Event](er)(ResolvedEvent(er, _))))
def mkEvent[F[_]: ErrorM, P <: Position](
re: ReadResp.ReadEvent,
mkPos: ReadResp.ReadEvent.RecordedEvent => F[P]
): F[Option[Event[P]]] =
re.event.traverse(e => mkEventRecord[F, P](e, mkPos)) >>= { eOpt =>
re.link
.traverse(e => mkEventRecord[F, P](e, mkPos))
.map(lOpt => eOpt.map(er => lOpt.fold[Event[P]](er)(ResolvedEvent[P](er, _))))
}

def mkEventRecord[F[_]: ErrorM](e: ReadResp.ReadEvent.RecordedEvent): F[EventRecord] = {

val streamId = mkStreamId[F](e.streamIdentifier)
val streamPosition = StreamPosition.exact(e.streamRevision)
val logPosition = LogPosition.exact(e.commitPosition, e.preparePosition)
val data = e.data.toByteVector
val customMeta = e.customMetadata.toByteVector
val eventId = e.id.require[F]("UUID") >>= mkJuuid[F]
val eventType = e.metadata.get(Type).require[F](Type) >>= mkEventType[F]
val contentType = e.metadata.get(ContentType).require[F](ContentType) >>= mkContentType[F]
val created = e.metadata.get(Created).flatMap(_.toLongOption).require[F](Created) >>= fromTicksSinceEpoch[F]
val eventData = (eventType, eventId, contentType).mapN((t, i, ct) => EventData(t, i, data, customMeta, ct))

(streamId, eventData, created).mapN((id, ed, c) => sec.EventRecord(id, streamPosition, logPosition, ed, c))
def mkEventRecord[F[_]: ErrorM, P <: Position](
e: ReadResp.ReadEvent.RecordedEvent,
mkPos: ReadResp.ReadEvent.RecordedEvent => F[P]): F[EventRecord[P]] = {

val streamId = mkStreamId[F](e.streamIdentifier)
val position = mkPos(e)
val data = e.data.toByteVector
val customMeta = e.customMetadata.toByteVector
val eventId = e.id.require[F]("UUID") >>= mkJuuid[F]
val eventType = e.metadata.get(Type).require[F](Type) >>= mkEventType[F]
val contentType = e.metadata.get(ContentType).require[F](ContentType) >>= mkContentType[F]
val created = e.metadata.get(Created).flatMap(_.toLongOption).require[F](Created) >>= fromTicksSinceEpoch[F]
val eventData = (eventType, eventId, contentType).mapN((t, i, ct) => EventData(t, i, data, customMeta, ct))

(streamId, position, eventData, created).mapN((id, p, ed, c) => sec.EventRecord(id, p, ed, c))

}

Expand Down
Loading

0 comments on commit 89ea3b0

Please sign in to comment.