-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36780] Kafka source disables partition discovery unexpectedly #136
base: main
Are you sure you want to change the base?
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
@@ -217,6 +217,29 @@ public void testSettingInvalidCustomDeserializers( | |||
.hasMessageContaining(expectedError); | |||
} | |||
|
|||
@Test | |||
public void testDefaultPartitionDiscovery() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test fails without the fix, and passes now with the fix.
} | ||
|
||
@Test | ||
public void testPeriodPartitionDiscovery() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This newly added test is to avoid regressions for bounded source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the change looks good to me. It would be good to have some feedback from someone more familiar with the Kafka Source. If that doesn't happen within a couple of days, I would propose to just go ahead and merge it.
Currently Kafka source enables partition discovery. This is set by partition.discovery.interval.ms, aka KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS. The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.
However, it's disabled by default unexpectedly in the source builder (code). The intention I believe was to only disable for bounded source.
We need a fix that is able to keep the default partition discovery. This could cause data loss after Kafka retention if the new partitions are not consumed silently.