From 321b9ec46afd57af497d79bba0e016724a6c7424 Mon Sep 17 00:00:00 2001
From: Ankit Nanglia
Date: Wed, 3 Jul 2019 21:36:41 +0530
Subject: [PATCH] Updated Kronos extensions to work with Kronos 3.0.0

---
 distribution/pom.xml                        |   2 +-
 embedded-hsql-store/pom.xml                 |   2 +-
 helm-handler/pom.xml                        |   2 +-
 .../executor/handlers/HelmTaskHandler.java  | 156 +++++++++++-------
 jdbc-store/pom.xml                          |   2 +-
 .../store/jdbc/StdJDBCStoreService.java     |   2 -
 kafka-message-handler/pom.xml               |   2 +-
 .../handlers/KafkaMessageHandler.java       |   8 +-
 kafka-queue/pom.xml                         |   2 +-
 .../queue/consumer/KafkaConsumerImpl.java   | 129 ++++++++-------
 .../queue/producer/KafkaProducerImpl.java   |  47 +++++-
 mongo-store/pom.xml                         |   2 +-
 pom.xml                                     |   4 +-
 spark-handler/pom.xml                       |   2 +-
 .../executor/handlers/SparkHandler.java     |  95 +++++++----
 .../spark/restclient/SparkRestClient.java   |  10 +-
 16 files changed, 288 insertions(+), 179 deletions(-)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index c797913..9c0f940 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -7,7 +7,7 @@
     <parent>
        <groupId>com.cognitree.kronos.extensions</groupId>
        <artifactId>kronos-extensions</artifactId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <artifactId>distribution</artifactId>
diff --git a/embedded-hsql-store/pom.xml b/embedded-hsql-store/pom.xml
index 05ab0b7..aca2628 100644
--- a/embedded-hsql-store/pom.xml
+++ b/embedded-hsql-store/pom.xml
@@ -22,7 +22,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/helm-handler/pom.xml b/helm-handler/pom.xml
index 2ca699e..cfb935d 100644
--- a/helm-handler/pom.xml
+++ b/helm-handler/pom.xml
@@ -22,7 +22,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/helm-handler/src/main/java/com/cognitree/kronos/executor/handlers/HelmTaskHandler.java b/helm-handler/src/main/java/com/cognitree/kronos/executor/handlers/HelmTaskHandler.java
index 8faf97f..c3c9832 100755
--- a/helm-handler/src/main/java/com/cognitree/kronos/executor/handlers/HelmTaskHandler.java
+++ b/helm-handler/src/main/java/com/cognitree/kronos/executor/handlers/HelmTaskHandler.java
@@ -26,8 +26,11 @@
 import hapi.chart.ChartOuterClass.Chart;
 import hapi.release.ReleaseOuterClass.Release;
 import hapi.release.StatusOuterClass.Status.Code;
-import hapi.services.tiller.Tiller.*;
+import hapi.services.tiller.Tiller.GetReleaseStatusRequest;
+import hapi.services.tiller.Tiller.GetReleaseStatusResponse;
+import hapi.services.tiller.Tiller.InstallReleaseRequest;
 import hapi.services.tiller.Tiller.InstallReleaseRequest.Builder;
+import hapi.services.tiller.Tiller.UninstallReleaseRequest;
 import io.fabric8.kubernetes.api.model.batch.Job;
 import io.fabric8.kubernetes.api.model.batch.JobList;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
@@ -52,7 +55,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
 import java.util.zip.ZipInputStream;
 
 import static com.cognitree.kronos.executor.handlers.ChartType.directory;
@@ -78,31 +81,32 @@ public class HelmTaskHandler implements TaskHandler {
     private static final String PROP_VALUES = "values";
     private static final String PROP_VALUES_FILE = "valuesFile";
     private static final String PROP_TIMEOUT = "timeout";
-    private static final String PROP_MAX_WAIT_TIMEOUT = "maxWaitTimeout";
     private static final String PROP_IGNORE_JOB_STATUS = "ignoreJobStatus";
 
     private static final ChartType DEFAULT_CHART_TYPE = directory;
     private static final String DEFAULT_RELEASE_PREFIX = "release";
-    private static final long DEFAULT_WAIT_TIMEOUT = 600L;
     private static final long DEFAULT_HELM_TIMEOUT = 300L;
     private static final boolean DEFAULT_IGNORE_JOB_STATUS = false;
-    private static final int SLEEP_INTERVAL_IN_SECONDS = 5;
     private static final int MAX_RETRY_COUNT = 3;
     private static final int RETRY_SLEEP_INTERVAL = 5000;
 
+    private Task task;
+    private volatile boolean abort = false;
+    private Release release;
+
     @Override
-    public void init(ObjectNode handlerConfig) {
+    public void init(Task task, ObjectNode config) {
+        this.task = task;
     }
 
     @Override
-    public TaskResult handle(Task task) {
-        logger.info("received request to handle task {}", task);
+    public TaskResult execute() {
+        logger.info("received request to execute task {}", task);
         final Map<String, Object> taskProperties = task.getProperties();
         if (!taskProperties.containsKey(PROP_CHART_PATH)) {
             return new TaskResult(false, "missing mandatory property: " + PROP_CHART_PATH);
         }
-
         String releaseName;
         if (isRetry(task)) {
             releaseName = (String) task.getContext().get(PROP_RELEASE_NAME);
@@ -112,7 +116,7 @@ public TaskResult handle(Task task) {
             releaseName = taskProperties.getOrDefault(PROP_RELEASE_PREFIX, DEFAULT_RELEASE_PREFIX)
                     + "-" + System.currentTimeMillis();
         }
-        return handle(task, releaseName, 0);
+        return execute(task, releaseName, 0);
     }
 
     /**
@@ -125,17 +129,16 @@ private boolean isRetry(Task task) {
         return task.getContext() != null && task.getContext().containsKey(PROP_RELEASE_NAME);
     }
 
-    private TaskResult handle(Task task, String releaseName, int retryCount) {
+    private TaskResult execute(Task task, String releaseName, int retryCount) {
         logger.info("Installing helm release: {}, retry count {}", releaseName, retryCount);
         final Map<String, Object> taskResult = new HashMap<>();
         taskResult.put(PROP_RELEASE_NAME, releaseName);
         final Map<String, Object> taskProperties = task.getProperties();
-        long waitTimeout = Long.parseLong(taskProperties.getOrDefault(PROP_MAX_WAIT_TIMEOUT, DEFAULT_WAIT_TIMEOUT).toString());
-        try (DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient();
-             Tiller tiller = new Tiller(kubernetesClient);
-             ReleaseManager releaseManager = new ReleaseManager(tiller)) {
+        try (final DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient();
+             final Tiller tiller = new Tiller(kubernetesClient);
+             final ReleaseManager releaseManager = new ReleaseManager(tiller)) {
             // check existing releases in helm with same name
             // If it exists upgrade the release if it failed or wait for completion if already deployed.
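
Reviewer note: the hunks above are the heart of the 3.0 migration — init(ObjectNode) / handle(Task) become init(Task, ObjectNode) / execute(), and handlers gain an abort() hook (see the new methods at the end of this file's diff). A minimal sketch of the new lifecycle, with the interface shape inferred from this patch rather than from the Kronos 3.0 sources:

    // Minimal handler against the Kronos 3.0 lifecycle as exercised by this patch.
    // Assumption: TaskHandler, Task, TaskResult, and ObjectNode imports come from
    // the Kronos core and Jackson artifacts; packages are not all visible here.
    public class NoOpTaskHandler implements TaskHandler {

        private Task task;
        // abort() is invoked from another thread, so the flag must be volatile
        private volatile boolean abort = false;

        @Override
        public void init(Task task, ObjectNode config) {
            // 3.0 binds a handler instance to a single task at init time;
            // 2.x instead passed the task to every handle(Task) call
            this.task = task;
        }

        @Override
        public TaskResult execute() {
            if (abort) {
                return new TaskResult(false, "task aborted");
            }
            return new TaskResult(true, null);
        }

        @Override
        public void abort() {
            abort = true; // cooperative cancellation, polled by execute()
        }
    }
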
@@ -143,7 +146,7 @@ private TaskResult handle(Task task, String releaseName, int retryCount) {
             Iterator<ListReleasesResponse> releases = releaseManager.list(ListReleasesRequest.newBuilder()
                     .setFilter(releaseName)
                     .build());
-            Release release = null;
+            release = null;
             while (releases.hasNext()) {
                 final Release existingRelease = releases.next().getReleases(0); // get the first release
                 if (existingRelease.getName().equals(releaseName)) {
@@ -156,12 +159,9 @@ private TaskResult handle(Task task, String releaseName, int retryCount) {
                     switch (releaseStatus) {
                         case FAILED:
                             logger.info("Redeploying failed helm release: {}", releaseName);
-                            final Chart.Builder chart = loadHelmChart(taskProperties);
-                            final UpdateReleaseRequest.Builder requestBuilder = buildUpdateRequest(releaseName, taskProperties);
-                            final Release updatedRelease = releaseManager.update(requestBuilder, chart).get()
-                                    .getRelease();
+                            release = updateRelease(releaseName, taskProperties, releaseManager);
                             logger.info("Successfully redeployed helm release: {} in namespace: {}",
-                                    updatedRelease.getName(), updatedRelease.getNamespace());
+                                    release.getName(), release.getNamespace());
                             // Post the deployment, wait for the deployment to complete.
                         case DEPLOYED:
                             // do nothing
@@ -170,17 +170,12 @@ private TaskResult handle(Task task, String releaseName, int retryCount) {
                             break;
                         default:
                             throw new HelmExecutionException("Error deploying helm chart, current state is: " + releaseStatus);
                     }
                 } else {
-                    // it might be a retry case and still the helm chart was never deployed
-                    // can happen if it fails while deploying the helm release
-                    // for this cases we will deploy a fresh helm release
-                    final Chart.Builder chart = loadHelmChart(taskProperties);
-                    final Builder requestBuilder = buildInstallRequest(releaseName, taskProperties);
-                    release = releaseManager.install(requestBuilder, chart).get().getRelease();
+                    release = installRelease(releaseName, taskProperties, releaseManager);
                     logger.info("Successfully installed release: {} in namespace: {}", releaseName, release.getNamespace());
                 }
                 boolean ignoreJobStatus = (boolean) taskProperties.getOrDefault(PROP_IGNORE_JOB_STATUS, DEFAULT_IGNORE_JOB_STATUS);
                 waitForReleaseAndJobCompletion(releaseManager, kubernetesClient, releaseName,
-                        release.getNamespace(), waitTimeout, ignoreJobStatus);
+                        release.getNamespace(), ignoreJobStatus);
                 logger.info("Successfully completed release: {} in namespace {}", releaseName, release.getNamespace());
                 return new TaskResult(true, null, taskResult);
@@ -188,7 +183,7 @@ private TaskResult handle(Task task, String releaseName, int retryCount) {
             logger.error("Error deploying helm chart with release name {}", releaseName, e);
             return new TaskResult(false, "error deploying helm chart. error : " + e.getMessage(), taskResult);
         } catch (Exception e) {
-            if (retryCount > MAX_RETRY_COUNT) {
+            if (retryCount >= MAX_RETRY_COUNT) {
                 logger.error("Error deploying helm chart with release name {} after {} retries, failing the task",
                         releaseName, MAX_RETRY_COUNT, e);
                 return new TaskResult(false, "error deploying helm chart. error : " + e.getMessage(), taskResult);
@@ -198,16 +193,24 @@ private TaskResult handle(Task task, String releaseName, int retryCount) {
                     Thread.sleep(RETRY_SLEEP_INTERVAL);
                 } catch (InterruptedException ignored) {
                 }
-                return handle(task, releaseName, ++retryCount);
+                return execute(task, releaseName, ++retryCount);
             }
         }
     }
 
     private void waitForReleaseAndJobCompletion(ReleaseManager releaseManager, KubernetesClient kubernetesClient,
-                                                String releaseName, String namespace, long waitTimeout,
-                                                boolean ignoreJobStatus) throws Exception {
-        logger.info("Waiting for release {} in namespace {} to complete.", releaseName, namespace);
+                                                String releaseName, String namespace, boolean ignoreJobStatus)
+            throws IOException, HelmExecutionException, ExecutionException, InterruptedException {
+        logger.info("Waiting for release {} in namespace {} to be deployed with ignoreJobStatus {}.",
+                releaseName, namespace, ignoreJobStatus);
+        waitForReleaseToDeploy(releaseManager, releaseName, namespace);
+        if (!ignoreJobStatus) {
+            waitForJobCompletion(kubernetesClient, releaseName, namespace);
+        }
+    }
+
+    private void waitForReleaseToDeploy(ReleaseManager releaseManager, String releaseName, String namespace)
+            throws IOException, HelmExecutionException, ExecutionException, InterruptedException {
         boolean deployed = false;
         while (!deployed) {
             Code statusCode = getHelmReleaseStatus(releaseManager, releaseName);
@@ -236,15 +239,10 @@ private void waitForReleaseAndJobCompletion(ReleaseManager releaseManager, Kuber
                             " in namespace " + namespace + " current state is: " + statusCode);
             }
         }
-        if (ignoreJobStatus) {
-            logger.info("Task is configured to not wait for the completion" +
-                    " of release {} in namespace {}", releaseName, namespace);
-        } else {
-            waitForJobCompletion(kubernetesClient, releaseName, namespace, waitTimeout);
-        }
     }
 
-    private Code getHelmReleaseStatus(ReleaseManager releaseManager, String releaseName) throws Exception {
+    private Code getHelmReleaseStatus(ReleaseManager releaseManager, String releaseName)
+            throws IOException, ExecutionException, InterruptedException {
         final GetReleaseStatusRequest releaseStatusRequest = GetReleaseStatusRequest.newBuilder()
                 .setName(releaseName)
                 .build();
@@ -252,28 +250,25 @@ private Code getHelmReleaseStatus(ReleaseManager releaseManager, String releaseN
         return releaseStatusResponse.getInfo().getStatus().getCode();
     }
 
-    private void waitForJobCompletion(KubernetesClient kubernetesClient, String releaseName,
-                                      String namespace, long waitTimeout) throws Exception {
-        logger.info("waiting for job to complete deployed as part of release {}, namespace {}",
+    private void waitForJobCompletion(KubernetesClient kubernetesClient, String releaseName, String namespace)
+            throws HelmExecutionException {
+        logger.info("waiting for job to complete, deployed as part of release {}, namespace {}",
                 releaseName, namespace);
         while (true) {
+            if (abort) {
+                logger.warn("Task has been aborted");
+                throw new HelmExecutionException("Task has been aborted");
+            }
             boolean jobCompleted = true;
             JobList jobList = getJobs(kubernetesClient, releaseName, namespace, MAX_RETRY_COUNT);
             List<Job> items = jobList.getItems();
             for (Job item : items) {
-                if (waitTimeout <= 0) {
-                    throw new HelmExecutionException("Unable to finish execution of" +
-                            " job " + item.getMetadata().getName() +
-                            " deployed as part of helm release " + releaseName +
-                            " in namespace " + namespace + " within the maxWaitTimeout, failing the task");
-                }
                 if (item.getStatus().getSucceeded() == null || item.getStatus().getSucceeded().equals(0)) {
                     logger.debug("Job [" + item.getMetadata().getName() + "] is still active");
                     jobCompleted = false;
-                    waitTimeout -= SLEEP_INTERVAL_IN_SECONDS;
                     try {
-                        Thread.sleep(TimeUnit.SECONDS.toMillis(SLEEP_INTERVAL_IN_SECONDS));
-                    } catch (Exception ignored) {
+                        Thread.sleep(RETRY_SLEEP_INTERVAL);
+                    } catch (InterruptedException ignored) {
                     }
                     break;
                 }
             }
@@ -368,18 +363,14 @@ private Map<String, Object> getValues(Map<String, Object> taskProperties) throws
         return valuesMap;
     }
 
-    private Chart.Builder loadHelmChart(Map<String, Object> taskProperties) throws Exception {
-        ChartType chartType;
+    private Chart.Builder loadHelmChart(Map<String, Object> taskProperties) throws IOException, HelmExecutionException {
+        final ChartType chartType;
         if (taskProperties.containsKey(PROP_CHART_TYPE)) {
             chartType = ChartType.valueOf(getProperty(taskProperties, PROP_CHART_TYPE));
         } else {
             chartType = DEFAULT_CHART_TYPE;
         }
-        String chartPath = getProperty(taskProperties, PROP_CHART_PATH);
-        return loadHelmChart(chartType, chartPath);
-    }
-
-    private Chart.Builder loadHelmChart(ChartType chartType, String chartPath) throws Exception {
+        final String chartPath = getProperty(taskProperties, PROP_CHART_PATH);
         Chart.Builder helmChart = null;
         switch (chartType) {
             case directory:
@@ -419,4 +410,51 @@ private Chart.Builder loadHelmChart(ChartType chartType, String chartPath) throw
     private String getProperty(Map<String, Object> properties, String key) {
         return String.valueOf(properties.get(key));
     }
+
+    private synchronized Release installRelease(String releaseName, Map<String, Object> taskProperties,
+                                                ReleaseManager releaseManager)
+            throws IOException, HelmExecutionException, InterruptedException, ExecutionException {
+        if (abort) {
+            logger.warn("Task has been aborted, do not install");
+            throw new HelmExecutionException("Task has been aborted");
+        }
+        final Chart.Builder chart = loadHelmChart(taskProperties);
+        final Builder requestBuilder = buildInstallRequest(releaseName, taskProperties);
+        return releaseManager.install(requestBuilder, chart).get().getRelease();
+    }
+
+    private synchronized Release updateRelease(String releaseName, Map<String, Object> taskProperties,
+                                               ReleaseManager releaseManager)
+            throws IOException, HelmExecutionException, InterruptedException, ExecutionException {
+        if (abort) {
+            logger.warn("Task has been aborted, do not update");
+            throw new HelmExecutionException("Task has been aborted");
+        }
+        final Chart.Builder chart = loadHelmChart(taskProperties);
+        final UpdateReleaseRequest.Builder requestBuilder = buildUpdateRequest(releaseName, taskProperties);
+        return releaseManager.update(requestBuilder, chart)
+                .get().getRelease();
+    }
+
+    private synchronized Release deleteRelease(String releaseName, ReleaseManager releaseManager)
+            throws IOException, ExecutionException, InterruptedException {
+        final UninstallReleaseRequest uninstallReleaseRequest = UninstallReleaseRequest.newBuilder()
+                .setName(releaseName).build();
+        return releaseManager.uninstall(uninstallReleaseRequest).get().getRelease();
+    }
+
+    @Override
+    public void abort() {
+        logger.info("Received request to abort task {}", task);
+        abort = true;
+        if (release != null) {
+            try (final DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient();
+                 final Tiller tiller = new Tiller(kubernetesClient);
+                 final ReleaseManager releaseManager = new ReleaseManager(tiller)) {
+                deleteRelease(release.getName(), releaseManager);
+            } catch (IOException | InterruptedException | ExecutionException e) {
+                logger.error("Error uninstalling helm release {}", release.getName(), e);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/jdbc-store/pom.xml b/jdbc-store/pom.xml
index c5fed33..3d5aefa 100755
--- a/jdbc-store/pom.xml
+++ b/jdbc-store/pom.xml
@@ -22,7 +22,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/jdbc-store/src/main/java/com/cognitree/kronos/scheduler/store/jdbc/StdJDBCStoreService.java b/jdbc-store/src/main/java/com/cognitree/kronos/scheduler/store/jdbc/StdJDBCStoreService.java
index 93eed59..d69a3bf 100644
--- a/jdbc-store/src/main/java/com/cognitree/kronos/scheduler/store/jdbc/StdJDBCStoreService.java
+++ b/jdbc-store/src/main/java/com/cognitree/kronos/scheduler/store/jdbc/StdJDBCStoreService.java
@@ -17,7 +17,6 @@
 
 package com.cognitree.kronos.scheduler.store.jdbc;
 
-import com.cognitree.kronos.ServiceProvider;
 import com.cognitree.kronos.scheduler.store.JobStore;
 import com.cognitree.kronos.scheduler.store.NamespaceStore;
 import com.cognitree.kronos.scheduler.store.StoreService;
@@ -40,7 +39,6 @@
 
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.sql.Connection;
 import java.sql.SQLException;
 
 public class StdJDBCStoreService extends StoreService {
diff --git a/kafka-message-handler/pom.xml b/kafka-message-handler/pom.xml
index 52fe12f..9f24ffd 100644
--- a/kafka-message-handler/pom.xml
+++ b/kafka-message-handler/pom.xml
@@ -5,7 +5,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/kafka-message-handler/src/main/java/com/cognitree/kronos/executor/handlers/KafkaMessageHandler.java b/kafka-message-handler/src/main/java/com/cognitree/kronos/executor/handlers/KafkaMessageHandler.java
index 1479273..e3eec57 100644
--- a/kafka-message-handler/src/main/java/com/cognitree/kronos/executor/handlers/KafkaMessageHandler.java
+++ b/kafka-message-handler/src/main/java/com/cognitree/kronos/executor/handlers/KafkaMessageHandler.java
@@ -46,21 +46,23 @@ public class KafkaMessageHandler implements TaskHandler {
 
     private String defaultTopic;
     private KafkaProducer<String, String> kafkaProducer;
+    private Task task;
 
     @Override
-    public void init(ObjectNode config) {
+    public void init(Task task, ObjectNode config) {
         logger.info("Initializing producer for kafka with config {}", config);
         if (config == null || !config.hasNonNull(KAFKA_PRODUCER_CONFIG_KEY)) {
             throw new IllegalArgumentException("missing mandatory configuration: [kafkaProducerConfig]");
         }
+        this.task = task;
         Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
         kafkaProducer = new KafkaProducer<>(kafkaProducerConfig);
         defaultTopic = config.get(TOPIC_KEY).asText();
     }
 
     @Override
-    public TaskResult handle(Task task) {
-        logger.info("Received request to handle task {}", task);
+    public TaskResult execute() {
+        logger.info("Received request to execute task {}", task);
         final Map<String, Object> taskProperties = task.getProperties();
         final String topic = (String) taskProperties.getOrDefault(TOPIC_KEY, defaultTopic);
         try {
diff --git a/kafka-queue/pom.xml b/kafka-queue/pom.xml
index 967aa29..efc0c6d 100755
--- a/kafka-queue/pom.xml
+++ b/kafka-queue/pom.xml
@@ -22,7 +22,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/kafka-queue/src/main/java/com/cognitree/kronos/queue/consumer/KafkaConsumerImpl.java b/kafka-queue/src/main/java/com/cognitree/kronos/queue/consumer/KafkaConsumerImpl.java
index 4c6be43..e972a2a 100755
--- a/kafka-queue/src/main/java/com/cognitree/kronos/queue/consumer/KafkaConsumerImpl.java
+++ b/kafka-queue/src/main/java/com/cognitree/kronos/queue/consumer/KafkaConsumerImpl.java
@@ -20,6 +20,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -29,12 +30,8 @@
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * A {@link Consumer} implementation using Kafka as queue in backend.
@@ -44,93 +41,101 @@ public class KafkaConsumerImpl implements Consumer {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String GROUP_ID = "group.id";
+    private static final String CONSUMER_KEY = "consumerKey";
+    private static final String MAX_POLL_RECORDS = "max.poll.records";
+    private static final String POLL_TIMEOUT_IN_MS = "pollTimeoutInMs";
+    private static final String KAFKA_CONSUMER_CONFIG = "kafkaConsumerConfig";
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final short TOPIC_REPLICATION_FACTOR = (short) 1;
 
-    private final Map<String, KafkaConsumer<String, String>> topicToKafkaConsumerMap = new HashMap<>();
-    private Properties kafkaConsumerConfig;
     private long pollTimeoutInMs;
+    private String topic;
+    private KafkaConsumer<String, String> kafkaConsumer;
+    private ObjectNode config;
 
+    public void init(String topic, ObjectNode config) {
+        logger.info("Initializing Kafka consumer on topic {} with config {}", topic, config);
+        this.topic = topic;
+        this.config = config;
+        pollTimeoutInMs = config.get(POLL_TIMEOUT_IN_MS).asLong();
+        createTopic();
+        initConsumer();
+    }
+
+    private void createTopic() {
+        try {
+            final Properties properties =
+                    OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
+            final AdminClient adminClient = AdminClient.create(properties);
+            final NewTopic kafkaTopic = new NewTopic(topic, NUM_TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
+            adminClient.createTopics(Collections.singleton(kafkaTopic)).all().get();
+        } catch (Exception e) {
+            logger.warn("Error creating topic {}, error: {}", topic, e.getMessage());
+        }
+    }
 
-    public void init(ObjectNode config) {
-        logger.info("Initializing consumer for kafka with config {}", config);
-        kafkaConsumerConfig = OBJECT_MAPPER.convertValue(config.get("kafkaConsumerConfig"), Properties.class);
+    private void initConsumer() {
+        final Properties kafkaConsumerConfig =
+                OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
         // force override consumer configuration for kafka to poll max 1 message at a time
-        kafkaConsumerConfig.put("max.poll.records", 1);
-        pollTimeoutInMs = config.get("pollTimeoutInMs").asLong();
+        kafkaConsumerConfig.put(MAX_POLL_RECORDS, 1);
+        kafkaConsumerConfig.put(GROUP_ID, config.get(CONSUMER_KEY).asText());
+        kafkaConsumer = new KafkaConsumer<>(kafkaConsumerConfig);
+        kafkaConsumer.subscribe(Collections.singletonList(topic));
     }
 
     @Override
-    public List<String> poll(String topic) {
-        return poll(topic, Integer.MAX_VALUE);
+    public List<String> poll() {
+        return poll(Integer.MAX_VALUE);
     }
 
     @Override
-    public List<String> poll(String topic, int size) {
+    public synchronized List<String> poll(int size) {
         logger.trace("Received request to poll messages from topic {} with max size {}", topic, size);
         List<String> tasks = new ArrayList<>();
-        if (!topicToKafkaConsumerMap.containsKey(topic)) {
-            createKafkaConsumer(topic);
-        }
-        final KafkaConsumer<String, String> kafkaConsumer = topicToKafkaConsumerMap.get(topic);
-        synchronized (kafkaConsumer) {
-            while (tasks.size() < size) {
-                final ConsumerRecords<String, String> consumerRecords = kafkaConsumer
-                        .poll(Duration.ofMillis(pollTimeoutInMs));
-                if (consumerRecords.isEmpty()) {
-                    break;
-                }
-                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-                    tasks.add(consumerRecord.value());
-                }
+        while (tasks.size() < size) {
+            final ConsumerRecords<String, String> consumerRecords = kafkaConsumer
+                    .poll(Duration.ofMillis(pollTimeoutInMs));
+            if (consumerRecords.isEmpty()) {
+                break;
+            }
+            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
+                tasks.add(consumerRecord.value());
             }
         }
-
         return tasks;
     }
 
-    private synchronized void createKafkaConsumer(String topic) {
-        if (!topicToKafkaConsumerMap.containsKey(topic)) {
-            final Properties kafkaConsumerConfig = new Properties();
-            kafkaConsumerConfig.putAll(this.kafkaConsumerConfig);
-            kafkaConsumerConfig.put(GROUP_ID, kafkaConsumerConfig.getProperty(GROUP_ID) + "-" + topic);
-            logger.info("Creating kafka consumer on topic {} with consumer config {}", topic, kafkaConsumerConfig);
-            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerConfig);
-            kafkaConsumer.subscribe(Collections.singletonList(topic));
-            topicToKafkaConsumerMap.put(topic, kafkaConsumer);
-        }
-    }
-
     @Override
-    public void close() {
-        topicToKafkaConsumerMap.forEach((topic, kafkaConsumer) -> {
-            synchronized (kafkaConsumer) {
-                try {
-                    kafkaConsumer.close();
-                } catch (Exception e) {
-                    logger.warn("Error closing Kafka consumer for topic {}", topic, e);
-                }
+    public synchronized void close() {
+        try {
+            if (kafkaConsumer != null) {
+                kafkaConsumer.close();
             }
-        });
+        } catch (Exception e) {
+            logger.warn("Error closing Kafka consumer for topic {}", topic, e);
+        }
     }
 
     @Override
-    public void destroy() {
-        logger.info("Received request to delete created topics and consumer groups from Kafka");
-        AdminClient adminClient = AdminClient.create(kafkaConsumerConfig);
-        Set<String> topics = topicToKafkaConsumerMap.keySet();
-        Set<String> consumerGroups =
-                topics.stream().map(topic -> kafkaConsumerConfig.getProperty(GROUP_ID) + "-" + topic)
-                        .collect(Collectors.toSet());
-        logger.info("Deleting Kafka consumer group {} and topics {}", consumerGroups, topics);
+    public synchronized void destroy() {
+        logger.info("Received request to destroy consumer and topic {}", topic);
+        final Properties properties =
+                OBJECT_MAPPER.convertValue(config.get(KAFKA_CONSUMER_CONFIG), Properties.class);
+        final AdminClient adminClient = AdminClient.create(properties);
+
+        String consumerGroupKey = config.get(CONSUMER_KEY).asText();
+        logger.info("Deleting Kafka consumer group {} and topic {}", consumerGroupKey, topic);
         try {
-            adminClient.deleteConsumerGroups(consumerGroups);
+            adminClient.deleteConsumerGroups(Collections.singletonList(consumerGroupKey));
         } catch (Exception e) {
-            logger.warn("Error deleting Kafka consumer groups {}", consumerGroups, e);
+            logger.warn("Error deleting Kafka consumer group {}", consumerGroupKey, e);
         }
         try {
-            adminClient.deleteTopics(topics);
+            adminClient.deleteTopics(Collections.singleton(topic));
         } catch (Exception e) {
-            logger.warn("Error deleting Kafka topics {}", topics, e);
+            logger.warn("Error deleting Kafka topic {}", topic, e);
         }
         try {
             adminClient.close();
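
Reviewer note: the Consumer is now bound to a single topic at init time, creates that topic up front, and derives the Kafka group.id from the new consumerKey config instead of suffixing a shared group id with the topic name. A usage sketch against the new contract (broker address and topic name are illustrative, not from this patch):

    ObjectMapper mapper = new ObjectMapper();
    ObjectNode config = mapper.createObjectNode();
    config.putObject("kafkaConsumerConfig")
            .put("bootstrap.servers", "localhost:9092"); // assumption: local broker;
                                                         // real configs also need key/value deserializers
    config.put("pollTimeoutInMs", 1000);
    config.put("consumerKey", "kronos-executor");        // becomes the Kafka group.id

    KafkaConsumerImpl consumer = new KafkaConsumerImpl();
    consumer.init("tasks", config);            // one consumer instance per topic in 3.0
    List<String> records = consumer.poll(10);  // at most 10 records per call
    consumer.close();
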
diff --git a/kafka-queue/src/main/java/com/cognitree/kronos/queue/producer/KafkaProducerImpl.java b/kafka-queue/src/main/java/com/cognitree/kronos/queue/producer/KafkaProducerImpl.java
index 19507c3..0b8a4ec 100755
--- a/kafka-queue/src/main/java/com/cognitree/kronos/queue/producer/KafkaProducerImpl.java
+++ b/kafka-queue/src/main/java/com/cognitree/kronos/queue/producer/KafkaProducerImpl.java
@@ -19,11 +19,14 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.Properties;
 
 /**
@@ -33,25 +36,55 @@ public class KafkaProducerImpl implements Producer {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String KAFKA_PRODUCER_CONFIG_KEY = "kafkaProducerConfig";
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final short TOPIC_REPLICATION_FACTOR = (short) 1;
 
     private KafkaProducer<String, String> kafkaProducer;
+    private String topic;
+    private ObjectNode config;
 
-    public void init(ObjectNode config) {
-        logger.info("Initializing producer for kafka with config {}", config);
-        Properties kafkaProducerConfig = OBJECT_MAPPER.convertValue(config.get("kafkaProducerConfig"), Properties.class);
+    public void init(String topic, ObjectNode config) {
+        logger.info("Initializing Kafka producer for topic {} with config {}", topic, config);
+        this.topic = topic;
+        this.config = config;
+        createTopic();
+        initProducer();
+    }
+
+    private void createTopic() {
+        try {
+            final Properties properties =
+                    OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
+            final AdminClient adminClient = AdminClient.create(properties);
+            final NewTopic kafkaTopic = new NewTopic(topic, NUM_TOPIC_PARTITIONS, TOPIC_REPLICATION_FACTOR);
+            adminClient.createTopics(Collections.singleton(kafkaTopic)).all().get();
+        } catch (Exception e) {
+            logger.error("Error creating topic {}, error: {}", topic, e.getMessage());
+        }
+    }
+
+    private void initProducer() {
+        final Properties kafkaProducerConfig =
+                OBJECT_MAPPER.convertValue(config.get(KAFKA_PRODUCER_CONFIG_KEY), Properties.class);
         kafkaProducer = new KafkaProducer<>(kafkaProducerConfig);
     }
 
     @Override
-    public void send(String topic, String record) {
-        sendInOrder(topic, record, null);
+    public void broadcast(String record) {
+        sendInOrder(record, null);
+    }
+
+    @Override
+    public void send(String record) {
+        sendInOrder(record, null);
     }
 
     @Override
-    public void sendInOrder(String topic, String record, String orderingKey) {
+    public void sendInOrder(String record, String orderingKey) {
         logger.trace("Received request to send message {} to topic {} with orderingKey {}",
                 record, topic, orderingKey);
-        ProducerRecord<String, String> producerRecord = orderingKey == null ?
+        final ProducerRecord<String, String> producerRecord = orderingKey == null ?
                 new ProducerRecord<>(topic, record) : new ProducerRecord<>(topic, orderingKey, record);
         kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
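
Reviewer note: the Producer side mirrors the consumer — the topic moves from each send(topic, record) call into init(String, ObjectNode), the topic is auto-created, and a broadcast variant joins the interface (for Kafka it simply delegates to sendInOrder(record, null), same as send). Sketch, with an illustrative config:

    ObjectNode config = new ObjectMapper().createObjectNode();
    config.putObject("kafkaProducerConfig")
            .put("bootstrap.servers", "localhost:9092"); // assumption: local broker;
                                                         // real configs also need key/value serializers

    KafkaProducerImpl producer = new KafkaProducerImpl();
    producer.init("tasks", config);                          // creates the topic if absent
    producer.send("{\"name\":\"task-1\"}");                  // unordered
    producer.sendInOrder("{\"name\":\"task-2\"}", "job-42"); // same key => same partition, ordered
    producer.broadcast("{\"name\":\"task-3\"}");             // degenerates to send() on Kafka
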
diff --git a/mongo-store/pom.xml b/mongo-store/pom.xml
index 190a42e..c10a626 100644
--- a/mongo-store/pom.xml
+++ b/mongo-store/pom.xml
@@ -6,7 +6,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index e684037..d48a24f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -26,10 +26,10 @@
     <artifactId>kronos-extensions</artifactId>
     <name>kronos-extensions</name>
     <packaging>pom</packaging>
-    <version>2.2.4</version>
+    <version>3.0.0-RC1</version>
     <properties>
-        <kronos.version>2.2.4-RC2</kronos.version>
+        <kronos.version>3.0.0-RC2</kronos.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/spark-handler/pom.xml b/spark-handler/pom.xml
index 0041827..abcb421 100644
--- a/spark-handler/pom.xml
+++ b/spark-handler/pom.xml
@@ -22,7 +22,7 @@
     <parent>
        <artifactId>kronos-extensions</artifactId>
        <groupId>com.cognitree.kronos.extensions</groupId>
-       <version>2.2.4</version>
+       <version>3.0.0-RC1</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/spark-handler/src/main/java/com/cognitree/kronos/executor/handlers/SparkHandler.java b/spark-handler/src/main/java/com/cognitree/kronos/executor/handlers/SparkHandler.java
index 8721826..3067c25 100644
--- a/spark-handler/src/main/java/com/cognitree/kronos/executor/handlers/SparkHandler.java
+++ b/spark-handler/src/main/java/com/cognitree/kronos/executor/handlers/SparkHandler.java
@@ -31,6 +31,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import static com.cognitree.spark.restclient.SparkRestClient.ClusterMode;
@@ -40,73 +42,102 @@ public class SparkHandler implements TaskHandler {
     private static final Logger logger = LoggerFactory.getLogger(SparkHandler.class);
     private static final int STATUS_MONITORING_INTERVAL = 5000;
     private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());
+    private static final String SUBMISSION_ID = "submissionId";
 
-    @Override
-    public void init(ObjectNode handlerConfig) {
+    private Task task;
+    private SparkRestClient sparkRestClient;
+    private String sparkJobSubmissionId;
-    }
+    private volatile boolean abort = false;
 
     @Override
-    public TaskResult handle(Task task) {
-        logger.info("Received request to handle task {}", task);
+    public void init(Task task, ObjectNode config) {
+        this.task = task;
         final Map<String, Object> taskProperties = task.getProperties();
         final String sparkVersion = (String) taskProperties.get("sparkVersion");
         final String masterHost = (String) taskProperties.get("masterHost");
         final Integer masterPort = (Integer) taskProperties.get("masterPort");
         final ClusterMode clusterMode = ClusterMode.valueOf((String) taskProperties.get("clusterMode"));
         final boolean secure = (boolean) taskProperties.getOrDefault("secure", false);
-        final Integer monitoringInterval = (Integer) taskProperties.
-                getOrDefault("monitoringInterval", STATUS_MONITORING_INTERVAL);
-        final SparkRestClient sparkRestClient = builder()
+        sparkRestClient = builder()
                 .masterHost(masterHost)
                 .masterPort(masterPort)
                 .sparkVersion(sparkVersion)
                 .clusterMode(clusterMode)
                 .isSecure(secure)
                 .build();
+    }
 
+    @Override
+    public TaskResult execute() {
+        logger.info("Received request to execute task {}", task);
+        Map<String, Object> taskProperties = task.getProperties();
         if (!taskProperties.containsKey("submitRequest")) {
             logger.error("Missing Spark job submit request, failing task {}", task);
             return new TaskResult(false, "missing Spark job submit request");
         }
+        final HashMap<String, Object> context = new HashMap<>();
+        final JobSubmitRequest submitRequest =
+                MAPPER.convertValue(taskProperties.get("submitRequest"), JobSubmitRequest.class);
+        final JobSubmitResponse jobSubmitResponse;
         try {
-            final JobSubmitRequest submitRequest =
-                    MAPPER.convertValue(taskProperties.get("submitRequest"), JobSubmitRequest.class);
-            final JobSubmitResponse jobSubmitResponse = sparkRestClient.submitJob(submitRequest);
+            jobSubmitResponse = sparkRestClient.submitJob(submitRequest);
+            sparkJobSubmissionId = jobSubmitResponse.getSubmissionId();
+            context.put(SUBMISSION_ID, sparkJobSubmissionId);
             if (!jobSubmitResponse.getSuccess()) {
                 logger.error("Unable to submit Spark job request. Response : {}", jobSubmitResponse);
-                return new TaskResult(false, "Unable to submit Spark job request");
+                return new TaskResult(false, "Unable to submit Spark job request", context);
             }
+        } catch (IOException e) {
+            logger.error("Error submitting Spark job, request: {}", submitRequest, e);
+            return new TaskResult(false, "Unable to submit Spark job request", context);
+        }
 
-            JobStatusResponse statusResponse = sparkRestClient.getJobStatus(jobSubmitResponse.getSubmissionId());
-            long maxExecutionTime = task.getMaxExecutionTimeInMs();
-            while (!statusResponse.getDriverState().isFinal() && maxExecutionTime > 0) {
+        final Integer monitoringInterval = (Integer) taskProperties.
+                getOrDefault("monitoringInterval", STATUS_MONITORING_INTERVAL);
+        while (true) {
+            if (abort) {
+                logger.error("Task {} has been aborted", task);
+                return new TaskResult(false, "Task has been aborted");
+            }
+            try {
+                JobStatusResponse statusResponse = sparkRestClient.getJobStatus(jobSubmitResponse.getSubmissionId());
+                if (statusResponse.getDriverState().isFinal()) {
+                    logger.info("Task {} finished execution with state {}", task, statusResponse.getDriverState());
+                    if (statusResponse.getDriverState() != DriverState.FINISHED) {
+                        return new TaskResult(false, "Spark job finished execution with failure state: " +
+                                statusResponse.getDriverState(), context);
+                    }
+                    break;
+                }
+            } catch (IOException e) {
+                logger.error("Error retrieving Spark job status", e);
+                return new TaskResult(false, "Unable to retrieve Spark job status", context);
+            }
+            try {
                 Thread.sleep(monitoringInterval);
-                statusResponse = sparkRestClient.getJobStatus(jobSubmitResponse.getSubmissionId());
-                maxExecutionTime -= monitoringInterval;
+            } catch (InterruptedException ignored) {
             }
+        }
+        return new TaskResult(true, null, context);
+    }
 
-            if (!statusResponse.getDriverState().isFinal() && maxExecutionTime < 0) {
-                logger.error("Task {} exceeded max execution time allowed. Killing job with submission id {}",
-                        task, jobSubmitResponse.getSubmissionId());
-                final KillJobResponse killJobResponse = sparkRestClient.killJob(jobSubmitResponse.getSubmissionId());
+    @Override
+    public void abort() {
+        logger.error("Received request to abort task {}", task.getIdentity());
+        abort = true;
+        if (sparkJobSubmissionId != null) {
+            try {
+                final KillJobResponse killJobResponse = sparkRestClient.killJob(sparkJobSubmissionId);
                 if (!killJobResponse.getSuccess()) {
                     logger.error("Unable to kill job with submission id {}, message {}",
-                            jobSubmitResponse.getSubmissionId(), killJobResponse.getMessage());
+                            sparkJobSubmissionId, killJobResponse.getMessage());
                 }
-                return new TaskResult(false, "Spark job exceeded max execution time");
-            }
-
-            logger.info("Task {} finished execution with state {}", task, statusResponse.getDriverState());
-            if (statusResponse.getDriverState() != DriverState.FINISHED) {
-                return new TaskResult(false, "Spark job finished execution with failure state " + statusResponse.getDriverState());
+            } catch (IOException e) {
+                logger.error("Error sending request to kill job with submission id {}", sparkJobSubmissionId, e);
             }
-        } catch (Exception e) {
-            logger.error("Error executing task {}", task, e);
-            return new TaskResult(false, "Error executing Spark job task: " + e.getMessage());
         }
-        return TaskResult.SUCCESS;
     }
 }
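
Reviewer note: the old in-loop maxExecutionTime accounting is gone; the handler now polls until the driver reaches a final state, and timeouts become the scheduler's responsibility via abort(), which kills the driver through the stored submission id (also surfaced in the TaskResult context under "submissionId" for later inspection). A hypothetical executor-side timeout built on that contract (the executor plumbing here is assumed, only the handler calls come from this patch):

    SparkHandler handler = new SparkHandler();
    handler.init(task, config);

    // java.util.concurrent imports elided
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<TaskResult> result = pool.submit(handler::execute);
    try {
        result.get(task.getMaxExecutionTimeInMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        handler.abort();  // flags the polling loop and kills the Spark driver
    }
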
"https" : "http" + "://" + masterUrl; } - private T execute(HttpRequestBase httpRequest, Class responseClass) throws Exception { + private T execute(HttpRequestBase httpRequest, Class responseClass) throws IOException { try { final String stringResponse = client.execute(httpRequest, new BasicResponseHandler()); return MAPPER.readValue(stringResponse, responseClass);