Compare commits

...

46 Commits

Author SHA1 Message Date
Sung Won Chung
5f75b9539f Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-19 09:40:28 -06:00
Sung Won Chung
ec2c2c8cf7 add fresh selector 2021-11-18 17:36:07 -06:00
Sung Won Chung
3aef719fe4 remove print 2021-11-18 13:21:48 -06:00
Sung Won Chung
ce9c00340f turn generator into set 2021-11-18 13:20:18 -06:00
Sung Won Chung
b8d288b6f0 cleaner syntax 2021-11-18 11:51:52 -06:00
Sung Won Chung
df09392d92 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 08:52:45 -06:00
Sung Won Chung
bd5a022e4e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 07:41:38 -06:00
Sung Won Chung
911cb53063 remove for now 2021-11-17 17:42:16 -06:00
Sung Won Chung
b45715cef7 don't run wasteful logs 2021-11-17 17:31:28 -06:00
Sung Won Chung
2e5fb45e5f clearer refresh language 2021-11-17 17:22:00 -06:00
Sung Won Chung
48bebe11b1 fix error conditions 2021-11-17 17:11:46 -06:00
Sung Won Chung
abfa16d687 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 16:46:45 -06:00
Sung Won Chung
39d145c7a8 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 14:52:44 -06:00
Sung Won Chung
8250b30862 add todo 2021-11-17 13:58:05 -06:00
Sung Won Chung
9309e4e906 remove print 2021-11-17 13:56:13 -06:00
Sung Won Chung
94a021806b add current state 2021-11-17 13:54:20 -06:00
Sung Won Chung
902cc75c69 correct import 2021-11-17 13:53:35 -06:00
Sung Won Chung
c18335f51d compare current and previous state 2021-11-17 13:53:05 -06:00
Sung Won Chung
191821779e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 10:13:12 -06:00
Sung Won Chung
e2ffd41cc3 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 08:44:29 -06:00
Sung Won Chung
6e9073a3c1 clarify that status needs to be correct 2021-11-16 16:17:51 -06:00
Sung Won Chung
e4bd6ee62d new selector flag name 2021-11-16 16:07:36 -06:00
Sung Won Chung
d2e3248379 new log format 2021-11-16 16:02:28 -06:00
Sung Won Chung
24bbf5b843 make compatible with rc and new logger 2021-11-16 12:30:02 -06:00
Sung Won Chung
a4ec84f37e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-16 11:40:51 -06:00
Sung Won Chung
61de5aea19 Revert "Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs""
This reverts commit 71125167a1.
2021-11-09 10:23:52 -06:00
Sung Won Chung
cad6bb0576 use a blank set instead 2021-11-09 10:03:39 -06:00
Sung Won Chung
9e3de01175 handle criterion that does not match nodes 2021-11-09 09:48:42 -06:00
Sung Won Chung
191bbae093 remove comments 2021-11-09 09:44:27 -06:00
Sung Won Chung
17fa096533 tidy up logs 2021-11-09 09:42:20 -06:00
Sung Won Chung
71125167a1 Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs"
This reverts commit 7fee4d44bf, reversing
changes made to 17c47ff42d.
2021-11-09 09:30:14 -06:00
Sung Won Chung
7fee4d44bf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-09 09:29:06 -06:00
Sung Won Chung
17c47ff42d cleaner logging 2021-11-09 09:28:53 -06:00
Sung Won Chung
f437c8e3eb add concurrent selectors note 2021-11-09 09:22:51 -06:00
Sung Won Chung
daa9ab1e73 include logs with meaningul info 2021-11-08 17:15:58 -06:00
Sung Won Chung
d4ff259d66 better if handling 2021-11-08 17:13:48 -06:00
Sung Won Chung
32de684121 opinionated fresh node selection 2021-11-08 16:10:43 -06:00
Sung Won Chung
3d043fe8fd remove prints and clean up logger 2021-11-08 15:59:37 -06:00
Sung Won Chung
5a5df384c0 add excluded source children nodes 2021-11-08 15:48:58 -06:00
Sung Won Chung
7924b320cf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-08 10:30:11 -06:00
Sung Won Chung
5005e0c4bf smarter depends on graph searching notes 2021-10-29 16:44:20 -05:00
Sung Won Chung
e5bdb93636 add todo 2021-10-29 15:18:50 -05:00
Sung Won Chung
3e8e91c4c2 copy test template 2021-10-29 13:49:07 -05:00
Sung Won Chung
036a9bf92f remove debug print logs 2021-10-29 12:38:53 -05:00
Sung Won Chung
98e98e5c3b working selector code 2021-10-29 12:38:27 -05:00
Sung Won Chung
72eb4302be first draft 2021-10-29 10:45:42 -05:00
4 changed files with 555 additions and 1 deletions

View File

@@ -1,6 +1,7 @@
from pathlib import Path
from .graph.manifest import WritableManifest
from .results import RunResultsArtifact
from .results import FreshnessExecutionResultArtifact
from typing import Optional
from dbt.exceptions import IncompatibleSchemaException
@@ -10,6 +11,7 @@ class PreviousState:
self.path: Path = path
self.manifest: Optional[WritableManifest] = None
self.results: Optional[RunResultsArtifact] = None
self.sources: Optional[FreshnessExecutionResultArtifact] = None
manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
@@ -26,3 +28,29 @@ class PreviousState:
except IncompatibleSchemaException as exc:
exc.add_filename(str(results_path))
raise
sources_path = self.path / 'sources.json'
if sources_path.exists() and sources_path.is_file():
try:
self.sources = FreshnessExecutionResultArtifact.read(str(sources_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(sources_path))
raise
# NOTE(review): the project config (and therefore the real target path) is only
# populated AFTER this class would need it, unlike PreviousState whose path is
# supplied explicitly by the user. Until that ordering is fixed, callers pass
# the path in directly.
class CurrentState:
    """Holds the freshness results artifact (sources.json) from the current
    invocation's target directory, if one exists."""

    def __init__(self, path: Path):
        # TODO: derive this from the project's configured target_path once the
        # config is available at construction time.
        self.path: Path = path
        self.sources: Optional[FreshnessExecutionResultArtifact] = None
        candidate = self.path / 'sources.json'
        # No artifact on disk means no current freshness state — leave None.
        if not (candidate.exists() and candidate.is_file()):
            return
        try:
            self.sources = FreshnessExecutionResultArtifact.read(str(candidate))
        except IncompatibleSchemaException as exc:
            # Attach the offending file name before propagating.
            exc.add_filename(str(candidate))
            raise

View File

@@ -71,6 +71,29 @@ class NodeSelector(MethodManager):
method = self.get_method(spec.method, spec.method_arguments)
return set(method.search(included_nodes, spec.value))
def select_excluded_source_nodes(
    self, included_nodes: Set[UniqueId], spec: SelectionCriteria,
) -> Set[UniqueId]:
    """Return the unique IDs of source nodes to *exclude* for the given spec.

    A spec value of 'pass', 'warn', or 'error' maps to the complementary set
    of freshness statuses; any other value excludes nothing.
    """
    method = self.get_method(spec.method, spec.method_arguments)
    # Statuses to exclude, keyed by the status the caller asked to include.
    complement = {
        'pass': {'warn', 'error'},
        'warn': {'error'},
        'error': {'pass', 'warn'},
    }
    statuses_to_exclude = complement.get(spec.value, set())
    excluded: Set[UniqueId] = set()
    for status in statuses_to_exclude:
        excluded.update(method.search(included_nodes, status))
    return excluded
def get_nodes_from_criteria(
self,
spec: SelectionCriteria
@@ -85,6 +108,9 @@ class NodeSelector(MethodManager):
nodes = self.graph.nodes()
try:
collected = self.select_included(nodes, spec)
if spec.method == 'source_refresh':
collected_excluded = self.select_excluded_source_nodes(nodes, spec)
except InvalidSelectorException:
fire_event(SelectorReportInvalidSelector(
selector_methods=self.SELECTOR_METHODS,
@@ -98,6 +124,23 @@ class NodeSelector(MethodManager):
selected=(collected | neighbors),
indirect_selection=spec.indirect_selection
)
if spec.method == 'source_refresh':
neighbors_excluded = self.collect_specified_neighbors(spec, collected_excluded)
direct_nodes_excluded, indirect_nodes_excluded = self.expand_selection(
selected=(collected_excluded | neighbors_excluded),
indirect_selection=spec.indirect_selection
)
direct_nodes = direct_nodes - direct_nodes_excluded
indirect_nodes = indirect_nodes - indirect_nodes_excluded
if direct_nodes_excluded:
warn_or_error(f"The '{spec.method}' selector specified in '{spec.raw}' will exclude the below nodes:")
warn_or_error(f"Direct Nodes: {direct_nodes_excluded}")
warn_or_error(f"Indirect Nodes: {indirect_nodes_excluded}")
warn_or_error(f"These source nodes: '{collected_excluded}' require 'status:{spec.value}' for the excluded nodes to run")
warn_or_error("Note: Concurrent selectors may include the excluded nodes(ex: source_refresh:warn+ source_refresh:pass+)")
warn_or_error("")
return direct_nodes, indirect_nodes
def collect_specified_neighbors(

View File

@@ -22,10 +22,11 @@ from dbt.contracts.graph.parsed import (
ParsedGenericTestNode,
ParsedSourceDefinition,
)
from dbt.contracts.state import PreviousState
from dbt.contracts.state import PreviousState, CurrentState
from dbt.exceptions import (
InternalException,
RuntimeException,
warn_or_error,
)
from dbt.node_types import NodeType
@@ -48,6 +49,7 @@ class MethodName(StrEnum):
Exposure = 'exposure'
Metric = 'metric'
Result = 'result'
SourceRefresh = 'source_refresh'
def is_selected_node(fqn: List[str], node_selector: str):
@@ -577,6 +579,50 @@ class ResultSelectorMethod(SelectorMethod):
if node in matches:
yield node
class SourceRefreshSelectorMethod(SelectorMethod): #TODO: this requires SelectorMethod to have current_state as an argument. currently, this works because it's all hard-coded
    """Selector method for `source_refresh:<value>`: matches sources whose
    freshness advanced between the previous state and the current run."""

    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        """Yield unique IDs of sources in ``included_nodes`` that are fresher
        than in the previous state.

        ``selector`` is either 'fresh' (any status) or a freshness status
        ('pass'/'warn'/'error') to filter the current sources by.
        Raises InternalException when either state lacks sources.json.
        """
        # NOTE(review): target path is hard-coded here because the project
        # config isn't available yet when this method is constructed.
        self.current_state = CurrentState(path=Path('target')) #TODO: fix this by importing target_path later
        if self.previous_state is None or self.previous_state.sources is None:
            raise InternalException(
                'No previous state comparison freshness results in sources.json'
            )
        elif self.current_state is None or self.current_state.sources is None:
            # NOTE(review): current_state was assigned just above, so only the
            # `.sources is None` half of this condition can actually fire.
            raise InternalException(
                'No current state comparison freshness results in sources.json'
            )
        if selector=='fresh':
            # 'fresh' considers every current source regardless of status.
            current_state_sources = {
                result.unique_id:result.max_loaded_at for result in self.current_state.sources.results
            }
        else:
            # Otherwise only sources whose freshness status matches the
            # selector value (e.g. 'pass', 'warn', 'error').
            current_state_sources = {
                result.unique_id:result.max_loaded_at for result in self.current_state.sources.results
                if result.status == selector
            }
        previous_state_sources = {
            result.unique_id:result.max_loaded_at for result in self.previous_state.sources.results
        }
        matches = set()
        matches_not_fresh = set()
        for unique_id in current_state_sources:
            if unique_id not in previous_state_sources:
                # A source not seen before counts as fresh.
                matches.add(unique_id)
            elif current_state_sources.get(unique_id) > previous_state_sources.get(unique_id):
                # max_loaded_at advanced since the previous run.
                matches.add(unique_id)
            else:
                matches_not_fresh.add(unique_id)
        if matches_not_fresh:
            warn_or_error(f"{matches_not_fresh} sources will not refresh other nodes, max_loaded_at date must be greater than previous state")
        # Restrict matches to the nodes the caller passed in.
        for node, real_node in self.all_nodes(included_nodes):
            if node in matches:
                yield node
class MethodManager:
SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = {
@@ -593,6 +639,7 @@ class MethodManager:
MethodName.Exposure: ExposureSelectorMethod,
MethodName.Metric: MetricSelectorMethod,
MethodName.Result: ResultSelectorMethod,
MethodName.SourceRefresh: SourceRefreshSelectorMethod,
}
def __init__(

View File

@@ -0,0 +1,436 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
import random
import shutil
import string
import pytest
from dbt.exceptions import CompilationException
class TestRunResultsState(DBTIntegrationTest):
    """Integration tests for `result:<status>` state selection, which picks
    nodes based on statuses recorded in a prior run's run_results.json."""

    @property
    def schema(self):
        # Unique schema suffix for this test case.
        return "run_results_state_062"

    @property
    def models(self):
        # Directory containing this test project's models.
        return "models"

    @property
    def project_config(self):
        # Quote seed columns so seeded CSV values are preserved verbatim.
        return {
            'config-version': 2,
            'macro-paths': ['macros'],
            'seeds': {
                'test': {
                    'quote_columns': True,
                }
            }
        }
def _symlink_test_folders(self):
# dbt's normal symlink behavior breaks this test. Copy the files
# so we can freely modify them.
for entry in os.listdir(self.test_original_source_path):
src = os.path.join(self.test_original_source_path, entry)
tst = os.path.join(self.test_root_dir, entry)
if entry in {'models', 'seeds', 'macros'}:
shutil.copytree(src, tst)
elif os.path.isdir(entry) or entry.endswith('.sql'):
os.symlink(src, tst)
def copy_state(self):
    # Snapshot the current target artifacts into ./state so later dbt
    # invocations can compare against them via --state ./state.
    assert not os.path.exists('state')
    os.makedirs('state')
    for artifact in ('manifest.json', 'run_results.json'):
        shutil.copyfile(f'target/{artifact}', f'state/{artifact}')
def setUp(self):
    # Every test starts from a full `dbt build` with its artifacts
    # snapshotted into ./state as the comparison baseline.
    super().setUp()
    self.run_dbt(['build'])
    self.copy_state()
def rebuild_run_dbt(self, expect_pass=True):
    # Throw away the previous snapshot, rebuild the whole project (optionally
    # expecting failures), and capture the fresh artifacts as the new state.
    shutil.rmtree('./state')
    self.run_dbt(['build'], expect_pass=expect_pass)
    self.copy_state()
@use_profile('postgres')
def test_postgres_seed_run_results_state(self):
    """`result:success` / `result:error` selection for seeds, with and
    without the `+` graph-expansion operator."""
    # Re-seed from scratch so run_results.json only contains the seed node.
    shutil.rmtree('./state')
    self.run_dbt(['seed'])
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    # `+` expands downstream of the seed: models, tests, and the exposure.
    results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    # Detect the file's newline convention so appended rows match it.
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    # Append a malformed row to force a seed error.
    with open('seeds/seed.csv', 'a') as fp:
        fp.write(f'\"\'\'3,carl{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    # Grow the seed to ~1MiB; the malformed row appended above is still
    # present, so re-seeding continues to error.
    with open('seeds/seed.csv', 'a') as fp:
        # assume each line is ~2 bytes + len(name)
        target_size = 1*1024*1024
        line_size = 64
        num_lines = target_size // line_size
        maxlines = num_lines + 4
        for idx in range(4, maxlines):
            value = ''.join(random.choices(string.ascii_letters, k=62))
            fp.write(f'{idx},{value}{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
@use_profile('postgres')
def test_postgres_build_run_results_state(self):
    """`result:error`, `result:fail`, and `result:warn` selection for
    `dbt build`, with and without the `+` operator."""
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0
    # Detect the newline convention, then break the view model.
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'not_null_view_model_id','unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 3
    assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'}
    results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 4
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model','view_model', 'not_null_view_model_id','unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 6 # includes exposure
    assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    # test failure on build tests
    # fail the unique test
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select 1 as id union all select 1 as id")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'
    results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
    # change the unique test severity from error to warn and reuse the same view_model.sql changes above
    # (use context managers so the handles are closed even if an assertion
    # above this point is later added/raised; matches the style used in
    # test_postgres_test_run_results_state)
    with open('models/schema.yml') as f:
        filedata = f.read()
    newdata = filedata.replace('error','warn')
    with open('models/schema.yml', 'w') as f:
        f.write(newdata)
    self.rebuild_run_dbt(expect_pass=True)
    results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'
    results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 2 # includes table_model to be run
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
@use_profile('postgres')
def test_postgres_run_run_results_state(self):
    """`result:success`, `result:error`, and `result:skipped` selection
    for `dbt run`, with and without the `+` operator."""
    results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # clear state and rerun upstream view model to test + operator
    shutil.rmtree('./state')
    self.run_dbt(['run', '--select', 'view_model'], expect_pass=True)
    self.copy_state()
    results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # check we are starting from a place with 0 errors
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0
    # force an error in the view model to test error and skipped states
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    # test single result selector on error
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'view_model'
    # test + operator selection on error
    results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # single result selector on skipped. Expect this to pass because the
    # underlying view was already defined above
    results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'table_model'
    # add a downstream model that depends on table_model for skipped+ selector
    with open('models/table_model_downstream.sql', 'w') as fp:
        fp.write("select * from {{ref('table_model')}}")
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'table_model'
    assert results[1].node.name == 'table_model_downstream'
@use_profile('postgres')
def test_postgres_test_run_results_state(self):
    """`result:pass`, `result:fail`, and `result:warn` selection for
    `dbt test`, with and without the `+` operator."""
    # run passed nodes
    results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
    # run passed nodes with + operator
    results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
    # update view model to generate a failure case
    os.remove('./models/view_model.sql')
    with open('models/view_model.sql', 'w') as fp:
        fp.write("select 1 as id union all select 1 as id")
    self.rebuild_run_dbt(expect_pass=False)
    # test with failure selector
    results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # test with failure selector and + operator
    results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # change the unique test severity from error to warn and reuse the same view_model.sql changes above
    with open('models/schema.yml', 'r+') as f:
        filedata = f.read()
        newdata = filedata.replace('error','warn')
        f.seek(0)
        f.write(newdata)
        f.truncate()
    # rebuild - expect_pass = True because we changed the error to a warning this time around
    self.rebuild_run_dbt(expect_pass=True)
    # test with warn selector
    results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # test with warn selector and + operator
    results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
@use_profile('postgres')
def test_postgres_concurrent_selectors_run_run_results_state(self):
    """Combining `state:modified+` and `result:error+` in a single
    `dbt run` invocation selects the union of both."""
    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0
    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    # modify another dbt model (after the snapshot, so it counts as modified)
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'table_model_modified_example', 'table_model'}
@use_profile('postgres')
def test_postgres_concurrent_selectors_test_run_results_state(self):
    """`result:fail` selection combined with `--exclude` in `dbt test`."""
    # create failure test case for result:fail selector
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id union all select null as id')
    # run dbt build again to trigger test errors
    self.rebuild_run_dbt(expect_pass=False)
    # select the failures from the new state, excluding one failing test
    results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id'}
@use_profile('postgres')
def test_postgres_concurrent_selectors_build_run_results_state(self):
    """Combining `state:modified+`, `result:error+`, and `result:fail+`
    in `dbt build` invocations."""
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0
    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    # modify another dbt model
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'}
    # create failure test case for result:fail selector
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id')
    # create error model case for result:error selector
    with open('./models/error_model.sql', 'w') as f:
        f.write('select 1 as id from not_exists')
    # create something downstream from the error model to rerun
    # NOTE(review): the trailing ')' below is invalid SQL on its own —
    # presumably intentional so this node errors regardless of its parent;
    # confirm before cleaning it up.
    with open('./models/downstream_of_error_model.sql', 'w') as f:
        f.write('select * from {{ ref("error_model") }} )')
    # regenerate build state
    self.rebuild_run_dbt(expect_pass=False)
    # modify model again to trigger the state:modified selector
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_another_error")
        fp.write(newline)
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'}