mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-23 14:41:27 +00:00
Compare commits
46 Commits
enable-pos
...
feature/sm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f75b9539f | ||
|
|
ec2c2c8cf7 | ||
|
|
3aef719fe4 | ||
|
|
ce9c00340f | ||
|
|
b8d288b6f0 | ||
|
|
df09392d92 | ||
|
|
bd5a022e4e | ||
|
|
911cb53063 | ||
|
|
b45715cef7 | ||
|
|
2e5fb45e5f | ||
|
|
48bebe11b1 | ||
|
|
abfa16d687 | ||
|
|
39d145c7a8 | ||
|
|
8250b30862 | ||
|
|
9309e4e906 | ||
|
|
94a021806b | ||
|
|
902cc75c69 | ||
|
|
c18335f51d | ||
|
|
191821779e | ||
|
|
e2ffd41cc3 | ||
|
|
6e9073a3c1 | ||
|
|
e4bd6ee62d | ||
|
|
d2e3248379 | ||
|
|
24bbf5b843 | ||
|
|
a4ec84f37e | ||
|
|
61de5aea19 | ||
|
|
cad6bb0576 | ||
|
|
9e3de01175 | ||
|
|
191bbae093 | ||
|
|
17fa096533 | ||
|
|
71125167a1 | ||
|
|
7fee4d44bf | ||
|
|
17c47ff42d | ||
|
|
f437c8e3eb | ||
|
|
daa9ab1e73 | ||
|
|
d4ff259d66 | ||
|
|
32de684121 | ||
|
|
3d043fe8fd | ||
|
|
5a5df384c0 | ||
|
|
7924b320cf | ||
|
|
5005e0c4bf | ||
|
|
e5bdb93636 | ||
|
|
3e8e91c4c2 | ||
|
|
036a9bf92f | ||
|
|
98e98e5c3b | ||
|
|
72eb4302be |
@@ -1,6 +1,7 @@
|
||||
from pathlib import Path
|
||||
from .graph.manifest import WritableManifest
|
||||
from .results import RunResultsArtifact
|
||||
from .results import FreshnessExecutionResultArtifact
|
||||
from typing import Optional
|
||||
from dbt.exceptions import IncompatibleSchemaException
|
||||
|
||||
@@ -10,6 +11,7 @@ class PreviousState:
|
||||
self.path: Path = path
|
||||
self.manifest: Optional[WritableManifest] = None
|
||||
self.results: Optional[RunResultsArtifact] = None
|
||||
self.sources: Optional[FreshnessExecutionResultArtifact] = None
|
||||
|
||||
manifest_path = self.path / 'manifest.json'
|
||||
if manifest_path.exists() and manifest_path.is_file():
|
||||
@@ -26,3 +28,29 @@ class PreviousState:
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(results_path))
|
||||
raise
|
||||
|
||||
sources_path = self.path / 'sources.json'
|
||||
if sources_path.exists() and sources_path.is_file():
|
||||
try:
|
||||
self.sources = FreshnessExecutionResultArtifact.read(str(sources_path))
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(sources_path))
|
||||
raise
|
||||
|
||||
# bring in the project class that needs to be instantiated
|
||||
# define the target path at this step(how do I consider a different target path? based on dbt_project.yml)
|
||||
# the problem I'm facing right now is that the project config is populated AFTER this step
|
||||
# the reason this works with previous state is that we manually set the previous state path
|
||||
# current state is more difficult because we have to read the project config first before this class is instantiated
|
||||
class CurrentState:
|
||||
def __init__(self, path: Path):
|
||||
self.path: Path = path #TODO: fix this by importing target_path later
|
||||
self.sources: Optional[FreshnessExecutionResultArtifact] = None
|
||||
|
||||
sources_path = self.path / 'sources.json'
|
||||
if sources_path.exists() and sources_path.is_file():
|
||||
try:
|
||||
self.sources = FreshnessExecutionResultArtifact.read(str(sources_path))
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(sources_path))
|
||||
raise
|
||||
@@ -71,6 +71,29 @@ class NodeSelector(MethodManager):
|
||||
method = self.get_method(spec.method, spec.method_arguments)
|
||||
return set(method.search(included_nodes, spec.value))
|
||||
|
||||
def select_excluded_source_nodes(
|
||||
self, included_nodes: Set[UniqueId], spec: SelectionCriteria,
|
||||
) -> Set[UniqueId]:
|
||||
"""Select the explicitly excluded source nodes, using the given spec. Return
|
||||
the selected set of unique IDs.
|
||||
"""
|
||||
method = self.get_method(spec.method, spec.method_arguments)
|
||||
if spec.value == 'pass':
|
||||
source_status_values_to_exclude = {'warn','error'}
|
||||
elif spec.value == 'warn':
|
||||
source_status_values_to_exclude = {'error'}
|
||||
elif spec.value == 'error':
|
||||
source_status_values_to_exclude = {'pass','warn'}
|
||||
else:
|
||||
source_status_values_to_exclude = set()
|
||||
|
||||
excluded_source_nodes = set()
|
||||
for source_status in source_status_values_to_exclude:
|
||||
source_nodes = method.search(included_nodes, source_status)
|
||||
excluded_source_nodes.update(set(source_nodes))
|
||||
|
||||
return excluded_source_nodes
|
||||
|
||||
def get_nodes_from_criteria(
|
||||
self,
|
||||
spec: SelectionCriteria
|
||||
@@ -85,6 +108,9 @@ class NodeSelector(MethodManager):
|
||||
nodes = self.graph.nodes()
|
||||
try:
|
||||
collected = self.select_included(nodes, spec)
|
||||
if spec.method == 'source_refresh':
|
||||
collected_excluded = self.select_excluded_source_nodes(nodes, spec)
|
||||
|
||||
except InvalidSelectorException:
|
||||
fire_event(SelectorReportInvalidSelector(
|
||||
selector_methods=self.SELECTOR_METHODS,
|
||||
@@ -98,6 +124,23 @@ class NodeSelector(MethodManager):
|
||||
selected=(collected | neighbors),
|
||||
indirect_selection=spec.indirect_selection
|
||||
)
|
||||
if spec.method == 'source_refresh':
|
||||
neighbors_excluded = self.collect_specified_neighbors(spec, collected_excluded)
|
||||
direct_nodes_excluded, indirect_nodes_excluded = self.expand_selection(
|
||||
selected=(collected_excluded | neighbors_excluded),
|
||||
indirect_selection=spec.indirect_selection
|
||||
)
|
||||
direct_nodes = direct_nodes - direct_nodes_excluded
|
||||
indirect_nodes = indirect_nodes - indirect_nodes_excluded
|
||||
|
||||
if direct_nodes_excluded:
|
||||
warn_or_error(f"The '{spec.method}' selector specified in '{spec.raw}' will exclude the below nodes:")
|
||||
warn_or_error(f"Direct Nodes: {direct_nodes_excluded}")
|
||||
warn_or_error(f"Indirect Nodes: {indirect_nodes_excluded}")
|
||||
warn_or_error(f"These source nodes: '{collected_excluded}' require 'status:{spec.value}' for the excluded nodes to run")
|
||||
warn_or_error("Note: Concurrent selectors may include the excluded nodes(ex: source_refresh:warn+ source_refresh:pass+)")
|
||||
warn_or_error("")
|
||||
|
||||
return direct_nodes, indirect_nodes
|
||||
|
||||
def collect_specified_neighbors(
|
||||
|
||||
@@ -22,10 +22,11 @@ from dbt.contracts.graph.parsed import (
|
||||
ParsedGenericTestNode,
|
||||
ParsedSourceDefinition,
|
||||
)
|
||||
from dbt.contracts.state import PreviousState
|
||||
from dbt.contracts.state import PreviousState, CurrentState
|
||||
from dbt.exceptions import (
|
||||
InternalException,
|
||||
RuntimeException,
|
||||
warn_or_error,
|
||||
)
|
||||
from dbt.node_types import NodeType
|
||||
|
||||
@@ -48,6 +49,7 @@ class MethodName(StrEnum):
|
||||
Exposure = 'exposure'
|
||||
Metric = 'metric'
|
||||
Result = 'result'
|
||||
SourceRefresh = 'source_refresh'
|
||||
|
||||
|
||||
def is_selected_node(fqn: List[str], node_selector: str):
|
||||
@@ -577,6 +579,50 @@ class ResultSelectorMethod(SelectorMethod):
|
||||
if node in matches:
|
||||
yield node
|
||||
|
||||
class SourceRefreshSelectorMethod(SelectorMethod): #TODO: this requires SelectorMethod to have current_state as an argument. currently, this works because it's all hard-coded
|
||||
def search(
|
||||
self, included_nodes: Set[UniqueId], selector: str
|
||||
) -> Iterator[UniqueId]:
|
||||
self.current_state = CurrentState(path=Path('target')) #TODO: fix this by importing target_path later
|
||||
|
||||
if self.previous_state is None or self.previous_state.sources is None:
|
||||
raise InternalException(
|
||||
'No previous state comparison freshness results in sources.json'
|
||||
)
|
||||
elif self.current_state is None or self.current_state.sources is None:
|
||||
raise InternalException(
|
||||
'No current state comparison freshness results in sources.json'
|
||||
)
|
||||
if selector=='fresh':
|
||||
current_state_sources = {
|
||||
result.unique_id:result.max_loaded_at for result in self.current_state.sources.results
|
||||
}
|
||||
else:
|
||||
current_state_sources = {
|
||||
result.unique_id:result.max_loaded_at for result in self.current_state.sources.results
|
||||
if result.status == selector
|
||||
}
|
||||
previous_state_sources = {
|
||||
result.unique_id:result.max_loaded_at for result in self.previous_state.sources.results
|
||||
}
|
||||
|
||||
matches = set()
|
||||
matches_not_fresh = set()
|
||||
for unique_id in current_state_sources:
|
||||
if unique_id not in previous_state_sources:
|
||||
matches.add(unique_id)
|
||||
elif current_state_sources.get(unique_id) > previous_state_sources.get(unique_id):
|
||||
matches.add(unique_id)
|
||||
else:
|
||||
matches_not_fresh.add(unique_id)
|
||||
|
||||
if matches_not_fresh:
|
||||
warn_or_error(f"{matches_not_fresh} sources will not refresh other nodes, max_loaded_at date must be greater than previous state")
|
||||
|
||||
for node, real_node in self.all_nodes(included_nodes):
|
||||
if node in matches:
|
||||
yield node
|
||||
|
||||
|
||||
class MethodManager:
|
||||
SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = {
|
||||
@@ -593,6 +639,7 @@ class MethodManager:
|
||||
MethodName.Exposure: ExposureSelectorMethod,
|
||||
MethodName.Metric: MetricSelectorMethod,
|
||||
MethodName.Result: ResultSelectorMethod,
|
||||
MethodName.SourceRefresh: SourceRefreshSelectorMethod,
|
||||
}
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -0,0 +1,436 @@
|
||||
from test.integration.base import DBTIntegrationTest, use_profile
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import string
|
||||
|
||||
import pytest
|
||||
|
||||
from dbt.exceptions import CompilationException
|
||||
|
||||
|
||||
class TestRunResultsState(DBTIntegrationTest):
|
||||
@property
|
||||
def schema(self):
|
||||
return "run_results_state_062"
|
||||
|
||||
@property
|
||||
def models(self):
|
||||
return "models"
|
||||
|
||||
@property
|
||||
def project_config(self):
|
||||
return {
|
||||
'config-version': 2,
|
||||
'macro-paths': ['macros'],
|
||||
'seeds': {
|
||||
'test': {
|
||||
'quote_columns': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def _symlink_test_folders(self):
|
||||
# dbt's normal symlink behavior breaks this test. Copy the files
|
||||
# so we can freely modify them.
|
||||
for entry in os.listdir(self.test_original_source_path):
|
||||
src = os.path.join(self.test_original_source_path, entry)
|
||||
tst = os.path.join(self.test_root_dir, entry)
|
||||
if entry in {'models', 'seeds', 'macros'}:
|
||||
shutil.copytree(src, tst)
|
||||
elif os.path.isdir(entry) or entry.endswith('.sql'):
|
||||
os.symlink(src, tst)
|
||||
|
||||
def copy_state(self):
|
||||
assert not os.path.exists('state')
|
||||
os.makedirs('state')
|
||||
shutil.copyfile('target/manifest.json', 'state/manifest.json')
|
||||
shutil.copyfile('target/run_results.json', 'state/run_results.json')
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.run_dbt(['build'])
|
||||
self.copy_state()
|
||||
|
||||
def rebuild_run_dbt(self, expect_pass=True):
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['build'], expect_pass=expect_pass)
|
||||
self.copy_state()
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_seed_run_results_state(self):
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['seed'])
|
||||
self.copy_state()
|
||||
results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state'])
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state'])
|
||||
assert len(results) == 7
|
||||
assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
|
||||
|
||||
with open('seeds/seed.csv') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
with open('seeds/seed.csv', 'a') as fp:
|
||||
fp.write(f'\"\'\'3,carl{newline}')
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['seed'], expect_pass=False)
|
||||
self.copy_state()
|
||||
|
||||
results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
|
||||
assert len(results) == 7
|
||||
assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
|
||||
|
||||
|
||||
with open('seeds/seed.csv') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
with open('seeds/seed.csv', 'a') as fp:
|
||||
# assume each line is ~2 bytes + len(name)
|
||||
target_size = 1*1024*1024
|
||||
line_size = 64
|
||||
|
||||
num_lines = target_size // line_size
|
||||
|
||||
maxlines = num_lines + 4
|
||||
|
||||
for idx in range(4, maxlines):
|
||||
value = ''.join(random.choices(string.ascii_letters, k=62))
|
||||
fp.write(f'{idx},{value}{newline}')
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['seed'], expect_pass=False)
|
||||
self.copy_state()
|
||||
|
||||
results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.seed'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
|
||||
assert len(results) == 7
|
||||
assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_build_run_results_state(self):
|
||||
results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'])
|
||||
assert len(results) == 0
|
||||
|
||||
with open('models/view_model.sql') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 3
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'view_model', 'not_null_view_model_id','unique_view_model_id'}
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
|
||||
assert len(results) == 3
|
||||
assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'}
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 4
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'table_model','view_model', 'not_null_view_model_id','unique_view_model_id'}
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
|
||||
assert len(results) == 6 # includes exposure
|
||||
assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
|
||||
|
||||
# test failure on build tests
|
||||
# fail the unique test
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select 1 as id union all select 1 as id")
|
||||
fp.write(newline)
|
||||
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state'])
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.unique_view_model_id'
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 2
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'table_model', 'unique_view_model_id'}
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state'])
|
||||
assert len(results) == 2
|
||||
assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
|
||||
|
||||
# change the unique test severity from error to warn and reuse the same view_model.sql changes above
|
||||
f = open('models/schema.yml', 'r')
|
||||
filedata = f.read()
|
||||
f.close()
|
||||
newdata = filedata.replace('error','warn')
|
||||
f = open('models/schema.yml', 'w')
|
||||
f.write(newdata)
|
||||
f.close()
|
||||
|
||||
self.rebuild_run_dbt(expect_pass=True)
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state'])
|
||||
assert len(results) == 1
|
||||
assert results[0] == 'test.unique_view_model_id'
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2 # includes table_model to be run
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'table_model', 'unique_view_model_id'}
|
||||
|
||||
results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state'])
|
||||
assert len(results) == 2
|
||||
assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_run_run_results_state(self):
|
||||
results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2
|
||||
assert results[0].node.name == 'view_model'
|
||||
assert results[1].node.name == 'table_model'
|
||||
|
||||
# clear state and rerun upstream view model to test + operator
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['run', '--select', 'view_model'], expect_pass=True)
|
||||
self.copy_state()
|
||||
results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2
|
||||
assert results[0].node.name == 'view_model'
|
||||
assert results[1].node.name == 'table_model'
|
||||
|
||||
# check we are starting from a place with 0 errors
|
||||
results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'])
|
||||
assert len(results) == 0
|
||||
|
||||
# force an error in the view model to test error and skipped states
|
||||
with open('models/view_model.sql') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['run'], expect_pass=False)
|
||||
self.copy_state()
|
||||
|
||||
# test single result selector on error
|
||||
results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'view_model'
|
||||
|
||||
# test + operator selection on error
|
||||
results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 2
|
||||
assert results[0].node.name == 'view_model'
|
||||
assert results[1].node.name == 'table_model'
|
||||
|
||||
# single result selector on skipped. Expect this to pass becase underlying view already defined above
|
||||
results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'table_model'
|
||||
|
||||
# add a downstream model that depends on table_model for skipped+ selector
|
||||
with open('models/table_model_downstream.sql', 'w') as fp:
|
||||
fp.write("select * from {{ref('table_model')}}")
|
||||
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['run'], expect_pass=False)
|
||||
self.copy_state()
|
||||
|
||||
results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2
|
||||
assert results[0].node.name == 'table_model'
|
||||
assert results[1].node.name == 'table_model_downstream'
|
||||
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_test_run_results_state(self):
|
||||
# run passed nodes
|
||||
results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
|
||||
|
||||
# run passed nodes with + operator
|
||||
results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 2
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
|
||||
|
||||
# update view model to generate a failure case
|
||||
os.remove('./models/view_model.sql')
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write("select 1 as id union all select 1 as id")
|
||||
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
# test with failure selector
|
||||
results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
# test with failure selector and + operator
|
||||
results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
# change the unique test severity from error to warn and reuse the same view_model.sql changes above
|
||||
with open('models/schema.yml', 'r+') as f:
|
||||
filedata = f.read()
|
||||
newdata = filedata.replace('error','warn')
|
||||
f.seek(0)
|
||||
f.write(newdata)
|
||||
f.truncate()
|
||||
|
||||
# rebuild - expect_pass = True because we changed the error to a warning this time around
|
||||
self.rebuild_run_dbt(expect_pass=True)
|
||||
|
||||
# test with warn selector
|
||||
results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
# test with warn selector and + operator
|
||||
results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
|
||||
assert len(results) == 1
|
||||
assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_concurrent_selectors_run_run_results_state(self):
|
||||
results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'])
|
||||
assert len(results) == 0
|
||||
|
||||
# force an error on a dbt model
|
||||
with open('models/view_model.sql') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
shutil.rmtree('./state')
|
||||
self.run_dbt(['run'], expect_pass=False)
|
||||
self.copy_state()
|
||||
|
||||
# modify another dbt model
|
||||
with open('models/table_model_modified_example.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 3
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'view_model', 'table_model_modified_example', 'table_model'}
|
||||
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_concurrent_selectors_test_run_results_state(self):
|
||||
# create failure test case for result:fail selector
|
||||
os.remove('./models/view_model.sql')
|
||||
with open('./models/view_model.sql', 'w') as f:
|
||||
f.write('select 1 as id union all select 1 as id union all select null as id')
|
||||
|
||||
# run dbt build again to trigger test errors
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
# get the failures from
|
||||
results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 1
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'unique_view_model_id'}
|
||||
|
||||
|
||||
@use_profile('postgres')
|
||||
def test_postgres_concurrent_selectors_build_run_results_state(self):
|
||||
results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'])
|
||||
assert len(results) == 0
|
||||
|
||||
# force an error on a dbt model
|
||||
with open('models/view_model.sql') as fp:
|
||||
fp.readline()
|
||||
newline = fp.newlines
|
||||
|
||||
with open('models/view_model.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
# modify another dbt model
|
||||
with open('models/table_model_modified_example.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_error")
|
||||
fp.write(newline)
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 5
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'}
|
||||
|
||||
# create failure test case for result:fail selector
|
||||
os.remove('./models/view_model.sql')
|
||||
with open('./models/view_model.sql', 'w') as f:
|
||||
f.write('select 1 as id union all select 1 as id')
|
||||
|
||||
# create error model case for result:error selector
|
||||
with open('./models/error_model.sql', 'w') as f:
|
||||
f.write('select 1 as id from not_exists')
|
||||
|
||||
# create something downstream from the error model to rerun
|
||||
with open('./models/downstream_of_error_model.sql', 'w') as f:
|
||||
f.write('select * from {{ ref("error_model") }} )')
|
||||
|
||||
# regenerate build state
|
||||
self.rebuild_run_dbt(expect_pass=False)
|
||||
|
||||
# modify model again to trigger the state:modified selector
|
||||
with open('models/table_model_modified_example.sql', 'w') as fp:
|
||||
fp.write(newline)
|
||||
fp.write("select * from forced_another_error")
|
||||
fp.write(newline)
|
||||
|
||||
results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False)
|
||||
assert len(results) == 5
|
||||
nodes = set([elem.node.name for elem in results])
|
||||
assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'}
|
||||
Reference in New Issue
Block a user