Add upsert merge strategy #1466

Merged · 34 commits · Jul 11, 2024

Commits
f275530
add upsert merge strategy
jorritsandbrink Jun 14, 2024
ad21a9f
handle destination upsert support
jorritsandbrink Jun 15, 2024
ebc79e1
refactor row id type handling
jorritsandbrink Jun 18, 2024
9efb502
black format
jorritsandbrink Jun 18, 2024
64d0b17
Merge branch 'devel' into feat/1129-add-upsert-merge-strategy
jorritsandbrink Jun 18, 2024
999da5e
add supported_merge_strategies destination capability
jorritsandbrink Jun 18, 2024
19230a8
fix child row id type handling
jorritsandbrink Jun 18, 2024
34ff5e6
Merge branch 'feat/1129-add-upsert-merge-strategy' of https://github.…
jorritsandbrink Jun 18, 2024
c9a02aa
add condition to exclude destinations that handle merge, but do not i…
jorritsandbrink Jun 19, 2024
7635095
repair improper merge conflict resolution
jorritsandbrink Jun 19, 2024
28e6fb7
improve merge strategy validation
jorritsandbrink Jun 19, 2024
51ab4ac
add default merge strategy to dummy destination capabilities
jorritsandbrink Jun 20, 2024
ced879a
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1129…
jorritsandbrink Jun 28, 2024
5191718
re-add merge strategies capability
jorritsandbrink Jun 28, 2024
0606dfd
re-add upsert schema verification
jorritsandbrink Jun 28, 2024
2647517
black format
jorritsandbrink Jun 28, 2024
01202d4
change SchemaException to ValueError
jorritsandbrink Jun 28, 2024
e4c856b
test upsert merge key warning log
jorritsandbrink Jun 28, 2024
68615dd
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1129…
jorritsandbrink Jun 28, 2024
f53656c
remove obsolete property
jorritsandbrink Jun 28, 2024
4e2988d
remove obsolete import
jorritsandbrink Jun 28, 2024
00e9e25
use get_qualified_table_names utility function consistently
jorritsandbrink Jun 28, 2024
c8c2431
remove obsolete import
jorritsandbrink Jun 28, 2024
b80699a
move test because it needs postgres credentials
jorritsandbrink Jun 28, 2024
5000f43
correct supported merge strategies
jorritsandbrink Jun 28, 2024
cc7d114
move dremio supported merge strategies
jorritsandbrink Jun 28, 2024
caaf248
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1129…
jorritsandbrink Jul 9, 2024
e5e68f2
remove hardcoded row id column references
jorritsandbrink Jul 9, 2024
479e52a
move write disposition hint validation
jorritsandbrink Jul 10, 2024
0d84e6d
replace hardcoded row id column names with constant
jorritsandbrink Jul 10, 2024
245cd0f
add comment
jorritsandbrink Jul 10, 2024
17f2115
refactor schema verification to prevent duplicate warnings
jorritsandbrink Jul 10, 2024
cec6671
Revert "refactor schema verification to prevent duplicate warnings"
jorritsandbrink Jul 11, 2024
0f641e4
comment out test that fails on GitHub CI
jorritsandbrink Jul 11, 2024
3 changes: 3 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -88,6 +88,9 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
max_table_nesting: Optional[int] = None
"""Allows a destination to overwrite max_table_nesting from source"""

supported_merge_strategies: Sequence["TLoaderMergeStrategy"] = None # type: ignore[name-defined] # noqa: F821
# TODO: also add `supported_replace_strategies` capability

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False

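
For context, a destination factory now advertises which merge strategies it can execute via this capability, and callers can check a requested strategy against that list. A minimal standalone sketch (not the actual dlt classes; only the field name and the strategy literals come from the diff):

```python
# Standalone sketch of the new capability flag; dataclass boilerplate is simplified.
from dataclasses import dataclass, field
from typing import List, Literal

TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]


@dataclass
class Capabilities:
    # mirrors DestinationCapabilitiesContext.supported_merge_strategies from the diff
    supported_merge_strategies: List[TLoaderMergeStrategy] = field(default_factory=list)


def postgres_like_caps() -> Capabilities:
    # postgres and snowflake declare all three strategies in this PR;
    # most other destinations declare only "delete-insert" and "scd2"
    return Capabilities(supported_merge_strategies=["delete-insert", "upsert", "scd2"])


if __name__ == "__main__":
    caps = postgres_like_caps()
    print("upsert" in caps.supported_merge_strategies)  # True
```
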
4 changes: 4 additions & 0 deletions dlt/common/destination/exceptions.py
@@ -126,6 +126,10 @@ def __init__(self, schema_name: str, version_hash: str, stored_version_hash: str
)


class DestinationCapabilitiesException(DestinationException):
pass


class DestinationInvalidFileFormat(DestinationTerminalException):
def __init__(
self, destination_type: str, file_format: str, file_name: str, message: str
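
The new exception class gives capability checks a dedicated error type. A hypothetical helper showing how such a check might raise it (the helper name and message are illustrative, not the exact validation added elsewhere in this PR):

```python
# Hypothetical sketch: raise the new exception when a destination does not
# implement the requested merge strategy.
from typing import Sequence


class DestinationException(Exception):
    pass


class DestinationCapabilitiesException(DestinationException):
    pass


def verify_merge_strategy(requested: str, supported: Sequence[str], destination: str) -> None:
    """Raise if the destination does not support the requested merge strategy."""
    if requested not in supported:
        raise DestinationCapabilitiesException(
            f"Merge strategy '{requested}' is not supported by destination '{destination}'."
            f" Supported strategies: {list(supported)}"
        )


verify_merge_strategy("upsert", ["delete-insert", "upsert", "scd2"], "postgres")  # ok
# verify_merge_strategy("upsert", ["delete-insert", "scd2"], "athena")  # would raise
```
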
113 changes: 76 additions & 37 deletions dlt/common/normalizers/json/relational.py
@@ -2,18 +2,24 @@
from typing import Dict, List, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
from dlt.common.json import json
from dlt.common.normalizers.exceptions import InvalidJsonNormalizer
from dlt.common.normalizers.typing import TJSONNormalizer
from dlt.common.normalizers.typing import TJSONNormalizer, TRowIdType
from dlt.common.normalizers.utils import generate_dlt_id, DLT_ID_LENGTH_BYTES

from dlt.common.typing import DictStrAny, TDataItem, StrAny
from dlt.common.schema import Schema
from dlt.common.schema.typing import (
TLoaderMergeStrategy,
TColumnSchema,
TColumnName,
TSimpleRegex,
DLT_NAME_PREFIX,
)
from dlt.common.schema.utils import column_name_validator, get_validity_column_names
from dlt.common.schema.utils import (
column_name_validator,
get_validity_column_names,
get_columns_names_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.schema.exceptions import ColumnNameConflictException
from dlt.common.utils import digest128, update_dict_nested
from dlt.common.normalizers.json import (
@@ -158,14 +164,16 @@ def norm_row_dicts(dict_row: StrAny, __r_lvl: int, path: Tuple[str, ...] = ()) -
return out_rec_row, out_rec_list

@staticmethod
def get_row_hash(row: Dict[str, Any]) -> str:
def get_row_hash(row: Dict[str, Any], subset: Optional[List[str]] = None) -> str:
"""Returns hash of row.

Hash includes column names and values and is ordered by column name.
Excludes dlt system columns.
Can be used as deterministic row identifier.
"""
row_filtered = {k: v for k, v in row.items() if not k.startswith(DLT_NAME_PREFIX)}
if subset is not None:
row_filtered = {k: v for k, v in row.items() if k in subset}
row_str = json.dumps(row_filtered, sort_keys=True)
return digest128(row_str, DLT_ID_LENGTH_BYTES)
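
For orientation, `get_row_hash` now accepts an optional `subset` so the upsert strategy can hash only the primary key columns. A standalone approximation of the behavior (dlt's `digest128` and `DLT_ID_LENGTH_BYTES` are replaced by a truncated sha256 here, so the exact encoding differs from the real implementation):

```python
import hashlib
import json
from typing import Any, Dict, List, Optional

DLT_NAME_PREFIX = "_dlt"  # dlt system columns start with this prefix


def get_row_hash(row: Dict[str, Any], subset: Optional[List[str]] = None) -> str:
    """Deterministic row identifier: hash of column names and values, ordered by name.

    By default dlt system columns are excluded; with `subset` only the listed
    columns (e.g. the primary key) contribute to the hash.
    """
    row_filtered = {k: v for k, v in row.items() if not k.startswith(DLT_NAME_PREFIX)}
    if subset is not None:
        row_filtered = {k: v for k, v in row.items() if k in subset}
    row_str = json.dumps(row_filtered, sort_keys=True)
    # stand-in for dlt.common.utils.digest128, used only to keep the sketch self-contained
    return hashlib.sha256(row_str.encode("utf-8")).hexdigest()[:32]


print(get_row_hash({"id": 1, "name": "a", "_dlt_load_id": "x"}))
print(get_row_hash({"id": 1, "name": "changed"}, subset=["id"]))  # same for any "name"
```
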

@@ -188,18 +196,39 @@ def _extend_row(extend: DictStrAny, row: DictStrAny) -> None:
row.update(extend)

def _add_row_id(
self, table: str, row: DictStrAny, parent_row_id: str, pos: int, _r_lvl: int
self,
table: str,
dict_row: DictStrAny,
flattened_row: DictStrAny,
parent_row_id: str,
pos: int,
_r_lvl: int,
) -> str:
# row_id is always random, no matter if primary_key is present or not
row_id = generate_dlt_id()
if _r_lvl > 0:
primary_key = self.schema.filter_row_with_hint(table, "primary_key", row)
if not primary_key:
# child table row deterministic hash
row_id = DataItemNormalizer._get_child_row_hash(parent_row_id, table, pos)
# link to parent table
DataItemNormalizer._link_row(row, parent_row_id, pos)
row[self.c_dlt_id] = row_id
primary_key = False
if _r_lvl > 0: # child table
primary_key = bool(
self.schema.filter_row_with_hint(table, "primary_key", flattened_row)
)
row_id_type = self._get_row_id_type(self.schema, table, primary_key, _r_lvl)

if row_id_type == "random":
row_id = generate_dlt_id()
else:
if _r_lvl == 0: # root table
if row_id_type in ("key_hash", "row_hash"):
subset = None
if row_id_type == "key_hash":
subset = self._get_primary_key(self.schema, table)
# base hash on `dict_row` instead of `flattened_row`
# so changes in child tables lead to new row id
row_id = self.get_row_hash(dict_row, subset=subset)
elif _r_lvl > 0: # child table
if row_id_type == "row_hash":
row_id = DataItemNormalizer._get_child_row_hash(parent_row_id, table, pos)
# link to parent table
DataItemNormalizer._link_row(flattened_row, parent_row_id, pos)

flattened_row[self.c_dlt_id] = row_id
return row_id

def _get_propagated_values(self, table: str, row: DictStrAny, _r_lvl: int) -> StrAny:
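
The practical effect of the `key_hash` branch above: for an upsert root table, `_dlt_id` is derived from the primary key columns only, so re-loading a record with the same key yields the same id and the load step can replace the existing row instead of adding a duplicate. A self-contained illustration (the hash function is a stand-in, not dlt's `digest128`):

```python
# Hashing only the primary key columns makes _dlt_id stable across reloads.
import hashlib
import json


def key_hash(row: dict, key_columns: list) -> str:
    payload = json.dumps({k: row[k] for k in key_columns}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:32]  # stand-in for digest128


old = {"id": 42, "status": "pending"}
new = {"id": 42, "status": "shipped"}

assert key_hash(old, ["id"]) == key_hash(new, ["id"])  # same key -> same _dlt_id
print(key_hash(new, ["id"]))
```
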
@@ -268,22 +297,17 @@ def _normalize_row(
parent_row_id: Optional[str] = None,
pos: Optional[int] = None,
_r_lvl: int = 0,
row_hash: bool = False,
) -> TNormalizedRowIterator:
schema = self.schema
table = schema.naming.shorten_fragments(*parent_path, *ident_path)
# compute row hash and set as row id
if row_hash:
row_id = self.get_row_hash(dict_row)
dict_row[self.c_dlt_id] = row_id
# flatten current row and extract all lists to recur into
flattened_row, lists = self._flatten(table, dict_row, _r_lvl)
# always extend row
DataItemNormalizer._extend_row(extend, flattened_row)
# infer record hash or leave existing primary key if present
row_id = flattened_row.get(self.c_dlt_id, None)
if not row_id:
row_id = self._add_row_id(table, flattened_row, parent_row_id, pos, _r_lvl)
row_id = self._add_row_id(table, dict_row, flattened_row, parent_row_id, pos, _r_lvl)

# find fields to propagate to child tables in config
extend.update(self._get_propagated_values(table, flattened_row, _r_lvl))
@@ -369,11 +393,7 @@ def normalize_data_item(
row = cast(DictStrAny, item)
# identify load id if loaded data must be processed after loading incrementally
row[self.c_dlt_load_id] = load_id

# determine if row hash should be used as dlt id
row_hash = False
if self._is_scd2_table(self.schema, table_name):
row_hash = self._dlt_id_is_row_hash(self.schema, table_name, self.c_dlt_id)
if self._get_merge_strategy(self.schema, table_name) == "scd2":
self._validate_validity_column_names(
self.schema.name, self._get_validity_column_names(self.schema, table_name), item
)
Expand All @@ -382,7 +402,6 @@ def normalize_data_item(
row,
{},
(self.schema.naming.normalize_table_identifier(table_name),),
row_hash=row_hash,
)

@classmethod
@@ -450,11 +469,18 @@ def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]:

@staticmethod
@lru_cache(maxsize=None)
def _is_scd2_table(schema: Schema, table_name: str) -> bool:
if table_name in schema.data_table_names():
if schema.get_table(table_name).get("x-merge-strategy") == "scd2":
return True
return False
def _get_merge_strategy(schema: Schema, table_name: str) -> Optional[TLoaderMergeStrategy]:
if table_name in schema.data_table_names(include_incomplete=True):
return schema.get_table(table_name).get("x-merge-strategy") # type: ignore[return-value]
return None

@staticmethod
@lru_cache(maxsize=None)
def _get_primary_key(schema: Schema, table_name: str) -> List[str]:
Review comment (Collaborator): here's one detail: we may be dealing with a non-existing table that nevertheless gets a primary key from schema hints (data comes first, then we detect the schema, so to be fully correct we'd need to use the flattened data to predict whether any primary key will be detected on it). I looked at our code and this would significantly slow us down, make caching harder etc., so let's ignore it for now.

if table_name not in schema.tables:
return []
table = schema.get_table(table_name)
return get_columns_names_with_prop(table, "primary_key", include_incomplete=True)

@staticmethod
@lru_cache(maxsize=None)
@@ -463,12 +489,25 @@ def _get_validity_column_names(schema: Schema, table_name: str) -> List[Optional

@staticmethod
@lru_cache(maxsize=None)
def _dlt_id_is_row_hash(schema: Schema, table_name: str, c_dlt_id: str) -> bool:
return (
schema.get_table(table_name)["columns"] # type: ignore[return-value]
.get(c_dlt_id, {})
.get("x-row-version", False)
)
def _get_row_id_type(
schema: Schema, table_name: str, primary_key: bool, _r_lvl: int
) -> TRowIdType:
if _r_lvl == 0: # root table
merge_strategy = DataItemNormalizer._get_merge_strategy(schema, table_name)
if merge_strategy == "upsert":
return "key_hash"
elif merge_strategy == "scd2":
x_row_version_col = get_first_column_name_with_prop(
schema.get_table(table_name),
"x-row-version",
include_incomplete=True,
)
if x_row_version_col == DataItemNormalizer.C_DLT_ID:
return "row_hash"
elif _r_lvl > 0: # child table
if not primary_key:
return "row_hash"
return "random"

@staticmethod
def _validate_validity_column_names(
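
To summarize the row id selection introduced in relational.py above, a condensed standalone version of the `_get_row_id_type` decision (schema lookups are replaced by plain arguments; the branching mirrors the diff):

```python
from typing import Literal, Optional

TRowIdType = Literal["random", "row_hash", "key_hash"]


def row_id_type(
    merge_strategy: Optional[str],   # "x-merge-strategy" of the root table, if any
    dlt_id_is_row_version: bool,     # does _dlt_id carry the "x-row-version" hint?
    has_primary_key: bool,           # does the (child) table have a primary key?
    nesting_level: int,              # 0 = root table, >0 = child table
) -> TRowIdType:
    if nesting_level == 0:           # root table
        if merge_strategy == "upsert":
            return "key_hash"        # deterministic hash of the primary key columns
        if merge_strategy == "scd2" and dlt_id_is_row_version:
            return "row_hash"        # deterministic hash of the whole row
    elif not has_primary_key:        # child table without its own primary key
        return "row_hash"            # hash of (parent row id, table, position)
    return "random"


assert row_id_type("upsert", False, True, 0) == "key_hash"
assert row_id_type("scd2", True, False, 0) == "row_hash"
assert row_id_type(None, False, False, 1) == "row_hash"
assert row_id_type(None, False, True, 1) == "random"
```
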
5 changes: 4 additions & 1 deletion dlt/common/normalizers/typing.py
@@ -1,12 +1,15 @@
from typing import List, Optional, Type, TypedDict, Literal, Union
from types import ModuleType
from typing import List, Optional, Type, TypedDict, Union

from dlt.common.typing import StrAny
from dlt.common.normalizers.naming import NamingConvention

TNamingConventionReferenceArg = Union[str, Type[NamingConvention], ModuleType]


TRowIdType = Literal["random", "row_hash", "key_hash"]


class TJSONNormalizer(TypedDict, total=False):
module: str
config: Optional[StrAny] # config is a free form and is validated by `module`
6 changes: 4 additions & 2 deletions dlt/common/schema/typing.py
@@ -66,7 +66,6 @@
]
"""Known hints of a column used to declare hint regexes."""

TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TTableFormat = Literal["iceberg", "delta"]
TFileFormat = Literal[Literal["preferred"], TLoaderFileFormat]
TTypeDetections = Literal[
@@ -168,7 +167,10 @@ class NormalizerInfo(TypedDict, total=True):
total=False,
)

TLoaderMergeStrategy = Literal["delete-insert", "scd2"]

TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]


WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
MERGE_STRATEGIES: Set[TLoaderMergeStrategy] = set(get_args(TLoaderMergeStrategy))
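
From the user side, the new `"upsert"` literal means a resource can request the strategy through the `write_disposition` dict. A hedged usage sketch, assuming the same configuration shape dlt already uses for `scd2`; the destination, pipeline name and data are placeholders:

```python
import dlt


@dlt.resource(
    primary_key="id",  # required for upsert: _dlt_id is derived from this key
    write_disposition={"disposition": "merge", "strategy": "upsert"},
)
def customers():
    yield [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]


# Re-running with changed rows should update them in place instead of appending
# duplicates, provided the destination lists "upsert" in supported_merge_strategies
# (e.g. postgres or snowflake in this PR).
pipeline = dlt.pipeline(pipeline_name="upsert_demo", destination="postgres")
# pipeline.run(customers())  # needs postgres credentials, so left commented out
```
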
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/factory.py
@@ -46,6 +46,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.schema_supports_numeric_precision = False
caps.timestamp_precision = 3
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/bigquery/factory.py
@@ -42,6 +42,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = False
caps.supports_clone_table = True
caps.schema_supports_numeric_precision = False # no precision information in BigQuery
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

2 changes: 2 additions & 0 deletions dlt/destinations/impl/clickhouse/factory.py
@@ -67,6 +67,8 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:

caps.supports_truncate_command = True

caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/databricks/factory.py
@@ -42,6 +42,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.supports_multiple_statements = False
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "scd2"]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/dremio/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_clone_table = False
caps.supports_multiple_statements = False
caps.timestamp_precision = 3
caps.supported_merge_strategies = ["delete-insert", "scd2"]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/duckdb/factory.py
@@ -35,6 +35,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = False
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/dummy/factory.py
@@ -27,6 +27,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_text_data_type_length = 65536
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = False
caps.supported_merge_strategies = ["delete-insert", "upsert"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/motherduck/factory.py
@@ -36,6 +36,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = False
caps.alter_add_multi_column = False
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/mssql/factory.py
@@ -39,6 +39,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.max_rows_per_insert = 1000
caps.timestamp_precision = 7
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/postgres/factory.py
@@ -41,6 +41,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_text_data_type_length = 1024 * 1024 * 1024
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/redshift/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/snowflake/factory.py
@@ -41,6 +41,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = True
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
return caps

@property
2 changes: 2 additions & 0 deletions dlt/destinations/impl/synapse/factory.py
@@ -73,6 +73,8 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
# https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetimeoffset-transact-sql?view=sql-server-ver16
caps.timestamp_precision = 7

caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

@property
15 changes: 13 additions & 2 deletions dlt/destinations/sql_client.py
@@ -162,8 +162,13 @@ def catalog_name(self, escape: bool = True) -> Optional[str]:
# connection is scoped to a current database
return None

def fully_qualified_dataset_name(self, escape: bool = True) -> str:
return ".".join(self.make_qualified_table_name_path(None, escape=escape))
def fully_qualified_dataset_name(self, escape: bool = True, staging: bool = False) -> str:
if staging:
with self.with_staging_dataset():
path = self.make_qualified_table_name_path(None, escape=escape)
else:
path = self.make_qualified_table_name_path(None, escape=escape)
return ".".join(path)

def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str:
return ".".join(self.make_qualified_table_name_path(table_name, escape=escape))
@@ -188,6 +193,12 @@ def make_qualified_table_name_path(
path.append(table_name)
return path

def get_qualified_table_names(self, table_name: str, escape: bool = True) -> Tuple[str, str]:
"""Returns qualified names for table and corresponding staging table as tuple."""
with self.with_staging_dataset():
staging_table_name = self.make_qualified_table_name(table_name, escape)
return self.make_qualified_table_name(table_name, escape), staging_table_name

def escape_column_name(self, column_name: str, escape: bool = True) -> str:
column_name = self.capabilities.casefold_identifier(column_name)
if escape:
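
The `get_qualified_table_names` helper pairs the regular and staging-dataset names that merge jobs need. A simplified standalone mock of the pattern (the context manager and naming rules of the real SqlClientBase implementations are reduced to string concatenation here, and escaping is omitted):

```python
from contextlib import contextmanager
from typing import Iterator, Tuple


class MiniSqlClient:
    """Toy stand-in for SqlClientBase showing the regular/staging name pairing."""

    def __init__(self, dataset_name: str, staging_suffix: str = "_staging") -> None:
        self.dataset_name = dataset_name
        self._staging_suffix = staging_suffix
        self._in_staging = False

    @contextmanager
    def with_staging_dataset(self) -> Iterator["MiniSqlClient"]:
        self._in_staging = True
        try:
            yield self
        finally:
            self._in_staging = False

    def fully_qualified_dataset_name(self, staging: bool = False) -> str:
        if staging:
            with self.with_staging_dataset():
                return self._current_dataset()
        return self._current_dataset()

    def make_qualified_table_name(self, table_name: str) -> str:
        return f"{self._current_dataset()}.{table_name}"

    def get_qualified_table_names(self, table_name: str) -> Tuple[str, str]:
        """Qualified names for a table and its staging counterpart, as in the diff."""
        with self.with_staging_dataset():
            staging = self.make_qualified_table_name(table_name)
        return self.make_qualified_table_name(table_name), staging

    def _current_dataset(self) -> str:
        suffix = self._staging_suffix if self._in_staging else ""
        return f"{self.dataset_name}{suffix}"


client = MiniSqlClient("analytics")
print(client.get_qualified_table_names("customers"))
# ('analytics.customers', 'analytics_staging.customers')
```
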