Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add microbatch strategy #924

Merged
merged 13 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241002-171112.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add microbatch strategy
time: 2024-10-02T17:11:12.88725-05:00
custom:
Author: QMalcolm
Issue: "923"
2 changes: 1 addition & 1 deletion dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge"]
return ["append", "delete+insert", "merge", "microbatch"]

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"{add_to} + interval '{number} {interval}'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,50 @@
)

{% endmacro %}

{% macro redshift__get_incremental_microbatch_sql(arg_dict) %}
{#-
Technically this function could just call out to the default implementation of delete_insert.
However, the default implementation requires a unique_id, which we actually do not want or
need. Thus we re-implement delete insert here without the unique_id requirement
-#}

{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set predicates = [] -%}

{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
{%- for pred in incremental_predicates -%}
{% if "DBT_INTERNAL_DEST." in pred %}
{%- set pred = pred | replace("DBT_INTERNAL_DEST.", target ~ "." ) -%}
{% endif %}
{% if "dbt_internal_dest." in pred %}
{%- set pred = pred | replace("dbt_internal_dest.", target ~ "." ) -%}
{% endif %}
{% do predicates.append(pred) %}
{% endfor %}

{% if not model.config.get("__dbt_internal_microbatch_event_time_start") or not model.config.get("__dbt_internal_microbatch_event_time_end") -%}
{% do exceptions.raise_compiler_error('dbt could not compute the start and end timestamps for the running batch') %}
{% endif %}

{#-- Add additional incremental_predicates to filter for batch --#}
{% do predicates.append(model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% do predicates.append(model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% do arg_dict.update({'incremental_predicates': predicates}) %}

delete from {{ target }}
where (
{% for predicate in predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)


# No requirement for a unique_id for redshift microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""


class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"
Loading