Skip to content

Commit

Permalink
[Optimization][dinky-getaway] Add Deployment status monitoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhang2.zhang committed Nov 30, 2024
1 parent 37e0b2b commit bbd1066
Showing 1 changed file with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

package org.dinky.gateway.kubernetes.utils;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import org.apache.commons.lang3.StringUtils;
import org.dinky.gateway.kubernetes.decorate.DinkySqlConfigMapDecorate;
import org.dinky.utils.TextUtil;

Expand Down Expand Up @@ -46,9 +52,6 @@
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.utils.Serialization;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -65,6 +68,7 @@ public class K8sClientHelper {
private KubernetesClient kubernetesClient;
protected Configuration configuration;
private DinkySqlConfigMapDecorate sqlFileDecorate;
private Watcher<Deployment> deploymentStatusWatch = getDeploymentStatusWatch();

public K8sClientHelper(Configuration configuration, String kubeConfig) {
this.configuration = configuration;
Expand All @@ -86,6 +90,46 @@ public Optional<Deployment> getJobService(String clusterId) {
return Optional.of(deployment);
}

/**
* getDeploymentStatusWatch
*/
private Watcher<Deployment> getDeploymentStatusWatch() {
return new Watcher<Deployment>() {

@Override
public void eventReceived(Action action, Deployment deployment) {
String deploymentName = deployment.getMetadata().getName();
log.info("deployment name: {}, deployment action: {}", deploymentName, action);
if (ObjectUtil.isNotNull(deployment.getStatus())
&& CollectionUtil.isNotEmpty(deployment.getStatus().getConditions())) {
List<DeploymentCondition> conditions = deployment.getStatus().getConditions();
conditions.forEach(condition -> {
if (StringUtils.equalsIgnoreCase(condition.getStatus(), "true")) {
log.info("deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
} else {
log.warn("deployment name: {}, deployment status: {}, message: {}",
deploymentName,
condition.getStatus(),
condition.getMessage());
}
});
}
}

@Override
public void onClose(WatcherException cause) {
if (cause != null) {
log.error("Watcher closed due to exception: {}", cause.getMessage());
} else {
log.info("Watcher closed gracefully.");
}
}
};
}

public boolean getClusterIsPresent(String clusterId) {
return getJobService(clusterId).isPresent();
}
Expand Down Expand Up @@ -113,12 +157,12 @@ private void initKubeClient(String kubeConfig) {
*/
public Deployment createDinkyResource() {
log.info("createDinkyResource");
Deployment deployment = kubernetesClient
RollableScalableResource<Deployment> deploymentRollableScalableResource = kubernetesClient
.apps()
.deployments()
.inNamespace(configuration.get(KubernetesConfigOptions.NAMESPACE))
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID))
.get();
.withName(configuration.get(KubernetesConfigOptions.CLUSTER_ID));
Deployment deployment = deploymentRollableScalableResource.get();
List<HasMetadata> resources = getSqlFileDecorate().buildResources();
// set owner reference
OwnerReference deploymentOwnerReference = new OwnerReferenceBuilder()
Expand All @@ -134,13 +178,15 @@ public Deployment createDinkyResource() {
resource.getMetadata().setOwnerReferences(Collections.singletonList(deploymentOwnerReference)));
// create resources
resources.forEach(resource -> log.info(Serialization.asYaml(resource)));
deploymentRollableScalableResource.watch(deploymentStatusWatch);
kubernetesClient.resourceList(resources).createOrReplace();
return deployment;
}

/**
* initPodTemplate
* Preprocess the pod template
*
* @param sqlStatement
* @return
*/
Expand All @@ -166,8 +212,7 @@ public Pod decoratePodTemplate(String sqlStatement, String podTemplate) {

/**
* dumpPod2Str
*
* */
*/
public String dumpPod2Str(Pod pod) {
// use snakyaml to serialize the pod
Representer representer = new IgnoreNullRepresenter();
Expand All @@ -179,9 +224,11 @@ public String dumpPod2Str(Pod pod) {
Yaml yaml = new Yaml(representer, options);
return yaml.dump(pod);
}

/**
* close
* delete the temporary directory and close the client
*
* @return
*/
public boolean close() {
Expand Down

0 comments on commit bbd1066

Please sign in to comment.