Compare commits

...

21 Commits

Author SHA1 Message Date
Matt Winkler
1f1d65f411 catch final else condition 2021-05-23 18:50:13 -06:00
Matt Winkler
9b0029f90c fixed unintentional wipe behavior 2021-05-23 18:38:46 -06:00
Matt Winkler
1430834196 updated full refresh trigger logic 2021-05-23 09:26:27 -06:00
Matt Winkler
7ba1286c9b reorganize approach 2021-05-21 16:58:37 -06:00
Matt Winkler
3d99fa3d66 reorganize postgres code 2021-05-19 17:48:49 -06:00
Matt Winkler
d5edd65e52 Merge branch 'develop' into feature/incremental-schema-changes 2021-05-18 07:48:52 -06:00
matt-winkler
49f5319f53 merging develop 2021-05-07 12:56:09 -05:00
matt-winkler
86a7536fcd updated for snowflake 2021-05-07 12:55:00 -05:00
matt-winkler
1549af6b3d simple changes from JC's feedback 2021-04-28 17:26:34 -06:00
matt-winkler
7c7b5a07c3 pulled in updates from remote 2021-04-27 21:59:05 -06:00
matt-winkler
335eb15b58 multi-column add / remove with full_refresh 2021-04-27 21:55:46 -06:00
matt-winkler
4ab16d5f9a Merge branch 'develop' into feature/incremental-schema-changes 2021-04-27 11:33:24 -06:00
matt-winkler
c073b45914 updates from Jeremy's feedback 2021-04-27 11:32:05 -06:00
matt-winkler
e26c22053e updates from Jeremy's feedback 2021-04-27 11:31:37 -06:00
matt-winkler
59e10ce1c8 Update CHANGELOG.md
Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>
2021-04-27 09:43:55 -06:00
matt-winkler
1f014decb8 address matching names vs. data types 2021-04-25 21:52:03 -06:00
matt-winkler
8c531fbbec abstract code a bit further 2021-04-23 16:01:22 -06:00
matt-winkler
0be265b30c fix error in diff_columns from testing 2021-04-23 14:18:54 -06:00
matt-winkler
e06f09fbbf update changelog 2021-04-22 16:10:38 -06:00
matt-winkler
82950facfe update incremental helpers code 2021-04-22 15:46:22 -06:00
matt-winkler
f9f1a96cf1 detect and act on schema changes 2021-04-22 15:32:01 -06:00
10 changed files with 492 additions and 8 deletions

View File

@@ -32,6 +32,7 @@ Contributors:
- Support disabling schema tests, and configuring tests from `dbt_project.yml` ([#3252](https://github.com/fishtown-analytics/dbt/issues/3252),
[#3253](https://github.com/fishtown-analytics/dbt/issues/3253), [#3257](https://github.com/fishtown-analytics/dbt/pull/3257))
- Add Jinja tag for tests ([#1173](https://github.com/fishtown-analytics/dbt/issues/1173), [#3261](https://github.com/fishtown-analytics/dbt/pull/3261))
- Support detecting schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3288](https://github.com/fishtown-analytics/dbt/issues/3288))
- Add native support for Postgres index creation ([#804](https://github.com/fishtown-analytics/dbt/issues/804), [3106](https://github.com/fishtown-analytics/dbt/pull/3106))
- Less greedy test selection: expand to select unselected tests if and only if all parents are selected ([#2891](https://github.com/fishtown-analytics/dbt/issues/2891), [#3235](https://github.com/fishtown-analytics/dbt/pull/3235))
- Prevent locks in Redshift during full refresh in incremental materialization. ([#2426](https://github.com/fishtown-analytics/dbt/issues/2426), [#2998](https://github.com/fishtown-analytics/dbt/pull/2998))

View File

@@ -144,6 +144,7 @@ class BaseAdapter(metaclass=AdapterMeta):
- convert_datetime_type
- convert_date_type
- convert_time_type
- regex_replace_sql
Macros:
- get_catalog
@@ -518,6 +519,20 @@ class BaseAdapter(metaclass=AdapterMeta):
'`get_columns_in_relation` is not implemented for this adapter!'
)
@available.parse_list
def incremental_remove_where(self, query):
"""Remove the last WHERE clause from an incremental query"""
# need to remove the last instance of the WHERE clause
# reverse the query string to find the last WHERE
where_idx = len(query) - (query[::-1].find('EREHW') + len('WHERE'))
# we found a where clause so remove it
if where_idx > 0:
where_clause = query[where_idx:]
return query.replace(where_clause, '')
# else return the original string
return query
@available.deprecated('get_columns_in_relation', lambda *a, **k: [])
def get_columns_in_table(
self, schema: str, identifier: str
@@ -805,6 +820,7 @@ class BaseAdapter(metaclass=AdapterMeta):
else:
return identifier
@available
def quote_seed_column(
self, column: str, quote_config: Optional[bool]

View File

@@ -394,6 +394,7 @@ class NodeConfig(BaseConfig):
CompareBehavior.Exclude),
)
full_refresh: Optional[bool] = None
on_schema_change: str = 'ignore'
@classmethod
def __pre_deserialize__(cls, data):

View File

@@ -311,3 +311,35 @@
{{ config.set('sql_header', caller()) }}
{%- endmacro %}
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}
{% macro default__alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{% set sql -%}
alter {{ relation.type }} {{ relation }}
{% if add_columns %}
add
{% endif %}
{% for column in add_columns %}
column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{{ ', ' if add_columns and remove_columns }}
{% if remove_columns %}
drop
{% for column in remove_columns %}
column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{% endif %}
{%- endset %}
{{ return(run_query(sql)) }}
{% endmacro %}
{% endmacro %}

View File

@@ -72,3 +72,5 @@
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}

View File

@@ -1,5 +1,6 @@
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

View File

@@ -5,6 +5,9 @@
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
@@ -12,12 +15,46 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% set to_drop = [] %}
{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}
{% do log('full refresh mode: %s' % trigger_full_refresh) %}
{# -- double check whether full refresh should happen if on_schema_change config is True #}
{% if not trigger_full_refresh and on_schema_change == 'full_refresh' %}
{%- set tmp_relation = make_temp_relation(target_relation) -%}
{%- do run_query(create_table_as(True, tmp_relation, sql)) -%}
{%- set schema_changed = check_for_schema_changes(tmp_relation, target_relation) -%}
{%- if schema_changed -%}
{%- set trigger_full_refresh = True -%}
{%- set sql = adapter.incremental_remove_where(sql) %}
{%- do log('detected a schema change with on_schema_change == full_refresh, refreshing table', info=true) -%}
{%- endif -%}
{# -- BELOW CODE ALSO WORKS #}
{# -- set source_columns = get_columns_in_query(sql) -#}
{# -- set target_columns = get_column_names(adapter.get_columns_in_relation(target_relation)) -#}
{# -- set source_not_in_target = diff_arrays(source_columns, target_columns) -#}
{# -- set target_not_in_source = diff_arrays(target_columns, source_columns) -#}
{# -- if source_not_in_target|length > 0 or target_not_in_source|length > 0 -#}
{# -- set trigger_full_refresh = True -#}
{# -- this removes the WHERE clause from the input model sql in the event a full refresh is needed #}
{# -- set sql = adapter.incremental_remove_where(sql) #}
{# -- do log('detected a schema change with on_schema_change == full_refresh, refreshing table') -#}
{# -- endif -#}
{% endif %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or should_full_refresh() %}
{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% do log('running full refresh procedure', info=true) %}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
{% set backup_identifier = model['name'] + '__dbt_backup' %}
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
@@ -28,13 +65,32 @@
{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% if on_schema_change not in ('ignore', 'full_refresh') %}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
{% do log('schema changed: %s' % schema_changed, info=true) %}
{% if schema_changed %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% else %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}
{% else %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}
{% endif %}
{% call statement("main") %}

View File

@@ -0,0 +1,137 @@
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}
{% if on_schema_change not in ['full_refresh', 'sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}
{% set log_message = 'invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default_value) %}
{% do log(log_message, info=true) %}
{{ return(default) }}
{% else %}
{{ return(on_schema_change) }}
{% endif %}
{% endmacro %}
{% macro get_column_names(columns) %}
{% set result = [] %}
{% for col in columns %}
{{ result.append(col.column) }}
{% endfor %}
{{ return(result) }}
{% endmacro %}
{% macro diff_arrays(source_array, target_array) %}
{% set result = [] %}
{%- for elem in source_array -%}
{% if elem not in target_array %}
{{ result.append(elem) }}
{% endif %}
{%- endfor -%}
{{ return(result) }}
{% endmacro %}
{% macro diff_columns(source_columns, target_columns) %}
{% set result = [] %}
{% set source_names = get_column_names(source_columns) %}
{% set target_names = get_column_names(target_columns) %}
{# --check whether the name attribute exists in the target, but dont worry about data type differences #}
{%- for col in source_columns -%}
{%- if col.column not in target_names -%}
{{ result.append(col) }}
{%- endif -%}
{%- endfor -%}
{{ return(result) }}
{% endmacro %}
{% macro check_for_schema_changes(source_relation, target_relation) %}
{% set schema_changed = False %}
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}
{% if source_not_in_target != [] %}
{% set schema_changed = True %}
{% elif target_not_in_source != [] %}
{% set schema_changed = True %}
{% endif %}
{{ return(schema_changed) }}
{% endmacro %}
{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %}
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%}
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%}
-- Validates on_schema_change config vs. whether there are column differences
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %}
{{
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append. Review the schemas in the source and target relations')
}}
{% endif %}
{%- if on_schema_change == 'append_new_columns' -%}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%}
{% elif on_schema_change == 'sync_all_columns' %}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
{% endif %}
{{
return(
{
'columns_added': add_to_target_arr,
'columns_removed': remove_from_target_arr
}
)
}}
{% endmacro %}
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
{% if on_schema_change=='fail' %}
{{
exceptions.raise_compiler_error('The source and target schemas on this incremental model are out of sync!
You can specify one of ["fail", "ignore", "add_new_columns", "sync_all_columns", "full_refresh"] in the on_schema_change config to control this behavior.
Please re-run the incremental model with full_refresh set to True to update the target schema.
Alternatively, you can update the schema manually and re-run the process.')
}}
{# unless we ignore, run the sync operation per the config #}
{% else %}
{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %}
{% set columns_added = schema_changes['columns_added'] %}
{% set columns_removed = schema_changes['columns_removed'] %}
{% do log('columns added: ' + columns_added|join(', '), info=true) %}
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %}
{% endif %}
{% endmacro %}

View File

@@ -0,0 +1,121 @@
{% macro incremental_validate_on_schema_change(on_schema_change, default_value='ignore') %}
{% if on_schema_change not in ['full_refresh', 'sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}
{% set log_message = 'invalid value for on_schema_change {{ on_schema_change }} specified. Setting default value of {{ default_value }}.' %}
{% do log(log_message, info=true) %}
{{ return(default_value) }}
{% else %}
{{ return(on_schema_change) }}
{% endif %}
{% endmacro %}
{% macro get_column_names(columns) %}
{% set result = [] %}
{% for col in columns %}
{{ result.append(col.column) }}
{% endfor %}
{{ return(result) }}
{% endmacro %}
{% macro snowflake_diff_columns(source_columns, target_columns) %}
{% set result = [] %}
{% set source_names = get_column_names(source_columns) %}
{% set target_names = get_column_names(target_columns) %}
{# check whether the name attribute exists in the target, but dont worry about data type differences #}
{%- for col in source_columns -%}
{%- if col.column not in target_names -%}
{{ result.append(col) }}
{%- endif -%}
{%- endfor -%}
{{ return(result) }}
{% endmacro %}
{% macro check_for_schema_changes(source_relation, target_relation) %}
{% set schema_changed = False %}
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = snowflake_diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = snowflake_diff_columns(target_columns, source_columns) -%}
{% if source_not_in_target != [] %}
{% set schema_changed = True %}
{% elif target_not_in_source != [] %}
{% set schema_changed = True %}
{% endif %}
{{ return(schema_changed) }}
{% endmacro %}
{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %}
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%}
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%}
-- Validates on_schema_change config vs. whether there are column differences
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %}
{{
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append. Review the schemas in the source and target relations')
}}
{% endif %}
{%- if on_schema_change == 'append_new_columns' -%}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%}
{% elif on_schema_change == 'sync_all_columns' %}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
{% endif %}
{{
return(
{
'columns_added': add_to_target_arr,
'columns_removed': remove_from_target_arr
}
)
}}
{% endmacro %}
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}
{% if on_schema_change=='fail' %}
{{
exceptions.raise_compiler_error('The source and target schemas on this incremental model are out of sync!
You can specify one of ["fail", "ignore", "add_new_columns", "sync_all_columns", "full_refresh"] in the on_schema_change config to control this behavior.
Please re-run the incremental model with full_refresh set to True to update the target schema.
Alternatively, you can update the schema manually and re-run the process.')
}}
{# unless we ignore, run the sync operation per the config #}
{% elif on_schema_change != 'ignore' %}
{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %}
{% set columns_added = schema_changes['columns_added'] %}
{% set columns_removed = schema_changes['columns_removed'] %}
{% do log('columns added: ' + columns_added|join(', '), info=true) %}
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %}
{% endif %}
{% endmacro %}

View File

@@ -24,8 +24,9 @@
{% endif %}
{% endmacro %}
{% materialization incremental, adapter='snowflake' -%}
{% materialization incremental, adapter='snowflake' -%}
{% set original_query_tag = set_query_tag() %}
{%- set unique_key = config.get('unique_key') -%}
@@ -37,6 +38,7 @@
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change')) %}
-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}
@@ -46,20 +48,135 @@
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
{% if schema_changed %}
{% if on_schema_change == 'full_refresh' %}
{% do log('running full_refresh', info=true) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif on_schema_change == 'ignore' %}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% else %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}
{% else %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}
{% endif %}
{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}
{{ run_hooks(post_hooks, inside_transaction=True) }}
-- `COMMIT` happens here
{{ adapter.commit() }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}
{% do unset_query_tag(original_query_tag) %}
{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}
{% materialization incremental, adapter='snowflake' -%}
{% set original_query_tag = set_query_tag() %}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change')) %}
-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
{% if schema_changed %}
{% if on_schema_change == 'full_refresh' %}
{% do log('running full_refresh', info=true) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif on_schema_change == 'ignore' %}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% else %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}
{% else %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(tmp_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}
{% endif %}
{%- call statement('main') -%}