Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][dinky-geteway] Feature Obtain job information using the ingress address #3871 #3887

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,14 @@ public enum Status {
204, "sys.flink.settings.flinkHistoryServerArchiveRefreshInterval"),
SYS_FLINK_SETTINGS_FLINK_HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL_NOTE(
205, "sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note"),

/**
* ingress config
* */
SYS_INGRESS_SETTINGS_ENABLE(206, "sys.ingress.settings.enable"),
SYS_INGRESS_SETTINGS_ENABLE_NOTE(207, "sys.ingress.settings.enable.note"),
SYS_INGRESS_SETTINGS_DOMAIN(208, "sys.ingress.settings.domain"),
SYS_INGRESS_SETTINGS_DOMAIN_NOTE(209, "sys.ingress.settings.domain.note"),
;
private final int code;
private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@ public static Configuration.OptionBuilder key(Status status) {
.defaultValue(true)
.note(Status.SYS_RESOURCE_SETTINGS_PATH_STYLE_ACCESS_NOTE);

private final Configuration<Boolean> ingressEnable = key(Status.SYS_INGRESS_SETTINGS_ENABLE)
.booleanType()
.defaultValue(false)
.note(Status.SYS_INGRESS_SETTINGS_ENABLE_NOTE);

private final Configuration<String> ingressDomain = key(Status.SYS_INGRESS_SETTINGS_DOMAIN)
.stringType()
.defaultValue("")
.note(Status.SYS_INGRESS_SETTINGS_DOMAIN_NOTE);

/**
* Initialize after spring bean startup
*/
Expand Down Expand Up @@ -415,6 +425,14 @@ public String getPythonHome() {
return pythonHome.getValue();
}

public Boolean getIngressEnable() {
return ingressEnable.getValue();
}

public String getIngressDomain() {
return ingressDomain.getValue();
}

public OssProperties getOssProperties() {
return OssProperties.builder()
.enable(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,8 @@ sys.flink.settings.flinkHistoryServerPort=Flink History Server Port
sys.flink.settings.flinkHistoryServerPort.note=Flink History Server Port,For example, 8082, make sure that the port is not occupied
sys.flink.settings.flinkHistoryServerArchiveRefreshInterval= Flink History Server refresh Interval
sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note=For example, 10,000 refresh interval of the Flink History Server is refreshed every 10 seconds

sys.ingress.settings.enable=Whether to enable Ingress
sys.ingress.settings.enable.note=After the Ingress is enabled, in Kubernetes Application mode, the Job running status is obtained using the Ingress address
sys.ingress.settings.domain=Kubernetes Ingress domain
sys.ingress.settings.domain.note=Kubernetes Ingress domain
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,8 @@ sys.flink.settings.flinkHistoryServerPort=Flink History Server 端口
sys.flink.settings.flinkHistoryServerPort.note=Flink History Server 端口,例如:8082,确保端口没有被占用
sys.flink.settings.flinkHistoryServerArchiveRefreshInterval= Flink History Server 刷新间隔
sys.flink.settings.flinkHistoryServerArchiveRefreshInterval.note=Flink History Server 刷新间隔,单位:毫秒,例如:10000,表示每隔10秒刷新一次

sys.ingress.settings.enable=是否启用Ingress
sys.ingress.settings.enable.note=启用Ingress后,在Kubernetes Application模式下,通过Ingress地址获取Job运行状
Zzm0809 marked this conversation as resolved.
Show resolved Hide resolved
sys.ingress.settings.domain=Ingress域名地址
sys.ingress.settings.domain.note=Kubernetes Ingress 域名地址配置
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@
import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.kubernetes.ingress.DinkyKubernetesIngress;
import org.dinky.gateway.kubernetes.utils.IgnoreNullRepresenter;
import org.dinky.gateway.kubernetes.utils.K8sClientHelper;
import org.dinky.gateway.model.ingress.JobDetails;
import org.dinky.gateway.model.ingress.JobOverviewInfo;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
Expand All @@ -45,15 +51,23 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.yaml.snakeyaml.Yaml;

import com.alibaba.fastjson2.JSONObject;

import cn.hutool.core.date.SystemClock;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpStatus;
import cn.hutool.http.HttpUtil;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -87,7 +101,24 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {

Deployment deployment = getK8sClientHelper().createDinkyResource();

KubernetesResult kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
KubernetesResult kubernetesResult;

boolean ingressEnable = SystemConfiguration.getInstances().getIngressEnable();
String ingressDomain = SystemConfiguration.getInstances().getIngressDomain();
// if ingress is enabled and ingress domain is not empty, create an ingress service
if (ingressEnable && StringUtils.isNotEmpty(ingressDomain)) {
K8sClientHelper k8sClientHelper = getK8sClientHelper();
long ingressStart = SystemClock.now();
DinkyKubernetesIngress ingress = new DinkyKubernetesIngress(getK8sClientHelper());
Zzm0809 marked this conversation as resolved.
Show resolved Hide resolved
ingress.configureIngress(
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.CLUSTER_ID),
ingressDomain,
k8sClientHelper.getConfiguration().getString(KubernetesConfigOptions.NAMESPACE));
log.info("Create dinky ingress service success, cost time:{} ms", SystemClock.now() - ingressStart);
kubernetesResult = waitForJmAndJobStartByIngress(kubernetesClient, deployment, clusterClient);
} else {
kubernetesResult = waitForJmAndJobStart(kubernetesClient, deployment, clusterClient);
}
kubernetesResult.success();
return kubernetesResult;
} catch (Exception ex) {
Expand Down Expand Up @@ -214,4 +245,112 @@ public KubernetesResult waitForJmAndJobStart(
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

/**
* Waits for the JobManager and the Job to start in Kubernetes by ingress.
*
* @param deployment The deployment in Kubernetes.
* @param clusterClient The ClusterClientProvider<String> object for accessing the Kubernetes cluster.
* @return A KubernetesResult object containing the Kubernetes gateway's Web URL, the Job ID, and the cluster ID.
* @throws InterruptedException if waiting is interrupted.
*/
public KubernetesResult waitForJmAndJobStartByIngress(
KubernetesClient kubernetesClient, Deployment deployment, ClusterClientProvider<String> clusterClient)
throws InterruptedException {
KubernetesResult result = KubernetesResult.build(getType());
long waitSends = SystemConfiguration.getInstances().getJobIdWait() * 1000L;
long startTime = System.currentTimeMillis();

while (System.currentTimeMillis() - startTime < waitSends) {
List<Pod> pods = kubernetesClient
.pods()
.inNamespace(deployment.getMetadata().getNamespace())
.withLabelSelector(deployment.getSpec().getSelector())
.list()
.getItems();
for (Pod pod : pods) {
if (!checkPodStatus(pod)) {
logger.info("Kubernetes Pod have not ready, reTry at 5 sec later");
continue;
}
try {
logger.info("Start get job list ....");
JobDetails jobDetails = fetchApplicationJob(kubernetesClient, deployment);
if (Objects.isNull(jobDetails) || CollectionUtils.isEmpty(jobDetails.getJobs())) {
logger.error("Get job is empty, will be reconnect alter 5 sec later....");
Thread.sleep(5000);
continue;
}
JobOverviewInfo jobOverviewInfo =
jobDetails.getJobs().stream().findFirst().get();
// To create a cluster ID, you need to combine the cluster ID with the jobID to ensure uniqueness
String cid = configuration.getString(KubernetesConfigOptions.CLUSTER_ID) + jobOverviewInfo.getJid();
logger.info("Success get job status: {}", jobOverviewInfo.getState());

return result.setJids(Collections.singletonList(jobOverviewInfo.getJid()))
.setWebURL(jobDetails.getWebUrl())
.setId(cid);
} catch (GatewayException e) {
throw e;
} catch (Exception ex) {
logger.error("Get job status failed,{}", ex.getMessage());
}
}
Thread.sleep(5000);
}
throw new GatewayException(
"The number of retries exceeds the limit, check the K8S cluster for more information");
}

private JobDetails fetchApplicationJob(KubernetesClient kubernetesClient, Deployment deployment) {
// 判断是不是存在ingress, 如果存在ingress的话返回ingress地址
Ingress ingress = kubernetesClient
.network()
.v1()
.ingresses()
.inNamespace(deployment.getMetadata().getNamespace())
.withName(deployment.getMetadata().getName())
.get();
String ingressUrl = getIngressUrl(
ingress,
deployment.getMetadata().getNamespace(),
deployment.getMetadata().getName());
logger.info("Get dinky ingress url:{}", ingressUrl);
return invokeJobsOverviewApi(ingressUrl);
}

private JobDetails invokeJobsOverviewApi(String restUrl) {
try {
String body;
try (HttpResponse execute = HttpUtil.createGet(restUrl + "/jobs/overview")
.timeout(10000)
.execute()) {
// 判断状态码,如果是504的话可能是因为task manage节点还未启动
if (Objects.equals(execute.getStatus(), HttpStatus.HTTP_GATEWAY_TIMEOUT)) {
return null;
}
body = execute.body();
}
if (StringUtils.isNotEmpty(body)) {
JobDetails jobDetails = JSONObject.parseObject(body, JobDetails.class);
jobDetails.setWebUrl(restUrl);
return jobDetails;
}
} catch (Exception e) {
logger.warn("Get job overview warning, task manage is enabled and can be ignored");
}
return null;
}

private String getIngressUrl(Ingress ingress, String namespace, String clusterId) {
if (Objects.nonNull(ingress)
&& Objects.nonNull(ingress.getSpec())
&& Objects.nonNull(ingress.getSpec().getRules())
&& !ingress.getSpec().getRules().isEmpty()) {
String host = ingress.getSpec().getRules().get(0).getHost();
return StrFormatter.format("http://{}/{}/{}", host, namespace, clusterId);
}
throw new GatewayException(
StrFormatter.format("Dinky clusterId {} ingress not found in namespace {}", clusterId, namespace));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.gateway.kubernetes.ingress;

import static org.dinky.assertion.Asserts.checkNotNull;

import org.dinky.gateway.kubernetes.utils.K8sClientHelper;

import java.util.Map;

import com.google.common.collect.Maps;

import cn.hutool.core.text.StrFormatter;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DinkyKubernetesIngress {

private final K8sClientHelper k8sClientHelper;

public DinkyKubernetesIngress(K8sClientHelper k8sClientHelper) {
this.k8sClientHelper = k8sClientHelper;
}

public void configureIngress(String clusterId, String domain, String namespace) {
log.info("Dinky ingress configure ingress for cluster {} in namespace {}", clusterId, namespace);
OwnerReference ownerReference = getOwnerReference(namespace, clusterId);
Ingress ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
.addToAnnotations(buildIngressAnnotations(clusterId, namespace))
.addToLabels(buildIngressLabels(clusterId))
.addToOwnerReferences(ownerReference) // Add OwnerReference
.endMetadata()
.withNewSpec()
.addNewRule()
.withHost(domain)
.withNewHttp()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}/", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.addNewPath()
.withPath(StrFormatter.format("/{}/{}(/|$)(.*)", namespace, clusterId))
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(StrFormatter.format("{}-rest", clusterId))
.withNewPort()
.withNumber(8081)
.endPort()
.endService()
.endBackend()
.endPath()
.endHttp()
.endRule()
.endSpec()
.build();
try (KubernetesClient kubernetesClient = k8sClientHelper.getKubernetesClient()) {
kubernetesClient.network().v1().ingresses().inNamespace(namespace).create(ingress);
}
}

private Map<String, String> buildIngressAnnotations(String clusterId, String namespace) {
Map<String, String> annotations = Maps.newConcurrentMap();
annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
annotations.put("nginx.ingress.kubernetes.io/proxy-body-size", "1024m");
// Build the configuration snippet
String configurationSnippet =
"rewrite ^(/" + clusterId + ")$ $1/ permanent; " + "sub_filter '<base href=\"./\">' '<base href=\"/"
+ namespace + "/" + clusterId + "/\">'; " + "sub_filter_once off;";
annotations.put("nginx.ingress.kubernetes.io/configuration-snippet", configurationSnippet);
annotations.put("kubernetes.io/ingress.class", "nginx");
return annotations;
}

private Map<String, String> buildIngressLabels(String clusterId) {
Map<String, String> labels = Maps.newConcurrentMap();
labels.put("app", clusterId);
labels.put("type", "flink-native-kubernetes");
labels.put("component", "ingress");
return labels;
}

private OwnerReference getOwnerReference(String namespace, String clusterId) {
log.info("Dinky ingress get owner reference for cluster {} in namespace {}", clusterId, namespace);
KubernetesClient client = k8sClientHelper.getKubernetesClient();
Deployment deployment = client.apps()
.deployments()
.inNamespace(namespace)
.withName(clusterId)
.get();
checkNotNull(
deployment,
StrFormatter.format("Deployment with name {} not found in namespace {}", clusterId, namespace));
return new OwnerReferenceBuilder()
.withUid(deployment.getMetadata().getUid())
.withApiVersion("apps/v1")
.withKind("Deployment")
.withName(clusterId)
.withController(true)
.withBlockOwnerDeletion(true)
.build();
}
}
Loading
Loading