mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-20 02:21:27 +00:00
Compare commits
71 Commits
enable-pos
...
feature/sm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58dcb8b173 | ||
|
|
6a1caba22c | ||
|
|
f096432946 | ||
|
|
248473e6c9 | ||
|
|
3d7a18638a | ||
|
|
4cca4ddc14 | ||
|
|
31977136a5 | ||
|
|
301427e305 | ||
|
|
9a69666130 | ||
|
|
4445217b67 | ||
|
|
343a0ebd96 | ||
|
|
87c67fd10c | ||
|
|
5f8333727f | ||
|
|
6483e95a42 | ||
|
|
8c9e5de086 | ||
|
|
4748b4b2ee | ||
|
|
54fc9ccb96 | ||
|
|
1edb0dfcd1 | ||
|
|
40ea7ce276 | ||
|
|
6345fe4f75 | ||
|
|
08504e4ec2 | ||
|
|
6f82a525f1 | ||
|
|
605ca1a31b | ||
|
|
68630332ba | ||
|
|
ac4edb51f0 | ||
|
|
5f75b9539f | ||
|
|
ec2c2c8cf7 | ||
|
|
3aef719fe4 | ||
|
|
ce9c00340f | ||
|
|
b8d288b6f0 | ||
|
|
df09392d92 | ||
|
|
bd5a022e4e | ||
|
|
911cb53063 | ||
|
|
b45715cef7 | ||
|
|
2e5fb45e5f | ||
|
|
48bebe11b1 | ||
|
|
abfa16d687 | ||
|
|
39d145c7a8 | ||
|
|
8250b30862 | ||
|
|
9309e4e906 | ||
|
|
94a021806b | ||
|
|
902cc75c69 | ||
|
|
c18335f51d | ||
|
|
191821779e | ||
|
|
e2ffd41cc3 | ||
|
|
6e9073a3c1 | ||
|
|
e4bd6ee62d | ||
|
|
d2e3248379 | ||
|
|
24bbf5b843 | ||
|
|
a4ec84f37e | ||
|
|
61de5aea19 | ||
|
|
cad6bb0576 | ||
|
|
9e3de01175 | ||
|
|
191bbae093 | ||
|
|
17fa096533 | ||
|
|
71125167a1 | ||
|
|
7fee4d44bf | ||
|
|
17c47ff42d | ||
|
|
f437c8e3eb | ||
|
|
daa9ab1e73 | ||
|
|
d4ff259d66 | ||
|
|
32de684121 | ||
|
|
3d043fe8fd | ||
|
|
5a5df384c0 | ||
|
|
7924b320cf | ||
|
|
5005e0c4bf | ||
|
|
e5bdb93636 | ||
|
|
3e8e91c4c2 | ||
|
|
036a9bf92f | ||
|
|
98e98e5c3b | ||
|
|
72eb4302be |
@@ -1,15 +1,21 @@
|
||||
from pathlib import Path
|
||||
from .graph.manifest import WritableManifest
|
||||
|
||||
from .results import RunResultsArtifact
|
||||
from .results import FreshnessExecutionResultArtifact
|
||||
from typing import Optional
|
||||
|
||||
from dbt.exceptions import IncompatibleSchemaException
|
||||
|
||||
from dbt.path_utils import PathUtils
|
||||
|
||||
class PreviousState:
|
||||
def __init__(self, path: Path):
|
||||
self.path: Path = path
|
||||
self.manifest: Optional[WritableManifest] = None
|
||||
self.results: Optional[RunResultsArtifact] = None
|
||||
self.sources: Optional[FreshnessExecutionResultArtifact] = None
|
||||
self.previous_sources: Optional[FreshnessExecutionResultArtifact] = None
|
||||
|
||||
manifest_path = self.path / 'manifest.json'
|
||||
if manifest_path.exists() and manifest_path.is_file():
|
||||
@@ -26,3 +32,40 @@ class PreviousState:
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(results_path))
|
||||
raise
|
||||
|
||||
current_sources_path = self.path / 'sources.json'
|
||||
if current_sources_path.exists() and current_sources_path.is_file():
|
||||
try:
|
||||
self.current_sources = FreshnessExecutionResultArtifact.read(str(current_sources_path))
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(current_sources_path))
|
||||
raise
|
||||
|
||||
# we either have to get the previous version of sources state from somewhere or generate it on the fly . . .
|
||||
archive_sources_path = self.path / 'archive_sources' / 'sources.json'
|
||||
if archive_sources_path.exists() and archive_sources_path.is_file():
|
||||
try:
|
||||
self.archive_sources = FreshnessExecutionResultArtifact.read(str(archive_sources_path))
|
||||
except IncompatibleSchemaException as exc:
|
||||
exc.add_filename(str(archive_sources_path))
|
||||
raise
|
||||
|
||||
# bring in the project class that needs to be instantiated
|
||||
# define the target path at this step(how do I consider a different target path? based on dbt_project.yml)
|
||||
# the problem I'm facing right now is that the project config is populated AFTER this step
|
||||
# the reason this works with previous state is that we manually set the previous state path
|
||||
# current state is more difficult because we have to read the project config first before this class is instantiated
|
||||
class CurrentState(PathUtils):
    """Freshness state loaded from the project's own target directory.

    Resolves the target path via the PathUtils mixin and, when a
    sources.json artifact exists there, parses it into ``self.sources``.
    If no artifact is present, ``self.sources`` stays ``None``.
    """

    def __init__(self):
        super().__init__()
        self.sources: Optional[FreshnessExecutionResultArtifact] = None

        artifact_path = self.get_target_path() / 'sources.json'
        if not (artifact_path.exists() and artifact_path.is_file()):
            return
        try:
            self.sources = FreshnessExecutionResultArtifact.read(str(artifact_path))
        except IncompatibleSchemaException as exc:
            # Attach the offending file name to the error before re-raising.
            exc.add_filename(str(artifact_path))
            raise
|
||||
|
||||
@@ -22,7 +22,7 @@ from dbt.contracts.graph.parsed import (
|
||||
ParsedGenericTestNode,
|
||||
ParsedSourceDefinition,
|
||||
)
|
||||
from dbt.contracts.state import PreviousState
|
||||
from dbt.contracts.state import PreviousState, CurrentState
|
||||
from dbt.exceptions import (
|
||||
InternalException,
|
||||
RuntimeException,
|
||||
@@ -48,6 +48,7 @@ class MethodName(StrEnum):
|
||||
Exposure = 'exposure'
|
||||
Metric = 'metric'
|
||||
Result = 'result'
|
||||
SourceFresh = 'source_fresh'
|
||||
|
||||
|
||||
def is_selected_node(fqn: List[str], node_selector: str):
|
||||
@@ -577,6 +578,48 @@ class ResultSelectorMethod(SelectorMethod):
|
||||
if node in matches:
|
||||
yield node
|
||||
|
||||
class SourceFreshSelectorMethod(SelectorMethod):
    """Select sources by comparing two freshness snapshots.

    ``selector`` is 'yes' or 'no': 'yes' selects sources whose
    ``max_loaded_at`` advanced relative to the archived snapshot, 'no'
    selects those that did not. Sources absent from the archived snapshot
    are always selected.

    NOTE(review): both snapshots currently come off ``previous_state``;
    the original TODO about threading a real current_state through
    SelectorMethod still stands.
    """

    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        state = self.previous_state
        # Both the current and the archived sources.json must be available.
        if state is None or state.current_sources is None or state.archive_sources is None:
            raise InternalException(
                'No previous state comparison freshness results in sources.json'
            )

        current = {
            r.unique_id: r.max_loaded_at for r in state.current_sources.results
        }
        archived = {
            r.unique_id: r.max_loaded_at for r in state.archive_sources.results
        }

        matches = set()
        for unique_id, loaded_at in current.items():
            if unique_id not in archived:
                # Brand-new sources are selected regardless of the selector.
                matches.add(unique_id)
            elif selector == 'yes' and loaded_at > archived[unique_id]:
                matches.add(unique_id)
            elif selector == 'no' and loaded_at <= archived[unique_id]:
                matches.add(unique_id)

        for node, _real_node in self.all_nodes(included_nodes):
            if node in matches:
                yield node
|
||||
|
||||
|
||||
class MethodManager:
|
||||
SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = {
|
||||
@@ -593,6 +636,7 @@ class MethodManager:
|
||||
MethodName.Exposure: ExposureSelectorMethod,
|
||||
MethodName.Metric: MetricSelectorMethod,
|
||||
MethodName.Result: ResultSelectorMethod,
|
||||
MethodName.SourceFresh: SourceFreshSelectorMethod,
|
||||
}
|
||||
|
||||
def __init__(
|
||||
|
||||
112
core/dbt/path_utils.py
Normal file
112
core/dbt/path_utils.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import os
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any
|
||||
|
||||
from dbt.exceptions import DbtProjectError
|
||||
from dbt.clients.system import load_file_contents
|
||||
from dbt.clients.yaml_helper import load_yaml_text
|
||||
from dbt.clients.system import path_exists
|
||||
from dbt.exceptions import (
|
||||
NotImplementedException, CompilationException, RuntimeException,
|
||||
InternalException
|
||||
)
|
||||
|
||||
from dbt.flags import PROFILES_DIR
|
||||
# from dbt.main import parse_args
|
||||
# from dbt.task.base import get_nearest_project_dir
|
||||
|
||||
|
||||
#TODO: update this for profiles_dir instead of project_root, --config-dir, --profiles-dir
|
||||
#TODO: pick a hierarchy of profiles_dir, config_dir, DBT_PROFILES_DIR, project_root,
|
||||
#TODO: DEFAULT_PROFILES_DIR is duplicated across flags.py and profile.py, helpful to note we duplicate as needed
|
||||
# import a method from main.py to capture the project root from PROFILES_DIR being redefined
|
||||
#
|
||||
# --output is a flag used for the freshness command and should be ignored?
|
||||
# args = sys.argv[1:]
|
||||
# print(args)
|
||||
# parsed = parse_args(args)
|
||||
# get the argument from the command line as needed
|
||||
# copy over get_nearest_project_dir?
|
||||
# pass in --project-dir flag and then default to current working directory
|
||||
|
||||
class PathArgsParser:
    """Resolve the dbt project directory from CLI args or the filesystem."""

    def path_args_subparser(self):
        """Build an argparse parser that understands ``--project-dir``.

        Returns the parser itself; callers should parse with
        ``parse_known_args`` so unrelated argv entries are tolerated.
        """
        base_subparser = argparse.ArgumentParser(add_help=False)

        base_subparser.add_argument(
            '--project-dir',
            default=None,
            dest='project_dir',
            type=str,
            help='''
            Which directory to look in for the dbt_project.yml file.
            Default is the current working directory and its parents.
            '''
        )
        return base_subparser

    def get_nearest_project_dir(self):
        """Return the nearest directory containing dbt_project.yml.

        An explicit ``--project-dir`` on the command line wins, and parent
        directories are then NOT searched; otherwise walk upward from the
        current working directory to the filesystem root.

        :raises RuntimeException: if no dbt_project.yml can be found.
        """
        parser = self.path_args_subparser()
        # BUG FIX: the option value lives on the *parsed namespace*, not on
        # the parser object — the old getattr(parser, 'project_dir', None)
        # always returned None, so --project-dir was silently ignored.
        # parse_known_args ignores argv entries this parser doesn't know.
        parsed, _unknown = parser.parse_known_args()
        project_dir = parsed.project_dir
        # (debug print removed)
        if project_dir is not None:
            project_file = os.path.join(project_dir, "dbt_project.yml")
            if os.path.exists(project_file):
                return project_dir
            raise RuntimeException(
                "fatal: Invalid --project-dir flag. Not a dbt project. "
                "Missing dbt_project.yml file"
            )

        # No flag given: climb from cwd toward the root looking for the file.
        root_path = os.path.abspath(os.sep)
        cwd = os.getcwd()

        while cwd != root_path:
            project_file = os.path.join(cwd, "dbt_project.yml")
            if os.path.exists(project_file):
                return cwd
            cwd = os.path.dirname(cwd)

        raise RuntimeException(
            "fatal: Not a dbt project (or any of the parent directories). "
            "Missing dbt_project.yml file"
        )
|
||||
|
||||
|
||||
# NOTE(review): module-import side effect — instantiating the parser and
# resolving the project dir runs (and can raise RuntimeException) as soon as
# this module is imported, and the return value is discarded. This looks
# like leftover debugging scaffolding; confirm it is intentional.
PathArgs = PathArgsParser()
PathArgs.get_nearest_project_dir()
|
||||
|
||||
class PathUtils:
    """Mixin that reads dbt_project.yml and derives the target path."""

    def __init__(self):
        # TODO(review): using PROFILES_DIR as the *project* root looks
        # suspicious (profiles dir != project dir) — kept as-is per the
        # original TODO; confirm before relying on it.
        self.project_root = PROFILES_DIR

    def _load_yaml(self, path):
        # Read the raw file contents and hand them to the YAML parser.
        return load_yaml_text(load_file_contents(path))

    def _raw_project_from(self, project_root: str) -> Dict[str, Any]:
        """Parse dbt_project.yml under ``project_root`` into a dict.

        :raises DbtProjectError: if the file is missing or does not parse
            to a dictionary.
        """
        yaml_path = os.path.join(
            os.path.normpath(project_root), 'dbt_project.yml'
        )
        if not path_exists(yaml_path):
            raise DbtProjectError(
                'no dbt_project.yml found at expected path {}'
                .format(yaml_path)
            )

        parsed = self._load_yaml(yaml_path)
        if not isinstance(parsed, dict):
            raise DbtProjectError(
                'dbt_project.yml does not parse to a dictionary'
            )
        return parsed

    def get_target_path(self) -> Path:
        """Return the configured target-path (default ``target``) as a Path."""
        project = self._raw_project_from(self.project_root)
        return Path(project.get('target-path', 'target'))
|
||||
@@ -26,7 +26,7 @@ from dbt.contracts.graph.parsed import ParsedSourceDefinition
|
||||
|
||||
|
||||
# File name of the freshness artifact written by `dbt source freshness`.
RESULT_FILE_NAME = 'sources.json'
# Directory (under the target path) where the previous artifact is archived.
RESULT_ARCHIVE_DIR_NAME = 'archive_sources'
|
||||
|
||||
class FreshnessRunner(BaseRunner):
|
||||
def on_skip(self):
|
||||
@@ -190,6 +190,15 @@ class FreshnessTask(GraphRunnableTask):
|
||||
return os.path.realpath(self.args.output)
|
||||
else:
|
||||
return os.path.join(self.config.target_path, RESULT_FILE_NAME)
|
||||
|
||||
def archive_path(self):
    """Return the file path where the previous sources.json is archived.

    Mirrors result_path(): with ``--output`` the archive lives under that
    location, otherwise under the configured target path. Ensures the
    archive directory exists in either case.
    """
    if self.args.output:
        base = os.path.realpath(self.args.output)
    else:
        base = self.config.target_path
    archive_dir = os.path.join(base, RESULT_ARCHIVE_DIR_NAME)
    # exist_ok avoids the old check-then-mkdir race; the --output branch
    # previously never created the directory at all and returned a
    # *directory* path while the other branch returned a *file* path,
    # which broke the os.rename in write_result. Both now return the
    # archived file path.
    os.makedirs(archive_dir, exist_ok=True)
    return os.path.join(archive_dir, RESULT_FILE_NAME)
|
||||
|
||||
def raise_on_first_error(self):
|
||||
return False
|
||||
@@ -211,8 +220,10 @@ class FreshnessTask(GraphRunnableTask):
|
||||
|
||||
def write_result(self, result):
    """Write the freshness artifact, archiving any existing sources.json.

    The previous artifact (if present) is moved to archive_path() before
    the fresh one is written, so state comparisons can diff the two.
    """
    artifact = FreshnessExecutionResultArtifact.from_result(result)
    # Hoisted: the old code called self.result_path() three times.
    result_path = self.result_path()
    if os.path.exists(result_path):
        # NOTE(review): os.rename fails across filesystems; acceptable
        # while the archive dir lives under the same target path.
        os.rename(result_path, self.archive_path())
    artifact.write(result_path)
|
||||
|
||||
|
||||
def get_result(self, results, elapsed_time, generated_at):
|
||||
return FreshnessResult.from_node_results(
|
||||
elapsed_time=elapsed_time,
|
||||
|
||||
@@ -0,0 +1,436 @@
|
||||
from test.integration.base import DBTIntegrationTest, use_profile
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import string
|
||||
|
||||
import pytest
|
||||
|
||||
from dbt.exceptions import CompilationException
|
||||
|
||||
|
||||
class TestRunResultsState(DBTIntegrationTest):
    """Integration tests for the ``result:<status>`` state selector.

    Each test runs dbt against a fixture project, snapshots the target/
    artifacts into ./state, then asserts which nodes the result: selectors
    pick via the ls/run/build/test subcommands.
    """

    @property
    def schema(self):
        # Schema name unique to this suite (062 is the test number).
        return "run_results_state_062"

    @property
    def models(self):
        # Directory holding the fixture project's models.
        return "models"

    @property
    def project_config(self):
        # Minimal project config; quoted seed columns exercise seed parsing.
        return {
            'config-version': 2,
            'macro-paths': ['macros'],
            'seeds': {
                'test': {
                    'quote_columns': True,
                }
            }
        }
|
||||
|
||||
def _symlink_test_folders(self):
|
||||
# dbt's normal symlink behavior breaks this test. Copy the files
|
||||
# so we can freely modify them.
|
||||
for entry in os.listdir(self.test_original_source_path):
|
||||
src = os.path.join(self.test_original_source_path, entry)
|
||||
tst = os.path.join(self.test_root_dir, entry)
|
||||
if entry in {'models', 'seeds', 'macros'}:
|
||||
shutil.copytree(src, tst)
|
||||
elif os.path.isdir(entry) or entry.endswith('.sql'):
|
||||
os.symlink(src, tst)
|
||||
|
||||
def copy_state(self):
    """Snapshot the current target artifacts into ./state for --state runs.

    Refuses to overwrite an existing snapshot: callers must remove ./state
    first (see rebuild_run_dbt).
    """
    assert not os.path.exists('state')
    os.makedirs('state')
    for artifact in ('manifest.json', 'run_results.json'):
        shutil.copyfile(
            os.path.join('target', artifact),
            os.path.join('state', artifact),
        )
|
||||
|
||||
def setUp(self):
    """Build the whole fixture project once and snapshot state before each test."""
    super().setUp()
    # `build` runs seeds, models, and tests, so run_results.json covers
    # every resource type the result: selectors are tested against.
    self.run_dbt(['build'])
    self.copy_state()
|
||||
|
||||
def rebuild_run_dbt(self, expect_pass=True):
    """Discard the saved state, rebuild the project, and re-snapshot.

    ``expect_pass`` is forwarded to run_dbt so a test can rebuild a
    project that is deliberately broken.
    """
    shutil.rmtree('./state')
    self.run_dbt(['build'], expect_pass=expect_pass)
    self.copy_state()
|
||||
|
||||
@use_profile('postgres')
def test_postgres_seed_run_results_state(self):
    """result:success / result:error selectors against seed runs (incl. +)."""
    # Re-seed only, so run_results.json contains just the seed node.
    shutil.rmtree('./state')
    self.run_dbt(['seed'])
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'

    results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'

    # result:success+ expands to everything downstream of the seed.
    results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}

    # Append a malformed row (unbalanced quotes) so the next seed run errors.
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    with open('seeds/seed.csv', 'a') as fp:
        fp.write(f'\"\'\'3,carl{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()

    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'

    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'

    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}

    # Inflate the seed to ~1 MiB — presumably to trip a size-based error
    # path distinct from the parse error above; confirm against dbt's
    # seed-size handling.
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    with open('seeds/seed.csv', 'a') as fp:
        # assume each line is ~2 bytes + len(name)
        target_size = 1*1024*1024
        line_size = 64

        num_lines = target_size // line_size

        maxlines = num_lines + 4

        for idx in range(4, maxlines):
            value = ''.join(random.choices(string.ascii_letters, k=62))
            fp.write(f'{idx},{value}{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()

    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'

    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'

    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
|
||||
|
||||
@use_profile('postgres')
def test_postgres_build_run_results_state(self):
    """result:error/fail/warn selectors (and +) against `dbt build`."""
    # Sanity: the fresh state has no errors to select.
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0

    # Capture the file's newline convention, then break the view model.
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines

    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    self.rebuild_run_dbt(expect_pass=False)

    # The model errors, and both of its tests error with it.
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'not_null_view_model_id','unique_view_model_id'}

    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 3
    assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'}

    results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 4
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model','view_model', 'not_null_view_model_id','unique_view_model_id'}

    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 6 # includes exposure
    assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}

    # test failure on build tests
    # fail the unique test
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select 1 as id union all select 1 as id")
        fp.write(newline)

    self.rebuild_run_dbt(expect_pass=False)

    results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'

    results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'

    results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}

    results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}

    # change the unique test severity from error to warn and reuse the same view_model.sql changes above
    # FIX: context managers instead of bare open()/close() pairs, matching
    # the r+ pattern used in test_postgres_test_run_results_state.
    with open('models/schema.yml') as f:
        filedata = f.read()
    newdata = filedata.replace('error','warn')
    with open('models/schema.yml', 'w') as f:
        f.write(newdata)

    self.rebuild_run_dbt(expect_pass=True)

    results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'

    results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'

    results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 2 # includes table_model to be run
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}

    results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
|
||||
|
||||
@use_profile('postgres')
def test_postgres_run_run_results_state(self):
    """result:success/error/skipped selectors (and +) against `dbt run`."""
    results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'

    # clear state and rerun upstream view model to test + operator
    shutil.rmtree('./state')
    self.run_dbt(['run', '--select', 'view_model'], expect_pass=True)
    self.copy_state()
    results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'

    # check we are starting from a place with 0 errors
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0

    # force an error in the view model to test error and skipped states
    with open('models/view_model.sql') as fp:
        fp.readline()
        # preserve the file's newline convention for the rewrite below
        newline = fp.newlines

    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()

    # test single result selector on error
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'view_model'

    # test + operator selection on error
    results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'

    # single result selector on skipped. Expect this to pass because the
    # underlying view was already defined above
    results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'table_model'

    # add a downstream model that depends on table_model for skipped+ selector
    with open('models/table_model_downstream.sql', 'w') as fp:
        fp.write("select * from {{ref('table_model')}}")

    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()

    results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'table_model'
    assert results[1].node.name == 'table_model_downstream'
|
||||
|
||||
|
||||
@use_profile('postgres')
def test_postgres_test_run_results_state(self):
    """result:pass/fail/warn selectors (and +) against `dbt test`."""
    # run passed nodes
    results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}

    # run passed nodes with + operator
    results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}

    # update view model to generate a failure case (duplicate ids)
    os.remove('./models/view_model.sql')
    with open('models/view_model.sql', 'w') as fp:
        fp.write("select 1 as id union all select 1 as id")

    self.rebuild_run_dbt(expect_pass=False)

    # test with failure selector
    results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'

    # test with failure selector and + operator
    results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'

    # change the unique test severity from error to warn and reuse the same view_model.sql changes above
    with open('models/schema.yml', 'r+') as f:
        filedata = f.read()
        newdata = filedata.replace('error','warn')
        f.seek(0)
        f.write(newdata)
        f.truncate()

    # rebuild - expect_pass = True because we changed the error to a warning this time around
    self.rebuild_run_dbt(expect_pass=True)

    # test with warn selector
    results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'

    # test with warn selector and + operator
    results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
|
||||
|
||||
|
||||
@use_profile('postgres')
def test_postgres_concurrent_selectors_run_run_results_state(self):
    """state:modified+ combined with result:error+ on `dbt run` (union)."""
    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0

    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        # preserve the file's newline convention for the rewrites below
        newline = fp.newlines

    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()

    # modify another dbt model (after state is captured, so it counts as modified)
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'table_model_modified_example', 'table_model'}
|
||||
|
||||
|
||||
@use_profile('postgres')
def test_postgres_concurrent_selectors_test_run_results_state(self):
    """result:fail combined with --exclude on `dbt test`."""
    # create failure test case for result:fail selector
    # (duplicate ids fail the unique test, the null id fails not_null)
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id union all select null as id')

    # run dbt build again to trigger test errors
    self.rebuild_run_dbt(expect_pass=False)

    # select the failed tests, excluding the not_null failure
    results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id'}
|
||||
|
||||
|
||||
@use_profile('postgres')
def test_postgres_concurrent_selectors_build_run_results_state(self):
    """state:modified+ with result:error+ / result:fail+ on `dbt build`."""
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0

    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        # preserve the file's newline convention for the rewrites below
        newline = fp.newlines

    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    self.rebuild_run_dbt(expect_pass=False)

    # modify another dbt model (after state capture, so it is state:modified)
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)

    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'}

    # create failure test case for result:fail selector
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id')

    # create error model case for result:error selector
    with open('./models/error_model.sql', 'w') as f:
        f.write('select 1 as id from not_exists')

    # create something downstream from the error model to rerun
    # NOTE(review): the trailing ')' below makes this SQL invalid on its
    # own — presumably intentional since the model is expected to error
    # anyway (its parent errors); confirm.
    with open('./models/downstream_of_error_model.sql', 'w') as f:
        f.write('select * from {{ ref("error_model") }} )')

    # regenerate build state
    self.rebuild_run_dbt(expect_pass=False)

    # modify model again to trigger the state:modified selector
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_another_error")
        fp.write(newline)

    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'}
|
||||
Reference in New Issue
Block a user