[FLINK-31989][docs] Update English docs for DynamoDbStreamsSource
hlteoh37 committed Nov 6, 2024
1 parent 4fc4e27 commit be3cc86
Showing 1 changed file with 133 additions and 6 deletions.
139 changes: 133 additions & 6 deletions docs/content/docs/connectors/datastream/dynamodb.md
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Amazon DynamoDB Connector
The DynamoDB connector allows users to read from and write to [Amazon DynamoDB](https://aws.amazon.com/dynamodb/).

As a source, the connector allows users to read change data capture streams from DynamoDB tables using [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).

As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the [BatchWriteItem API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html).

## Dependency

Apache Flink ships this connector for use in Flink applications.

To use the connector, add the following Maven dependency to your project:

{{< connector_artifact flink-connector-dynamodb dynamodb >}}


## Amazon DynamoDB Streams Source

The DynamoDB streams source reads from [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
Follow the instructions from the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to set up and configure the change data capture stream.

The actual events streamed to the DynamoDB Stream depend on the `StreamViewType` specified on the DynamoDB Stream itself.
See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
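For example, a stream that carries both the old and new item images can be enabled with the AWS SDK v2. The snippet below is an illustrative sketch; the table name is a placeholder.

```java
// Enable a DynamoDB Stream that carries both old and new item images.
// "orders" is a placeholder table name.
try (DynamoDbClient dynamoDbClient = DynamoDbClient.create()) {
    dynamoDbClient.updateTable(
            UpdateTableRequest.builder()
                    .tableName("orders")
                    .streamSpecification(
                            StreamSpecification.builder()
                                    .streamEnabled(true)
                                    .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
                                    .build())
                    .build());
}
```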

### Usage

The `DynamoDbStreamsSource` class provides a fluent builder for constructing an instance of the source.
The code snippet below illustrates how to do so.

{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}}
{{< tab "Java" >}}
```java
// Configure the DynamoDbStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // Optional; by default the connector reads from LATEST

// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
DynamoDbStreamsSource<String> dynamoDbStreamsSource =
DynamoDbStreamsSource.<String>builder()
.setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
.setSourceConfig(sourceConfig)
// User must implement their own deserialization schema to translate change data capture events into custom data types
.setDeserializationSchema(dynamodbDeserializationSchema)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
// Specify return type using TypeInformation.
// Specify also UID of operator in line with Flink best practice.
DataStream<String> cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
.returns(TypeInformation.of(String.class))
.uid("custom-uid");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
// Configure the DynamoDbStreamsSource
val sourceConfig = new Configuration()
sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // Optional; by default the connector reads from LATEST

// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]()
.setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
.setSourceConfig(sourceConfig)
// User must implement their own deserialization schema to translate change data capture events into custom data types
.setDeserializationSchema(dynamodbDeserializationSchema)
.build()

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
// Specify return type using TypeInformation.
// Specify also UID of operator in line with Flink best practice.
val cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.forMonotonousTimestamps[String]().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
.uid("custom-uid")
```
{{< /tab >}}
{{< /tabs >}}

The above is a simple example of using the `DynamoDbStreamsSource`.
- The DynamoDB Stream being read from is specified using the stream ARN.
- Configuration for the `Source` is supplied using an instance of Flink's `Configuration` class.
The configuration keys can be taken from `AWSConfigOptions` (AWS-specific configuration) and `DynamodbStreamsSourceConfigConstants` (DynamoDB Streams Source configuration).
- The example specifies the starting position as `TRIM_HORIZON` (see [Configuring Starting Position](#configuring-starting-position) for more information).
- The deserialization format is specified as `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information).
- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information).
- The example also specifies a monotonically increasing `WatermarkStrategy`, which means each record will be tagged with the event time given by `approximateCreationDateTime`.
Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted for 1 second.

### Configuring Starting Position

To specify the starting position of the `DynamodbStreamsSource`, users can set `DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION` in the source configuration.
- `LATEST`: read all shards of the stream starting from the latest record.
- `TRIM_HORIZON`: read all shards of the stream starting from the earliest record possible (data is trimmed by DynamoDB after 24 hours).

### Deserialization Schema

The `DynamoDbStreamsSource` provides the `DynamoDbStreamsDeserializationSchema<T>` interface to allow users to implement their own
deserialization schema to convert DynamoDB change data capture events into custom event types.

The `DynamoDbStreamsDeserializationSchema<T>#deserialize` method takes in an instance of `Record` from the DynamoDB model.
The `Record` can contain different content, depending on the configuration of the DynamoDB Stream. See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
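The sketch below illustrates the general shape of such a schema, extracting a single attribute from the new image of each change event. The collector-based `deserialize` signature and the `getProducedType` override shown here are assumptions that should be checked against the connector version in use; the `orderId` attribute is a placeholder.

```java
// A sketch of a custom deserialization schema; the exact method signatures of
// DynamoDbStreamsDeserializationSchema should be verified against the connector version in use.
public class OrderIdDeserializationSchema implements DynamoDbStreamsDeserializationSchema<String> {

    @Override
    public void deserialize(Record record, String streamArn, String shardId, Collector<String> output) {
        // The content of the Record depends on the StreamViewType configured on the stream.
        // Here the (hypothetical) "orderId" attribute is read from the new image, if present.
        if (record.dynamodb().hasNewImage()) {
            output.collect(record.dynamodb().newImage().get("orderId").s());
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
```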

### Shard Assignment Strategy

The `UniformShardAssigner` allocates shards to parallel subtasks of the `Source` operator based on which parallel subtask has the lowest number of allocated shards.
The idea is to distribute change data capture events as evenly as possible across parallel subtasks.

Users can also implement their own shard assignment strategy by implementing the `DynamoDbStreamsShardAssigner` interface.

### Configuration

#### Retry Strategy

The `DynamoDbStreamsSource` interacts with Amazon DynamoDB using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).

The retry strategy used by the AWS SDK client can be tuned using the following configuration options (a configuration sketch follows the list):
- `DYNAMODB_STREAMS_RETRY_COUNT`: Maximum number of retries on retryable API errors before the Flink job is restarted.
- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY`: The base delay used for calculation of the exponential backoff.
- `DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY`: The maximum delay for exponential backoff.
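
A minimal sketch, assuming these options are exposed as `ConfigOption`s on `DynamodbStreamsSourceConfigConstants`; the value types shown (int and `Duration`) are assumptions to verify against the connector version in use.

```java
// Tune the SDK retry strategy via the source configuration (option types are assumptions).
Configuration sourceConfig = new Configuration();
sourceConfig.set(DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_RETRY_COUNT, 5);
sourceConfig.set(
        DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY,
        Duration.ofMillis(500));
sourceConfig.set(
        DynamodbStreamsSourceConfigConstants.DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY,
        Duration.ofSeconds(10));
```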

#### Shard Discovery

The `DynamoDbStreamsSource` periodically discovers newly created shards on the DynamoDB stream; new shards can result from shard splitting or shard rotation.
By default, shards are discovered every 60 seconds, but users can customize this to a smaller value by configuring `SHARD_DISCOVERY_INTERVAL`.
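
A minimal sketch, assuming `SHARD_DISCOVERY_INTERVAL` is exposed as a `Duration`-typed `ConfigOption` on `DynamodbStreamsSourceConfigConstants`:

```java
// Discover new shards every 30 seconds instead of the default 60 seconds.
Configuration sourceConfig = new Configuration();
sourceConfig.set(DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL, Duration.ofSeconds(30));
```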

During shard discovery, the shard graph returned by DynamoDB might contain inconsistencies.
In this case, the `DynamoDbStreamsSource` automatically detects the inconsistency and retries the shard discovery process.
The maximum number of retries can be configured using `DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT`.


## Amazon DynamoDB Sink

The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
to set up a table.

{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
{{< tab "Java" >}}
```java
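// Illustrative sketch of constructing the DynamoDB sink. Option names follow the builder
// described in the Configurations section below; the region, table name and element
// converter are placeholders.
Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-2");

DynamoDbSink<String> dynamoDbSink =
    DynamoDbSink.<String>builder()
        .setTableName("my-dynamodb-table")
        .setElementConverter(new CustomElementConverter())
        .setMaxBatchSize(20)
        .setDynamoDbProperties(sinkProperties)
        .build();

flinkStream.sinkTo(dynamoDbSink);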
```
{{< /tab >}}
{{< /tabs >}}

### Configurations

Flink's DynamoDB sink is created using the static builder `DynamoDbSink.<InputType>builder()`.

Expand Down Expand Up @@ -130,7 +257,7 @@ Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.<Inpu
12. _build()_
* Constructs and returns the DynamoDB sink.

### Element Converter

An element converter converts a record in the DataStream into a `DynamoDbWriteRequest` that the sink writes to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or to use the provided
`DefaultDynamoDbElementConverter`, which extracts the item schema from the element class; this requires the element class to be a composite type (i.e. POJO, Tuple or Row). If `TypeInformation` of the elements is available, the schema is constructed eagerly using `DynamoDbTypeInformedElementConverter`, as in `new DynamoDbTypeInformedElementConverter(TypeInformation.of(MyPojo.class))`.
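
The sketch below shows a possible custom element converter that writes each `String` element as a PUT request; the element type and the `message` attribute name are illustrative, and the converter implements the generic async sink `ElementConverter` contract.

```java
// A sketch of a custom element converter; "message" is a placeholder attribute name.
public class CustomElementConverter implements ElementConverter<String, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(String element, SinkWriter.Context context) {
        // Build the item to write, then wrap it in a PUT write request for the sink.
        Map<String, AttributeValue> item =
                Map.of("message", AttributeValue.builder().s(element).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}
```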

A sample application using a custom `ElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java). A sample application using the `DynamoDbBeanElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java).

### Using Custom DynamoDB Endpoints

It is sometimes desirable to have Flink operate as a consumer or producer against a DynamoDB VPC endpoint or a non-AWS
DynamoDB endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing functional testing of a Flink application.
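
A minimal sketch of overriding the endpoint through the client properties, assuming the `AWS_ENDPOINT` key from `AWSConfigConstants`; the Localstack URL and table name are placeholders.

```java
Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-2");
// Point the client at a local endpoint instead of the regional AWS endpoint.
// The URL below is a placeholder for a Localstack instance.
sinkProperties.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");

DynamoDbSink<String> dynamoDbSink =
    DynamoDbSink.<String>builder()
        .setDynamoDbProperties(sinkProperties)
        .setTableName("my-dynamodb-table")
        .setElementConverter(new CustomElementConverter())
        .build();
```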
