
[FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found #152

Open · wants to merge 6 commits into base: main

Conversation

robg-eb

robg-eb commented Jul 24, 2024

Purpose of the change

When DynamoDbSink is used with CDC sources, it fails to process DELETE records and throws

    org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema

This is due to DynamoDbSinkWriter passing the whole DynamoDB item as the key instead of only the constructed primary key.
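The failing path can be illustrated with a stdlib-only sketch (names and types are simplified stand-ins, not the connector's actual code): for a DELETE, only the primary-key attributes may be sent to DynamoDB, so the full item has to be filtered down to the declared key columns.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration, not the connector's code: attribute values are
// plain Strings here instead of the AWS SDK's AttributeValue.
public class KeyExtraction {

    // Keep only the declared primary-key attributes of a full item.
    static Map<String, String> extractKey(Map<String, String> item, List<String> primaryKeyColumns) {
        Map<String, String> key = new LinkedHashMap<>();
        for (String column : primaryKeyColumns) {
            String value = item.get(column);
            if (value == null) {
                throw new IllegalArgumentException("Missing primary key attribute: " + column);
            }
            key.put(column, value);
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> item = new LinkedHashMap<>();
        item.put("user_id", "u1");
        item.put("order_id", "o1");
        item.put("price", "9.99");

        // Sending `item` itself as the delete key is what triggered
        // "The provided key element does not match the schema".
        System.out.println(extractKey(item, List.of("user_id", "order_id")));
        // prints {user_id=u1, order_id=o1}
    }
}
```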

Verifying this change

This change added tests and can be verified as follows:

  • Added tests to RowDataToAttributeValueConverterTest.java:

    • testDeleteOnlyPrimaryKey - Ensures that for a DELETE request, only the (single) PK field is included
    • testDeleteOnlyPrimaryKeys - Ensures that for a DELETE request with a composite PK, both PK fields are included.
    • testPKIgnoredForInsert - Ensures that PK is ignored when an INSERT request is done, and all fields continue to be included as they have been in the past.
    • testPKIgnoredForUpdateAfter - Ensures that PK is ignored when an UPDATE_AFTER request is done, and all fields continue to be included as they have been in the past.
  • Ran manual tests following the steps noted in https://issues.apache.org/jira/browse/FLINK-35500 under "Steps To Reproduce". Running the SQL statement as described in Step 6 now properly runs a DELETE in DynamoDB.

Significant changes

Previously, the PRIMARY KEY field had no significance for a DynamoDB sink via the Table API. Now, a PRIMARY KEY is required when processing a CDC stream that contains DELETEs. This is not a breaking change because the previous behavior for processing a CDC stream containing DELETEs was already a failure (The provided key element does not match the schema). This change now provides a clear exception informing users to specify a primary key to avoid that failure. To clarify this change, the PR contains updates to the connector documentation.
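The clearer failure mode described above can be sketched as follows (a hypothetical stand-alone check, not the PR's actual code):

```java
// Hypothetical sketch of the behavior described above, not the PR's code:
// fail fast with an actionable message when a CDC DELETE arrives and no
// PRIMARY KEY was declared on the table.
public class DeleteSupportCheck {
    static void requirePrimaryKeyForDelete(boolean primaryKeyDeclared) {
        if (!primaryKeyDeclared) {
            throw new IllegalStateException(
                    "Processing DELETE records requires a PRIMARY KEY to be specified on the table");
        }
    }

    public static void main(String[] args) {
        requirePrimaryKeyForDelete(true); // fine: primary key declared
    }
}
```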


boring-cyborg bot commented Jul 24, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

```java
Map<String, AttributeValue> expectedResult =
        singletonMap(key, AttributeValue.builder().s(value).build());

assertThat(actualResult).containsAllEntriesOf(expectedResult);
```
Contributor

nit: could be done on one assert using containsExactly

Author

See response below

```java
expectedResult.put(key, AttributeValue.builder().s(value).build());
expectedResult.put(additionalKey, AttributeValue.builder().s(additionalValue).build());

assertThat(actualResult).containsAllEntriesOf(expectedResult);
```
Contributor

nit: could be done on one assert using containsExactly

Author

See response below

```java
expectedResult.put(key, AttributeValue.builder().s(value).build());
expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build());

assertThat(actualResult).containsAllEntriesOf(expectedResult);
```
Contributor

nit: could be done on one assert using containsExactly

Author

@vahmed-hamdy - I took a look at doing this, but containsExactly only works on (ordered) lists, whereas the things we're comparing here are actually HashMaps. We'd have to convert these HashMaps to lists and impose an order on them to compare them, which seems more complicated than just running two separate assertions - would you agree? Let me know if you think there's a better way - I'm admittedly not a Java expert.

Contributor

You can use assertThat(...).containsExactlyInAnyOrderEntriesOf(...) here to check that maps contain same elements without worrying about order.
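As a side note, plain java.util.Map equality is already insertion-order-insensitive, which is the same guarantee AssertJ's containsExactlyInAnyOrderEntriesOf provides (with richer failure messages); a minimal stdlib-only demonstration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only demonstration: Map.equals compares entry sets, so two maps
// with the same entries are equal regardless of insertion order -- no
// conversion to an ordered list is needed.
public class MapEqualityDemo {
    public static void main(String[] args) {
        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("partition_key", "pk");
        expected.put("sort_key", "sk");

        Map<String, String> actual = new LinkedHashMap<>();
        actual.put("sort_key", "sk");       // reversed insertion order
        actual.put("partition_key", "pk");

        System.out.println(expected.equals(actual)); // prints true
    }
}
```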

@vahmed-hamdy
Contributor

There are some style violations. Could you run mvn spotless:apply ahead of the PR, please?

@robg-eb
Author

robg-eb commented Aug 1, 2024

@vahmed-hamdy -

> There are some style violations. Could you run mvn spotless:apply ahead of the PR, please?

Done, I've applied updated formatting now!

Contributor

z3d1k left a comment

Thank you for the contribution!
Posted a few notes on code, but overall implementation looks good

@hlteoh37 please take a look


@vahmed-hamdy
Contributor

Thanks @robg-eb,
@hlteoh37 could you please take a look?

@robg-eb
Author

robg-eb commented Aug 13, 2024

I addressed all the open comments from @z3d1k now. Thank you all for the suggestions! Sounds like @hlteoh37 should be the next to take a look?

```diff
@@ -62,7 +63,8 @@ protected DynamoDbDynamicSink(
         boolean failOnError,
         Properties dynamoDbClientProperties,
         DataType physicalDataType,
-        Set<String> overwriteByPartitionKeys) {
+        Set<String> overwriteByPartitionKeys,
+        Set<String> primaryKeys) {
```

In DynamoDB nomenclature, this set of properties is a primaryKey (not primaryKeys). Let's use a proper name


+1

Author

To be clear, you're just suggesting changing the variable name from primaryKeys to primaryKey, right? And leaving its data type of Set<String> intact, since a primary key can consist of two fields - the partition key and the sort key?

```java
if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
    builder =
            builder.setPrimaryKeys(
                    new HashSet<>(
```
dzikosc commented Sep 12, 2024

DynamoDB primary key is an ordered set of properties - partitionKey and then sortKey. Let's model it like that.

I would suggest using those as dedicated properties instead in the Sink model. We could add an extra validation here to ensure consistency with DDB schema.

  • if one column is provided, it's the partition key
  • if two columns are provided, the first is the partition key and the second is the sort key
  • if three or more columns are specified, table registration should fail
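The proposed validation could look roughly like this (class and method names are illustrative, not the connector's API):

```java
import java.util.List;

// Illustrative sketch of the validation proposed above, not the
// connector's API: 1 column -> partition key only, 2 columns ->
// partition key then sort key, anything else fails registration.
public class DdbPrimaryKey {
    final String partitionKey;
    final String sortKey; // null when the table has no sort key

    private DdbPrimaryKey(String partitionKey, String sortKey) {
        this.partitionKey = partitionKey;
        this.sortKey = sortKey;
    }

    static DdbPrimaryKey fromColumns(List<String> columns) {
        if (columns.size() == 1) {
            return new DdbPrimaryKey(columns.get(0), null);
        }
        if (columns.size() == 2) {
            return new DdbPrimaryKey(columns.get(0), columns.get(1));
        }
        throw new IllegalArgumentException(
                "A DynamoDB primary key has 1 or 2 columns, but got " + columns.size());
    }
}
```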


+1
It must be an ordered set/list of 1 or 2 elements.

Suggestion: if not too complicated, from a user's perspective it would be clearer to pass a PrimaryKey object with two fields, partitionKey and sortKey.

Author

@nicusX - While having a dedicated PrimaryKey object with two fields partitionKey and sortKey might work for the DataStream API if I were to add it to the Sink model, I am not clear on how that would then translate to the Table API. I was hoping to just use the fact that the Table API / SQL API already supports the concept of passing in a PRIMARY KEY for that.

I don't think we need to separate the partition key and sort key for the purpose of identifying the primary key here - and in fact there would also be a naming collision, since the current Table API / SQL connector already supports a PARTITIONED BY clause, adding to potential confusion.


Ack for not having an object with separate fields.
However, in this case the primaryKey (singular) should be an ordered set (e.g. a List) and not a Set. The order of the two fields is relevant: the first always being the partitionKey, in DDB parlance, and the second, if present, the sortKey

dzikosc left a comment

Let's model the concept of DDB primary key, using the specification and correct naming

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.CoreComponents.html#HowItWorks.CoreComponents.PrimaryKey

```
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
        .setDynamoDbClientProperties(
                dynamoDbConfiguration.getSinkClientProperties());

if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
```
dzikosc commented Sep 12, 2024

In the case of DynamoDB, the contents of PRIMARY KEY and PARTITIONED BY should be the same to ensure the semantic correctness of various operations.

To avoid repetitiveness and simplify usage, we should aim to use just the PRIMARY KEY property, as it's a fairly intuitive and well-understood concept for DDB users. In the meantime, we should probably start the deprecation process for the PARTITIONED BY mechanism.

The first steps could be:

  • if PRIMARY KEY is set but PARTITIONED BY is not, use the primary key as the partition-by columns
  • document that PARTITIONED BY is on a deprecation path
  • log warnings when PARTITIONED BY is used
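The first of those steps could be sketched as follows (names are hypothetical, not the connector's API):

```java
import java.util.List;

// Hypothetical sketch of the fallback proposed above: prefer PRIMARY KEY,
// fall back to it when PARTITIONED BY is absent, and warn while
// PARTITIONED BY is on its deprecation path.
public class DedupKeyResolver {
    static List<String> resolve(List<String> primaryKey, List<String> partitionedBy) {
        if (partitionedBy.isEmpty()) {
            // Primary key doubles as the dedup/partition columns.
            return primaryKey;
        }
        // Warn while PARTITIONED BY is still accepted.
        System.err.println(
                "WARN: PARTITIONED BY is deprecated for the DynamoDB sink; prefer PRIMARY KEY");
        return partitionedBy;
    }
}
```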


+1
Also, I'd add a check that, if PARTITION BY and PRIMARY KEY are both specified, they must be identical

Author

@dzikosc - Isn't there a potential use case wherein a user of the existing PARTITION BY functionality of the sink wants to use it to deduplicate data by something other than the actual Primary Key of the DynamoDB table? For example, the DynamoDB table has:

  • user_id - partition key
  • order_id - sort key
  • line_number
  • price

Today, the user of the connector could specify PARTITIONED BY user_id, order_id, line_number to deduplicate incoming data. Maybe this is a contrived case, but I'm just pointing out that we may now be blocking a use case that was previously supported. Thoughts?

Either way, could we keep the deprecation of PARTITION BY to a separate PR to reduce the scope of this one?

`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (user_id) NOT ENFORCED

Why do we use the NOT ENFORCED qualifier? In the case of DynamoDB, the primary key property (or properties) is always present on the record.

);

Note that this primary key functionality, specified by `PRIMARY KEY`, can be used alongside the sink partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.

I would clarify that PARTITIONED BY is used by Flink for deduplication within the same batch (and to support deletes), but it's different from DynamoDB's partition key. If the user is familiar with DynamoDB but not much with Flink, this may generate a lot of confusion.


Also, PARTITIONED BY and PRIMARY KEY should always be the same.
(see the other comment by @dzikosc above)


@hlteoh37
Contributor

hlteoh37 commented Nov 4, 2024

@robg-eb Thank you for working on this. Can I check if you plan to address the remaining comments?
