Compare commits

...

15 Commits

Author SHA1 Message Date
Mila Page
4a5f28b72b Add the extra env var check for our CI. 2022-09-04 23:26:19 -07:00
Mila Page
45a350c349 add single threaded flag 2022-09-04 14:15:55 -07:00
Mila Page
84a252149e Fix broken tests and verify local JSON format logs are clean (they are). 2022-09-04 04:09:12 -07:00
Mila Page
bf2eeb4dc6 fix snapshot delete test that was failing 2022-09-04 03:44:35 -07:00
Mila Page
2636517dc1 Merge branch 'main' into CT-866/migrate_hook_tests 2022-08-30 01:18:31 -07:00
Mila Page
f41c163bdd It couldn't be the yaml could it 2022-08-30 00:19:34 -07:00
Mila Page
af44b65174 reset pytest.ini and see what happens when removing extra comma 2022-08-24 02:54:03 -07:00
Mila Page
36fd15a714 Move location of schema.yml to models directory. 2022-08-24 02:27:50 -07:00
Mila Page
3cad9cdf6f Merge branch 'main' into CT-866/migrate_hook_tests 2022-08-18 14:55:53 -07:00
Mila Page
1a603b0d70 Try removing unneeded param to suppress warning. 2022-08-15 23:41:59 -07:00
Mila Page
4996fc4278 Merge branch 'main' into CT-866/migrate_hook_tests 2022-08-15 11:18:00 -07:00
Mila Page
3766f1de6e Remove old integration hook tests. 2022-08-15 02:21:11 -07:00
Mila Page
2660559d86 Add other test (wasn't staged). 2022-08-15 02:12:28 -07:00
Mila Page
19b97899b4 Finish test conversion. 2022-08-15 01:58:45 -07:00
Mila Page
51515e6236 Finish converting first test file. 2022-08-10 20:18:00 -07:00
23 changed files with 899 additions and 829 deletions

View File

@@ -57,7 +57,11 @@ def setup_event_logger(log_path, level_override=None):
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore
make_log_dir_if_missing(log_path)
this.format_json = flags.LOG_FORMAT == "json"
# Flags is not set in the integration test suite but is by the environment.
# DBT_LOG_FORMAT is used in our test suite so checking for it's presence is key.
this.format_json = (
flags.LOG_FORMAT == "json" or os.environ.get("DBT_LOG_FORMAT", "json") == "json"
)
# USE_COLORS can be None if the app just started and the cli flags
# havent been applied yet
this.format_color = True if flags.USE_COLORS else False

View File

@@ -1,63 +0,0 @@
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
})
}}
select 3 as id

View File

@@ -1,91 +0,0 @@
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"pre-hook": "\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
})
}}
select 3 as id

View File

@@ -1,63 +0,0 @@
{{
config(
pre_hook="\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
post_hook="\
insert into {{this.schema}}.on_model_hook (\
\"state\",\
\"target.dbname\",\
\"target.host\",\
\"target.name\",\
\"target.schema\",\
\"target.type\",\
\"target.user\",\
\"target.pass\",\
\"target.port\",\
\"target.threads\",\
\"run_started_at\",\
\"invocation_id\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\"pass\", \"\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
)
}}
select 3 as id

View File

@@ -1,32 +0,0 @@
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
insert into {{ target.schema }}.on_run_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'{{ state }}',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
{% endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro hook() %}
select 1
{% endmacro %}

View File

@@ -1,2 +0,0 @@
select 1 as id

View File

@@ -1,21 +0,0 @@
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook select
state,
'{{ target.dbname }}' as \"target.dbname\",\
'{{ target.host }}' as \"target.host\",\
'{{ target.name }}' as \"target.name\",\
'{{ target.schema }}' as \"target.schema\",\
'{{ target.type }}' as \"target.type\",\
'{{ target.user }}' as \"target.user\",\
'{{ target.get(\"pass\", \"\") }}' as \"target.pass\",\
{{ target.port }} as \"target.port\",\
{{ target.threads }} as \"target.threads\",\
'{{ run_started_at }}' as \"run_started_at\",\
'{{ invocation_id }}' as \"invocation_id\"\
from {{ ref('pre') }}\
"
})
}}
select 1 as id

View File

@@ -1 +0,0 @@
select 'end' as state

View File

@@ -1 +0,0 @@
select 'start' as state

View File

@@ -1,7 +0,0 @@
version: 2
seeds:
- name: example_seed
columns:
- name: new_col
tests:
- not_null

View File

@@ -1,4 +0,0 @@
a,b,c
1,2,3
4,5,6
7,8,9
1 a b c
2 1 2 3
3 4 5 6
4 7 8 9

View File

@@ -1,14 +0,0 @@
{% snapshot example_snapshot %}
{{
config(
target_schema=schema,
unique_key='a',
strategy='check',
check_cols='all',
post_hook='alter table {{ this }} add column new_col int')
}}
{{
config(post_hook='update {{ this }} set new_col = 1')
}}
select * from {{ ref('example_seed') }}
{% endsnapshot %}

View File

@@ -1,7 +0,0 @@
version: 2
snapshots:
- name: example_snapshot
columns:
- name: new_col
tests:
- not_null

View File

@@ -1,6 +0,0 @@
{% snapshot example_snapshot %}
{{
config(target_schema=schema, unique_key='a', strategy='check', check_cols='all')
}}
select * from {{ ref('example_seed') }}
{% endsnapshot %}

View File

@@ -1,385 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
from dbt.exceptions import CompilationException
MODEL_PRE_HOOK = """
insert into {{this.schema}}.on_model_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'start',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""
MODEL_POST_HOOK = """
insert into {{this.schema}}.on_model_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'end',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""
class BaseTestPrePost(DBTIntegrationTest):
def setUp(self):
DBTIntegrationTest.setUp(self)
self.run_sql_file("seed_model.sql")
self.fields = [
'state',
'target.dbname',
'target.host',
'target.name',
'target.port',
'target.schema',
'target.threads',
'target.type',
'target.user',
'target.pass',
'run_started_at',
'invocation_id'
]
@property
def schema(self):
return "model_hooks_014"
def get_ctx_vars(self, state, count):
field_list = ", ".join(['"{}"'.format(f) for f in self.fields])
query = "select {field_list} from {schema}.on_model_hook where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state)
vals = self.run_sql(query, fetch='all')
self.assertFalse(len(vals) == 0, 'nothing inserted into hooks table')
self.assertFalse(len(vals) < count, 'too few rows in hooks table')
self.assertFalse(len(vals) > count, 'too many rows in hooks table')
return [{k: v for k, v in zip(self.fields, val)} for val in vals]
def check_hooks(self, state, count=1):
ctxs = self.get_ctx_vars(state, count=count)
for ctx in ctxs:
self.assertEqual(ctx['state'], state)
self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], self.database_host)
self.assertEqual(ctx['target.name'], 'default2')
self.assertEqual(ctx['target.port'], 5432)
self.assertEqual(ctx['target.schema'], self.unique_schema())
self.assertEqual(ctx['target.threads'], 4)
self.assertEqual(ctx['target.type'], 'postgres')
self.assertEqual(ctx['target.user'], 'root')
self.assertEqual(ctx['target.pass'], '')
self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set')
self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set')
class TestPrePostModelHooks(BaseTestPrePost):
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': ['macros'],
'models': {
'test': {
'pre-hook': [
# inside transaction (runs second)
MODEL_PRE_HOOK,
# outside transaction (runs first)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
],
'post-hook':[
# outside transaction (runs second)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
# inside transaction (runs first)
MODEL_POST_HOOK,
],
}
}
}
@property
def models(self):
return "models"
@use_profile('postgres')
def test_postgres_pre_and_post_model_hooks(self):
self.run_dbt(['run'])
self.check_hooks('start')
self.check_hooks('end')
class TestHookRefs(BaseTestPrePost):
@property
def project_config(self):
return {
'config-version': 2,
'models': {
'test': {
'hooked': {
'post-hook': ['''
insert into {{this.schema}}.on_model_hook select
state,
'{{ target.dbname }}' as "target.dbname",
'{{ target.host }}' as "target.host",
'{{ target.name }}' as "target.name",
'{{ target.schema }}' as "target.schema",
'{{ target.type }}' as "target.type",
'{{ target.user }}' as "target.user",
'{{ target.get("pass", "") }}' as "target.pass",
{{ target.port }} as "target.port",
{{ target.threads }} as "target.threads",
'{{ run_started_at }}' as "run_started_at",
'{{ invocation_id }}' as "invocation_id"
from {{ ref('post') }}'''.strip()],
}
},
}
}
@property
def models(self):
return 'ref-hook-models'
@use_profile('postgres')
def test_postgres_pre_post_model_hooks_refed(self):
self.run_dbt(['run'])
self.check_hooks('start', count=1)
self.check_hooks('end', count=1)
class TestPrePostModelHooksOnSeeds(DBTIntegrationTest):
@property
def schema(self):
return "model_hooks_014"
@property
def models(self):
return "seed-models"
@property
def project_config(self):
return {
'config-version': 2,
'seed-paths': ['seeds'],
'models': {},
'seeds': {
'post-hook': [
'alter table {{ this }} add column new_col int',
'update {{ this }} set new_col = 1'
],
'quote_columns': False,
},
}
@use_profile('postgres')
def test_postgres_hooks_on_seeds(self):
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
class TestPrePostModelHooksOnSeedsPlusPrefixed(TestPrePostModelHooksOnSeeds):
@property
def project_config(self):
return {
'config-version': 2,
'seed-paths': ['seeds'],
'models': {},
'seeds': {
'+post-hook': [
'alter table {{ this }} add column new_col int',
'update {{ this }} set new_col = 1'
],
'quote_columns': False,
},
}
class TestPrePostModelHooksOnSeedsPlusPrefixedWhitespace(TestPrePostModelHooksOnSeeds):
@property
def project_config(self):
return {
'config-version': 2,
'seed-paths': ['seeds'],
'models': {},
'seeds': {
'+ post-hook': [
'alter table {{ this }} add column new_col int',
'update {{ this }} set new_col = 1'
],
'quote_columns': False,
},
}
class TestPrePostModelHooksOnSnapshots(DBTIntegrationTest):
@property
def schema(self):
return "model_hooks_014"
@property
def models(self):
return "test-snapshot-models"
@property
def project_config(self):
return {
'config-version': 2,
'seed-paths': ['seeds'],
'snapshot-paths': ['test-snapshots'],
'models': {},
'snapshots': {
'post-hook': [
'alter table {{ this }} add column new_col int',
'update {{ this }} set new_col = 1'
]
},
'seeds': {
'quote_columns': False,
},
}
@use_profile('postgres')
def test_postgres_hooks_on_snapshots(self):
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['snapshot'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
class TestPrePostModelHooksInConfig(BaseTestPrePost):
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': ['macros'],
}
@property
def models(self):
return "configured-models"
@use_profile('postgres')
def test_postgres_pre_and_post_model_hooks_model(self):
self.run_dbt(['run'])
self.check_hooks('start')
self.check_hooks('end')
@use_profile('postgres')
def test_postgres_pre_and_post_model_hooks_model_and_project(self):
self.use_default_project({
'config-version': 2,
'models': {
'test': {
'pre-hook': [
# inside transaction (runs second)
MODEL_PRE_HOOK,
# outside transaction (runs first)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
],
'post-hook':[
# outside transaction (runs second)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
# inside transaction (runs first)
MODEL_POST_HOOK,
],
}
}
})
self.run_dbt(['run'])
self.check_hooks('start', count=2)
self.check_hooks('end', count=2)
class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):
@property
def models(self):
return "kwargs-models"
class TestPrePostSnapshotHooksInConfigKwargs(TestPrePostModelHooksOnSnapshots):
@property
def models(self):
return "test-snapshot-models"
@property
def project_config(self):
return {
'config-version': 2,
'seed-paths': ['seeds'],
'snapshot-paths': ['test-kwargs-snapshots'],
'models': {},
'seeds': {
'quote_columns': False,
},
}
class TestDuplicateHooksInConfigs(DBTIntegrationTest):
@property
def schema(self):
return "model_hooks_014"
@property
def models(self):
return "error-models"
@use_profile('postgres')
def test_postgres_run_duplicate_hook_defs(self):
with self.assertRaises(CompilationException) as exc:
self.run_dbt(['run'])
self.assertIn('pre_hook', str(exc.exception))
self.assertIn('pre-hook', str(exc.exception))

View File

@@ -1,126 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
class TestPrePostRunHooks(DBTIntegrationTest):
def setUp(self):
DBTIntegrationTest.setUp(self)
self.run_sql_file("seed_run.sql")
self.fields = [
'state',
'target.dbname',
'target.host',
'target.name',
'target.port',
'target.schema',
'target.threads',
'target.type',
'target.user',
'target.pass',
'run_started_at',
'invocation_id'
]
os.environ['TERM_TEST'] = 'TESTING'
@property
def schema(self):
return "run_hooks_014"
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': ['macros'],
'seed-paths': ['seeds'],
# The create and drop table statements here validate that these hooks run
# in the same order that they are defined. Drop before create is an error.
# Also check that the table does not exist below.
"on-run-start": [
"{{ custom_run_hook('start', target, run_started_at, invocation_id) }}",
"create table {{ target.schema }}.start_hook_order_test ( id int )",
"drop table {{ target.schema }}.start_hook_order_test",
"{{ log(env_var('TERM_TEST'), info=True) }}",
],
"on-run-end": [
"{{ custom_run_hook('end', target, run_started_at, invocation_id) }}",
"create table {{ target.schema }}.end_hook_order_test ( id int )",
"drop table {{ target.schema }}.end_hook_order_test",
"create table {{ target.schema }}.schemas ( schema text )",
"insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
"create table {{ target.schema }}.db_schemas ( db text, schema text )",
"insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
],
'seeds': {
'quote_columns': False,
},
}
@property
def models(self):
return "models"
def get_ctx_vars(self, state):
field_list = ", ".join(['"{}"'.format(f) for f in self.fields])
query = "select {field_list} from {schema}.on_run_hook where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state)
vals = self.run_sql(query, fetch='all')
self.assertFalse(len(vals) == 0, 'nothing inserted into on_run_hook table')
self.assertFalse(len(vals) > 1, 'too many rows in hooks table')
ctx = dict([(k,v) for (k,v) in zip(self.fields, vals[0])])
return ctx
def assert_used_schemas(self):
schemas_query = 'select * from {}.schemas'.format(self.unique_schema())
results = self.run_sql(schemas_query, fetch='all')
self.assertEqual(len(results), 1)
self.assertEqual(results[0][0], self.unique_schema())
db_schemas_query = 'select * from {}.db_schemas'.format(self.unique_schema())
results = self.run_sql(db_schemas_query, fetch='all')
self.assertEqual(len(results), 1)
self.assertEqual(results[0][0], self.default_database)
self.assertEqual(results[0][1], self.unique_schema())
def check_hooks(self, state):
ctx = self.get_ctx_vars(state)
self.assertEqual(ctx['state'], state)
self.assertEqual(ctx['target.dbname'], 'dbt')
self.assertEqual(ctx['target.host'], self.database_host)
self.assertEqual(ctx['target.name'], 'default2')
self.assertEqual(ctx['target.port'], 5432)
self.assertEqual(ctx['target.schema'], self.unique_schema())
self.assertEqual(ctx['target.threads'], 4)
self.assertEqual(ctx['target.type'], 'postgres')
self.assertEqual(ctx['target.user'], 'root')
self.assertEqual(ctx['target.pass'], '')
self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set')
self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set')
@use_profile('postgres')
def test__postgres__pre_and_post_run_hooks(self):
self.run_dbt(['run'])
self.check_hooks('start')
self.check_hooks('end')
self.assertTableDoesNotExist("start_hook_order_test")
self.assertTableDoesNotExist("end_hook_order_test")
self.assert_used_schemas()
@use_profile('postgres')
def test__postgres__pre_and_post_seed_hooks(self):
self.run_dbt(['seed'])
self.check_hooks('start')
self.check_hooks('end')
self.assertTableDoesNotExist("start_hook_order_test")
self.assertTableDoesNotExist("end_hook_order_test")
self.assert_used_schemas()

View File

@@ -0,0 +1,333 @@
macros__before_and_after = """
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
insert into {{ target.schema }}.on_run_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'{{ state }}',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
{% endmacro %}
"""
macros__hook = """
{% macro hook() %}
select 1
{% endmacro %}
"""
models__hooks = """
select 1 as id
"""
models__hooks_configured = """
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
})
}}
select 3 as id
"""
models__hooks_error = """
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"pre-hook": "\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
})
}}
select 3 as id
"""
models__hooks_kwargs = """
{{
config(
pre_hook="\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'start',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)",
post_hook="\
insert into {{this.schema}}.on_model_hook (\
\\"state\\",\
\\"target.dbname\\",\
\\"target.host\\",\
\\"target.name\\",\
\\"target.schema\\",\
\\"target.type\\",\
\\"target.user\\",\
\\"target.pass\\",\
\\"target.port\\",\
\\"target.threads\\",\
\\"run_started_at\\",\
\\"invocation_id\\"\
) VALUES (\
'end',\
'{{ target.dbname }}',\
'{{ target.host }}',\
'{{ target.name }}',\
'{{ target.schema }}',\
'{{ target.type }}',\
'{{ target.user }}',\
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.port }},\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
)"
)
}}
select 3 as id
"""
models__hooked = """
{{
config({
"pre_hook": "\
insert into {{this.schema}}.on_model_hook select
state,
'{{ target.dbname }}' as \\"target.dbname\\",\
'{{ target.host }}' as \\"target.host\\",\
'{{ target.name }}' as \\"target.name\\",\
'{{ target.schema }}' as \\"target.schema\\",\
'{{ target.type }}' as \\"target.type\\",\
'{{ target.user }}' as \\"target.user\\",\
'{{ target.get(\\"pass\\", \\"\\") }}' as \\"target.pass\\",\
{{ target.port }} as \\"target.port\\",\
{{ target.threads }} as \\"target.threads\\",\
'{{ run_started_at }}' as \\"run_started_at\\",\
'{{ invocation_id }}' as \\"invocation_id\\"\
from {{ ref('pre') }}\
"
})
}}
select 1 as id
"""
models__post = """
select 'end' as state
"""
models__pre = """
select 'start' as state
"""
snapshots__test_snapshot = """
{% snapshot example_snapshot %}
{{
config(target_schema=schema, unique_key='a', strategy='check', check_cols='all')
}}
select * from {{ ref('example_seed') }}
{% endsnapshot %}
"""
properties__seed_models = """
version: 2
seeds:
- name: example_seed
columns:
- name: new_col
tests:
- not_null
"""
properties__test_snapshot_models = """
version: 2
snapshots:
- name: example_snapshot
columns:
- name: new_col
tests:
- not_null
"""
seeds__example_seed_csv = """a,b,c
1,2,3
4,5,6
7,8,9
"""

View File

@@ -0,0 +1,405 @@
import pytest
from pathlib import Path
from dbt.exceptions import CompilationException
from dbt.tests.util import (
run_dbt,
write_file,
)
from tests.functional.hooks.fixtures import (
models__hooked,
models__hooks,
models__hooks_configured,
models__hooks_error,
models__hooks_kwargs,
models__post,
models__pre,
properties__seed_models,
properties__test_snapshot_models,
seeds__example_seed_csv,
snapshots__test_snapshot,
)
MODEL_PRE_HOOK = """
insert into {{this.schema}}.on_model_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'start',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""
MODEL_POST_HOOK = """
insert into {{this.schema}}.on_model_hook (
"state",
"target.dbname",
"target.host",
"target.name",
"target.schema",
"target.type",
"target.user",
"target.pass",
"target.port",
"target.threads",
"run_started_at",
"invocation_id"
) VALUES (
'end',
'{{ target.dbname }}',
'{{ target.host }}',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
'{{ target.user }}',
'{{ target.get("pass", "") }}',
{{ target.port }},
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""
class BaseTestPrePost(object):
@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
project.run_sql_file(project.test_data_dir / Path("seed_model.sql"))
def get_ctx_vars(self, state, count, project):
fields = [
"state",
"target.dbname",
"target.host",
"target.name",
"target.port",
"target.schema",
"target.threads",
"target.type",
"target.user",
"target.pass",
"run_started_at",
"invocation_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = "select {field_list} from {schema}.on_model_hook where state = '{state}'".format(
field_list=field_list, schema=project.test_schema, state=state
)
vals = project.run_sql(query, fetch="all")
assert len(vals) != 0, "nothing inserted into hooks table"
assert len(vals) >= count, "too few rows in hooks table"
assert len(vals) <= count, "too many rows in hooks table"
return [{k: v for k, v in zip(fields, val)} for val in vals]
def check_hooks(self, state, project, host, count=1):
ctxs = self.get_ctx_vars(state, count=count, project=project)
for ctx in ctxs:
assert ctx["state"] == state
assert ctx["target.dbname"] == "dbt"
assert ctx["target.host"] == host
assert ctx["target.name"] == "default"
assert ctx["target.port"] == 5432
assert ctx["target.schema"] == project.test_schema
assert ctx["target.threads"] == 4
assert ctx["target.type"] == "postgres"
assert ctx["target.user"] == "root"
assert ctx["target.pass"] == ""
assert (
ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0
), "run_started_at was not set"
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
class TestPrePostModelHooks(BaseTestPrePost):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"test": {
"pre-hook": [
# inside transaction (runs second)
MODEL_PRE_HOOK,
# outside transaction (runs first)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
],
"post-hook": [
# outside transaction (runs second)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
# inside transaction (runs first)
MODEL_POST_HOOK,
],
}
}
}
@pytest.fixture(scope="class")
def models(self):
return {"hooks.sql": models__hooks}
def test_pre_and_post_run_hooks(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target["host"])
self.check_hooks("end", project, dbt_profile_target["host"])
class TestHookRefs(BaseTestPrePost):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"test": {
"hooked": {
"post-hook": [
"""
insert into {{this.schema}}.on_model_hook select
state,
'{{ target.dbname }}' as "target.dbname",
'{{ target.host }}' as "target.host",
'{{ target.name }}' as "target.name",
'{{ target.schema }}' as "target.schema",
'{{ target.type }}' as "target.type",
'{{ target.user }}' as "target.user",
'{{ target.get("pass", "") }}' as "target.pass",
{{ target.port }} as "target.port",
{{ target.threads }} as "target.threads",
'{{ run_started_at }}' as "run_started_at",
'{{ invocation_id }}' as "invocation_id"
from {{ ref('post') }}""".strip()
],
}
},
}
}
@pytest.fixture(scope="class")
def models(self):
return {"hooked.sql": models__hooked, "post.sql": models__post, "pre.sql": models__pre}
def test_pre_post_model_hooks_refed(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target["host"], count=1)
self.check_hooks("end", project, dbt_profile_target["host"], count=1)
class TestPrePostModelHooksOnSeeds(object):
@pytest.fixture(scope="class")
def seeds(self):
return {"example_seed.csv": seeds__example_seed_csv}
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__seed_models}
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seed-paths": ["seeds"],
"models": {},
"seeds": {
"post-hook": [
"alter table {{ this }} add column new_col int",
"update {{ this }} set new_col = 1",
],
"quote_columns": False,
},
}
def test_hooks_on_seeds(self, project):
res = run_dbt(["--single-threaded", "seed", "--log-cache-events"])
assert len(res) == 1, "Expected exactly one item"
res = run_dbt(["--single-threaded", "test", "--log-cache-events"])
assert len(res) == 1, "Expected exactly one item"
class TestPrePostModelHooksOnSeedsPlusPrefixed(TestPrePostModelHooksOnSeeds):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seed-paths": ["seeds"],
"models": {},
"seeds": {
"+post-hook": [
"alter table {{ this }} add column new_col int",
"update {{ this }} set new_col = 1",
],
"quote_columns": False,
},
}
class TestPrePostModelHooksOnSeedsPlusPrefixedWhitespace(TestPrePostModelHooksOnSeeds):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seed-paths": ["seeds"],
"models": {},
"seeds": {
"+post-hook": [
"alter table {{ this }} add column new_col int",
"update {{ this }} set new_col = 1",
],
"quote_columns": False,
},
}
class TestPrePostModelHooksOnSnapshots(object):
@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
path = Path(project.project_root) / "test-snapshots"
Path.mkdir(path)
write_file(snapshots__test_snapshot, path, "snapshot.sql")
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__test_snapshot_models}
@pytest.fixture(scope="class")
def seeds(self):
return {"example_seed.csv": seeds__example_seed_csv}
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seed-paths": ["seeds"],
"snapshot-paths": ["test-snapshots"],
"models": {},
"snapshots": {
"post-hook": [
"alter table {{ this }} add column new_col int",
"update {{ this }} set new_col = 1",
]
},
"seeds": {
"quote_columns": False,
},
}
def test_hooks_on_snapshots(self, project):
res = run_dbt(["seed"])
assert len(res) == 1, "Expected exactly one item"
res = run_dbt(["snapshot"])
assert len(res) == 1, "Expected exactly one item"
res = run_dbt(["test"])
assert len(res) == 1, "Expected exactly one item"
class PrePostModelHooksInConfigSetup(BaseTestPrePost):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"macro-paths": ["macros"],
}
@pytest.fixture(scope="class")
def models(self):
return {"hooks.sql": models__hooks_configured}
class TestPrePostModelHooksInConfig(PrePostModelHooksInConfigSetup):
def test_pre_and_post_model_hooks_model(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target["host"])
self.check_hooks("end", project, dbt_profile_target["host"])
class TestPrePostModelHooksInConfigWithCount(PrePostModelHooksInConfigSetup):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"test": {
"pre-hook": [
# inside transaction (runs second)
MODEL_PRE_HOOK,
# outside transaction (runs first)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
],
"post-hook": [
# outside transaction (runs second)
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
# inside transaction (runs first)
MODEL_POST_HOOK,
],
}
}
}
def test_pre_and_post_model_hooks_model_and_project(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target["host"], count=2)
self.check_hooks("end", project, dbt_profile_target["host"], count=2)
class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):
@pytest.fixture(scope="class")
def models(self):
return {"hooks.sql": models__hooks_kwargs}
class TestPrePostSnapshotHooksInConfigKwargs(TestPrePostModelHooksOnSnapshots):
@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
path = Path(project.project_root) / "test-kwargs-snapshots"
Path.mkdir(path)
write_file(snapshots__test_snapshot, path, "snapshot.sql")
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seed-paths": ["seeds"],
"snapshot-paths": ["test-kwargs-snapshots"],
"models": {},
"snapshots": {
"post-hook": [
"alter table {{ this }} add column new_col int",
"update {{ this }} set new_col = 1",
]
},
"seeds": {
"quote_columns": False,
},
}
class TestDuplicateHooksInConfigs(object):
@pytest.fixture(scope="class")
def models(self):
return {"hooks.sql": models__hooks_error}
def test_run_duplicate_hook_defs(self, project):
with pytest.raises(CompilationException) as exc:
run_dbt()
assert "pre_hook" in str(exc.value)
assert "pre-hook" in str(exc.value)

View File

@@ -0,0 +1,143 @@
import os
import pytest
from pathlib import Path
from tests.functional.hooks.fixtures import (
macros__hook,
macros__before_and_after,
models__hooks,
seeds__example_seed_csv,
)
from dbt.tests.util import (
check_table_does_not_exist,
run_dbt,
)
class TestPrePostRunHooks(object):
@pytest.fixture(scope="function")
def setUp(self, project):
project.run_sql_file(project.test_data_dir / Path("seed_run.sql"))
project.run_sql(f"drop table if exists { project.test_schema }.schemas")
project.run_sql(f"drop table if exists { project.test_schema }.db_schemas")
os.environ["TERM_TEST"] = "TESTING"
@pytest.fixture(scope="class")
def macros(self):
return {"hook.sql": macros__hook, "before-and-after.sql": macros__before_and_after}
@pytest.fixture(scope="class")
def models(self):
return {"hooks.sql": models__hooks}
@pytest.fixture(scope="class")
def seeds(self):
return {"example_seed.csv": seeds__example_seed_csv}
@pytest.fixture(scope="class")
def project_config_update(self):
return {
# The create and drop table statements here validate that these hooks run
# in the same order that they are defined. Drop before create is an error.
# Also check that the table does not exist below.
"on-run-start": [
"{{ custom_run_hook('start', target, run_started_at, invocation_id) }}",
"create table {{ target.schema }}.start_hook_order_test ( id int )",
"drop table {{ target.schema }}.start_hook_order_test",
"{{ log(env_var('TERM_TEST'), info=True) }}",
],
"on-run-end": [
"{{ custom_run_hook('end', target, run_started_at, invocation_id) }}",
"create table {{ target.schema }}.end_hook_order_test ( id int )",
"drop table {{ target.schema }}.end_hook_order_test",
"create table {{ target.schema }}.schemas ( schema text )",
"insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
"create table {{ target.schema }}.db_schemas ( db text, schema text )",
"insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
],
"seeds": {
"quote_columns": False,
},
}
def get_ctx_vars(self, state, project):
fields = [
"state",
"target.dbname",
"target.host",
"target.name",
"target.port",
"target.schema",
"target.threads",
"target.type",
"target.user",
"target.pass",
"run_started_at",
"invocation_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = "select {field_list} from {schema}.on_run_hook where state = '{state}'".format(
field_list=field_list, schema=project.test_schema, state=state
)
vals = project.run_sql(query, fetch="all")
assert len(vals) != 0, "nothing inserted into on_run_hook table"
assert len(vals) == 1, "too many rows in hooks table"
ctx = dict([(k, v) for (k, v) in zip(fields, vals[0])])
return ctx
def assert_used_schemas(self, project):
schemas_query = "select * from {}.schemas".format(project.test_schema)
results = project.run_sql(schemas_query, fetch="all")
assert len(results) == 1
assert results[0][0] == project.test_schema
db_schemas_query = "select * from {}.db_schemas".format(project.test_schema)
results = project.run_sql(db_schemas_query, fetch="all")
assert len(results) == 1
assert results[0][0] == project.database
assert results[0][1] == project.test_schema
def check_hooks(self, state, project, host):
ctx = self.get_ctx_vars(state, project)
assert ctx["state"] == state
assert ctx["target.dbname"] == "dbt"
assert ctx["target.host"] == host
assert ctx["target.name"] == "default"
assert ctx["target.port"] == 5432
assert ctx["target.schema"] == project.test_schema
assert ctx["target.threads"] == 4
assert ctx["target.type"] == "postgres"
assert ctx["target.user"] == "root"
assert ctx["target.pass"] == ""
assert (
ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0
), "run_started_at was not set"
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target):
run_dbt(["run"])
self.check_hooks("start", project, dbt_profile_target["host"])
self.check_hooks("end", project, dbt_profile_target["host"])
check_table_does_not_exist(project.adapter, "start_hook_order_test")
check_table_does_not_exist(project.adapter, "end_hook_order_test")
self.assert_used_schemas(project)
def test_pre_and_post_seed_hooks(self, setUp, project, dbt_profile_target):
run_dbt(["seed"])
self.check_hooks("start", project, dbt_profile_target["host"])
self.check_hooks("end", project, dbt_profile_target["host"])
check_table_does_not_exist(project.adapter, "start_hook_order_test")
check_table_does_not_exist(project.adapter, "end_hook_order_test")
self.assert_used_schemas(project)

View File

@@ -1,5 +1,8 @@
import os
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import time
import pytz
import pytest
from dbt.tests.util import run_dbt, check_relations_equal
@@ -14,6 +17,14 @@ from tests.functional.simple_snapshot.fixtures import (
# These tests uses the same seed data, containing 20 records of which we hard delete the last 10.
# These deleted records set the dbt_valid_to to time the snapshot was ran.
# using replace on a timestamp won't account for hour differences unless given the local timezone.
# we can force python as utc but not postgres fields which need to be handled as local timestamps.
def currenttz():
if time.daylight:
return timezone(timedelta(seconds=-time.altzone), time.tzname[1])
else:
return timezone(timedelta(seconds=-time.timezone), time.tzname[0])
def datetime_snapshot():
NUM_SNAPSHOT_MODELS = 1
@@ -79,7 +90,7 @@ def test_snapshot_hard_delete(project):
for result in snapshotted[10:]:
# result is a tuple, the dbt_valid_to column is the latest
assert isinstance(result[-1], datetime)
assert result[-1].replace(tzinfo=pytz.UTC) >= invalidated_snapshot_datetime
assert result[-1].replace(tzinfo=currenttz()) >= invalidated_snapshot_datetime
# revive records
# Timestamp must have microseconds for tests below to be meaningful
@@ -118,7 +129,7 @@ def test_snapshot_hard_delete(project):
for result in invalidated_records:
# result is a tuple, the dbt_valid_to column is the latest
assert isinstance(result[1], datetime)
assert result[1].replace(tzinfo=pytz.UTC) >= invalidated_snapshot_datetime
assert result[1].replace(tzinfo=currenttz()) >= invalidated_snapshot_datetime
# records which were revived (id = 10, 11)
# dbt_valid_to is null