Skip to content

Commit

Permalink
[Optimization-3917][core] Optimize explain and add test (#3918)
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo authored Nov 17, 2024
1 parent cff1970 commit 5b27418
Show file tree
Hide file tree
Showing 63 changed files with 3,818 additions and 718 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dinky-web/pnpm-lock.yaml
dinky-admin/src/main/resources/static/*
dinky-admin/tmp/*
dependency-reduced-pom.xml
dinky-core/tmp/*

# Dinky Docs gitignore
docs/.docusaurus
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public class TaskDTO extends AbstractStatementDTO {
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction = true;
private boolean mockSinkFunction = false;

@ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier")
private String session;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

package org.dinky.service.impl;

import org.dinky.data.job.SqlType;
import org.dinky.data.vo.PrintTableVo;
import org.dinky.explainer.print_table.PrintStatementExplainer;
import org.dinky.parser.SqlType;
import org.dinky.service.PrintTableService;
import org.dinky.trans.Operations;
import org.dinky.utils.SqlUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) {
TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId());
taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement());
JobConfig jobConfig = taskDTO.getJobConfig();
return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig.getExecutorSetting());
return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ public LineageResult getTaskLineage(Integer id) {
} else {
task.setStatement(buildEnvSql(task) + task.getStatement());
JobConfig jobConfig = task.getJobConfig();
return LineageBuilder.getColumnLineageByLogicalPlan(task.getStatement(), jobConfig.getExecutorSetting());
return LineageBuilder.getColumnLineageByLogicalPlan(task.getStatement(), jobConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import org.dinky.data.app.AppTask;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.job.SqlType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.parser.SqlType;
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.dinky.app.model;

import org.dinky.parser.SqlType;
import org.dinky.data.job.SqlType;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

package org.dinky.executor;

import org.dinky.data.exception.DinkyException;
import org.dinky.data.job.JobStatement;
import org.dinky.data.job.SqlType;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.parser.CustomParserImpl;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LineageContext;
import org.dinky.utils.SqlUtil;

import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.common.RuntimeExecutionMode;
Expand All @@ -49,6 +53,8 @@
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
Expand All @@ -57,7 +63,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
Expand All @@ -66,23 +72,31 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;

/**
* 定制TableEnvironmentImpl
*
* @since 2021/10/22 10:02
*/
@Slf4j
public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {

private static final Logger log = LoggerFactory.getLogger(CustomTableEnvironmentImpl.class);

private final CustomExtendedOperationExecutorImpl extendedExecutor = new CustomExtendedOperationExecutorImpl(this);
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
Expand Down Expand Up @@ -253,66 +267,77 @@ private Map<String, Object> getFlinkConfigurationMap() {
}

@Override
public JobPlanInfo getJobPlanInfo(List<String> statements) {
/**
 * Builds a {@link JobPlanInfo} whose JSON plan is generated from the job graph
 * produced by translating the given job statements.
 *
 * @param statements the job statements to translate into a job graph
 * @return the JSON job-plan wrapper for the resulting job graph
 */
public JobPlanInfo getJobPlanInfo(List<JobStatement> statements) {
    return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
}

@Override
public StreamGraph getStreamGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList();
for (String statement : statements) {
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
/**
 * Translates the given job statements into a single {@link StreamGraph}.
 *
 * <p>CTAS and RTAS statements are dropped up front (they cannot be translated here — this
 * mutates the caller's list, matching the existing contract). Each remaining statement must
 * parse to exactly one operation; inserts are collected directly, bare queries are wrapped
 * in an anonymous collect sink, and anything else is skipped with an informational log entry.
 *
 * @param statements the job statements to translate (CTAS/RTAS entries are removed in place)
 * @return the stream graph built from the collected modify operations
 * @throws TableException if any statement parses to more than one operation, or if no
 *     translatable operation remains after filtering
 */
public StreamGraph getStreamGraphFromInserts(List<JobStatement> statements) {
    statements.removeIf(s -> s.getSqlType().equals(SqlType.CTAS));
    statements.removeIf(s -> s.getSqlType().equals(SqlType.RTAS));

    final List<ModifyOperation> sinkOperations = new ArrayList<>();
    for (JobStatement jobStatement : statements) {
        final List<Operation> parsed = getParser().parse(jobStatement.getStatement());
        if (parsed.size() != 1) {
            throw new TableException("Only single statement is supported.");
        }
        final Operation op = parsed.get(0);
        if (op instanceof ModifyOperation) {
            sinkOperations.add((ModifyOperation) op);
        } else if (op instanceof QueryOperation) {
            // Wrap a bare SELECT in an unregistered collect sink so it can be translated.
            final UnresolvedIdentifier unresolved = UnresolvedIdentifier.of(
                    "Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
            final ObjectIdentifier identifier = getCatalogManager().qualifyIdentifier(unresolved);
            sinkOperations.add(new CollectModifyOperation(identifier, (QueryOperation) op));
        } else {
            log.info("Only insert statement is supported now. The statement is skipped: "
                    + op.asSummaryString());
        }
    }
    if (sinkOperations.isEmpty()) {
        throw new TableException("Only insert statement is supported now. None operation to execute.");
    }
    return transOperatoinsToStreamGraph(sinkOperations);
}

private StreamGraph transOperatoinsToStreamGraph(List<ModifyOperation> modifyOperations) {
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if (executor instanceof DefaultExecutor) {
StreamGraph streamGraph =
((DefaultExecutor) executor).getExecutionEnvironment().generateStreamGraph(trans);
if (getConfig().getConfiguration().containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(getConfig().getConfiguration().getString(PipelineOptions.NAME));
}
return streamGraph;
} else {
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
final StreamExecutionEnvironment environment = getStreamExecutionEnvironment();
trans.forEach(environment::addOperator);

StreamGraph streamGraph = environment.getStreamGraph();
final Configuration configuration = getConfig().getConfiguration();
if (configuration.containsKey(PipelineOptions.NAME.key())) {
streamGraph.setJobName(configuration.getString(PipelineOptions.NAME));
}
return streamGraph;
}

@Override
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
List<Operation> operations = getParser().parse(statement);
record.setParseTrue(true);
if (operations.size() != 1) {
throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
}

Operation operation = operations.get(0);
SqlExplainResult data = new SqlExplainResult();
data.setParseTrue(true);
data.setExplainTrue(true);
if (operation instanceof ModifyOperation) {
record.setType("Modify DML");
} else if (operation instanceof ExplainOperation) {
record.setType("Explain DML");
data.setType("DML");
} else if (operation instanceof QueryOperation) {
record.setType("Query DML");
data.setType("DQL");
} else {
record.setExplain(operation.asSummaryString());
record.setType("DDL");
}
record.setExplainTrue(true);
if ("DDL".equals(record.getType())) {
// record.setExplain("DDL语句不进行解释。");
return record;
data.setExplain(operation.asSummaryString());
data.setType("DDL");
return data;
}
record.setExplain(getPlanner().explain(operations, extraDetails));
return record;

data.setExplain(getPlanner().explain(Collections.singletonList(operation), extraDetails));
return data;
}

@Override
Expand Down Expand Up @@ -356,4 +381,98 @@ public TableResult executeInternal(Operation operation) {
}
return super.executeInternal(operation);
}

/**
 * Explains a set of job statements as one combined plan.
 *
 * <p>CTAS/RTAS statements short-circuit with an "unsupported" result, and any parse failure
 * short-circuits with an error result carrying the offending SQL (with line numbers). All
 * successfully parsed operations are explained together via the planner.
 *
 * @param statements the job statements making up the statement set
 * @param extraDetails optional planner explain details
 * @return the explain result for the whole set (or the first failure encountered)
 * @throws DinkyException if no operation at all could be parsed from the set
 */
@Override
public SqlExplainResult explainStatementSet(List<JobStatement> statements, ExplainDetail... extraDetails) {
    final SqlExplainResult.Builder builder = SqlExplainResult.Builder.newBuilder();
    final List<Operation> operations = new ArrayList<>();
    for (JobStatement jobStatement : statements) {
        final SqlType sqlType = jobStatement.getSqlType();
        if (sqlType.equals(SqlType.CTAS) || sqlType.equals(SqlType.RTAS)) {
            // NOTE(review): these two messages cite different Flink versions (1.15 vs 1.14);
            // looks like a copy-paste artifact — confirm which release this module targets.
            final String unsupported = sqlType.equals(SqlType.CTAS)
                    ? "CTAS is not supported in Apache Flink 1.15."
                    : "RTAS is not supported in Apache Flink 1.14.";
            return builder.sql(jobStatement.getStatement())
                    .type(sqlType.getType())
                    .error(unsupported)
                    .parseTrue(false)
                    .explainTrue(false)
                    .explainTime(LocalDateTime.now())
                    .build();
        }
        try {
            // addAll on an empty parse result is a no-op, so no emptiness check is needed.
            operations.addAll(getParser().parse(jobStatement.getStatement()));
        } catch (Exception e) {
            final String error = StrFormatter.format(
                    "Exception in explaining FlinkSQL:\n{}\n{}",
                    SqlUtil.addLineNumber(jobStatement.getStatement()),
                    e.getMessage());
            log.error(error);
            return builder.sql(jobStatement.getStatement())
                    .type(SqlType.INSERT.getType())
                    .error(error)
                    .parseTrue(false)
                    .explainTrue(false)
                    .explainTime(LocalDateTime.now())
                    .build();
        }
    }
    if (operations.isEmpty()) {
        throw new DinkyException("None of the job in the statement set.");
    }
    builder.parseTrue(true);
    builder.explain(getPlanner().explain(operations, extraDetails));
    return builder.explainTrue(true)
            .explainTime(LocalDateTime.now())
            .type(SqlType.INSERT.getType())
            .build();
}

/**
 * Executes the sink (modify) statements of a statement set as one job.
 *
 * <p>CTAS/RTAS and non-sink statements are removed first — note this mutates the caller's
 * list, preserving the existing contract. The remaining statements are each converted to a
 * {@link ModifyOperation} and executed together.
 *
 * @param statements the job statements making up the statement set (filtered in place)
 * @return the table result of executing the collected modify operations
 */
@Override
public TableResult executeStatementSet(List<JobStatement> statements) {
    statements.removeIf(statement -> statement.getSqlType().equals(SqlType.CTAS));
    statements.removeIf(statement -> statement.getSqlType().equals(SqlType.RTAS));
    statements.removeIf(statement -> !statement.getSqlType().isSinkyModify());
    List<ModifyOperation> modifyOperations = statements.stream()
            .map(statement -> getModifyOperationFromInsert(statement.getStatement()))
            // getModifyOperationFromInsert returns null for operations it merely logs and
            // skips; drop those so executeInternal never receives a null element.
            .filter(operation -> operation != null)
            .collect(Collectors.toList());
    return executeInternal(modifyOperations);
}

/**
 * Parses a single SQL statement and returns it as a {@link ModifyOperation}.
 *
 * <p>An insert is returned as-is; a bare SELECT is wrapped in an anonymous collect sink so
 * it can run as a modify operation; any other operation is logged and skipped.
 *
 * @param statement the SQL text of exactly one statement
 * @return the modify operation, or {@code null} if the statement was skipped
 * @throws TableException if nothing was parsed, or more than one statement was supplied
 */
public ModifyOperation getModifyOperationFromInsert(String statement) {
    final List<Operation> parsed = getParser().parse(statement);
    if (parsed.isEmpty()) {
        throw new TableException("None of the statement is parsed.");
    }
    if (parsed.size() > 1) {
        throw new TableException("Only single statement is supported.");
    }
    final Operation only = parsed.get(0);
    if (only instanceof ModifyOperation) {
        return (ModifyOperation) only;
    }
    if (only instanceof QueryOperation) {
        final UnresolvedIdentifier unresolved =
                UnresolvedIdentifier.of("Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
        final ObjectIdentifier qualified = getCatalogManager().qualifyIdentifier(unresolved);
        return new CollectModifyOperation(qualified, (QueryOperation) only);
    }
    log.info("Only insert statement or select is supported now. The statement is skipped: "
            + only.asSummaryString());
    // NOTE(review): callers must tolerate this null (see executeStatementSet).
    return null;
}
}
Loading

0 comments on commit 5b27418

Please sign in to comment.