Skip to content

Commit

Permalink
fix k8s native submit bug (#2334)
Browse files Browse the repository at this point in the history
* fix k8s submit bug
  • Loading branch information
gaoyan1998 authored Sep 22, 2023
1 parent 22be7cf commit 8ce4200
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 75 deletions.
6 changes: 6 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ public enum Status {
SYS_RESOURCE_SETTINGS_HDFS_ROOT_USER_NOTE(173, "sys.resource.settings.hdfs.root.user.note"),
SYS_RESOURCE_SETTINGS_HDFS_FS_DEFAULTFS(174, "sys.resource.settings.hdfs.fs.defaultFS"),
SYS_RESOURCE_SETTINGS_HDFS_FS_DEFAULTFS_NOTE(175, "sys.resource.settings.hdfs.fs.defaultFS.note"),

/**
* gateway config
*/
GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),
;

private final int code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,8 @@ sys.resource.settings.oss.region.note=region
sys.resource.settings.hdfs.root.user=HDFS operation user name
sys.resource.settings.hdfs.root.user.note=HDFS operation user name
sys.resource.settings.hdfs.fs.defaultFS=HDFS defaultFS
sys.resource.settings.hdfs.fs.defaultFS.note=fs.defaultFS configuration items, such as remote: hdfs://localhost:9000, local: file:///
sys.resource.settings.hdfs.fs.defaultFS.note=fs.defaultFS configuration items, such as remote: hdfs://localhost:9000, local: file:///


#Dinky Gateway
gateway.kubernetes.test.failed=Failed to test the Flink configuration:
gateway.kubernetes.test.success=Succeeded in testing the Flink configuration
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,7 @@ sys.resource.settings.oss.region.note=区域
sys.resource.settings.hdfs.root.user=HDFS操作用户名
sys.resource.settings.hdfs.root.user.note=HDFS操作用户名
sys.resource.settings.hdfs.fs.defaultFS=HDFS defaultFS
sys.resource.settings.hdfs.fs.defaultFS.note=fs.defaultFS 配置项,例如远程:hdfs://localhost:9000,本地:file:///
sys.resource.settings.hdfs.fs.defaultFS.note=fs.defaultFS 配置项,例如远程:hdfs://localhost:9000,本地:file:///

#Dinky Gateway
gateway.kubernetes.test.failed=测试 Flink 配置失败:
gateway.kubernetes.test.success=测试 Flink 配置成功
36 changes: 24 additions & 12 deletions dinky-gateway/src/main/java/org/dinky/gateway/AbstractGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand All @@ -47,12 +48,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hutool.core.text.StrFormatter;

/**
* AbstractGateway
*
Expand Down Expand Up @@ -90,6 +94,14 @@ protected void addConfigParas(Map<String, String> configMap) {
}
}

/**
 * Applies a single typed Flink config entry to this gateway's configuration.
 * Pairs with a null key or null value are skipped (with a warning) instead of
 * being written, mirroring the map-based {@code addConfigParas} overload.
 *
 * @param key   the Flink {@link ConfigOption} to set; skipped when null
 * @param value the value to associate with {@code key}; skipped when null
 */
protected <T> void addConfigParas(ConfigOption<T> key, T value) {
    boolean applicable = Asserts.isNotNull(key) && Asserts.isNotNull(value);
    if (!applicable) {
        // Refuse incomplete pairs rather than poisoning the configuration.
        logger.warn("Gateway config key or value is null, key: {}, value: {}", key, value);
        return;
    }
    this.configuration.set(key, value);
}

public SavePointResult savepointCluster() {
return savepointCluster(null);
}
Expand Down Expand Up @@ -173,19 +185,19 @@ public GatewayResult submitJar() {
throw new GatewayException("Couldn't deploy Flink Cluster with User Application Jar.");
}

protected void resetCheckpointInApplicationMode() {
String uuid = UUID.randomUUID().toString().replace("-", "");
if (configuration.contains(CheckpointingOptions.CHECKPOINTS_DIRECTORY)) {
configuration.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY) + "/" + uuid);
}
protected void resetCheckpointInApplicationMode(String jobName) {
String uuid = UUID.randomUUID().toString();
String checkpointsDirectory = configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
String savepointDirectory = configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY);

if (configuration.contains(CheckpointingOptions.SAVEPOINT_DIRECTORY)) {
configuration.set(
CheckpointingOptions.SAVEPOINT_DIRECTORY,
configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY) + "/" + uuid);
}
Optional.ofNullable(checkpointsDirectory)
.ifPresent(dir -> configuration.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
StrFormatter.format("{}/{}/{}", dir, jobName, uuid)));

Optional.ofNullable(savepointDirectory)
.ifPresent(dir -> configuration.set(
CheckpointingOptions.SAVEPOINT_DIRECTORY, StrFormatter.format("{}/{}/{}", dir, jobName, uuid)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
Expand All @@ -63,7 +64,6 @@ public GatewayResult submitJar() {
init();
}

combineFlinkConfig();
AppConfig appConfig = config.getAppConfig();
String[] userJarParas =
Asserts.isNotNull(appConfig.getUserJarParas()) ? appConfig.getUserJarParas() : new String[0];
Expand All @@ -84,51 +84,52 @@ public GatewayResult submitJar() {
clusterClient.listJobs().get();

int counts = SystemConfiguration.getInstances().getJobIdWait();
while (jobStatusMessages.size() == 0 && counts > 0) {
while (jobStatusMessages.isEmpty() && counts > 0) {
Thread.sleep(1000);
counts--;
try {
jobStatusMessages = clusterClient.listJobs().get();
logger.info("Get K8s Job list: {}", jobStatusMessages);
} catch (ExecutionException e) {
logger.error("Get Job list Error: {}", e.getMessage());
if (StrUtil.contains(e.getMessage(), "Number of retries has been exhausted.")) {
// refresh the job manager ip address
clusterClient.close();
clusterClient = clusterClientProvider.getClusterClient();
} else {
LogUtil.getError(e);
throw e;
}
}

if (jobStatusMessages.size() > 0) {
if (!jobStatusMessages.isEmpty()) {
break;
}
}

if (jobStatusMessages.size() > 0) {
// application mode only have one job, so we can get any one to be jobId
String jobId = "";
if (!jobStatusMessages.isEmpty()) {
List<String> jids = new ArrayList<>();
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jids.add(jobStatusMessage.getJobId().toHexString());
jobId = jobStatusMessage.getJobId().toHexString();
jids.add(jobId);
}
result.setJids(jids);
}

String jobId = "";
// application mode only have one job, so we can get any one to be jobId
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jobId = jobStatusMessage.getJobId().toHexString();
}
// if JobStatusMessage not have job id, it`s maybe wrong with submit,throw exception
if (TextUtils.isEmpty(jobId)) {
int cost = SystemConfiguration.getInstances().getJobIdWait() - counts;
String clusterId = clusterClient.getClusterId();
throw new Exception("无法获得jobId请联系管理排查问题,等待时长:" + cost + ",job name:" + clusterId);
throw new Exception(
StrFormatter.format("Unable to get JobID,wait time:{}, Job name:{}", cost, clusterId));
}

result.setId(jobId);
result.setWebURL(clusterClient.getWebInterfaceURL());
waitForTaskManagerToBeReady(result.getWebURL(), jobId);
result.success();
} catch (Exception e) {
logger.error("submit K8s Application error", e);
result.fail(LogUtil.getError(e));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,17 @@
package org.dinky.gateway.kubernetes;

import org.dinky.assertion.Asserts;
import org.dinky.data.enums.Status;
import org.dinky.gateway.AbstractGateway;
import org.dinky.gateway.config.FlinkConfig;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.config.K8sConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
import org.dinky.utils.TextUtil;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
Expand All @@ -45,54 +43,57 @@
import java.lang.reflect.Method;
import java.util.Collections;

import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ReflectUtil;

/**
* KubernetesGateway
*
* @since 2021/12/26 14:09
*/
public abstract class KubernetesGateway extends AbstractGateway {

protected FlinkKubeClient client;

public KubernetesGateway() {}

public KubernetesGateway(GatewayConfig config) {
super(config);
}

/**
 * Fully initializes the gateway: merges the Flink configuration first, then
 * builds the Kubernetes client. NOTE(review): order appears significant — the
 * client presumably reads the merged configuration; confirm in initKubeClient.
 */
public void init() {
    initConfig();
    initKubeClient();
}

private void initConfig() {
String flinkConfigPath = config.getClusterConfig().getFlinkConfigPath();
if (!TextUtil.isEmpty(flinkConfigPath)) {
configuration = GlobalConfiguration.loadConfiguration(flinkConfigPath);
FlinkConfig flinkConfig = config.getFlinkConfig();
K8sConfig k8sConfig = config.getKubernetesConfig();

try {
addConfigParas(
GlobalConfiguration.loadConfiguration(flinkConfigPath).toMap());
} catch (Exception e) {
logger.warn("load locale config yaml failed:{},Skip config it", e.getMessage());
}
addConfigParas(KubernetesConfigOptions.FLINK_CONF_DIR, flinkConfigPath);

FlinkConfig flinkConfig = config.getFlinkConfig();
flinkConfig.getConfiguration().putAll(config.getKubernetesConfig().getConfiguration());
addConfigParas(flinkConfig.getConfiguration());
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
configuration.set(KubernetesConfigOptions.CLUSTER_ID, flinkConfig.getJobName());
addConfigParas(k8sConfig.getConfiguration());
addConfigParas(DeploymentOptions.TARGET, getType().getLongValue());
addConfigParas(KubernetesConfigOptions.CLUSTER_ID, flinkConfig.getJobName());
addConfigParas(
PipelineOptions.JARS,
Collections.singletonList(config.getAppConfig().getUserJarPath()));

K8sConfig k8sConfig = config.getKubernetesConfig();
preparPodTemplate(k8sConfig.getPodTemplate(), KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE);
preparPodTemplate(k8sConfig.getJmPodTemplate(), KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE);
preparPodTemplate(k8sConfig.getTmPodTemplate(), KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE);

if (getType().isApplicationMode()) {
resetCheckpointInApplicationMode();
resetCheckpointInApplicationMode(flinkConfig.getJobName());
}
}

public void preparPodTemplate(String podTemplate, ConfigOption<String> option) {
private void preparPodTemplate(String podTemplate, ConfigOption<String> option) {
if (TextUtil.isEmpty(podTemplate)) {
return;
}
Expand All @@ -103,7 +104,7 @@ public void preparPodTemplate(String podTemplate, ConfigOption<String> option) {
Assert.isTrue(FileUtil.del(filePath));
}
FileUtil.writeUtf8String(podTemplate, filePath);
configuration.set(option, filePath);
addConfigParas(option, filePath);
}

private void initKubeClient() {
Expand All @@ -116,7 +117,7 @@ public SavePointResult savepointCluster(String savePoint) {
}

KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
configuration.set(
addConfigParas(
KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)) {
Expand All @@ -138,16 +139,14 @@ public SavePointResult savepointJob(String savePoint) {
"No job id was specified. Please specify a job to which you would like to" + " savepont.");
}

configuration.set(
addConfigParas(
KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();

String clusterId = clusterClientFactory.getClusterId(configuration);
if (Asserts.isNull(clusterId)) {
throw new GatewayException(
"No cluster id was specified. Please specify a cluster to which you would like" + " to connect.");
}

KubernetesClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);

return runSavePointResult(savePoint, clusterId, clusterDescriptor);
Expand All @@ -156,11 +155,6 @@ public SavePointResult savepointJob(String savePoint) {
public TestResult test() {
try {
initConfig();
} catch (Exception e) {
logger.error("测试 Flink 配置失败:" + e.getMessage());
return TestResult.fail("测试 Flink 配置失败:" + e.getMessage());
}
try {
initKubeClient();
if (client instanceof Fabric8FlinkKubeClient) {
Object internalClient = ReflectUtil.getFieldValue(client, "internalClient");
Expand All @@ -171,11 +165,11 @@ public TestResult test() {
ReflectUtil.getFieldValue(versionInfo, "gitVersion"),
ReflectUtil.getFieldValue(versionInfo, "platform"));
}
logger.info("配置连接测试成功");
return TestResult.success();
} catch (Exception e) {
logger.error("测试 Kubernetes 配置失败:", e);
return TestResult.fail("测试 Kubernetes 配置失败:" + ExceptionUtil.getRootCauseMessage(e));
logger.error(Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e);
return TestResult.fail(
StrFormatter.format("{}:{}", Status.GAETWAY_KUBERNETS_TEST_FAILED.getMessage(), e.getMessage()));
}
}

Expand All @@ -184,7 +178,7 @@ public void killCluster() {
if (Asserts.isNull(client)) {
init();
}
configuration.set(
addConfigParas(
KubernetesConfigOptions.CLUSTER_ID, config.getClusterConfig().getAppId());
KubernetesClusterClientFactory clusterClientFactory = new KubernetesClusterClientFactory();
String clusterId = clusterClientFactory.getClusterId(configuration);
Expand All @@ -197,20 +191,7 @@ public void killCluster() {
try {
clusterDescriptor.killCluster(clusterId);
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
}
}

protected void combineFlinkConfig() {
String flinkConfigPath = config.getClusterConfig().getFlinkConfigPath();
Configuration loadConfiguration = GlobalConfiguration.loadConfiguration(flinkConfigPath);
if (loadConfiguration != null) {
loadConfiguration.addAll(configuration);
configuration = loadConfiguration;
}
configuration.set(DeploymentOptionsInternal.CONF_DIR, flinkConfigPath);
configuration.set(
PipelineOptions.JARS,
Collections.singletonList(config.getAppConfig().getUserJarPath()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public GatewayResult deployCluster() {
init();
}

combineFlinkConfig();
ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder =
createClusterSpecificationBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void initConfig() {

if (getType().isApplicationMode()) {
configuration.set(YarnConfigOptions.APPLICATION_TYPE, "Dinky Flink");
resetCheckpointInApplicationMode();
resetCheckpointInApplicationMode(flinkConfig.getJobName());
}

YarnLogConfigUtil.setLogConfigFileInConfig(configuration, clusterConfig.getFlinkConfigPath());
Expand Down

0 comments on commit 8ce4200

Please sign in to comment.