Updated Kronos extensions to work with Kronos 3.0.0
Ankit Nanglia committed Jul 3, 2019
1 parent 3a98f6c commit 321b9ec
Showing 16 changed files with 288 additions and 179 deletions.
2 changes: 1 addition & 1 deletion distribution/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.cognitree.kronos.extensions</groupId>
<artifactId>kronos-extensions</artifactId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>

<artifactId>distribution</artifactId>
2 changes: 1 addition & 1 deletion embedded-hsql-store/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion helm-handler/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>


Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion jdbc-store/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

jdbc-store/src/main/java/com/cognitree/kronos/scheduler/store/jdbc/StdJDBCStoreService.java

@@ -17,7 +17,6 @@

package com.cognitree.kronos.scheduler.store.jdbc;

import com.cognitree.kronos.ServiceProvider;
import com.cognitree.kronos.scheduler.store.JobStore;
import com.cognitree.kronos.scheduler.store.NamespaceStore;
import com.cognitree.kronos.scheduler.store.StoreService;
@@ -40,7 +39,6 @@

import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;

public class StdJDBCStoreService extends StoreService {
2 changes: 1 addition & 1 deletion kafka-message-handler/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

kafka-message-handler/src/main/java/.../KafkaMessageHandler.java

@@ -46,21 +46,23 @@ public class KafkaMessageHandler implements TaskHandler {

private String defaultTopic;
private KafkaProducer<String, String> kafkaProducer;
private Task task;

@Override
public void init(ObjectNode config) {
public void init(Task task, ObjectNode config) {
logger.info("Initializing producer for kafka with config {}", config);
if (config == null || !config.hasNonNull(KAFKA_PRODUCER_CONFIG_KEY)) {
throw new IllegalArgumentException("missing mandatory configuration: [kafkaProducerConfig]");
}
this.task = task;
Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
kafkaProducer = new KafkaProducer<>(kafkaProducerConfig);
defaultTopic = config.get(TOPIC_KEY).asText();
}

@Override
public TaskResult handle(Task task) {
logger.info("Received request to handle task {}", task);
public TaskResult execute() {
logger.info("Received request to execute task {}", task);
final Map<String, Object> taskProperties = task.getProperties();
final String topic = (String) taskProperties.getOrDefault(TOPIC_KEY, defaultTopic);
try {
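The hunk above is the core of the 3.0.0 handler contract change: a handler instance is now bound to one task, which it receives in init(Task, ObjectNode), and it runs via a no-argument execute() in place of the old stateless handle(Task). A minimal sketch of a custom handler against the new contract (the class name, the "payload" property key, and the TaskResult construction are illustrative assumptions, not confirmed Kronos 3.0.0 API):

import com.fasterxml.jackson.databind.node.ObjectNode;

public class EchoTaskHandler implements TaskHandler {

    private Task task;

    @Override
    public void init(Task task, ObjectNode config) {
        // Kronos 3.0.0 hands the task to the handler up front;
        // per-handler configuration still arrives as an ObjectNode.
        this.task = task;
    }

    @Override
    public TaskResult execute() {
        // task properties are read the same way KafkaMessageHandler reads them above
        final Object payload = task.getProperties().get("payload"); // hypothetical property key
        System.out.println("echoing task " + task + ": " + payload);
        return new TaskResult(true); // hypothetical constructor; the real TaskResult API may differ
    }
}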
2 changes: 1 addition & 1 deletion kafka-queue/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
kafka-queue/src/main/java/.../KafkaConsumerImpl.java

@@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -29,12 +30,8 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link Consumer} implementation using Kafka as queue in backend.
@@ -44,93 +41,101 @@ public class KafkaConsumerImpl implements Consumer {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String GROUP_ID = "group.id";
private static final String CONSUMER_KEY = "consumerKey";
private static final String MAX_POLL_RECORDS = "max.poll.records";
private static final String POLL_TIMEOUT_IN_MS = "pollTimeoutInMs";
private static final String KAFKA_CONSUMER_CONFIG = "kafkaConsumerConfig";
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final short TOPIC_REPLICATION_FACTOR = (short) 1;

private final Map<String, KafkaConsumer<String, String>> topicToKafkaConsumerMap = new HashMap<>();
private Properties kafkaConsumerConfig;
private long pollTimeoutInMs;
private String topic;
private KafkaConsumer<String, String> kafkaConsumer;
private ObjectNode config;

public void init(String topic, ObjectNode config) {
logger.info("Initializing Kafka consumer on topic {} with config {}", topic, config);
this.topic = topic;
this.config = config;
pollTimeoutInMs = config.get(POLL_TIMEOUT_IN_MS).asLong();
createTopic();
initConsumer();
}

private void createTopic() {
try {
final Properties properties =
OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
final AdminClient adminClient = AdminClient.create(properties);
final NewTopic kafkaTopic = new NewTopic(topic, NUM_TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
adminClient.createTopics(Collections.singleton(kafkaTopic)).all().get();
} catch (Exception e) {
logger.warn("Error creating topic {}, error: {}", topic, e.getMessage());
}
}

public void init(ObjectNode config) {
logger.info("Initializing consumer for kafka with config {}", config);
kafkaConsumerConfig = OBJECT_MAPPER.convertValue(config.get("kafkaConsumerConfig"), Properties.class);
private void initConsumer() {
final Properties kafkaConsumerConfig =
OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
// force override consumer configuration for kafka to poll max 1 message at a time
kafkaConsumerConfig.put("max.poll.records", 1);
pollTimeoutInMs = config.get("pollTimeoutInMs").asLong();
kafkaConsumerConfig.put(MAX_POLL_RECORDS, 1);
kafkaConsumerConfig.put(GROUP_ID, config.get(CONSUMER_KEY).asText());
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerConfig);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}

@Override
public List<String> poll(String topic) {
return poll(topic, Integer.MAX_VALUE);
public List<String> poll() {
return poll(Integer.MAX_VALUE);
}

@Override
public List<String> poll(String topic, int size) {
public synchronized List<String> poll(int size) {
logger.trace("Received request to poll messages from topic {} with max size {}", topic, size);
List<String> tasks = new ArrayList<>();
if (!topicToKafkaConsumerMap.containsKey(topic)) {
createKafkaConsumer(topic);
}

final KafkaConsumer<String, String> kafkaConsumer = topicToKafkaConsumerMap.get(topic);
synchronized (kafkaConsumer) {
while (tasks.size() < size) {
final ConsumerRecords<String, String> consumerRecords = kafkaConsumer
.poll(Duration.ofMillis(pollTimeoutInMs));
if (consumerRecords.isEmpty()) {
break;
}
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
tasks.add(consumerRecord.value());
}
while (tasks.size() < size) {
final ConsumerRecords<String, String> consumerRecords = kafkaConsumer
.poll(Duration.ofMillis(pollTimeoutInMs));
if (consumerRecords.isEmpty()) {
break;
}
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
tasks.add(consumerRecord.value());
}
}

return tasks;
}

private synchronized void createKafkaConsumer(String topic) {
if (!topicToKafkaConsumerMap.containsKey(topic)) {
final Properties kafkaConsumerConfig = new Properties();
kafkaConsumerConfig.putAll(this.kafkaConsumerConfig);
kafkaConsumerConfig.put(GROUP_ID, kafkaConsumerConfig.getProperty(GROUP_ID) + "-" + topic);
logger.info("Creating kafka consumer on topic {} with consumer config {}", topic, kafkaConsumerConfig);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerConfig);
kafkaConsumer.subscribe(Collections.singletonList(topic));
topicToKafkaConsumerMap.put(topic, kafkaConsumer);
}
}

@Override
public void close() {
topicToKafkaConsumerMap.forEach((topic, kafkaConsumer) -> {
synchronized (kafkaConsumer) {
try {
kafkaConsumer.close();
} catch (Exception e) {
logger.warn("Error closing Kafka consumer for topic {}", topic, e);
}
public synchronized void close() {
try {
if (kafkaConsumer != null) {
kafkaConsumer.close();
}
});
} catch (Exception e) {
logger.warn("Error closing Kafka consumer for topic {}", topic, e);
}
}

@Override
public void destroy() {
logger.info("Received request to delete created topics and consumer groups from Kafka");
AdminClient adminClient = AdminClient.create(kafkaConsumerConfig);
Set<String> topics = topicToKafkaConsumerMap.keySet();
Set<String> consumerGroups =
topics.stream().map(topic -> kafkaConsumerConfig.getProperty(GROUP_ID) + "-" + topic)
.collect(Collectors.toSet());
logger.info("Deleting Kafka consumer group {} and topics {}", consumerGroups, topics);
public synchronized void destroy() {
logger.info("Received request to destroy consumer and topic {}", topic);
final Properties properties =
OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
final AdminClient adminClient = AdminClient.create(properties);

String consumerGroupKey = config.get(CONSUMER_KEY).asText();
logger.info("Deleting Kafka consumer group {} and topic {}", consumerGroupKey, topic);
try {
adminClient.deleteConsumerGroups(consumerGroups);
adminClient.deleteConsumerGroups(Collections.singletonList(consumerGroupKey));
} catch (Exception e) {
logger.warn("Error deleting Kafka consumer groups {}", consumerGroups, e);
logger.warn("Error deleting Kafka consumer group {}", consumerGroupKey, e);
}
try {
adminClient.deleteTopics(topics);
adminClient.deleteTopics(Collections.singleton(topic));
} catch (Exception e) {
logger.warn("Error deleting Kafka topics {}", topics, e);
logger.warn("Error deleting Kafka topic {}", topic, e);
}
try {
adminClient.close();
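Taken together, the consumer rework drops the old topic-to-KafkaConsumer map: each KafkaConsumerImpl is now bound to a single topic at init, which pre-creates the topic (3 partitions, replication factor 1) and subscribes, poll() loses its topic parameter, and destroy() removes only this instance's topic and consumer group. A hedged usage sketch (broker address, topic name, and consumerKey value are made up for illustration; the config keys are the ones this diff introduces):

ObjectMapper mapper = new ObjectMapper();
ObjectNode config = mapper.createObjectNode();
config.put("pollTimeoutInMs", 1000);
config.put("consumerKey", "my-consumer"); // becomes the Kafka group.id
ObjectNode consumerProps = config.putObject("kafkaConsumerConfig");
consumerProps.put("bootstrap.servers", "localhost:9092"); // illustrative broker
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumerImpl consumer = new KafkaConsumerImpl();
consumer.init("kronos-tasks", config);     // creates the topic if absent, then subscribes
List<String> messages = consumer.poll(10); // polls until 10 records arrive or a poll comes back empty
consumer.close();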
kafka-queue/src/main/java/.../KafkaProducerImpl.java

@@ -19,11 +19,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Properties;

/**
@@ -33,25 +36,55 @@ public class KafkaProducerImpl implements Producer {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String KAFKA_PRODUCER_CONFIG_KEY = "kafkaProducerConfig";
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final short TOPIC_REPLICATION_FACTOR = (short) 1;

private KafkaProducer<String, String> kafkaProducer;
private String topic;
private ObjectNode config;

public void init(ObjectNode config) {
logger.info("Initializing producer for kafka with config {}", config);
Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(config.get("kafkaProducerConfig"), Properties.class);
public void init(String topic, ObjectNode config) {
logger.info("Initializing Kafka producer for topic {} with config {}", topic, config);
this.topic = topic;
this.config = config;
createTopic();
initProducer();
}

private void createTopic() {
try {
final Properties properties =
OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
final AdminClient adminClient = AdminClient.create(properties);
final NewTopic kafkaTopic = new NewTopic(topic, NUM_TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
adminClient.createTopics(Collections.singleton(kafkaTopic)).all().get();
} catch (Exception e) {
logger.error("Error creating topic {}, error: {}", topic, e.getMessage());
}
}

private void initProducer() {
final Properties kafkaProducerConfig =
OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
kafkaProducer = new KafkaProducer<>(kafkaProducerConfig);
}

@Override
public void send(String topic, String record) {
sendInOrder(topic, record, null);
public void broadcast(String record) {
sendInOrder(record, null);
}

@Override
public void send(String record) {
sendInOrder(record, null);
}

@Override
public void sendInOrder(String topic, String record, String orderingKey) {
public void sendInOrder(String record, String orderingKey) {
logger.trace("Received request to send message {} to topic {} with orderingKey {}",
record, topic, orderingKey);
ProducerRecord<String, String> producerRecord = orderingKey == null ?
final ProducerRecord<String, String> producerRecord = orderingKey == null ?
new ProducerRecord<>(topic, record) : new ProducerRecord<>(topic, orderingKey, record);
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
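The producer mirrors the consumer change: the topic moves out of every send call and into init(String, ObjectNode), which also pre-creates it, while the new broadcast(String) and send(String) both delegate to sendInOrder(record, null). A usage sketch under the same caveats (broker, topic, and payload values are illustrative):

ObjectMapper mapper = new ObjectMapper();
ObjectNode config = mapper.createObjectNode();
ObjectNode producerProps = config.putObject("kafkaProducerConfig");
producerProps.put("bootstrap.servers", "localhost:9092"); // illustrative broker
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducerImpl producer = new KafkaProducerImpl();
producer.init("kronos-tasks", config);                 // topic is bound once, at init
producer.send("{\"taskId\":\"t1\"}");                  // plain send, no ordering key
producer.sendInOrder("{\"taskId\":\"t2\"}", "job-42"); // same key maps to the same partition, preserving order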
2 changes: 1 addition & 1 deletion mongo-store/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
4 changes: 2 additions & 2 deletions pom.xml
@@ -26,10 +26,10 @@
<name>kronos-extensions</name>
<artifactId>kronos-extensions</artifactId>
<packaging>pom</packaging>
<version>2.2.4</version>
<version>3.0.0-RC1</version>

<properties>
<kronos.version>2.2.4-RC2</kronos.version>
<kronos.version>3.0.0-RC2</kronos.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
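One detail worth noting in this hunk: the extensions themselves release as 3.0.0-RC1 while the kronos.version property pins the core Kronos dependency to 3.0.0-RC2, so the two release trains are versioned independently.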
2 changes: 1 addition & 1 deletion spark-handler/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>kronos-extensions</artifactId>
<groupId>com.cognitree.kronos.extensions</groupId>
<version>2.2.4</version>
<version>3.0.0-RC1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
