Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][dinky-geteway] Feature Obtain job information using the ingress address #3871 #3887

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
Expand Down Expand Up @@ -67,4 +69,11 @@ public class K8sConfig {

@ApiModelProperty(value = "KubeConfig", dataType = "String", example = "kubeconfig.yaml", notes = "KubeConfig file")
private String kubeConfig;

@ApiModelProperty(
value = "Ingress configuration",
dataType = "Map<String, String>",
example = "{\"key1\": \"value1\", \"key2\": \"value2\"}",
notes = "Ingress configuration properties")
private Map<String, String> ingressConfig = Maps.newHashMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@

package org.dinky.gateway.kubernetes;

import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_DOMAIN_KEY;
import static org.dinky.gateway.kubernetes.utils.DinkyKubernetsConstants.DINKY_K8S_INGRESS_ENABLED_KEY;

import org.dinky.assertion.Asserts;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.kubernetes.ingress.DinkyKubernetesIngress;
import org.dinky.gateway.kubernetes.utils.IgnoreNullRepresenter;
import org.dinky.gateway.kubernetes.utils.K8sClientHelper;
import org.dinky.gateway.model.ingress.JobDetails;
import org.dinky.gateway.model.ingress.JobOverviewInfo;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
Expand All @@ -45,15 +55,24 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.yaml.snakeyaml.Yaml;

import com.alibaba.fastjson2.JSONObject;

import cn.hutool.core.date.SystemClock;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpStatus;
import cn.hutool.http.HttpUtil;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -87,7 +106,23 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {

Deployment deployment = getK8sClientHelper().createDinkyResource();

KubernetesResult kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
KubernetesResult kubernetesResult;

String ingressDomain = checkUseIngress();
// if ingress is enabled and ingress domain is not empty, create an ingress service
if (StringUtils.isNotEmpty(ingressDomain)) {
K8sClientHelper k8sClientHelper = getK8sClientHelper();
long ingressStart = SystemClock.now();
DinkyKubernetesIngress ingress = new DinkyKubernetesIngress(k8sClientHelper);
ingress.configureIngress(
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.CLUSTER_ID),
ingressDomain,
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.NAMESPACE));
log.info("Create dinky ingress service success, cost time:{} ms", SystemClock.now() - ingressStart);
kubernetesResult = waitForJmAndJobStartByIngress(kubernetesClient, deployment, clusterClient);
} else {
kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
}
kubernetesResult.success();
return kubernetesResult;
} catch (Exception ex) {
Expand Down Expand Up @@ -214,4 +249,129 @@ public KubernetesResult waitForJmAndJobStart(
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

/**
* Waits for the JobManager and the Job to start in Kubernetes by ingress.
*
* @param deployment The deployment in Kubernetes.
* @param clusterClient The ClusterClientProvider<String> object for accessing the Kubernetes cluster.
* @return A KubernetesResult object containing the Kubernetes gateway's Web URL, the Job ID, and the cluster ID.
* @throws InterruptedException if waiting is interrupted.
*/
public KubernetesResult waitForJmAndJobStartByIngress(
KubernetesClient kubernetesClient, Deployment deployment, ClusterClientProvider<String> clusterClient)
throws InterruptedException {
KubernetesResult result = KubernetesResult.build(getType());
long waitSends = SystemConfiguration.getInstances().getJobIdWait() * 1000L;
long startTime = System.currentTimeMillis();

while (System.currentTimeMillis() - startTime < waitSends) {
List<Pod> pods = kubernetesClient
.pods()
.inNamespace(deployment.getMetadata().getNamespace())
.withLabelSelector(deployment.getSpec().getSelector())
.list()
.getItems();
for (Pod pod : pods) {
if (!checkPodStatus(pod)) {
logger.info("Kubernetes Pod have not ready, reTry at 5 sec later");
continue;
}
try {
logger.info("Start get job list ....");
JobDetails jobDetails = fetchApplicationJob(kubernetesClient, deployment);
if (Objects.isNull(jobDetails) || CollectionUtils.isEmpty(jobDetails.getJobs())) {
logger.error("Get job is empty, will be reconnect alter 5 sec later....");
Thread.sleep(5000);
continue;
}
JobOverviewInfo jobOverviewInfo =
jobDetails.getJobs().stream().findFirst().get();
// To create a cluster ID, you need to combine the cluster ID with the jobID to ensure uniqueness
String cid = configuration.getString(KubernetesConfigOptions.CLUSTER_ID) + jobOverviewInfo.getJid();
logger.info("Success get job status: {}", jobOverviewInfo.getState());

return result.setJids(Collections.singletonList(jobOverviewInfo.getJid()))
.setWebURL(jobDetails.getWebUrl())
.setId(cid);
} catch (GatewayException e) {
throw e;
} catch (Exception ex) {
logger.error("Get job status failed,{}", ex.getMessage());
}
}
Thread.sleep(5000);
}
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

private JobDetails fetchApplicationJob(KubernetesClient kubernetesClient, Deployment deployment) {
// 判断是不是存在ingress, 如果存在ingress的话返回ingress地址
Ingress ingress = kubernetesClient
.network()
.v1()
.ingresses()
.inNamespace(deployment.getMetadata().getNamespace())
.withName(deployment.getMetadata().getName())
.get();
String ingressUrl = getIngressUrl(
ingress,
deployment.getMetadata().getNamespace(),
deployment.getMetadata().getName());
logger.info("Get dinky ingress url:{}", ingressUrl);
return invokeJobsOverviewApi(ingressUrl);
}

private JobDetails invokeJobsOverviewApi(String restUrl) {
try {
String body;
try (HttpResponse execute = HttpUtil.createGet(restUrl + "/jobs/overview")
.timeout(10000)
.execute()) {
// 判断状态码,如果是504的话可能是因为task manage节点还未启动
if (Objects.equals(execute.getStatus(), HttpStatus.HTTP_GATEWAY_TIMEOUT)) {
return null;
}
body = execute.body();
}
if (StringUtils.isNotEmpty(body)) {
JobDetails jobDetails = JSONObject.parseObject(body, JobDetails.class);
jobDetails.setWebUrl(restUrl);
return jobDetails;
}
} catch (Exception e) {
logger.warn("Get job overview warning, task manage is enabled and can be ignored");
}
return null;
}

private String getIngressUrl(Ingress ingress, String namespace, String clusterId) {
if (Objects.nonNull(ingress)
&& Objects.nonNull(ingress.getSpec())
&& Objects.nonNull(ingress.getSpec().getRules())
&& !ingress.getSpec().getRules().isEmpty()) {
String host = ingress.getSpec().getRules().get(0).getHost();
return StrFormatter.format("http://{}/{}/{}", host, namespace, clusterId);
}
throw new GatewayException(
StrFormatter.format("Dinky clusterId {} ingress not found in namespace {}", clusterId, namespace));
}

/**
* Determine whether to use the ingress agent service
* @return ingress domain
*/
private String checkUseIngress() {
Map<String, String> ingressConfig = k8sConfig.getIngressConfig();
if (MapUtils.isNotEmpty(ingressConfig)) {
boolean ingressEnable =
Boolean.parseBoolean(ingressConfig.getOrDefault(DINKY_K8S_INGRESS_ENABLED_KEY, "false"));
String ingressDomain = ingressConfig.getOrDefault(DINKY_K8S_INGRESS_DOMAIN_KEY, StringUtils.EMPTY);
if (ingressEnable && StringUtils.isNotEmpty(ingressDomain)) {
return ingressDomain;
}
}
return StringUtils.EMPTY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.ingress;

import static org.dinky.assertion.Asserts.checkNotNull;

import org.dinky.gateway.kubernetes.utils.K8sClientHelper;

import java.util.Map;

import com.google.common.collect.Maps;

import cn.hutool.core.text.StrFormatter;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DinkyKubernetesIngress {

private final K8sClientHelper k8sClientHelper;

public DinkyKubernetesIngress(K8sClientHelper k8sClientHelper) {
this.k8sClientHelper = k8sClientHelper;
}

public void configureIngress(String clusterId, String domain, String namespace) {
log.info("Dinky ingress configure ingress for cluster {} in namespace {}", clusterId, namespace);
OwnerReference ownerReference = getOwnerReference(namespace, clusterId);
Ingress ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
.addToAnnotations(buildIngressAnnotations(clusterId, namespace))
.addToLabels(buildIngressLabels(clusterId))
.addToOwnerReferences(ownerReference) // Add OwnerReference
.endMetadata()
.withNewSpec()
.addNewRule()
.withHost(domain)
.withNewHttp()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}/", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}(/|$)(.*)", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.endHttp()
.endRule()
.endSpec()
.build();
try (KubernetesClient kubernetesClient = k8sClientHelper.getKubernetesClient()) {
kubernetesClient.network().v1().ingresses().inNamespace(namespace).create(ingress);
}
}

private Map<String, String> buildIngressAnnotations(String clusterId, String namespace) {
Map<String, String> annotations = Maps.newConcurrentMap();
annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
annotations.put("nginx.ingress.kubernetes.io/proxy-body-size", "1024m");
// Build the configuration snippet
String configurationSnippet =
"rewrite ^(/" + clusterId + ")$ $1/ permanent; " + "sub_filter '<base href=\"./\">' '<base href=\"/"
+ namespace + "/" + clusterId + "/\">'; " + "sub_filter_once off;";
annotations.put("nginx.ingress.kubernetes.io/configuration-snippet", configurationSnippet);
annotations.put("kubernetes.io/ingress.class", "nginx");
return annotations;
}

private Map<String, String> buildIngressLabels(String clusterId) {
Map<String, String> labels = Maps.newConcurrentMap();
labels.put("app", clusterId);
labels.put("type", "flink-native-kubernetes");
labels.put("component", "ingress");
return labels;
}

private OwnerReference getOwnerReference(String namespace, String clusterId) {
log.info("Dinky ingress get owner reference for cluster {} in namespace {}", clusterId, namespace);
KubernetesClient client = k8sClientHelper.getKubernetesClient();
Deployment deployment = client.apps()
.deployments()
.inNamespace(namespace)
.withName(clusterId)
.get();
checkNotNull(
deployment,
StrFormatter.format("Deployment with name {} not found in namespace {}", clusterId, namespace));
return new OwnerReferenceBuilder()
.withUid(deployment.getMetadata().getUid())
.withApiVersion("apps/v1")
.withKind("Deployment")
.withName(clusterId)
.withController(true)
.withBlockOwnerDeletion(true)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@
public class DinkyKubernetsConstants {
public static final String DINKY_CONF_VOLUME = "dinky-config-volume";
public static final String DINKY_CONF_VOLUME_PERFIX = "dinky-config-";

public static final String DINKY_K8S_INGRESS_ENABLED_KEY = "kubernetes.ingress.enabled";
public static final String DINKY_K8S_INGRESS_DOMAIN_KEY = "kubernetes.ingress.domain";
}
Loading
Loading