[BugFix][FlinkSQL] Fix V1.1.0 Kerberos-related bugs, SQL SET values not taking effect, etc. #3875

Merged · 24 commits · Nov 12, 2024

Commits (24)
7e38eb4
feature:
donotcoffee Oct 17, 2024
2f1b13a
fix: remove unnecessary properties
donotcoffee Oct 21, 2024
9fb3ccd
Revert "fix: remove unnecessary properties"
donotcoffee Oct 21, 2024
1bc8fe1
Merge remote-tracking branch 'upstream/dev' into dev
donotcoffee Nov 12, 2024
b7b549e
[Feature] 1.Added getJobInstanceListapi in APIController. 2. Added ta…
donotcoffee Oct 17, 2024
6a87846
[Improvement] change token key name (#3865)
Zzm0809 Oct 17, 2024
c337fcb
[BugFix]fix job id is null exception in query model (#3869)
zhuxt2015 Oct 17, 2024
e9a1a21
[Improvement][Docker]Limit the maximum percentage of container memory…
zhuxt2015 Oct 17, 2024
99ddc5b
[Fix-3876] [cdc] Remove quotation marks when building FlinkSQL (#3877)
aiwenmo Oct 18, 2024
1b6d647
upgrade cdc to 3.2.0 (#3878)
gaoyan1998 Oct 18, 2024
adca57a
[BugFix][DevOps]do not save job instance in query mode (#3880)
zhuxt2015 Oct 18, 2024
a8db496
[Doc] Add ws doc (#3882)
gaoyan1998 Oct 20, 2024
07e3a00
[Bug][Web] Fix ws bug (#3881)
gaoyan1998 Oct 22, 2024
33c2245
[Feature][dinky-geteway] Feature Obtain job information using the ing…
jianjun159 Oct 28, 2024
4519505
[Bug]Fix web package (#3885)
gaoyan1998 Oct 28, 2024
95a1a17
add package-lock.json (#3890)
gaoyan1998 Oct 28, 2024
add6705
[hotfix][build] Fix Dinky backend CI workflow with Flink 1.20 (#3891)
yuxiqian Oct 30, 2024
81147fa
[Bug] [dinky-admin] Fix the issue of primary key generation strategy …
Nov 3, 2024
664b7ff
[Refactor][Web]Refactoring a new data development interface to enhanc…
zackyoungh Nov 7, 2024
34c8611
[Refactor] Refactor get version function (#3898)
Zzm0809 Nov 8, 2024
960579a
[Feature-3893][core][admin][web] Flink SQL task Support insert result…
MactavishCui Nov 11, 2024
2e6f1f0
[Optimization-3906][web] Optimize debug task to preview data (#3907)
aiwenmo Nov 12, 2024
3c77f29
[Web]Optimized the new UI (#3904)
gaoyan1998 Nov 12, 2024
e7ed1c5
Merge remote-tracking branch 'origin/dev' into dev
donotcoffee Nov 12, 2024
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
@@ -28,8 +28,10 @@
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.data.vo.task.JobInstanceVo;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
@@ -45,6 +47,7 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.swagger.annotations.Api;
@@ -202,4 +205,16 @@ public Result getTaskLineage(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS);
}

@PostMapping("/getJobInstanceList")
@ApiImplicitParam(
name = "para",
value = "Query parameters",
dataType = "JsonNode",
paramType = "body",
required = true,
dataTypeClass = JsonNode.class)
public ProTableResult<JobInstanceVo> listJobInstances(@RequestBody JsonNode para) {
return jobInstanceService.listJobInstances(para);
}
}
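
A hedged usage sketch for the new endpoint. It assumes the controller is exposed under Dinky's OpenAPI path prefix and accepts a ProTable-style paging body; the host, port, path prefix, and body field names below are assumptions, not taken from this diff, and any token/authentication headers are omitted.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetJobInstanceListExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical host, port, and path prefix; adjust to your deployment.
        String url = "http://127.0.0.1:8888/openapi/getJobInstanceList";
        // ProTable-style paging query; field names are assumptions.
        String body = "{\"current\": 1, \"pageSize\": 10}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected: a ProTableResult<JobInstanceVo> serialized as JSON.
        System.out.println(response.body());
    }
}
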
@@ -131,7 +131,10 @@ protected Properties getProperties() {
&& entry.getKey().startsWith("properties")
&& Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
} else {
properties.setProperty(entry.getKey(), entry.getValue());
Comment on lines +134 to +135

Contributor: Why set other keys that do not start with "properties"?

Contributor Author: Because there is no place in the constructor of KafkaSinkBuilder to set properties. (screenshot attached)

Contributor: Thx. I get it.

}
logger.info("sink config k/v:{}", properties);
}
return properties;
}
@@ -587,4 +590,26 @@ protected List<String> getPKList(Table table) {
protected ZoneId getSinkTimeZone() {
return this.sinkTimeZone;
}

protected Map<String, String> getTableTopicMap() {
Contributor: This method should not be defined in dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java, because it is not a common abstract method; it is recommended to push it down into KafkaSinkBuilder.

Contributor Author: There are several builders (KafkaSinkBuilder, KafkaSinkJsonBuilder); if it is not placed here, the method would have to be written several times.

Contributor: Add AbstractKafkaSinkBuilder.

Contributor Author: It is just a single method; is it necessary to add an AbstractKafkaSinkBuilder class?

Contributor: If sinks other than Kafka use this method, your current code logic is feasible.

String topicMapStr = this.config.getSink().get("table.topic.map");
Map<String, String> tableTopicMap = new HashMap<>();
if (topicMapStr != null) {
String[] topicTabArray = topicMapStr.split(";");
for (String topicTab : topicTabArray) {
if (topicTab != null) {
String[] topicTable = topicTab.split(":");
if (topicTable.length > 1) {
String[] tables = topicTable[1].split(",");
for (String table : tables) {
tableTopicMap.put(table, topicTable[0]);
}
}
}
}
}

logger.info("topic map," + tableTopicMap);
return tableTopicMap;
}
}
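
From the parsing above, the table.topic.map sink option maps tables to topics in the form topic:tableA,tableB;otherTopic:tableC. A standalone sketch of the same split logic, with made-up topic and table names, shows the resulting mapping:

import java.util.HashMap;
import java.util.Map;

public class TableTopicMapExample {
    public static void main(String[] args) {
        // Hypothetical value of the 'table.topic.map' sink option.
        String topicMapStr = "orders_topic:orders,order_items;users_topic:users";

        Map<String, String> tableTopicMap = new HashMap<>();
        // Entries are separated by ';', topic and tables by ':', tables by ','.
        for (String topicTab : topicMapStr.split(";")) {
            String[] topicTable = topicTab.split(":");
            if (topicTable.length > 1) {
                for (String table : topicTable[1].split(",")) {
                    tableTopicMap.put(table, topicTable[0]);
                }
            }
        }
        // Prints {orders=orders_topic, order_items=orders_topic, users=users_topic}
        System.out.println(tableTopicMap);
    }
}
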
@@ -116,21 +116,23 @@ public DataStreamSource build(
.setTopic(config.getSink().get("topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setTransactionalIdPrefix(
config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
.setDeliverGuarantee(DeliveryGuarantee.valueOf(
env.getCheckpointingMode().name()));
config.getSink().get("delivery.guarantee") == null
? "NONE"
: config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey("transactional.id")
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) {
kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty("transactional.id"));
}
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
dataStreamSource.sinkTo(kafkaSink);
} else {
Map<Table, OutputTag<String>> tagMap = new LinkedHashMap<>();
Map<String, Table> tableMap = new LinkedHashMap<>();
Map<String, String> tableTopicMap = this.getTableTopicMap();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource
.map(x -> objectMapper.readValue(x, Map.class))
Expand All @@ -141,6 +143,7 @@ public DataStreamSource build(
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
if (Asserts.isNullCollection(schema.getTables())) {
logger.error("Schema:{} tables is empty", schema.getName());
// if schema tables is empty, throw exception
throw new IllegalArgumentException(
"Schema tables is empty, please check your configuration or check your database permission and try again.");
@@ -176,24 +179,33 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co
});
tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
if (tableTopicMap != null) {
String tableName = k.getName();
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}

org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setTransactionalIdPrefix(
config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
.setDeliverGuarantee(DeliveryGuarantee.valueOf(
env.getCheckpointingMode().name()));
config.getSink().get("delivery.guarantee") == null
? "NONE"
: config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey("transactional.id")
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) {
kafkaSinkBuilder.setTransactionalIdPrefix(
kafkaProducerConfig.getProperty("transactional.id") + "-" + topic);
}

KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
});
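
Taken together, the hunks above replace the delivery guarantee derived from the checkpointing mode with explicit sink options. A minimal sketch of a sink configuration exercising the new keys; the key names come from this diff, while every value is hypothetical:

import java.util.HashMap;
import java.util.Map;

public class KafkaSinkOptionsExample {
    public static void main(String[] args) {
        Map<String, String> sink = new HashMap<>();
        sink.put("brokers", "kafka-1:9092,kafka-2:9092");
        // Single-topic mode; omit 'topic' to fan out one topic per table instead.
        sink.put("topic", "cdc_events");
        // Per-table routing used in fan-out mode (see getTableTopicMap above).
        sink.put("table.topic.map", "orders_topic:orders;users_topic:users,user_roles");
        // New: explicit prefix instead of relying on the producer 'transactional.id'.
        sink.put("transactional.id.prefix", "dinky-cdc");
        // New: explicit guarantee (NONE / AT_LEAST_ONCE / EXACTLY_ONCE);
        // defaults to NONE when unset, instead of following the checkpointing mode.
        sink.put("delivery.guarantee", "AT_LEAST_ONCE");
        System.out.println(sink);
    }
}
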
@@ -83,6 +83,10 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
debeziumProperties.setProperty("decimal.handling.mode", "string");
if (Asserts.isNotNullString(serverTimeZone)
&& Asserts.isNotNullString(config.getDebezium().get("datetime.type"))) {
debeziumProperties.setProperty("datetime.format.timestamp.zone", serverTimeZone);
}

config.getDebezium().forEach((key, value) -> {
if (Asserts.isNotNullString(key) && Asserts.isNotNullString(value)) {
@@ -85,20 +85,21 @@ public DataStreamSource<String> build(
.setTopic(config.getSink().get("topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.valueOf(
env.getCheckpointingMode().name()));
.setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
.setDeliverGuarantee(
DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null
? "NONE"
: config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey(TRANSACTIONAL_ID)
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) {
kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID));
}
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
dataStreamSource.sinkTo(kafkaSink);
} else {
Map<Table, OutputTag<String>> tagMap = new LinkedHashMap<>();
Map<String, String> tableTopicMap = this.getTableTopicMap();
Map<String, Table> tableMap = new LinkedHashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource
@@ -147,24 +148,31 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co

tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
if (tableTopicMap != null) {
String tableName = k.getName();
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.valueOf(
env.getCheckpointingMode().name()));
.setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
.setDeliverGuarantee(
DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null
? "NONE"
: config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey(TRANSACTIONAL_ID)
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) {
kafkaSinkBuilder.setTransactionalIdPrefix(
kafkaProducerConfig.getProperty(TRANSACTIONAL_ID) + "-" + topic);
}

KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
});
@@ -113,6 +113,14 @@ public DataStreamSource<String> build(
String topic = getSinkTableName(table);
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
topic = config.getSink().get("topic");
} else {
Map<String, String> tableTopicMap = this.getTableTopicMap();
if (tableTopicMap != null) {
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}
}
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
@@ -193,8 +201,7 @@ public void processElement(Map value, Context context, Collector<String> collect
}
}
});
stringOperator.addSink(new FlinkKafkaProducer<String>(
config.getSink().get("brokers"), topic, new SimpleStringSchema()));
stringOperator.addSink(new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), getProperties()));
}
}
} catch (Exception ex) {
6 changes: 5 additions & 1 deletion dinky-core/src/main/java/org/dinky/api/FlinkAPI.java
@@ -72,7 +72,11 @@ public class FlinkAPI {
private static final ObjectMapper mapper = new ObjectMapper();

public FlinkAPI(String address) {
this.address = address;
if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) {
this.address = address;
} else {
this.address = NetConstant.HTTP + address;
}
}

public static FlinkAPI build(String address) {
@@ -29,4 +29,7 @@ public interface FlinkConstant {
String LOCAL_HOST = "localhost";
/** changlog op */
String OP = "op";

/** Flink default port */
Integer FLINK_REST_DEFAULT_PORT = 8081;
}
10 changes: 8 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java
@@ -20,6 +20,8 @@
package org.dinky.executor;

import org.dinky.assertion.Asserts;
import org.dinky.constant.FlinkConstant;
import org.dinky.data.constant.NetConstant;
import org.dinky.data.enums.GatewayType;

import java.util.ArrayList;
@@ -167,14 +169,18 @@ public static ExecutorConfig build(

String host = null;
Integer port = null;
String hostPort = address;
if (Asserts.isNotNullString(address)) {
String[] strings = address.split(":");
if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) {
hostPort = address.replace(NetConstant.HTTP, "").replace(NetConstant.HTTPS, "");
}
String[] strings = hostPort.split(":");
if (strings.length > 1) {
host = strings[0];
port = Integer.parseInt(strings[1]);
} else {
host = strings[0];
port = 8081;
port = FlinkConstant.FLINK_REST_DEFAULT_PORT;
}
}

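
A small, self-contained sketch of the address handling added above, assuming NetConstant.HTTP and NetConstant.HTTPS are the literal "http://" and "https://" prefixes; the input addresses are hypothetical:

public class AddressParseExample {
    private static final int FLINK_REST_DEFAULT_PORT = 8081;

    static String[] parse(String address) {
        // Strip an optional scheme, then split host and port, defaulting the port.
        String hostPort = address.replace("http://", "").replace("https://", "");
        String[] parts = hostPort.split(":");
        String host = parts[0];
        int port = parts.length > 1 ? Integer.parseInt(parts[1]) : FLINK_REST_DEFAULT_PORT;
        return new String[] {host, String.valueOf(port)};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(parse("http://10.0.0.5:8282")));  // [10.0.0.5, 8282]
        System.out.println(java.util.Arrays.toString(parse("flink-jm.example.com")));  // [flink-jm.example.com, 8081]
    }
}
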
@@ -65,6 +65,8 @@ public void run() throws Exception {
}
GatewayResult gatewayResult = null;
config.addGatewayConfig(executor.getSetConfig());
config.addGatewayConfig(
executor.getCustomTableEnvironment().getConfig().getConfiguration());
Contributor Author: The Flink SQL SET parameters were not taking effect; I added this line here. Is this placement reasonable? Please confirm.

config.getGatewayConfig().setSql(jobParam.getParsedSql());

if (runMode.isApplicationMode()) {
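
For context on the SET fix above: values applied by a FlinkSQL SET statement end up in the TableEnvironment's Configuration, and the added addGatewayConfig(...) call forwards that Configuration to the gateway config. A hedged, Dinky-independent sketch using plain Flink APIs (Flink 1.15+ style; the configuration key and value are only illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SetConfigPropagationExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // What a SQL `SET 'key' = 'value'` effectively does: write into the
        // TableEnvironment's Configuration (set directly here for illustration).
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("taskmanager.memory.process.size", "4g");

        // This Configuration is the object the new addGatewayConfig(...) call forwards,
        // so the SET values now reach the gateway when the job is submitted.
        System.out.println(conf.toMap().get("taskmanager.memory.process.size")); // 4g
    }
}
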
@@ -40,7 +40,10 @@ export type OperatorType = {
};
const JobOperator = (props: OperatorType) => {
const { jobDetail, refesh } = props;
const webUri = `${API_CONSTANTS.BASE_URL}/api/flink/${jobDetail?.clusterInstance?.jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`;
const jobManagerHost = jobDetail?.clusterInstance?.jobManagerHost;
const webUri = jobManagerHost?.startsWith('http://') || jobManagerHost?.startsWith('https://')
? jobManagerHost
: `${API_CONSTANTS.BASE_URL}/api/flink/${jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`;

const handleJobOperator = (key: string) => {
Modal.confirm({