Compare commits

...

2 Commits

Author SHA1 Message Date
Jeremy Cohen
b26d5a7743 on_schema_change:append_new_columns should gracefully handle column removal 2021-11-07 14:06:53 +01:00
Christophe Oudar
cbd378a549 Use common columns for incremental schema changes 2021-11-06 02:24:33 +01:00
6 changed files with 85 additions and 6 deletions

View File

@@ -11,10 +11,12 @@
### Under the hood
- Bump artifact schema versions for 1.0.0: manifest v4, run results v4, sources v3. Notable changes: schema test + data test nodes are renamed to generic test + singular test nodes; freshness threshold default values ([#4191](https://github.com/dbt-labs/dbt-core/pull/4191))
- Speed up node selection by skipping `incorporate_indirect_nodes` if not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213), [#4214](https://github.com/dbt-labs/dbt-core/issues/4214))
- [SF, BQ] When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144))
Contributors:
- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955))
- [@frankcash](https://github.com/frankcash) ([4136](https://github.com/dbt-labs/dbt-core/pull/4136))
- [@Kayrnt](https://github.com/Kayrnt) ([4170](https://github.com/dbt-labs/dbt-core/pull/4170))
## dbt-core 1.0.0b2 (October 25, 2021)

View File

@@ -53,8 +53,13 @@
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set schema_changes_dict = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = schema_changes_dict.get('source_columns') %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% endif %}

View File

@@ -15,7 +15,7 @@
{% endmacro %}
{% macro diff_columns(source_columns, target_columns) %}
{% macro compare_columns(source_columns, target_columns, should_include) %}
{% set result = [] %}
{% set source_names = source_columns | map(attribute = 'column') | list %}
@@ -23,7 +23,7 @@
{# --check whether the name attribute exists in the target - this does not perform a data type check #}
{% for sc in source_columns %}
{% if sc.name not in target_names %}
{% if (sc.name in target_names) == should_include %}
{{ result.append(sc) }}
{% endif %}
{% endfor %}
@@ -32,6 +32,14 @@
{% endmacro %}
{% macro diff_columns(source_columns, target_columns) %}
  {#-- Delegates to compare_columns with should_include = false, i.e. asks for the
       source columns whose name is NOT found among the target column names. --#}
  {% set only_in_source = compare_columns(source_columns, target_columns, false) %}
  {{ return(only_in_source) }}
{% endmacro %}
{% macro intersect_columns(source_columns, target_columns) %}
  {#-- Delegates to compare_columns with should_include = true, i.e. asks for the
       source columns whose name IS found among the target column names. --#}
  {% set in_both = compare_columns(source_columns, target_columns, true) %}
  {{ return(in_both) }}
{% endmacro %}
{% macro diff_column_data_types(source_columns, target_columns) %}
{% set result = [] %}
@@ -57,7 +65,8 @@
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}
{%- set in_target_and_source = intersect_columns(target_columns, source_columns) -%}
{% set new_target_types = diff_column_data_types(source_columns, target_columns) %}
{% if source_not_in_target != [] %}
@@ -72,6 +81,9 @@
'schema_changed': schema_changed,
'source_not_in_target': source_not_in_target,
'target_not_in_source': target_not_in_source,
'in_target_and_source': in_target_and_source,
'source_columns': source_columns,
'target_columns': target_columns,
'new_target_types': new_target_types
} %}
@@ -132,7 +144,11 @@
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
{% if on_schema_change != 'ignore' %}
{% if on_schema_change == 'ignore' %}
{{ return({}) }}
{% else %}
{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %}
@@ -158,6 +174,8 @@
{% endif %}
{% endif %}
{{ return(schema_changes_dict) }}
{% endif %}

View File

@@ -0,0 +1,28 @@
{#-- Incremental fixture configured with on_schema_change='append_new_columns'.
     The initial build selects field1 and field2; incremental builds select
     field1, field3 and field4 (field2 is no longer selected). --#}
{{ config(materialized='incremental', unique_key='id', on_schema_change='append_new_columns') }}

{% set string_type = 'varchar(10)' %}

with source_data as (

    select * from {{ ref('model_a') }}

)

{% if not is_incremental() %}

{#-- initial build: rows with id <= 3 only --#}
select
    id,
    cast(field1 as {{ string_type }}) as field1,
    cast(field2 as {{ string_type }}) as field2
from source_data
where id <= 3

{% else %}

{#-- incremental build: rows whose id is not already present in the existing relation --#}
select
    id,
    cast(field1 as {{ string_type }}) as field1,
    cast(field3 as {{ string_type }}) as field3,
    cast(field4 as {{ string_type }}) as field4
from source_data
where id not in (select id from {{ this }})

{% endif %}

View File

@@ -0,0 +1,19 @@
{#-- Table fixture built from model_a: field2 is nulled out for id > 3,
     while field3 and field4 are nulled out for id <= 3. --#}
{{ config(materialized='table') }}

{% set string_type = 'varchar(10)' %}

with source_data as (

    select * from {{ ref('model_a') }}

)

select
    id,
    cast(field1 as {{ string_type }}) as field1,
    cast(case when id > 3 then null else field2 end as {{ string_type }}) as field2,
    cast(case when id <= 3 then null else field3 end as {{ string_type }}) as field3,
    cast(case when id <= 3 then null else field4 end as {{ string_type }}) as field4
from source_data

View File

@@ -44,6 +44,12 @@ class TestIncrementalSchemaChange(DBTIntegrationTest):
compare_source = 'incremental_append_new_columns'
compare_target = 'incremental_append_new_columns_target'
self.run_twice_and_assert(select, compare_source, compare_target)
def run_incremental_append_new_columns_remove_one(self):
    """Exercise the append_new_columns "remove one column" scenario.

    Passes a selector covering model_a, the incremental model and its
    target model to ``run_twice_and_assert``, followed by the two
    relation names to compare. What the helper does with them is
    defined outside this view.
    """
    model = 'incremental_append_new_columns_remove_one'
    expected = model + '_target'
    selector = ' '.join(('model_a', model, expected))
    self.run_twice_and_assert(selector, model, expected)
def run_incremental_sync_all_columns(self):
select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
@@ -70,6 +76,7 @@ class TestIncrementalSchemaChange(DBTIntegrationTest):
@use_profile('postgres')
def test__postgres__run_incremental_append_new_columns(self):
    """Run the append_new_columns scenario, then its remove-one variant.

    Decorated with ``use_profile('postgres')``; the scenarios run in the
    order listed, each looked up on ``self`` just before it is called.
    """
    for scenario in (
        'run_incremental_append_new_columns',
        'run_incremental_append_new_columns_remove_one',
    ):
        getattr(self, scenario)()
@use_profile('postgres')
def test__postgres__run_incremental_sync_all_columns(self):