Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul based service discovery #202

Merged
merged 9 commits into from
Jun 10, 2018
2 changes: 1 addition & 1 deletion bootstrap-joining-demo/aws-api-ec2/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
enablePlugins(JavaAppPackaging)

packageName in Universal := "app" // should produce app.zip
com.typesafe.sbt.SbtNativePackager.autoImport.packageName in Universal := "app" // should produce app.zip
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My sbt was complaining about, but it's probably a local issue.


libraryDependencies += "com.amazonaws" % "aws-java-sdk-cloudformation" % "1.11.271" % IntegrationTest

Expand Down
14 changes: 13 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ lazy val `akka-management-root` = project
`akka-discovery-marathon-api`,
`akka-discovery-aws-api`,
`akka-discovery-aws-api-async`,
`akka-discovery-consul`,
`akka-management`,
`bootstrap-joining-demo-aws-api-ec2-tag-based`,
`bootstrap-joining-demo-aws-api-ecs`,
Expand Down Expand Up @@ -91,6 +92,17 @@ lazy val `akka-discovery-aws-api-async` = project
)
.dependsOn(`akka-discovery`)

lazy val `akka-discovery-consul` = project
.in(file("discovery-consul"))
.enablePlugins(AutomateHeaderPlugin)
.settings(unidocSettings)
.settings(
name := "akka-discovery-consul",
organization := "com.lightbend.akka.discovery",
Dependencies.DiscoveryConsul
)
.dependsOn(`akka-discovery`)

// gathers all enabled routes and serves them (HTTP or otherwise)
lazy val `akka-management` = project
.in(file("management"))
Expand Down Expand Up @@ -200,7 +212,7 @@ lazy val `bootstrap-joining-demo-aws-api-ecs` = project
.enablePlugins(JavaAppPackaging, AshScriptPlugin, DockerPlugin)
.settings(
dockerBaseImage := "openjdk:10-jre-slim",
packageName in Docker := "ecs-bootstrap-demo-app",
com.typesafe.sbt.SbtNativePackager.autoImport.packageName in Docker := "ecs-bootstrap-demo-app",
version in Docker := "1.0"
)

Expand Down
20 changes: 20 additions & 0 deletions discovery-consul/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
######################################################
# Akka Service Discovery Consul Config #
######################################################
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick about the # position ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


akka.discovery {

# Set the following in your application.conf if you want to use this discovery mechanism:
# impl = akka-consul

# configure the akka-consul provider
akka-consul {
class = akka.discovery.consul.ConsulSimpleServiceDiscovery

consul-host = "127.0.0.1"
consul-port = 8500

application-name-tag-prefix = "system:"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a small comment above each of the keys what it's about?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we would have to explain that we expect values to be present under "system:name-of-my-actor-system" right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

application-akka-management-port-tag-prefix = "akka-management-port:"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.discovery.consul

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }

final class ConsulSettings(system: ExtendedActorSystem) extends Extension {
private val consulConfig = system.settings.config.getConfig("akka.discovery.akka-consul")

val consulHost: String = consulConfig.getString("consul-host")

val consulPort: Int = consulConfig.getInt("consul-port")

val applicationNameTagPrefix: String = consulConfig.getString("application-name-tag-prefix")
val applicationAkkaManagementPortTagPrefix: String =
consulConfig.getString("application-akka-management-port-tag-prefix")
}

object ConsulSettings extends ExtensionId[ConsulSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ConsulSettings = super.get(system)

override def lookup: ConsulSettings.type = ConsulSettings

override def createExtension(system: ExtendedActorSystem): ConsulSettings = new ConsulSettings(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.discovery.consul

import java.util
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.discovery.SimpleServiceDiscovery

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.pattern.after
import com.google.common.net.HostAndPort
import com.orbitz.consul.Consul
import com.orbitz.consul.async.ConsulResponseCallback
import com.orbitz.consul.model.ConsulResponse
import ConsulSimpleServiceDiscovery._
import akka.discovery.SimpleServiceDiscovery.{ Resolved, ResolvedTarget }
import com.orbitz.consul.model.catalog.CatalogService
import com.orbitz.consul.option.QueryOptions

import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

class ConsulSimpleServiceDiscovery(system: ActorSystem) extends SimpleServiceDiscovery {

private lazy val settings = ConsulSettings.get(system)
private lazy val consul =
Consul.builder().withHostAndPort(HostAndPort.fromParts(settings.consulHost, settings.consulPort)).build()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do they have to be lazy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope,


override def lookup(name: String, resolveTimeout: FiniteDuration): Future[SimpleServiceDiscovery.Resolved] = {
import system.dispatcher
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we try to use implicit val ec = system.dispatchet everywhere for safety -- the import if executed in a future technically could reach a null (rarely, and only during system shutdown etc etc, but we try to use the val anyway)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Future.firstCompletedOf(
Seq(
after(resolveTimeout, using = system.scheduler)(
Future.failed(new TimeoutException("Future timed out!"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refine the message a bit: Lookup for [${name}] timed-out, within [${resolveTimeout}]!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, fixed

),
lookupInConsul(name)
)
)
}

private def lookupInConsul(name: String)(implicit executionContext: ExecutionContext): Future[Resolved] = {
val consulResult = for {
servicesWithTags <- getServicesWithTags
serviceIds = servicesWithTags.getResponse
.entrySet()
.asScala
.filter(e => e.getValue.contains(settings.applicationNameTagPrefix + name))
.map(_.getKey)
catalogServices <- Future.sequence(serviceIds.map(id => getService(id).map(_.getResponse.asScala.toList)))
resolvedTargets = catalogServices.flatten.toSeq.map(
catalogService => extractResolvedTargetFromCatalogService(catalogService))
} yield resolvedTargets
consulResult.map(targets => Resolved(name, scala.collection.immutable.Seq(targets: _*)))
}

private def extractResolvedTargetFromCatalogService(catalogService: CatalogService) = {
val port = catalogService.getServiceTags.asScala
.find(_.startsWith(settings.applicationAkkaManagementPortTagPrefix))
.map(_.replace(settings.applicationAkkaManagementPortTagPrefix, ""))
.flatMap { maybePort =>
Try(maybePort.toInt).toOption
}
ResolvedTarget(catalogService.getServiceAddress, Some(port.getOrElse(catalogService.getServicePort)))
}

private val getServicesWithTags = ((callback: ConsulResponseCallback[util.Map[String, util.List[String]]]) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be a normal method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

consul.catalogClient().getServices(callback)).asFuture

private def getService(name: String) =
((callback: ConsulResponseCallback[util.List[CatalogService]]) =>
consul.catalogClient().getService(name, QueryOptions.BLANK, callback)).asFuture

}

object ConsulSimpleServiceDiscovery {

implicit class ConsulResponseFutureDecorator[T](f: ConsulResponseCallback[T] => Unit) {
def asFuture: Future[ConsulResponse[T]] = {
val callback = new ConsulResponseFutureCallback[T]
f(callback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could f throw? Let's make sure that then the Future is failed rather than the asFuture throwing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did what you asked for, but please take another look

callback.future
}
}

case class ConsulResponseFutureCallback[T]() extends ConsulResponseCallback[T] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final case class please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private val promise = Promise[ConsulResponse[T]]

def future: Future[ConsulResponse[T]] = promise.future

override def onComplete(consulResponse: ConsulResponse[T]): Unit = {
promise.success(consulResponse)
}

override def onFailure(throwable: Throwable): Unit = {
promise.failure(throwable)
}
}

}
3 changes: 3 additions & 0 deletions discovery-consul/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
akka {

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trailing \n is needed in reference confs to avoid issues in general, but here perhaps we can just remove it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the file I mean)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.bootstrap.discovery

import akka.actor.ActorSystem
import akka.discovery.SimpleServiceDiscovery.ResolvedTarget
import akka.discovery.consul.ConsulSimpleServiceDiscovery
import akka.testkit.TestKitBase
import com.google.common.net.HostAndPort
import com.orbitz.consul.Consul
import com.orbitz.consul.model.catalog.{ CatalogRegistration, ImmutableCatalogRegistration }
import com.orbitz.consul.model.health.{ ImmutableService, Service }
import com.pszymczyk.consul.{ ConsulProcess, ConsulStarterBuilder }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }

import scala.concurrent.duration._

class ConsulDiscoverySpec
extends WordSpecLike
with Matchers
with BeforeAndAfterAll
with TestKitBase
with ScalaFutures {

private var consul: ConsulProcess = null

"Consul Discovery" should {
"work for defaults" in {
val consulAgent =
Consul.builder().withHostAndPort(HostAndPort.fromParts(consul.getAddress, consul.getHttpPort)).build()
consulAgent
.catalogClient()
.register(
ImmutableCatalogRegistration
.builder()
.service(
ImmutableService
.builder()
.addTags(s"system:${system.name}", "akka-management-port:1234")
.address("127.0.0.1")
.id("test")
.service("test")
.port(1235)
.build()
)
.node("testNode")
.address("localhost")
.build()
)

val lookupService = new ConsulSimpleServiceDiscovery(system)
val resolved = lookupService.lookup("test", 10 seconds).futureValue
resolved.addresses should contain(ResolvedTarget("127.0.0.1", Some(1234)))
}
}

override def beforeAll(): Unit = {
super.beforeAll()
consul = ConsulStarterBuilder.consulStarter().withHttpPort(8500).build().start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems there's no reason to keep it a var, can we make it a val and innit in constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move the beforeAll above the tests (right next to the var)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it works without var


override def afterAll(): Unit = {
super.afterAll()
consul.close()
}

override implicit lazy val system: ActorSystem = ActorSystem("test")

implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(50, Millis)))

}
10 changes: 10 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ object Dependencies {
)
)

val DiscoveryConsul = Seq(
libraryDependencies ++=
DependencyGroups.AkkaActor ++
DependencyGroups.AkkaTesting ++
Seq(
"com.orbitz.consul" % "consul-client" % "1.1.2",
"com.pszymczyk.consul" % "embedded-consul" % "1.0.2" % "test"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment with the license next to each of those lines, we have to make sure they are apache compatible (i.e. no GPL code)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

)

val DiscoveryKubernetesApi = Seq(
libraryDependencies ++=
DependencyGroups.AkkaActor ++
Expand Down