diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java index 8e4eab48a0..4ae095caad 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/utils/K8sClientHelper.java @@ -19,6 +19,12 @@ package org.dinky.gateway.kubernetes.utils; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ObjectUtil; +import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; +import io.fabric8.kubernetes.client.*; +import io.fabric8.kubernetes.client.dsl.RollableScalableResource; +import org.apache.commons.lang3.StringUtils; import org.dinky.gateway.kubernetes.decorate.DinkySqlConfigMapDecorate; import org.dinky.utils.TextUtil; @@ -46,9 +52,6 @@ import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.utils.Serialization; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -65,6 +68,7 @@ public class K8sClientHelper { private KubernetesClient kubernetesClient; protected Configuration configuration; private DinkySqlConfigMapDecorate sqlFileDecorate; + private Watcher deploymentStatusWatch = getDeploymentStatusWatch(); public K8sClientHelper(Configuration configuration, String kubeConfig) { this.configuration = configuration; @@ -86,6 +90,46 @@ public Optional getJobService(String clusterId) { return Optional.of(deployment); } + /** + * getDeploymentStatusWatch + */ + private Watcher getDeploymentStatusWatch() { + return new Watcher() { + + @Override + public void eventReceived(Action action, Deployment deployment) { + String deploymentName = deployment.getMetadata().getName(); + log.info("deployment name: {}, deployment action: {}", deploymentName, action); + if (ObjectUtil.isNotNull(deployment.getStatus()) + && CollectionUtil.isNotEmpty(deployment.getStatus().getConditions())) { + List conditions = deployment.getStatus().getConditions(); + conditions.forEach(condition -> { + if (StringUtils.equalsIgnoreCase(condition.getStatus(), "true")) { + log.info("deployment name: {}, deployment status: {}, message: {}", + deploymentName, + condition.getStatus(), + condition.getMessage()); + } else { + log.warn("deployment name: {}, deployment status: {}, message: {}", + deploymentName, + condition.getStatus(), + condition.getMessage()); + } + }); + } + } + + @Override + public void onClose(WatcherException cause) { + if (cause != null) { + log.error("Watcher closed due to exception: {}", cause.getMessage()); + } else { + log.info("Watcher closed gracefully."); + } + } + }; + } + public boolean getClusterIsPresent(String clusterId) { return getJobService(clusterId).isPresent(); } @@ -113,12 +157,12 @@ private void initKubeClient(String kubeConfig) { */ public Deployment createDinkyResource() { log.info("createDinkyResource"); - Deployment deployment = kubernetesClient + RollableScalableResource deploymentRollableScalableResource = kubernetesClient .apps() .deployments() .inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE)) - .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)) - .get(); + .withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID)); + Deployment deployment = deploymentRollableScalableResource.get(); List resources = getSqlFileDecorate().buildResources(); // set owner reference OwnerReference deploymentOwnerReference = new OwnerReferenceBuilder() @@ -134,6 +178,7 @@ public Deployment createDinkyResource() { resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference))); // create resources resources.forEach(resource -> log.info(Serialization.asYaml(resource))); + deploymentRollableScalableResource.watch(deploymentStatusWatch); kubernetesClient.resourceList(resources).createOrReplace(); return deployment; } @@ -141,6 +186,7 @@ public Deployment createDinkyResource() { /** * initPodTemplate * Preprocess the pod template + * * @param sqlStatement * @return */ @@ -166,8 +212,7 @@ public Pod decoratePodTemplate(String sqlStatement, String podTemplate) { /** * dumpPod2Str - * - * */ + */ public String dumpPod2Str(Pod pod) { // use snakyaml to serialize the pod Representer representer = new IgnoreNullRepresenter(); @@ -179,9 +224,11 @@ public String dumpPod2Str(Pod pod) { Yaml yaml = new Yaml(representer, options); return yaml.dump(pod); } + /** * close * delete the temporary directory and close the client + * * @return */ public boolean close() {