Compare commits

...

8 Commits

Author SHA1 Message Date
Dave Connors
3cc56c79cc incomplete 9/6 2021-09-07 08:50:06 -05:00
Dave Connors
a2fb706fbf update snowflake behavior 2021-09-06 18:07:29 -05:00
Dave Connors
ccb95e0f35 default predicates case 2021-08-23 11:16:35 -05:00
Dave Connors
bb5f259042 update bq macros 2021-08-11 09:27:00 -05:00
Dave Connors
03923dca07 add args to all macros 2021-08-10 16:21:27 -05:00
Dave Connors
603230c6a7 WIP dictionary config 2021-08-09 11:48:57 -05:00
Dave Connors
884d476221 update snowflake macros 2021-08-06 17:15:50 -05:00
Dave Connors
2c43d71169 jinja updates 2021-08-06 14:22:45 -05:00
8 changed files with 161 additions and 47 deletions

View File

@@ -406,6 +406,7 @@ class NodeConfig(BaseConfig):
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'
incremental_predicates: Optional[List[Dict[str, Any]]] = None
@classmethod
def __pre_deserialize__(cls, data):

View File

@@ -0,0 +1,114 @@
{#
These macros will compile the proper join predicates for incremental models,
merging default behavior with the optional, user-supplied incremental_predicates
config.
#}
{#
  Entry point: looks up the adapter-specific `get_incremental_predicates`
  implementation via `adapter.dispatch` and renders its result, forwarding
  every argument unchanged. `partitions` is optional and defaults to none.
#}
{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %}
{{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }}
{% endmacro %}
{#
  Default implementation of `get_incremental_predicates`.

  Builds one `and <target_relation.name>.<source_col> <expression>` clause for
  each entry of `user_predicates` (each entry is read through its `source_col`
  and `expression` keys) and returns the clauses as a single string.
  Returns an empty string when no user predicates are supplied.
  `incremental_strategy`, `unique_key` and `partitions` are accepted for
  signature parity with the adapter-specific implementations but are not used.
#}
{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
{# default case: these predicates are meant to be appended to the `delete from` statement of the basic strategy #}
{#- Start from an empty string so the macro never returns an undefined name when there are no user predicates. -#}
{%- set predicates = '' -%}
{% if user_predicates %}
{%- set predicates -%}
{%- for condition in user_predicates %}
and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }}
{%- endfor %}
{%- endset -%}
{% endif %}
{{ return(predicates) }}
{% endmacro %}
{#
  Snowflake implementation of `get_incremental_predicates`.

  merge:         renders the match condition on `unique_key` (or FALSE when no
                 key is configured), followed by one
                 `and DBT_INTERNAL_DEST.<source_col> <expression>` clause per
                 entry of `user_predicates`.
  delete+insert: renders only the user predicates, qualified with
                 `target_relation.name` instead of the merge alias.
  Any other strategy renders nothing.
  `partitions` is accepted for signature parity and is not used here.
#}
{%- macro snowflake__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
{#- Default both fragments to empty so the final render never references an undefined name. -#}
{%- set match_criteria = '' -%}
{%- set filter_criteria = '' -%}
{%- if incremental_strategy == 'merge' %}
{%- if unique_key -%}
{%- set match_criteria %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{%- endset -%}
{% else %}
{%- set match_criteria %}
FALSE
{%- endset -%}
{% endif %}
{%- if user_predicates -%}
{%- set filter_criteria %}
{#- No whitespace stripping inside the loop: each clause must stay on its own line, otherwise consecutive predicates run together as `<expr>and ...`. -#}
{% for condition in user_predicates %}
and DBT_INTERNAL_DEST.{{ condition.source_col }} {{ condition.expression }}
{% endfor %}
{%- endset -%}
{% endif %}
{%- elif incremental_strategy == 'delete+insert' %}
{%- if user_predicates -%}
{%- set filter_criteria %}
{#- Same as above: keep the line breaks between clauses. -#}
{% for condition in user_predicates %}
and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }}
{% endfor %}
{%- endset -%}
{% endif %}
{%- endif -%}
{{ match_criteria }}
{{ filter_criteria }}
{%- endmacro -%}
{#
  BigQuery implementation of `get_incremental_predicates`.

  insert_overwrite: matches destination rows whose partition expression is in
                    the static `partitions` list when one is supplied,
                    otherwise in `unnest(dbt_partitions_for_replacement)`.
  merge:            matches on `unique_key`, or renders FALSE when no key is
                    configured.
  For both strategies, each entry of `user_predicates` is appended as
  `and DBT_INTERNAL_DEST.<source_col> <expression>`.
  Any other strategy renders nothing.
#}
{% macro bigquery__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
{#- Default both fragments to empty so the final render never references an undefined name. -#}
{%- set match_criteria = '' -%}
{%- set filter_criteria = '' -%}
{%- if incremental_strategy == 'insert_overwrite' %}
{#- partition_by is not an argument of this macro, so resolve it from the model config here. -#}
{#- NOTE(review): assumes the model's `partition_by` config is set for insert_overwrite; confirm against the materialization. -#}
{%- set partition_by = adapter.parse_partition_by(config.get('partition_by', none)) -%}
{% if partitions is not none and partitions != [] %} {# static #}
{%- set match_criteria %}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset -%}
{% else %} {# dynamic #}
{%- set match_criteria %}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset -%}
{% endif %}
{%- elif incremental_strategy == 'merge' %}
{%- if unique_key -%}
{%- set match_criteria %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{%- endset -%}
{% else %}
{%- set match_criteria %}
FALSE
{%- endset -%}
{% endif %}
{%- endif -%}
{#- Both supported strategies qualify user predicates with the same alias, so build them once. match_criteria is only non-empty for a supported strategy. -#}
{%- if match_criteria and user_predicates -%}
{%- set filter_criteria %}
{#- No whitespace stripping inside the loop: each clause must stay on its own line, otherwise consecutive predicates run together as `<expr>and ...`. -#}
{% for condition in user_predicates %}
and DBT_INTERNAL_DEST.{{ condition.source_col }} {{ condition.expression }}
{% endfor %}
{%- endset -%}
{%- endif %}
{#- match_criteria is a string: render it directly. Piping a string through `join` would iterate its characters. -#}
{{ match_criteria }}
{{ filter_criteria }}
{%- endmacro -%}

View File

@@ -1,12 +1,12 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_merge_sql')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql')(target, source, unique_key, dest_columns) }}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}
@@ -16,25 +16,16 @@
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{% if unique_key %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}
{{ sql_header if sql_header is not none }}
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
on {{ predicates }}
{% if unique_key %}
when matched then update set
@@ -63,7 +54,7 @@
{% endmacro %}
{% macro common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{% macro common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
@@ -72,7 +63,11 @@
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);
)
{{ predicates }}
;
{% endif %}
insert into {{ target }} ({{ dest_cols_csv }})
@@ -83,13 +78,12 @@
{%- endmacro %}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) }}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) }}
{% endmacro %}
{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}
@@ -100,7 +94,7 @@
on FALSE
when not matched by source
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
{% if predicates %} and {{ predicates }} {% endif %}
then delete
when not matched then insert

View File

@@ -1,5 +1,5 @@
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main", predicates=none) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
@@ -10,7 +10,10 @@
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ tmp_relation }}
);
)
{%- if predicates %}
{{ predicates }}
{%- endif %};
{%- endif %}
insert into {{ target_relation }} ({{ dest_cols_csv }})

View File

@@ -9,7 +9,10 @@
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set user_predicates = config.get('incremental_predicates', default=None) %}
{% set incremental_strategy = config.get('incremental_strategy') %}
{% set predicates = get_incremental_predicates(target_relation,incremental_strategy,unique_key,user_predicates) %}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
@@ -54,8 +57,11 @@
from_relation=tmp_relation,
to_relation=target_relation) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% set build_sql = incremental_upsert(
tmp_relation,
target_relation,
unique_key=unique_key,
predicates=predicates) %}
{% endif %}
{% call statement("main") %}

View File

@@ -16,31 +16,21 @@
{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, predicates
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}
{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, predicates=predicates, include_sql_header=true) }}
{% else %} {# dynamic #}
{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
{%- set source_sql -%}
(
select * from {{ tmp_relation }}
@@ -74,7 +64,7 @@
the sql_header at the materialization-level instead
#}
-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, predicates, include_sql_header=false) }};
-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
@@ -85,7 +75,7 @@
{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates=none
) %}
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}
@@ -98,7 +88,7 @@
{% endif %}
{% set build_sql = bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, predicates
) %}
{% else %} {# strategy == 'merge' #}
@@ -114,7 +104,7 @@
{%- endif -%}
{%- endset -%}
{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %}
{% endif %}
@@ -138,9 +128,13 @@
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}
{% set user_predicates = config.get('incremental_predicates', default = None) %}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions) %}
{{ run_hooks(pre_hooks) }}
{% if existing_relation is none %}
@@ -169,7 +163,7 @@
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
) %}
{% endif %}

View File

@@ -14,11 +14,11 @@
{% do return(strategy) %}
{% endmacro %}
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns, predicates) %}
{% if strategy == 'merge' %}
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates)) %}
{% elif strategy == 'delete+insert' %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}
@@ -30,6 +30,7 @@
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set user_predicates = config.get('incremental_predicates', default = None) %}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
@@ -38,6 +39,7 @@
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set predicates = get_incremental_predicates(target_relation=target_relation, incremental_strategy=strategy, unique_key=unique_key, user_predicates=user_predicates) %}
{{ run_hooks(pre_hooks) }}
@@ -60,7 +62,7 @@
to_relation=target_relation) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns, predicates=predicates) %}
{% endif %}

View File

@@ -32,8 +32,8 @@
{% endmacro %}
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}