diff --git a/README.md b/README.md index 1e7b729..6c3c007 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ member-joined │ └───────────────────┘ └───────────────────┘ ``` -If something goes finally wrong when interacting with the coordination service, e.g. a permanent timeout after a configurable number of retries, ConstructR terminates its `ActorSystem` in the spirit of "fail fast". +If something goes finally wrong when interacting with the coordination service, e.g. a permanent timeout after a configurable number of retries, ConstructR terminates itself in the spirit of "failing fast". ``` scala // All releases including intermediate ones are published here, @@ -77,6 +77,14 @@ constructr { } ``` +Exceeding the number of max retries will lead to internal failure of the `ConstructrExtension`. You can hook to this event in the following manner: +``` scala +val constructr = ConstructrExtension(system) +constructr.registerOnFailure { + // do something if ConstructR failes +} +``` + ## Coordination ConstructR comes with out-of-the-box support for etcd: simply depend on the "constructr-coordination-etcd" module. If you want to use some other coordination backend, e.g. Consul, simply implement the `Coordination` trait from the "constructr-coordination" module and make sure to provide the fully qualified class name via the `constructr.coordination.class-name` configuration setting. diff --git a/core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala b/core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala index 920ef62..da825f5 100644 --- a/core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala +++ b/core/src/main/scala/de/heikoseeberger/constructr/Constructr.scala @@ -17,57 +17,66 @@ package de.heikoseeberger.constructr import akka.actor.{ Actor, ActorLogging, ActorRef, Props, SupervisorStrategy, Terminated } -import akka.cluster.{ Cluster, Member } +import akka.cluster.Cluster import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberExited, MemberLeft, MemberRemoved } -import akka.cluster.MemberStatus.Up import de.heikoseeberger.constructr.coordination.Coordination + import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS } +import scala.util.control.NonFatal -object Constructr { +private[constructr] object Constructr { final val Name = "constructr" - def props: Props = - Props(new Constructr) + def props: Props = Props(new Constructr) + + case class RegisterFailureHandler(callback: Runnable) } -final class Constructr private extends Actor with ActorLogging { +private[constructr] class Constructr private + extends Actor + with ActorLogging + with ConstructrFailureListening { + + import Constructr.RegisterFailureHandler - override val supervisorStrategy = SupervisorStrategy.stoppingStrategy + override val supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy - private val cluster = Cluster(context.system) + private[this] val cluster = Cluster(context.system) + private[this] var machineOption: Option[ActorRef] = None + private[this] var failureListeners = Set.empty[ActorRef] if (cluster.settings.SeedNodes.isEmpty) { - log.info("Creating constructr-machine, because no seed-nodes defined") + log.info("Creating constructr-machine, because no seed-nodes are defined") cluster.subscribe(self, InitialStateAsEvents, classOf[MemberLeft], classOf[MemberExited], classOf[MemberRemoved]) - context.become(active(context.watch(createConstructrMachine()))) + machineOption = Some(context.watch(createConstructrMachine())) } else { - log.info("Stopping self, because seed-nodes defined") + log.info("Stopping ConstructR, because seed-nodes are defined in configuration.") context.stop(self) } - override def receive = Actor.emptyBehavior + override def receive: Receive = { + case Terminated(machine) if machineOption.contains(machine) => + machineOption = None - private def active(machine: ActorRef): Receive = { - case Terminated(`machine`) => - val selfAddress = cluster.selfAddress - def isSelfAndUp(member: Member) = - member.address == selfAddress && member.status == Up - if (cluster.state.members.exists(isSelfAndUp)) { - log.error("Leaving, because constructr-machine terminated!") - cluster.leave(selfAddress) - } else { - log.error("Terminating system, because constructr-machine terminated!") - context.system.terminate() - } + case Terminated(failureListener) if failureListeners.contains(failureListener) => + failureListeners -= failureListener + + case RegisterFailureHandler(callback) if machineOption.nonEmpty => + failureListeners += context.actorOf( + Props(classOf[ConstructrFailureListener], machineOption.get, callback) + ) + + case RegisterFailureHandler(callback) if machineOption.isEmpty => + executeFailureHandler(callback) case MemberRemoved(member, _) if member.address == cluster.selfAddress => - log.error("Terminating system, because member has been removed!") - context.system.terminate() + log.warning("Stopping ConstructR because cluster member has been removed!") + context.stop(self) } private def createConstructrMachine() = { @@ -101,3 +110,41 @@ final class Constructr private extends Actor with ActorLogging { ) } } + +private[constructr] class ConstructrFailureListener(machine: ActorRef, callback: Runnable) + extends Actor + with ActorLogging + with ConstructrFailureListening { + + override def preStart(): Unit = { + super.preStart() + context.watch(machine) + } + + override def postStop(): Unit = { + context.unwatch(machine) + super.postStop() + } + + override def receive: Receive = { + case Terminated(`machine`) => + try { + executeFailureHandler(callback) + } finally { + context.stop(self) + } + } +} + +private[constructr] trait ConstructrFailureListening { + this: Actor with ActorLogging => + + def executeFailureHandler(callback: Runnable): Unit = + try { + callback.run() + } catch { + case NonFatal(e) => + log.error(e, "ConstructR failure callback failed with [{}]", e.getMessage) + } + +} diff --git a/core/src/main/scala/de/heikoseeberger/constructr/ConstructrExtension.scala b/core/src/main/scala/de/heikoseeberger/constructr/ConstructrExtension.scala index 60f569c..5ca4446 100644 --- a/core/src/main/scala/de/heikoseeberger/constructr/ConstructrExtension.scala +++ b/core/src/main/scala/de/heikoseeberger/constructr/ConstructrExtension.scala @@ -16,10 +16,32 @@ package de.heikoseeberger.constructr -import akka.actor.{ ExtendedActorSystem, Extension, ExtensionKey } +import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } -object ConstructrExtension extends ExtensionKey[ConstructrExtension] +object ConstructrExtension extends ExtensionId[ConstructrExtension] with ExtensionIdProvider { + + override def lookup(): ExtensionId[ConstructrExtension] = ConstructrExtension + + override def createExtension(system: ExtendedActorSystem): ConstructrExtension = + new ConstructrExtension(system) + + /** + * Java API + */ + override def get(system: ActorSystem): ConstructrExtension = super.get(system) +} final class ConstructrExtension private (system: ExtendedActorSystem) extends Extension { - system.systemActorOf(Constructr.props, Constructr.Name) + + private[this] val supervisor = system.systemActorOf(Constructr.props, Constructr.Name) + + def registerOnFailure[T](code: => T): Unit = { + val callback = new Runnable { + override def run(): Unit = code + } + registerOnFailure(callback) + } + + def registerOnFailure(callback: Runnable): Unit = + supervisor ! Constructr.RegisterFailureHandler(callback) } diff --git a/core/src/multi-jvm/scala/de/heikoseeberger/constructr/MultiNodeConstructrSpec.scala b/core/src/multi-jvm/scala/de/heikoseeberger/constructr/MultiNodeConstructrSpec.scala index bb7d325..b4614ec 100644 --- a/core/src/multi-jvm/scala/de/heikoseeberger/constructr/MultiNodeConstructrSpec.scala +++ b/core/src/multi-jvm/scala/de/heikoseeberger/constructr/MultiNodeConstructrSpec.scala @@ -16,25 +16,26 @@ package de.heikoseeberger.constructr -import akka.actor.ActorDSL.{ actor, Act } -import akka.actor.Address -import akka.cluster.{ Cluster, ClusterEvent } +import akka.actor.{Actor, Address, PoisonPill, Props} +import akka.cluster.{Cluster, ClusterEvent} import akka.http.scaladsl.Http import akka.http.scaladsl.client.RequestBuilding -import akka.http.scaladsl.model.StatusCodes.{ NotFound, OK } +import akka.http.scaladsl.model.StatusCodes.{NotFound, OK} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.pattern.ask -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } +import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec} import akka.stream.ActorMaterializer import akka.testkit.TestDuration import akka.util.Timeout import com.typesafe.config.ConfigFactory -import org.scalatest.{ BeforeAndAfterAll, FreeSpecLike, Matchers } +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} + import scala.concurrent.Await import scala.concurrent.duration.DurationInt object ConstructrMultiNodeConfig { - val coordinationHost = { + val coordinationHost: String = { val dockerHostPattern = """tcp://(\S+):\d{1,5}""".r sys.env .get("DOCKER_HOST") @@ -98,14 +99,14 @@ abstract class MultiNodeConstructrSpec( enterBarrier("coordination-started") ConstructrExtension(system) - val listener = actor(new Act { + val listener = system.actorOf(Props(new Actor { import ClusterEvent._ var isMember = false Cluster(context.system).subscribe(self, InitialStateAsEvents, classOf[MemberJoined], classOf[MemberUp]) - become { + override def receive: Receive = { case "isMember" => sender() ! isMember case MemberJoined(member) if member.address == Cluster(context.system).selfAddress => @@ -114,7 +115,7 @@ abstract class MultiNodeConstructrSpec( case MemberUp(member) if member.address == Cluster(context.system).selfAddress => isMember = true } - }) + })) within(20.seconds.dilated) { awaitAssert { implicit val timeout = Timeout(1.second.dilated) @@ -141,17 +142,42 @@ abstract class MultiNodeConstructrSpec( } } + enterBarrier("extension-killed") + + val failureHandlers = 1.to(5).map(_ => mock(classOf[Runnable])) + failureHandlers.foreach(ConstructrExtension(system).registerOnFailure(_)) + system.actorSelection(s"/system/${Constructr.Name}/${ConstructrMachine.Name}") ! PoisonPill + + within(5.seconds.dilated) { + awaitAssert { + for (i <- failureHandlers.indices) { + verify(failureHandlers(i), times(1)).run() + } + } + } + + enterBarrier("post-extension-killed") + + val postFailureHandler = mock(classOf[Runnable]) + ConstructrExtension(system).registerOnFailure(postFailureHandler) + + within(5.seconds.dilated) { + awaitAssert { + verify(postFailureHandler, times(1)).run() + } + } + enterBarrier("done") } - override def initialParticipants = roles.size + override def initialParticipants: Int = roles.size - override protected def beforeAll() = { + override protected def beforeAll(): Unit = { super.beforeAll() multiNodeSpecBeforeAll() } - override protected def afterAll() = { + override protected def afterAll(): Unit = { multiNodeSpecAfterAll() super.afterAll() }