Feat/migrate sql gen to macro (#461)
* adapt macros

* add parameters and fix impl

* fix iceberg snapshot expire

* add tests

* iceberg functional tests

* update readme and comment

---------

Co-authored-by: Aissam Chia <[email protected]>
aiss93 and Aissam Chia authored Oct 25, 2024
1 parent 4f1defa commit 0ea95c1
Showing 15 changed files with 669 additions and 576 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
## New version
- Allow loading of large seed files
- Migrates the PySpark code for the Iceberg file format to macros, making the impl.py file more readable.
- Fixes the get_columns_in_relation function to work for both Iceberg and non-Iceberg tables without hard-coding the catalog name.
- Fixes the get_location function to work for both Iceberg and non-Iceberg tables on macOS and Windows.
- Adds a helper function to retrieve the Iceberg catalog namespace from the profiles.yml file.
- Adds the merge_exclude_columns and incremental_predicates features (see the example below).


## v1.8.6
- Fix session provisioning timeout and delay handling
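As a rough sketch of how the newly added `merge_exclude_columns` and `incremental_predicates` options could be combined in an incremental model (the model name, columns, and predicate are hypothetical, and the `dbt_internal_dest` alias follows the dbt-spark convention, which may differ slightly in dbt-glue):

```
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='iceberg',
    unique_key='order_id',
    -- hypothetical: columns the merge should never update on matched rows
    merge_exclude_columns=['inserted_at'],
    -- hypothetical: extra predicate appended to the merge condition to limit the scanned partitions
    incremental_predicates=["dbt_internal_dest.order_date >= date_add(current_date(), -7)"]
) }}

select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
  where order_date > (select max(order_date) from {{ this }})
{% endif %}
```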
23 changes: 12 additions & 11 deletions README.md
@@ -607,20 +607,21 @@ dbt will run an [atomic `merge` statement](https://iceberg.apache.org/docs/lates
- Iceberg also supports `insert_overwrite` and `append` strategies.
- The `warehouse` conf must be provided, but it's overwritten by the adapter `location` in your profile or `custom_location` in model configuration.
- By default, this materialization has `iceberg_expire_snapshots` set to `'True'`. If you need historical, auditable changes, set `iceberg_expire_snapshots='False'`.
- Currently, due to some dbt internals, the Iceberg catalog used when running Glue interactive sessions with dbt-glue has the hardcoded name `glue_catalog`. This name is an alias pointing to the AWS Glue Catalog, but it is specific to each session. If you want to interact with your data in another session without using dbt-glue (from a Glue Studio notebook, for example), you can configure another alias (i.e. another name for the Iceberg catalog). To illustrate this concept, you can set the following in your configuration file:
```
--conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog
```
Then, in an AWS Glue Studio notebook, run a session with the following config:
- The `custom_iceberg_catalog_namespace` parameter configures the namespace for Apache Iceberg catalog integration. It enables the use of Iceberg tables within your Spark application by setting up the necessary catalog configurations (an example profile entry is shown after this list). **Default Value:** `glue_catalog`
When specifying a non-null and non-empty value for `custom_iceberg_catalog_namespace`, the following Spark configurations must be provided:

```
--conf spark.sql.catalog.AnotherRandomCatalogName=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.{catalog_namespace}=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.{catalog_namespace}.warehouse={warehouse_path}
--conf spark.sql.catalog.{catalog_namespace}.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.{catalog_namespace}.io-impl=org.apache.iceberg.aws.s3.S3FileIO
```
In both cases, the underlying catalog would be the AWS Glue Catalog, unique in your AWS Account and Region, and you would be able to work with the exact same data. Also make sure that if you change the name of the Glue Catalog alias, you change it in all the other `--conf` entries where it's used:
When using the default value, the following Spark configuration should be added to enable Iceberg:
```
--conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.RandomCatalogName.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
...
--conf spark.sql.catalog.RandomCatalogName.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://your-warehouse-path
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
```
- A full reference to `table_properties` can be found [here](https://iceberg.apache.org/docs/latest/configuration/).
- Iceberg Tables are natively supported by Athena. Therefore, you can query tables created and operated with dbt-glue adapter from Athena.
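A minimal sketch of a matching profile entry, assuming a target named `dev` and placeholder values for the role, Region, and S3 paths (only `datalake_formats` and `custom_iceberg_catalog_namespace` relate to the Iceberg settings described above):

```
my_project:
  target: dev
  outputs:
    dev:
      type: glue
      role_arn: arn:aws:iam::123456789012:role/GlueInteractiveSessionRole   # placeholder
      region: eu-west-1                                                     # placeholder
      workers: 2
      worker_type: G.1X
      schema: analytics_db
      location: s3://my-dbt-bucket/tables/                                  # placeholder
      datalake_formats: iceberg
      # overrides the default `glue_catalog` alias used for Iceberg tables
      custom_iceberg_catalog_namespace: my_iceberg_catalog
      conf: >
        --conf spark.sql.catalog.my_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog
        --conf spark.sql.catalog.my_iceberg_catalog.warehouse=s3://my-dbt-bucket/warehouse/
        --conf spark.sql.catalog.my_iceberg_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.my_iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
```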
2 changes: 1 addition & 1 deletion dbt/adapters/glue/credentials.py
@@ -35,7 +35,7 @@ class GlueCredentials(Credentials):
datalake_formats: Optional[str] = None
enable_session_per_model: Optional[bool] = False
use_arrow: Optional[bool] = False

custom_iceberg_catalog_namespace: Optional[str] = "glue_catalog"

@property
def type(self):
259 changes: 51 additions & 208 deletions dbt/adapters/glue/impl.py

Large diffs are not rendered by default.

105 changes: 48 additions & 57 deletions dbt/include/glue/macros/adapters.sql
@@ -1,16 +1,12 @@
{% macro glue__location_clause(relation) %}
{% macro glue__location_clause() %}
{%- set custom_location = config.get('custom_location', validator=validation.any[basestring]) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set materialized = config.get('materialized') -%}

{%- if custom_location is not none %}
location '{{ custom_location }}'
{%- else -%}
{% if file_format == 'iceberg' %}
{{ adapter.get_iceberg_location(relation) }}
{%- else -%}
{{ adapter.get_location(relation) }}
{%- endif %}
{{ adapter.get_location(this) }}
{%- endif %}
{%- endmacro -%}

@@ -24,17 +20,27 @@
{{return('')}}
{%- endmacro %}

{% macro glue__make_target_relation(relation, file_format) %}
{%- set iceberg_catalog = adapter.get_custom_iceberg_catalog_namespace() -%}
{%- set first_iceberg_load = (file_format == 'iceberg') -%}
{%- set non_null_catalog = (iceberg_catalog is not none) -%}
{%- if non_null_catalog and first_iceberg_load %}
{# /* We add the iceberg catalog in the following case */ #}
{%- do return(relation.incorporate(path={"schema": iceberg_catalog ~ '.' ~ relation.schema, "identifier": relation.identifier})) -%}
{%- else -%}
{# /* Otherwise we keep the relation as it is */ #}
{%- do return(relation) -%}
{%- endif %}
{% endmacro %}

{% macro glue__drop_relation(relation) -%}

{% call statement('drop_relation', auto_begin=False) -%}
{% set rel_type = adapter.get_table_type(relation) %}
{%- if rel_type is not none and rel_type != 'iceberg_table' %}
drop {{ rel_type }} if exists {{ relation }}
{%- elif rel_type is not none and rel_type == 'iceberg_table' %}
{%- set default_catalog = 'glue_catalog' -%}
drop table if exists {{ default_catalog }}.{{ relation }}
{%- else -%}
drop table if exists {{ relation }}
{%- endif %}
{%- if relation.type == 'view' %}
drop view if exists {{ this }}
{%- else -%}
drop table if exists {{ relation }}
{%- endif %}
{%- endcall %}
{% endmacro %}

@@ -59,50 +65,37 @@
{%- endmacro -%}

{% macro glue__create_table_as(temporary, relation, sql) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}

{%- set create_statement_string -%}
{% if file_format in ['delta', 'iceberg'] -%}
create or replace table
{%- else -%}
create table
{% endif %}
{%- endset %}

{% if temporary -%}
{{ create_temporary_view(relation, sql) }}
{%- else -%}
{{ create_statement_string }} {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{#-- This does not enforce constraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}

{%- set create_statement_string -%}
{% if file_format in ['delta', 'iceberg'] -%}
create or replace table
{%- else -%}
create table
{% endif %}
{{ glue__file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ set_table_properties(table_properties) }}
{{ glue__location_clause(relation) }}
{{ comment_clause() }}
as
{{ sql }}
{%- endif %}
{%- endmacro -%}

{% macro glue__create_tmp_table_as(relation, sql) -%}
{% call statement("create_tmp_table_as", fetch_result=false, auto_begin=false) %}
set spark.sql.legacy.allowNonEmptyLocationInCTAS=true
dbt_next_query
DROP TABLE IF EXISTS {{ relation }}
dbt_next_query
create table {{ relation }}
{{ adapter.get_location(relation) }}
{%- endset %}

{{ create_statement_string }} {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{#-- This does not enforce constraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
{% endif %}
{{ glue__file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ set_table_properties(table_properties) }}
{{ glue__location_clause() }}
{{ comment_clause() }}
as
{{ sql }}
{% endcall %}
{{ sql }}
{%- endif %}
{%- endmacro -%}

{% macro glue__snapshot_get_time() -%}
@@ -143,9 +136,7 @@
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
{%- endif -%}
DROP VIEW IF EXISTS {{ relation }}
dbt_next_query
create view {{ relation }}
create or replace view {{ relation }}
as
{{ sql }}
{% endmacro %}
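For illustration, a sketch of what the new `glue__make_target_relation` macro resolves to, assuming the default `glue_catalog` namespace and a hypothetical `analytics.orders` relation:

```
{# hypothetical invocation, as done in the incremental materialization #}
{%- set target_relation = glue__make_target_relation(this, config.get('file_format')) -%}

{# with file_format='iceberg', the relation is prefixed with the catalog namespace: #}
-- glue_catalog.analytics.orders

{# with any other file_format (e.g. parquet), the relation is returned unchanged: #}
-- analytics.orders
```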
@@ -1,100 +1,94 @@
{% materialization incremental, adapter='glue' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{# /*-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --*/ #}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='insert_overwrite') -%}
{%- set file_format = dbt_glue_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_glue_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{% if raw_file_format == 'iceberg' %}
{%- set file_format = 'iceberg' -%}
{%- set strategy = raw_strategy -%}
{% else %}
{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{% endif %}

{# /*-- Set vars --*/ #}
{%- set language = model['language'] -%}
{%- set existing_relation_type = adapter.get_table_type(this) -%}
{%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = existing_relation or glue__make_target_relation(this, config.get('file_format')) -%}
{%- set tmp_relation = make_temp_relation(this, '_tmp').include(schema=false) -%}
{%- set unique_key = config.get('unique_key', none) -%}
{% if unique_key is none and file_format == 'hudi' %}
{{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }}
{% endif %}

{%- set partition_by = config.get('partition_by', none) -%}
{%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
{%- set custom_location = config.get('custom_location', default='empty') -%}
{%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%}
{%- set table_properties = config.get('table_properties', default='empty') -%}
{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_grants = config.get('lf_grants') -%}
{%- set delta_create_table_write_options = config.get('write_options', default={}) -%}

{% set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}
{% set existing_relation_type = adapter.get_table_type(target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation, '_tmp') %}
{% set is_incremental = 'False' %}
{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_grants = config.get('lf_grants') %}
{%- set substitute_variables = config.get('substitute_variables', default=[]) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set is_incremental = 'False' -%}

{% call statement() %}
set spark.sql.autoBroadcastJoinThreshold=-1
{% endcall %}
{% if existing_relation_type is not none %}
{%- set target_relation = target_relation.incorporate(type=existing_relation_type if existing_relation_type != "iceberg_table" else "table") -%}
{% endif %}

{# /*-- Validate specific requirements for hudi --*/ #}
{% if unique_key is none and file_format == 'hudi' %}
{{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }}
{% endif %}

{# /*-- Run pre-hooks --*/ #}
{{ run_hooks(pre_hooks) }}
{%- set substitute_variables = config.get('substitute_variables', default=[]) -%}

{# /*-- Incremental Process Logic --*/ #}
{% if file_format == 'hudi' %}
{%- set hudi_options = config.get('hudi_options', default={}) -%}
{{ adapter.hudi_merge_table(target_relation, sql, unique_key, partition_by, custom_location, hudi_options, substitute_variables) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% if strategy == 'insert_overwrite' and partition_by %}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}
{% if existing_relation_type is none %}
{# /*-- If the relation doesn't exist it needs to be created --*/ #}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% endif %}
{% elif existing_relation_type == 'view' or should_full_refresh() %}
{# /*-- Relation must be dropped and created --*/ #}
{{ drop_relation(target_relation) }}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% endif %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{%- if expire_snapshots == 'True' -%}
{%- set result = adapter.iceberg_expire_snapshots(target_relation) -%}
{%- endif -%}
{% else %}
{{ glue__create_tmp_table_as(tmp_relation, sql) }}
{# /*-- Relation must be merged --*/ #}
{%- call statement('create_tmp_view') -%}
{{ create_table_as(True, tmp_relation, sql) }}
{%- endcall -%}
{% set is_incremental = 'True' %}
{% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}

{%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, incremental_predicates) %}
{%- do process_schema_changes(on_schema_change, tmp_relation, target_relation) -%}
{% endif %}
{% endif %}

{# /*-- Execute the main statement --*/ #}
{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{# /*-- Kept to avoid breaking existing workloads, but this may not need to be here since it can be added as a post_hook query --*/ #}
{%- if file_format == 'iceberg' and expire_snapshots == 'True' -%}
{%- set result = adapter.iceberg_expire_snapshots(target_relation) -%}
{%- endif -%}

{# /*-- Run post-hooks --*/ #}
{{ run_hooks(post_hooks) }}

{# /*-- setup lake formation tags --*/ #}
{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{% endif %}

{# /*-- setup lake formation grants --*/ #}
{% if lf_grants is not none %}
{{ adapter.apply_lf_grants(target_relation, lf_grants) }}
{% endif %}
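As the inline comment in the materialization notes, snapshot expiration could alternatively be expressed as a post-hook. A sketch using the Iceberg `expire_snapshots` Spark procedure, with hypothetical catalog, schema, and table names (procedure availability depends on the Iceberg version bundled with your Glue session):

```
{{ config(
    materialized='incremental',
    file_format='iceberg',
    iceberg_expire_snapshots='False',
    post_hook=[
      "CALL glue_catalog.system.expire_snapshots(table => 'analytics.orders', retain_last => 5)"
    ]
) }}

select * from {{ ref('stg_orders') }}
```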