Skip to content

Commit

Permalink
close Kafka producer after sending the message
Browse files Browse the repository at this point in the history
  • Loading branch information
Ankit Nanglia committed Jul 4, 2019
1 parent 321b9ec commit 19e8a4a
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.cognitree.kronos.executor.model.TaskResult;
import com.cognitree.kronos.model.Task;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -44,19 +45,17 @@ public class KafkaMessageHandler implements TaskHandler {
// message to push to kafka topic
private static final String MESSAGE_KEY = "message";

private JsonNode producerConfig;
private String defaultTopic;
private KafkaProducer<String, String> kafkaProducer;
private Task task;

@Override
public void init(Task task, ObjectNode config) {
logger.info("Initializing producer for kafka with config {}", config);
if (config == null || !config.hasNonNull(KAFKA_PRODUCER_CONFIG_KEY)) {
throw new IllegalArgumentException("missing mandatory configuration: [kafkaProducerConfig]");
}
this.task = task;
Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
kafkaProducer = new KafkaProducer<>(kafkaProducerConfig);
producerConfig = config.get(KAFKA_PRODUCER_CONFIG_KEY);
defaultTopic = config.get(TOPIC_KEY).asText();
}

Expand All @@ -76,14 +75,17 @@ public TaskResult execute() {
}

private void send(String topic, String record) {
logger.trace("Received request to send message {} to topic {}.", record, topic);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topic, record);
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("Error sending record {} over kafka to topic {}.",
record, topic, exception);
}
});
logger.debug("Received request to send message {} to topic {}.", record, topic);
logger.debug("Initializing producer for kafka with config {}", producerConfig);
final Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(this.producerConfig, Properties.class);
try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProducerConfig)) {
final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, record);
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
logger.error("Error sending record {} over kafka to topic {}.",
record, topic, exception);
}
});
}
}
}

0 comments on commit 19e8a4a

Please sign in to comment.