Skip to content

Commit

Permalink
[BugFix][Flink]Fix the Flink task to submit the session mode. (#3967)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Nov 27, 2024
1 parent e9061cb commit cc10a28
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void initPyUDF(String executable, String... udfPyFilePath) {
private void addJar(String... jarPath) {
Configuration configuration = tableEnvironment.getRootConfiguration();
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
if (CollUtil.isEmpty(jars)) {
tableEnvironment.addConfiguration(PipelineOptions.JARS, CollUtil.newArrayList(jarPath));
} else {
CollUtil.addAll(jars, jarPath);
Expand Down
11 changes: 7 additions & 4 deletions dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,22 +143,25 @@ public SqlExplainResult explain(JobStatement jobStatement) {
}

private void executeAdd(String statement) {
AddJarSqlParseStrategy.getAllFilePath(statement)
.forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t));
Set<File> allFilePath = AddJarSqlParseStrategy.getAllFilePath(statement);
allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t));
(jobManager.getExecutor().getDinkyClassLoader())
.addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles()));
}

private void executeAddFile(String statement) {
AddFileSqlParseStrategy.getAllFilePath(statement)
.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
(jobManager.getExecutor().getDinkyClassLoader())
.addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles()));
jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
}

private void executeAddJar(String statement) {
Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
Configuration combinationConfig = getCombinationConfig();
FileSystem.initialize(combinationConfig, null);
jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
jobManager.getExecutor().executeSql(statement);
}

Expand Down

0 comments on commit cc10a28

Please sign in to comment.