Skip to content

Commit

Permalink
[FLINK-31987][Connectors/AWS] Implement KDS Table API support
Browse files Browse the repository at this point in the history
  • Loading branch information
karubian committed Oct 11, 2024
1 parent 91381cd commit 638351c
Show file tree
Hide file tree
Showing 28 changed files with 1,011 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public Properties getValidatedConfigurations() {
return awsConfigurations;
}

/** Map {@code credentials.foo} keys to {@code credentials.provider.foo}. */
private static String translateAwsKey(String key) {
if (!key.endsWith("credentials.provider")) {
return key.replace("credentials.", "credentials.provider.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ void testBadAWSRegion() {
void testMissingAWSCredentials() {
Map<String, String> defaultProperties = getDefaultAWSConfigurations();
defaultProperties.remove("aws.credentials.basic.accesskeyid");

AWSOptionUtils awsOptionUtils = new AWSOptionUtils(defaultProperties);

Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-base</artifactId>
Expand Down Expand Up @@ -99,6 +106,19 @@
<artifactId>jackson-datatype-jdk8</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.connectors.kinesis.testutils.TestUtil;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.DockerImageVersions;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -35,19 +36,21 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.rules.Timeout;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
Expand All @@ -71,14 +74,18 @@
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

/** End-to-end test for the Kinesis Streams Table API source and sink using Localstack. */
@Testcontainers
@ExtendWith(MiniClusterExtension.class)
public class KinesisStreamsTableApiIT {

private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class);
private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";

private static final String ORDERS_STREAM = "orders";
private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
private static final String LARGE_ORDERS_STREAM = "large_orders";
private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

Expand All @@ -91,11 +98,16 @@ public class KinesisStreamsTableApiIT {

@ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);

@ClassRule
public static final KinesaliteContainer KINESALITE =
new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE))
@Container
public static final LocalstackContainer LOCALSTACK_CONTAINER =
new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION))
.withEnv("AWS_CBOR_DISABLE", "1")
.withEnv(
"FLINK_ENV_JAVA_OPTS",
"-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
.withLogConsumer((log) -> LOGGER.info(log.getUtf8String()))
.withNetwork(network)
.withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
.withNetworkAliases("localstack");

public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
TestcontainersSettings.builder()
Expand All @@ -105,7 +117,7 @@ public class KinesisStreamsTableApiIT {
"-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
.network(network)
.logger(LOGGER)
.dependsOn(KINESALITE)
.dependsOn(LOCALSTACK_CONTAINER)
.build();

public static final FlinkContainers FLINK =
Expand All @@ -125,8 +137,11 @@ public static void stopFlink() {
public void setUp() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
httpClient = AWSServicesTestUtils.createHttpClient();
kinesisClient = KINESALITE.createHostClient(httpClient);
kinesisClient =
AWSServicesTestUtils.createAwsSyncClient(
LOCALSTACK_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
prepareStream(ORDERS_STREAM);
prepareStream(LARGE_ORDERS_STREAM);
}

@After
Expand All @@ -137,17 +152,22 @@ public void teardown() {

@Test
public void testTableApiSourceAndSink() throws Exception {
executeSqlStatements(readSqlFile("send-orders.sql"));
List<Order> smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10));

// filter-large-orders.sql is supposed to preserve orders with quantity > 10
List<Order> expected =
ImmutableList.of(
new Order("A", 10),
new Order("B", 12),
new Order("C", 14),
new Order("D", 16),
new Order("E", 18));
ImmutableList.of(new Order("C", 15), new Order("D", 20), new Order("E", 25));

smallOrders.forEach(
order -> TestUtil.sendMessage(ORDERS_STREAM, kinesisClient, toJson(order)));
expected.forEach(
order -> TestUtil.sendMessage(ORDERS_STREAM, kinesisClient, toJson(order)));

executeSqlStatements(readSqlFile("filter-large-orders.sql"));

// result order is not guaranteed
List<Order> result = readAllOrdersFromKinesis();
Assertions.assertThat(result).containsAll(expected);
assertThat(result).contains(expected.toArray(new Order[0]));
}

private void prepareStream(String streamName) throws Exception {
Expand Down Expand Up @@ -185,11 +205,13 @@ private List<Order> readAllOrdersFromKinesis() throws Exception {
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
List<Order> orders;
do {
Thread.sleep(1000);
orders =
readMessagesFromStream(
recordBytes -> fromJson(new String(recordBytes), Order.class));
recordBytes -> fromJson(new String(recordBytes), Order.class),
LARGE_ORDERS_STREAM);

} while (deadline.hasTimeLeft() && orders.size() < 5);
} while (deadline.hasTimeLeft() && orders.size() < 3);

return orders;
}
Expand All @@ -213,14 +235,16 @@ private <T> T fromJson(final String json, final Class<T> type) {
}
}

private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser) throws Exception {
private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser, String streamName)
throws Exception {

String shardIterator =
kinesisClient
.getShardIterator(
GetShardIteratorRequest.builder()
.shardId(DEFAULT_FIRST_SHARD_NAME)
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
.streamName(KinesisStreamsTableApiIT.ORDERS_STREAM)
.streamName(streamName)
.build())
.shardIterator();

Expand All @@ -234,6 +258,14 @@ private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser) thr
return messages;
}

/**
 * Serialises the given object to a JSON string using the shared {@code OBJECT_MAPPER}.
 *
 * @param object value to serialise
 * @return the JSON text produced for {@code object}
 * @throws RuntimeException wrapping the {@link JsonProcessingException} when serialisation fails
 */
private <T> String toJson(final T object) {
    final String json;
    try {
        json = OBJECT_MAPPER.writeValueAsString(object);
    } catch (JsonProcessingException cause) {
        // Surface serialisation problems as an unchecked failure of the test.
        throw new RuntimeException("Test Failure.", cause);
    }
    return json;
}

/** POJO class for orders used by e2e test. */
public static class Order {
private final String code;
Expand All @@ -259,6 +291,14 @@ public boolean equals(Object o) {
return quantity == order.quantity && Objects.equals(code, order.code);
}

/**
 * Returns the order code.
 *
 * @return the value of the {@code code} field
 */
public String getCode() {
    return this.code;
}

/**
 * Returns the ordered quantity.
 *
 * @return the value of the {@code quantity} field
 */
public int getQuantity() {
    return this.quantity;
}

@Override
public int hashCode() {
return Objects.hash(code, quantity);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */

-- Table over the `orders` Kinesis stream, read with the 'kinesis' connector in JSON format.
-- Consumption starts at TRIM_HORIZON and shards are re-discovered every 1000ms.
-- The endpoint is the host `localstack` on port 4566, with static BASIC credentials.
-- NOTE(review): the ARN region (ap-southeast-1) differs from 'aws.region' (us-east-1);
-- confirm this mismatch is intentional.
CREATE TABLE orders
(
`code` STRING,
`quantity` BIGINT
) WITH (
'connector' = 'kinesis',
'stream.arn' = 'arn:aws:kinesis:ap-southeast-1:000000000000:stream/orders',
'aws.region' = 'us-east-1',
'aws.endpoint' = 'http://localstack:4566',
'aws.credentials.provider' = 'BASIC',
'aws.credentials.basic.accesskeyid' = 'access key',
'aws.credentials.basic.secretkey' = 'secret key',
'source.init.position' = 'TRIM_HORIZON',
'source.shard.discovery.interval' = '1000ms',
'aws.trust.all.certificates' = 'true',
'format' = 'json'
);

-- Table over the `large_orders` Kinesis stream, written with the 'kinesis' connector in JSON format.
-- The sink uses HTTP/1.1 and a maximum batch size of 1
-- (presumably so each record is sent without waiting for a larger batch; confirm).
-- The endpoint is the host `localstack` on port 4566, with static BASIC credentials.
-- NOTE(review): the ARN region (ap-southeast-1) differs from 'aws.region' (us-east-1);
-- confirm this mismatch is intentional.
CREATE TABLE large_orders
(
`code` STRING,
`quantity` BIGINT
) WITH (
'connector' = 'kinesis',
'stream.arn' = 'arn:aws:kinesis:ap-southeast-1:000000000000:stream/large_orders',
'aws.region' = 'us-east-1',
'aws.endpoint' = 'http://localstack:4566',
'aws.credentials.provider' = 'BASIC',
'aws.credentials.basic.accesskeyid' = 'access key',
'aws.credentials.basic.secretkey' = 'secret key',
'aws.trust.all.certificates' = 'true',
'aws.http.protocol.version' = 'HTTP1_1',
'sink.batch.max-size' = '1',
'format' = 'json'
);

-- Copies every row of `orders` whose quantity is strictly greater than 10 into `large_orders`;
-- rows with quantity <= 10 are dropped.
INSERT INTO large_orders
SELECT *
FROM orders
WHERE quantity > 10;

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kinesis</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CREATE TABLE orders (
`code` STRING,
`quantity` BIGINT
) WITH (
'connector' = 'kinesis',
'connector' = 'kinesis-legacy',
'stream' = 'orders',
'aws.endpoint' = 'https://kinesalite:4567',
'aws.credentials.provider'='BASIC',
Expand All @@ -37,7 +37,7 @@ CREATE TABLE large_orders (
`quantity` BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'large_orders',
'stream.arn' = 'arn:aws:kinesis:ap-southeast-1:000000000000:stream/large_orders',
'aws.region' = 'us-east-1',
'aws.endpoint' = 'https://kinesalite:4567',
'aws.credentials.provider' = 'BASIC',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ private Properties defaultSinkProperties() {
{
setProperty("aws.region", "us-west-2");
setProperty("aws.credentials.provider", "BASIC");
setProperty("aws.credentials.provider.basic.accesskeyid", "ververicka");
setProperty(
"aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
setProperty("aws.credentials.basic.accesskeyid", "ververicka");
setProperty("aws.credentials.basic.secretkey", "SuperSecretSecretSquirrel");
}
};
}
Expand Down
Loading

0 comments on commit 638351c

Please sign in to comment.