Skip to content

Commit

Permalink
Propagate extension failures to user-space (refs hseeberger#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-nachos committed Nov 21, 2017
1 parent 21aabdd commit f1b30f1
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 43 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ member-joined │
└───────────────────┘ └───────────────────┘
```

If something goes finally wrong when interacting with the coordination service, e.g. a permanent timeout after a configurable number of retries, ConstructR terminates its `ActorSystem` in the spirit of "fail fast".
If interaction with the coordination service ultimately fails, e.g. with a permanent timeout after a configurable number of retries, ConstructR terminates itself in the spirit of "failing fast".

``` scala
// All releases including intermediate ones are published here,
Expand Down Expand Up @@ -77,6 +77,14 @@ constructr {
}
```

Exceeding the maximum number of retries leads to an internal failure of the `ConstructrExtension`. You can hook into this event as follows:
``` scala
val constructr = ConstructrExtension(system)
constructr.registerOnFailure {
// do something if ConstructR fails
}
```

## Coordination

ConstructR comes with out-of-the-box support for etcd: simply depend on the "constructr-coordination-etcd" module. If you want to use some other coordination backend, e.g. Consul, simply implement the `Coordination` trait from the "constructr-coordination" module and make sure to provide the fully qualified class name via the `constructr.coordination.class-name` configuration setting.
Expand Down
99 changes: 73 additions & 26 deletions core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,66 @@
package de.heikoseeberger.constructr

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated }
import akka.cluster.{ Cluster, Member }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberExited, MemberLeft, MemberRemoved }
import akka.cluster.MemberStatus.Up
import de.heikoseeberger.constructr.coordination.Coordination

import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS }
import scala.util.control.NonFatal

object Constructr {
private[constructr] object Constructr {

final val Name = "constructr"

def props: Props =
Props(new Constructr)
def props: Props = Props(new Constructr)

case class RegisterFailureHandler(callback: Runnable)
}

final class Constructr private extends Actor with ActorLogging {
private[constructr] class Constructr private
extends Actor
with ActorLogging
with ConstructrFailureListening {

import Constructr.RegisterFailureHandler

override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

private val cluster = Cluster(context.system)
private[this] val cluster = Cluster(context.system)
private[this] var machineOption: Option[ActorRef] = None
private[this] var failureListeners = Set.empty[ActorRef]

if (cluster.settings.SeedNodes.isEmpty) {
log.info("Creating constructr-machine, because no seed-nodes defined")
log.info("Creating constructr-machine, because no seed-nodes are defined")
cluster.subscribe(self,
InitialStateAsEvents,
classOf[MemberLeft],
classOf[MemberExited],
classOf[MemberRemoved])
context.become(active(context.watch(createConstructrMachine())))
machineOption = Some(context.watch(createConstructrMachine()))
} else {
log.info("Stopping self, because seed-nodes defined")
log.info("Stopping ConstructR, because seed-nodes are defined in configuration.")
context.stop(self)
}

override def receive = Actor.emptyBehavior
override def receive: Receive = {
case Terminated(machine) if machineOption.contains(machine) =>
machineOption = None

private def active(machine: ActorRef): Receive = {
case Terminated(`machine`) =>
val selfAddress = cluster.selfAddress
def isSelfAndUp(member: Member) =
member.address == selfAddress && member.status == Up
if (cluster.state.members.exists(isSelfAndUp)) {
log.error("Leaving, because constructr-machine terminated!")
cluster.leave(selfAddress)
} else {
log.error("Terminating system, because constructr-machine terminated!")
context.system.terminate()
}
case Terminated(failureListener) if failureListeners.contains(failureListener) =>
failureListeners -= failureListener

case RegisterFailureHandler(callback) if machineOption.nonEmpty =>
failureListeners += context.actorOf(
Props(classOf[ConstructrFailureListener], machineOption.get, callback)
)

case RegisterFailureHandler(callback) if machineOption.isEmpty =>
executeFailureHandler(callback)

case MemberRemoved(member, _) if member.address == cluster.selfAddress =>
log.error("Terminating system, because member has been removed!")
context.system.terminate()
log.warning("Stopping ConstructR because cluster member has been removed!")
context.stop(self)
}

private def createConstructrMachine() = {
Expand Down Expand Up @@ -101,3 +110,41 @@ final class Constructr private extends Actor with ActorLogging {
)
}
}

/**
  * Actor that watches `machine` and, once it receives the corresponding
  * `Terminated` notification, runs `callback` exactly once and then stops
  * itself.
  *
  * @param machine  the actor whose termination triggers the callback
  * @param callback the code to run when `machine` has terminated
  */
private[constructr] class ConstructrFailureListener(machine: ActorRef, callback: Runnable)
    extends Actor
    with ActorLogging
    with ConstructrFailureListening {

  /** Starts watching `machine` as soon as this listener is started. */
  override def preStart(): Unit = {
    super.preStart()
    context.watch(machine)
  }

  /** Releases the watch on `machine` before the regular stop hook runs. */
  override def postStop(): Unit = {
    context.unwatch(machine)
    super.postStop()
  }

  override def receive: Receive = {
    case Terminated(terminated) if terminated == machine =>
      // `finally` guarantees this listener is stopped even if something
      // escapes executeFailureHandler.
      try executeFailureHandler(callback)
      finally context.stop(self)
  }
}

/**
  * Mixin for actors that need to run user-supplied failure callbacks.
  *
  * The self-type requires the implementing class to be an `Actor` with
  * `ActorLogging`, which is where `log` comes from.
  */
private[constructr] trait ConstructrFailureListening {
  this: Actor with ActorLogging =>

  /**
    * Runs `callback`, logging any non-fatal exception it throws instead of
    * propagating it. Throwables not matched by `NonFatal` are rethrown.
    *
    * @param callback the failure handler to execute
    */
  def executeFailureHandler(callback: Runnable): Unit = {
    try callback.run()
    catch {
      case NonFatal(cause) =>
        log.error(cause, "ConstructR failure callback failed with [{}]", cause.getMessage)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,32 @@

package de.heikoseeberger.constructr

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionKey }
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }

object ConstructrExtension extends ExtensionKey[ConstructrExtension]
object ConstructrExtension extends ExtensionId[ConstructrExtension] with ExtensionIdProvider {

override def lookup(): ExtensionId[ConstructrExtension] = ConstructrExtension

override def createExtension(system: ExtendedActorSystem): ConstructrExtension =
new ConstructrExtension(system)

/**
* Java API
*/
override def get(system: ActorSystem): ConstructrExtension = super.get(system)
}

final class ConstructrExtension private (system: ExtendedActorSystem) extends Extension {
system.systemActorOf(Constructr.props, Constructr.Name)

private[this] val supervisor = system.systemActorOf(Constructr.props, Constructr.Name)

def registerOnFailure[T](code: => T): Unit = {
val callback = new Runnable {
override def run(): Unit = code
}
registerOnFailure(callback)
}

def registerOnFailure(callback: Runnable): Unit =
supervisor ! Constructr.RegisterFailureHandler(callback)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@

package de.heikoseeberger.constructr

import akka.actor.ActorDSL.{ actor, Act }
import akka.actor.Address
import akka.cluster.{ Cluster, ClusterEvent }
import akka.actor.{Actor, Address, PoisonPill, Props}
import akka.cluster.{Cluster, ClusterEvent}
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.StatusCodes.{ NotFound, OK }
import akka.http.scaladsl.model.StatusCodes.{NotFound, OK}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec}
import akka.stream.ActorMaterializer
import akka.testkit.TestDuration
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.{ BeforeAndAfterAll, FreeSpecLike, Matchers }
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

object ConstructrMultiNodeConfig {
val coordinationHost = {
val coordinationHost: String = {
val dockerHostPattern = """tcp://(\S+):\d{1,5}""".r
sys.env
.get("DOCKER_HOST")
Expand Down Expand Up @@ -98,14 +99,14 @@ abstract class MultiNodeConstructrSpec(
enterBarrier("coordination-started")

ConstructrExtension(system)
val listener = actor(new Act {
val listener = system.actorOf(Props(new Actor {
import ClusterEvent._
var isMember = false
Cluster(context.system).subscribe(self,
InitialStateAsEvents,
classOf[MemberJoined],
classOf[MemberUp])
become {
override def receive: Receive = {
case "isMember" => sender() ! isMember

case MemberJoined(member) if member.address == Cluster(context.system).selfAddress =>
Expand All @@ -114,7 +115,7 @@ abstract class MultiNodeConstructrSpec(
case MemberUp(member) if member.address == Cluster(context.system).selfAddress =>
isMember = true
}
})
}))
within(20.seconds.dilated) {
awaitAssert {
implicit val timeout = Timeout(1.second.dilated)
Expand All @@ -141,17 +142,42 @@ abstract class MultiNodeConstructrSpec(
}
}

enterBarrier("extension-killed")

val failureHandlers = 1.to(5).map(_ => mock(classOf[Runnable]))
failureHandlers.foreach(ConstructrExtension(system).registerOnFailure(_))
system.actorSelection(s"/system/${Constructr.Name}/${ConstructrMachine.Name}") ! PoisonPill

within(5.seconds.dilated) {
awaitAssert {
for (i <- failureHandlers.indices) {
verify(failureHandlers(i), times(1)).run()
}
}
}

enterBarrier("post-extension-killed")

val postFailureHandler = mock(classOf[Runnable])
ConstructrExtension(system).registerOnFailure(postFailureHandler)

within(5.seconds.dilated) {
awaitAssert {
verify(postFailureHandler, times(1)).run()
}
}

enterBarrier("done")
}

override def initialParticipants = roles.size
override def initialParticipants: Int = roles.size

override protected def beforeAll() = {
override protected def beforeAll(): Unit = {
super.beforeAll()
multiNodeSpecBeforeAll()
}

override protected def afterAll() = {
override protected def afterAll(): Unit = {
multiNodeSpecAfterAll()
super.afterAll()
}
Expand Down

0 comments on commit f1b30f1

Please sign in to comment.