From 42fb1e3ba9d768d28db1e8ff7b247ec4efc24140 Mon Sep 17 00:00:00 2001 From: Alihosein Shahabi Date: Wed, 4 Sep 2024 20:50:42 +0330 Subject: [PATCH] Forked the repository from https://github.com/alihossein/kafka-sink-connector and added functionality to support copying both key and value between different topics. --- build.gradle | 6 +++--- .../com/kakao/connector/kafka/KafkaSinkTask.java | 15 +++++++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index 4196b92..224c5a1 100644 --- a/build.gradle +++ b/build.gradle @@ -9,9 +9,9 @@ repositories { } ext { - set('connectApi', '2.8.0') - set('slf4jSimple', '1.7.30') - set('jsonPath', '2.6.0') + set('connectApi', '3.8.0') + set('slf4jSimple', '2.0.16') + set('jsonPath', '2.9.0') } dependencies { diff --git a/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java b/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java index b02f0ec..f952fa4 100644 --- a/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java +++ b/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java @@ -90,11 +90,12 @@ public void put(Collection records) { if (record.value() != null) { try { String value = record.value().toString(); + String key = record.key() != null ? record.key().toString() : null; boolean samplingCondition = Math.random() < samplingPercentage; if (samplingCondition && isFilteringMatch(value) ) { - sendRecord(value); + sendRecord(key,value); } } catch (Exception e) { log.error(e.getMessage() + " / " + connectorName, e); @@ -106,8 +107,9 @@ && isFilteringMatch(value) if (record.value() != null) { try { String value = record.value().toString(); + String key = record.key() != null ? record.key().toString() : null; if (isFilteringMatch(value)) { - sendRecord(value); + sendRecord(key,value); } } catch (Exception e) { log.error(e.getMessage() + " / " + connectorName, e); @@ -117,13 +119,14 @@ && isFilteringMatch(value) } } - private void sendRecord(String value) { - ProducerRecord sendRecord = getSinkRecord(value); + private void sendRecord(String key , String value) { + ProducerRecord sendRecord = getSinkRecord(key,value); producer.send(sendRecord, new ProducerCallback()); } - private ProducerRecord getSinkRecord(String value) { - String messageKey = keyParsingEnabled ? parsingMessageKey(value) : null; + private ProducerRecord getSinkRecord(String key , String value) { + String messageKey = (keyParsingEnabled && key == null) ? parsingMessageKey(value) : key; + if (timestampParsingEnabled) { return new ProducerRecord<>(sinkTopic, null, parsingTimestamp(value), messageKey, value); } else {