mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
21 Commits
jerco/upda
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f1d65f411 | ||
|
|
9b0029f90c | ||
|
|
1430834196 | ||
|
|
7ba1286c9b | ||
|
|
3d99fa3d66 | ||
|
|
d5edd65e52 | ||
|
|
49f5319f53 | ||
|
|
86a7536fcd | ||
|
|
1549af6b3d | ||
|
|
7c7b5a07c3 | ||
|
|
335eb15b58 | ||
|
|
4ab16d5f9a | ||
|
|
c073b45914 | ||
|
|
e26c22053e | ||
|
|
59e10ce1c8 | ||
|
|
1f014decb8 | ||
|
|
8c531fbbec | ||
|
|
0be265b30c | ||
|
|
e06f09fbbf | ||
|
|
82950facfe | ||
|
|
f9f1a96cf1 |
@@ -32,6 +32,7 @@ Contributors:
|
||||
- Support disabling schema tests, and configuring tests from `dbt_project.yml` ([#3252](https://github.com/fishtown-analytics/dbt/issues/3252),
|
||||
[#3253](https://github.com/fishtown-analytics/dbt/issues/3253), [#3257](https://github.com/fishtown-analytics/dbt/pull/3257))
|
||||
- Add Jinja tag for tests ([#1173](https://github.com/fishtown-analytics/dbt/issues/1173), [#3261](https://github.com/fishtown-analytics/dbt/pull/3261))
|
||||
- Support detecting schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3288](https://github.com/fishtown-analytics/dbt/issues/3288))
|
||||
- Add native support for Postgres index creation ([#804](https://github.com/fishtown-analytics/dbt/issues/804), [3106](https://github.com/fishtown-analytics/dbt/pull/3106))
|
||||
- Less greedy test selection: expand to select unselected tests if and only if all parents are selected ([#2891](https://github.com/fishtown-analytics/dbt/issues/2891), [#3235](https://github.com/fishtown-analytics/dbt/pull/3235))
|
||||
- Prevent locks in Redshift during full refresh in incremental materialization. ([#2426](https://github.com/fishtown-analytics/dbt/issues/2426), [#2998](https://github.com/fishtown-analytics/dbt/pull/2998))
|
||||
|
||||
@@ -144,6 +144,7 @@ class BaseAdapter(metaclass=AdapterMeta):
|
||||
- convert_datetime_type
|
||||
- convert_date_type
|
||||
- convert_time_type
|
||||
- regex_replace_sql
|
||||
|
||||
Macros:
|
||||
- get_catalog
|
||||
@@ -518,6 +519,20 @@ class BaseAdapter(metaclass=AdapterMeta):
|
||||
'`get_columns_in_relation` is not implemented for this adapter!'
|
||||
)
|
||||
|
||||
@available.parse_list
|
||||
def incremental_remove_where(self, query):
|
||||
"""Remove the last WHERE clause from an incremental query"""
|
||||
# need to remove the last instance of the WHERE clause
|
||||
# reverse the query string to find the last WHERE
|
||||
where_idx = len(query) - (query[::-1].find('EREHW') + len('WHERE'))
|
||||
# we found a where clause so remove it
|
||||
if where_idx > 0:
|
||||
where_clause = query[where_idx:]
|
||||
return query.replace(where_clause, '')
|
||||
|
||||
# else return the original string
|
||||
return query
|
||||
|
||||
@available.deprecated('get_columns_in_relation', lambda *a, **k: [])
|
||||
def get_columns_in_table(
|
||||
self, schema: str, identifier: str
|
||||
@@ -805,6 +820,7 @@ class BaseAdapter(metaclass=AdapterMeta):
|
||||
else:
|
||||
return identifier
|
||||
|
||||
|
||||
@available
|
||||
def quote_seed_column(
|
||||
self, column: str, quote_config: Optional[bool]
|
||||
|
||||
@@ -394,6 +394,7 @@ class NodeConfig(BaseConfig):
|
||||
CompareBehavior.Exclude),
|
||||
)
|
||||
full_refresh: Optional[bool] = None
|
||||
on_schema_change: str = 'ignore'
|
||||
|
||||
@classmethod
|
||||
def __pre_deserialize__(cls, data):
|
||||
|
||||
@@ -311,3 +311,35 @@
|
||||
{{ config.set('sql_header', caller()) }}
|
||||
{%- endmacro %}
|
||||
|
||||
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
|
||||
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
|
||||
|
||||
{% set sql -%}
|
||||
|
||||
alter {{ relation.type }} {{ relation }}
|
||||
{% if add_columns %}
|
||||
add
|
||||
{% endif %}
|
||||
{% for column in add_columns %}
|
||||
column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
|
||||
{% endfor %}
|
||||
|
||||
{{ ', ' if add_columns and remove_columns }}
|
||||
|
||||
{% if remove_columns %}
|
||||
drop
|
||||
{% for column in remove_columns %}
|
||||
column {{ column.name }}{{ ',' if not loop.last }}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
{%- endset %}
|
||||
|
||||
{{ return(run_query(sql)) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
@@ -72,3 +72,5 @@
|
||||
{% endif %}
|
||||
{% do return(config_full_refresh) %}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
|
||||
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
|
||||
|
||||
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
|
||||
|
||||
|
||||
@@ -5,6 +5,9 @@
|
||||
|
||||
{% set target_relation = this.incorporate(type='table') %}
|
||||
{% set existing_relation = load_relation(this) %}
|
||||
{%- set full_refresh_mode = (should_full_refresh()) -%}
|
||||
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
|
||||
|
||||
{{ run_hooks(pre_hooks, inside_transaction=False) }}
|
||||
|
||||
@@ -12,12 +15,46 @@
|
||||
{{ run_hooks(pre_hooks, inside_transaction=True) }}
|
||||
|
||||
{% set to_drop = [] %}
|
||||
|
||||
{# -- first check whether we want to full refresh for source view or config reasons #}
|
||||
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}
|
||||
{% do log('full refresh mode: %s' % trigger_full_refresh) %}
|
||||
|
||||
{# -- double check whether full refresh should happen if on_schema_change config is True #}
|
||||
{% if not trigger_full_refresh and on_schema_change == 'full_refresh' %}
|
||||
{%- set tmp_relation = make_temp_relation(target_relation) -%}
|
||||
{%- do run_query(create_table_as(True, tmp_relation, sql)) -%}
|
||||
|
||||
{%- set schema_changed = check_for_schema_changes(tmp_relation, target_relation) -%}
|
||||
{%- if schema_changed -%}
|
||||
{%- set trigger_full_refresh = True -%}
|
||||
{%- set sql = adapter.incremental_remove_where(sql) %}
|
||||
{%- do log('detected a schema change with on_schema_change == full_refresh, refreshing table', info=true) -%}
|
||||
|
||||
{%- endif -%}
|
||||
|
||||
{# -- BELOW CODE ALSO WORKS #}
|
||||
{# -- set source_columns = get_columns_in_query(sql) -#}
|
||||
{# -- set target_columns = get_column_names(adapter.get_columns_in_relation(target_relation)) -#}
|
||||
{# -- set source_not_in_target = diff_arrays(source_columns, target_columns) -#}
|
||||
{# -- set target_not_in_source = diff_arrays(target_columns, source_columns) -#}
|
||||
{# -- if source_not_in_target|length > 0 or target_not_in_source|length > 0 -#}
|
||||
{# -- set trigger_full_refresh = True -#}
|
||||
{# -- this removes the WHERE clause from the input model sql in the event a full refresh is needed #}
|
||||
{# -- set sql = adapter.incremental_remove_where(sql) #}
|
||||
{# -- do log('detected a schema change with on_schema_change == full_refresh, refreshing table') -#}
|
||||
{# -- endif -#}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% if existing_relation is none %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
{% elif existing_relation.is_view or should_full_refresh() %}
|
||||
|
||||
{% elif trigger_full_refresh %}
|
||||
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
|
||||
{% do log('running full refresh procedure', info=true) %}
|
||||
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
|
||||
{% set backup_identifier = model['name'] + "__dbt_backup" %}
|
||||
{% set backup_identifier = model['name'] + '__dbt_backup' %}
|
||||
|
||||
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
|
||||
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
|
||||
@@ -28,13 +65,32 @@
|
||||
{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
|
||||
{% set need_swap = true %}
|
||||
{% do to_drop.append(backup_relation) %}
|
||||
|
||||
{% else %}
|
||||
{% set tmp_relation = make_temp_relation(target_relation) %}
|
||||
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
|
||||
{% set tmp_relation = make_temp_relation(target_relation) %}
|
||||
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
|
||||
|
||||
{% if on_schema_change not in ('ignore', 'full_refresh') %}
|
||||
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
|
||||
{% do log('schema changed: %s' % schema_changed, info=true) %}
|
||||
{% if schema_changed %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
|
||||
|
||||
{% else %}
|
||||
{% do adapter.expand_target_column_types(
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
|
||||
{% endif %}
|
||||
|
||||
{% else %}
|
||||
{% do adapter.expand_target_column_types(
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% call statement("main") %}
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}
|
||||
|
||||
{% if on_schema_change not in ['full_refresh', 'sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}
|
||||
|
||||
{% set log_message = 'invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default_value) %}
|
||||
{% do log(log_message, info=true) %}
|
||||
|
||||
{{ return(default) }}
|
||||
|
||||
{% else %}
|
||||
{{ return(on_schema_change) }}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro get_column_names(columns) %}
|
||||
|
||||
{% set result = [] %}
|
||||
|
||||
{% for col in columns %}
|
||||
{{ result.append(col.column) }}
|
||||
{% endfor %}
|
||||
|
||||
{{ return(result) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro diff_arrays(source_array, target_array) %}
|
||||
|
||||
{% set result = [] %}
|
||||
{%- for elem in source_array -%}
|
||||
{% if elem not in target_array %}
|
||||
|
||||
{{ result.append(elem) }}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{%- endfor -%}
|
||||
|
||||
{{ return(result) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro diff_columns(source_columns, target_columns) %}
|
||||
|
||||
{% set result = [] %}
|
||||
{% set source_names = get_column_names(source_columns) %}
|
||||
{% set target_names = get_column_names(target_columns) %}
|
||||
|
||||
{# --check whether the name attribute exists in the target, but dont worry about data type differences #}
|
||||
{%- for col in source_columns -%}
|
||||
{%- if col.column not in target_names -%}
|
||||
{{ result.append(col) }}
|
||||
{%- endif -%}
|
||||
{%- endfor -%}
|
||||
|
||||
{{ return(result) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro check_for_schema_changes(source_relation, target_relation) %}
|
||||
|
||||
{% set schema_changed = False %}
|
||||
|
||||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
|
||||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
|
||||
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}
|
||||
|
||||
{% if source_not_in_target != [] %}
|
||||
{% set schema_changed = True %}
|
||||
{% elif target_not_in_source != [] %}
|
||||
{% set schema_changed = True %}
|
||||
{% endif %}
|
||||
|
||||
{{ return(schema_changed) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %}
|
||||
|
||||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
|
||||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%}
|
||||
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%}
|
||||
|
||||
-- Validates on_schema_change config vs. whether there are column differences
|
||||
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %}
|
||||
|
||||
{{
|
||||
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append. Review the schemas in the source and target relations')
|
||||
}}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
{%- if on_schema_change == 'append_new_columns' -%}
|
||||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%}
|
||||
{% elif on_schema_change == 'sync_all_columns' %}
|
||||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
|
||||
{% endif %}
|
||||
|
||||
{{
|
||||
return(
|
||||
{
|
||||
'columns_added': add_to_target_arr,
|
||||
'columns_removed': remove_from_target_arr
|
||||
}
|
||||
)
|
||||
}}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
|
||||
|
||||
{% if on_schema_change=='fail' %}
|
||||
|
||||
{{
|
||||
exceptions.raise_compiler_error('The source and target schemas on this incremental model are out of sync!
|
||||
You can specify one of ["fail", "ignore", "add_new_columns", "sync_all_columns", "full_refresh"] in the on_schema_change config to control this behavior.
|
||||
Please re-run the incremental model with full_refresh set to True to update the target schema.
|
||||
Alternatively, you can update the schema manually and re-run the process.')
|
||||
}}
|
||||
|
||||
{# unless we ignore, run the sync operation per the config #}
|
||||
{% else %}
|
||||
|
||||
{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %}
|
||||
{% set columns_added = schema_changes['columns_added'] %}
|
||||
{% set columns_removed = schema_changes['columns_removed'] %}
|
||||
{% do log('columns added: ' + columns_added|join(', '), info=true) %}
|
||||
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% endmacro %}
|
||||
@@ -0,0 +1,121 @@
|
||||
{% macro incremental_validate_on_schema_change(on_schema_change, default_value='ignore') %}
|
||||
|
||||
{% if on_schema_change not in ['full_refresh', 'sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}
|
||||
|
||||
{% set log_message = 'invalid value for on_schema_change {{ on_schema_change }} specified. Setting default value of {{ default_value }}.' %}
|
||||
{% do log(log_message, info=true) %}
|
||||
|
||||
{{ return(default_value) }}
|
||||
|
||||
{% else %}
|
||||
{{ return(on_schema_change) }}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro get_column_names(columns) %}
|
||||
|
||||
{% set result = [] %}
|
||||
|
||||
{% for col in columns %}
|
||||
{{ result.append(col.column) }}
|
||||
{% endfor %}
|
||||
|
||||
{{ return(result) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro snowflake_diff_columns(source_columns, target_columns) %}
|
||||
|
||||
{% set result = [] %}
|
||||
{% set source_names = get_column_names(source_columns) %}
|
||||
{% set target_names = get_column_names(target_columns) %}
|
||||
|
||||
{# check whether the name attribute exists in the target, but dont worry about data type differences #}
|
||||
{%- for col in source_columns -%}
|
||||
{%- if col.column not in target_names -%}
|
||||
{{ result.append(col) }}
|
||||
{%- endif -%}
|
||||
{%- endfor -%}
|
||||
|
||||
{{ return(result) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro check_for_schema_changes(source_relation, target_relation) %}
|
||||
|
||||
{% set schema_changed = False %}
|
||||
|
||||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
|
||||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set source_not_in_target = snowflake_diff_columns(source_columns, target_columns) -%}
|
||||
{%- set target_not_in_source = snowflake_diff_columns(target_columns, source_columns) -%}
|
||||
|
||||
{% if source_not_in_target != [] %}
|
||||
{% set schema_changed = True %}
|
||||
{% elif target_not_in_source != [] %}
|
||||
{% set schema_changed = True %}
|
||||
{% endif %}
|
||||
|
||||
{{ return(schema_changed) }}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %}
|
||||
|
||||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
|
||||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
|
||||
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%}
|
||||
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%}
|
||||
|
||||
-- Validates on_schema_change config vs. whether there are column differences
|
||||
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %}
|
||||
|
||||
{{
|
||||
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append. Review the schemas in the source and target relations')
|
||||
}}
|
||||
|
||||
{% endif %}
|
||||
|
||||
|
||||
{%- if on_schema_change == 'append_new_columns' -%}
|
||||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%}
|
||||
{% elif on_schema_change == 'sync_all_columns' %}
|
||||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
|
||||
{% endif %}
|
||||
|
||||
{{
|
||||
return(
|
||||
{
|
||||
'columns_added': add_to_target_arr,
|
||||
'columns_removed': remove_from_target_arr
|
||||
}
|
||||
)
|
||||
}}
|
||||
|
||||
{% endmacro %}
|
||||
|
||||
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
|
||||
|
||||
{% if on_schema_change=='fail' %}
|
||||
|
||||
{{
|
||||
exceptions.raise_compiler_error('The source and target schemas on this incremental model are out of sync!
|
||||
You can specify one of ["fail", "ignore", "add_new_columns", "sync_all_columns", "full_refresh"] in the on_schema_change config to control this behavior.
|
||||
Please re-run the incremental model with full_refresh set to True to update the target schema.
|
||||
Alternatively, you can update the schema manually and re-run the process.')
|
||||
}}
|
||||
|
||||
{# unless we ignore, run the sync operation per the config #}
|
||||
{% elif on_schema_change != 'ignore' %}
|
||||
|
||||
{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %}
|
||||
{% set columns_added = schema_changes['columns_added'] %}
|
||||
{% set columns_removed = schema_changes['columns_removed'] %}
|
||||
{% do log('columns added: ' + columns_added|join(', '), info=true) %}
|
||||
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% endmacro %}
|
||||
@@ -24,8 +24,9 @@
|
||||
{% endif %}
|
||||
{% endmacro %}
|
||||
|
||||
{% materialization incremental, adapter='snowflake' -%}
|
||||
|
||||
{% materialization incremental, adapter='snowflake' -%}
|
||||
|
||||
{% set original_query_tag = set_query_tag() %}
|
||||
|
||||
{%- set unique_key = config.get('unique_key') -%}
|
||||
@@ -37,6 +38,7 @@
|
||||
|
||||
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
|
||||
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change')) %}
|
||||
|
||||
-- setup
|
||||
{{ run_hooks(pre_hooks, inside_transaction=False) }}
|
||||
@@ -46,20 +48,135 @@
|
||||
|
||||
{% if existing_relation is none %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif existing_relation.is_view %}
|
||||
{#-- Can't overwrite a view with a table - we must drop --#}
|
||||
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
|
||||
{% do adapter.drop_relation(existing_relation) %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif full_refresh_mode %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% else %}
|
||||
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
|
||||
{% do adapter.expand_target_column_types(
|
||||
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
|
||||
|
||||
{% if schema_changed %}
|
||||
|
||||
{% if on_schema_change == 'full_refresh' %}
|
||||
{% do log('running full_refresh', info=true) %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif on_schema_change == 'ignore' %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% else %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% else %}
|
||||
{% do adapter.expand_target_column_types(
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
|
||||
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{%- call statement('main') -%}
|
||||
{{ build_sql }}
|
||||
{%- endcall -%}
|
||||
|
||||
{{ run_hooks(post_hooks, inside_transaction=True) }}
|
||||
|
||||
-- `COMMIT` happens here
|
||||
{{ adapter.commit() }}
|
||||
|
||||
{{ run_hooks(post_hooks, inside_transaction=False) }}
|
||||
|
||||
{% set target_relation = target_relation.incorporate(type='table') %}
|
||||
{% do persist_docs(target_relation, model) %}
|
||||
|
||||
{% do unset_query_tag(original_query_tag) %}
|
||||
|
||||
{{ return({'relations': [target_relation]}) }}
|
||||
|
||||
{%- endmaterialization %}
|
||||
|
||||
|
||||
|
||||
{% materialization incremental, adapter='snowflake' -%}
|
||||
|
||||
{% set original_query_tag = set_query_tag() %}
|
||||
|
||||
{%- set unique_key = config.get('unique_key') -%}
|
||||
{%- set full_refresh_mode = (should_full_refresh()) -%}
|
||||
|
||||
{% set target_relation = this %}
|
||||
{% set existing_relation = load_relation(this) %}
|
||||
{% set tmp_relation = make_temp_relation(this) %}
|
||||
|
||||
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
|
||||
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
|
||||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change')) %}
|
||||
|
||||
-- setup
|
||||
{{ run_hooks(pre_hooks, inside_transaction=False) }}
|
||||
|
||||
-- `BEGIN` happens here:
|
||||
{{ run_hooks(pre_hooks, inside_transaction=True) }}
|
||||
|
||||
{% if existing_relation is none %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif existing_relation.is_view %}
|
||||
{#-- Can't overwrite a view with a table - we must drop --#}
|
||||
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
|
||||
{% do adapter.drop_relation(existing_relation) %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif full_refresh_mode %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% else %}
|
||||
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
|
||||
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
|
||||
|
||||
{% if schema_changed %}
|
||||
|
||||
{% if on_schema_change == 'full_refresh' %}
|
||||
{% do log('running full_refresh', info=true) %}
|
||||
{% set build_sql = create_table_as(False, target_relation, sql) %}
|
||||
|
||||
{% elif on_schema_change == 'ignore' %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% else %}
|
||||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
|
||||
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% else %}
|
||||
{% do adapter.expand_target_column_types(
|
||||
from_relation=tmp_relation,
|
||||
to_relation=target_relation) %}
|
||||
|
||||
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
|
||||
|
||||
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
|
||||
{% endif %}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{%- call statement('main') -%}
|
||||
|
||||
Reference in New Issue
Block a user