[FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness #20
Changes from 20 commits
KafkaPartitionSplitReader.java

@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
             Properties props,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+        this(props, context, kafkaSourceReaderMetrics, () -> null);
+    }
+
+    public KafkaPartitionSplitReader(
+            Properties props,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            Supplier<String> rackIdSupplier) {
         this.subtaskId = context.getIndexOfSubtask();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
         Properties consumerProps = new Properties();
         consumerProps.putAll(props);
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
+        setConsumerClientRack(consumerProps, rackIdSupplier);
Review thread on setConsumerClientRack(consumerProps, rackIdSupplier):

[reviewer] Can this be resolved in the caller (KafkaSource) and …

[author] I looked into this, and while it's certainly possible to do it that way, the testing path is much more complex. Since the Supplier would have to be resolved in another Supplier that's passed to the Reader, testing that the behavior is as expected in the actual execution path is difficult. In KafkaPartitionSplitReader, we can call the constructor directly and verify that the rackIdSupplier is called, and then also verify it does what we need by checking the behavior of the helper method you noted further down.

[reviewer] Yeah, KafkaSource couldn't resolve the rack, since the rack may not be the same for the JM and the TMs. I would suggest resolving this at the KafkaSourceReader if possible and avoiding this low-level change.

[author] I don't think it can be done in KafkaSourceReader either, since that doesn't interact with the Supplier at all. The alternative seems to be to resolve it in KafkaSourceFetcherManager, since that extends SingleThreadFetcherManager, where both Suppliers eventually run. But that would involve a rather messy override of the createSplitFetcher() method from that base class, and I don't think that's any better.
         this.consumer = new KafkaConsumer<>(consumerProps);
         this.stoppingOffsets = new HashMap<>();
         this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
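For orientation, here is a hedged sketch of how a caller could use the new four-argument constructor, resolving the rack ID lazily on the TaskManager (the point debated in the thread above). The factory wrapper and the environment variable name are assumptions for illustration, not part of this PR.

```java
import java.util.Properties;
import java.util.function.Supplier;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;

// Hypothetical factory: shows where a TM-local rack lookup could be plugged in.
final class RackAwareSplitReaderFactory {

    static KafkaPartitionSplitReader create(
            Properties props, SourceReaderContext context, KafkaSourceReaderMetrics metrics) {
        // The supplier runs where the reader is constructed (the TaskManager),
        // so it can consult TM-local state; "NODE_RACK" is a made-up variable
        // name for this sketch.
        Supplier<String> rackIdSupplier = () -> System.getenv("NODE_RACK");
        return new KafkaPartitionSplitReader(props, context, metrics, rackIdSupplier);
    }
}
```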
@@ -256,6 +265,23 @@ KafkaConsumer<byte[], byte[]> consumer() {

     // --------------- private helper method ----------------------

+    /**
+     * Performs null and empty validation on the supplied rack ID and, if it is valid, sets it
+     * as the client.rack consumer config.
+     *
+     * @param consumerProps the consumer properties to update.
+     * @param rackIdSupplier supplier of the rack ID.
+     */
+    @VisibleForTesting
+    void setConsumerClientRack(Properties consumerProps, Supplier<String> rackIdSupplier) {
+        if (rackIdSupplier != null) {
+            String rackId = rackIdSupplier.get();
+            if (rackId != null && !rackId.isEmpty()) {
+                consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
+            }
+        }
+    }
+
     private void parseStartingOffsets(
             KafkaPartitionSplit split,
             List<TopicPartition> partitionsStartingFromEarliest,
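As background beyond this diff: client.rack only pays off when the brokers enable rack-aware replica selection (KIP-392), which lets consumers fetch from the closest replica. Below is a minimal, hedged sketch of the consumer-side configuration the helper above produces; the bootstrap address and rack value are placeholders (the rack mirrors the "use1-az1" fixture used in the tests).

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ClientRackSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Equivalent to what setConsumerClientRack writes when the supplier
        // returns a non-null, non-empty rack ID.
        consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "use1-az1");
        // Broker side (not part of this PR): brokers must also set broker.rack and
        // replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        // for fetch-from-closest-replica to take effect.
        System.out.println(consumerProps);
    }
}
```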
KafkaPartitionSplitReaderTest.java
@@ -48,6 +48,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.NullAndEmptySource;
 import org.junit.jupiter.params.provider.ValueSource;

 import java.io.IOException;

@@ -63,6 +64,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;

 import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -319,6 +321,46 @@ public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
         assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
     }

+    @Test
+    public void testConsumerClientRackSupplier() {
+        AtomicReference<Boolean> supplierCalled = new AtomicReference<>(false);
+        String rackId = "use1-az1";
+        Supplier<String> rackIdSupplier =
+                () -> {
+                    supplierCalled.set(true);
+                    return rackId;
+                };
+        Properties properties = new Properties();
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackIdSupplier);
+        assertThat(supplierCalled.get()).isEqualTo(true);
+
+        // Here we call the helper function directly, because the KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackIdSupplier);
Review thread on this direct helper call:

[reviewer] Supplier function should get called from …

[reviewer] If this call is not needed, you can make the implementation of …

[author] As far as I know, neither the KafkaConsumer nor the KafkaPartitionSplitReader offers any way to examine the final consumer Properties, so to test the behavior of setConsumerClientRack we have to test it directly.

[reviewer] You can expose a method @VisibleForTesting or use reflection?
+
+        assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
+    }
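For completeness, a hedged sketch of the reflection route the reviewer mentions, which would avoid widening the helper's visibility. The method name and parameter types come from this diff, but the utility class itself is hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical test utility: invokes the package-private helper reflectively
// instead of relying on @VisibleForTesting.
final class ReflectiveRackSetter {

    static void setConsumerClientRack(
            Object reader, Properties props, Supplier<String> rackIdSupplier) throws Exception {
        Method helper =
                reader.getClass()
                        .getDeclaredMethod(
                                "setConsumerClientRack", Properties.class, Supplier.class);
        helper.setAccessible(true); // bypass non-public access for the test
        helper.invoke(reader, props, rackIdSupplier);
    }
}
```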
+
+    @ParameterizedTest
+    @NullAndEmptySource
+    public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
+        Properties properties = new Properties();
+        Supplier<String> rackIdSupplier = () -> rackId;
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackIdSupplier);
+
+        // Here we call the helper function directly, because the KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackIdSupplier);
+        assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
+    }
+
     // ------------------

     private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
@@ -383,6 +425,13 @@ private KafkaPartitionSplitReader createReader() {

     private KafkaPartitionSplitReader createReader(
             Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
+        return createReader(additionalProperties, sourceReaderMetricGroup, () -> null);
+    }
+
+    private KafkaPartitionSplitReader createReader(
+            Properties additionalProperties,
+            SourceReaderMetricGroup sourceReaderMetricGroup,
+            Supplier<String> rackIdSupplier) {
         Properties props = new Properties();
         props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

@@ -394,7 +443,8 @@ private KafkaPartitionSplitReader createReader(
         return new KafkaPartitionSplitReader(
                 props,
                 new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                rackIdSupplier);
     }

     private Map<String, KafkaPartitionSplit> assignSplits(
Review thread on the new Supplier<String> rackIdSupplier parameter:

[reviewer] This should be a "CloseableSerializableSupplier". A supplier may contain an HTTP client, for example, and would also need to be closed properly by Flink.

[author] You're right about the Serializable bit, good catch. Is there a straightforward way to do that while still allowing for lambdas? Maybe provide one method for the basic Supplier and another for CloseableSerializableSupplier for when you need that degree of safety.

[reviewer] I think we must address this before we merge this PR. At the very least, the Supplier needs to be serializable. You can use the SerializableSupplier interface from Flink.