Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul based service discovery #202

Merged
merged 9 commits into from
Jun 10, 2018
14 changes: 13 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ lazy val `akka-management-root` = project
`akka-discovery-marathon-api`,
`akka-discovery-aws-api`,
`akka-discovery-aws-api-async`,
`akka-discovery-consul`,
`akka-management`,
`bootstrap-joining-demo-aws-api-ec2-tag-based`,
`bootstrap-joining-demo-aws-api-ecs`,
Expand Down Expand Up @@ -91,6 +92,17 @@ lazy val `akka-discovery-aws-api-async` = project
)
.dependsOn(`akka-discovery`)

lazy val `akka-discovery-consul` = project
.in(file("discovery-consul"))
.enablePlugins(AutomateHeaderPlugin)
.settings(unidocSettings)
.settings(
name := "akka-discovery-consul",
organization := "com.lightbend.akka.discovery",
Dependencies.DiscoveryConsul
)
.dependsOn(`akka-discovery`)

// gathers all enabled routes and serves them (HTTP or otherwise)
lazy val `akka-management` = project
.in(file("management"))
Expand Down Expand Up @@ -200,7 +212,7 @@ lazy val `bootstrap-joining-demo-aws-api-ecs` = project
.enablePlugins(JavaAppPackaging, AshScriptPlugin, DockerPlugin)
.settings(
dockerBaseImage := "openjdk:10-jre-slim",
packageName in Docker := "ecs-bootstrap-demo-app",
com.typesafe.sbt.SbtNativePackager.autoImport.packageName in Docker := "ecs-bootstrap-demo-app",
version in Docker := "1.0"
)

Expand Down
26 changes: 26 additions & 0 deletions discovery-consul/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
######################################################
# Akka Service Discovery Consul Config #
######################################################

akka.discovery {

# Set the following in your application.conf if you want to use this discovery mechanism:
# impl = akka-consul

# configure the akka-consul provider
akka-consul {
class = akka.discovery.consul.ConsulSimpleServiceDiscovery

consul-host = "127.0.0.1"
consul-port = 8500

# Prefix for consul tag with the name of the actor system / application name,
# services with this tag present will be found by the discovery mechanism
# i.e. `system:test` will be found in cluster if the cluster system is named `test`
application-name-tag-prefix = "system:"

# Prefix for tag containing port number where akka management is set up so that
# the seed nodes can be found, an example value for the tag would be `akka-management-port:19999`
application-akka-management-port-tag-prefix = "akka-management-port:"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.discovery.consul

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }

final class ConsulSettings(system: ExtendedActorSystem) extends Extension {
private val consulConfig = system.settings.config.getConfig("akka.discovery.akka-consul")

val consulHost: String = consulConfig.getString("consul-host")

val consulPort: Int = consulConfig.getInt("consul-port")

val applicationNameTagPrefix: String = consulConfig.getString("application-name-tag-prefix")
val applicationAkkaManagementPortTagPrefix: String =
consulConfig.getString("application-akka-management-port-tag-prefix")
}

object ConsulSettings extends ExtensionId[ConsulSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ConsulSettings = super.get(system)

override def lookup: ConsulSettings.type = ConsulSettings

override def createExtension(system: ExtendedActorSystem): ConsulSettings = new ConsulSettings(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.discovery.consul

import java.util
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.discovery.SimpleServiceDiscovery

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.pattern.after
import com.google.common.net.HostAndPort
import com.orbitz.consul.Consul
import com.orbitz.consul.async.ConsulResponseCallback
import com.orbitz.consul.model.ConsulResponse
import ConsulSimpleServiceDiscovery._
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget }
import com.orbitz.consul.model.catalog.CatalogService
import com.orbitz.consul.option.QueryOptions

import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

class ConsulSimpleServiceDiscovery(system: ActorSystem) extends SimpleServiceDiscovery {

private val settings = ConsulSettings.get(system)
private val consul =
Consul.builder().withHostAndPort(HostAndPort.fromParts(settings.consulHost, settings.consulPort)).build()

override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = {
implicit val ec: ExecutionContext = system.dispatcher
Future.firstCompletedOf(
Seq(
after(resolveTimeout, using = system.scheduler)(
Future.failed(new TimeoutException(s"Lookup for [${name}] timed-out, within [${resolveTimeout}]!"))
),
lookupInConsul(name)
)
)
}

private def lookupInConsul(name: String)(implicit executionContext: ExecutionContext): Future[Resolved] = {
val consulResult = for {
servicesWithTags <- getServicesWithTags
serviceIds = servicesWithTags.getResponse
.entrySet()
.asScala
.filter(e => e.getValue.contains(settings.applicationNameTagPrefix + name))
.map(_.getKey)
catalogServices <- Future.sequence(serviceIds.map(id => getService(id).map(_.getResponse.asScala.toList)))
resolvedTargets = catalogServices.flatten.toSeq.map(
catalogService => extractResolvedTargetFromCatalogService(catalogService))
} yield resolvedTargets
consulResult.map(targets => Resolved(name, scala.collection.immutable.Seq(targets: _*)))
}

private def extractResolvedTargetFromCatalogService(catalogService: CatalogService) = {
val port = catalogService.getServiceTags.asScala
.find(_.startsWith(settings.applicationAkkaManagementPortTagPrefix))
.map(_.replace(settings.applicationAkkaManagementPortTagPrefix, ""))
.flatMap { maybePort =>
Try(maybePort.toInt).toOption
}
ResolvedTarget(catalogService.getServiceAddress, Some(port.getOrElse(catalogService.getServicePort)))
}

private def getServicesWithTags: Future[ConsulResponse[util.Map[String, util.List[String]]]] = {
((callback: ConsulResponseCallback[util.Map[String, util.List[String]]]) =>
consul.catalogClient().getServices(callback)).asFuture
}

private def getService(name: String) =
((callback: ConsulResponseCallback[util.List[CatalogService]]) =>
consul.catalogClient().getService(name, QueryOptions.BLANK, callback)).asFuture

}

object ConsulSimpleServiceDiscovery {

implicit class ConsulResponseFutureDecorator[T](f: ConsulResponseCallback[T] => Unit) {
def asFuture: Future[ConsulResponse[T]] = {
val callback = new ConsulResponseFutureCallback[T]
Try(f(callback)).recover {
case ex: Throwable => callback.fail(ex)
}
callback.future
}
}

final case class ConsulResponseFutureCallback[T]() extends ConsulResponseCallback[T] {

private val promise = Promise[ConsulResponse[T]]

def fail(exception: Throwable) = promise.failure(exception)

def future: Future[ConsulResponse[T]] = promise.future

override def onComplete(consulResponse: ConsulResponse[T]): Unit = {
promise.success(consulResponse)
}

override def onFailure(throwable: Throwable): Unit = {
promise.failure(throwable)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.bootstrap.discovery

import akka.actor.ActorSystem
import akka.discovery.SimpleServiceDiscovery.ResolvedTarget
import akka.discovery.consul.ConsulSimpleServiceDiscovery
import akka.testkit.TestKitBase
import com.google.common.net.HostAndPort
import com.orbitz.consul.Consul
import com.orbitz.consul.model.catalog.{ CatalogRegistration, ImmutableCatalogRegistration }
import com.orbitz.consul.model.health.{ ImmutableService, Service }
import com.pszymczyk.consul.{ ConsulProcess, ConsulStarterBuilder }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }

import scala.concurrent.duration._

class ConsulDiscoverySpec
extends WordSpecLike
with Matchers
with BeforeAndAfterAll
with TestKitBase
with ScalaFutures {

private val consul: ConsulProcess = ConsulStarterBuilder.consulStarter().withHttpPort(8500).build().start()

"Consul Discovery" should {
"work for defaults" in {
val consulAgent =
Consul.builder().withHostAndPort(HostAndPort.fromParts(consul.getAddress, consul.getHttpPort)).build()
consulAgent
.catalogClient()
.register(
ImmutableCatalogRegistration
.builder()
.service(
ImmutableService
.builder()
.addTags(s"system:${system.name}", "akka-management-port:1234")
.address("127.0.0.1")
.id("test")
.service("test")
.port(1235)
.build()
)
.node("testNode")
.address("localhost")
.build()
)

val lookupService = new ConsulSimpleServiceDiscovery(system)
val resolved = lookupService.lookup("test", 10 seconds).futureValue
resolved.addresses should contain(ResolvedTarget("127.0.0.1", Some(1234)))
}
}

override def afterAll(): Unit = {
super.afterAll()
consul.close()
}

override implicit lazy val system: ActorSystem = ActorSystem("test")

implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(50, Millis)))

}
39 changes: 39 additions & 0 deletions docs/src/main/paradox/discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,45 @@ Demo:
[principle of least privilege](https://en.wikipedia.org/wiki/Principle_of_least_privilege).


##### akka-discovery-consul

If you are using Consul to do the service discovery this would allow you to base your Cluster on Consul services.

@@dependency[sbt,Gradle,Maven] {
group="com.lightbend.akka.discovery"
artifact="akka-discovery-consul_2.12"
version="$version$"
}

In your application conf add:
```
akka.discovery {
method = akka-consul
akka-consul {

#How to connect to Consul to fetch services data
consul-host = "127.0.0.1"
consul-port = 8500

# Prefix for consul tag with the name of the actor system / application name,
# services with this tag present will be found by the discovery mechanism
# i.e. `system:test` will be found in cluster if the cluster system is named `test`
application-name-tag-prefix = "system:"

# Prefix for tag containing port number where akka management is set up so that
# the seed nodes can be found, an example value for the tag would be `akka-management-port:19999`
application-akka-management-port-tag-prefix = "akka-management-port:"
}
}
```

Notes:

* Since tags in Consul services are simple strings, prefixes are necessary to ensure that proper values are read.

* If Akka management port tag is not found on service in Consul the implementation defaults to catalog service port.


## How to contribute implementations

Contributions to alternative data-stores or service-discovery APIs built-in to specific cloud systems
Expand Down
12 changes: 12 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ object Dependencies {
)
)

val DiscoveryConsul = Seq(
libraryDependencies ++=
DependencyGroups.AkkaActor ++
DependencyGroups.AkkaTesting ++
Seq(
//License: Apache 2.0
"com.orbitz.consul" % "consul-client" % "1.1.2",
//License: Apache 2.0
"com.pszymczyk.consul" % "embedded-consul" % "1.0.2" % "test"
)
)

val DiscoveryKubernetesApi = Seq(
libraryDependencies ++=
DependencyGroups.AkkaActor ++
Expand Down