-
Notifications
You must be signed in to change notification settings - Fork 160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consul based service discovery #202
Changes from 8 commits
e4f77cd
fd37b93
552d9a2
dc4542b
370691b
4aa8a2a
7f7702b
9a17b9e
ba29496
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
How to set up Consul based discovery for Akka Cluster | ||
===================================================== | ||
|
||
Step 1: Register cluster instances in Consul | ||
-------------------------------------------- | ||
Imagine that your app consists of 4 nodes, registered in consul as the following services: | ||
1. `service-a-api` on node `A` with port `1234` | ||
1. `service-a-api` on node `B` with port `1235` | ||
1. `service-a-domain` on node `C` with port `1236` | ||
1. `service-a-domain` on node `B` with port `1237` | ||
|
||
Step 2: Expose Akka Management port in those services | ||
----------------------------------------------------- | ||
When Akka Management is started register its binding port as a tag in those services in the following way: | ||
`akka-management-port:19999` | ||
|
||
|
||
Step 3: Register actor system name in consul for services in cluster | ||
-------------------------------------------------------------------- | ||
Add the following tag to Consul entries for the services | ||
`system:cluster-a` | ||
|
||
The registered services in consul might look like this now: | ||
1. `service-a-api` on node `A` with port `1234` tags: `akka-management-port:19999`, `system:cluster-a` | ||
1. `service-a-api` on node `B` with port `1235` tags: `akka-management-port:20012`, `system:cluster-a` | ||
1. `service-a-domain` on node `C` with port `1236` tags: `akka-management-port:35012`, `system:cluster-a` | ||
1. `service-a-domain` on node `B` with port `1237` tags: `akka-management-port:24678`, `system:cluster-a` | ||
|
||
The important part is that the tag with system name `system:cluster-a` has to be the same for all nodes in cluster and every node should serve Akka Management endpoints under port defined in `akka-management-port:...`. | ||
|
||
|
||
Step 4: Start Cluster Bootstrap | ||
------------------------------- | ||
After a moment the cluster should be set up and ready. | ||
|
||
This approach does not require creating locks in Consul. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
###################################################### | ||
# Akka Service Discovery Consul Config # | ||
###################################################### | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick about the # position ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
akka.discovery { | ||
|
||
# Set the following in your application.conf if you want to use this discovery mechanism: | ||
# impl = akka-consul | ||
|
||
# configure the akka-consul provider | ||
akka-consul { | ||
class = akka.discovery.consul.ConsulSimpleServiceDiscovery | ||
|
||
consul-host = "127.0.0.1" | ||
consul-port = 8500 | ||
|
||
# Prefix for consul tag with the name of the actor system / application name, | ||
# services with this tag present will be found by the discovery mechanism | ||
# i.e. `system:test` will be found in cluster if the cluster system is named `test` | ||
application-name-tag-prefix = "system:" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a small comment above each of the keys what it's about? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So here we would have to explain that we expect values to be present under "system:name-of-my-actor-system" right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
|
||
# Prefix for tag containing port number where akka management is set up so that | ||
# the seed nodes can be found, an example value for the tag would be `akka-management-port:19999` | ||
application-akka-management-port-tag-prefix = "akka-management-port:" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.discovery.consul | ||
|
||
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } | ||
|
||
final class ConsulSettings(system: ExtendedActorSystem) extends Extension { | ||
private val consulConfig = system.settings.config.getConfig("akka.discovery.akka-consul") | ||
|
||
val consulHost: String = consulConfig.getString("consul-host") | ||
|
||
val consulPort: Int = consulConfig.getInt("consul-port") | ||
|
||
val applicationNameTagPrefix: String = consulConfig.getString("application-name-tag-prefix") | ||
val applicationAkkaManagementPortTagPrefix: String = | ||
consulConfig.getString("application-akka-management-port-tag-prefix") | ||
} | ||
|
||
object ConsulSettings extends ExtensionId[ConsulSettings] with ExtensionIdProvider { | ||
override def get(system: ActorSystem): ConsulSettings = super.get(system) | ||
|
||
override def lookup: ConsulSettings.type = ConsulSettings | ||
|
||
override def createExtension(system: ExtendedActorSystem): ConsulSettings = new ConsulSettings(system) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.discovery.consul | ||
|
||
import java.util | ||
import java.util.concurrent.TimeoutException | ||
|
||
import akka.actor.ActorSystem | ||
import akka.discovery.SimpleServiceDiscovery | ||
|
||
import scala.collection.immutable.Seq | ||
import scala.concurrent.{ ExecutionContext, Future, Promise } | ||
import akka.pattern.after | ||
import com.google.common.net.HostAndPort | ||
import com.orbitz.consul.Consul | ||
import com.orbitz.consul.async.ConsulResponseCallback | ||
import com.orbitz.consul.model.ConsulResponse | ||
import ConsulSimpleServiceDiscovery._ | ||
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget } | ||
import com.orbitz.consul.model.catalog.CatalogService | ||
import com.orbitz.consul.option.QueryOptions | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.concurrent.duration.FiniteDuration | ||
import scala.util.Try | ||
|
||
class ConsulSimpleServiceDiscovery(system: ActorSystem) extends SimpleServiceDiscovery { | ||
|
||
private val settings = ConsulSettings.get(system) | ||
private val consul = | ||
Consul.builder().withHostAndPort(HostAndPort.fromParts(settings.consulHost, settings.consulPort)).build() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do they have to be lazy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nope, |
||
|
||
override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = { | ||
implicit val ec: ExecutionContext = system.dispatcher | ||
Future.firstCompletedOf( | ||
Seq( | ||
after(resolveTimeout, using = system.scheduler)( | ||
Future.failed(new TimeoutException(s"Lookup for [${name}] timed-out, within [${resolveTimeout}]!")) | ||
), | ||
lookupInConsul(name) | ||
) | ||
) | ||
} | ||
|
||
private def lookupInConsul(name: String)(implicit executionContext: ExecutionContext): Future[Resolved] = { | ||
val consulResult = for { | ||
servicesWithTags <- getServicesWithTags | ||
serviceIds = servicesWithTags.getResponse | ||
.entrySet() | ||
.asScala | ||
.filter(e => e.getValue.contains(settings.applicationNameTagPrefix + name)) | ||
.map(_.getKey) | ||
catalogServices <- Future.sequence(serviceIds.map(id => getService(id).map(_.getResponse.asScala.toList))) | ||
resolvedTargets = catalogServices.flatten.toSeq.map( | ||
catalogService => extractResolvedTargetFromCatalogService(catalogService)) | ||
} yield resolvedTargets | ||
consulResult.map(targets => Resolved(name, scala.collection.immutable.Seq(targets: _*))) | ||
} | ||
|
||
private def extractResolvedTargetFromCatalogService(catalogService: CatalogService) = { | ||
val port = catalogService.getServiceTags.asScala | ||
.find(_.startsWith(settings.applicationAkkaManagementPortTagPrefix)) | ||
.map(_.replace(settings.applicationAkkaManagementPortTagPrefix, "")) | ||
.flatMap { maybePort => | ||
Try(maybePort.toInt).toOption | ||
} | ||
ResolvedTarget(catalogService.getServiceAddress, Some(port.getOrElse(catalogService.getServicePort))) | ||
} | ||
|
||
private def getServicesWithTags: Future[ConsulResponse[util.Map[String, util.List[String]]]] = { | ||
((callback: ConsulResponseCallback[util.Map[String, util.List[String]]]) => | ||
consul.catalogClient().getServices(callback)).asFuture | ||
} | ||
|
||
private def getService(name: String) = | ||
((callback: ConsulResponseCallback[util.List[CatalogService]]) => | ||
consul.catalogClient().getService(name, QueryOptions.BLANK, callback)).asFuture | ||
|
||
} | ||
|
||
object ConsulSimpleServiceDiscovery { | ||
|
||
implicit class ConsulResponseFutureDecorator[T](f: ConsulResponseCallback[T] => Unit) { | ||
def asFuture: Future[ConsulResponse[T]] = { | ||
val callback = new ConsulResponseFutureCallback[T] | ||
Try(f(callback)).recover { | ||
case ex: Throwable => callback.fail(ex) | ||
} | ||
callback.future | ||
} | ||
} | ||
|
||
final case class ConsulResponseFutureCallback[T]() extends ConsulResponseCallback[T] { | ||
|
||
private val promise = Promise[ConsulResponse[T]] | ||
|
||
def fail(exception: Throwable) = promise.failure(exception) | ||
|
||
def future: Future[ConsulResponse[T]] = promise.future | ||
|
||
override def onComplete(consulResponse: ConsulResponse[T]): Unit = { | ||
promise.success(consulResponse) | ||
} | ||
|
||
override def onFailure(throwable: Throwable): Unit = { | ||
promise.failure(throwable) | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> | ||
*/ | ||
package akka.cluster.bootstrap.discovery | ||
|
||
import akka.actor.ActorSystem | ||
import akka.discovery.SimpleServiceDiscovery.ResolvedTarget | ||
import akka.discovery.consul.ConsulSimpleServiceDiscovery | ||
import akka.testkit.TestKitBase | ||
import com.google.common.net.HostAndPort | ||
import com.orbitz.consul.Consul | ||
import com.orbitz.consul.model.catalog.{ CatalogRegistration, ImmutableCatalogRegistration } | ||
import com.orbitz.consul.model.health.{ ImmutableService, Service } | ||
import com.pszymczyk.consul.{ ConsulProcess, ConsulStarterBuilder } | ||
import org.scalatest.concurrent.ScalaFutures | ||
import org.scalatest.time.{ Millis, Seconds, Span } | ||
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } | ||
|
||
import scala.concurrent.duration._ | ||
|
||
class ConsulDiscoverySpec | ||
extends WordSpecLike | ||
with Matchers | ||
with BeforeAndAfterAll | ||
with TestKitBase | ||
with ScalaFutures { | ||
|
||
private val consul: ConsulProcess = ConsulStarterBuilder.consulStarter().withHttpPort(8500).build().start() | ||
|
||
"Consul Discovery" should { | ||
"work for defaults" in { | ||
val consulAgent = | ||
Consul.builder().withHostAndPort(HostAndPort.fromParts(consul.getAddress, consul.getHttpPort)).build() | ||
consulAgent | ||
.catalogClient() | ||
.register( | ||
ImmutableCatalogRegistration | ||
.builder() | ||
.service( | ||
ImmutableService | ||
.builder() | ||
.addTags(s"system:${system.name}", "akka-management-port:1234") | ||
.address("127.0.0.1") | ||
.id("test") | ||
.service("test") | ||
.port(1235) | ||
.build() | ||
) | ||
.node("testNode") | ||
.address("localhost") | ||
.build() | ||
) | ||
|
||
val lookupService = new ConsulSimpleServiceDiscovery(system) | ||
val resolved = lookupService.lookup("test", 10 seconds).futureValue | ||
resolved.addresses should contain(ResolvedTarget("127.0.0.1", Some(1234))) | ||
} | ||
} | ||
|
||
override def afterAll(): Unit = { | ||
super.afterAll() | ||
consul.close() | ||
} | ||
|
||
override implicit lazy val system: ActorSystem = ActorSystem("test") | ||
|
||
implicit override val patienceConfig: PatienceConfig = | ||
PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(50, Millis))) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,18 @@ object Dependencies { | |
) | ||
) | ||
|
||
val DiscoveryConsul = Seq( | ||
libraryDependencies ++= | ||
DependencyGroups.AkkaActor ++ | ||
DependencyGroups.AkkaTesting ++ | ||
Seq( | ||
//License: Apache 2.0 | ||
"com.orbitz.consul" % "consul-client" % "1.1.2", | ||
//License: Apache 2.0 | ||
"com.pszymczyk.consul" % "embedded-consul" % "1.0.2" % "test" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment with the license next to each of those lines, we have to make sure they are apache compatible (i.e. no GPL code) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
) | ||
|
||
val DiscoveryKubernetesApi = Seq( | ||
libraryDependencies ++= | ||
DependencyGroups.AkkaActor ++ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs in general are under the docs project :-)
https://github.com/akka/akka-management/blob/master/docs/src/main/paradox/discovery.md
Here no one would see the documentation after all :-)
Would you want to give adjusting it a try and putting in that spot?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, I knew something was not right ... will add it there