
[FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness #20

Closed · 22 commits

Conversation

jeremy-degroot
Contributor

What is the purpose of the change

This PR adds a new method to KafkaSourceBuilder that sets the client.rack config for the KafkaSource to the value returned by the provided Supplier. The value is provided as a Supplier because it must be resolved on the TaskManager, and can't be determined at job submission time like other configs.
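As a rough usage sketch of the new builder method (the broker address, topic, group id, and environment variable name below are placeholders, not part of this PR), configuration could look like this; the lambda is evaluated on each TaskManager:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Sketch only: connection settings and the environment variable name are
// placeholders. The supplier runs on the TaskManager, so each subtask can
// report the rack/AZ it actually runs in.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")
                .setTopics("input-topic")
                .setGroupId("rack-aware-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // resolved lazily and applied as the consumer's client.rack
                .setRackIdSupplier(() -> System.getenv("NODE_AZ"))
                .build();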

Brief change log

  • Add setRackId to KafkaSourceBuilder
  • Plumb rackId into KafkaPartitionSplitReader
  • Add rack id tests
  • Document RackId feature

Verifying this change

  • Added tests for the KafkaPartitionSplitReader that verify behaviors for a null rackId Supplier, null and empty return values, and provided values.
  • Manually verified the change by running a 3-node cluster that covered two "racks" (AWS Availability Zones) against an Amazon MSK cluster.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs/Javadocs

GoutyEthan and others added 12 commits March 30, 2023 11:32
- Added validation for the correctness of the rack ID supplied
Update documentation to include rack awareness
Update documentation to include rack awareness
Add RackId Example
Remove validation section and add it to general rackId

Add client_rack documentation,

Add sentence to link rackID to client.rack
FLINK-29398 Provide rack ID to Kafka Source to take advantage of Rack Awareness

boring-cyborg bot commented Mar 30, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@jeremy-degroot jeremy-degroot changed the title FLINK-29398 Provide rack ID to Kafka Source to take advantage of Rack Awareness FLINK-29398 [connectors/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness Mar 30, 2023
@jeremy-degroot jeremy-degroot changed the title FLINK-29398 [connectors/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness FLINK-29398 [connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness Mar 30, 2023
@jeremy-degroot jeremy-degroot changed the title FLINK-29398 [connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness [FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness Mar 30, 2023
@tzulitai tzulitai self-requested a review April 3, 2023 15:47
@tzulitai
Contributor

tzulitai commented Apr 3, 2023

Thanks for the PR @jeremy-degroot. I'll try to find some time to review this over this week.

this.subtaskId = context.getIndexOfSubtask();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
setConsumerClientRack(consumerProps, rackIdSupplier);


Can this be resolved in the caller (KafkaSource), with ConsumerConfig.CLIENT_RACK_CONFIG set there? The benefit would be that you would not need to change this file.

Contributor Author


I looked into this, and while it's certainly possible to do it that way, the testing path is much more complex. Since the Supplier would have to be resolved in another Supplier that's passed to the Reader, testing that the behavior is as expected in the actual execution path is difficult. In KafkaPartitionSplitReader, we can call the constructor directly and verify that the rackIdSupplier is called, and then confirm it does what we need by verifying the behavior of the helper method you noted further down.
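For context, a hedged sketch of the helper method under discussion, reconstructed from the tested cases listed in the PR description (null supplier, null or empty value, provided value); the exact body in the PR may differ. The property name comes from Kafka's ConsumerConfig:

import java.util.Properties;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch of the helper's intended behavior inside KafkaPartitionSplitReader,
// not the exact code in this PR.
void setConsumerClientRack(Properties consumerProps, Supplier<String> rackIdSupplier) {
    if (rackIdSupplier == null) {
        return; // no supplier configured: leave client.rack unset
    }
    String rackId = rackIdSupplier.get();
    // ignore null or empty values so the consumer keeps its default behavior
    if (rackId != null && !rackId.isEmpty()) {
        consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
    }
}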

Contributor


Yeah, KafkaSource couldn't resolve the rack, since the rack may not be the same for the JM and TM. I would suggest resolving this at the KafkaSourceReader if possible and avoiding this low-level change.

Contributor Author


I don't think it can be done in KafkaSourceReader either, since that doesn't interact with the Supplier at all. The alternative seems to be to resolve it in KafkaSourceFetcherManager, since that extends SingleThreadFetcherManager, where both Suppliers eventually run. But that would involve a rather messy override of the createSplitFetcher() method inherited from that class, and I don't think that's any better.


@RamanVerma RamanVerma left a comment


thanks for the change @jeremy-degroot, I have made some minor comments

properties,
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
rackIdSupplier);
reader.setConsumerClientRack(properties, rackIdSupplier);


The Supplier function should get called from the KafkaPartitionSplitReader constructor. Do you need to call this method? Same for line 365.


If this call is not needed, you can make the implementation of setConsumerClientRack private.

Contributor Author


As far as I know, neither the KafkaConsumer nor the KafkaPartitionSplitReader offers any way to examine the final consumer Properties, so to test the behavior of setConsumerClientRack we have to test it directly.

Contributor


You can expose a method annotated with @VisibleForTesting, or use reflection?

@@ -98,20 +98,24 @@
private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
// The configurations.
private final Properties props;
// Client rackId callback
private final Supplier<String> rackIdSupplier;
Contributor

@mas-chen mas-chen Jun 16, 2023


This should be a "CloseableSerializableSupplier". A supplier may contain an HTTP client, for example, and also needs to be closed properly by Flink.

Contributor Author


You're right about the Serializable bit, good catch.

Is there a straightforward way to do that while still allowing for lambdas? Maybe provide one method for the basic Supplier and another for CloseableSerializableSupplier for when you need that degree of safety.

Contributor


I think we must address this before we merge this PR.
At the very least, the Supplier needs to be serializable.

You can use the SerializableSupplier interface from Flink.
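To the lambda question above: SerializableSupplier is a functional interface, so a plain lambda still works. A minimal sketch (the environment variable name is a placeholder):

import org.apache.flink.util.function.SerializableSupplier;

// SerializableSupplier<T> extends Supplier<T> and Serializable, so a plain
// lambda can be used; it is shipped with the job graph and evaluated later
// on the TaskManager. The environment variable name is a placeholder.
SerializableSupplier<String> rackIdSupplier = () -> System.getenv("NODE_AZ");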

jeremy-degroot and others added 4 commits June 30, 2023 09:25
- Changed the name of setRackId to setRackIdSupplier
# Conflicts:
#	docs/content/docs/connectors/datastream/kafka.md
#	flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@pzim

pzim commented Aug 29, 2023

@jeremy-degroot - this capability is one that we are currently in need of. Any idea when this might be approved/merged/released? Or are there other alternatives we might use to achieve rack awareness in Flink?

@pzim

pzim commented Sep 14, 2023

@mas-chen - any chance this will be reviewed/approved soon?

@sean-scott-lr

I need this change as well, and it appears it won't be going into any release any time soon. I tried to extend the classes to add this functionality, but due to the extensive package-private and private access modifiers it's not possible. Any suggestions on how I may do that without having to fork this project?

Contributor

@tzulitai tzulitai left a comment


Looks like there are two remaining items to address before we can merge this:

  1. The rack ID supplier needs to be at least Serializable. A Closeable variant can probably be added later.
  2. Move the lazy configuration of the rack ID on the client properties higher in the initialization call stack - as it is right now, it's happening at too low a level (at the split reader).


… in KafkaSource

- Changed the type of the KafkaSourceBuilder constructor parameter to a String
- Changed the type of rackIdSupplier to a SerializableSupplier
…ry Supplier

- Added test cases verifying the rack ID supplier is called at the right time
- Simplified the tests in KafkaPartitionSplitReaderTest
Contributor

@tzulitai tzulitai left a comment


Thanks @jeremy-degroot! Just two minor nitpicks, I'll address them when I merge this.

Merging now ...


KafkaSource(
        KafkaSubscriber subscriber,
        OffsetsInitializer startingOffsetsInitializer,
        @Nullable OffsetsInitializer stoppingOffsetsInitializer,
        Boundedness boundedness,
        KafkaRecordDeserializationSchema<OUT> deserializationSchema,
-       Properties props) {
+       Properties props,
+       SerializableSupplier<String> rackIdSupplier) {
Contributor


Suggested change:
-       SerializableSupplier<String> rackIdSupplier) {
+       @Nullable SerializableSupplier<String> rackIdSupplier) {

Comment on lines +369 to +370
public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> rackIdCallback) {
this.rackIdSupplier = rackIdCallback;
Contributor


nit: can we unify the naming here to just refer to this as the rackIdSupplier?

@tzulitai
Contributor

Actually, @jeremy-degroot would you be able to squash all your changes into one commit that is rebased on latest main branch? That would be super helpful.

tzulitai pushed a commit to tzulitai/flink-connector-kafka that referenced this pull request Sep 29, 2023
…k Awareness

Co-authored-by: jcmejias1 <[email protected]>
Co-authored-by: Mason Chen <[email protected]>
Co-authored-by: Ethan Gouty <[email protected]>
Co-authored-by: Jeremy DeGroot <[email protected]>
Co-authored-by: Siva Venkat Gogineni <[email protected]>

This closes apache#53.
This closes apache#20.
tzulitai pushed a commit to tzulitai/flink-connector-kafka that referenced this pull request Sep 29, 2023
…k Awareness

This closes apache#53.
This closes apache#20.

Co-authored-by: Jeremy DeGroot <[email protected]>
Co-authored-by: jcmejias1 <[email protected]>
Co-authored-by: Mason Chen <[email protected]>
Co-authored-by: Ethan Gouty <[email protected]>
Co-authored-by: Siva Venkat Gogineni <[email protected]>