Aiven Connector issue with Elastic Search Version 7.4.0 #11

Open
sumeethtewar opened this issue Sep 8, 2020 · 5 comments

@sumeethtewar

Hello,

I deployed Elasticsearch 7.4.0 on Azure, and I am using Aiven Kafka, Aiven Schema Registry, and Aiven Kafka Connect to copy messages from a Kafka topic to that Elasticsearch deployment.

Connector configuration:

{
    "name": "engagement-aggregate-sink",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "24",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTenantId,IndexRouter",
    "errors.retry.timeout": "0",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "customer-journey-engagement-aggregate",
    "errors.deadletterqueue.topic.name": "customer-journey-engagement-aggregate-deadletterqueue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "connection.url": "ES_URL",
    "connection.username": "USERNAME",
    "connection.password": "PASSWORD",
    "batch.size": "10000",
    "max.in.flight.requests": "3",
    "max.buffered.records": "1000",
    "linger.ms": "100",
    "flush.timeout.ms": "20000",
    "max.retries": "10",
    "retry.backoff.ms": "1000",
    "connection.timeout.ms": "1000",
    "read.timeout.ms": "1000",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "compact.map.entries": "false",
    "drop.invalid.message": "true",
    "behavior.on.null.values": "ignore",
    "behavior.on.malformed.documents": "warn",
    "transforms.IndexRouter.timestamp.format": "YYYYMM",
    "transforms.ExtractTenantId.type": "io.aiven.kafka.connect.transforms.ExtractTopic$Value",
    "transforms.ExtractTenantId.skip.missing.or.null": "false",
    "transforms.ExtractTenantId.field.name": "tenantId",
    "transforms.IndexRouter.topic.format": "engagement-${topic}-${timestamp}",
    "transforms.IndexRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "value.converter.schema.registry.url": "KAFKA_URL",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "UNM:PWD",
    "value.converter.max.schemas.per.subject": "1000",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.auto.register.schemas": "false"
}

With both sink connector versions 5.0.3 and 6.0.3, the task fails with the same exception:

java.lang.NullPointerException
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:179)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:148)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:117)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:123)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:52)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

With Aiven's managed Elasticsearch, everything works fine.
Could you please help me figure out what the problem is when using an Elasticsearch service deployed on Azure?

@ivanyu
Contributor

ivanyu commented Sep 14, 2020

Hi @sumeethtewar
Could you please tell me what connector version generated the stack trace you provided? We need it to match the line numbers in the code.

@willyborankin
Contributor

willyborankin commented Sep 14, 2020

Hi @sumeethtewar, from what I see in the configuration, you are using io.confluent.connect.elasticsearch.ElasticsearchSinkConnector. This is the Confluent connector, which doesn't work with ES 7.x. Our connector is a fork of the Confluent one, and it contains fixes for ES 6.x and 7.x.
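For illustration, switching to the fork would mean changing the connector.class entry in the configuration from the original report. The class name below, io.aiven.connect.elasticsearch.ElasticsearchSinkConnector, is an assumption based on the fork's package naming and should be checked against the fork's README. The sketch also assumes the fork's plugin is installed on the Connect cluster and that every other key from the original configuration is kept as-is; JSON allows no comments, so those keys are simply omitted here rather than marked.

```json
{
    "name": "engagement-aggregate-sink",
    "connector.class": "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector"
}
```

Before relying on the remaining options, it is worth confirming that each key is supported by the fork. The Kafka Connect REST endpoint GET /connector-plugins lists the installed plugin classes together with their versions, which is one way to confirm which connector, and which version, is actually deployed.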

@sumeethtewar
Author

sumeethtewar commented Sep 14, 2020 via email

@ivanyu
Contributor

ivanyu commented Sep 23, 2020

Hi @sumeethtewar
It would really help if you could provide the NPE stack trace together with the exact version of the connector that produced it. The stack trace you provided doesn't seem to match the master branch of this connector.

@sumeethtewar
Author

sumeethtewar commented Sep 23, 2020 via email
