mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
8 Commits
remove-ssl
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cc56c79cc | ||
|
|
a2fb706fbf | ||
|
|
ccb95e0f35 | ||
|
|
bb5f259042 | ||
|
|
03923dca07 | ||
|
|
603230c6a7 | ||
|
|
884d476221 | ||
|
|
2c43d71169 |
@@ -406,6 +406,7 @@ class NodeConfig(BaseConfig):
|
||||
)
|
||||
full_refresh: Optional[bool] = None
|
||||
on_schema_change: Optional[str] = 'ignore'
|
||||
incremental_predicates: Optional[List[Dict[str, Any]]] = None
|
||||
|
||||
@classmethod
|
||||
def __pre_deserialize__(cls, data):
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
{#
|
||||
|
||||
These macros will compile the proper join predicates for incremental models,
|
||||
merging default behavior with the optional, user-supplied incremental_predicates
|
||||
config.
|
||||
|
||||
#}
|
||||
|
||||
|
||||
{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %}
|
||||
{{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
|
||||
{# this should be the default added to the `delete from` syntax in the basic strategy#}
|
||||
{% if user_predicates %}
|
||||
{%- set predicates -%}
|
||||
{%- for condition in user_predicates %}
|
||||
and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }}
|
||||
{%- endfor %}
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{{ return(predicates) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{%- macro snowflake__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
|
||||
|
||||
{%- if incremental_strategy == 'merge'%}
|
||||
{%- if unique_key -%}
|
||||
{%- set match_criteria %}
|
||||
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
|
||||
{%- endset -%}
|
||||
{% else %}
|
||||
{%- set match_criteria %}
|
||||
FALSE
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{%- if user_predicates -%}
|
||||
{%- set filter_criteria %}
|
||||
{%- for condition in user_predicates -%}
|
||||
and DBT_INTERNAL_DEST.{{ condition.source_col }} {{ condition.expression }}
|
||||
{%- endfor -%}
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
{%- elif incremental_strategy == 'delete+insert' %}
|
||||
{%- if user_predicates -%}
|
||||
{%- set filter_criteria %}
|
||||
{%- for condition in user_predicates -%}
|
||||
and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }}
|
||||
{%- endfor -%}
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
{%- endif -%}
|
||||
|
||||
{{ match_criteria }}
|
||||
{{ filter_criteria }}
|
||||
|
||||
{%- endmacro -%}
|
||||
|
||||
{% macro bigquery__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
|
||||
|
||||
{%- if incremental_strategy == 'insert_overwrite'%}
|
||||
{% if partitions is not none and partitions != [] %} {# static #}
|
||||
{%- set match_criteria %}
|
||||
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
|
||||
{{ partitions | join (', ') }}
|
||||
)
|
||||
{%- endset -%}
|
||||
{% else %}
|
||||
{%- set match_criteria %} {# dynamic #}
|
||||
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{%- if user_predicates -%}
|
||||
{%- set filter_criteria %}
|
||||
{%- for condition in user_predicates -%}
|
||||
and DBT_INTERNAL_DEST.{{ condition.source_col }} {{ condition.expression }}
|
||||
{%- endfor -%}
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{{ match_criteria | join(' and ') }}
|
||||
{{ filter_criteria }}
|
||||
|
||||
|
||||
{%- elif incremental_strategy == 'merge' %}
|
||||
{%- if unique_key -%}
|
||||
{%- set match_criteria %}
|
||||
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
|
||||
{%- endset -%}
|
||||
{% else %}
|
||||
{%- set match_criteria %}
|
||||
FALSE
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{%- if user_predicates -%}
|
||||
{%- set filter_criteria %}
|
||||
{%- for condition in user_predicates -%}
|
||||
and DBT_INTERNAL_DEST.{{ condition.source_col }} {{ condition.expression }}
|
||||
{%- endfor -%}
|
||||
{%- endset -%}
|
||||
{% endif %}
|
||||
|
||||
{{ match_criteria }}
|
||||
{{ filter_criteria }}
|
||||
|
||||
{%- endif -%}
|
||||
|
||||
{%- endmacro -%}
|
||||
@@ -1,12 +1,12 @@
|
||||
|
||||
|
||||
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
|
||||
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{{ adapter.dispatch('get_merge_sql')(target, source, unique_key, dest_columns, predicates) }}
|
||||
{%- endmacro %}
|
||||
|
||||
|
||||
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
|
||||
{{ adapter.dispatch('get_delete_insert_merge_sql')(target, source, unique_key, dest_columns) }}
|
||||
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{{ adapter.dispatch('get_delete_insert_merge_sql')(target, source, unique_key, dest_columns, predicates) }}
|
||||
{%- endmacro %}
|
||||
|
||||
|
||||
@@ -16,25 +16,16 @@
|
||||
|
||||
|
||||
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{%- set predicates = [] if predicates is none else [] + predicates -%}
|
||||
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
|
||||
{%- set sql_header = config.get('sql_header', none) -%}
|
||||
|
||||
{% if unique_key %}
|
||||
{% set unique_key_match %}
|
||||
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
|
||||
{% endset %}
|
||||
{% do predicates.append(unique_key_match) %}
|
||||
{% else %}
|
||||
{% do predicates.append('FALSE') %}
|
||||
{% endif %}
|
||||
|
||||
{{ sql_header if sql_header is not none }}
|
||||
|
||||
merge into {{ target }} as DBT_INTERNAL_DEST
|
||||
using {{ source }} as DBT_INTERNAL_SOURCE
|
||||
on {{ predicates | join(' and ') }}
|
||||
on {{ predicates }}
|
||||
|
||||
{% if unique_key %}
|
||||
when matched then update set
|
||||
@@ -63,7 +54,7 @@
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
|
||||
{% macro common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
|
||||
@@ -72,7 +63,11 @@
|
||||
where ({{ unique_key }}) in (
|
||||
select ({{ unique_key }})
|
||||
from {{ source }}
|
||||
);
|
||||
)
|
||||
|
||||
{{ predicates }}
|
||||
|
||||
;
|
||||
{% endif %}
|
||||
|
||||
insert into {{ target }} ({{ dest_cols_csv }})
|
||||
@@ -83,13 +78,12 @@
|
||||
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
|
||||
{{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) }}
|
||||
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) }}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
|
||||
{%- set predicates = [] if predicates is none else [] + predicates -%}
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
{%- set sql_header = config.get('sql_header', none) -%}
|
||||
|
||||
@@ -100,7 +94,7 @@
|
||||
on FALSE
|
||||
|
||||
when not matched by source
|
||||
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
|
||||
{% if predicates %} and {{ predicates }} {% endif %}
|
||||
then delete
|
||||
|
||||
when not matched then insert
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
|
||||
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main", predicates=none) %}
|
||||
|
||||
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
|
||||
@@ -10,7 +10,10 @@
|
||||
where ({{ unique_key }}) in (
|
||||
select ({{ unique_key }})
|
||||
from {{ tmp_relation }}
|
||||
);
|
||||
)
|
||||
{%- if predicates %}
|
||||
{{ predicates }}
|
||||
{%- endif %};
|
||||
{%- endif %}
|
||||
|
||||
insert into {{ target_relation }} ({{ dest_cols_csv }})
|
||||
|
||||
@@ -9,7 +9,10 @@
|
||||
{%- set full_refresh_mode = (should_full_refresh()) -%}
|
||||
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
|
||||
{% set user_predicates = config.get('incremental_predicates', default=None) %}
|
||||
{% set incremental_strategy = config.get('incremental_strategy') %}
|
||||
|
||||
{% set predicates = get_incremental_predicates(target_relation,incremental_strategy,unique_key,user_predicates) %}
|
||||
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
|
||||
{% set backup_identifier = model['name'] + "__dbt_backup" %}
|
||||
|
||||
@@ -54,8 +57,11 @@
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
|
||||
|
||||
{% set build_sql = incremental_upsert(
|
||||
tmp_relation,
|
||||
target_relation,
|
||||
unique_key=unique_key,
|
||||
predicates=predicates) %}
|
||||
{% endif %}
|
||||
|
||||
{% call statement("main") %}
|
||||
|
||||
@@ -16,31 +16,21 @@
|
||||
|
||||
|
||||
{% macro bq_insert_overwrite(
|
||||
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
|
||||
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, predicates
|
||||
) %}
|
||||
|
||||
{% if partitions is not none and partitions != [] %} {# static #}
|
||||
|
||||
{% set predicate -%}
|
||||
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
|
||||
{{ partitions | join (', ') }}
|
||||
)
|
||||
{%- endset %}
|
||||
|
||||
{%- set source_sql -%}
|
||||
(
|
||||
{{sql}}
|
||||
)
|
||||
{%- endset -%}
|
||||
|
||||
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
|
||||
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, predicates=predicates, include_sql_header=true) }}
|
||||
|
||||
{% else %} {# dynamic #}
|
||||
|
||||
{% set predicate -%}
|
||||
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
|
||||
{%- endset %}
|
||||
|
||||
{%- set source_sql -%}
|
||||
(
|
||||
select * from {{ tmp_relation }}
|
||||
@@ -74,7 +64,7 @@
|
||||
the sql_header at the materialization-level instead
|
||||
#}
|
||||
-- 3. run the merge statement
|
||||
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
|
||||
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, predicates, include_sql_header=false) }};
|
||||
|
||||
-- 4. clean up the temp table
|
||||
drop table if exists {{ tmp_relation }}
|
||||
@@ -85,7 +75,7 @@
|
||||
|
||||
|
||||
{% macro bq_generate_incremental_build_sql(
|
||||
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
|
||||
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates=none
|
||||
) %}
|
||||
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
|
||||
{% if strategy == 'insert_overwrite' %}
|
||||
@@ -98,7 +88,7 @@
|
||||
{% endif %}
|
||||
|
||||
{% set build_sql = bq_insert_overwrite(
|
||||
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
|
||||
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, predicates
|
||||
) %}
|
||||
|
||||
{% else %} {# strategy == 'merge' #}
|
||||
@@ -114,7 +104,7 @@
|
||||
{%- endif -%}
|
||||
{%- endset -%}
|
||||
|
||||
{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
|
||||
{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
@@ -138,9 +128,13 @@
|
||||
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
|
||||
{%- set partitions = config.get('partitions', none) -%}
|
||||
{%- set cluster_by = config.get('cluster_by', none) -%}
|
||||
{% set user_predicates = config.get('incremental_predicates', default = None) %}
|
||||
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
|
||||
|
||||
{% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions) %}
|
||||
|
||||
|
||||
{{ run_hooks(pre_hooks) }}
|
||||
|
||||
{% if existing_relation is none %}
|
||||
@@ -169,7 +163,7 @@
|
||||
|
||||
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
|
||||
{% set build_sql = bq_generate_incremental_build_sql(
|
||||
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
|
||||
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_predicates
|
||||
) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
@@ -14,11 +14,11 @@
|
||||
{% do return(strategy) %}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns, predicates) %}
|
||||
{% if strategy == 'merge' %}
|
||||
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
|
||||
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates)) %}
|
||||
{% elif strategy == 'delete+insert' %}
|
||||
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
|
||||
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates)) %}
|
||||
{% else %}
|
||||
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
|
||||
{% endif %}
|
||||
@@ -30,6 +30,7 @@
|
||||
|
||||
{%- set unique_key = config.get('unique_key') -%}
|
||||
{%- set full_refresh_mode = (should_full_refresh()) -%}
|
||||
{% set user_predicates = config.get('incremental_predicates', default = None) %}
|
||||
|
||||
{% set target_relation = this %}
|
||||
{% set existing_relation = load_relation(this) %}
|
||||
@@ -38,6 +39,7 @@
|
||||
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
|
||||
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
|
||||
{% set predicates = get_incremental_predicates(target_relation=target_relation, incremental_strategy=strategy, unique_key=unique_key, user_predicates=user_predicates) %}
|
||||
|
||||
{{ run_hooks(pre_hooks) }}
|
||||
|
||||
@@ -60,7 +62,7 @@
|
||||
to_relation=target_relation) %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns, predicates=predicates) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
@@ -32,8 +32,8 @@
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
|
||||
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
|
||||
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
|
||||
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
|
||||
{% do return(snowflake_dml_explicit_transaction(dml)) %}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user