-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-33132] Flink Connector Redshift TableSink Implementation #114
base: main
Are you sure you want to change the base?
Conversation
eaaa378
to
0c013d4
Compare
118e644
to
92a7615
Compare
@hlteoh37, @vahmed-hamdy please review in free time 🙏🏻 |
92a7615
to
98901f4
Compare
1、The tuncate table paramter is supported in the batch import scenario. If data exists in a table, duplicate data will be generated and the table must be cleared first |
thank you for reviewing the pr .
If the record exisits in table and redshift table created contains primary key or composite key . it carries out merge into operation . If you check the code we are doing merge into operation if ddl contains primary key .
can you please elaborate more , as per what i understand you are concern how staged data get merged , in code we are using https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html#merge-method-specify-column-list . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left some comments, I will continue the review later.
I believe this PR is incomplete right? we still need to add tests.
/** Dynamic Table Factory. */ | ||
@PublicEvolving | ||
public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory { | ||
public static final String IDENTIFIER = "redshift"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would move the configs to a separate file as in flink-connector-aws-kinesis-streams
public static final ConfigOption<String> DATABASE_NAME = | ||
ConfigOptions.key("sink.database-name") | ||
.stringType() | ||
.defaultValue("dev") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to set that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dev
is the default database name created by redshift. i assumed if user dont provide database name as a config then it should assume database name as default one
.noDefaultValue() | ||
.withDescription("AWS Redshift cluster sink table name."); | ||
|
||
public static final ConfigOption<Integer> SINK_BATCH_SIZE = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: have you considered using AsyncDynamicTableSink it seems you are reusing some properties here
.stringType() | ||
.noDefaultValue() | ||
.withDescription("using Redshift COPY command must provide a S3 URI."); | ||
public static final ConfigOption<String> IAM_ROLE_ARN = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use the existing aws authentication way?
"Currently, 2 modes are supported for Flink connector redshift.\n" | ||
+ "\t 1) COPY Mode." | ||
+ "\t 2) JDBC Mode."); | ||
public static final ConfigOption<String> TEMP_S3_URI = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why TEMP
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in copy mode , redhift data needs to be written into temporary s3 path. these path only useful till copy mode reads the data from temporary location (s3) and uploads to redhsift workers.
In flip this config was mentioned
|
||
private static final Logger LOG = LoggerFactory.getLogger(AbstractRedshiftOutputFormat.class); | ||
|
||
protected transient volatile boolean closed = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is smelly, have you tested that away from local clusters and with checkpointing?
public synchronized void close() { | ||
if (!closed) { | ||
closed = true; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove new line
try { | ||
flush(); | ||
} catch (Exception exception) { | ||
LOG.warn("Flushing records to Redshift failed.", exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are swallowing all exceptions here, this seems like a smell and could possibly break delivery guarantees. We should capture specific exceptions only and bubble/wrap up the rest.
public void scheduledFlush(long intervalMillis, String executorName) { | ||
Preconditions.checkArgument(intervalMillis > 0, "flush interval must be greater than 0"); | ||
scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(executorName)); | ||
scheduledFuture = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks the execution model, You should use the mailboxExecutor
instead.
import java.time.Duration; | ||
import java.util.Optional; | ||
|
||
/** Options. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could we use more descriptive Javadoc for example: "Options to configure connection to redshift"
yes , tests were not added , since it will increase the size of PR. |
Purpose of the change
Flink Connector Redshift Sink Implementation
Verifying this change
JDBC mode testing
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving)
)