forked from repo-mirrors/dbt-core
Compare commits
15 Commits
jerco/pyth
...
CT-866/mig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a5f28b72b | ||
|
|
45a350c349 | ||
|
|
84a252149e | ||
|
|
bf2eeb4dc6 | ||
|
|
2636517dc1 | ||
|
|
f41c163bdd | ||
|
|
af44b65174 | ||
|
|
36fd15a714 | ||
|
|
3cad9cdf6f | ||
|
|
1a603b0d70 | ||
|
|
4996fc4278 | ||
|
|
3766f1de6e | ||
|
|
2660559d86 | ||
|
|
19b97899b4 | ||
|
|
51515e6236 |
@@ -57,7 +57,11 @@ def setup_event_logger(log_path, level_override=None):
|
||||
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore
|
||||
|
||||
make_log_dir_if_missing(log_path)
|
||||
this.format_json = flags.LOG_FORMAT == "json"
|
||||
# Flags is not set in the integration test suite but is by the environment.
|
||||
# DBT_LOG_FORMAT is used in our test suite so checking for it's presence is key.
|
||||
this.format_json = (
|
||||
flags.LOG_FORMAT == "json" or os.environ.get("DBT_LOG_FORMAT", "json") == "json"
|
||||
)
|
||||
# USE_COLORS can be None if the app just started and the cli flags
|
||||
# havent been applied yet
|
||||
this.format_color = True if flags.USE_COLORS else False
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"post-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
})
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
@@ -1,91 +0,0 @@
|
||||
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"pre-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"post-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
})
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
@@ -1,63 +0,0 @@
|
||||
|
||||
{{
|
||||
config(
|
||||
pre_hook="\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
post_hook="\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\"state\",\
|
||||
\"target.dbname\",\
|
||||
\"target.host\",\
|
||||
\"target.name\",\
|
||||
\"target.schema\",\
|
||||
\"target.type\",\
|
||||
\"target.user\",\
|
||||
\"target.pass\",\
|
||||
\"target.port\",\
|
||||
\"target.threads\",\
|
||||
\"run_started_at\",\
|
||||
\"invocation_id\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\"pass\", \"\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
)
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
@@ -1,32 +0,0 @@
|
||||
|
||||
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
|
||||
|
||||
insert into {{ target.schema }}.on_run_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'{{ state }}',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
|
||||
{% endmacro %}
|
||||
@@ -1,3 +0,0 @@
|
||||
{% macro hook() %}
|
||||
select 1
|
||||
{% endmacro %}
|
||||
@@ -1,2 +0,0 @@
|
||||
|
||||
select 1 as id
|
||||
@@ -1,21 +0,0 @@
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook select
|
||||
state,
|
||||
'{{ target.dbname }}' as \"target.dbname\",\
|
||||
'{{ target.host }}' as \"target.host\",\
|
||||
'{{ target.name }}' as \"target.name\",\
|
||||
'{{ target.schema }}' as \"target.schema\",\
|
||||
'{{ target.type }}' as \"target.type\",\
|
||||
'{{ target.user }}' as \"target.user\",\
|
||||
'{{ target.get(\"pass\", \"\") }}' as \"target.pass\",\
|
||||
{{ target.port }} as \"target.port\",\
|
||||
{{ target.threads }} as \"target.threads\",\
|
||||
'{{ run_started_at }}' as \"run_started_at\",\
|
||||
'{{ invocation_id }}' as \"invocation_id\"\
|
||||
from {{ ref('pre') }}\
|
||||
"
|
||||
})
|
||||
}}
|
||||
select 1 as id
|
||||
@@ -1 +0,0 @@
|
||||
select 'end' as state
|
||||
@@ -1 +0,0 @@
|
||||
select 'start' as state
|
||||
@@ -1,7 +0,0 @@
|
||||
version: 2
|
||||
seeds:
|
||||
- name: example_seed
|
||||
columns:
|
||||
- name: new_col
|
||||
tests:
|
||||
- not_null
|
||||
@@ -1,4 +0,0 @@
|
||||
a,b,c
|
||||
1,2,3
|
||||
4,5,6
|
||||
7,8,9
|
||||
|
@@ -1,14 +0,0 @@
|
||||
{% snapshot example_snapshot %}
|
||||
{{
|
||||
config(
|
||||
target_schema=schema,
|
||||
unique_key='a',
|
||||
strategy='check',
|
||||
check_cols='all',
|
||||
post_hook='alter table {{ this }} add column new_col int')
|
||||
}}
|
||||
{{
|
||||
config(post_hook='update {{ this }} set new_col = 1')
|
||||
}}
|
||||
select * from {{ ref('example_seed') }}
|
||||
{% endsnapshot %}
|
||||
@@ -1,7 +0,0 @@
|
||||
version: 2
|
||||
snapshots:
|
||||
- name: example_snapshot
|
||||
columns:
|
||||
- name: new_col
|
||||
tests:
|
||||
- not_null
|
||||
@@ -1,6 +0,0 @@
|
||||
{% snapshot example_snapshot %}
|
||||
{{
|
||||
config(target_schema=schema, unique_key='a', strategy='check', check_cols='all')
|
||||
}}
|
||||
select * from {{ ref('example_seed') }}
|
||||
{% endsnapshot %}
|
||||
@@ -1,385 +0,0 @@
|
||||
from test.integration.base import DBTIntegrationTest, use_profile
|
||||
from dbt.exceptions import CompilationException
|
||||
|
||||
|
||||
MODEL_PRE_HOOK = """
|
||||
insert into {{this.schema}}.on_model_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'start',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
"""
|
||||
|
||||
MODEL_POST_HOOK = """
|
||||
insert into {{this.schema}}.on_model_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'end',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
class BaseTestPrePost(DBTIntegrationTest):
|
||||
def setUp(self):
|
||||
DBTIntegrationTest.setUp(self)
|
||||
|
||||
self.run_sql_file("seed_model.sql")
|
||||
|
||||
self.fields = [
|
||||
'state',
|
||||
'target.dbname',
|
||||
'target.host',
|
||||
'target.name',
|
||||
'target.port',
|
||||
'target.schema',
|
||||
'target.threads',
|
||||
'target.type',
|
||||
'target.user',
|
||||
'target.pass',
|
||||
'run_started_at',
|
||||
'invocation_id'
|
||||
]
|
||||
|
||||
@property
|
||||
def schema(self):
|
||||
return "model_hooks_014"
|
||||
|
||||
def get_ctx_vars(self, state, count):
|
||||
field_list = ", ".join(['"{}"'.format(f) for f in self.fields])
|
||||
query = "select {field_list} from {schema}.on_model_hook where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state)
|
||||
|
||||
vals = self.run_sql(query, fetch='all')
|
||||
self.assertFalse(len(vals) == 0, 'nothing inserted into hooks table')
|
||||
self.assertFalse(len(vals) < count, 'too few rows in hooks table')
|
||||
self.assertFalse(len(vals) > count, 'too many rows in hooks table')
|
||||
return [{k: v for k, v in zip(self.fields, val)} for val in vals]
|
||||
|
||||
def check_hooks(self, state, count=1):
|
||||
ctxs = self.get_ctx_vars(state, count=count)
|
||||
for ctx in ctxs:
|
||||
self.assertEqual(ctx['state'], state)
|
||||
self.assertEqual(ctx['target.dbname'], 'dbt')
|
||||
self.assertEqual(ctx['target.host'], self.database_host)
|
||||
self.assertEqual(ctx['target.name'], 'default2')
|
||||
self.assertEqual(ctx['target.port'], 5432)
|
||||
self.assertEqual(ctx['target.schema'], self.unique_schema())
|
||||
self.assertEqual(ctx['target.threads'], 4)
|
||||
self.assertEqual(ctx['target.type'], 'postgres')
|
||||
self.assertEqual(ctx['target.user'], 'root')
|
||||
self.assertEqual(ctx['target.pass'], '')
|
||||
|
||||
self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set')
|
||||
self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set')
|
||||
|
||||
|
||||
class TestPrePostModelHooks(BaseTestPrePost):
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'macro-paths': ['macros'],
|
||||
'models': {
|
||||
'test': {
|
||||
'pre-hook': [
|
||||
# inside transaction (runs second)
|
||||
MODEL_PRE_HOOK,
|
||||
|
||||
# outside transaction (runs first)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
],
|
||||
'post-hook':[
|
||||
# outside transaction (runs second)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
|
||||
# inside transaction (runs first)
|
||||
MODEL_POST_HOOK,
|
||||
],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "models"
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_pre_and_post_model_hooks(self):
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.check_hooks('start')
|
||||
self.check_hooks('end')
|
||||
|
||||
|
||||
class TestHookRefs(BaseTestPrePost):
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'models': {
|
||||
'test': {
|
||||
'hooked': {
|
||||
'post-hook': ['''
|
||||
insert into {{this.schema}}.on_model_hook select
|
||||
state,
|
||||
'{{ target.dbname }}' as "target.dbname",
|
||||
'{{ target.host }}' as "target.host",
|
||||
'{{ target.name }}' as "target.name",
|
||||
'{{ target.schema }}' as "target.schema",
|
||||
'{{ target.type }}' as "target.type",
|
||||
'{{ target.user }}' as "target.user",
|
||||
'{{ target.get("pass", "") }}' as "target.pass",
|
||||
{{ target.port }} as "target.port",
|
||||
{{ target.threads }} as "target.threads",
|
||||
'{{ run_started_at }}' as "run_started_at",
|
||||
'{{ invocation_id }}' as "invocation_id"
|
||||
from {{ ref('post') }}'''.strip()],
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return 'ref-hook-models'
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_pre_post_model_hooks_refed(self):
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.check_hooks('start', count=1)
|
||||
self.check_hooks('end', count=1)
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeeds(DBTIntegrationTest):
|
||||
@property
|
||||
def schema(self):
|
||||
return "model_hooks_014"
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "seed-models"
|
||||
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'seed-paths': ['seeds'],
|
||||
'models': {},
|
||||
'seeds': {
|
||||
'post-hook': [
|
||||
'alter table {{ this }} add column new_col int',
|
||||
'update {{ this }} set new_col = 1'
|
||||
],
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_hooks_on_seeds(self):
|
||||
res = self.run_dbt(['seed'])
|
||||
self.assertEqual(len(res), 1, 'Expected exactly one item')
|
||||
res = self.run_dbt(['test'])
|
||||
self.assertEqual(len(res), 1, 'Expected exactly one item')
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeedsPlusPrefixed(TestPrePostModelHooksOnSeeds):
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'seed-paths': ['seeds'],
|
||||
'models': {},
|
||||
'seeds': {
|
||||
'+post-hook': [
|
||||
'alter table {{ this }} add column new_col int',
|
||||
'update {{ this }} set new_col = 1'
|
||||
],
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeedsPlusPrefixedWhitespace(TestPrePostModelHooksOnSeeds):
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'seed-paths': ['seeds'],
|
||||
'models': {},
|
||||
'seeds': {
|
||||
'+ post-hook': [
|
||||
'alter table {{ this }} add column new_col int',
|
||||
'update {{ this }} set new_col = 1'
|
||||
],
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSnapshots(DBTIntegrationTest):
|
||||
@property
|
||||
def schema(self):
|
||||
return "model_hooks_014"
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "test-snapshot-models"
|
||||
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'seed-paths': ['seeds'],
|
||||
'snapshot-paths': ['test-snapshots'],
|
||||
'models': {},
|
||||
'snapshots': {
|
||||
'post-hook': [
|
||||
'alter table {{ this }} add column new_col int',
|
||||
'update {{ this }} set new_col = 1'
|
||||
]
|
||||
},
|
||||
'seeds': {
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_hooks_on_snapshots(self):
|
||||
res = self.run_dbt(['seed'])
|
||||
self.assertEqual(len(res), 1, 'Expected exactly one item')
|
||||
res = self.run_dbt(['snapshot'])
|
||||
self.assertEqual(len(res), 1, 'Expected exactly one item')
|
||||
res = self.run_dbt(['test'])
|
||||
self.assertEqual(len(res), 1, 'Expected exactly one item')
|
||||
|
||||
|
||||
class TestPrePostModelHooksInConfig(BaseTestPrePost):
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'macro-paths': ['macros'],
|
||||
}
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "configured-models"
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_pre_and_post_model_hooks_model(self):
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.check_hooks('start')
|
||||
self.check_hooks('end')
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_pre_and_post_model_hooks_model_and_project(self):
|
||||
self.use_default_project({
|
||||
'config-version': 2,
|
||||
'models': {
|
||||
'test': {
|
||||
'pre-hook': [
|
||||
# inside transaction (runs second)
|
||||
MODEL_PRE_HOOK,
|
||||
# outside transaction (runs first)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
],
|
||||
'post-hook':[
|
||||
# outside transaction (runs second)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
# inside transaction (runs first)
|
||||
MODEL_POST_HOOK,
|
||||
],
|
||||
}
|
||||
}
|
||||
})
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.check_hooks('start', count=2)
|
||||
self.check_hooks('end', count=2)
|
||||
|
||||
|
||||
class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):
|
||||
@property
|
||||
def models(self):
|
||||
return "kwargs-models"
|
||||
|
||||
|
||||
class TestPrePostSnapshotHooksInConfigKwargs(TestPrePostModelHooksOnSnapshots):
|
||||
@property
|
||||
def models(self):
|
||||
return "test-snapshot-models"
|
||||
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'seed-paths': ['seeds'],
|
||||
'snapshot-paths': ['test-kwargs-snapshots'],
|
||||
'models': {},
|
||||
'seeds': {
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestDuplicateHooksInConfigs(DBTIntegrationTest):
|
||||
@property
|
||||
def schema(self):
|
||||
return "model_hooks_014"
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "error-models"
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_run_duplicate_hook_defs(self):
|
||||
with self.assertRaises(CompilationException) as exc:
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.assertIn('pre_hook', str(exc.exception))
|
||||
self.assertIn('pre-hook', str(exc.exception))
|
||||
@@ -1,126 +0,0 @@
|
||||
from test.integration.base import DBTIntegrationTest, use_profile
|
||||
import os
|
||||
|
||||
|
||||
class TestPrePostRunHooks(DBTIntegrationTest):
|
||||
|
||||
def setUp(self):
|
||||
DBTIntegrationTest.setUp(self)
|
||||
|
||||
self.run_sql_file("seed_run.sql")
|
||||
|
||||
self.fields = [
|
||||
'state',
|
||||
'target.dbname',
|
||||
'target.host',
|
||||
'target.name',
|
||||
'target.port',
|
||||
'target.schema',
|
||||
'target.threads',
|
||||
'target.type',
|
||||
'target.user',
|
||||
'target.pass',
|
||||
'run_started_at',
|
||||
'invocation_id'
|
||||
]
|
||||
os.environ['TERM_TEST'] = 'TESTING'
|
||||
|
||||
@property
|
||||
def schema(self):
|
||||
return "run_hooks_014"
|
||||
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'macro-paths': ['macros'],
|
||||
'seed-paths': ['seeds'],
|
||||
|
||||
# The create and drop table statements here validate that these hooks run
|
||||
# in the same order that they are defined. Drop before create is an error.
|
||||
# Also check that the table does not exist below.
|
||||
"on-run-start": [
|
||||
"{{ custom_run_hook('start', target, run_started_at, invocation_id) }}",
|
||||
"create table {{ target.schema }}.start_hook_order_test ( id int )",
|
||||
"drop table {{ target.schema }}.start_hook_order_test",
|
||||
"{{ log(env_var('TERM_TEST'), info=True) }}",
|
||||
],
|
||||
"on-run-end": [
|
||||
"{{ custom_run_hook('end', target, run_started_at, invocation_id) }}",
|
||||
"create table {{ target.schema }}.end_hook_order_test ( id int )",
|
||||
"drop table {{ target.schema }}.end_hook_order_test",
|
||||
"create table {{ target.schema }}.schemas ( schema text )",
|
||||
"insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
|
||||
"create table {{ target.schema }}.db_schemas ( db text, schema text )",
|
||||
"insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
|
||||
],
|
||||
'seeds': {
|
||||
'quote_columns': False,
|
||||
},
|
||||
}
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "models"
|
||||
|
||||
def get_ctx_vars(self, state):
|
||||
field_list = ", ".join(['"{}"'.format(f) for f in self.fields])
|
||||
query = "select {field_list} from {schema}.on_run_hook where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state)
|
||||
|
||||
vals = self.run_sql(query, fetch='all')
|
||||
self.assertFalse(len(vals) == 0, 'nothing inserted into on_run_hook table')
|
||||
self.assertFalse(len(vals) > 1, 'too many rows in hooks table')
|
||||
ctx = dict([(k,v) for (k,v) in zip(self.fields, vals[0])])
|
||||
|
||||
return ctx
|
||||
|
||||
def assert_used_schemas(self):
|
||||
schemas_query = 'select * from {}.schemas'.format(self.unique_schema())
|
||||
results = self.run_sql(schemas_query, fetch='all')
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0][0], self.unique_schema())
|
||||
|
||||
db_schemas_query = 'select * from {}.db_schemas'.format(self.unique_schema())
|
||||
results = self.run_sql(db_schemas_query, fetch='all')
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0][0], self.default_database)
|
||||
self.assertEqual(results[0][1], self.unique_schema())
|
||||
|
||||
def check_hooks(self, state):
|
||||
ctx = self.get_ctx_vars(state)
|
||||
|
||||
self.assertEqual(ctx['state'], state)
|
||||
self.assertEqual(ctx['target.dbname'], 'dbt')
|
||||
self.assertEqual(ctx['target.host'], self.database_host)
|
||||
self.assertEqual(ctx['target.name'], 'default2')
|
||||
self.assertEqual(ctx['target.port'], 5432)
|
||||
self.assertEqual(ctx['target.schema'], self.unique_schema())
|
||||
self.assertEqual(ctx['target.threads'], 4)
|
||||
self.assertEqual(ctx['target.type'], 'postgres')
|
||||
self.assertEqual(ctx['target.user'], 'root')
|
||||
self.assertEqual(ctx['target.pass'], '')
|
||||
|
||||
self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set')
|
||||
self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set')
|
||||
|
||||
@use_profile('postgres')
|
||||
def test__postgres__pre_and_post_run_hooks(self):
|
||||
self.run_dbt(['run'])
|
||||
|
||||
self.check_hooks('start')
|
||||
self.check_hooks('end')
|
||||
|
||||
self.assertTableDoesNotExist("start_hook_order_test")
|
||||
self.assertTableDoesNotExist("end_hook_order_test")
|
||||
self.assert_used_schemas()
|
||||
|
||||
@use_profile('postgres')
|
||||
def test__postgres__pre_and_post_seed_hooks(self):
|
||||
self.run_dbt(['seed'])
|
||||
|
||||
self.check_hooks('start')
|
||||
self.check_hooks('end')
|
||||
|
||||
self.assertTableDoesNotExist("start_hook_order_test")
|
||||
self.assertTableDoesNotExist("end_hook_order_test")
|
||||
self.assert_used_schemas()
|
||||
333
tests/functional/hooks/fixtures.py
Normal file
333
tests/functional/hooks/fixtures.py
Normal file
@@ -0,0 +1,333 @@
|
||||
macros__before_and_after = """
|
||||
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
|
||||
|
||||
insert into {{ target.schema }}.on_run_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'{{ state }}',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
|
||||
{% endmacro %}
|
||||
"""
|
||||
|
||||
macros__hook = """
|
||||
{% macro hook() %}
|
||||
select 1
|
||||
{% endmacro %}
|
||||
"""
|
||||
|
||||
models__hooks = """
|
||||
select 1 as id
|
||||
"""
|
||||
|
||||
models__hooks_configured = """
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"post-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
})
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
"""
|
||||
|
||||
models__hooks_error = """
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"pre-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
"post-hook": "\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
})
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
"""
|
||||
|
||||
models__hooks_kwargs = """
|
||||
{{
|
||||
config(
|
||||
pre_hook="\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'start',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)",
|
||||
post_hook="\
|
||||
insert into {{this.schema}}.on_model_hook (\
|
||||
\\"state\\",\
|
||||
\\"target.dbname\\",\
|
||||
\\"target.host\\",\
|
||||
\\"target.name\\",\
|
||||
\\"target.schema\\",\
|
||||
\\"target.type\\",\
|
||||
\\"target.user\\",\
|
||||
\\"target.pass\\",\
|
||||
\\"target.port\\",\
|
||||
\\"target.threads\\",\
|
||||
\\"run_started_at\\",\
|
||||
\\"invocation_id\\"\
|
||||
) VALUES (\
|
||||
'end',\
|
||||
'{{ target.dbname }}',\
|
||||
'{{ target.host }}',\
|
||||
'{{ target.name }}',\
|
||||
'{{ target.schema }}',\
|
||||
'{{ target.type }}',\
|
||||
'{{ target.user }}',\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}',\
|
||||
{{ target.port }},\
|
||||
{{ target.threads }},\
|
||||
'{{ run_started_at }}',\
|
||||
'{{ invocation_id }}'\
|
||||
)"
|
||||
)
|
||||
}}
|
||||
|
||||
select 3 as id
|
||||
"""
|
||||
|
||||
models__hooked = """
|
||||
{{
|
||||
config({
|
||||
"pre_hook": "\
|
||||
insert into {{this.schema}}.on_model_hook select
|
||||
state,
|
||||
'{{ target.dbname }}' as \\"target.dbname\\",\
|
||||
'{{ target.host }}' as \\"target.host\\",\
|
||||
'{{ target.name }}' as \\"target.name\\",\
|
||||
'{{ target.schema }}' as \\"target.schema\\",\
|
||||
'{{ target.type }}' as \\"target.type\\",\
|
||||
'{{ target.user }}' as \\"target.user\\",\
|
||||
'{{ target.get(\\"pass\\", \\"\\") }}' as \\"target.pass\\",\
|
||||
{{ target.port }} as \\"target.port\\",\
|
||||
{{ target.threads }} as \\"target.threads\\",\
|
||||
'{{ run_started_at }}' as \\"run_started_at\\",\
|
||||
'{{ invocation_id }}' as \\"invocation_id\\"\
|
||||
from {{ ref('pre') }}\
|
||||
"
|
||||
})
|
||||
}}
|
||||
select 1 as id
|
||||
"""
|
||||
|
||||
models__post = """
|
||||
select 'end' as state
|
||||
"""
|
||||
|
||||
models__pre = """
|
||||
select 'start' as state
|
||||
"""
|
||||
|
||||
snapshots__test_snapshot = """
|
||||
{% snapshot example_snapshot %}
|
||||
{{
|
||||
config(target_schema=schema, unique_key='a', strategy='check', check_cols='all')
|
||||
}}
|
||||
select * from {{ ref('example_seed') }}
|
||||
{% endsnapshot %}
|
||||
"""
|
||||
|
||||
properties__seed_models = """
|
||||
version: 2
|
||||
seeds:
|
||||
- name: example_seed
|
||||
columns:
|
||||
- name: new_col
|
||||
tests:
|
||||
- not_null
|
||||
"""
|
||||
|
||||
properties__test_snapshot_models = """
|
||||
version: 2
|
||||
snapshots:
|
||||
- name: example_snapshot
|
||||
columns:
|
||||
- name: new_col
|
||||
tests:
|
||||
- not_null
|
||||
"""
|
||||
|
||||
seeds__example_seed_csv = """a,b,c
|
||||
1,2,3
|
||||
4,5,6
|
||||
7,8,9
|
||||
"""
|
||||
405
tests/functional/hooks/test_model_hooks.py
Normal file
405
tests/functional/hooks/test_model_hooks.py
Normal file
@@ -0,0 +1,405 @@
|
||||
import pytest
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from dbt.exceptions import CompilationException
|
||||
|
||||
from dbt.tests.util import (
|
||||
run_dbt,
|
||||
write_file,
|
||||
)
|
||||
|
||||
from tests.functional.hooks.fixtures import (
|
||||
models__hooked,
|
||||
models__hooks,
|
||||
models__hooks_configured,
|
||||
models__hooks_error,
|
||||
models__hooks_kwargs,
|
||||
models__post,
|
||||
models__pre,
|
||||
properties__seed_models,
|
||||
properties__test_snapshot_models,
|
||||
seeds__example_seed_csv,
|
||||
snapshots__test_snapshot,
|
||||
)
|
||||
|
||||
MODEL_PRE_HOOK = """
|
||||
insert into {{this.schema}}.on_model_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'start',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
"""
|
||||
|
||||
MODEL_POST_HOOK = """
|
||||
insert into {{this.schema}}.on_model_hook (
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.schema",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"target.port",
|
||||
"target.threads",
|
||||
"run_started_at",
|
||||
"invocation_id"
|
||||
) VALUES (
|
||||
'end',
|
||||
'{{ target.dbname }}',
|
||||
'{{ target.host }}',
|
||||
'{{ target.name }}',
|
||||
'{{ target.schema }}',
|
||||
'{{ target.type }}',
|
||||
'{{ target.user }}',
|
||||
'{{ target.get("pass", "") }}',
|
||||
{{ target.port }},
|
||||
{{ target.threads }},
|
||||
'{{ run_started_at }}',
|
||||
'{{ invocation_id }}'
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
class BaseTestPrePost(object):
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def setUp(self, project):
|
||||
project.run_sql_file(project.test_data_dir / Path("seed_model.sql"))
|
||||
|
||||
def get_ctx_vars(self, state, count, project):
|
||||
fields = [
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.port",
|
||||
"target.schema",
|
||||
"target.threads",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"run_started_at",
|
||||
"invocation_id",
|
||||
]
|
||||
field_list = ", ".join(['"{}"'.format(f) for f in fields])
|
||||
query = "select {field_list} from {schema}.on_model_hook where state = '{state}'".format(
|
||||
field_list=field_list, schema=project.test_schema, state=state
|
||||
)
|
||||
|
||||
vals = project.run_sql(query, fetch="all")
|
||||
assert len(vals) != 0, "nothing inserted into hooks table"
|
||||
assert len(vals) >= count, "too few rows in hooks table"
|
||||
assert len(vals) <= count, "too many rows in hooks table"
|
||||
return [{k: v for k, v in zip(fields, val)} for val in vals]
|
||||
|
||||
def check_hooks(self, state, project, host, count=1):
|
||||
ctxs = self.get_ctx_vars(state, count=count, project=project)
|
||||
for ctx in ctxs:
|
||||
assert ctx["state"] == state
|
||||
assert ctx["target.dbname"] == "dbt"
|
||||
assert ctx["target.host"] == host
|
||||
assert ctx["target.name"] == "default"
|
||||
assert ctx["target.port"] == 5432
|
||||
assert ctx["target.schema"] == project.test_schema
|
||||
assert ctx["target.threads"] == 4
|
||||
assert ctx["target.type"] == "postgres"
|
||||
assert ctx["target.user"] == "root"
|
||||
assert ctx["target.pass"] == ""
|
||||
|
||||
assert (
|
||||
ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0
|
||||
), "run_started_at was not set"
|
||||
assert (
|
||||
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
|
||||
), "invocation_id was not set"
|
||||
|
||||
|
||||
class TestPrePostModelHooks(BaseTestPrePost):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"models": {
|
||||
"test": {
|
||||
"pre-hook": [
|
||||
# inside transaction (runs second)
|
||||
MODEL_PRE_HOOK,
|
||||
# outside transaction (runs first)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
],
|
||||
"post-hook": [
|
||||
# outside transaction (runs second)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
# inside transaction (runs first)
|
||||
MODEL_POST_HOOK,
|
||||
],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooks.sql": models__hooks}
|
||||
|
||||
def test_pre_and_post_run_hooks(self, project, dbt_profile_target):
|
||||
run_dbt()
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"])
|
||||
self.check_hooks("end", project, dbt_profile_target["host"])
|
||||
|
||||
|
||||
class TestHookRefs(BaseTestPrePost):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"models": {
|
||||
"test": {
|
||||
"hooked": {
|
||||
"post-hook": [
|
||||
"""
|
||||
insert into {{this.schema}}.on_model_hook select
|
||||
state,
|
||||
'{{ target.dbname }}' as "target.dbname",
|
||||
'{{ target.host }}' as "target.host",
|
||||
'{{ target.name }}' as "target.name",
|
||||
'{{ target.schema }}' as "target.schema",
|
||||
'{{ target.type }}' as "target.type",
|
||||
'{{ target.user }}' as "target.user",
|
||||
'{{ target.get("pass", "") }}' as "target.pass",
|
||||
{{ target.port }} as "target.port",
|
||||
{{ target.threads }} as "target.threads",
|
||||
'{{ run_started_at }}' as "run_started_at",
|
||||
'{{ invocation_id }}' as "invocation_id"
|
||||
from {{ ref('post') }}""".strip()
|
||||
],
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooked.sql": models__hooked, "post.sql": models__post, "pre.sql": models__pre}
|
||||
|
||||
def test_pre_post_model_hooks_refed(self, project, dbt_profile_target):
|
||||
run_dbt()
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"], count=1)
|
||||
self.check_hooks("end", project, dbt_profile_target["host"], count=1)
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeeds(object):
|
||||
@pytest.fixture(scope="class")
|
||||
def seeds(self):
|
||||
return {"example_seed.csv": seeds__example_seed_csv}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"schema.yml": properties__seed_models}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"seed-paths": ["seeds"],
|
||||
"models": {},
|
||||
"seeds": {
|
||||
"post-hook": [
|
||||
"alter table {{ this }} add column new_col int",
|
||||
"update {{ this }} set new_col = 1",
|
||||
],
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
def test_hooks_on_seeds(self, project):
|
||||
res = run_dbt(["--single-threaded", "seed", "--log-cache-events"])
|
||||
assert len(res) == 1, "Expected exactly one item"
|
||||
res = run_dbt(["--single-threaded", "test", "--log-cache-events"])
|
||||
assert len(res) == 1, "Expected exactly one item"
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeedsPlusPrefixed(TestPrePostModelHooksOnSeeds):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"seed-paths": ["seeds"],
|
||||
"models": {},
|
||||
"seeds": {
|
||||
"+post-hook": [
|
||||
"alter table {{ this }} add column new_col int",
|
||||
"update {{ this }} set new_col = 1",
|
||||
],
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSeedsPlusPrefixedWhitespace(TestPrePostModelHooksOnSeeds):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"seed-paths": ["seeds"],
|
||||
"models": {},
|
||||
"seeds": {
|
||||
"+post-hook": [
|
||||
"alter table {{ this }} add column new_col int",
|
||||
"update {{ this }} set new_col = 1",
|
||||
],
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestPrePostModelHooksOnSnapshots(object):
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def setUp(self, project):
|
||||
path = Path(project.project_root) / "test-snapshots"
|
||||
Path.mkdir(path)
|
||||
write_file(snapshots__test_snapshot, path, "snapshot.sql")
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"schema.yml": properties__test_snapshot_models}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def seeds(self):
|
||||
return {"example_seed.csv": seeds__example_seed_csv}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"seed-paths": ["seeds"],
|
||||
"snapshot-paths": ["test-snapshots"],
|
||||
"models": {},
|
||||
"snapshots": {
|
||||
"post-hook": [
|
||||
"alter table {{ this }} add column new_col int",
|
||||
"update {{ this }} set new_col = 1",
|
||||
]
|
||||
},
|
||||
"seeds": {
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
def test_hooks_on_snapshots(self, project):
|
||||
res = run_dbt(["seed"])
|
||||
assert len(res) == 1, "Expected exactly one item"
|
||||
res = run_dbt(["snapshot"])
|
||||
assert len(res) == 1, "Expected exactly one item"
|
||||
res = run_dbt(["test"])
|
||||
assert len(res) == 1, "Expected exactly one item"
|
||||
|
||||
|
||||
class PrePostModelHooksInConfigSetup(BaseTestPrePost):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"macro-paths": ["macros"],
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooks.sql": models__hooks_configured}
|
||||
|
||||
|
||||
class TestPrePostModelHooksInConfig(PrePostModelHooksInConfigSetup):
|
||||
def test_pre_and_post_model_hooks_model(self, project, dbt_profile_target):
|
||||
run_dbt()
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"])
|
||||
self.check_hooks("end", project, dbt_profile_target["host"])
|
||||
|
||||
|
||||
class TestPrePostModelHooksInConfigWithCount(PrePostModelHooksInConfigSetup):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"models": {
|
||||
"test": {
|
||||
"pre-hook": [
|
||||
# inside transaction (runs second)
|
||||
MODEL_PRE_HOOK,
|
||||
# outside transaction (runs first)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
],
|
||||
"post-hook": [
|
||||
# outside transaction (runs second)
|
||||
{"sql": "vacuum {{ this.schema }}.on_model_hook", "transaction": False},
|
||||
# inside transaction (runs first)
|
||||
MODEL_POST_HOOK,
|
||||
],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def test_pre_and_post_model_hooks_model_and_project(self, project, dbt_profile_target):
|
||||
run_dbt()
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"], count=2)
|
||||
self.check_hooks("end", project, dbt_profile_target["host"], count=2)
|
||||
|
||||
|
||||
class TestPrePostModelHooksInConfigKwargs(TestPrePostModelHooksInConfig):
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooks.sql": models__hooks_kwargs}
|
||||
|
||||
|
||||
class TestPrePostSnapshotHooksInConfigKwargs(TestPrePostModelHooksOnSnapshots):
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def setUp(self, project):
|
||||
path = Path(project.project_root) / "test-kwargs-snapshots"
|
||||
Path.mkdir(path)
|
||||
write_file(snapshots__test_snapshot, path, "snapshot.sql")
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"seed-paths": ["seeds"],
|
||||
"snapshot-paths": ["test-kwargs-snapshots"],
|
||||
"models": {},
|
||||
"snapshots": {
|
||||
"post-hook": [
|
||||
"alter table {{ this }} add column new_col int",
|
||||
"update {{ this }} set new_col = 1",
|
||||
]
|
||||
},
|
||||
"seeds": {
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestDuplicateHooksInConfigs(object):
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooks.sql": models__hooks_error}
|
||||
|
||||
def test_run_duplicate_hook_defs(self, project):
|
||||
with pytest.raises(CompilationException) as exc:
|
||||
run_dbt()
|
||||
assert "pre_hook" in str(exc.value)
|
||||
assert "pre-hook" in str(exc.value)
|
||||
143
tests/functional/hooks/test_run_hooks.py
Normal file
143
tests/functional/hooks/test_run_hooks.py
Normal file
@@ -0,0 +1,143 @@
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from tests.functional.hooks.fixtures import (
|
||||
macros__hook,
|
||||
macros__before_and_after,
|
||||
models__hooks,
|
||||
seeds__example_seed_csv,
|
||||
)
|
||||
|
||||
from dbt.tests.util import (
|
||||
check_table_does_not_exist,
|
||||
run_dbt,
|
||||
)
|
||||
|
||||
|
||||
class TestPrePostRunHooks(object):
|
||||
@pytest.fixture(scope="function")
|
||||
def setUp(self, project):
|
||||
project.run_sql_file(project.test_data_dir / Path("seed_run.sql"))
|
||||
project.run_sql(f"drop table if exists { project.test_schema }.schemas")
|
||||
project.run_sql(f"drop table if exists { project.test_schema }.db_schemas")
|
||||
os.environ["TERM_TEST"] = "TESTING"
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def macros(self):
|
||||
return {"hook.sql": macros__hook, "before-and-after.sql": macros__before_and_after}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {"hooks.sql": models__hooks}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def seeds(self):
|
||||
return {"example_seed.csv": seeds__example_seed_csv}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
# The create and drop table statements here validate that these hooks run
|
||||
# in the same order that they are defined. Drop before create is an error.
|
||||
# Also check that the table does not exist below.
|
||||
"on-run-start": [
|
||||
"{{ custom_run_hook('start', target, run_started_at, invocation_id) }}",
|
||||
"create table {{ target.schema }}.start_hook_order_test ( id int )",
|
||||
"drop table {{ target.schema }}.start_hook_order_test",
|
||||
"{{ log(env_var('TERM_TEST'), info=True) }}",
|
||||
],
|
||||
"on-run-end": [
|
||||
"{{ custom_run_hook('end', target, run_started_at, invocation_id) }}",
|
||||
"create table {{ target.schema }}.end_hook_order_test ( id int )",
|
||||
"drop table {{ target.schema }}.end_hook_order_test",
|
||||
"create table {{ target.schema }}.schemas ( schema text )",
|
||||
"insert into {{ target.schema }}.schemas (schema) values {% for schema in schemas %}( '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
|
||||
"create table {{ target.schema }}.db_schemas ( db text, schema text )",
|
||||
"insert into {{ target.schema }}.db_schemas (db, schema) values {% for db, schema in database_schemas %}('{{ db }}', '{{ schema }}' ){% if not loop.last %},{% endif %}{% endfor %}",
|
||||
],
|
||||
"seeds": {
|
||||
"quote_columns": False,
|
||||
},
|
||||
}
|
||||
|
||||
def get_ctx_vars(self, state, project):
|
||||
fields = [
|
||||
"state",
|
||||
"target.dbname",
|
||||
"target.host",
|
||||
"target.name",
|
||||
"target.port",
|
||||
"target.schema",
|
||||
"target.threads",
|
||||
"target.type",
|
||||
"target.user",
|
||||
"target.pass",
|
||||
"run_started_at",
|
||||
"invocation_id",
|
||||
]
|
||||
field_list = ", ".join(['"{}"'.format(f) for f in fields])
|
||||
query = "select {field_list} from {schema}.on_run_hook where state = '{state}'".format(
|
||||
field_list=field_list, schema=project.test_schema, state=state
|
||||
)
|
||||
|
||||
vals = project.run_sql(query, fetch="all")
|
||||
assert len(vals) != 0, "nothing inserted into on_run_hook table"
|
||||
assert len(vals) == 1, "too many rows in hooks table"
|
||||
ctx = dict([(k, v) for (k, v) in zip(fields, vals[0])])
|
||||
|
||||
return ctx
|
||||
|
||||
def assert_used_schemas(self, project):
|
||||
schemas_query = "select * from {}.schemas".format(project.test_schema)
|
||||
results = project.run_sql(schemas_query, fetch="all")
|
||||
assert len(results) == 1
|
||||
assert results[0][0] == project.test_schema
|
||||
|
||||
db_schemas_query = "select * from {}.db_schemas".format(project.test_schema)
|
||||
results = project.run_sql(db_schemas_query, fetch="all")
|
||||
assert len(results) == 1
|
||||
assert results[0][0] == project.database
|
||||
assert results[0][1] == project.test_schema
|
||||
|
||||
def check_hooks(self, state, project, host):
|
||||
ctx = self.get_ctx_vars(state, project)
|
||||
|
||||
assert ctx["state"] == state
|
||||
assert ctx["target.dbname"] == "dbt"
|
||||
assert ctx["target.host"] == host
|
||||
assert ctx["target.name"] == "default"
|
||||
assert ctx["target.port"] == 5432
|
||||
assert ctx["target.schema"] == project.test_schema
|
||||
assert ctx["target.threads"] == 4
|
||||
assert ctx["target.type"] == "postgres"
|
||||
assert ctx["target.user"] == "root"
|
||||
assert ctx["target.pass"] == ""
|
||||
|
||||
assert (
|
||||
ctx["run_started_at"] is not None and len(ctx["run_started_at"]) > 0
|
||||
), "run_started_at was not set"
|
||||
assert (
|
||||
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
|
||||
), "invocation_id was not set"
|
||||
|
||||
def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target):
|
||||
run_dbt(["run"])
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"])
|
||||
self.check_hooks("end", project, dbt_profile_target["host"])
|
||||
|
||||
check_table_does_not_exist(project.adapter, "start_hook_order_test")
|
||||
check_table_does_not_exist(project.adapter, "end_hook_order_test")
|
||||
self.assert_used_schemas(project)
|
||||
|
||||
def test_pre_and_post_seed_hooks(self, setUp, project, dbt_profile_target):
|
||||
run_dbt(["seed"])
|
||||
|
||||
self.check_hooks("start", project, dbt_profile_target["host"])
|
||||
self.check_hooks("end", project, dbt_profile_target["host"])
|
||||
|
||||
check_table_does_not_exist(project.adapter, "start_hook_order_test")
|
||||
check_table_does_not_exist(project.adapter, "end_hook_order_test")
|
||||
self.assert_used_schemas(project)
|
||||
@@ -1,5 +1,8 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
import time
|
||||
import pytz
|
||||
import pytest
|
||||
from dbt.tests.util import run_dbt, check_relations_equal
|
||||
@@ -14,6 +17,14 @@ from tests.functional.simple_snapshot.fixtures import (
|
||||
# These tests uses the same seed data, containing 20 records of which we hard delete the last 10.
|
||||
# These deleted records set the dbt_valid_to to time the snapshot was ran.
|
||||
|
||||
# using replace on a timestamp won't account for hour differences unless given the local timezone.
|
||||
# we can force python as utc but not postgres fields which need to be handled as local timestamps.
|
||||
def currenttz():
|
||||
if time.daylight:
|
||||
return timezone(timedelta(seconds=-time.altzone), time.tzname[1])
|
||||
else:
|
||||
return timezone(timedelta(seconds=-time.timezone), time.tzname[0])
|
||||
|
||||
|
||||
def datetime_snapshot():
|
||||
NUM_SNAPSHOT_MODELS = 1
|
||||
@@ -79,7 +90,7 @@ def test_snapshot_hard_delete(project):
|
||||
for result in snapshotted[10:]:
|
||||
# result is a tuple, the dbt_valid_to column is the latest
|
||||
assert isinstance(result[-1], datetime)
|
||||
assert result[-1].replace(tzinfo=pytz.UTC) >= invalidated_snapshot_datetime
|
||||
assert result[-1].replace(tzinfo=currenttz()) >= invalidated_snapshot_datetime
|
||||
|
||||
# revive records
|
||||
# Timestamp must have microseconds for tests below to be meaningful
|
||||
@@ -118,7 +129,7 @@ def test_snapshot_hard_delete(project):
|
||||
for result in invalidated_records:
|
||||
# result is a tuple, the dbt_valid_to column is the latest
|
||||
assert isinstance(result[1], datetime)
|
||||
assert result[1].replace(tzinfo=pytz.UTC) >= invalidated_snapshot_datetime
|
||||
assert result[1].replace(tzinfo=currenttz()) >= invalidated_snapshot_datetime
|
||||
|
||||
# records which were revived (id = 10, 11)
|
||||
# dbt_valid_to is null
|
||||
|
||||
Reference in New Issue
Block a user