Unable to consume/produce data in Kafka Docker Swarm Mode #376

Closed
TechDevCode opened this issue Aug 2, 2018 · 14 comments


TechDevCode commented Aug 2, 2018

I am deploying a Kafka cluster on a Docker Swarm cluster. I am running the example at https://github.com/wurstmeister/kafka-docker/blob/master/docker-compose-swarm.yml with deploy mode set to "global". But in the Kafka config values I get a null advertised host name, as below:

[2018-08-02 09:13:26,029] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = INSIDE://:9092,OUTSIDE://quad.cmp.com:9094
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = -1
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer

Because of this, my other services are unable to produce data to and consume data from these Kafka brokers.
In my Kafka consumer & producer settings I give the broker addresses as hostname_of_machine1:9092, hostname_of_machine2:9092 for reading from and producing to these Kafka brokers.

What is the issue?

Thanks,
Rishikesh


sscaling commented Aug 2, 2018

You've specified advertised.listeners, so advertised.host.name will be ignored (http://kafka.apache.org/documentation/#brokerconfigs). Consumers and producers should connect to the public port (9094); 9092 is the port used for inter-broker communication.
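For reference, a minimal sketch of that listener split (modeled on docker-compose-swarm.yml; the quad.cmp.com host name is taken from the log above, and the exact variable set shown is illustrative, not the full file):

```yaml
# Sketch of the compose environment for one broker. INSIDE carries
# inter-broker traffic on 9092; OUTSIDE is what external clients use.
environment:
  KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
  KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://quad.cmp.com:9094
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
  KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
```

Clients would then bootstrap against quad.cmp.com:9094, not the INSIDE port.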

sscaling closed this as completed Aug 2, 2018

TechDevCode commented Aug 2, 2018

@sscaling Do I have to give the brokers as "OUTSIDE://quad.cmp.com:9094" or only as "quad.cmp.com:9094" in the consumer/producer services? I am unable to produce/consume messages when I pass "quad.cmp.com:9094" to the producer/consumer services. Also, do I need to publish port 9094 in the producer/consumer services?

Thanks,
Rishikesh


sscaling commented Aug 2, 2018

Brokers should be connected to via host:port (the OUTSIDE/INSIDE scheme is completely arbitrary text that is only known to Kafka because of the listeners and advertised.listeners configuration).

I'm not sure about your usage of 'global'; the example file already publishes the ports in host networking mode, which implies that only one service instance can run on each node.

Also, if you are running multiple brokers each broker should have a unique OUTSIDE host name otherwise the broker will advertise the same address for each partition's metadata.
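A sketch of how the stock compose file gives each broker a unique OUTSIDE name (modeled on docker-compose-swarm.yml; the _{HOSTNAME_COMMAND} placeholder is substituted at container start with the output of HOSTNAME_COMMAND, run on that node):

```yaml
# Each swarm node runs HOSTNAME_COMMAND locally, so every broker
# advertises its own host name on the OUTSIDE listener.
environment:
  HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
  KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
```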

@TechDevCode

At https://hub.docker.com/r/wurstmeister/kafka/ it is mentioned to use "global" deployment for swarm mode. But I now tried again without global mode, and I am still not able to connect my consumer/producer services to the Kafka brokers. Also, each Kafka broker has a unique OUTSIDE hostname.

Is there any need to publish 9094 port & set host mode in my producer/consumer services too?

Thanks,
Rishikesh


sscaling commented Aug 2, 2018

(just as a note, that documentation doesn't get updated anymore - there is a ticket open to manually update it to link to the up-to-date readme on github)

Is there any need to publish 9094 port & set host mode in my producer/consumer services too?

No.

Looking at the Docker Swarm documentation, I guess it depends on what you want to do. If you want to run on all nodes then global is fine, but the cluster will scale with the number of swarm nodes (for me, the number of Kafka brokers and the number of swarm nodes should be independent).

Try some basic network debugging. Can each of the nodes resolve the other node by the name supplied? Assuming you have something like kafka.host1.com and kafka.host2.com, check that:

  • kafka.host1.com can talk to kafka.host2.com on port 9094 (and vice versa)
  • consumer/producers can talk to the various hosts on port 9094
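The checks above can be scripted; here is a rough sketch (kafka.host1.com / kafka.host2.com and port 9094 are the placeholder names from this thread, and `getent` and `nc` are assumed to be available on the hosts and in the containers):

```shell
#!/bin/sh
# Check that a broker host resolves and accepts connections on the
# external port; prints FAIL and returns non-zero on the first problem.
check_host() {
  host="$1"; port="$2"
  if ! getent hosts "$host" > /dev/null; then
    echo "FAIL: cannot resolve $host"
    return 1
  fi
  if ! nc -z -w 3 "$host" "$port"; then
    echo "FAIL: $host not reachable on port $port"
    return 1
  fi
  echo "OK: $host:$port"
}

# Run this from each node and from inside a producer/consumer container:
for h in kafka.host1.com kafka.host2.com; do
  check_host "$h" 9094 || true
done
```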

If you post the log output of the producer/consumer and brokers when trying to send/receive messages I can take a quick look.


TechDevCode commented Aug 3, 2018

Hi,

Attached is the log file of the Kafka broker.

kafka_logs.txt

I tried pinging the Kafka brokers from the consumer and producer services, but got the following error:

ping slth.test.com
ping: unknown host

But I am able to ping the IP address of the host machine running the Kafka broker from within the consumer/producer services.

I also checked the /etc/hosts file on the host machine for the node name mapping; it contains the following entries:

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
194.160.111.46  slth.test.com  slth
194.160.111.45  slth2.test.com  slth2

What might be the issue?

Thanks,
Rishikesh


sscaling commented Aug 3, 2018

You didn't include the producer/consumer logs (only the broker's), so there is only one half of the communication path to look at. That side looks OK (other than the warning about the topic name containing an underscore).

If the consumer and producer cannot resolve the broker address, then there is no way for them to connect. What producer/consumer are you using? How are they run: inside the Docker network or from your host machine?

@TechDevCode

  1. Producers and consumers are run inside the Docker network as Docker services (containers).
  2. We are consuming messages using the Kafka consumer in the Apache Ignite framework; the following is the error log for it:
[09:17:15]    __________  ________________
[09:17:15]   /  _/ ___/ |/ /  _/_  __/ __/
[09:17:15]  _/ // (7 7    // /  / / / _/
[09:17:15] /___/\___/_/|_/___/ /_/ /___/
[09:17:15]
[09:17:15] ver. 1.9.0#20170302-sha1:a8169d0a
[09:17:15] 2017 Copyright(C) Apache Software Foundation
[09:17:15]
[09:17:15] Ignite documentation: http://ignite.apache.org
[09:17:15]
[09:17:15] Quiet mode.
[09:17:15]   ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
[09:17:15]
[09:17:15] OS: Linux 3.10.0-229.14.1.el7.x86_64 amd64
[09:17:15] VM information: OpenJDK Runtime Environment 1.8.0_111-8u111-b14-2~bpo8+1-b14 Oracle Corporation OpenJDK 64-Bit Server VM 25.111-b14
[09:17:17] Configured plugins:
[09:17:17]   ^-- None
[09:17:17]
[09:17:25] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
[09:17:25] Security status [authentication=off, tls/ssl=off]
[09:17:28] Performance suggestions for grid  (fix if possible)
[09:17:28] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
[09:17:28]   ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
[09:17:28]   ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
[09:17:28]   ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
[09:17:28]   ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
[09:17:28] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
[09:17:28]
[09:17:28] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
[09:17:28]
[09:17:28] Ignite node started OK (id=f239895c)
[09:17:28] Topology snapshot [ver=1, servers=1, clients=0, CPUs=56, heap=27.0GB]
[09:17:48] Topology snapshot [ver=2, servers=2, clients=0, CPUs=112, heap=31.0GB]
Executing Service....Topic Name is : check_new
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:rsrc:slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:rsrc:slf4j-log4j12-1.7.21_2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Aug 03, 2018 9:18:33 AM org.apache.ignite.logger.java.JavaLogger error
SEVERE: Service execution stopped with error [name=TestService, execId=55627ec3-6b78-49cd-afe1-e768a40cd97f]
kafka.common.ConsumerRebalanceFailedException: IgniteKafkagroup_1_ignite-1533287898720-b8d65d64 can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:977)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:264)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
        at org.apache.ignite.stream.kafka.KafkaStreamer.start(KafkaStreamer.java:135)
        at tcs.poc.surveillance.SurveillanceAlert.kafkaConsumerDataExtraction(SurveillanceAlert.java:433)
        at tcs.poc.surveillance.SurveillanceAlert.readTradeStreamData(SurveillanceAlert.java:284)
        at tcs.poc.surveillance.SurveillanceAlert.execute(SurveillanceAlert.java:131)
        at org.apache.ignite.internal.processors.service.GridServiceProcessor$3.run(GridServiceProcessor.java:1140)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

[2018-08-03 09:18:41,084] ERROR [IgniteKafkagroup_1_ignite-1533287898720-b8d65d64], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector:103)
kafka.common.ConsumerRebalanceFailedException: IgniteKafkagroup_1_ignite-1533287898720-b8d65d64 can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$2.run(ZookeeperConsumerConnector.scala:589)
  3. The producer is a simple Java-based program reading from a file and producing to the Kafka brokers. On the producer side no errors are shown, but no messages are produced to the Kafka brokers either.
  4. Is there any other path or parameter where we have to set the host name mapping, as we do in /etc/hosts? The consumer/producer are unable to resolve the Kafka broker host name.


sscaling commented Aug 3, 2018

The exception looks to be in ZookeeperConsumerConnector. I know nothing of Ignite, but ZooKeeper-based consumers are the old, deprecated way of connecting to Kafka.

Producer is simple Java based program reading from file and producing to kafka brokers. On Producers Side no errors are shown but messages are also not produced in Kafka brokers

I would suggest using a basic consumer to validate that Kafka is configured correctly, without having to configure a full distributed system such as Ignite. Once that is working, then try moving on to a more complex system. Also, I'm not sure how you determine that messages are not produced to Kafka if you can't read from it.

If clients are inside the Docker network, I'm not sure they will have any knowledge of the host's /etc/hosts file; the docker CLI normally uses --add-host, and it looks like docker-compose uses https://docs.docker.com/compose/compose-file/#extra_hosts. Have a look at #377 to see if using an IP helps with the routing issues.
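For example, a sketch of extra_hosts for the client services (the names and IPs are the ones from the /etc/hosts snippet earlier in this thread; the service and image names are illustrative):

```yaml
# Hypothetical compose fragment for a consumer service; extra_hosts adds
# entries to the container's /etc/hosts so the broker names resolve.
services:
  consumer:
    image: my-consumer-image   # illustrative image name
    extra_hosts:
      - "slth.test.com:194.160.111.46"
      - "slth2.test.com:194.160.111.45"
```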

@TechDevCode

Hi @sscaling

I am checking the messages produced to the topic using the Kafka manager service "sheepkiller/kafka-manager" within Docker.

I tried adding a "host-machine-name:ip-address" mapping using the "extra_hosts" parameter in docker-compose. But the consumer and producer containers are still unable to resolve my host machine name (unable to ping the Kafka broker by host machine name from either the producer or the consumer).

In the Docker configuration, what does the command HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2" do? Does it get the host machine name of the host on which Docker is running? And do I have to provide that same list of host machine names in the "extra_hosts" mapping with the actual IP addresses of the hosts?

Thanks,
Rishikesh


sscaling commented Aug 4, 2018

If you want to see what docker info | grep ^Name: | cut -d' ' -f 2 does, just run it on the node. It should be the same as docker info -f '{{ .Name }}'. The purpose, looking at it, would be to get the hostname of where the docker daemon is running.
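To illustrate what that pipeline extracts, here it is run against canned `docker info`-style output (the sample values are made up; on a real node you would just run the pipeline directly):

```shell
# Canned excerpt of `docker info` output (values are illustrative):
sample_info='Containers: 3
Name: slth.test.com
Server Version: 18.06.0-ce'

# Same parsing as the HOSTNAME_COMMAND in the compose file:
hostname=$(echo "$sample_info" | grep ^Name: | cut -d' ' -f 2)
echo "$hostname"   # prints: slth.test.com
```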

If this name is not resolvable from the other nodes (e.g. statically / via DNS), then it won't be routable from other nodes until that is addressed. Also, as your producers/consumers run within Docker, the address will need to be resolvable from inside the containers. From what you've described, it seems like you have a fundamental Swarm / networking setup issue. I'd recommend posting on the Docker forums for more help if you haven't found the solution by now.

Also, make sure the firewall is disabled when testing this, to rule it out as the problem (enable it again after testing).


TechDevCode commented Aug 8, 2018

Hi @sscaling ,

Is it possible to use KAFKA_ADVERTISED_HOST_NAME instead of listeners, and to use it as follows for launching one Kafka broker per host machine in swarm mode?

HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
KAFKA_ADVERTISED_HOST_NAME: _{HOSTNAME_COMMAND}

With these settings it gives the error:

java.io.IOException: Can't resolve address: slth.test.com:9092
        at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
        at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
        ... 7 more

Thanks,
Rishikesh


sscaling commented Aug 9, 2018

Is it possible to use KAFKA_ADVERTISED_HOST_NAME instead of listeners?

Yes (https://kafka.apache.org/documentation/#brokerconfigs). It's deprecated, but still supported, even in Kafka 2.0.

and use it as follows?

It depends on your host and Swarm networking and security configuration.

java.io.IOException: Can't resolve address: slth.test.com:9092

Fundamentally, this means that the component can't talk to the location you've specified, so you need to address that first. As far as I am aware this is not a kafka-docker image issue; it is a system configuration issue.

@rohammosalli

You can change this line:

HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"

to:

HOSTNAME_COMMAND: "docker info | grep 'Node Address:' | cut -d' ' -f 4"
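A quick sanity check of why this variant uses field 4 rather than field 2 (assuming `docker info` indents the swarm fields with a leading space, which adds an empty first field when splitting on single spaces; the sample value is made up):

```shell
# Canned swarm line from `docker info` output, with its leading space:
sample_line=' Node Address: 194.160.111.46'

# Same parsing as the suggested HOSTNAME_COMMAND:
addr=$(echo "$sample_line" | grep 'Node Address:' | cut -d' ' -f 4)
echo "$addr"   # prints: 194.160.111.46
```

This advertises the node's swarm IP address instead of its hostname, which sidesteps DNS resolution issues like the ones in this thread.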
