FDB-PubSub is a publish-subscribe layer for FoundationDB, built on top of Akka Streams, providing both Java and Scala APIs. It is inspired by Kafka.
Getting data from a database into a publish-subscribe system is surprisingly hard. It would be much simpler if developers could publish events within their business transactions - and that is exactly what FDB-PubSub does: it supports publishing events within a FoundationDB transaction.
- Support for publishing events and committing offsets within a FoundationDB transaction
- Easily scalable and fault tolerant storage (thanks to FoundationDB)
- Easy integration with Apache Cassandra, Apache Kafka, Elasticsearch and more (thanks to Akka Streams and Alpakka)
- Exposed as a library, so if you already operate FoundationDB there is no new stateful component to maintain
Concepts and assumptions are similar to Kafka's. Here is a brief overview of FDB-PubSub:
- User data can be published to topics
- Topics are divided into partitions
- Number of partitions is defined during topic creation
- User data consists of key (byte array) and value (byte array)
- Values with the same key will end up in the same partition, unless the user explicitly specifies a different partition (see the sketch after this list)
- Records are ordered within the partition
- Within a topic, each record is assigned a unique offset; the offset is of type Versionstamp
- Each consumer belongs to a consumer group
- Records within a partition will be processed in order
- Records will be load balanced across consumers within the same consumer group (e.g. given a topic with 10 partitions, if at first there is a single consumerA that processes data from all partitions and consumerB then joins, consumerB will take over processing data from 5 partitions and consumerA will continue processing data from the other 5 partitions)
- Consumers keep track of their position by storing the last processed offset for each partition (i.e. an offset is stored for every (topic, consumerGroup, partitionNumber) tuple)
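For intuition, here is a minimal sketch of key-based partition assignment; this is not the library's actual routing code, and partitionFor is a hypothetical helper. The point is that the partition is a pure function of the key, so records with equal keys always land in the same partition and keep their relative order:
// Scala - hypothetical illustration only; FDB-PubSub's real assignment logic may differ
def partitionFor(key: Array[Byte], numberOfPartitions: Int): Int =
  Math.floorMod(java.util.Arrays.hashCode(key), numberOfPartitions)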
- It's not yet production ready. If you'd like to use it in your project, please get in touch; I will be happy to help.
- To stream data from a partition, consumers acquire locks (those locks are later used to atomically commit offsets; outdated locks fail to commit an offset and the underlying partition stream is stopped). Currently, those locks are acquired rather aggressively by consumers that join: a consumer that holds a lock is not aware that another consumer wants to join, and the newly connected consumer simply acquires some of the locks held by others. As a result, when processing data with at-least-once delivery semantics, messages may be processed more times than would be necessary if locks were acquired gracefully. This will be addressed in future releases.
- No performance tests have been performed yet. Currently - with default settings - up to 10 consumers and 1000 partitions per topic should be perfectly fine.
It's not well tested in production. I'd recommend using Kafka or Pulsar instead.
FDB-PubSub provides both Java and Scala APIs. The Java API lives in the package com.github.pwliwanow.fdb.pubsub.javadsl and the Scala API in com.github.pwliwanow.fdb.pubsub.scaladsl. The example module contains small examples written in both Java and Scala.
To get started with FDB-PubSub, add the following dependency with SBT:
val fdbPubSubVersion = "0.2.0"
libraryDependencies += "com.github.pwliwanow.fdb-pubsub" %% "pubsub" % fdbPubSubVersion
or Maven:
<dependency>
<groupId>com.github.pwliwanow.fdb-pubsub</groupId>
<artifactId>pubsub_2.12</artifactId>
<version>0.2.0</version>
</dependency>
To start, you need to create a PubSubClient; it's an immutable class that can be freely shared within the application. It allows the user to create topics, obtain a producer and create consumers. PubSubClient requires a Subspace that it will operate on and a Database to be provided:
// Scala
import com.github.pwliwanow.fdb.pubsub.scaladsl.PubSubClient
val system = ActorSystem()
implicit val ec = system.dispatcher
val database = FDB.selectAPIVersion(620).open(null, ec)
val pubSubSubspace = new Subspace(Tuple.from("PubSubExample"))
val pubSubClient = PubSubClient(pubSubSubspace, database)
// Java
import com.github.pwliwanow.fdb.pubsub.javadsl.PubSubClient;
ActorSystem system = ActorSystem.create();
ExecutionContextExecutor ec = system.dispatcher();
Database database = FDB.selectAPIVersion(600).open(null, ec);
Subspace pubSubSubspace = new Subspace(Tuple.from("PubSubExample"));
PubSubClient pubSubClient = PubSubClient.create(pubSubSubspace, database);
// Scala
val futureResult: Future[NotUsed] = pubSubClient.createTopic("testTopic", numberOfPartitions = 10)
// Java
int numberOfPartitions = 10;
CompletableFuture<NotUsed> futureResult = pubSubClient.createTopic("testTopic", numberOfPartitions, ec);
Producers from the Java and Scala APIs differ in how they compose transactions: the Java API takes an additional TransactionContext as a parameter and returns CompletableFuture<NotUsed>, while the Scala API returns DBIO[NotUsed] (a type from foundationdb4s).
// Scala
val producer = pubSubClient.producer
val dbio =
producer.send(
"testTopic",
Tuple.from("ExampleKey").pack(),
Tuple.from("ExampleValue").pack())
val futureResult: Future[NotUsed] = dbio.transact(database)
// Java
Producer producer = pubSubClient.producer();
CompletableFuture<NotUsed> futureResult =
database.runAsync(tx ->
producer.send(
tx,
"testTopic",
Tuple.from("ExampleKey").pack(),
Tuple.from("ExampleValue").pack()));
Consumers are exposed as substreams, where each partition forms a separate stream (which is especially useful when committing offsets, as each stream may perform the commit action independently). To create a consumer, a topic, a consumerGroup, ConsumerSettings and a Materializer need to be provided:
// Scala
implicit val mat = ActorMaterializer()
val defaultSettings = ConsumerSettings()
val consumer = pubSubClient.consumer("testTopic", "testConsumerGroup", defaultSettings)
// Java
Materializer mat = ActorMaterializer.create();
ConsumerSettings defaultSettings = ConsumerSettings.create();
SubSource<ConsumerRecord<KeyValue>, NotUsed> consumer =
pubSubClient.consumer("testTopic", "testConsumerGroup", defaultSettings, mat);
FDB-PubSub offers committableFlow and committableSink, which should be used for committing offsets; the commit is guaranteed to run exactly once for each ConsumerRecord. Optionally, the user can add custom transactional logic, which is composed into the same transaction as the offset commit.
// Scala
def transactionToCompose(record: ConsumerRecord[KeyValue]): DBIO[Unit] = {
// some implementation
}
val runnableGraph = consumer.to(Consumer.committableSink(database, transactionToCompose))
// Java
RunnableGraph<NotUsed> runnableGraph =
consumer.to(Consumer.committableSink(database, (tx, record) -> performTransaction(tx, record), ec));
CompletableFuture<Void> performTransaction(TransactionContext tx, ConsumerRecord<KeyValue> record) {
// some implementation
}
// Scala
runnableGraph.run()
// Java
runnableGraph.run(mat);
Depending on the use case, different processing semantics may be useful.
Exactly-once processing is only possible within FoundationDB, by using committableFlow or committableSink as shown in the Committing offsets section.
To process data at least once, additional processing should be done before the offset is committed. Depending on your use case, it may be a good idea to commit offsets in batches:
// Scala
def updateElasticsearchInBatch(records: Seq[ConsumerRecord[Entity]]): Future[Unit] = {
// implementation here
}
consumer
.groupedWithin(1000, 5.seconds)
// at the end get only the last record to perform batch commit
.mapAsync(1)(records => updateElasticsearchInBatch(records).map(_ => records.last))
.via(Consumer.committableFlow(database))
// Java
consumer
.groupedWithin(1000, Duration.of(5, ChronoUnit.SECONDS))
// at the end get only the last record to perform batch commit
.mapAsync(1, records -> updateElasticsearchInBatch(records).thenApply(done -> records.get(records.size() - 1)))
.via(Consumer.committableFlow(database));
CompletionStage<Void> updateElasticsearchInBatch(List<ConsumerRecord<Entity>> records) {
// implementation here
}
To process data at most once, additional processing should be done after the offset is committed:
// Scala
def sendNotCriticalNotification(record: ConsumerRecord[Entity]): Future[Unit] = {
// implementation here
}
val parallelism = 10
consumer
.via(Consumer.committableFlow(database))
.mapAsync(parallelism)(sendNotCriticalNotification)
.to(Sink.ignore)
.run()
// Java
int parallelism = 10;
consumer
.via(Consumer.committableFlow(database))
.mapAsync(parallelism, this::sendNotCriticalNotification)
.to(Sink.ignore)
.run(mat);
CompletionStage<Void> sendNotCriticalNotification(ConsumerRecord<Entity> record) {
// implementation here
}
To enable cleaning data from topics, three basic methods are provided: clear, getOffset and getPartitions.
Those methods can be combined as follows:
// Scala
import cats.implicits._
val topic = "products"
val consumerGroups = List("cg1", "cg2")
val dbio = for {
partitions <- pubSubClient.getPartitions(topic).toDBIO
consumerGroupPartitions = partitions.flatMap(p => consumerGroups.map(c => (p, c)))
_ <- consumerGroupPartitions.parTraverse { case (p, c) =>
pubSubClient
.getOffset(topic, c, p)
.toDBIO
.flatMap(_.fold(DBIO.unit)(offset => pubSubClient.clear(topic, p, offset)))
}
} yield ()
dbio.transact(database)
// Java, for simplicity the example below is blocking
String topic = "products";
List<String> consumerGroups = Arrays.asList("cg1", "cg2");
database.run((Transaction tr) -> {
  List<Integer> partitions = pubSubClient.getPartitions(tr, topic).join();
  for (String c : consumerGroups) {
    for (int p : partitions) {
      pubSubClient.getOffset(tr, topic, p, c).join().ifPresent(offset -> {
        pubSubClient.clear(tr, topic, p, offset).join();
      });
    }
  }
  return null;
});