Commit

Merge remote-tracking branch 'origin/dev' into dev

yuhang2.zhang committed Nov 27, 2024
2 parents 40845e6 + 3820d0f commit 6763dfc
Showing 114 changed files with 9,406 additions and 766 deletions.
8 changes: 1 addition & 7 deletions .github/ISSUE_TEMPLATE/bug-report.yml
@@ -95,16 +95,10 @@ body:
Which version of Dinky are you running? We only accept bug reports for LTS releases.
options:
- dev
- 1.2.0
- 1.1.0
- 1.0.3
- 1.0.1
- 1.0.0
- 0.7.5(Archived, Not Maintained)
- 0.7.4(Archived, Not Maintained)
- 0.7.3(Archived, Not Maintained)
- 0.7.2(Archived, Not Maintained)
- 0.7.1(Archived, Not Maintained)
- 0.7.0(Archived, Not Maintained)

validations:
required: true
20 changes: 8 additions & 12 deletions README.md
@@ -35,25 +35,21 @@ Its main features are as follows:

## Run the Screenshot

> FlinkSQL Studio
> Data Studio
![datastudio](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/datastudio.png)
![datastudio](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/datastudio.png)

> Grammar Check
> Data Debug
![checksql](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/checksql.png)
![datadebug](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/data-debug.png)

> Version Management
> Task Monitor
![versiondiff](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/versiondiff.png)
![devops](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/devops.png)

> Lineage
> Task Metrics
![lineage](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/lineage.png)

> Task Monitoring
![monitor](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/monitor.png)
![metrics](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/metrics.png)

## Participate in Contributions
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](https://github.com/DataLinkDC/dinky/pulls)
22 changes: 9 additions & 13 deletions README_zh_CN.md
@@ -20,7 +20,7 @@

- FlinkSQL data development: autocompletion, statement formatting, online debugging, logic validation, execution plans, Catalog, lineage analysis, version diff, and more
- Supports multi-version FlinkSQL development and multiple execution modes: Local, Standalone, Yarn/Kubernetes Session, Yarn Per-Job, Yarn/Kubernetes Application
- Supports the Apache Flink ecosystem: CDC, Connector, FlinkCEP, FlinkCDC, Paimon, PyFlink, etc.
- Supports the Apache Flink ecosystem: FlinkCDC, Connector, FlinkCEP, Paimon, PyFlink, etc.
- Supports FlinkSQL syntax enhancements: whole-database sync, execution environments, global variables, table-valued aggregation, dependency loading, row-level permissions, running Jar tasks, etc.
- Supports FlinkCDC real-time whole-database ingestion into warehouses and lakes, and FlinkCDCPipeline whole-database sync
- Supports real-time online debugging and preview of Table, ChangeLog, and UDF
@@ -38,25 +38,21 @@

## Screenshots

> FlinkSQL Studio
> Data Studio
![datastudio](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/datastudio.png)
![datastudio](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/datastudio.png)

> Grammar Check
> Data Debug
![checksql](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/checksql.png)
![datadebug](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/data-debug.png)

> Version Management
> Task Monitor
![versiondiff](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/versiondiff.png)
![devops](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/devops.png)

> Lineage
> Task Metrics
![lineage](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/lineage.png)

> Task Monitoring
![monitor](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1/monitor.png)
![metrics](https://raw.githubusercontent.com/DataLinkDC/dinky/dev/images/v1-2/metrics.png)

## Participate in Contributions

4 changes: 4 additions & 0 deletions dinky-admin/src/main/java/org/dinky/Dinky.java
@@ -19,6 +19,8 @@

package org.dinky;

import org.dinky.security.NoExitSecurityManager;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.freemarker.FreeMarkerAutoConfiguration;
@@ -45,6 +47,8 @@ public class Dinky {

@SneakyThrows
public static void main(String[] args) {
// Prevent System.exit calls
System.setSecurityManager(new NoExitSecurityManager());
// Initialize the JDBC Driver asynchronously and load it in advance,
// because the number of driver packages is very large
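The `System.setSecurityManager(new NoExitSecurityManager())` line added above intercepts `System.exit` calls so a submitted job cannot shut down the Dinky JVM. The `org.dinky.security.NoExitSecurityManager` class itself is not shown in this diff; the following is an illustrative sketch of what such a manager typically looks like, not Dinky's actual implementation (note that `SecurityManager` is deprecated since JDK 17):

```java
// Hedged sketch: org.dinky.security.NoExitSecurityManager is not shown in this
// diff, so this stand-in only illustrates the common pattern.
public class NoExitSecurityManager extends SecurityManager {
    @Override
    public void checkExit(int status) {
        // Turn any System.exit call into a catchable exception so user code
        // cannot tear down the whole host JVM.
        throw new SecurityException("System.exit(" + status + ") is blocked");
    }

    @Override
    public void checkPermission(java.security.Permission perm) {
        // Permit everything else; only exit is intercepted.
    }
}
```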
@@ -62,7 +62,7 @@ public class TaskOperationPermissionAspect {
@Around(value = "@annotation(checkTaskOwner)")
public Object processAround(ProceedingJoinPoint joinPoint, CheckTaskOwner checkTaskOwner) throws Throwable {
if (!TaskOwnerLockStrategyEnum.ALL.equals(
SystemConfiguration.getInstances().getTaskOwnerLockStrategy())
SystemConfiguration.getInstances().GetTaskOwnerLockStrategyValue())
&& BaseConstant.ADMIN_ID != StpUtil.getLoginIdAsInt()) {
Class checkParam = checkTaskOwner.checkParam();
Object param = getParam(joinPoint, checkParam);
@@ -61,8 +61,8 @@ public class WebExceptionHandler {
@ExceptionHandler
@ResponseBody
public Result<Void> busException(BusException e) {
log.error("BusException:", e);
return Result.failed(e.getMsg());
log.error(e.getMessage(), e);
return Result.failed(e.getMessage());
}

private static final Map<String, Status> ERR_CODE_MAPPING = MapUtil.<String, Status>builder()
@@ -33,6 +33,7 @@
import org.springframework.web.bind.annotation.RestController;

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.hutool.core.lang.Singleton;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
@@ -45,14 +46,13 @@
@SaCheckLogin
@RequiredArgsConstructor
public class FlinkController {

protected static final CheckpointRead INSTANCE = new CheckpointRead();
private final FlinkService flinkService;

@GetMapping("/readCheckPoint")
@ApiOperation("Read Checkpoint")
public Result<Map<String, Map<String, CheckPointReadTable>>> readCheckPoint(String path, String operatorId) {
return Result.data(INSTANCE.readCheckpoint(path, operatorId));
CheckpointRead checkpointRead = Singleton.get(CheckpointRead.class);
return Result.data(checkpointRead.readCheckpoint(path, operatorId));
}

@GetMapping("/configOptions")
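The `FlinkController` change above replaces a `static final CheckpointRead` field with hutool's `Singleton.get(CheckpointRead.class)`, which lazily creates and caches one instance per class. A minimal stand-in for that per-class cache, written against the JDK only (the class and method names here are illustrative, not hutool's internals):

```java
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of a per-class singleton cache in the spirit of
// cn.hutool.core.lang.Singleton.get; not hutool's actual implementation.
public final class SingletonCache {
    private static final ConcurrentHashMap<Class<?>, Object> CACHE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public static <T> T get(Class<T> clazz) {
        // Create the instance on first request, reuse it afterwards.
        return (T) CACHE.computeIfAbsent(clazz, c -> {
            try {
                return c.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + c, e);
            }
        });
    }
}
```

Repeated calls with the same class return the identical instance, which is why the controller no longer needs its own `static final` field.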
@@ -167,7 +167,11 @@ public Result getOneById(@RequestBody ID id) {
required = true)
public Result<JobInfoDetail> refreshJobInfoDetail(
@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean isForce) {
return Result.succeed(jobInstanceService.refreshJobInfoDetail(id, isForce));
JobInstance jobInstance = jobInstanceService.getById(id);
if (jobInstance == null) {
return Result.failed(Status.JOB_INSTANCE_NOT_EXIST);
}
return Result.succeed(jobInstanceService.refreshJobInfoDetail(id, jobInstance.getTaskId(), isForce));
}

/**
@@ -110,4 +110,18 @@ public Result<List<Configuration<?>>> getOneTypeByKey(@RequestParam("type") Stri
.collect(Collectors.toList());
return Result.succeed(configList);
}

@GetMapping("/getNeededCfg")
@ApiOperation("Get Needed Config")
@SaIgnore
public Result<Map<String, Object>> getNeededCfg() {
return sysConfigService.getNeededCfg();
}

@PostMapping("/setInitConfig")
@ApiOperation("Set Init Config")
@SaIgnore
public Result<Void> setInitConfig(@RequestBody Map<String, Object> params) {
return sysConfigService.setInitConfig(params);
}
}
67 changes: 67 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
@@ -34,17 +34,23 @@
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.exception.SqlExplainExcepition;
import org.dinky.data.model.JarSubmitParam;
import org.dinky.data.model.Task;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.data.vo.FlinkJarSqlConvertVO;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.mybatis.annotation.Save;
import org.dinky.service.TaskService;
import org.dinky.trans.ExecuteJarParseStrategyUtil;
import org.dinky.utils.SqlUtil;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
@@ -61,7 +67,14 @@

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.extra.template.TemplateConfig;
import cn.hutool.extra.template.TemplateEngine;
import cn.hutool.extra.template.engine.freemarker.FreemarkerEngine;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
@@ -77,6 +90,8 @@
public class TaskController {

private final TaskService taskService;
private static final TemplateEngine ENGINE =
new FreemarkerEngine(new TemplateConfig("templates", TemplateConfig.ResourceMode.CLASSPATH));

@GetMapping("/submitTask")
@ApiOperation("Submit Task")
@@ -283,4 +298,56 @@ public Result<List<TaskDTO>> getMyTask() {
int id = StpUtil.getLoginIdAsInt();
return Result.succeed(taskService.getUserTasks(id));
}

@PostMapping("/flinkJarSqlConvertForm")
@ApiOperation("FlinkJar SqlConvertForm")
public Result<FlinkJarSqlConvertVO> flinkJarSqlConvertForm(@RequestBody TaskDTO taskDTO) {

String sqlStatement = taskDTO.getStatement();
String[] statements = SqlUtil.getStatements(sqlStatement);
FlinkJarSqlConvertVO flinkJarSqlConvertVO = new FlinkJarSqlConvertVO();
flinkJarSqlConvertVO.setJarSubmitParam(JarSubmitParam.empty());
if (ArrayUtil.isEmpty(statements)) {
flinkJarSqlConvertVO.setInitSqlStatement(sqlStatement);
return Result.succeed(flinkJarSqlConvertVO);
}
Integer lastExecuteJarSqlStatementIndex = null;
for (int i = 0; i < statements.length; i++) {
if (ExecuteJarParseStrategyUtil.match(statements[i])) {
lastExecuteJarSqlStatementIndex = i;
}
}
if (lastExecuteJarSqlStatementIndex == null) {
return Result.succeed(flinkJarSqlConvertVO);
}
String lastSqlStatement = statements[lastExecuteJarSqlStatementIndex];
JarSubmitParam info = JarSubmitParam.getInfo(lastSqlStatement);
flinkJarSqlConvertVO.setJarSubmitParam(info);
String sql = Arrays.stream(ArrayUtil.remove(statements, lastExecuteJarSqlStatementIndex))
.map(x -> x + ";")
.collect(Collectors.joining("\n"));
flinkJarSqlConvertVO.setInitSqlStatement(sql);
return Result.succeed(flinkJarSqlConvertVO);
}
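The loop in `flinkJarSqlConvertForm` above keeps the index of the LAST statement that matches the EXECUTE JAR strategy, pulls it out as the jar-submit parameter source, and rejoins the remaining statements with `;` separators. A self-contained sketch of that splitting logic, where a plain prefix check stands in for the real `ExecuteJarParseStrategyUtil.match`:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch of the statement-splitting logic; the real matcher is
// ExecuteJarParseStrategyUtil, approximated here by a prefix check.
public class JarSqlSplit {

    static boolean isExecuteJar(String stmt) {
        return stmt.trim().toUpperCase().startsWith("EXECUTE JAR");
    }

    /** Returns {rejoinedOtherStatements, lastExecuteJarStatementOrNull}. */
    public static String[] convert(String[] statements) {
        Integer last = null;
        for (int i = 0; i < statements.length; i++) {
            if (isExecuteJar(statements[i])) {
                last = i; // later matches overwrite earlier ones
            }
        }
        if (last == null) {
            return new String[] {String.join(";\n", statements), null};
        }
        final int idx = last;
        String rest = IntStream.range(0, statements.length)
                .filter(i -> i != idx)
                .mapToObj(i -> statements[i] + ";")
                .collect(Collectors.joining("\n"));
        return new String[] {rest, statements[idx]};
    }
}
```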

@PostMapping("/flinkJarFormConvertSql")
@ApiOperation("FlinkJar FormConvertSql")
public Result<String> flinkJarFormConvertSql(@RequestBody FlinkJarSqlConvertVO dto) {
JarSubmitParam jarSubmitParam = dto.getJarSubmitParam();
Dict objectMap = Dict.create()
.set("uri", Opt.ofNullable(jarSubmitParam.getUri()).orElse(""))
.set(
"args",
"base64@"
+ Base64.encode(
Opt.ofNullable(jarSubmitParam.getArgs()).orElse("")))
.set("mainClass", Opt.ofNullable(jarSubmitParam.getMainClass()).orElse(""))
.set(
"allowNonRestoredState",
Opt.ofNullable(jarSubmitParam.getAllowNonRestoredState())
.orElse(false)
.toString());
String executeJarSql = ENGINE.getTemplate("executeJar.sql").render(objectMap);
return Result.succeed(dto.getInitSqlStatement() + "\n" + executeJarSql, "");
}
}
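In `flinkJarFormConvertSql` above, the program args are rendered into the SQL template as `"base64@" + Base64.encode(...)`: raw args may contain quotes, semicolons, or newlines that would corrupt the generated SQL text, so they travel Base64-encoded behind a recognizable tag. A round-trip sketch using the JDK's `java.util.Base64` in place of hutool's helper (both emit standard RFC 4648 base64 of the UTF-8 bytes, as far as the diff shows):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the "base64@" args encoding used when generating EXECUTE JAR SQL.
public class ArgsCodec {
    public static String encode(String args) {
        // Tag the value so consumers know it must be decoded before use.
        return "base64@" + Base64.getEncoder().encodeToString(args.getBytes(StandardCharsets.UTF_8));
    }

    public static String decode(String value) {
        String b64 = value.substring("base64@".length());
        return new String(Base64.getDecoder().decode(b64), StandardCharsets.UTF_8);
    }
}
```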
@@ -0,0 +1,32 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.vo;

import org.dinky.data.model.JarSubmitParam;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class FlinkJarSqlConvertVO {
private String initSqlStatement;
private JarSubmitParam jarSubmitParam;
}
40 changes: 25 additions & 15 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
@@ -93,20 +93,30 @@ public class SystemInit implements ApplicationRunner {

@Override
public void run(ApplicationArguments args) {
TenantContextHolder.ignoreTenant();
initResources();
List<Tenant> tenants = tenantService.list();
sysConfigService.initSysConfig();
sysConfigService.initExpressionVariables();

for (Tenant tenant : tenants) {
taskService.initDefaultFlinkSQLEnv(tenant.getId());
try {
TenantContextHolder.ignoreTenant();
initResources();
List<Tenant> tenants = tenantService.list();
sysConfigService.initSysConfig();
sysConfigService.initExpressionVariables();

for (Tenant tenant : tenants) {
taskService.initDefaultFlinkSQLEnv(tenant.getId());
}
initDaemon();
initDolphinScheduler();
registerUDF();
updateGitBuildState();
registerURL();
} catch (NoClassDefFoundError e) {
if (e.getMessage().contains("org/apache/flink")) {
log.error(
"No Flink Jar dependency detected, please add the Flink Jar dependency to Dinky first.",
e);
} else {
log.error("", e);
}
}
initDaemon();
initDolphinScheduler();
registerUDF();
updateGitBuildState();
registerURL();
}
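The catch block above recognizes a missing optional dependency by inspecting the `NoClassDefFoundError` message for `org/apache/flink`. The same condition can be probed up front with `Class.forName`, as in this sketch (the probe class name is an assumption, chosen as a well-known Flink class):

```java
// Sketch of the failure mode the catch block handles: when the Flink jars are
// absent, loading any Flink class fails with the class name in the message.
public class DependencyProbe {
    public static boolean flinkPresent() {
        try {
            Class.forName("org.apache.flink.configuration.Configuration");
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }
}
```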

private void registerURL() {
Expand Down Expand Up @@ -152,11 +162,11 @@ private void initDaemon() {
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance();
for (JobInstance jobInstance : jobInstances) {
DaemonTaskConfig config = new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId());
DaemonTaskConfig config =
DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId());
DaemonTask daemonTask = DaemonTask.build(config);
flinkJobThreadPool.execute(daemonTask);
}
// SseSessionContextHolder.init(schedule);
}

/**
@@ -216,7 +216,8 @@ public boolean success() {
: null)
.build();
jobHistoryService.save(jobHistory);
DaemonTaskConfig taskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId());
DaemonTaskConfig taskConfig =
DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId());
FlinkJobThreadPool.getInstance().execute(DaemonTask.build(taskConfig));
return true;
}