Skip to content

Commit

Permalink
[Feature][dinky-geteway] Feature Obtain job information using the ing…
Browse files Browse the repository at this point in the history
…ress address DataLinkDC#3871 (DataLinkDC#3887)

Co-authored-by: jianjun.xu <[email protected]>
  • Loading branch information
2 people authored and donotcoffee committed Nov 12, 2024
1 parent 07e3a00 commit 33c2245
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
Expand Down Expand Up @@ -67,4 +69,11 @@ public class K8sConfig {

@ApiModelProperty(value = "KubeConfig", dataType = "String", example = "kubeconfig.yaml", notes = "KubeConfig file")
private String kubeConfig;

@ApiModelProperty(
value = "Ingress configuration",
dataType = "Map<String, String>",
example = "{\"key1\": \"value1\", \"key2\": \"value2\"}",
notes = "Ingress configuration properties")
private Map<String, String> ingressConfig = Maps.newHashMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@

package org.dinky.gateway.kubernetes;

import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_DOMAIN_KEY;
import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_ENABLED_KEY;

import org.dinky.assertion.Asserts;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.kubernetes.ingress.DinkyKubernetesIngress;
import org.dinky.gateway.kubernetes.utils.IgnoreNullRepresenter;
import org.dinky.gateway.kubernetes.utils.K8sClientHelper;
import org.dinky.gateway.model.ingress.JobDetails;
import org.dinky.gateway.model.ingress.JobOverviewInfo;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
Expand All @@ -45,15 +55,24 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.yaml.snakeyaml.Yaml;

import com.alibaba.fastjson2.JSONObject;

import cn.hutool.core.date.SystemClock;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpStatus;
import cn.hutool.http.HttpUtil;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -87,7 +106,23 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {

Deployment deployment = getK8sClientHelper().createDinkyResource();

KubernetesResult kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
KubernetesResult kubernetesResult;

String ingressDomain = checkUseIngress();
// if ingress is enabled and ingress domain is not empty, create an ingress service
if (StringUtils.isNotEmpty(ingressDomain)) {
K8sClientHelper k8sClientHelper = getK8sClientHelper();
long ingressStart = SystemClock.now();
DinkyKubernetesIngress ingress = new DinkyKubernetesIngress(k8sClientHelper);
ingress.configureIngress(
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.CLUSTER_ID),
ingressDomain,
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.NAMESPACE));
log.info("Create dinky ingress service success, cost time:{} ms", SystemClock.now() - ingressStart);
kubernetesResult = waitForJmAndJobStartByIngress(kubernetesClient, deployment, clusterClient);
} else {
kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
}
kubernetesResult.success();
return kubernetesResult;
} catch (Exception ex) {
Expand Down Expand Up @@ -214,4 +249,129 @@ public KubernetesResult waitForJmAndJobStart(
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

/**
* Waits for the JobManager and the Job to start in Kubernetes by ingress.
*
* @param deployment The deployment in Kubernetes.
* @param clusterClient The ClusterClientProvider<String> object for accessing the Kubernetes cluster.
* @return A KubernetesResult object containing the Kubernetes gateway's Web URL, the Job ID, and the cluster ID.
* @throws InterruptedException if waiting is interrupted.
*/
public KubernetesResult waitForJmAndJobStartByIngress(
KubernetesClient kubernetesClient, Deployment deployment, ClusterClientProvider<String> clusterClient)
throws InterruptedException {
KubernetesResult result = KubernetesResult.build(getType());
long waitSends = SystemConfiguration.getInstances().getJobIdWait() * 1000L;
long startTime = System.currentTimeMillis();

while (System.currentTimeMillis() - startTime < waitSends) {
List<Pod> pods = kubernetesClient
.pods()
.inNamespace(deployment.getMetadata().getNamespace())
.withLabelSelector(deployment.getSpec().getSelector())
.list()
.getItems();
for (Pod pod : pods) {
if (!checkPodStatus(pod)) {
logger.info("Kubernetes Pod have not ready, reTry at 5 sec later");
continue;
}
try {
logger.info("Start get job list ....");
JobDetails jobDetails = fetchApplicationJob(kubernetesClient, deployment);
if (Objects.isNull(jobDetails) || CollectionUtils.isEmpty(jobDetails.getJobs())) {
logger.error("Get job is empty, will be reconnect alter 5 sec later....");
Thread.sleep(5000);
continue;
}
JobOverviewInfo jobOverviewInfo =
jobDetails.getJobs().stream().findFirst().get();
// To create a cluster ID, you need to combine the cluster ID with the jobID to ensure uniqueness
String cid = configuration.getString(KubernetesConfigOptions.CLUSTER_ID) + jobOverviewInfo.getJid();
logger.info("Success get job status: {}", jobOverviewInfo.getState());

return result.setJids(Collections.singletonList(jobOverviewInfo.getJid()))
.setWebURL(jobDetails.getWebUrl())
.setId(cid);
} catch (GatewayException e) {
throw e;
} catch (Exception ex) {
logger.error("Get job status failed,{}", ex.getMessage());
}
}
Thread.sleep(5000);
}
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

private JobDetails fetchApplicationJob(KubernetesClient kubernetesClient, Deployment deployment) {
// 判断是不是存在ingress, 如果存在ingress的话返回ingress地址
Ingress ingress = kubernetesClient
.network()
.v1()
.ingresses()
.inNamespace(deployment.getMetadata().getNamespace())
.withName(deployment.getMetadata().getName())
.get();
String ingressUrl = getIngressUrl(
ingress,
deployment.getMetadata().getNamespace(),
deployment.getMetadata().getName());
logger.info("Get dinky ingress url:{}", ingressUrl);
return invokeJobsOverviewApi(ingressUrl);
}

private JobDetails invokeJobsOverviewApi(String restUrl) {
try {
String body;
try (HttpResponse execute = HttpUtil.createGet(restUrl + "/jobs/overview")
.timeout(10000)
.execute()) {
// 判断状态码,如果是504的话可能是因为task manage节点还未启动
if (Objects.equals(execute.getStatus(), HttpStatus.HTTP_GATEWAY_TIMEOUT)) {
return null;
}
body = execute.body();
}
if (StringUtils.isNotEmpty(body)) {
JobDetails jobDetails = JSONObject.parseObject(body, JobDetails.class);
jobDetails.setWebUrl(restUrl);
return jobDetails;
}
} catch (Exception e) {
logger.warn("Get job overview warning, task manage is enabled and can be ignored");
}
return null;
}

private String getIngressUrl(Ingress ingress, String namespace, String clusterId) {
if (Objects.nonNull(ingress)
&& Objects.nonNull(ingress.getSpec())
&& Objects.nonNull(ingress.getSpec().getRules())
&& !ingress.getSpec().getRules().isEmpty()) {
String host = ingress.getSpec().getRules().get(0).getHost();
return StrFormatter.format("http://{}/{}/{}", host, namespace, clusterId);
}
throw new GatewayException(
StrFormatter.format("Dinky clusterId {} ingress not found in namespace {}", clusterId, namespace));
}

/**
* Determine whether to use the ingress agent service
* @return ingress domain
*/
private String checkUseIngress() {
Map<String, String> ingressConfig = k8sConfig.getIngressConfig();
if (MapUtils.isNotEmpty(ingressConfig)) {
boolean ingressEnable =
Boolean.parseBoolean(ingressConfig.getOrDefault(DINKY_K8S_INGRESS_ENABLED_KEY, "false"));
String ingressDomain = ingressConfig.getOrDefault(DINKY_K8S_INGRESS_DOMAIN_KEY, StringUtils.EMPTY);
if (ingressEnable && StringUtils.isNotEmpty(ingressDomain)) {
return ingressDomain;
}
}
return StringUtils.EMPTY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.ingress;

import static org.dinky.assertion.Asserts.checkNotNull;

import org.dinky.gateway.kubernetes.utils.K8sClientHelper;

import java.util.Map;

import com.google.common.collect.Maps;

import cn.hutool.core.text.StrFormatter;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DinkyKubernetesIngress {

private final K8sClientHelper k8sClientHelper;

public DinkyKubernetesIngress(K8sClientHelper k8sClientHelper) {
this.k8sClientHelper = k8sClientHelper;
}

public void configureIngress(String clusterId, String domain, String namespace) {
log.info("Dinky ingress configure ingress for cluster {} in namespace {}", clusterId, namespace);
OwnerReference ownerReference = getOwnerReference(namespace, clusterId);
Ingress ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
.addToAnnotations(buildIngressAnnotations(clusterId, namespace))
.addToLabels(buildIngressLabels(clusterId))
.addToOwnerReferences(ownerReference) // Add OwnerReference
.endMetadata()
.withNewSpec()
.addNewRule()
.withHost(domain)
.withNewHttp()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}/", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}(/|$)(.*)", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.endHttp()
.endRule()
.endSpec()
.build();
try (KubernetesClient kubernetesClient = k8sClientHelper.getKubernetesClient()) {
kubernetesClient.network().v1().ingresses().inNamespace(namespace).create(ingress);
}
}

private Map<String, String> buildIngressAnnotations(String clusterId, String namespace) {
Map<String, String> annotations = Maps.newConcurrentMap();
annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
annotations.put("nginx.ingress.kubernetes.io/proxy-body-size", "1024m");
// Build the configuration snippet
String configurationSnippet =
"rewrite ^(/" + clusterId + ")$ $1/ permanent; " + "sub_filter '<base href=\"./\">' '<base href=\"/"
+ namespace + "/" + clusterId + "/\">'; " + "sub_filter_once off;";
annotations.put("nginx.ingress.kubernetes.io/configuration-snippet", configurationSnippet);
annotations.put("kubernetes.io/ingress.class", "nginx");
return annotations;
}

private Map<String, String> buildIngressLabels(String clusterId) {
Map<String, String> labels = Maps.newConcurrentMap();
labels.put("app", clusterId);
labels.put("type", "flink-native-kubernetes");
labels.put("component", "ingress");
return labels;
}

private OwnerReference getOwnerReference(String namespace, String clusterId) {
log.info("Dinky ingress get owner reference for cluster {} in namespace {}", clusterId, namespace);
KubernetesClient client = k8sClientHelper.getKubernetesClient();
Deployment deployment = client.apps()
.deployments()
.inNamespace(namespace)
.withName(clusterId)
.get();
checkNotNull(
deployment,
StrFormatter.format("Deployment with name {} not found in namespace {}", clusterId, namespace));
return new OwnerReferenceBuilder()
.withUid(deployment.getMetadata().getUid())
.withApiVersion("apps/v1")
.withKind("Deployment")
.withName(clusterId)
.withController(true)
.withBlockOwnerDeletion(true)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@
public class DinkyKubernetsConstants {
public static final String DINKY_CONF_VOLUME = "dinky-config-volume";
public static final String DINKY_CONF_VOLUME_PERFIX = "dinky-config-";

public static final String DINKY_K8S_INGRESS_ENABLED_KEY = "kubernetes.ingress.enabled";
public static final String DINKY_K8S_INGRESS_DOMAIN_KEY = "kubernetes.ingress.domain";
}
Loading

0 comments on commit 33c2245

Please sign in to comment.