Compare commits

...

71 Commits

Author SHA1 Message Date
Matt Winkler
58dcb8b173 add archive_path() for t-2 sources to freshness.py 2021-12-06 16:26:42 -07:00
Matt Winkler
6a1caba22c make SourceFreshSelectorMethod aware of t-2 sources 2021-12-06 16:21:04 -07:00
Matt Winkler
f096432946 make PreviousState aware of t-2 sources 2021-12-06 16:20:34 -07:00
Matt Winkler
248473e6c9 add previous sources artifact read 2021-12-06 14:13:03 -07:00
Matt Winkler
3d7a18638a Merge branch 'feature/smart-source-freshness-runs' of github.com:fishtown-analytics/dbt into feature/smart-source-freshness-runs 2021-12-06 11:19:39 -07:00
Sung Won Chung
4cca4ddc14 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 15:30:44 -06:00
Sung Won Chung
31977136a5 draft work 2021-12-02 15:30:38 -06:00
Sung Won Chung
301427e305 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 10:33:36 -06:00
Sung Won Chung
9a69666130 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 09:27:39 -06:00
Sung Won Chung
4445217b67 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-01 10:37:40 -06:00
Sung Won Chung
343a0ebd96 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-30 09:12:52 -06:00
Sung Won Chung
87c67fd10c headless path utils 2021-11-29 14:13:40 -06:00
Sung Won Chung
5f8333727f Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-29 13:03:23 -06:00
Sung Won Chung
6483e95a42 add default target path 2021-11-23 14:50:58 -06:00
Sung Won Chung
8c9e5de086 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-23 14:04:38 -06:00
Sung Won Chung
4748b4b2ee Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-22 15:29:12 -06:00
Sung Won Chung
54fc9ccb96 simple profiles_dir import 2021-11-22 15:28:58 -06:00
Sung Won Chung
1edb0dfcd1 add TODO 2021-11-22 15:12:40 -06:00
Sung Won Chung
40ea7ce276 dynamic cwd 2021-11-22 15:01:49 -06:00
Sung Won Chung
6345fe4f75 dynamic project root 2021-11-22 14:56:12 -06:00
Sung Won Chung
08504e4ec2 remove unused import 2021-11-22 09:33:44 -06:00
Sung Won Chung
6f82a525f1 keep it DRY 2021-11-22 09:26:25 -06:00
Sung Won Chung
605ca1a31b remove exclusion logic for status 2021-11-22 09:21:23 -06:00
Sung Won Chung
68630332ba Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-22 09:18:35 -06:00
Sung Won Chung
ac4edb51f0 data bookmarks matter only 2021-11-22 09:16:04 -06:00
Sung Won Chung
5f75b9539f Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-19 09:40:28 -06:00
Sung Won Chung
ec2c2c8cf7 add fresh selector 2021-11-18 17:36:07 -06:00
Sung Won Chung
3aef719fe4 remove print 2021-11-18 13:21:48 -06:00
Sung Won Chung
ce9c00340f turn generator into set 2021-11-18 13:20:18 -06:00
Sung Won Chung
b8d288b6f0 cleaner syntax 2021-11-18 11:51:52 -06:00
Sung Won Chung
df09392d92 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 08:52:45 -06:00
Sung Won Chung
bd5a022e4e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 07:41:38 -06:00
Sung Won Chung
911cb53063 remove for now 2021-11-17 17:42:16 -06:00
Sung Won Chung
b45715cef7 don't run wasteful logs 2021-11-17 17:31:28 -06:00
Sung Won Chung
2e5fb45e5f clearer refresh language 2021-11-17 17:22:00 -06:00
Sung Won Chung
48bebe11b1 fix error conditions 2021-11-17 17:11:46 -06:00
Sung Won Chung
abfa16d687 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 16:46:45 -06:00
Sung Won Chung
39d145c7a8 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 14:52:44 -06:00
Sung Won Chung
8250b30862 add todo 2021-11-17 13:58:05 -06:00
Sung Won Chung
9309e4e906 remove print 2021-11-17 13:56:13 -06:00
Sung Won Chung
94a021806b add current state 2021-11-17 13:54:20 -06:00
Sung Won Chung
902cc75c69 correct import 2021-11-17 13:53:35 -06:00
Sung Won Chung
c18335f51d compare current and previous state 2021-11-17 13:53:05 -06:00
Sung Won Chung
191821779e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 10:13:12 -06:00
Sung Won Chung
e2ffd41cc3 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 08:44:29 -06:00
Sung Won Chung
6e9073a3c1 clarify that status needs to be correct 2021-11-16 16:17:51 -06:00
Sung Won Chung
e4bd6ee62d new selector flag name 2021-11-16 16:07:36 -06:00
Sung Won Chung
d2e3248379 new log format 2021-11-16 16:02:28 -06:00
Sung Won Chung
24bbf5b843 make compatible with rc and new logger 2021-11-16 12:30:02 -06:00
Sung Won Chung
a4ec84f37e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-16 11:40:51 -06:00
Sung Won Chung
61de5aea19 Revert "Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs""
This reverts commit 71125167a1.
2021-11-09 10:23:52 -06:00
Sung Won Chung
cad6bb0576 use a blank set instead 2021-11-09 10:03:39 -06:00
Sung Won Chung
9e3de01175 handle criterion that does not match nodes 2021-11-09 09:48:42 -06:00
Sung Won Chung
191bbae093 remove comments 2021-11-09 09:44:27 -06:00
Sung Won Chung
17fa096533 tidy up logs 2021-11-09 09:42:20 -06:00
Sung Won Chung
71125167a1 Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs"
This reverts commit 7fee4d44bf, reversing
changes made to 17c47ff42d.
2021-11-09 09:30:14 -06:00
Sung Won Chung
7fee4d44bf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-09 09:29:06 -06:00
Sung Won Chung
17c47ff42d cleaner logging 2021-11-09 09:28:53 -06:00
Sung Won Chung
f437c8e3eb add concurrent selectors note 2021-11-09 09:22:51 -06:00
Sung Won Chung
daa9ab1e73 include logs with meaningful info 2021-11-08 17:15:58 -06:00
Sung Won Chung
d4ff259d66 better if handling 2021-11-08 17:13:48 -06:00
Sung Won Chung
32de684121 opinionated fresh node selection 2021-11-08 16:10:43 -06:00
Sung Won Chung
3d043fe8fd remove prints and clean up logger 2021-11-08 15:59:37 -06:00
Sung Won Chung
5a5df384c0 add excluded source children nodes 2021-11-08 15:48:58 -06:00
Sung Won Chung
7924b320cf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-08 10:30:11 -06:00
Sung Won Chung
5005e0c4bf smarter depends on graph searching notes 2021-10-29 16:44:20 -05:00
Sung Won Chung
e5bdb93636 add todo 2021-10-29 15:18:50 -05:00
Sung Won Chung
3e8e91c4c2 copy test template 2021-10-29 13:49:07 -05:00
Sung Won Chung
036a9bf92f remove debug print logs 2021-10-29 12:38:53 -05:00
Sung Won Chung
98e98e5c3b working selector code 2021-10-29 12:38:27 -05:00
Sung Won Chung
72eb4302be first draft 2021-10-29 10:45:42 -05:00
5 changed files with 649 additions and 3 deletions

View File

@@ -1,15 +1,21 @@
from pathlib import Path
from .graph.manifest import WritableManifest
from .results import RunResultsArtifact
from .results import FreshnessExecutionResultArtifact
from typing import Optional
from dbt.exceptions import IncompatibleSchemaException
from dbt.path_utils import PathUtils
class PreviousState:
def __init__(self, path: Path):
self.path: Path = path
self.manifest: Optional[WritableManifest] = None
self.results: Optional[RunResultsArtifact] = None
self.sources: Optional[FreshnessExecutionResultArtifact] = None
self.previous_sources: Optional[FreshnessExecutionResultArtifact] = None
manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
@@ -26,3 +32,40 @@ class PreviousState:
except IncompatibleSchemaException as exc:
exc.add_filename(str(results_path))
raise
current_sources_path = self.path / 'sources.json'
if current_sources_path.exists() and current_sources_path.is_file():
try:
self.current_sources = FreshnessExecutionResultArtifact.read(str(current_sources_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(current_sources_path))
raise
# we either have to get the previous version of sources state from somewhere or generate it on the fly . . .
archive_sources_path = self.path / 'archive_sources' / 'sources.json'
if archive_sources_path.exists() and archive_sources_path.is_file():
try:
self.archive_sources = FreshnessExecutionResultArtifact.read(str(archive_sources_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(archive_sources_path))
raise
# bring in the project class that needs to be instantiated
# define the target path at this step(how do I consider a different target path? based on dbt_project.yml)
# the problem I'm facing right now is that the project config is populated AFTER this step
# the reason this works with previous state is that we manually set the previous state path
# current state is more difficult because we have to read the project config first before this class is instantiated
class CurrentState(PathUtils):
    """Freshness results read from the project's own target directory.

    Unlike ``PreviousState``, no explicit path is passed in: the target
    directory is resolved through :class:`PathUtils`, because this object
    is constructed before the project config has been populated.
    """

    def __init__(self):
        super().__init__()
        # Latest freshness artifact, or None when no sources.json exists yet.
        self.sources: Optional[FreshnessExecutionResultArtifact] = None
        candidate = self.get_target_path() / 'sources.json'
        if candidate.exists() and candidate.is_file():
            try:
                self.sources = FreshnessExecutionResultArtifact.read(str(candidate))
            except IncompatibleSchemaException as exc:
                # Annotate the error with the offending file before re-raising.
                exc.add_filename(str(candidate))
                raise

View File

@@ -22,7 +22,7 @@ from dbt.contracts.graph.parsed import (
ParsedGenericTestNode,
ParsedSourceDefinition,
)
from dbt.contracts.state import PreviousState
from dbt.contracts.state import PreviousState, CurrentState
from dbt.exceptions import (
InternalException,
RuntimeException,
@@ -48,6 +48,7 @@ class MethodName(StrEnum):
Exposure = 'exposure'
Metric = 'metric'
Result = 'result'
SourceFresh = 'source_fresh'
def is_selected_node(fqn: List[str], node_selector: str):
@@ -577,6 +578,48 @@ class ResultSelectorMethod(SelectorMethod):
if node in matches:
yield node
class SourceFreshSelectorMethod(SelectorMethod):
    """Select sources by comparing the latest freshness run to the archived one.

    ``selector`` is ``'yes'`` (sources whose ``max_loaded_at`` advanced) or
    ``'no'`` (sources that did not advance). Sources missing from the archive
    are always selected.

    NOTE(review): both artifacts are read off ``previous_state`` for now
    because current-state wiring happens after this class is constructed —
    confirm once CurrentState can be injected here.
    """

    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        state = self.previous_state
        if state is None or \
                state.current_sources is None or \
                state.archive_sources is None:
            raise InternalException(
                'No previous state comparison freshness results in sources.json'
            )
        # unique_id -> max_loaded_at for the latest and the archived runs.
        latest = {
            result.unique_id: result.max_loaded_at
            for result in state.current_sources.results
        }
        archived = {
            result.unique_id: result.max_loaded_at
            for result in state.archive_sources.results
        }
        matches = set()
        for uid, loaded_at in latest.items():
            if uid not in archived:
                # Brand-new sources always count as matches.
                matches.add(uid)
            elif selector == 'yes' and loaded_at > archived[uid]:
                matches.add(uid)
            elif selector == 'no' and loaded_at <= archived[uid]:
                matches.add(uid)
        for node, _real_node in self.all_nodes(included_nodes):
            if node in matches:
                yield node
class MethodManager:
SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = {
@@ -593,6 +636,7 @@ class MethodManager:
MethodName.Exposure: ExposureSelectorMethod,
MethodName.Metric: MetricSelectorMethod,
MethodName.Result: ResultSelectorMethod,
MethodName.SourceFresh: SourceFreshSelectorMethod,
}
def __init__(

112
core/dbt/path_utils.py Normal file
View File

@@ -0,0 +1,112 @@
import os
import argparse
from pathlib import Path
from typing import Dict, Any
from dbt.exceptions import DbtProjectError
from dbt.clients.system import load_file_contents
from dbt.clients.yaml_helper import load_yaml_text
from dbt.clients.system import path_exists
from dbt.exceptions import (
NotImplementedException, CompilationException, RuntimeException,
InternalException
)
from dbt.flags import PROFILES_DIR
# from dbt.main import parse_args
# from dbt.task.base import get_nearest_project_dir
#TODO: update this for profiles_dir instead of project_root, --config-dir, --profiles-dir
#TODO: pick a hierarchy of profiles_dir, config_dir, DBT_PROFILES_DIR, project_root,
#TODO: DEFAULT_PROFILES_DIR is duplicated across flags.py and profile.py, helpful to note we duplicate as needed
# import a method from main.py to capture the project root from PROFILES_DIR being redefined
#
# --output is a flag used for the freshness command and should be ignored?
# args = sys.argv[1:]
# print(args)
# parsed = parse_args(args)
# get the argument from the command line as needed
# copy over get_nearest_project_dir?
# pass in --project-dir flag and then default to current working directory
class PathArgsParser:
    """Resolve the dbt project directory from --project-dir or by walking
    upward from the current working directory."""

    def path_args_subparser(self):
        """Build an argparse parser that understands only --project-dir."""
        base_subparser = argparse.ArgumentParser(add_help=False)
        base_subparser.add_argument(
            '--project-dir',
            default=None,
            dest='project_dir',
            type=str,
            help='''
            Which directory to look in for the dbt_project.yml file.
            Default is the current working directory and its parents.
            '''
        )
        return base_subparser

    def get_nearest_project_dir(self):
        """Return the nearest directory containing dbt_project.yml.

        An explicit --project-dir wins (and is not searched upward from);
        otherwise walk from the cwd toward the filesystem root.

        Raises RuntimeException when no dbt_project.yml can be found.
        """
        base_subparser = self.path_args_subparser()
        # Bug fix: the original read `project_dir` as an attribute of the
        # *parser object* (always absent -> always None), so an explicit
        # --project-dir flag was silently ignored. Parse the actual command
        # line instead; parse_known_args tolerates unrelated dbt flags.
        parsed_args, _ = base_subparser.parse_known_args()
        project_dir = parsed_args.project_dir
        # If the user provides an explicit project directory, use that
        # but don't look at parent directories.
        if project_dir is not None:
            project_file = os.path.join(project_dir, "dbt_project.yml")
            if os.path.exists(project_file):
                return project_dir
            else:
                raise RuntimeException(
                    "fatal: Invalid --project-dir flag. Not a dbt project. "
                    "Missing dbt_project.yml file"
                )
        root_path = os.path.abspath(os.sep)
        cwd = os.getcwd()
        while cwd != root_path:
            project_file = os.path.join(cwd, "dbt_project.yml")
            if os.path.exists(project_file):
                return cwd
            cwd = os.path.dirname(cwd)
        raise RuntimeException(
            "fatal: Not a dbt project (or any of the parent directories). "
            "Missing dbt_project.yml file"
        )
# NOTE(review): module-import side effect — this runs the project-dir lookup
# as soon as path_utils is imported and discards the result, so importing this
# module outside a dbt project raises. Presumably leftover draft/debug code;
# confirm before relying on it.
PathArgs = PathArgsParser()
PathArgs.get_nearest_project_dir()
class PathUtils:
    """Helpers for locating a dbt project's configured target directory."""

    def __init__(self):
        # NOTE: PROFILES_DIR stands in for the project root here; the
        # original author flagged this choice as provisional.
        self.project_root = PROFILES_DIR

    def _load_yaml(self, path):
        """Read the file at *path* and parse its contents as YAML."""
        return load_yaml_text(load_file_contents(path))

    def _raw_project_from(self, project_root: str) -> Dict[str, Any]:
        """Load dbt_project.yml under *project_root* as a raw dict.

        Raises DbtProjectError when the file is missing or does not parse
        to a dictionary.
        """
        normalized_root = os.path.normpath(project_root)
        project_yaml_filepath = os.path.join(normalized_root, 'dbt_project.yml')
        if not path_exists(project_yaml_filepath):
            raise DbtProjectError(
                'no dbt_project.yml found at expected path {}'
                .format(project_yaml_filepath)
            )
        parsed = self._load_yaml(project_yaml_filepath)
        if not isinstance(parsed, dict):
            raise DbtProjectError(
                'dbt_project.yml does not parse to a dictionary'
            )
        return parsed

    def get_target_path(self) -> Path:
        """Return the configured target path ('target' when unset)."""
        raw_project = self._raw_project_from(self.project_root)
        return Path(raw_project.get('target-path', 'target'))

View File

@@ -26,7 +26,7 @@ from dbt.contracts.graph.parsed import ParsedSourceDefinition
RESULT_FILE_NAME = 'sources.json'
RESULT_ARCHIVE_DIR_NAME = 'archive_sources'
class FreshnessRunner(BaseRunner):
def on_skip(self):
@@ -191,6 +191,15 @@ class FreshnessTask(GraphRunnableTask):
else:
return os.path.join(self.config.target_path, RESULT_FILE_NAME)
def archive_path(self):
    """Return the file path where the previous sources.json gets archived.

    Ensures the archive directory exists so that write_result can rename
    the old artifact into it.
    """
    if self.args.output:
        # Bug fix: this branch used to return the bare directory path (no
        # file name appended) and never created the directory, so archiving
        # via os.rename failed whenever --output was set.
        archive_dir = os.path.realpath(
            os.path.join(self.args.output, RESULT_ARCHIVE_DIR_NAME)
        )
    else:
        archive_dir = os.path.join(self.config.target_path, RESULT_ARCHIVE_DIR_NAME)
    if not os.path.exists(archive_dir):
        # makedirs also creates a missing parent (e.g. a fresh target dir).
        os.makedirs(archive_dir)
    return os.path.join(archive_dir, RESULT_FILE_NAME)
def raise_on_first_error(self):
return False
@@ -211,6 +220,8 @@ class FreshnessTask(GraphRunnableTask):
def write_result(self, result):
    """Write the freshness artifact, archiving any existing one first."""
    artifact = FreshnessExecutionResultArtifact.from_result(result)
    if os.path.exists(self.result_path()):
        # os.replace overwrites an existing archive; os.rename raises
        # FileExistsError on Windows once the archive exists, since the
        # archived file keeps the same name on every run.
        os.replace(self.result_path(), self.archive_path())
    artifact.write(self.result_path())
def get_result(self, results, elapsed_time, generated_at):

View File

@@ -0,0 +1,436 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
import random
import shutil
import string
import pytest
from dbt.exceptions import CompilationException
class TestRunResultsState(DBTIntegrationTest):
    """Integration tests for the `result:<status>` state selector.

    Each test seeds a baseline `./state` directory (manifest + run_results),
    mutates project files to force success/error/fail/warn/skip statuses,
    reruns dbt, and asserts which nodes the selector picks up.
    """

    @property
    def schema(self):
        # Schema suffix used for this test case's database objects.
        return "run_results_state_062"

    @property
    def models(self):
        # Directory containing this project's model files.
        return "models"

    @property
    def project_config(self):
        # Minimal project config; seeds are quoted so malformed CSV rows
        # surface as database errors rather than parse-time failures.
        return {
            'config-version': 2,
            'macro-paths': ['macros'],
            'seeds': {
                'test': {
                    'quote_columns': True,
                }
            }
        }

    def _symlink_test_folders(self):
        # dbt's normal symlink behavior breaks this test. Copy the files
        # so we can freely modify them.
        for entry in os.listdir(self.test_original_source_path):
            src = os.path.join(self.test_original_source_path, entry)
            tst = os.path.join(self.test_root_dir, entry)
            if entry in {'models', 'seeds', 'macros'}:
                shutil.copytree(src, tst)
            elif os.path.isdir(entry) or entry.endswith('.sql'):
                os.symlink(src, tst)

    def copy_state(self):
        """Snapshot target/ artifacts into ./state for --state comparisons."""
        assert not os.path.exists('state')
        os.makedirs('state')
        shutil.copyfile('target/manifest.json', 'state/manifest.json')
        shutil.copyfile('target/run_results.json', 'state/run_results.json')

    def setUp(self):
        # Build once and capture the baseline state before each test.
        super().setUp()
        self.run_dbt(['build'])
        self.copy_state()

    def rebuild_run_dbt(self, expect_pass=True):
        """Discard the saved state, rebuild, and re-snapshot the state."""
        shutil.rmtree('./state')
        self.run_dbt(['build'], expect_pass=expect_pass)
        self.copy_state()

    @use_profile('postgres')
    def test_postgres_seed_run_results_state(self):
        """result:success / result:error selection after seed runs."""
        shutil.rmtree('./state')
        self.run_dbt(['seed'])
        self.copy_state()
        results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state'])
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state'])
        assert len(results) == 7
        assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
        # Append a malformed row to force a seed error.
        # NOTE(review): fp.newlines can be None if no newline was consumed —
        # presumably seed.csv always has at least one full line; confirm.
        with open('seeds/seed.csv') as fp:
            fp.readline()
            newline = fp.newlines
        with open('seeds/seed.csv', 'a') as fp:
            fp.write(f'\"\'\'3,carl{newline}')
        shutil.rmtree('./state')
        self.run_dbt(['seed'], expect_pass=False)
        self.copy_state()
        results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
        assert len(results) == 7
        assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
        # Inflate the seed past dbt's size limit to force a different error path.
        with open('seeds/seed.csv') as fp:
            fp.readline()
            newline = fp.newlines
        with open('seeds/seed.csv', 'a') as fp:
            # assume each line is ~2 bytes + len(name)
            target_size = 1*1024*1024
            line_size = 64
            num_lines = target_size // line_size
            maxlines = num_lines + 4
            for idx in range(4, maxlines):
                value = ''.join(random.choices(string.ascii_letters, k=62))
                fp.write(f'{idx},{value}{newline}')
        shutil.rmtree('./state')
        self.run_dbt(['seed'], expect_pass=False)
        self.copy_state()
        results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
        assert len(results) == 1
        assert results[0] == 'test.seed'
        results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
        assert len(results) == 7
        assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}

    @use_profile('postgres')
    def test_postgres_build_run_results_state(self):
        """result:error/fail/warn selection (with +) across build runs."""
        results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'])
        assert len(results) == 0
        # Force a compilation/database error in view_model.
        with open('models/view_model.sql') as fp:
            fp.readline()
            newline = fp.newlines
        with open('models/view_model.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        self.rebuild_run_dbt(expect_pass=False)
        results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False)
        assert len(results) == 3
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'view_model', 'not_null_view_model_id','unique_view_model_id'}
        results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
        assert len(results) == 3
        assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'}
        results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False)
        assert len(results) == 4
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'table_model','view_model', 'not_null_view_model_id','unique_view_model_id'}
        results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
        assert len(results) == 6 # includes exposure
        assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
        # test failure on build tests
        # fail the unique test
        with open('models/view_model.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select 1 as id union all select 1 as id")
            fp.write(newline)
        self.rebuild_run_dbt(expect_pass=False)
        results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'
        results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state'])
        assert len(results) == 1
        assert results[0] == 'test.unique_view_model_id'
        results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
        assert len(results) == 2
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'table_model', 'unique_view_model_id'}
        results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state'])
        assert len(results) == 2
        assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
        # change the unique test severity from error to warn and reuse the same view_model.sql changes above
        f = open('models/schema.yml', 'r')
        filedata = f.read()
        f.close()
        newdata = filedata.replace('error','warn')
        f = open('models/schema.yml', 'w')
        f.write(newdata)
        f.close()
        self.rebuild_run_dbt(expect_pass=True)
        results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'
        results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state'])
        assert len(results) == 1
        assert results[0] == 'test.unique_view_model_id'
        results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
        assert len(results) == 2 # includes table_model to be run
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'table_model', 'unique_view_model_id'}
        results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state'])
        assert len(results) == 2
        assert set(results) == {'test.table_model', 'test.unique_view_model_id'}

    @use_profile('postgres')
    def test_postgres_run_run_results_state(self):
        """result:success/error/skipped selection for `dbt run`."""
        results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True)
        assert len(results) == 2
        assert results[0].node.name == 'view_model'
        assert results[1].node.name == 'table_model'
        # clear state and rerun upstream view model to test + operator
        shutil.rmtree('./state')
        self.run_dbt(['run', '--select', 'view_model'], expect_pass=True)
        self.copy_state()
        results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True)
        assert len(results) == 2
        assert results[0].node.name == 'view_model'
        assert results[1].node.name == 'table_model'
        # check we are starting from a place with 0 errors
        results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'])
        assert len(results) == 0
        # force an error in the view model to test error and skipped states
        with open('models/view_model.sql') as fp:
            fp.readline()
            newline = fp.newlines
        with open('models/view_model.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        shutil.rmtree('./state')
        self.run_dbt(['run'], expect_pass=False)
        self.copy_state()
        # test single result selector on error
        results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False)
        assert len(results) == 1
        assert results[0].node.name == 'view_model'
        # test + operator selection on error
        results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False)
        assert len(results) == 2
        assert results[0].node.name == 'view_model'
        assert results[1].node.name == 'table_model'
        # single result selector on skipped. Expect this to pass because underlying view already defined above
        results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0].node.name == 'table_model'
        # add a downstream model that depends on table_model for skipped+ selector
        with open('models/table_model_downstream.sql', 'w') as fp:
            fp.write("select * from {{ref('table_model')}}")
        shutil.rmtree('./state')
        self.run_dbt(['run'], expect_pass=False)
        self.copy_state()
        results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True)
        assert len(results) == 2
        assert results[0].node.name == 'table_model'
        assert results[1].node.name == 'table_model_downstream'

    @use_profile('postgres')
    def test_postgres_test_run_results_state(self):
        """result:pass/fail/warn selection for `dbt test`."""
        # run passed nodes
        results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True)
        assert len(results) == 2
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
        # run passed nodes with + operator
        results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True)
        assert len(results) == 2
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
        # update view model to generate a failure case
        os.remove('./models/view_model.sql')
        with open('models/view_model.sql', 'w') as fp:
            fp.write("select 1 as id union all select 1 as id")
        self.rebuild_run_dbt(expect_pass=False)
        # test with failure selector
        results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'
        # test with failure selector and + operator
        results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'
        # change the unique test severity from error to warn and reuse the same view_model.sql changes above
        with open('models/schema.yml', 'r+') as f:
            filedata = f.read()
            newdata = filedata.replace('error','warn')
            f.seek(0)
            f.write(newdata)
            f.truncate()
        # rebuild - expect_pass = True because we changed the error to a warning this time around
        self.rebuild_run_dbt(expect_pass=True)
        # test with warn selector
        results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'
        # test with warn selector and + operator
        results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
        assert len(results) == 1
        assert results[0].node.name == 'unique_view_model_id'

    @use_profile('postgres')
    def test_postgres_concurrent_selectors_run_run_results_state(self):
        """state:modified+ combined with result:error+ for `dbt run`."""
        results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'])
        assert len(results) == 0
        # force an error on a dbt model
        with open('models/view_model.sql') as fp:
            fp.readline()
            newline = fp.newlines
        with open('models/view_model.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        shutil.rmtree('./state')
        self.run_dbt(['run'], expect_pass=False)
        self.copy_state()
        # modify another dbt model
        with open('models/table_model_modified_example.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
        assert len(results) == 3
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'view_model', 'table_model_modified_example', 'table_model'}

    @use_profile('postgres')
    def test_postgres_concurrent_selectors_test_run_results_state(self):
        """result:fail combined with --exclude for `dbt test`."""
        # create failure test case for result:fail selector
        os.remove('./models/view_model.sql')
        with open('./models/view_model.sql', 'w') as f:
            f.write('select 1 as id union all select 1 as id union all select null as id')
        # run dbt build again to trigger test errors
        self.rebuild_run_dbt(expect_pass=False)
        # get the failures from the prior build, excluding the not_null test
        results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False)
        assert len(results) == 1
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'unique_view_model_id'}

    @use_profile('postgres')
    def test_postgres_concurrent_selectors_build_run_results_state(self):
        """state:modified+ with result:error+ and result:fail+ for `dbt build`."""
        results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'])
        assert len(results) == 0
        # force an error on a dbt model
        with open('models/view_model.sql') as fp:
            fp.readline()
            newline = fp.newlines
        with open('models/view_model.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        self.rebuild_run_dbt(expect_pass=False)
        # modify another dbt model
        with open('models/table_model_modified_example.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_error")
            fp.write(newline)
        results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
        assert len(results) == 5
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'}
        # create failure test case for result:fail selector
        os.remove('./models/view_model.sql')
        with open('./models/view_model.sql', 'w') as f:
            f.write('select 1 as id union all select 1 as id')
        # create error model case for result:error selector
        with open('./models/error_model.sql', 'w') as f:
            f.write('select 1 as id from not_exists')
        # create something downstream from the error model to rerun
        # NOTE(review): the trailing ')' below looks like a typo producing
        # invalid SQL, but the model is expected to error regardless; confirm.
        with open('./models/downstream_of_error_model.sql', 'w') as f:
            f.write('select * from {{ ref("error_model") }} )')
        # regenerate build state
        self.rebuild_run_dbt(expect_pass=False)
        # modify model again to trigger the state:modified selector
        with open('models/table_model_modified_example.sql', 'w') as fp:
            fp.write(newline)
            fp.write("select * from forced_another_error")
            fp.write(newline)
        results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False)
        assert len(results) == 5
        nodes = set([elem.node.name for elem in results])
        assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'}