Skip to content

Commit

Permalink
implement adap-608 framework from core for materialized views
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Jul 12, 2023
1 parent 31481f8 commit 2437891
Show file tree
Hide file tree
Showing 23 changed files with 858 additions and 463 deletions.
5 changes: 4 additions & 1 deletion dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ def relation_factory(self):
relation_changesets={
RelationType.MaterializedView: relation_models.RedshiftMaterializedViewRelationChangeset,
},
relation_can_be_renamed={RelationType.MaterializedView},
relation_can_be_renamed={
RelationType.Table,
RelationType.View,
},
render_policy=relation_models.RedshiftRenderPolicy,
)

Expand Down
10 changes: 8 additions & 2 deletions dbt/adapters/redshift/relation/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from dbt.adapters.redshift.relation.models.database import RedshiftDatabaseRelation
from dbt.adapters.redshift.relation.models.dist import RedshiftDistRelation
from dbt.adapters.redshift.relation.models.dist import (
RedshiftDistRelation,
RedshiftDistStyle,
)
from dbt.adapters.redshift.relation.models.materialized_view import (
RedshiftMaterializedViewRelation,
RedshiftMaterializedViewRelationChangeset,
Expand All @@ -9,4 +12,7 @@
MAX_CHARACTERS_IN_IDENTIFIER,
)
from dbt.adapters.redshift.relation.models.schema import RedshiftSchemaRelation
from dbt.adapters.redshift.relation.models.sort import RedshiftSortRelation
from dbt.adapters.redshift.relation.models.sort import (
RedshiftSortRelation,
RedshiftSortStyle,
)
22 changes: 16 additions & 6 deletions dbt/adapters/redshift/relation/models/dist.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
from dataclasses import dataclass
from typing import Optional, Set

Expand All @@ -12,6 +13,8 @@
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.redshift.relation.models.policy import RedshiftRenderPolicy


class RedshiftDistStyle(StrEnum):
auto = "auto"
Expand All @@ -35,9 +38,13 @@ class RedshiftDistRelation(RelationComponent, ValidationMixin):
- distkey: the column to use for the dist key if `dist_style` is `key`
"""

# attribution
diststyle: Optional[RedshiftDistStyle] = RedshiftDistStyle.default()
distkey: Optional[str] = None

# configuration
render = RedshiftRenderPolicy

@property
def validation_rules(self) -> Set[ValidationRule]:
# index rules get run by default with the mixin
Expand All @@ -64,11 +71,14 @@ def validation_rules(self) -> Set[ValidationRule]:

@classmethod
def from_dict(cls, config_dict) -> "RedshiftDistRelation":
kwargs_dict = {
"diststyle": config_dict.get("diststyle"),
"distkey": config_dict.get("distkey"),
}
dist: "RedshiftDistRelation" = super().from_dict(kwargs_dict) # type: ignore
# don't alter the incoming config
kwargs_dict = deepcopy(config_dict)

if diststyle := config_dict.get("diststyle"):
kwargs_dict.update({"diststyle": RedshiftDistStyle(diststyle)})

dist = super().from_dict(kwargs_dict)
assert isinstance(dist, RedshiftDistRelation)
return dist

@classmethod
Expand Down Expand Up @@ -118,7 +128,7 @@ def parse_describe_relation_results(cls, describe_relation_results: agate.Row) -
Returns: a standard dictionary describing this `RedshiftDistConfig` instance
"""
dist: str = describe_relation_results.get("diststyle")
dist: str = describe_relation_results.get("dist")

try:
# covers `AUTO`, `ALL`, `EVEN`, `KEY`, '', <unexpected>
Expand Down
57 changes: 34 additions & 23 deletions dbt/adapters/redshift/relation/models/materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import agate
from dbt.adapters.relation.models import (
MaterializedViewRelation,
MaterializedViewRelationChangeset,
Relation,
RelationChange,
RelationChangeAction,
RelationChangeset,
)
from dbt.adapters.validation import ValidationMixin, ValidationRule
from dbt.contracts.graph.nodes import ModelNode
Expand Down Expand Up @@ -61,16 +61,14 @@ class RedshiftMaterializedViewRelation(MaterializedViewRelation, ValidationMixin
schema: RedshiftSchemaRelation
query: str
backup: Optional[bool] = True
dist: RedshiftDistRelation = RedshiftDistRelation(
diststyle=RedshiftDistStyle.even, render=RedshiftRenderPolicy
)
sort: RedshiftSortRelation = RedshiftSortRelation(render=RedshiftRenderPolicy)
dist: RedshiftDistRelation = RedshiftDistRelation.from_dict({"diststyle": "even"})
sort: RedshiftSortRelation = RedshiftSortRelation.from_dict({})
autorefresh: Optional[bool] = False

# configuration
render = RedshiftRenderPolicy
SchemaParser = RedshiftSchemaRelation # type: ignore
can_be_renamed = True
can_be_renamed = False

@property
def validation_rules(self) -> Set[ValidationRule]:
Expand Down Expand Up @@ -119,8 +117,8 @@ def parse_model_node(cls, model_node: ModelNode) -> dict:

config_dict.update(
{
"backup": model_node.config.get("backup"),
"autorefresh": model_node.config.get("auto_refresh"),
"backup": model_node.config.extra.get("backup"),
"autorefresh": model_node.config.extra.get("autorefresh"),
}
)

Expand All @@ -145,11 +143,11 @@ def parse_describe_relation_results(
{
"materialized_view": agate.Table(
agate.Row({
"database": "<database_name>",
"schema": "<schema_name>",
"table": "<name>",
"diststyle": "<diststyle/distkey>", # e.g. EVEN | KEY(column1) | AUTO(ALL) | AUTO(KEY(id)),
"sortkey1": "<column_name>",
"database_name": "<database_name>",
"schema_name": "<schema_name>",
"name": "<name>",
"dist": "<diststyle/distkey>", # e.g. EVEN | KEY(column1) | AUTO(ALL) | AUTO(KEY(id)),
"sortkey": "<column_name>",
"autorefresh: any("t", "f"),
})
),
Expand All @@ -162,33 +160,45 @@ def parse_describe_relation_results(
Returns: a standard dictionary describing this `RedshiftMaterializedViewConfig` instance
"""
# merge these because the base class assumes `query` is on the same record as `name`, `schema_name` and
# `database_name`
describe_relation_results = cls._combine_describe_relation_results_tables(
describe_relation_results
)
config_dict = super().parse_describe_relation_results(describe_relation_results)

materialized_view: agate.Row = describe_relation_results["materialized_view"].rows[0]
query: agate.Row = describe_relation_results["query"].rows[0]

config_dict.update(
{
"autorefresh": {"t": True, "f": False}.get(materialized_view.get("autorefresh")),
"query": cls._parse_query(query.get("definition")),
"autorefresh": materialized_view.get("autorefresh"),
"query": cls._parse_query(materialized_view.get("query")),
}
)

# the default for materialized views differs from the default for diststyle in general
# only set it if we got a value
if materialized_view.get("diststyle"):
if materialized_view.get("dist"):
config_dict.update(
{"dist": RedshiftDistRelation.parse_describe_relation_results(materialized_view)}
)

# TODO: this only shows the first column in the sort key
if materialized_view.get("sortkey1"):
if materialized_view.get("sortkey"):
config_dict.update(
{"sort": RedshiftSortRelation.parse_describe_relation_results(materialized_view)}
)

return config_dict

@classmethod
def _combine_describe_relation_results_tables(
cls, describe_relation_results: Dict[str, agate.Table]
) -> Dict[str, agate.Table]:
materialized_view_table: agate.Table = describe_relation_results["materialized_view"]
query_table: agate.Table = describe_relation_results["query"]
combined_table: agate.Table = materialized_view_table.join(query_table, full_outer=True)
return {"materialized_view": combined_table}

@classmethod
def _parse_query(cls, query: str) -> str:
"""
Expand All @@ -211,9 +221,10 @@ def _parse_query(cls, query: str) -> str:
select * from my_base_table
"""
open_paren = query.find("as (") + len("as (")
close_paren = query.find(");")
return query[open_paren:close_paren].strip()
return query
# open_paren = query.find("as (")
# close_paren = query.find(");")
# return query[open_paren:close_paren].strip()


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -235,7 +246,7 @@ def requires_full_refresh(self) -> bool:


@dataclass
class RedshiftMaterializedViewRelationChangeset(RelationChangeset):
class RedshiftMaterializedViewRelationChangeset(MaterializedViewRelationChangeset):
backup: Optional[RedshiftBackupRelationChange] = None
dist: Optional[RedshiftDistRelationChange] = None
sort: Optional[RedshiftSortRelationChange] = None
Expand Down
24 changes: 12 additions & 12 deletions dbt/adapters/redshift/relation/models/sort.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import deepcopy
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional, FrozenSet, Set

import agate
Expand Down Expand Up @@ -43,15 +43,16 @@ class RedshiftSortRelation(RelationComponent, ValidationMixin):
- sort_key: the column(s) to use for the sort key; cannot be combined with `sort_type=auto`
"""

# attribution
sortstyle: Optional[RedshiftSortStyle] = None
sortkey: Optional[FrozenSet[str]] = None
sortkey: Optional[FrozenSet[str]] = field(default_factory=frozenset) # type: ignore

# configuration
render = RedshiftRenderPolicy

def __post_init__(self):
# maintains `frozen=True` while allowing for a variable default on `sort_type`
if self.sortstyle is None and self.sortkey is None:
if self.sortstyle is None and self.sortkey == frozenset():
object.__setattr__(self, "sortstyle", RedshiftSortStyle.default())
elif self.sortstyle is None:
object.__setattr__(self, "sortstyle", RedshiftSortStyle.default_with_columns())
Expand All @@ -63,7 +64,7 @@ def validation_rules(self) -> Set[ValidationRule]:
return {
ValidationRule(
validation_check=not (
self.sortstyle == RedshiftSortStyle.auto and self.sortkey is not None
self.sortstyle == RedshiftSortStyle.auto and self.sortkey != frozenset()
),
validation_error=DbtRuntimeError(
"A `RedshiftSortConfig` that specifies a `sortkey` does not support the `sortstyle` of `auto`."
Expand All @@ -72,7 +73,7 @@ def validation_rules(self) -> Set[ValidationRule]:
ValidationRule(
validation_check=not (
self.sortstyle in (RedshiftSortStyle.compound, RedshiftSortStyle.interleaved)
and self.sortkey is None
and self.sortkey == frozenset()
),
validation_error=DbtRuntimeError(
"A `sortstyle` of `compound` or `interleaved` requires a `sortkey` to be provided."
Expand Down Expand Up @@ -105,12 +106,11 @@ def from_dict(cls, config_dict) -> "RedshiftSortRelation":
# don't alter the incoming config
kwargs_dict = deepcopy(config_dict)

kwargs_dict.update(
{
"sortstyle": config_dict.get("sortstyle"),
"sortkey": frozenset(column for column in config_dict.get("sortkey", {})),
}
)
if sortstyle := config_dict.get("sortstyle"):
kwargs_dict.update({"sortstyle": RedshiftSortStyle(sortstyle)})

if sortkey := config_dict.get("sortkey"):
kwargs_dict.update({"sortkey": frozenset(column for column in sortkey)})

sort = super().from_dict(kwargs_dict)
assert isinstance(sort, RedshiftSortRelation)
Expand Down Expand Up @@ -165,7 +165,7 @@ def parse_describe_relation_results(cls, describe_relation_results: agate.Row) -
Returns: a standard dictionary describing this `RedshiftSortConfig` instance
"""
if sortkey := describe_relation_results.get("sortkey1"):
if sortkey := describe_relation_results.get("sortkey"):
return {"sortkey": {sortkey}}
return {}

Expand Down
77 changes: 77 additions & 0 deletions dbt/include/redshift/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{% /*
Ideally we don't overwrite materializations from dbt-core. However, the implementation of materialized views
requires this, at least for now. There are two issues that lead to this. First, Redshift does not support
the renaming of materialized views. That means we cannot back them up when replacing them. If the relation
that's replacing it is another materialized view, we can control for that since the materialization for
materialized views in dbt-core is flexible. That brings us to the second issue. The materialization for table
has the backup/deploy portion built into it; it's one single macro; replacing that has two options. We
can either break apart the macro in dbt-core, which could have unintended downstream effects for all
adapters. Or we can copy this here and keep it up to date with dbt-core until we resolve the larger issue.
We chose to go with the latter.
*/ %}

{% materialization table, adapter='redshift', supported_languages=['sql'] %}

{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') %}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
-- the intermediate_relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
/*
See ../view/view.sql for more information about this relation.
*/
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
-- as above, the backup_relation should not already exist
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}

-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% call statement('main') -%}
{{ get_create_table_as_sql(False, intermediate_relation, sql) }}
{%- endcall %}

-- cleanup -- this should be the only piece that differs from dbt-core
{% if existing_relation is not none %}
{% if existing_relation.type == 'materialized_view' %}
{{ drop_relation_if_exists(existing_relation) }}
{% else %}
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{% do create_indexes(target_relation) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Loading

0 comments on commit 2437891

Please sign in to comment.