Skip to content

Commit

Permalink
add merge as a valid incremental strategy (#490)
Browse files Browse the repository at this point in the history
* add merge as a valid incremental strategy

* add new changie

* whitespace

* Tests for incremental strategies

* Trim trailing whitespace

* Copy incremental tests from dbt-snowflake into dbt-redshift

* remove any reference to DBT_INTERNAL_DEST

* add skeleton support

* add skeleton support

* Use Redshift-specific MERGE syntax

* Refactor values to insert when not matched

* add query commenting back

* Test case when using `merge` with `merge_exclude_columns`

* Enumerate column names to insert into

---------

Co-authored-by: Anders <[email protected]>
Co-authored-by: Doug Beatty <[email protected]>
Co-authored-by: Mike Alfare <[email protected]>
Co-authored-by: Doug Beatty <[email protected]>
  • Loading branch information
5 people authored Jul 14, 2023
1 parent 5958da3 commit 493148e
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230613-144752.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: add merge as a new incremental strategy for redshift
time: 2023-06-13T14:47:52.054853-07:00
custom:
Author: jiezhen-chen
Issue: "402"
2 changes: 1 addition & 1 deletion dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert"]
return ["append", "delete+insert", "merge"]

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
    """Return a SQL expression that adds an interval to ``add_to``.

    ``add_to`` is emitted verbatim, followed by ``+ interval`` and a quoted
    ``'<number> <interval>'`` literal.
    """
    interval_literal = f"'{number} {interval}'"
    return f"{add_to} + interval {interval_literal}"
Expand Down
66 changes: 66 additions & 0 deletions dbt/include/redshift/macros/materializations/incremental_merge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{#
    Build the Redshift MERGE statement used by the `merge` incremental strategy.

    Arguments:
        target                  relation merged into; rendered by name in the statement.
        source                  relation holding the new rows; aliased as DBT_INTERNAL_SOURCE.
        unique_key              column name, or list of column names, used to match rows.
                                When falsy the ON clause is (FALSE), no update branch is
                                emitted, and every source row takes the insert branch.
        dest_columns            destination columns, handed to get_merge_update_columns.
        incremental_predicates  optional list of extra conditions ANDed into the ON clause.

    Config read: merge_update_columns, merge_exclude_columns, sql_header.
#}
{% macro redshift__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    {%- set predicates = [] -%}
    {% if incremental_predicates is not none %}
        {#- The statement below names the target relation directly (there is no
            DBT_INTERNAL_DEST alias), so alias-qualified predicates are rewritten
            to reference the target relation instead. -#}
        {%- set incremental_predicates_list = [] + incremental_predicates -%}
        {%- for pred in incremental_predicates_list -%}
            {% if "DBT_INTERNAL_DEST." in pred %}
                {%- set pred = pred | replace("DBT_INTERNAL_DEST.", target ~ "." ) -%}
            {% endif %}
            {% if "dbt_internal_dest." in pred %}
                {%- set pred = pred | replace("dbt_internal_dest.", target ~ "." ) -%}
            {% endif %}
            {% do predicates.append(pred) %}
        {% endfor %}
    {% endif %}

    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {#- merge_update_columns / merge_exclude_columns only restrict what an UPDATE of
        an existing row may touch. A brand-new row still needs every destination
        column, otherwise the restricted columns are inserted as NULL. Passing none
        for both filters requests the unfiltered column list (per dbt-core's default
        get_merge_update_columns implementation). -#}
    {%- set insert_columns = get_merge_update_columns(none, none, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {#- Composite key: one equality predicate per key column. -#}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = {{ target }}.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = {{ target }}.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {#- No key to match on: force every source row down the insert branch. -#}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }}
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{"(" ~ predicates | join(") and (") ~ ")"}}

    {#- NOTE(review): without a unique_key only the insert branch is emitted;
        confirm Redshift accepts a MERGE that has no WHEN MATCHED clause. -#}
    {% if unique_key %}
    when matched then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {% endif %}
        {% endfor %}
    {% endif %}

    when not matched then insert (
        {% for column_name in insert_columns -%}
            {{ column_name }}
            {%- if not loop.last %}, {% endif %}
        {% endfor %}
    )
    values (
        {% for column_name in insert_columns -%}
            DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {% endif %}
        {% endfor %}
    )

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
    BaseMergeExcludeColumns,
)


# Row 3 is new on the incremental run, so it is inserted with all of its source
# values; `merge_exclude_columns` only protects columns of rows that already exist.
seeds__expected_merge_exclude_columns_csv = """id,msg,color
1,hello,blue
2,goodbye,green
3,anyway,purple
"""


class TestMergeExcludeColumns(BaseMergeExcludeColumns):
    """Inherits the BaseMergeExcludeColumns tests with the expected seed defined in this module."""

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"expected_merge_exclude_columns.csv": seeds__expected_merge_exclude_columns_csv}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates


class TestIncrementalPredicatesDeleteInsertRedshift(BaseIncrementalPredicates):
    """Inherits the BaseIncrementalPredicates tests with no overrides.

    NOTE(review): no strategy is configured here; presumably the base class
    defaults already select delete+insert -- confirm against the base class.
    """


class TestPredicatesDeleteInsertRedshift(BaseIncrementalPredicates):
    """Inherits the BaseIncrementalPredicates tests, using the `+predicates` key with delete+insert."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Project-level model config applied on top of the inherited test project.
        model_config = {
            "+predicates": ["id != 2"],
            "+incremental_strategy": "delete+insert",
        }
        return {"models": model_config}


class TestIncrementalPredicatesMergeRedshift(BaseIncrementalPredicates):
    """Inherits the BaseIncrementalPredicates tests, using `+incremental_predicates` with merge."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # The predicate is qualified with the dbt_internal_dest alias on purpose.
        model_config = {
            "+incremental_predicates": ["dbt_internal_dest.id != 2"],
            "+incremental_strategy": "merge",
        }
        return {"models": model_config}


class TestPredicatesMergeRedshift(BaseIncrementalPredicates):
    """Inherits the BaseIncrementalPredicates tests, using the `+predicates` key with merge."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Same predicate as the `+incremental_predicates` variant, under the other config key.
        model_config = {
            "+predicates": ["dbt_internal_dest.id != 2"],
            "+incremental_strategy": "merge",
        }
        return {"models": model_config}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from dbt.tests.util import run_dbt
from dbt.tests.adapter.basic.test_incremental import (
BaseIncremental,
BaseIncrementalNotSchemaChange,
)


class TestBaseIncrementalNotSchemaChange(BaseIncrementalNotSchemaChange):
    """Inherits the BaseIncrementalNotSchemaChange tests with no overrides."""


class TestIncrementalRunResultRedshift(BaseIncremental):
    """Bonus test to verify that incremental models return the number of rows affected"""

    def test_incremental(self, project):
        # Load the seeds first; two seed results are expected.
        seed_results = run_dbt(["seed"])
        assert len(seed_results) == 2

        # Build the model from the initial seed, then again from the seed with
        # additions; each invocation is expected to produce a single result.
        for seed_name in ("base", "added"):
            run_results = run_dbt(["run", "--vars", f"seed_name: {seed_name}"])
            assert len(run_results) == 1

        # `run_results` now holds the second (incremental) run; check the row
        # count reported in its adapter response.
        rows_affected = run_results[0].adapter_response["rows_affected"]
        assert rows_affected == 10, f"Expected 10 rows changed, found {rows_affected}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest
from dbt.tests.util import run_dbt, get_manifest
from dbt.exceptions import DbtRuntimeError
from dbt.context.providers import generate_runtime_model_context


# SQL for the single model exposed by the `models` fixture; it selects one
# constant column, which is enough to get a model node into the manifest.
my_model_sql = """
select 1 as fun
"""


@pytest.fixture(scope="class")
def models():
    """Map model file names to their SQL for the test project."""
    model_files = {"my_model.sql": my_model_sql}
    return model_files


def test_basic(project):
    """Check which incremental strategies the adapter resolves to a macro.

    "default", "append", "delete+insert" and "merge" must each resolve to a
    ``MacroGenerator``; "insert_overwrite" must raise ``DbtRuntimeError`` with
    the strategy name in the message.
    """
    run_results = run_dbt(["run"])
    assert len(run_results) == 1

    manifest = get_manifest(project.project_root)
    model = manifest.nodes["model.test.my_model"]

    # Normally the context will be provided by the macro that calls the
    # get_incremental_strategy_macro method, but for testing purposes
    # we create a runtime_model_context.
    context = generate_runtime_model_context(model, project.adapter.config, manifest)

    for strategy in ("default", "append", "delete+insert", "merge"):
        macro_func = project.adapter.get_incremental_strategy_macro(context, strategy)
        assert macro_func
        assert type(macro_func).__name__ == "MacroGenerator"

    # This incremental strategy is not valid for Redshift
    with pytest.raises(DbtRuntimeError) as excinfo:
        project.adapter.get_incremental_strategy_macro(context, "insert_overwrite")
    # Checked after the block exits: excinfo.value is only populated once the
    # expected exception has been caught.
    assert "insert_overwrite" in str(excinfo.value)
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey


class TestUniqueKeyRedshift(BaseIncrementalUniqueKey):
    """Inherits the BaseIncrementalUniqueKey tests with no overrides."""


class TestUniqueKeyDeleteInsertRedshift(BaseIncrementalUniqueKey):
    """Inherits the BaseIncrementalUniqueKey tests with the delete+insert strategy set explicitly."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Project-level model config applied on top of the inherited test project.
        model_config = {"+incremental_strategy": "delete+insert"}
        return {"models": model_config}

0 comments on commit 493148e

Please sign in to comment.