
Cannot receive data with Kafka #54

Closed
markuslamm opened this issue Jul 5, 2016 · 9 comments

@markuslamm

Hi, I'm using this container, great work btw, to receive JSON data from Kafka. For that, I installed the kafka-input-plugin as mentioned in the docs, and the plugin is registered as I can see with logstash-plugin list.

My config file looks like this:

input {
    kafka {
        topic_id => 'collectortopic'
        zk_connect => '172.17.0.4:2181'
        type => 'kafka-input'
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        codec => json
    }
    stdout {
        codec => json
    }
}

and is added with ADD ./kafka-input.conf /etc/logstash/conf.d/kafka-input.conf in the Dockerfile.

My Kafka setup is fine, because I can send and receive data with other applications. But something in my ELK setup seems to be wrong, because I cannot receive any data: there's no output from Logstash in the console and no data in Kibana, since no logstash index is created, which should be the default behaviour according to the plugin docs.

The zk_connect is correct too, because otherwise I get exceptions ...

Any ideas?

Thanks in advance!

@spujadas
Owner

spujadas commented Jul 5, 2016

Hi, glad to hear you're enjoying the image.

Here's something that works on my side.

Disclaimer: Kafka newbie here.

My kafka.conf is almost identical to yours, except for zk_connect where I used the string dockerhost, which is resolved at runtime to the host's IP address (and topic_id, where for some reason that now escapes me I used test instead of collectortopic as in your file).

input {
    kafka {
        topic_id => 'test'
        zk_connect => 'dockerhost:2181'
        type => 'kafka-input'
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        codec => json
    }
    stdout {
        codec => json
    }
}

As long as 172.17.0.4 is the address where your ZooKeeper is running, I think you can safely use your config file.

In the Dockerfile, I removed the non-needed Logstash config files and added the config file for Kafka input and Elasticsearch output.

FROM sebp/elk

RUN cd /etc/logstash/conf.d/ \
 && rm -f 01-lumberjack-input.conf 02-beats-input.conf 10-syslog.conf 11-nginx.conf \
        30-output.conf
ADD kafka.conf /etc/logstash/conf.d/kafka.conf

I then built the elk-kafka image as usual:

docker build -t elk-kafka .

So far, business as usual.

Now, at this point, starting the container (with the command below, but don't run it just yet), docker exec'ing into it, manually installing Kafka in the container, and trying to use bin/kafka-console-consumer.sh (having previously produced some events in Kafka) resulted in errors:

root@9154b9e18c6d:/opt/kafka_2.11-0.10.0.0# bin/kafka-console-consumer.sh --zookeeper dockerhost:2181 --topic test --from-beginning
[2016-07-05 18:13:18,092] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,elkdockeres233-l232-k451-2gb-fra1-01,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-05 18:13:18,102] WARN [console-consumer-23218_9154b9e18c6d-1467742396977-384d90cd-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,elkdockeres233-l232-k451-2gb-fra1-01,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more

The funny thing is that – as you pointed out – Logstash doesn't complain about anything (even when increasing log verbosity by starting the container with env var LS_OPTS set to --verbose or --debug).

Anyway, after tearing my hair out for a bit and some searching around, I found this: http://stackoverflow.com/questions/30606447/kafka-consumer-fetching-metadata-for-topics-failed

So I added this line to Kafka's server.properties:

listeners=PLAINTEXT://X.X.X.X:9092 

Where X.X.X.X is the host's public IP address, where Kafka can be contacted (I believe that using a public hostname such as an FQDN would also work, but I'm running this on a Cloud-hosted VM and didn't set up a DNS hostname for it, so didn't test it). This turned out to be crucial as, without this directive (or a properly set hostname), Kafka was advertising its hostname as the host's hostname (a local name, elkdockeres233-l232-k451-2gb-fra1-01 in my case), which obviously can't be resolved within the container.
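As a sketch of that change (the IP address is a placeholder from the documentation range, and the path to server.properties is an assumption — adjust both for your installation):

```shell
# Append a listeners directive to Kafka's server.properties so that the broker
# advertises an address that clients inside containers can actually reach.
HOST_IP=203.0.113.10            # placeholder: your host's reachable IP
CONF=server.properties          # assumption: run from Kafka's config directory
touch "$CONF"
echo "listeners=PLAINTEXT://${HOST_IP}:9092" >> "$CONF"
grep '^listeners=' "$CONF"      # prints: listeners=PLAINTEXT://203.0.113.10:9092
```

Restart the Kafka broker after editing the file so the new listener takes effect.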

From that point, running a Kafka consumer from within the container worked:

root@8c448a33062f:/opt/kafka_2.11-0.10.0.0# bin/kafka-console-consumer.sh --zookeeper dockerhost:2181 --topic test --from-beginning
{'foo': 'bar'}

I then deleted the container and started a clean container from the image that was created previously:

HOST_IP=$(ip route | awk '/docker/ { print $NF }')
docker run --add-host dockerhost:$HOST_IP -p 5601:5601 -p 9200:9200 -it --name elk-kafka elk-kafka

(The first line puts the IP address of the host's Docker interface in the HOST_IP env var, and then the --add-host dockerhost:$HOST_IP option adds an entry for dockerhost in the container's /etc/hosts, which I can then use in the Logstash config file to point back to the host where ZK and Kafka are running.)

At this point still nothing apparent in Logstash… but behind the scenes, something definitely happened, as the test topic had been read by the logstash group:

# bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group logstash --describe                                                              
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
logstash                       test                           0          1               1               0               logstash_4602ed1083fd-1467749196275-43ff126a-0

So Logstash's Kafka input plugin is reading the events (a trivial {'foo': 'bar'} that I had created earlier), but it isn't doing anything with them (even looking at Elasticsearch's contents doesn't show anything), which makes it hard to work out the next step.

Interestingly, sending a non-JSON-formatted event to Kafka generates an error in Logstash's logs, and the event shows up in Elasticsearch's index and is visible in Kibana:

==> /var/log/logstash/logstash.log <==
{:timestamp=>"2016-07-05T20:08:45.398000+0000", :message=>"JSON parse failure. Falling back to plain-text", :error=>#<LogStash::Json::ParserError: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
 at [Source: [B@41a8c95; line: 1, column: 11]>, :data=>"hello", :level=>:info}

Browsing to http://X.X.X.X:9200/_search?pretty (Elasticsearch) shows:

{
  "took" : 7,
  "timed_out" : false,
  "_shards" : {
    "total" : 6,
    "successful" : 6,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : ".kibana",
      "_type" : "config",
      "_id" : "4.5.1",
      "_score" : 1.0,
      "_source" : {
        "buildNum" : 9892
      }
    }, {
      "_index" : "logstash-2016.07.05",
      "_type" : "kafka-input",
      "_id" : "AVW8rV59_Cc-jJPdBcM0",
      "_score" : 1.0,
      "_source" : {
        "message" : "hello",
        "tags" : [ "_jsonparsefailure" ],
        "@version" : "1",
        "@timestamp" : "2016-07-05T20:08:45.401Z",
        "type" : "kafka-input"
      }
    } ]
  }
}

And Kibana shows:
(Screenshot: Kibana, 2016-07-05.)

At this point, I'm confident that the extended ELK image is behaving properly and playing nicely with ZK and Kafka, but I don't know what kind of input/configuration it would need for Logstash to pass along the events it retrieves from Kafka to Elasticsearch.

Again, not a Kafka expert so can't really help more than that, but the above might help you investigate further. Alternatively, perhaps the Elastic community (https://discuss.elastic.co/) will be able to help you work out how to configure Logstash to process your event data from Kafka. If you do manage to figure it out, I'd be most interested if you could drop a line here to let me know what you did.

Edit: words.

@spujadas
Owner

spujadas commented Jul 6, 2016

Starting from a fresh VM + container + instance of ZK/Kafka with the config from my previous comment, it turns out that everything actually works properly… provided that proper JSON is fed into Kafka (i.e. {"foo": "bar"} with double quotes, rather than {'foo': 'bar'} with single quotes as I incorrectly used yesterday).
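To see the quoting pitfall in isolation, here's a quick check (assuming python3 is available; its json.tool module is used only as a convenient validator):

```shell
# Double-quoted strings are valid JSON; json.tool exits 0 and pretty-prints.
echo '{"foo": "bar"}' | python3 -m json.tool && echo VALID

# Single-quoted strings are NOT valid JSON; json.tool exits non-zero,
# which is why Logstash's json codec fell back to plain text.
echo "{'foo': 'bar'}" | python3 -m json.tool 2>/dev/null || echo INVALID
```

The first line prints VALID and the second prints INVALID, mirroring Logstash's behaviour of indexing the first as a structured event and tagging the second with _jsonparsefailure.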

Anyway, still no logs from Logstash when everything's OK.

Elasticsearch creates the index as needed.

Logs displayed by the ELK container show up as:

==> /var/log/elasticsearch/elasticsearch.log <==
[2016-07-06 10:17:54,001][INFO ][cluster.metadata         ] [Freakshow] [logstash-2016.07.06] creating index, cause [auto(bulk api)], templates [logstash], shards [5]/[1], mappings [_default_, kafka-input]
[2016-07-06 10:17:54,289][INFO ][cluster.routing.allocation] [Freakshow] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[logstash-2016.07.06][4]] ...]).
[2016-07-06 10:17:54,323][INFO ][cluster.metadata         ] [Freakshow] [logstash-2016.07.06] update_mapping [kafka-input]

Browsing to http://X.X.X.X:9200/_search?pretty (Elasticsearch) shows:

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 6,
    "successful" : 6,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : ".kibana",
      "_type" : "config",
      "_id" : "4.5.1",
      "_score" : 1.0,
      "_source" : {
        "buildNum" : 9892
      }
    }, {
      "_index" : "logstash-2016.07.06",
      "_type" : "kafka-input",
      "_id" : "AVW_tsUbgE4tfBsjMJJK",
      "_score" : 1.0,
      "_source" : {
        "foo" : "bar",
        "@version" : "1",
        "@timestamp" : "2016-07-06T10:17:53.078Z",
        "type" : "kafka-input"
      }
    } ]
  }
}

And Kibana displays:
(Screenshot: Kibana, 2016-07-06.)

So looks good to me, let me know how it goes on your side.

@markuslamm
Author

Thank you very much for your effort!

After I changed my KAFKA_ADVERTISED_HOST_NAME from localhost to 192.168.2.101, Logstash was able to receive data that was a non-JSON string, the index was created, and the messages were available in Kibana. When I sent JSON data I initially got an error, but after cleaning up the containers everything is fine and my data is available in ES.

@spujadas
Owner

spujadas commented Jul 6, 2016

Great to hear that, thanks for the update.

@raiusa

raiusa commented Oct 26, 2016

I don't see any KAFKA_ADVERTISED_HOST_NAME property. Is it equivalent to the bootstrap_servers property?

@raiusa

raiusa commented Oct 26, 2016

I forgot to mention that I installed the kafka-input plugin from here

@raiusa

raiusa commented Oct 26, 2016

here

@spujadas
Owner

@raiusa see #70 (comment)

@RamvigneshPasupathy

RamvigneshPasupathy commented Oct 22, 2019

I had a similar problem with kafka-input-plugin for Logstash 7.4.0. There is no field named zk_connect in this version.

Instead, to connect with my kafka server, I was using bootstrap_servers => "localhost:9092" in my logstash config file for kafka. On docker-compose up I could see my docker logs getting dumped with this error message: Connection to node -1 (localhost/127.0.0.1:9092) could not be established.

I ended up in this very thread after some searching, but changing my config to bootstrap_servers => "dockerhost:9092" didn't help.

Searching further, I found the answer I needed: changing my config to bootstrap_servers => "host.docker.internal:9092" worked.

Note: This is specific to development with Docker Desktop for Mac.
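For reference, a minimal Logstash 7.x Kafka input along these lines might look like the following (the topic name is an assumption; note that in 7.x the option is topics, a list, rather than the old topic_id):

```
input {
    kafka {
        bootstrap_servers => "host.docker.internal:9092"
        topics => ["test"]
    }
}
```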

PS: I have created a GitHub repo containing all the files I used to push data from a local Kafka queue to the ELK stack installed with sebp/elk. It may be helpful.
