[Feature] Introduce Merge Engines for Primary-Key Table #212

Open
1 of 5 tasks
wuchong opened this issue Dec 17, 2024 · 0 comments
Labels
component=kv feature New feature or request

wuchong commented Dec 17, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

We propose introducing merge engines for primary-key tables. A merge engine defines how rows with the same primary key are combined. The feature will include the following merge engines:

  • Version-Based Merge: This engine keeps, for each primary key, the row with the highest version number. It is similar to ClickHouse ReplacingMergeTree, which keeps the row with the highest version, and to HBase checkAndPut, which applies an update only when a condition holds. As a result, out-of-order data is guaranteed to become eventually consistent with the upstream.

  • First-Row-Based Merge: This engine retains only the first row written for each primary key. It improves performance by minimizing writes to the kv store, and it produces an insert-only changelog (some downstream jobs accept only insert-only changelogs).

  • Aggregate-Based Merge: This engine merges rows with the same primary key by aggregating their column values, which is useful for summaries and analytics. It will support functions such as sum, max, min, count, and avg, among others.

Solution

Introduce the following table properties, configured through the WITH options of a Flink DDL statement:

'table.merge-engine' = 'first_row | version | aggregate'
'table.merge-engine.version.column' = '<version_column>'
'table.merge-engine.aggregate.<agg_column>' = 'sum | max | min | count | avg | first_value | last_value'

The merge-engine properties are table storage properties, and therefore can't be changed after table creation.
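
To make the relationship between these properties concrete, here is a minimal Python sketch of how the options could be validated at table creation. It is only an illustration, not Fluss code: the function name `validate_merge_options` is hypothetical, and the rule that the version engine requires a version column is an assumption drawn from the property list above.

```python
# Hypothetical validator for the proposed table options; not part of Fluss.
ENGINES = {"first_row", "version", "aggregate"}
AGG_FUNCS = {"sum", "max", "min", "count", "avg", "first_value", "last_value"}
VERSION_COLUMN_KEY = "table.merge-engine.version.column"
AGG_PREFIX = "table.merge-engine.aggregate."


def validate_merge_options(options):
    """Return the configured engine name, or raise ValueError if the options are inconsistent."""
    engine = options.get("table.merge-engine")
    if engine not in ENGINES:
        raise ValueError(f"unknown merge engine: {engine!r}")
    # Assumption: the version engine cannot work without a version column.
    if engine == "version" and VERSION_COLUMN_KEY not in options:
        raise ValueError(f"version engine needs '{VERSION_COLUMN_KEY}'")
    if engine == "aggregate":
        # Every per-column option must name one of the listed aggregate functions.
        for key, func in options.items():
            if key.startswith(AGG_PREFIX) and func not in AGG_FUNCS:
                raise ValueError(f"unsupported aggregate function for {key}: {func!r}")
    return engine
```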

How to Use

Version-Based Merge

CREATE TABLE fluss_table (
  id BIGINT,
  ts BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'fluss',
  'bootstrap.address' = '...',
  'table.merge-engine' = 'version',
  'table.merge-engine.version.column' = 'ts'  -- a row is updated only when new ts >= old ts
);
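
The intended semantics can be modeled in a few lines of Python. This is a sketch under assumptions, not the actual implementation: the kv store is modeled as a plain dict, and `version_merge` is a hypothetical name. It follows the rule in the DDL comment (update when new ts >= old ts).

```python
def version_merge(store, row, key="id", version_column="ts"):
    """Apply `row` to `store` (a dict keyed by primary key) under version-based merge.

    Returns True if the row was written, False if it was discarded as stale.
    """
    old = store.get(row[key])
    # Write when the key is new or the incoming version is not older than the stored one.
    if old is None or row[version_column] >= old[version_column]:
        store[row[key]] = row
        return True
    return False


version_store = {}
# Rows arrive out of order: ts=3 arrives before ts=2.
for r in [
    {"id": 1, "ts": 1, "data": "a"},
    {"id": 1, "ts": 3, "data": "c"},
    {"id": 1, "ts": 2, "data": "b"},
]:
    version_merge(version_store, r)
```

After the loop, the stored row for id 1 is the one with ts = 3, even though the row with ts = 2 arrived last. This is how out-of-order data converges to the upstream state.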

First-Row-Based Merge

CREATE TABLE fluss_table (
  id BIGINT,
  version BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'fluss',
  'bootstrap.address' = '...',
  'table.merge-engine' = 'first_row'  -- only the first row for each primary key is retained
);
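
A rough Python model of this behavior, again with the kv store as a dict and `first_row_merge` as a hypothetical name. The "+I" tag follows Flink's short name for an insert row kind. The exact changelog format in Fluss is not specified by this issue.

```python
def first_row_merge(store, row, key="id"):
    """Keep only the first row seen per primary key.

    Returns the changelog entry to emit: ("+I", row) for a new key,
    or None when the row is dropped.
    """
    if row[key] in store:
        return None  # key already present: no write to the store, no changelog entry
    store[row[key]] = row
    return ("+I", row)


first_row_store = {}
changelog = []
for r in [
    {"id": 1, "version": 1, "data": "a"},
    {"id": 1, "version": 2, "data": "b"},  # dropped: id 1 already exists
    {"id": 2, "version": 1, "data": "x"},
]:
    entry = first_row_merge(first_row_store, r)
    if entry is not None:
        changelog.append(entry)
```

Only two entries reach the changelog, both inserts, and the second row for id 1 never touches the store. That is the source of both the reduced write load and the insert-only changelog.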

Aggregate-Based Merge

CREATE TABLE fluss_table (
  id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'fluss',
  'bootstrap.address' = '...',
  'table.merge-engine' = 'aggregate',
  'table.merge-engine.aggregate.price' = 'max',
  'table.merge-engine.aggregate.sales' = 'sum'
);
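
As an illustration of the per-column merge, here is a small Python sketch. The names `AGGREGATORS` and `aggregate_merge` are hypothetical, and only the stateless functions are modeled. count and avg are left out because they need extra state beyond the stored value (a counter, or a running sum plus a counter), and how Fluss would keep that state is not covered here.

```python
# Hypothetical per-column aggregate functions: (old_value, new_value) -> merged_value.
AGGREGATORS = {
    "sum": lambda old, new: old + new,
    "max": max,
    "min": min,
    "first_value": lambda old, new: old,
    "last_value": lambda old, new: new,
}


def aggregate_merge(store, row, agg_by_column, key="id"):
    """Merge `row` into the stored row with the same primary key, column by column."""
    old = store.get(row[key])
    if old is None:
        store[row[key]] = dict(row)  # first row for this key is stored as-is
        return
    for column, func in agg_by_column.items():
        old[column] = AGGREGATORS[func](old[column], row[column])


agg_store = {}
agg_config = {"price": "max", "sales": "sum"}  # mirrors the DDL options above
for r in [
    {"id": 1, "price": 9.5, "sales": 10},
    {"id": 1, "price": 12.0, "sales": 5},
    {"id": 1, "price": 8.0, "sales": 7},
]:
    aggregate_merge(agg_store, r, agg_config)
```

With these three rows, the stored row for id 1 ends up with price 12.0 (the maximum) and sales 22 (the sum).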

Anything else?

Subtasks:

Willingness to contribute

  • I'm willing to submit a PR!