forked from repo-mirrors/dbt-core
Compare commits
15 Commits
v1.7.7...feature/su
| Author | SHA1 | Date |
|---|---|---|
|  | 49309e7c9e |  |
|  | e4843e8da3 |  |
|  | 3a173874d9 |  |
|  | bfe46b1deb |  |
|  | 86464070c9 |  |
|  | c3a49e41e9 |  |
|  | 54da843e83 |  |
|  | ce28ab3111 |  |
|  | c79ae61429 |  |
|  | 560ccbe4be |  |
|  | b3df2ca7ed |  |
|  | ca25734abe |  |
|  | b8fd4bb319 |  |
|  | 39e636ad41 |  |
|  | 607d5b01b6 |  |
.changes/unreleased/Features-20220823-085727.yaml (new file, +7 lines)
@@ -0,0 +1,7 @@
+kind: Features
+body: incremental predicates
+time: 2022-08-23T08:57:27.640804-05:00
+custom:
+  Author: dave-connors-3
+  Issue: "5680"
+  PR: "5702"
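For orientation, a minimal sketch (not part of this diff) of how a model could opt into the new config once these commits land; the model file name, the upstream `ref`, and the predicate are illustrative assumptions that mirror the test fixture further down:

```sql
-- hypothetical model file, e.g. models/delete_insert_demo.sql
{{ config(
    materialized = 'incremental',
    unique_key = 'id',
    incremental_strategy = 'delete+insert',
    -- rows matching the unique key are only deleted and replaced when they
    -- also satisfy every predicate in this list
    incremental_predicates = ["id != 2"]
) }}

select * from {{ ref('stg_source_data') }}  -- hypothetical upstream model
```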
BIN core/dbt/docs/build/doctrees/environment.pickle (vendored): Binary file not shown.
@@ -50,9 +50,9 @@
 
   {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
   {% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
-  {% set incremental_predicates = config.get('incremental_predicates', none) %}
+  {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
   {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
-  {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
+  {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
   {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
 
 {% endif %}
@@ -1,9 +1,9 @@
-{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
-  {{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
+{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+  {{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
 {%- endmacro %}
 
-{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
-    {%- set predicates = [] if predicates is none else [] + predicates -%}
+{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
     {%- set merge_update_columns = config.get('merge_update_columns') -%}
     {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
@@ -32,7 +32,7 @@
 
     merge into {{ target }} as DBT_INTERNAL_DEST
     using {{ source }} as DBT_INTERNAL_SOURCE
-    on {{ predicates | join(' and ') }}
+    on {{"(" ~ predicates | join(") and (") ~ ")"}}
 
     {% if unique_key %}
     when matched then update set
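Illustration only (relation and column names are hypothetical, not taken from this diff): with `unique_key = 'id'` and `incremental_predicates = ["DBT_INTERNAL_DEST.color = 'blue'"]`, the reworked ON clause wraps each predicate in parentheses before joining, so the merge would render roughly as:

```sql
merge into analytics.my_model as DBT_INTERNAL_DEST
using analytics.my_model__dbt_tmp as DBT_INTERNAL_SOURCE
    -- custom predicates come first; the unique-key match is appended by the macro
    on (DBT_INTERNAL_DEST.color = 'blue') and (DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id)
when matched then update set
    msg = DBT_INTERNAL_SOURCE.msg
when not matched then insert (id, msg)
    values (DBT_INTERNAL_SOURCE.id, DBT_INTERNAL_SOURCE.msg)
```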
@@ -50,11 +50,11 @@
 {% endmacro %}
 
 
-{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
-  {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
+{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
+  {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
 {%- endmacro %}
 
-{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
+{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
 
     {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
 
@@ -65,8 +65,13 @@
             where (
                 {% for key in unique_key %}
                     {{ source }}.{{ key }} = {{ target }}.{{ key }}
-                    {{ "and " if not loop.last }}
+                    {{ "and " if not loop.last}}
                 {% endfor %}
+                {% if incremental_predicates %}
+                    {% for predicate in incremental_predicates %}
+                        and {{ predicate }}
+                    {% endfor %}
+                {% endif %}
             );
         {% else %}
             delete from {{ target }}
@@ -74,7 +79,12 @@
                 {{ unique_key }}) in (
                 select ({{ unique_key }})
                 from {{ source }}
-            );
+            )
+            {%- if incremental_predicates %}
+                {% for predicate in incremental_predicates %}
+                    and {{ predicate }}
+                {% endfor %}
+            {%- endif -%};
 
     {% endif %}
     {% endif %}
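Again for illustration only (relation names are hypothetical): with a single-column `unique_key = 'id'` and `incremental_predicates = ["id != 2"]`, the delete step emitted by `default__get_delete_insert_merge_sql` would now look roughly like:

```sql
-- delete only rows that both match the temp relation on the unique key
-- and satisfy every incremental predicate
delete from analytics.my_model
where (id) in (
    select (id)
    from analytics.my_model__dbt_tmp
)
and id != 2;

-- the insert of the new batch from the temp relation is unchanged by this diff
insert into analytics.my_model (id, msg, color)
select id, msg, color
from analytics.my_model__dbt_tmp;
```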
@@ -21,7 +21,7 @@
 
 {% macro default__get_incremental_delete_insert_sql(arg_dict) %}
 
-  {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
+  {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
 
 {% endmacro %}
 
@@ -35,7 +35,7 @@
 
 {% macro default__get_incremental_merge_sql(arg_dict) %}
 
-  {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
+  {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
 
 {% endmacro %}
 
@@ -48,7 +48,7 @@
 
 {% macro default__get_incremental_insert_overwrite_sql(arg_dict) %}
 
-  {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["predicates"])) %}
+  {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
 
 {% endmacro %}
 
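A hedged sketch of what the renamed `arg_dict` key means for adapter or project overrides; the macro name `my_adapter__get_incremental_merge_sql` is hypothetical and only mirrors the defaults above, it is not part of this diff:

```sql
{% macro my_adapter__get_incremental_merge_sql(arg_dict) %}
  {#-- forward the renamed key so user-configured predicates reach get_merge_sql --#}
  {% do return(get_merge_sql(
      arg_dict["target_relation"],
      arg_dict["temp_relation"],
      arg_dict["unique_key"],
      arg_dict["dest_columns"],
      arg_dict["incremental_predicates"]
  )) %}
{% endmacro %}
```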
@@ -0,0 +1,154 @@
+import pytest
+from dbt.tests.util import run_dbt, check_relations_equal
+from collections import namedtuple
+
+
+models__delete_insert_incremental_predicates_sql = """
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id'
+) }}
+
+{% if not is_incremental() %}
+
+select 1 as id, 'hello' as msg, 'blue' as color
+union all
+select 2 as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- delete will not happen on the above record where id = 2, so new record will be inserted instead
+select 1 as id, 'hey' as msg, 'blue' as color
+union all
+select 2 as id, 'yo' as msg, 'green' as color
+union all
+select 3 as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
+"""
+
+seeds__expected_delete_insert_incremental_predicates_csv = """id,msg,color
+1,hey,blue
+2,goodbye,red
+2,yo,green
+3,anyway,purple
+"""
+
+ResultHolder = namedtuple(
+    "ResultHolder",
+    [
+        "seed_count",
+        "model_count",
+        "seed_rows",
+        "inc_test_model_count",
+        "opt_model_count",
+        "relation",
+    ],
+)
+
+
+class BaseIncrementalPredicates:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
+        }
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+incremental_predicates": [
+                    "id != 2"
+                ],
+                "+incremental_strategy": "delete+insert"
+            }
+        }
+
+    def update_incremental_model(self, incremental_model):
+        """update incremental model after the seed table has been updated"""
+        model_result_set = run_dbt(["run", "--select", incremental_model])
+        return len(model_result_set)
+
+    def get_test_fields(
+        self, project, seed, incremental_model, update_sql_file, opt_model_count=None
+    ):
+        seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))
+
+        model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))
+        # pass on kwarg
+        relation = incremental_model
+        # update seed in anticipation of incremental model update
+        row_count_query = "select * from {}.{}".format(project.test_schema, seed)
+        # project.run_sql_file(Path("seeds") / Path(update_sql_file + ".sql"))
+        seed_rows = len(project.run_sql(row_count_query, fetch="all"))
+
+        # propagate seed state to incremental model according to unique keys
+        inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)
+
+        return ResultHolder(
+            seed_count, model_count, seed_rows, inc_test_model_count, opt_model_count, relation
+        )
+
+    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
+        """Invoke assertions to verify correct build functionality"""
+        # 1. test seed(s) should build afresh
+        assert expected_fields.seed_count == test_case_fields.seed_count
+        # 2. test model(s) should build afresh
+        assert expected_fields.model_count == test_case_fields.model_count
+        # 3. seeds should have intended row counts post update
+        assert expected_fields.seed_rows == test_case_fields.seed_rows
+        # 4. incremental test model(s) should be updated
+        assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
+        # 5. extra incremental model(s) should be built; optional since
+        #    comparison may be between an incremental model and seed
+        if expected_fields.opt_model_count and test_case_fields.opt_model_count:
+            assert expected_fields.opt_model_count == test_case_fields.opt_model_count
+        # 6. result table should match intended result set (itself a relation)
+        check_relations_equal(
+            project.adapter, [expected_fields.relation, test_case_fields.relation]
+        )
+
+    def get_expected_fields(self, relation, seed_rows, opt_model_count=None):
+        return ResultHolder(
+            seed_count=1,
+            model_count=1,
+            inc_test_model_count=1,
+            seed_rows=seed_rows,
+            opt_model_count=opt_model_count,
+            relation=relation
+        )
+
+    # no unique_key test
+    def test__incremental_predicates(self, project):
+        """seed should match model after two incremental runs"""
+
+        expected_fields = self.get_expected_fields(relation="expected_delete_insert_incremental_predicates", seed_rows=4)
+        test_case_fields = self.get_test_fields(
+            project, seed="expected_delete_insert_incremental_predicates", incremental_model="delete_insert_incremental_predicates", update_sql_file=None
+        )
+        self.check_scenario_correctness(expected_fields, test_case_fields, project)
+
+
+class TestIncrementalPredicatesDeleteInsert(BaseIncrementalPredicates):
+    pass
+
+
+class TestPredicatesDeleteInsert(BaseIncrementalPredicates):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+predicates": [
+                    "id != 2"
+                ],
+                "+incremental_strategy": "delete+insert"
+            }
+        }