Skip to content

Commit

Permalink
6928-Doc-MP-Reactive-Messaging-Stream Operators-3.0-1
Browse files Browse the repository at this point in the history
 6928-Doc-MP-Reactive-Messaging-Stream Operators-3.0-1

#6928
  • Loading branch information
ramkumar-k-9286 committed Feb 8, 2024
1 parent b3ac7cc commit be76e43
Showing 1 changed file with 304 additions and 0 deletions.
304 changes: 304 additions & 0 deletions modules/ROOT/pages/liberty-kafka-connector.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright (c) 2024 IBM Corporation and others.
// Licensed under Creative Commons Attribution-NoDerivatives
// 4.0 International (CC BY-ND 4.0)
// https://creativecommons.org/licenses/by-nd/4.0/
//
// Contributors:
// IBM Corporation
//
:page-layout: general-reference
:page-type: general
:page-description: The integration of MicroProfile Reactive Messaging with Apache Kafka in Open Liberty applications is a significant development in cloud-native microservice designs as it provides an efficient method of asynchronous communication.
:page-categories: MicroProfile Reactive Messaging
:seo-title: Integrating MicroProfile Reactive Messaging with Apache Kafka in Open Liberty Applications
:seo-description: The integration of MicroProfile Reactive Messaging with Apache Kafka in Open Liberty applications is a significant development in cloud-native microservice designs as it provides an efficient method of asynchronous communication.


= Integrating MicroProfile Reactive Messaging with Apache Kafka in Open Liberty Applications

The integration of MicroProfile Reactive Messaging with Apache Kafka in Open Liberty applications is a significant development in cloud-native microservice designs as it provides an efficient method of asynchronous communication.

MicroProfile Reactive Messaging offers a solution, that enables applications to handle data streams reactively across microservices, ensuring systems are scalable and resilient. Apache Kafka, a distributed event-streaming platform, enhances this ecosystem by providing a robust platform for high-throughput, fault-tolerant event streaming, essential for event-driven architectures.

By focussing to integrate these technologies through the `liberty-kafka` connector in Open Liberty, you get a comprehensive overview of setting up, configuring, and optimizing the `liberty-kafka` connector within the Open Liberty ecosystem, ensuring that you leverage Kafka's capabilities within a MicroProfile Reactive Messaging framework to build robust, efficient microservices.


== Liberty-kafka Connector

The `liberty-kafka` connector is a feature within Open Liberty that facilitates seamless integration with Apache Kafka, enabling applications to send and receive messages from a Apache Kafka broker.

It uses MicroProfile Reactive Messaging standards for robust, asynchronous communication in microservices architectures.

You can fine tune your application's interaction with kafka by configuring the connector in the `microprofile-config.properties` file, as shown in the following example.

----
mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9082
----

You can define a channel to use this connector, directly linking a message channel to Kafka as shown in the following example.

----
mp.messaging.incoming.myChannel.connector=liberty-kafka
----

This allows for detailed control over messaging channels, including attributes like `bootstrap.servers` for connection setup and topic for message targeting, enhancing application scalability and resilience through efficient message handling.


== Configuration

To configure the `Liberty-kafka` connector effectively, follow these steps:

1. Configure Kafka Broker Connection

In the `microprofile-config.properties` file, specify the Kafka broker addresses to establish a connection.

----
mp.messaging.connector.liberty-kafka.bootstrap.servers=myKafkaBroker:9092
----
Indicating where your Kafka broker is hosted.

+
2. Define Channels for Messaging

* Create a channel for incoming messages, to specify the Kafka topic from which messages are received:
----
mp.messaging.incoming.myChannel.connector=liberty-kafka
mp.messaging.incoming.myChannel.topic=myTopicName
----


* Create a channel for outgoing messages, to specify the Kafka topic from which messages are sent:
----
mp.messaging.outgoing.myChannel.connector=liberty-kafka
mp.messaging.outgoing.myChannel.topic=myTopicName.
----

+
3. Include Kafka Client Libraries

When using the Kafka connector included in Open Liberty, you must include the the Kafka client API jar in your application or include it using a shared library.

If you are building your application with Maven, add the Kafka client dependency in your Maven `pom.xml` file.

[source,XML]
----
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
----


== Kafka Connector Configuration and Security

For configuring the Kafka connector and security in Open Liberty, focus on the distinction between channel-specific and connector-wide properties for tailored messaging behavior.

Connector-wide properties, like `bootstrap.servers`, apply globally, whereas channel-specific properties, such as `topic` or `group.id`, customize individual channel behavior.

For security, Open Liberty supports multiple authentication methods:

* Basic Authentication
* SASL_Plain
* SASL_SSL
* Mutual TLS

Configure these by setting the appropriate security properties in the `microprofile-config.properties` file, ensuring secure communication with Kafka brokers.

Examples include specifying security protocols and credentials, allowing for secure message exchanges across your microservices architecture.


== Sending and receiving messages between applications using connectors

To send and receive messages from other systems, reactive messaging uses connectors. Connectors can be attached to one end of a channel and are configured using MicroProfile Config.

Open Liberty includes the `liberty-kafka` connector for sending and receiving messages from an Apache Kafka broker.

The following example example of how to set up a microservice to consume messages from a Kafka topic using MicroProfile (MP) Reactive Messaging with a Kafka connector.

----
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.bootstrap.servers=kafkabrokerhost:9092
mp.messaging.incoming.foo.group.id=foo-reader
mp.messaging.incoming.foo.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.foo.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
----

The example shows you how to set up a microservice to consume messages from a Kafka topic using the MicroProfile Reactive Messaging framework with a Liberty Kafka connector.
It indicates the `liberty-kafka` connector type for the incoming channel `foo`. The `kafkabrokerhost:9092` Kafka broker address , the `foo-reader` consumer group ID , and the deserializers for both `key` and `value` are `org.apache.kafka.common.serialization.StringDeserializer`, indicating that both keys and values are expected to be strings.

This configuration is essential for consuming messages from the specified topic, thereby facilitating the building of reactive applications that can efficiently process data streams.


Similarly, the following example example of how to set up a microservice to to send messages to a Kafka broker.
----
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.bootstrap.servers=
mp.messaging.outgoing.bar.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.bar.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----

The example shows how to how to redirect messages from a Java application using the MicroProfile Reactive Messaging framework to a Kafka broker.
It indicates the use of the `liberty-kafka` connector for managing the connection between the application and Kafka. The `bootstrap.servers` setting points to `kafkabrokerhost:9092`, the Kafka broker's network address, allowing the application to locate and send messages to the Kafka cluster. Both the `key` and `value` of messages are configured to use `StringSerializer`, indicating that the application will serialize both parts of the message as strings before sending them to Kafka.

This enables the application to offload messages to the Kafka topic `bar`, to distributed messaging, which can enhance scalability and flexibility in handling data flows.


== Connector Options and Channel Properties

The `Liberty-kafka` connector offers a range of properties to fine-tune its operation. You can set these properties on the `Liberty-kafka` connector to define certain behaviors during operation.

You can use all these options as attributes on either the connector or a channel that uses the `Liberty-kafka` connector. If you specify the option on both the channel and the connector, the channel takes precedence.


.Properties supported by the `liberty-kafka` connector for incoming channels
[cols="a,a,a",width="100%"]
|===
|Property Name |Default |Description

|topic
|
|The Kafka topic that the channel is to either send or receive messages from.

|unacked.limit
|Defaults to the value of max.poll.records if set, or to 500.
|The number outstanding unacknowledged messages.
{empty} +
If this limit is reached, the connector will stop retrieving records from Kafka until some messages have been acknowledged.

|fast.ack
|- MicroProfile Reactive Messaging 1.0 - `False`
{empty} +
- MicroProfile Reactive Messaging 3.0 - true
|Defines the acknowledge behavior of the `liberty-kafka` connector within the MicroProfile Reactive Messaging framework for `incoming` channels in relation to activities with the Kafka topic.
{empty} +
If the value of `fast.ack` is `false`, the acknowledgement is not reported as complete until the partition offset has been committed to the Kafka broker. If an error occurs during this process, then the acknowledgement is reported as failed.
{empty} +
If the value of `fast.ack` is `true`, and the acknowledgement is reported as complete as soon as the Kafka Connector receives the acknowledgement signal.

|context.service
|If the `concurrent-x.y` feature is enabled. the default context service is used.
{empty} +
If the concurrent feature is not enabled, the built in Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.
|Allows the setting of the Context Service used for Asynchronous tasks.
{empty} +
For the `context.service` option to take effect for the `liberty-kafka` connector, the concurrent feature must be enabled.

|
|Uses the Kafka Client default
|All other properties are passed directly as config parameters to the KafkaConsumer API. A list of required and optional properties can be found in the http://kafka.apache.org/documentation.html#consumerconfigs[Kafka documentation].

|===



.Properties supported by the `liberty-kafka` connector for outgoing channels
[cols="a,a,a",width="100%"]
|===
|Property Name |Default |Description

|topic
|
|The Kafka topic that the channel is to either send or receive messages from.


|context.service
|If the `concurrent-x.y` feature is enabled. the default context service is used.
{empty} +
If the concurrent feature is not enabled, the built in Liberty context service is used with a set list of context types to capture and apply around asynchronous tasks.
|Allows the setting of the Context Service used for Asynchronous tasks.
{empty} +
For the `context.service` option to take effect for the `liberty-kafka` connector, the concurrent feature must be enabled.

|
|Uses the Kafka Client default
|All other properties are passed directly as config parameters to the KafkaProducer API. A list of required and optional properties can be found in the http://kafka.apache.org/documentation.html#producerconfigs[Kafka documentation].

|===


=== fast.ack
Properties like `fast.ack` allow for control over message acknowledgment processes, enhancing message handling efficiency.

In the following example, the `fast.ack` property in the application's `microprofile-config.properties` file is set to `false` on the connector as the default for any channels in the application. However, for a specific incoming channel named `foo`,this is overridden to `true`. Thus opting for a faster acknowledgment strategy, potentially improving performance for messages received on this channel.

----
mp.messaging.connector.liberty-kafka.fast.ack=false
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.fast.ack=true
----

=== context.service

The `context.service` attribute specifies the Context Service for asynchronous operations, critical for performance tuning.

Context Services that are defined within the application itself cannot be used with the `liberty-kafka` connector.

In the following example, the `server.xml` file defines three different context services, each with a unique identifier (`rst`, `uvw`, and `xyz`).

server.xml
----
<contextService id=“rst”/>
<contextService id=“uvw”/>
<contextService id=“xyz”/>
----

This `microprofile-config.properties` file is part of the application's configuration and specifies how MicroProfile features should be used within the application.

microprofile-config.properties
----
mp.messaging.connector.liberty-kafka.context.service=rst
mp.messaging.incoming.def.connector=liberty-kafka
mp.messaging.incoming.foo.connector=liberty-kafka
mp.messaging.incoming.foo.context.service=uvw
mp.messaging.outgoing.bar.connector=liberty-kafka
mp.messaging.outgoing.bar.context.service=xyz
----

The property `mp.messaging.connector.liberty-kafka.context.service=rst` indicates that the Kafka connector that is used for handling messaging between services should use the `rst`` context service by default for its operations.

The application has three channels (`def`, `foo`, and `bar`), which are logical endpoints for incoming and outgoing messages. The configuration for these channels specifies which Kafka connector to use (`liberty-kafka`) and, for two of the channels (`foo` and `bar`), overrides the default context service with their own (`uvw` and `xyz`, respectively).

The `def` channel does not specify its own `context.service`, so it inherits the default one (`rst`) defined at the connector level.

By defining separate context services, the application can isolate certain operations or configurations, which can be particularly useful in complex applications or when integrating with external systems.

These configurations demonstrate the flexibility and control you have over message processing in Open Liberty applications.


== Troubleshooting

For troubleshooting the `Liberty-kafka`` connector, focus on resolving common issues such as connectivity with Kafka, managing multiple server instances, and assigning unique identifiers to producers and consumers.

Ensure proper configuration of `bootstrap.servers` for connectivity, utilize unique `group.id` for each consumer in different instances to avoid conflicts, and set distinct `client.id` for producers to prevent identifier overlap.

=== Multiple Server instances

If multiple instances of Open Liberty with the same application are started. For all incoming channels you must specify a unique `group.id` on the channel in each server instance, otherwise the server will reject any additional connections to a topic above the first connection.

=== Multiple Reactive Messaging Applications using the same Kafka server

If multiple applications that use a Kafka client are deployed to liberty and attempt to connect to the same Kafka server then errors might occur due to conflict identifiers used by both Kafka Producers and Consumers across the two applications.
This is due to how kafka generates the `client.id` for both. Consumers will generate identifiers based on their `group.id` or their `client.id`.

- For consumers, it is recommended to create unique `group.id` for each incoming channel.

- For producers, it is recommended to create unique `client.id` for each outgoing channel.

Specifying either attribute on the `liberty-kafka` Connector will not resolve the issue and is not a best practice.

These steps help in diagnosing and resolving typical challenges faced when integrating Kafka with Open Liberty, ensuring smooth operation of your microservices architecture.

For more information on Apache Kafka, see the https://kafka.apache.org/documentation.html#gettingStarted[Apache Kafka documentation].










0 comments on commit be76e43

Please sign in to comment.