-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BugFix][FlinkSQL] V1.1.0 version Keberos bug related repair, SQL SET value does not take effect, etc #3875
Changes from 1 commit
7e38eb4
2f1b13a
9fb3ccd
1bc8fe1
b7b549e
6a87846
c337fcb
e9a1a21
99ddc5b
1b6d647
adca57a
a8db496
07e3a00
33c2245
4519505
95a1a17
add6705
81147fa
664b7ff
34c8611
960579a
2e6f1f0
3c77f29
e7ed1c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,7 +131,10 @@ protected Properties getProperties() { | |
&& entry.getKey().startsWith("properties") | ||
&& Asserts.isNotNullString(entry.getValue())) { | ||
properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue()); | ||
} else { | ||
properties.setProperty(entry.getKey(), entry.getValue()); | ||
} | ||
logger.info("sink config k/v:{}", properties); | ||
} | ||
return properties; | ||
} | ||
|
@@ -587,4 +590,26 @@ protected List<String> getPKList(Table table) { | |
protected ZoneId getSinkTimeZone() { | ||
return this.sinkTimeZone; | ||
} | ||
|
||
protected Map<String, String> getTableTopicMap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 不应该在 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 有好几个KakfaSinkBuilder, KafkaSinkJsonBuilder , 不放这里需要写好几次 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add AbstractKafkaSinkBuilder. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
it just a single method , Is it necessary to add an AbstractKafkaSinkBuilder class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If sinks other than kafka use this method, your current code logic is feasible. |
||
String topicMapStr = this.config.getSink().get("table.topic.map"); | ||
Map<String, String> tableTopicMap = new HashMap<>(); | ||
if (topicMapStr != null) { | ||
String[] topicTabArray = topicMapStr.split(";"); | ||
for (String topicTab : topicTabArray) { | ||
if (topicTab != null) { | ||
String[] topicTable = topicTab.split(":"); | ||
if (topicTable.length > 1) { | ||
String[] tables = topicTable[1].split(","); | ||
for (String table : tables) { | ||
tableTopicMap.put(table, topicTable[0]); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
logger.info("topic map," + tableTopicMap); | ||
return tableTopicMap; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,6 +65,8 @@ public void run() throws Exception { | |
} | ||
GatewayResult gatewayResult = null; | ||
config.addGatewayConfig(executor.getSetConfig()); | ||
config.addGatewayConfig( | ||
executor.getCustomTableEnvironment().getConfig().getConfiguration()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flink sql set 参数不生效,我加了这一行放到这里是否合理,麻烦确认下 |
||
config.getGatewayConfig().setSql(jobParam.getParsedSql()); | ||
|
||
if (runMode.isApplicationMode()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why set other keys that do not start with an attribute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because there's no place in the constructor of kafkaSinkBuilder to set properties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx. I get it.