mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-20 22:21:28 +00:00
Compare commits
2 Commits
enable-pos
...
jerco-4170
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b26d5a7743 | ||
|
|
cbd378a549 |
@@ -11,10 +11,12 @@
|
||||
### Under the hood
|
||||
- Bump artifact schema versions for 1.0.0: manifest v4, run results v4, sources v3. Notable changes: schema test + data test nodes are renamed to generic test + singular test nodes; freshness threshold default values ([#4191](https://github.com/dbt-labs/dbt-core/pull/4191))
|
||||
- Speed up node selection by skipping `incorporate_indirect_nodes` if not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213), [#4214](https://github.com/dbt-labs/dbt-core/issues/4214))
|
||||
- [SF, BQ] When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144))
|
||||
|
||||
Contributors:
|
||||
- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955))
|
||||
- [@frankcash](https://github.com/frankcash) ([4136](https://github.com/dbt-labs/dbt-core/pull/4136))
|
||||
- [@Kayrnt](https://github.com/Kayrnt) ([4136](https://github.com/dbt-labs/dbt-core/pull/4170))
|
||||
|
||||
## dbt-core 1.0.0b2 (October 25, 2021)
|
||||
|
||||
|
||||
@@ -53,8 +53,13 @@
|
||||
{% do adapter.expand_target_column_types(
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
|
||||
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
|
||||
{% set schema_changes_dict = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set dest_columns = schema_changes_dict.get('source_columns') %}
|
||||
{% if not dest_columns %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
|
||||
{% endif %}
|
||||
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro diff_columns(source_columns, target_columns) %}
|
||||
{% macro compare_columns(source_columns, target_columns, should_include) %}
|
||||
|
||||
{% set result = [] %}
|
||||
{% set source_names = source_columns | map(attribute = 'column') | list %}
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
{# --check whether the name attribute exists in the target - this does not perform a data type check #}
|
||||
{% for sc in source_columns %}
|
||||
{% if sc.name not in target_names %}
|
||||
{% if (sc.name in target_names) == should_include %}
|
||||
{{ result.append(sc) }}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
@@ -32,6 +32,14 @@
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro diff_columns(source_columns, target_columns) %}
|
||||
{{ return(compare_columns(source_columns, target_columns, false) ) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro intersect_columns(source_columns, target_columns) %}
|
||||
{{ return(compare_columns(source_columns, target_columns, true) ) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro diff_column_data_types(source_columns, target_columns) %}
|
||||
|
||||
{% set result = [] %}
|
||||
@@ -57,7 +65,8 @@
|
||||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
|
||||
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}
|
||||
|
||||
{%- set in_target_and_source = intersect_columns(target_columns, source_columns) -%}
|
||||
|
||||
{% set new_target_types = diff_column_data_types(source_columns, target_columns) %}
|
||||
|
||||
{% if source_not_in_target != [] %}
|
||||
@@ -72,6 +81,9 @@
|
||||
'schema_changed': schema_changed,
|
||||
'source_not_in_target': source_not_in_target,
|
||||
'target_not_in_source': target_not_in_source,
|
||||
'in_target_and_source': in_target_and_source,
|
||||
'source_columns': source_columns,
|
||||
'target_columns': target_columns,
|
||||
'new_target_types': new_target_types
|
||||
} %}
|
||||
|
||||
@@ -132,7 +144,11 @@
|
||||
|
||||
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
|
||||
|
||||
{% if on_schema_change != 'ignore' %}
|
||||
{% if on_schema_change == 'ignore' %}
|
||||
|
||||
{{ return({}) }}
|
||||
|
||||
{% else %}
|
||||
|
||||
{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %}
|
||||
|
||||
@@ -158,6 +174,8 @@
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{{ return(schema_changes_dict) }}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
{{
|
||||
config(
|
||||
materialized='incremental',
|
||||
unique_key='id',
|
||||
on_schema_change='append_new_columns'
|
||||
)
|
||||
}}
|
||||
|
||||
{% set string_type = 'varchar(10)' %}
|
||||
|
||||
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
|
||||
|
||||
{% if is_incremental() %}
|
||||
|
||||
SELECT id,
|
||||
cast(field1 as {{string_type}}) as field1,
|
||||
cast(field3 as {{string_type}}) as field3,
|
||||
cast(field4 as {{string_type}}) as field4
|
||||
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
|
||||
|
||||
{% else %}
|
||||
|
||||
SELECT id,
|
||||
cast(field1 as {{string_type}}) as field1,
|
||||
cast(field2 as {{string_type}}) as field2
|
||||
FROM source_data where id <= 3
|
||||
|
||||
{% endif %}
|
||||
@@ -0,0 +1,19 @@
|
||||
{{
|
||||
config(materialized='table')
|
||||
}}
|
||||
|
||||
{% set string_type = 'varchar(10)' %}
|
||||
|
||||
with source_data as (
|
||||
|
||||
select * from {{ ref('model_a') }}
|
||||
|
||||
)
|
||||
|
||||
select id,
|
||||
cast(field1 as {{string_type}}) as field1,
|
||||
cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as {{string_type}}) AS field2,
|
||||
cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3,
|
||||
cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4
|
||||
|
||||
from source_data
|
||||
@@ -44,6 +44,12 @@ class TestIncrementalSchemaChange(DBTIntegrationTest):
|
||||
compare_source = 'incremental_append_new_columns'
|
||||
compare_target = 'incremental_append_new_columns_target'
|
||||
self.run_twice_and_assert(select, compare_source, compare_target)
|
||||
|
||||
def run_incremental_append_new_columns_remove_one(self):
|
||||
select = 'model_a incremental_append_new_columns_remove_one incremental_append_new_columns_remove_one_target'
|
||||
compare_source = 'incremental_append_new_columns_remove_one'
|
||||
compare_target = 'incremental_append_new_columns_remove_one_target'
|
||||
self.run_twice_and_assert(select, compare_source, compare_target)
|
||||
|
||||
def run_incremental_sync_all_columns(self):
|
||||
select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
|
||||
@@ -70,6 +76,7 @@ class TestIncrementalSchemaChange(DBTIntegrationTest):
|
||||
@use_profile('postgres')
|
||||
def test__postgres__run_incremental_append_new_columns(self):
|
||||
self.run_incremental_append_new_columns()
|
||||
self.run_incremental_append_new_columns_remove_one()
|
||||
|
||||
@use_profile('postgres')
|
||||
def test__postgres__run_incremental_sync_all_columns(self):
|
||||
|
||||
Reference in New Issue
Block a user