Compare commits

...

2 Commits

Author SHA1 Message Date
Dave Connors
c0cd2dade6 whitespace control 2022-01-03 17:09:15 -06:00
Dave Connors
ab6aa90939 postgres support for predicates 2022-01-03 16:07:59 -06:00
6 changed files with 55 additions and 20 deletions

View File

@@ -410,6 +410,7 @@ class NodeConfig(NodeAndTestConfig):
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'
incremental_predicates: Optional[List[Dict[str, Any]]] = None
@classmethod
def __pre_deserialize__(cls, data):

View File

@@ -0,0 +1,28 @@
{#
These macros will compile the proper join predicates for incremental models,
merging default behavior with the optional, user-supplied incremental_predicates
config.
#}
{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %}
{{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }}
{% endmacro %}
{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
{#
This behavior should only be observed when dbt calls the default
`get_delete_insert_merge_sql` strategy in dbt-core
#}
{%- if user_predicates -%}
{%- set predicates %}
{%- for condition in user_predicates -%} and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }} {% endfor -%}
{%- endset -%}
{%- endif -%}
{{ return(predicates) }}
{% endmacro %}

View File

@@ -9,6 +9,9 @@
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set user_predicates = config.get('incremental_predicates', default=None) %}
{% set incremental_strategy = config.get('incremental_strategy', default=None) %}
{% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates) %}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
@@ -58,7 +61,7 @@
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates) %}
{% endif %}

View File

@@ -1,27 +1,17 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{% if unique_key %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}
{{ sql_header if sql_header is not none }}
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
on {{ predicates }}
{% if unique_key %}
when matched then update set
@@ -39,11 +29,11 @@
{% endmacro %}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
@@ -52,8 +42,11 @@
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);
{% endif %}
)
{{ predicates }}
;
{%- endif %}
insert into {{ target }} ({{ dest_cols_csv }})
(
@@ -69,7 +62,6 @@
{%- endmacro %}
{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}
@@ -80,7 +72,7 @@
on FALSE
when not matched by source
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
{% if predicates %} and {{ predicates }} {% endif %}
then delete
when not matched then insert

View File

@@ -460,6 +460,7 @@ class TestDocsGenerate(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'meta': {},
}
result.update(updates)
@@ -481,6 +482,7 @@ class TestDocsGenerate(DBTIntegrationTest):
'quote_columns': True,
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -509,6 +511,7 @@ class TestDocsGenerate(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'strategy': 'check',
'check_cols': 'all',
'unique_key': 'id',

View File

@@ -93,6 +93,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'alias': None,
'check_cols': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'meta': {},
},
'unique_id': 'snapshot.test.my_snapshot',
@@ -124,6 +125,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -159,6 +161,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -185,6 +188,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'incremental_strategy': 'delete+insert',
'database': None,
'schema': None,
@@ -212,6 +216,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -238,6 +243,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -275,6 +281,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
@@ -333,6 +340,7 @@ class TestStrictUndefined(DBTIntegrationTest):
'quote_columns': False,
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,