Compare commits

...

16 Commits

Author SHA1 Message Date
Kyle Wigley
eae14e1be7 use adapter to execute sql 2022-04-01 16:18:13 -04:00
Matthew McKnight
32f0a30b58 create incremental test directory in adapter zone 2022-04-01 09:21:43 -05:00
Matthew McKnight
f430efc56a v2 based on feedback for base version of testing, plus small removal of leftover breakpoint 2022-03-31 17:07:23 -05:00
Matthew McKnight
0b263604a2 Merge branch 'main' of github.com:dbt-labs/dbt into mcknight/convert_unique_key 2022-03-30 16:10:15 -05:00
Matthew McKnight
9282b4cc7a uncommenting seed_count 2022-03-30 09:57:02 -05:00
Matthew McKnight
b8f79149be removed commented out code 2022-03-29 16:20:38 -05:00
Matthew McKnight
50c7ba1e92 remove older test_unique_key tests 2022-03-29 15:50:35 -05:00
Matthew McKnight
970d57d433 taking in chenyu's changes to fixtures 2022-03-28 16:47:13 -05:00
Matthew McKnight
ae12ccf2e8 moving tests to base class and inheriting in a simple class 2022-03-28 16:27:12 -05:00
Matthew McKnight
053910cb1b removed unused fixture 2022-03-28 16:02:45 -05:00
Matthew McKnight
d3451a6041 minor changes to the bad_unique_key tests 2022-03-28 15:47:42 -05:00
Matthew McKnight
3dec58f592 moving non basic test up one directory to be more broadly part of adapter zone 2022-03-28 15:24:15 -05:00
Matthew McKnight
f8d93b0524 adding changelog entry 2022-03-28 15:08:41 -05:00
Matthew McKnight
bd15abf502 testing cause of failure 2022-03-28 11:33:42 -05:00
Matthew McKnight
1e52c9c134 Merge branch 'main' of github.com:dbt-labs/dbt into mcknight/convert_unique_key 2022-03-25 15:45:10 -05:00
Matthew McKnight
17b5d11c8b init push up of converted unique_key tests 2022-03-25 11:48:25 -05:00
20 changed files with 565 additions and 570 deletions

View File

@@ -0,0 +1,7 @@
kind: Features
body: converting unique key as list tests to new pytest format
time: 2022-03-28T15:08:12.70006-05:00
custom:
Author: McKnight-42
Issue: "4882"
PR: "4958"

View File

@@ -1,8 +1,7 @@
from dbt.context import providers
from unittest.mock import patch
from contextlib import contextmanager
from dbt.events.functions import fire_event
from dbt.events.test_types import IntegrationTestDebug
from dbt.tests.util import run_sql_with_adapter
# This code was copied from the earlier test framework in test/integration/base.py
# The goal is to vastly simplify this and replace it with calls to macros.
@@ -309,38 +308,7 @@ class TableComparison:
# This duplicates code in the TestProjInfo class.
def run_sql(self, sql, fetch=None):
# Run `sql` for a test and optionally hand back rows: fetch="one" returns
# cursor.fetchone(), fetch="all" returns cursor.fetchall(), any other value
# returns None.
# NOTE(review): indentation is missing in this span and the inline cursor
# logic is followed by a one-line delegation to run_sql_with_adapter;
# presumably these are the removed and added sides of a diff -- confirm which
# body is live before relying on the comments below.
# Blank / whitespace-only statements are a no-op.
if sql.strip() == "":
return
# substitute schema and database in sql
adapter = self.adapter
kwargs = {
"schema": self.unique_schema,
"database": adapter.quote(self.default_database),
}
# `sql` is used as a str.format template; only the two keys above are supplied.
sql = sql.format(**kwargs)
with self.get_connection("__test") as conn:
msg = f'test connection "{conn.name}" executing: {sql}'
fire_event(IntegrationTestDebug(msg=msg))
with conn.handle.cursor() as cursor:
try:
cursor.execute(sql)
# NOTE(review): commit is issued twice in a row; the second call looks
# redundant -- confirm before removing.
conn.handle.commit()
conn.handle.commit()
if fetch == "one":
return cursor.fetchone()
elif fetch == "all":
return cursor.fetchall()
else:
return
except BaseException as e:
# Roll back only when a handle exists and reports itself as not closed; a
# handle without a `closed` attribute is treated as closed (getattr default).
if conn.handle and not getattr(conn.handle, "closed", True):
conn.handle.rollback()
# Print the failing statement and the error, then re-raise unchanged.
print(sql)
print(e)
raise
finally:
# Always mark the connection as having no open transaction.
conn.transaction_open = False
# Delegates execution to the shared helper using this object's adapter.
return run_sql_with_adapter(self.adapter, sql, fetch=fetch)
def get_tables_in_schema(self):
sql = """

View File

@@ -1,17 +0,0 @@
{{
config(
materialized='incremental',
unique_key=['state', 'state']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,16 +0,0 @@
-- ensure model with empty string unique key should build normally
{{
config(
materialized='incremental',
unique_key=''
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,14 +0,0 @@
-- model with empty list unique key should build normally
{{
config(
materialized='incremental',
unique_key=[]
)
}}
select * from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,21 +0,0 @@
{{
config(
materialized='table'
)
}}
select
'CT'::varchar(2) as state,
'Hartford'::varchar(12) as county,
'Hartford'::varchar(12) as city,
'2022-02-14'::date as last_visit_date
union all
select 'MA'::varchar(2),'Suffolk'::varchar(12),'Boston'::varchar(12),'2020-02-12'::date
union all
select 'NJ'::varchar(2),'Mercer'::varchar(12),'Trenton'::varchar(12),'2022-01-01'::date
union all
select 'NY'::varchar(2),'Kings'::varchar(12),'Brooklyn'::varchar(12),'2021-04-02'::date
union all
select 'NY'::varchar(2),'New York'::varchar(12),'Manhattan'::varchar(12),'2021-04-01'::date
union all
select 'PA'::varchar(2),'Philadelphia'::varchar(12),'Philadelphia'::varchar(12),'2021-05-21'::date

View File

@@ -1,21 +0,0 @@
{{
config(
materialized='table'
)
}}
select
'CT'::varchar(2) as state,
'Hartford'::varchar(12) as county,
'Hartford'::varchar(12) as city,
'2022-02-14'::date as last_visit_date
union all
select 'MA'::varchar(2),'Suffolk'::varchar(12),'Boston'::varchar(12),'2020-02-12'::date
union all
select 'NJ'::varchar(2),'Mercer'::varchar(12),'Trenton'::varchar(12),'2022-01-01'::date
union all
select 'NY'::varchar(2),'Kings'::varchar(12),'Brooklyn'::varchar(12),'2021-04-02'::date
union all
select 'NY'::varchar(2),'New York'::varchar(12),'Manhattan'::varchar(12),'2021-04-01'::date
union all
select 'PA'::varchar(2),'Philadelphia'::varchar(12),'Philadelphia'::varchar(12),'2021-05-21'::date

View File

@@ -1,15 +0,0 @@
-- no specified unique key should cause no special build behavior
{{
config(
materialized='incremental'
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,21 +0,0 @@
-- a multi-argument unique key list should see overwriting on rows in the model
-- where all unique key fields apply
-- N.B. needed for direct comparison with seed
{{
config(
materialized='incremental',
unique_key=['state', 'county', 'city']
)
}}
select
state as state,
county as county,
city as city,
last_visit_date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,16 +0,0 @@
-- a model with a unique key not found in the table itself will error out
{{
config(
materialized='incremental',
unique_key='thisisnotacolumn'
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,10 +0,0 @@
-- a unique key list with any element not in the model itself should error out
{{
config(
materialized='incremental',
unique_key=['state', 'thisisnotacolumn']
)
}}
select * from {{ ref('seed') }}

View File

@@ -1,21 +0,0 @@
-- a string unique key should trigger overwrite behavior when
-- the source has entries in conflict (i.e. more than one row per unique key
-- combination)
{{
config(
materialized='incremental',
unique_key='state'
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,20 +0,0 @@
-- a multi-argument unique key list should see overwriting on rows in the model
-- where all unique key fields apply
{{
config(
materialized='incremental',
unique_key=['state', 'county', 'city']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,20 +0,0 @@
-- a one argument unique key list should result in overwriting semantics for
-- that one matching field
{{
config(
materialized='incremental',
unique_key=['state']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}

View File

@@ -1,12 +0,0 @@
-- Insert statement which when applied to seed.csv sees incremental model
-- grow in size while not (necessarily) diverging from the seed itself.
-- insert two new rows, both of which should be in incremental model
-- with any unique columns
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('WA','King','Seattle','2022-02-01');
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('CA','Los Angeles','Los Angeles','2022-02-01');

View File

@@ -1,9 +0,0 @@
-- Insert statement which when applied to seed.csv triggers the inplace
-- overwrite strategy of incremental models. Seed and incremental model
-- diverge.
-- insert new row, which should not be in incremental model
-- with primary or first three columns unique
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('CT','Hartford','Hartford','2022-02-14');

View File

@@ -1,7 +0,0 @@
state,county,city,last_visit_date
CT,Hartford,Hartford,2020-09-23
MA,Suffolk,Boston,2020-02-12
NJ,Mercer,Trenton,2022-01-01
NY,Kings,Brooklyn,2021-04-02
NY,New York,Manhattan,2021-04-01
PA,Philadelphia,Philadelphia,2021-05-21
1 state county city last_visit_date
2 CT Hartford Hartford 2020-09-23
3 MA Suffolk Boston 2020-02-12
4 NJ Mercer Trenton 2022-01-01
5 NY Kings Brooklyn 2021-04-02
6 NY New York Manhattan 2021-04-01
7 PA Philadelphia Philadelphia 2021-05-21

View File

@@ -1,295 +0,0 @@
from dbt.contracts.results import RunStatus
from test.integration.base import DBTIntegrationTest, use_profile
from collections import namedtuple
from pathlib import Path
# Record of the counts (and the relation name) gathered or expected for a
# single unique_key scenario.
TestResults = namedtuple(
    'TestResults',
    (
        'seed_count',
        'model_count',
        'seed_rows',
        'inc_test_model_count',
        'opt_model_count',
        'relation',
    ),
)
class TestIncrementalUniqueKeyBase(DBTIntegrationTest):
    '''Shared helpers for the incremental `unique_key` scenarios.

    The helpers seed the database, build an incremental model, mutate the
    seed, rebuild the model and compare the outcome with an expectation.
    '''

    @property
    def schema(self):
        return 'incremental_unique_key'

    @property
    def models(self):
        # presumably the project's models directory -- confirm against
        # DBTIntegrationTest
        return 'models'

    def update_incremental_model(self, incremental_model):
        '''update incremental model after the seed table has been updated

        Returns the number of results reported by the (non full-refresh) run.
        '''
        model_result_set = self.run_dbt(['run', '--select', incremental_model])
        return len(model_result_set)

    def setup_test(self, seed, incremental_model, update_sql_file):
        '''build a test case and return values for assertions

        [INFO] Models must be in place to test incremental model
        construction and merge behavior. Database touches are side
        effects to extract counts (which speak to health of unique keys).

        Returns the tuple ``(seed_count, model_count, seed_rows,
        inc_test_model_count)``, i.e. the first four `TestResults` fields.
        '''
        # idempotently create some number of seeds and incremental models
        seed_count = len(self.run_dbt(
            ['seed', '--select', seed, '--full-refresh']
        ))
        model_count = len(self.run_dbt(
            ['run', '--select', incremental_model, '--full-refresh']
        ))

        # update seed in anticipation of incremental model update
        row_count_query = 'select * from {}.{}'.format(
            self.unique_schema(),
            seed
        )
        self.run_sql_file(Path('seeds') / Path(update_sql_file + '.sql'))
        seed_rows = len(self.run_sql(row_count_query, fetch='all'))

        # propagate seed state to incremental model according to unique keys
        inc_test_model_count = self.update_incremental_model(
            incremental_model=incremental_model
        )

        return (seed_count, model_count, seed_rows, inc_test_model_count)

    def test_scenario_correctness(self, expected_fields, test_case_fields):
        '''Invoke assertions to verify correct build functionality

        Both arguments are `TestResults` records: the expectation and the
        values observed while running the scenario.
        '''
        # 1. test seed(s) should build afresh
        self.assertEqual(
            expected_fields.seed_count, test_case_fields.seed_count
        )
        # 2. test model(s) should build afresh
        self.assertEqual(
            expected_fields.model_count, test_case_fields.model_count
        )
        # 3. seeds should have intended row counts post update
        self.assertEqual(
            expected_fields.seed_rows, test_case_fields.seed_rows
        )
        # 4. incremental test model(s) should be updated
        self.assertEqual(
            expected_fields.inc_test_model_count,
            test_case_fields.inc_test_model_count
        )
        # 5. extra incremental model(s) should be built; optional since
        #    comparison may be between an incremental model and seed.
        #    Compare whenever an expectation was given, so an observed count
        #    of 0 or None fails instead of silently skipping the check.
        if expected_fields.opt_model_count is not None:
            self.assertEqual(
                expected_fields.opt_model_count,
                test_case_fields.opt_model_count
            )
        # 6. result table should match intended result set (itself a relation)
        self.assertTablesEqual(
            expected_fields.relation, test_case_fields.relation
        )

    def stub_expected_fields(
        self, relation, seed_rows, opt_model_count=None
    ):
        '''Build the expected `TestResults`: every build count is 1; only the
        relation, the seed row count and the optional model count vary.'''
        return TestResults(
            seed_count=1, model_count=1, seed_rows=seed_rows,
            inc_test_model_count=1, opt_model_count=opt_model_count,
            relation=relation
        )

    def fail_to_build_inc_missing_unique_key_column(self, incremental_model_name):
        '''should pass back error state when trying build an incremental
        model whose unique key or keylist includes a column missing
        from the incremental model

        Returns ``(status, message)`` of the first result of the failing run.
        '''
        # the seed must exist before the model can be built; its result is
        # not needed here
        self.run_dbt(['seed', '--select', 'seed', '--full-refresh'])

        # unique keys are not applied on first run, so two are needed
        self.run_dbt(
            ['run', '--select', incremental_model_name, '--full-refresh'],
            expect_pass=True
        )
        run_result = self.run_dbt(
            ['run', '--select', incremental_model_name],
            expect_pass=False
        ).results[0]

        return run_result.status, run_result.message
class TestIncrementalWithoutUniqueKey(TestIncrementalUniqueKeyBase):
    '''Scenario for an incremental model that sets no unique_key.'''

    @use_profile('postgres')
    def test__postgres_no_unique_keys(self):
        '''with no unique keys, seed and model should match'''
        model_name = 'no_unique_key'
        expected = self.stub_expected_fields(relation='seed', seed_rows=8)

        # seed, build, add two rows to the seed, rebuild
        counts = self.setup_test('seed', model_name, 'add_new_rows')
        observed = TestResults(
            *counts, opt_model_count=None, relation=model_name
        )

        self.test_scenario_correctness(expected, observed)
class TestIncrementalStrUniqueKey(TestIncrementalUniqueKeyBase):
    '''Scenarios where unique_key is given as a plain string.'''

    @use_profile('postgres')
    def test__postgres_empty_str_unique_key(self):
        '''with empty string for unique key, seed and model should match'''
        model_name = 'empty_str_unique_key'
        expected = self.stub_expected_fields(relation='seed', seed_rows=8)

        counts = self.setup_test('seed', model_name, 'add_new_rows')
        observed = TestResults(
            *counts, opt_model_count=None, relation=model_name
        )

        self.test_scenario_correctness(expected, observed)

    @use_profile('postgres')
    def test__postgres_one_unique_key(self):
        '''with one unique key, model will overwrite existing row'''
        model_name = 'str_unique_key'
        expected_table = 'one_str__overwrite'
        expected = self.stub_expected_fields(
            relation=expected_table, seed_rows=7, opt_model_count=1
        )

        # run the scenario first, then build the hand-written expected table
        counts = self.setup_test('seed', model_name, 'duplicate_insert')
        expected_table_builds = self.update_incremental_model(expected_table)
        observed = TestResults(
            *counts,
            opt_model_count=expected_table_builds,
            relation=model_name,
        )

        self.test_scenario_correctness(expected, observed)

    @use_profile('postgres')
    def test__postgres_bad_unique_key(self):
        '''expect compilation error from unique key not being a column'''
        status, exc = self.fail_to_build_inc_missing_unique_key_column(
            incremental_model_name='not_found_unique_key'
        )

        self.assertEqual(status, RunStatus.Error)
        # the error message should name the offending column
        self.assertTrue("thisisnotacolumn" in exc.lower())
class TestIncrementalListUniqueKey(TestIncrementalUniqueKeyBase):
    '''Scenarios where unique_key is given as a list of column names.'''

    def _check_model_tracks_seed(self, incremental_model):
        '''Add distinct rows to the seed and expect the model to equal it.'''
        expected = self.stub_expected_fields(relation='seed', seed_rows=8)
        counts = self.setup_test('seed', incremental_model, 'add_new_rows')
        observed = TestResults(
            *counts, opt_model_count=None, relation=incremental_model
        )
        self.test_scenario_correctness(expected, observed)

    def _check_model_overwrites_row(self, incremental_model):
        '''Insert a conflicting row and expect the in-place overwrite table.'''
        expected_table = 'unique_key_list__inplace_overwrite'
        expected = self.stub_expected_fields(
            relation=expected_table, seed_rows=7, opt_model_count=1
        )
        # run the scenario first, then build the hand-written expected table
        counts = self.setup_test('seed', incremental_model, 'duplicate_insert')
        expected_table_builds = self.update_incremental_model(expected_table)
        observed = TestResults(
            *counts,
            opt_model_count=expected_table_builds,
            relation=incremental_model,
        )
        self.test_scenario_correctness(expected, observed)

    @use_profile('postgres')
    def test__postgres_empty_unique_key_list(self):
        '''with no unique keys, seed and model should match'''
        self._check_model_tracks_seed('empty_unique_key_list')

    @use_profile('postgres')
    def test__postgres_unary_unique_key_list(self):
        '''with one unique key, model will overwrite existing row'''
        self._check_model_overwrites_row('unary_unique_key_list')

    @use_profile('postgres')
    def test__postgres_duplicated_unary_unique_key_list(self):
        '''with two of the same unique key, model will overwrite existing row'''
        self._check_model_overwrites_row('duplicated_unary_unique_key_list')

    @use_profile('postgres')
    def test__postgres_trinary_unique_key_list(self):
        '''with three unique keys, model will overwrite existing row'''
        self._check_model_overwrites_row('trinary_unique_key_list')

    @use_profile('postgres')
    def test__postgres_trinary_unique_key_list_no_update(self):
        '''even with three unique keys, adding distinct rows to seed does not
        cause seed and model to diverge'''
        self._check_model_tracks_seed('nontyped_trinary_unique_key_list')

    @use_profile('postgres')
    def test__postgres_bad_unique_key_list(self):
        '''expect compilation error from unique key not being a column'''
        status, exc = self.fail_to_build_inc_missing_unique_key_column(
            incremental_model_name='not_found_unique_key_list'
        )

        self.assertEqual(status, RunStatus.Error)
        # the error message should name the offending column
        self.assertTrue("thisisnotacolumn" in exc.lower())

View File

@@ -0,0 +1,556 @@
import pytest
from dbt.tests.util import run_dbt
from dbt.tests.tables import TableComparison
from dbt.contracts.results import RunStatus
from collections import namedtuple
from pathlib import Path
# Incremental model keyed on ['state', 'county', 'city'], with every column
# cast explicitly (varchar / date).
models__trinary_unique_key_list_sql = """
-- a multi-argument unique key list should see overwriting on rows in the model
-- where all unique key fields apply
{{
config(
materialized='incremental',
unique_key=['state', 'county', 'city']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Same three-column key as above but without casts, so the columns keep the
# seed's own types.
models__nontyped_trinary_unique_key_list_sql = """
-- a multi-argument unique key list should see overwriting on rows in the model
-- where all unique key fields apply
-- N.B. needed for direct comparison with seed
{{
config(
materialized='incremental',
unique_key=['state', 'county', 'city']
)
}}
select
state as state,
county as county,
city as city,
last_visit_date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model whose unique_key is a one-element list: ['state'].
models__unary_unique_key_list_sql = """
-- a one argument unique key list should result in overwritting semantics for
-- that one matching field
{{
config(
materialized='incremental',
unique_key=['state']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model whose string unique_key names a column that the select
# does not produce.
models__not_found_unique_key_sql = """
-- a model with a unique key not found in the table itself will error out
{{
config(
materialized='incremental',
unique_key='thisisnotacolumn'
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model with unique_key set to an empty list.
models__empty_unique_key_list_sql = """
-- model with empty list unique key should build normally
{{
config(
materialized='incremental',
unique_key=[]
)
}}
select * from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model that does not set unique_key at all.
models__no_unique_key_sql = """
-- no specified unique key should cause no special build behavior
{{
config(
materialized='incremental'
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model with unique_key set to the empty string.
models__empty_str_unique_key_sql = """
-- ensure model with empty string unique key should build normally
{{
config(
materialized='incremental',
unique_key=''
)
}}
select
*
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model whose unique_key is the single string 'state'.
models__str_unique_key_sql = """
-- a unique key with a string should trigger to overwrite behavior when
-- the source has entries in conflict (i.e. more than one row per unique key
-- combination)
{{
config(
materialized='incremental',
unique_key='state'
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model whose unique_key list repeats the same column:
# ['state', 'state'].
models__duplicated_unary_unique_key_list_sql = """
{{
config(
materialized='incremental',
unique_key=['state', 'state']
)
}}
select
state::varchar(2) as state,
county::varchar(12) as county,
city::varchar(12) as city,
last_visit_date::date as last_visit_date
from {{ ref('seed') }}
{% if is_incremental() %}
where last_visit_date > (select max(last_visit_date) from {{ this }})
{% endif %}
"""
# Incremental model whose unique_key list mixes a real column with one the
# select does not produce; note it has no is_incremental() filter.
models__not_found_unique_key_list_sql = """
-- a unique key list with any element not in the model itself should error out
{{
config(
materialized='incremental',
unique_key=['state', 'thisisnotacolumn']
)
}}
select * from {{ ref('seed') }}
"""
# Hand-written six-row result table used as the comparison target after the
# duplicate CT/Hartford row has been inserted into the seed. Both expected
# models are built from exactly the same SQL text, so it is spelled out once
# and bound to both names.
_expected_overwrite_table_sql = """
{{
config(
materialized='table'
)
}}
select
'CT'::varchar(2) as state,
'Hartford'::varchar(12) as county,
'Hartford'::varchar(12) as city,
'2022-02-14'::date as last_visit_date
union all
select 'MA'::varchar(2),'Suffolk'::varchar(12),'Boston'::varchar(12),'2020-02-12'::date
union all
select 'NJ'::varchar(2),'Mercer'::varchar(12),'Trenton'::varchar(12),'2022-01-01'::date
union all
select 'NY'::varchar(2),'Kings'::varchar(12),'Brooklyn'::varchar(12),'2021-04-02'::date
union all
select 'NY'::varchar(2),'New York'::varchar(12),'Manhattan'::varchar(12),'2021-04-01'::date
union all
select 'PA'::varchar(2),'Philadelphia'::varchar(12),'Philadelphia'::varchar(12),'2021-05-21'::date
"""
models__expected__one_str__overwrite_sql = _expected_overwrite_table_sql
models__expected__unique_key_list__inplace_overwrite_sql = _expected_overwrite_table_sql
# SQL script that inserts one row whose state/county/city match the existing
# CT/Hartford/Hartford seed row but with a later last_visit_date.
# `{schema}` is a placeholder -- presumably filled in by the helper that
# executes the file; confirm against project.run_sql_file.
seeds__duplicate_insert_sql = """
-- Insert statement which when applied to seed.csv triggers the inplace
-- overwrite strategy of incremental models. Seed and incremental model
-- diverge.
-- insert new row, which should not be in incremental model
-- with primary or first three columns unique
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('CT','Hartford','Hartford','2022-02-14');
"""
# Seed data: a header line plus six rows (state, county, city, last_visit_date).
seeds__seed_csv = """state,county,city,last_visit_date
CT,Hartford,Hartford,2020-09-23
MA,Suffolk,Boston,2020-02-12
NJ,Mercer,Trenton,2022-01-01
NY,Kings,Brooklyn,2021-04-02
NY,New York,Manhattan,2021-04-01
PA,Philadelphia,Philadelphia,2021-05-21
"""
# SQL script that inserts two rows (WA and CA) whose state/county/city do not
# occur in the seed CSV above. `{schema}` is a placeholder, as in the
# duplicate-insert script.
seeds__add_new_rows_sql = """
-- Insert statement which when applied to seed.csv sees incremental model
-- grow in size while not (necessarily) diverging from the seed itself.
-- insert two new rows, both of which should be in incremental model
-- with any unique columns
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('WA','King','Seattle','2022-02-01');
insert into {schema}.seed
(state, county, city, last_visit_date)
values ('CA','Los Angeles','Los Angeles','2022-02-01');
"""
# Field order matters: ResultHolder is also constructed positionally.
_result_holder_fields = (
    "seed_count",
    "model_count",
    "seed_rows",
    "inc_test_model_count",
    "opt_model_count",
    "relation",
)

# Counts and relation name for one scenario, used both for the expected
# values and for the values observed while running it.
ResultHolder = namedtuple("ResultHolder", _result_holder_fields)
class BaseIncrementalUniqueKey:
    """Scenarios exercising the ``unique_key`` config of incremental models.

    A scenario seeds a table, builds an incremental model from it, changes the
    seed with one of the ``seeds/*.sql`` scripts, re-runs the model and then
    compares build counts and table contents with an expectation.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the model files (file name -> SQL) for the test project.

        The nested ``expected`` mapping holds the hand-written result tables.
        """
        return {
            "trinary_unique_key_list.sql": models__trinary_unique_key_list_sql,
            "nontyped_trinary_unique_key_list.sql": models__nontyped_trinary_unique_key_list_sql,
            "unary_unique_key_list.sql": models__unary_unique_key_list_sql,
            "not_found_unique_key.sql": models__not_found_unique_key_sql,
            "empty_unique_key_list.sql": models__empty_unique_key_list_sql,
            "no_unique_key.sql": models__no_unique_key_sql,
            "empty_str_unique_key.sql": models__empty_str_unique_key_sql,
            "str_unique_key.sql": models__str_unique_key_sql,
            "duplicated_unary_unique_key_list.sql": models__duplicated_unary_unique_key_list_sql,
            "not_found_unique_key_list.sql": models__not_found_unique_key_list_sql,
            "expected": {
                "one_str__overwrite.sql": models__expected__one_str__overwrite_sql,
                "unique_key_list__inplace_overwrite.sql": models__expected__unique_key_list__inplace_overwrite_sql,
            },
        }

    @pytest.fixture(scope="class")
    def seeds(self):
        """Return the seed CSV plus the two SQL scripts that later mutate it."""
        return {
            "duplicate_insert.sql": seeds__duplicate_insert_sql,
            "seed.csv": seeds__seed_csv,
            "add_new_rows.sql": seeds__add_new_rows_sql,
        }

    def update_incremental_model(self, incremental_model):
        """update incremental model after the seed table has been updated

        Returns the number of results reported by the (non full-refresh) run.
        """
        model_result_set = run_dbt(["run", "--select", incremental_model])
        return len(model_result_set)

    def get_test_fields(
        self, project, seed, incremental_model, update_sql_file, opt_model_count=None
    ):
        """build a test case and return values for assertions

        [INFO] Models must be in place to test incremental model
        construction and merge behavior. Database touches are side
        effects to extract counts (which speak to health of unique keys).

        Args:
            project: project fixture; supplies ``test_schema``, ``run_sql``
                and ``run_sql_file``.
            seed: name of the seed to (re)build and count rows of.
            incremental_model: name of the incremental model under test.
            update_sql_file: stem of the script under ``seeds/`` that mutates
                the seed between the two model runs.
            opt_model_count: passed through unchanged into the result.

        Returns:
            A ``ResultHolder`` whose ``relation`` is ``incremental_model``.
        """
        # idempotently create some number of seeds and incremental models
        seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))
        model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))

        # update seed in anticipation of incremental model update
        row_count_query = "select * from {}.{}".format(project.test_schema, seed)
        project.run_sql_file(Path("seeds") / Path(update_sql_file + ".sql"))
        seed_rows = len(project.run_sql(row_count_query, fetch="all"))

        # propagate seed state to incremental model according to unique keys
        inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)

        return ResultHolder(
            seed_count=seed_count,
            model_count=model_count,
            seed_rows=seed_rows,
            inc_test_model_count=inc_test_model_count,
            opt_model_count=opt_model_count,
            relation=incremental_model,
        )

    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
        """Invoke assertions to verify correct build functionality

        ``expected_fields`` and ``test_case_fields`` are ``ResultHolder``
        records; ``project`` supplies the adapter, schema and database used
        for the final table comparison.
        """
        # 1. test seed(s) should build afresh
        assert expected_fields.seed_count == test_case_fields.seed_count
        # 2. test model(s) should build afresh
        assert expected_fields.model_count == test_case_fields.model_count
        # 3. seeds should have intended row counts post update
        assert expected_fields.seed_rows == test_case_fields.seed_rows
        # 4. incremental test model(s) should be updated
        assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
        # 5. extra incremental model(s) should be built; optional since
        #    comparison may be between an incremental model and seed.
        #    Compare whenever an expectation was given, so an observed count
        #    of 0 or None fails instead of silently skipping the check.
        if expected_fields.opt_model_count is not None:
            assert expected_fields.opt_model_count == test_case_fields.opt_model_count
        # 6. result table should match intended result set (itself a relation)
        table_comp = TableComparison(
            adapter=project.adapter, unique_schema=project.test_schema, database=project.database
        )
        table_comp.assert_tables_equal(expected_fields.relation, test_case_fields.relation)

    def get_expected_fields(self, relation, seed_rows, opt_model_count=None):
        """Build the expected ``ResultHolder``: every build count is 1; only
        the relation, the seed row count and the optional count vary."""
        return ResultHolder(
            seed_count=1,
            model_count=1,
            seed_rows=seed_rows,
            inc_test_model_count=1,
            opt_model_count=opt_model_count,
            relation=relation,
        )

    def fail_to_build_inc_missing_unique_key_column(self, incremental_model_name):
        """should pass back error state when trying build an incremental
        model whose unique key or keylist includes a column missing
        from the incremental model

        Returns ``(status, message)`` of the first result of the failing run.
        """
        # the seed must exist before the model can be built; its result is
        # not needed here
        run_dbt(["seed", "--select", "seed", "--full-refresh"])

        # unique keys are not applied on first run, so two are needed
        run_dbt(
            ["run", "--select", incremental_model_name, "--full-refresh"],
            expect_pass=True,
        )
        run_result = run_dbt(
            ["run", "--select", incremental_model_name], expect_pass=False
        ).results[0]

        return run_result.status, run_result.message

    def _check_model_matches_seed(self, project, incremental_model):
        """Add two distinct rows to the seed and expect the model to equal it."""
        expected_fields = self.get_expected_fields(relation="seed", seed_rows=8)
        test_case_fields = self.get_test_fields(
            project,
            seed="seed",
            incremental_model=incremental_model,
            update_sql_file="add_new_rows",
        )
        self.check_scenario_correctness(expected_fields, test_case_fields, project)

    def _check_model_overwrites_row(self, project, incremental_model, expected_model):
        """Insert a conflicting seed row and expect ``expected_model``'s table."""
        expected_fields = self.get_expected_fields(
            relation=expected_model, seed_rows=7, opt_model_count=1
        )
        # The expected table is built before the scenario runs; its SQL is a
        # static select that does not reference the seed, so it can be built
        # up front.
        opt_model_count = self.update_incremental_model(expected_model)
        test_case_fields = self.get_test_fields(
            project,
            seed="seed",
            incremental_model=incremental_model,
            update_sql_file="duplicate_insert",
            opt_model_count=opt_model_count,
        )
        self.check_scenario_correctness(expected_fields, test_case_fields, project)

    # no unique_key test
    def test__no_unique_keys(self, project):
        """with no unique keys, seed and model should match"""
        self._check_model_matches_seed(project, "no_unique_key")

    # unique_key as str tests
    def test__empty_str_unique_key(self, project):
        """with empty string for unique key, seed and model should match"""
        self._check_model_matches_seed(project, "empty_str_unique_key")

    def test__one_unique_key(self, project):
        """with one unique key, model will overwrite existing row"""
        self._check_model_overwrites_row(project, "str_unique_key", "one_str__overwrite")

    def test__bad_unique_key(self, project):
        """expect compilation error from unique key not being a column"""
        (status, exc) = self.fail_to_build_inc_missing_unique_key_column(
            incremental_model_name="not_found_unique_key"
        )

        assert status == RunStatus.Error
        # the error message should name the offending column
        assert "thisisnotacolumn" in exc.lower()

    # test unique_key as list
    def test__empty_unique_key_list(self, project):
        """with no unique keys, seed and model should match"""
        self._check_model_matches_seed(project, "empty_unique_key_list")

    def test__unary_unique_key_list(self, project):
        """with one unique key, model will overwrite existing row"""
        self._check_model_overwrites_row(
            project, "unary_unique_key_list", "unique_key_list__inplace_overwrite"
        )

    def test__duplicated_unary_unique_key_list(self, project):
        """with two of the same unique key, model will overwrite existing row"""
        self._check_model_overwrites_row(
            project, "duplicated_unary_unique_key_list", "unique_key_list__inplace_overwrite"
        )

    def test__trinary_unique_key_list(self, project):
        """with three unique keys, model will overwrite existing row"""
        self._check_model_overwrites_row(
            project, "trinary_unique_key_list", "unique_key_list__inplace_overwrite"
        )

    def test__trinary_unique_key_list_no_update(self, project):
        """even with three unique keys, adding distinct rows to seed does not
        cause seed and model to diverge"""
        self._check_model_matches_seed(project, "nontyped_trinary_unique_key_list")

    def test__bad_unique_key_list(self, project):
        """expect compilation error from unique key not being a column"""
        (status, exc) = self.fail_to_build_inc_missing_unique_key_column(
            incremental_model_name="not_found_unique_key_list"
        )

        assert status == RunStatus.Error
        # the error message should name the offending column
        assert "thisisnotacolumn" in exc.lower()
class TestIncrementalUniqueKey(BaseIncrementalUniqueKey):
    """Concrete subclass that adds nothing to BaseIncrementalUniqueKey."""

View File

@@ -64,7 +64,6 @@ class TestPermissions:
project,
):
# now it should work!
# breakpoint()
project.run_sql("grant create on database {} to noaccess".format(project.database))
project.run_sql(
'grant usage, create on schema "{}" to noaccess'.format(project.test_schema)