mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-18 23:21:27 +00:00
Compare commits
2 Commits
wsargent/a
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0cd2dade6 | ||
|
|
ab6aa90939 |
@@ -410,6 +410,7 @@ class NodeConfig(NodeAndTestConfig):
|
||||
)
|
||||
full_refresh: Optional[bool] = None
|
||||
on_schema_change: Optional[str] = 'ignore'
|
||||
incremental_predicates: Optional[List[Dict[str, Any]]] = None
|
||||
|
||||
@classmethod
|
||||
def __pre_deserialize__(cls, data):
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
{#
|
||||
|
||||
These macros will compile the proper join predicates for incremental models,
|
||||
merging default behavior with the optional, user-supplied incremental_predicates
|
||||
config.
|
||||
|
||||
#}
|
||||
|
||||
{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %}
|
||||
{{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
|
||||
{#
|
||||
|
||||
This behavior should only be observed when dbt calls the default
|
||||
`get_delete_insert_merge_sql` strategy in dbt-core
|
||||
|
||||
#}
|
||||
{%- if user_predicates -%}
|
||||
{%- set predicates %}
|
||||
{%- for condition in user_predicates -%} and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }} {% endfor -%}
|
||||
{%- endset -%}
|
||||
{%- endif -%}
|
||||
|
||||
{{ return(predicates) }}
|
||||
|
||||
{% endmacro %}
|
||||
@@ -9,6 +9,9 @@
|
||||
{%- set full_refresh_mode = (should_full_refresh()) -%}
|
||||
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
|
||||
{% set user_predicates = config.get('incremental_predicates', default=None) %}
|
||||
{% set incremental_strategy = config.get('incremental_strategy', default=None) %}
|
||||
{% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates) %}
|
||||
|
||||
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
|
||||
{% set backup_identifier = model['name'] + "__dbt_backup" %}
|
||||
@@ -58,7 +61,7 @@
|
||||
{% if not dest_columns %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
|
||||
{% endif %}
|
||||
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
|
||||
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
@@ -1,27 +1,17 @@
|
||||
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
|
||||
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{%- set predicates = [] if predicates is none else [] + predicates -%}
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
|
||||
{%- set sql_header = config.get('sql_header', none) -%}
|
||||
|
||||
{% if unique_key %}
|
||||
{% set unique_key_match %}
|
||||
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
|
||||
{% endset %}
|
||||
{% do predicates.append(unique_key_match) %}
|
||||
{% else %}
|
||||
{% do predicates.append('FALSE') %}
|
||||
{% endif %}
|
||||
|
||||
{{ sql_header if sql_header is not none }}
|
||||
|
||||
merge into {{ target }} as DBT_INTERNAL_DEST
|
||||
using {{ source }} as DBT_INTERNAL_SOURCE
|
||||
on {{ predicates | join(' and ') }}
|
||||
on {{ predicates }}
|
||||
|
||||
{% if unique_key %}
|
||||
when matched then update set
|
||||
@@ -39,11 +29,11 @@
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
|
||||
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
|
||||
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
|
||||
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
|
||||
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
|
||||
@@ -52,8 +42,11 @@
|
||||
where ({{ unique_key }}) in (
|
||||
select ({{ unique_key }})
|
||||
from {{ source }}
|
||||
);
|
||||
{% endif %}
|
||||
)
|
||||
{{ predicates }}
|
||||
;
|
||||
|
||||
{%- endif %}
|
||||
|
||||
insert into {{ target }} ({{ dest_cols_csv }})
|
||||
(
|
||||
@@ -69,7 +62,6 @@
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
|
||||
{%- set predicates = [] if predicates is none else [] + predicates -%}
|
||||
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
|
||||
{%- set sql_header = config.get('sql_header', none) -%}
|
||||
|
||||
@@ -80,7 +72,7 @@
|
||||
on FALSE
|
||||
|
||||
when not matched by source
|
||||
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
|
||||
{% if predicates %} and {{ predicates }} {% endif %}
|
||||
then delete
|
||||
|
||||
when not matched then insert
|
||||
|
||||
@@ -460,6 +460,7 @@ class TestDocsGenerate(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'meta': {},
|
||||
}
|
||||
result.update(updates)
|
||||
@@ -481,6 +482,7 @@ class TestDocsGenerate(DBTIntegrationTest):
|
||||
'quote_columns': True,
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -509,6 +511,7 @@ class TestDocsGenerate(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'strategy': 'check',
|
||||
'check_cols': 'all',
|
||||
'unique_key': 'id',
|
||||
|
||||
@@ -93,6 +93,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'alias': None,
|
||||
'check_cols': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'meta': {},
|
||||
},
|
||||
'unique_id': 'snapshot.test.my_snapshot',
|
||||
@@ -124,6 +125,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -159,6 +161,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -185,6 +188,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'incremental_strategy': 'delete+insert',
|
||||
'database': None,
|
||||
'schema': None,
|
||||
@@ -212,6 +216,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -238,6 +243,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -275,6 +281,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'persist_docs': {},
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
@@ -333,6 +340,7 @@ class TestStrictUndefined(DBTIntegrationTest):
|
||||
'quote_columns': False,
|
||||
'full_refresh': None,
|
||||
'on_schema_change': 'ignore',
|
||||
'incremental_predicates': None,
|
||||
'database': None,
|
||||
'schema': None,
|
||||
'alias': None,
|
||||
|
||||
Reference in New Issue
Block a user