diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py index fac57744..c8eef330 100644 --- a/dbt/adapters/glue/impl.py +++ b/dbt/adapters/glue/impl.py @@ -627,7 +627,7 @@ def delta_update_manifest(self, target_relation, custom_location, partition_by): self._update_additional_location(target_relation, location) @available - def delta_create_table(self, target_relation, request, primary_key, partition_key, custom_location): + def delta_create_table(self, target_relation, request, primary_key, partition_key, custom_location, delta_create_table_write_options=None): session, client = self.get_connection() logger.debug(request) @@ -636,6 +636,11 @@ def delta_create_table(self, target_relation, request, primary_key, partition_ke location = f"{session.credentials.location}/{target_relation.schema}/{target_relation.name}" else: location = custom_location + + options_string = "" + if delta_create_table_write_options: + for key, value in delta_create_table_write_options.items(): + options_string += f'.option("{key}", "{value}")' create_table_query = f""" CREATE TABLE {table_name} @@ -647,7 +652,7 @@ def delta_create_table(self, target_relation, request, primary_key, partition_ke custom_glue_code_for_dbt_adapter spark.sql(""" {request} -""").write.format("delta").mode("overwrite")''' +""").write.format("delta").mode("overwrite"){options_string}''' write_data_footer = f'''.save("{location}") SqlWrapper2.execute("""select 1""") diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 7c0da215..8b7c185a 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -19,6 +19,7 @@ {%- set custom_location = config.get('custom_location', default='empty') -%} {%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%} {%- set table_properties = config.get('table_properties', default='empty') -%} + {%- set delta_create_table_write_options = config.get('write_options', default={}) -%} {% set target_relation = this %} {% set existing_relation_type = adapter.get_table_type(target_relation) %} @@ -45,7 +46,7 @@ {% endif %} {% if existing_relation_type is none %} {% if file_format == 'delta' %} - {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} + {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} {% elif file_format == 'iceberg' %} {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }} @@ -56,7 +57,7 @@ {% elif existing_relation_type == 'view' or should_full_refresh() %} {{ drop_relation(target_relation) }} {% if file_format == 'delta' %} - {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location) }} + {{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }} {% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %} {% elif file_format == 'iceberg' %} {{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}