From be3cc86802edce954febe3a20ffbea2ffdd72dc4 Mon Sep 17 00:00:00 2001
From: Hong Teoh
Date: Wed, 6 Nov 2024 12:29:46 +0000
Subject: [PATCH] [FLINK-31989][docs] Update english docs for DynamoDbStreamsSource

---
 .../docs/connectors/datastream/dynamodb.md | 139 +++++++++++++++++-
 1 file changed, 133 insertions(+), 6 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md
index 834ac6643..62c306868 100644
--- a/docs/content/docs/connectors/datastream/dynamodb.md
+++ b/docs/content/docs/connectors/datastream/dynamodb.md
@@ -23,16 +23,143 @@ KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+# Amazon DynamoDB Connector
+The DynamoDB connector allows users to read from and write to [Amazon DynamoDB](https://aws.amazon.com/dynamodb/).
 
-# Amazon DynamoDB Sink
+As a source, the connector allows users to read the change data capture stream of a DynamoDB table using [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
 
-The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
-to setup a table.
+As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the [BatchWriteItem API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html).
+
+## Dependency
+
+Apache Flink ships this connector as a separate artifact. To use it, add the following Maven dependency to your project:
 
 {{< connector_artifact flink-connector-dynamodb dynamodb >}}
+
+## Amazon DynamoDB Streams Source
+
+The DynamoDB Streams source reads from [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
+Follow the instructions in the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to set up and configure the change data capture stream.
+
+The actual events streamed to the DynamoDB Stream depend on the `StreamViewType` specified on the DynamoDB Stream itself.
+See the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
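+
+For illustration, the snippet below is a minimal sketch of enabling the change data capture stream on an existing table with the AWS SDK v2 `DynamoDbClient`. The table name is a placeholder, and `NEW_AND_OLD_IMAGES` is just one of the possible view types.
+
+```java
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
+import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
+
+// Enable streaming on an existing table ("test" is a placeholder table name).
+// NEW_AND_OLD_IMAGES emits both the pre- and post-modification item for each change.
+try (DynamoDbClient dynamoDbClient = DynamoDbClient.create()) {
+    dynamoDbClient.updateTable(
+        UpdateTableRequest.builder()
+            .tableName("test")
+            .streamSpecification(
+                StreamSpecification.builder()
+                    .streamEnabled(true)
+                    .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
+                    .build())
+            .build());
+}
+```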
+
+### Usage
+
+The `DynamoDbStreamsSource` provides a fluent builder for constructing an instance of the source.
+The code snippet below illustrates how to do so.
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}}
+{{< tab "Java" >}}
+```java
+// Configure the DynamodbStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is optional; by default the connector reads from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
+DynamoDbStreamsSource<String> dynamoDbStreamsSource =
+    DynamoDbStreamsSource.<String>builder()
+        .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+        .setSourceConfig(sourceConfig)
+        // User must implement their own deserialization schema to translate change data capture events into custom data types
+        .setDeserializationSchema(dynamodbDeserializationSchema)
+        .build();
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Specify the watermark strategy and the name of the DynamoDB Streams source operator.
+// Specify the return type using TypeInformation.
+// Also specify the operator UID, in line with Flink best practices.
+DataStream<String> cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
+    .returns(TypeInformation.of(String.class))
+    .uid("custom-uid");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// Configure the DynamodbStreamsSource
+val sourceConfig = new Configuration()
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // This is optional; by default the connector reads from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
+val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]()
+  .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+  .setSourceConfig(sourceConfig)
+  // User must implement their own deserialization schema to translate change data capture events into custom data types
+  .setDeserializationSchema(dynamodbDeserializationSchema)
+  .build()
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// Specify the watermark strategy and the name of the DynamoDB Streams source operator.
+// Also specify the operator UID, in line with Flink best practices.
+val cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
+  .uid("custom-uid")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the `DynamoDbStreamsSource`.
+- The DynamoDB Stream being read from is specified using the stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's `Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific configuration) and `DynamodbStreamsSourceConfigConstants` (DynamoDB Streams Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see [Configuring Starting Position](#configuring-starting-position) for more information).
+- The deserialization format is set to `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies a monotonically increasing `WatermarkStrategy`, which means each record is tagged with the event time given by `approximateCreationDateTime`.
+  Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted for 1 second.
+
+### Configuring Starting Position
+
+To specify the starting position of the `DynamodbStreamsSource`, users can set `DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION` in the configuration:
+- `LATEST`: read all shards of the stream starting from the latest record.
+- `TRIM_HORIZON`: read all shards of the stream starting from the earliest record possible (data is trimmed by DynamoDB after 24 hours).
+
+### Deserialization Schema
+
+The `DynamoDbStreamsSource` provides the `DynamoDbStreamsDeserializationSchema` interface to allow users to implement their own
+deserialization schema to convert DynamoDB change data capture events into custom event types, as sketched below.
+
+The `DynamoDbStreamsSource#deserialize` method takes in an instance of `Record` from the DynamoDB model.
+The `Record` can contain different content, depending on the configuration of the DynamoDB Stream. See the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
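+
+For illustration, here is a minimal sketch of such a schema that summarizes each change event as a string. The method parameters shown (the `Record`, stream ARN, shard id, and a `Collector`) and the import path of the interface are assumptions based on the description above; check the interface in your connector version for the exact signature.
+
+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+// Import path is an assumption; adjust to your connector version.
+import org.apache.flink.connector.dynamodb.source.serialization.DynamoDbStreamsDeserializationSchema;
+import org.apache.flink.util.Collector;
+
+import software.amazon.awssdk.services.dynamodb.model.Record;
+
+import java.io.IOException;
+
+// Hypothetical schema emitting "<eventName>: <keys>" for every change event.
+public class EventSummaryDeserializationSchema
+        implements DynamoDbStreamsDeserializationSchema<String> {
+
+    @Override
+    public void deserialize(Record record, String streamArn, String shardId, Collector<String> output)
+            throws IOException {
+        // eventName() is INSERT, MODIFY or REMOVE; dynamodb() carries the stream record,
+        // whose contents (keys, new image, old image) depend on the StreamViewType.
+        output.collect(record.eventName() + ": " + record.dynamodb().keys());
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
+```
+
+An instance of this class could then be passed to `setDeserializationSchema(...)` in the builder shown above.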
+
+### Shard Assignment Strategy
+
+The `UniformShardAssigner` allocates shards to the parallel subtasks of the `Source` operator based on which parallel subtask currently has the lowest number of allocated shards.
+The idea is to distribute change data capture events as evenly as possible across the parallel subtasks.
+
+Users can also implement their own shard assignment strategy by implementing the `DynamoDbStreamsShardAssigner` interface.
+
+### Configuration
+
+#### Retry Strategy
+
+The `DynamoDbStreamsSource` interacts with Amazon DynamoDB using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
+
+The retry strategy used by the AWS SDK client can be tuned using the following configuration options:
+- `DYNAMODB_STREAMS_RETRY_COUNT`: The maximum number of API retries on retryable errors before the Flink job is restarted.
+- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY`: The base delay used to calculate the exponential backoff.
+- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY`: The maximum delay for the exponential backoff.
+
+#### Shard Discovery
+
+The `DynamoDbStreamsSource` periodically discovers newly created shards on the DynamoDB Stream; these can result from shard splitting or shard rotation.
+By default, shards are discovered every 60 seconds, but users can choose a smaller interval by configuring `SHARD_DISCOVERY_INTERVAL`.
+
+The shard graph returned by DynamoDB during shard discovery can occasionally contain inconsistencies.
+In this case, the `DynamoDbStreamsSource` automatically detects the inconsistency and retries the shard discovery process.
+The maximum number of retries can be configured using `DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT`.
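+
+As an illustration, the snippet below sets these options on the same `Configuration` object that is passed to the source builder. It assumes the constants above are exposed as Flink `ConfigOption`s on `DynamodbStreamsSourceConfigConstants`; the value types shown are assumptions, so check the constants class in your connector version for the exact types.
+
+```java
+// Sketch: tune SDK retries and shard discovery via the source configuration.
+// The value types below are assumptions; check DynamodbStreamsSourceConfigConstants.
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_RETRY_COUNT, 5);
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY, Duration.ofMillis(500));
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY, Duration.ofSeconds(10));
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL, Duration.ofSeconds(30));
+```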
+
+## Amazon DynamoDB Sink
+
+The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a table.
+
 {{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
 {{< tab "Java" >}}
 ```java
@@ -90,7 +217,7 @@ flinkStream.sinkTo(dynamoDbSink)
 {{< /tab >}}
 {{< /tabs >}}
 
-## Configurations
+### Configurations
 
 Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.<InputType>builder()`.
@@ -130,7 +257,7 @@ Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.<InputType>builder()`.