
[FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness #20

Closed
wants to merge 22 commits into from

Changes from all commits (22 commits)
9c23f7d
Add rackIdSupplier and funnel into KafkaPartitionSplitReader
GoutyEthan Feb 8, 2023
b4aa761
Add rackIdSupplier test to KafkaPartitionSplitReaderTest
GoutyEthan Feb 10, 2023
83771ec
- Formatting
jeremy-degroot Feb 13, 2023
190e2e6
Replaced the literals with a variable
jeremy-degroot Feb 13, 2023
7c2cf26
Changed up handling for the values around the rack ID
jeremy-degroot Feb 14, 2023
cf218ad
Update kafka.md
jcmejias1 Feb 28, 2023
ba451d9
Update kafka.md
jcmejias1 Feb 28, 2023
d85ecd1
Update kafka.md
jcmejias1 Mar 2, 2023
2a6aaa5
Update kafka.md
jcmejias1 Mar 6, 2023
c6550c9
Added documentation for the setConsumerClientRack private method.
sivavenkatgogineni Mar 30, 2023
167098e
spotlessApply
jeremy-degroot Mar 30, 2023
5a7aca7
Merge pull request #1 from jeremy-degroot/FLINK-29398-rack-awareness
jeremy-degroot Mar 30, 2023
a829ecf
Combined a couple of similar tests
jeremy-degroot Apr 7, 2023
c98b9f3
Combined a couple of similar tests
jeremy-degroot Apr 7, 2023
be13242
Update docs/content/docs/connectors/datastream/kafka.md
jeremy-degroot Jun 30, 2023
342cd72
- Cleaned up documentation
jeremy-degroot Jun 30, 2023
d0b24f9
Merge branch 'apache:main' into main
jeremy-degroot Jun 30, 2023
9acf6ba
Merge branch 'FLINK-29398-rack-awareness' into main
jeremy-degroot Jun 30, 2023
91cfa27
Merge branch 'apache:main' into main
jeremy-degroot Aug 31, 2023
3cf0794
Merge branch 'apache:main' into main
jeremy-degroot Sep 20, 2023
b0d5d02
- Moved the resolution of the rackId supplier into the readerSupplier…
jeremy-degroot Sep 21, 2023
e3bd8bc
- Added handling for a null rackIdSupplier in the source reader facto…
jeremy-degroot Sep 21, 2023
19 changes: 19 additions & 0 deletions docs/content/docs/connectors/datastream/kafka.md
@@ -465,6 +465,25 @@ client dependencies in the job JAR, so you may need to rewrite it with the actual
For detailed explanations of security configurations, please refer to
<a href="https://kafka.apache.org/documentation/#security">the "Security" section in Apache Kafka documentation</a>.

## Kafka Rack Awareness

Kafka rack awareness allows Flink to select and control the cloud region and availability zone that Kafka consumers read from, based on a rack ID. This feature reduces network costs and latency, since it allows consumers to connect to the closest Kafka brokers, possibly colocated in the same cloud region and availability zone.
A client's rack is indicated using the `client.rack` config, and should correspond to a broker's `broker.rack` config.

See https://kafka.apache.org/documentation/#consumerconfigs_client.rack for details on the `client.rack` setting.
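
For illustration only, the net effect of this feature is equivalent to setting the standard Kafka consumer property directly; the zone name below is just a placeholder:

```java
Properties consumerProps = new Properties();
// Should match the broker.rack of the brokers in the same zone.
consumerProps.setProperty("client.rack", "use1-az1");
```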

### RackId

`setRackIdSupplier()` is the builder method that allows us to determine the consumer's rack. If provided, the Supplier will be run when the consumer is set up on the TaskManager, and the consumer's `client.rack` configuration will be set to the value it returns.

One way to implement this is to have the supplier read an environment variable set on your TaskManager, for instance:

```java
.setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
```

The "TM_NODE_AZ" is the name of the environment variable in the TaskManager container that contains the zone we want to use.

### Behind the Scene
{{< hint info >}}
If you are interested in how Kafka source works under the design of new data source API, you may
@@ -49,13 +49,15 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -98,20 +100,24 @@ public class KafkaSource<OUT>
private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
// The configurations.
private final Properties props;
// Client rackId callback
private final SerializableSupplier<String> rackIdSupplier;

KafkaSource(
KafkaSubscriber subscriber,
OffsetsInitializer startingOffsetsInitializer,
@Nullable OffsetsInitializer stoppingOffsetsInitializer,
Boundedness boundedness,
KafkaRecordDeserializationSchema<OUT> deserializationSchema,
Properties props) {
Properties props,
SerializableSupplier<String> rackIdSupplier) {
Contributor suggested change:
-            SerializableSupplier<String> rackIdSupplier) {
+            @Nullable SerializableSupplier<String> rackIdSupplier) {

this.subscriber = subscriber;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.deserializationSchema = deserializationSchema;
this.props = props;
this.rackIdSupplier = rackIdSupplier;
}

/**
@@ -157,7 +163,14 @@ public UserCodeClassLoader getUserCodeClassLoader() {
new KafkaSourceReaderMetrics(readerContext.metricGroup());

Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
() -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
() ->
new KafkaPartitionSplitReader(
props,
readerContext,
kafkaSourceReaderMetrics,
Optional.ofNullable(rackIdSupplier)
.map(Supplier::get)
.orElse(null));
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);

return new KafkaSourceReader<>(
@@ -26,6 +26,7 @@
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -80,6 +81,7 @@
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
* .setUnbounded(OffsetsInitializer.latest())
* .setRackIdSupplier(() -> MY_RACK_ID)
* .build();
* }</pre>
*
@@ -100,6 +102,8 @@ public class KafkaSourceBuilder<OUT> {
private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
// The configurations.
protected Properties props;
// Client rackId supplier
private SerializableSupplier<String> rackIdSupplier;

KafkaSourceBuilder() {
this.subscriber = null;
@@ -108,6 +112,7 @@ public class KafkaSourceBuilder<OUT> {
this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
this.deserializationSchema = null;
this.props = new Properties();
this.rackIdSupplier = null;
}

/**
@@ -355,6 +360,17 @@ public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) {
return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
}

/**
* Set the clientRackId supplier to be passed down to the KafkaPartitionSplitReader.
*
* @param rackIdCallback callback to provide Kafka consumer client.rack
* @return this KafkaSourceBuilder
*/
public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> rackIdCallback) {
this.rackIdSupplier = rackIdCallback;
Comment on lines +369 to +370
Contributor: nit: can we unify the naming here to just refer to this as the rackIdSupplier?

return this;
}
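
A minimal sketch of the rename the reviewer is suggesting here (illustrative only, not a change included in this diff):

```java
// Hypothetical: parameter renamed from rackIdCallback to rackIdSupplier for consistency.
public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> rackIdSupplier) {
    this.rackIdSupplier = rackIdSupplier;
    return this;
}
```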

/**
* Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found
* in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
@@ -422,7 +438,8 @@ public KafkaSource<OUT> build() {
stoppingOffsetsInitializer,
boundedness,
deserializationSchema,
props);
props,
rackIdSupplier);
}

// ------------- private helpers --------------
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
this(props, context, kafkaSourceReaderMetrics, null);
}

public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
String rackIdSupplier) {
this.subtaskId = context.getIndexOfSubtask();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
setConsumerClientRack(consumerProps, rackIdSupplier);

Review comment: Can this be resolved in the caller (KafkaSource), with ConsumerConfig.CLIENT_RACK_CONFIG set there? The benefit would be that you would not need to change this file.

Contributor Author: I looked into this, and while it's certainly possible to do it that way, the testing path is much more complex. Since the Supplier would have to be resolved in another Supplier that is passed to the Reader, testing that the behavior is as expected in the actual execution path is difficult. In KafkaPartitionSplitReader, we can call the constructor directly and verify that the rackIdSupplier is called, and then also verify it does what we need by checking the behavior of the helper method you noted further down.

Contributor: Yeah, KafkaSource couldn't resolve the rack, since the rack may not be the same for the JM and the TM. I would suggest resolving this at the KafkaSourceReader if possible, to avoid this low-level change.

Contributor Author: I don't think it can be done in KafkaSourceReader either, since that doesn't interact with the Supplier at all. The alternative seems to be to resolve it in KafkaSourceFetcherManager, since that extends SingleThreadFetcherManager, where both Suppliers eventually run. But that would involve a rather messy override of the createSplitFetcher() method from that included class, and I don't think that's any better.

this.consumer = new KafkaConsumer<>(consumerProps);
this.stoppingOffsets = new HashMap<>();
this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
@@ -256,6 +265,20 @@ KafkaConsumer<byte[], byte[]> consumer() {

// --------------- private helper method ----------------------

/**
 * Validates that the given rack ID is neither null nor empty and, if valid, sets it as the
 * {@code client.rack} consumer config.
 *
 * @param consumerProps consumer properties to update.
 * @param rackId rack ID to set.
 */
@VisibleForTesting
void setConsumerClientRack(Properties consumerProps, String rackId) {
if (rackId != null && !rackId.isEmpty()) {
consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, rackId);
}
}

private void parseStartingOffsets(
KafkaPartitionSplit split,
List<TopicPartition> partitionsStartingFromEarliest,
@@ -48,6 +48,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
@@ -319,6 +320,38 @@ public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
}

@Test
public void testConsumerClientRackSupplier() {
String rackId = "use1-az1";
Properties properties = new Properties();
KafkaPartitionSplitReader reader =
createReader(
properties,
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
rackId);

// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
}

@ParameterizedTest
@NullAndEmptySource
public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
Properties properties = new Properties();
KafkaPartitionSplitReader reader =
createReader(
properties,
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
rackId);

// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
}

// ------------------

private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
@@ -383,6 +416,13 @@ private KafkaPartitionSplitReader createReader() {

private KafkaPartitionSplitReader createReader(
Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
return createReader(additionalProperties, sourceReaderMetricGroup, null);
}

private KafkaPartitionSplitReader createReader(
Properties additionalProperties,
SourceReaderMetricGroup sourceReaderMetricGroup,
String rackId) {
Properties props = new Properties();
props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
@@ -394,7 +434,8 @@ private KafkaPartitionSplitReader createReader(
return new KafkaPartitionSplitReader(
props,
new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
kafkaSourceReaderMetrics);
kafkaSourceReaderMetrics,
rackId);
}

private Map<String, KafkaPartitionSplit> assignSplits(
@@ -40,6 +40,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
@@ -53,6 +54,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.time.Duration;
import java.util.ArrayList;
@@ -79,6 +81,8 @@
import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

/** Unit tests for {@link KafkaSourceReader}. */
public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> {
@@ -271,7 +275,8 @@ void testDisableOffsetCommit() throws Exception {
Boundedness.CONTINUOUS_UNBOUNDED,
new TestingReaderContext(),
(ignore) -> {},
properties)) {
properties,
null)) {
reader.addSplits(
getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
ValidatingSourceOutput output = new ValidatingSourceOutput();
@@ -479,6 +484,45 @@ public void testSupportsPausingOrResumingSplits() throws Exception {
}
}

@Test
public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);

try (KafkaSourceReader<Integer> reader =
(KafkaSourceReader<Integer>)
createReader(
Boundedness.CONTINUOUS_UNBOUNDED,
new TestingReaderContext(),
(ignore) -> {},
new Properties(),
rackIdSupplier)) {
// Do nothing here
}

verify(rackIdSupplier, never()).get();
}

@Test
public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");

try (KafkaSourceReader<Integer> reader =
(KafkaSourceReader<Integer>)
createReader(
Boundedness.CONTINUOUS_UNBOUNDED,
new TestingReaderContext(),
(ignore) -> {},
new Properties(),
rackIdSupplier)) {
reader.addSplits(
Collections.singletonList(
new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
}

verify(rackIdSupplier).get();
}

// ------------------------------------------

@Override
@@ -535,14 +579,15 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
throws Exception {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return createReader(boundedness, context, splitFinishedHook, properties);
return createReader(boundedness, context, splitFinishedHook, properties, null);
}

private SourceReader<Integer, KafkaPartitionSplit> createReader(
Boundedness boundedness,
SourceReaderContext context,
Consumer<Collection<String>> splitFinishedHook,
Properties props)
Properties props,
SerializableSupplier<String> rackIdSupplier)
throws Exception {
KafkaSourceBuilder<Integer> builder =
KafkaSource.<Integer>builder()
@@ -559,6 +604,9 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
if (boundedness == Boundedness.BOUNDED) {
builder.setBounded(OffsetsInitializer.latest());
}
if (rackIdSupplier != null) {
builder.setRackIdSupplier(rackIdSupplier);
}

return KafkaSourceTestUtils.createReaderWithFinishedSplitHook(
builder.build(), context, splitFinishedHook);