Compare commits

...

71 Commits

Author SHA1 Message Date
Matt Winkler
58dcb8b173 add archive_path() for t-2 sources to freshness.py 2021-12-06 16:26:42 -07:00
Matt Winkler
6a1caba22c make SourceFreshSelectorMethod aware of t-2 sources 2021-12-06 16:21:04 -07:00
Matt Winkler
f096432946 make PreviousState aware of t-2 sources 2021-12-06 16:20:34 -07:00
Matt Winkler
248473e6c9 add previous sources artifact read 2021-12-06 14:13:03 -07:00
Matt Winkler
3d7a18638a Merge branch 'feature/smart-source-freshness-runs' of github.com:fishtown-analytics/dbt into feature/smart-source-freshness-runs 2021-12-06 11:19:39 -07:00
Sung Won Chung
4cca4ddc14 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 15:30:44 -06:00
Sung Won Chung
31977136a5 draft work 2021-12-02 15:30:38 -06:00
Sung Won Chung
301427e305 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 10:33:36 -06:00
Sung Won Chung
9a69666130 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-02 09:27:39 -06:00
Sung Won Chung
4445217b67 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-12-01 10:37:40 -06:00
Sung Won Chung
343a0ebd96 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-30 09:12:52 -06:00
Sung Won Chung
87c67fd10c headless path utils 2021-11-29 14:13:40 -06:00
Sung Won Chung
5f8333727f Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-29 13:03:23 -06:00
Sung Won Chung
6483e95a42 add default target path 2021-11-23 14:50:58 -06:00
Sung Won Chung
8c9e5de086 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-23 14:04:38 -06:00
Sung Won Chung
4748b4b2ee Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-22 15:29:12 -06:00
Sung Won Chung
54fc9ccb96 simple profiles_dir import 2021-11-22 15:28:58 -06:00
Sung Won Chung
1edb0dfcd1 add TODO 2021-11-22 15:12:40 -06:00
Sung Won Chung
40ea7ce276 dynamic cwd 2021-11-22 15:01:49 -06:00
Sung Won Chung
6345fe4f75 dynamic project root 2021-11-22 14:56:12 -06:00
Sung Won Chung
08504e4ec2 remove unused import 2021-11-22 09:33:44 -06:00
Sung Won Chung
6f82a525f1 keep it DRY 2021-11-22 09:26:25 -06:00
Sung Won Chung
605ca1a31b remove exclusion logic for status 2021-11-22 09:21:23 -06:00
Sung Won Chung
68630332ba Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-22 09:18:35 -06:00
Sung Won Chung
ac4edb51f0 data bookmarks matter only 2021-11-22 09:16:04 -06:00
Sung Won Chung
5f75b9539f Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-19 09:40:28 -06:00
Sung Won Chung
ec2c2c8cf7 add fresh selector 2021-11-18 17:36:07 -06:00
Sung Won Chung
3aef719fe4 remove print 2021-11-18 13:21:48 -06:00
Sung Won Chung
ce9c00340f turn generator into set 2021-11-18 13:20:18 -06:00
Sung Won Chung
b8d288b6f0 cleaner syntax 2021-11-18 11:51:52 -06:00
Sung Won Chung
df09392d92 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 08:52:45 -06:00
Sung Won Chung
bd5a022e4e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-18 07:41:38 -06:00
Sung Won Chung
911cb53063 remove for now 2021-11-17 17:42:16 -06:00
Sung Won Chung
b45715cef7 don't run wasteful logs 2021-11-17 17:31:28 -06:00
Sung Won Chung
2e5fb45e5f clearer refresh language 2021-11-17 17:22:00 -06:00
Sung Won Chung
48bebe11b1 fix error conditions 2021-11-17 17:11:46 -06:00
Sung Won Chung
abfa16d687 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 16:46:45 -06:00
Sung Won Chung
39d145c7a8 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 14:52:44 -06:00
Sung Won Chung
8250b30862 add todo 2021-11-17 13:58:05 -06:00
Sung Won Chung
9309e4e906 remove print 2021-11-17 13:56:13 -06:00
Sung Won Chung
94a021806b add current state 2021-11-17 13:54:20 -06:00
Sung Won Chung
902cc75c69 correct import 2021-11-17 13:53:35 -06:00
Sung Won Chung
c18335f51d compare current and previous state 2021-11-17 13:53:05 -06:00
Sung Won Chung
191821779e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 10:13:12 -06:00
Sung Won Chung
e2ffd41cc3 Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-17 08:44:29 -06:00
Sung Won Chung
6e9073a3c1 clarify that status needs to be correct 2021-11-16 16:17:51 -06:00
Sung Won Chung
e4bd6ee62d new selector flag name 2021-11-16 16:07:36 -06:00
Sung Won Chung
d2e3248379 new log format 2021-11-16 16:02:28 -06:00
Sung Won Chung
24bbf5b843 make compatible with rc and new logger 2021-11-16 12:30:02 -06:00
Sung Won Chung
a4ec84f37e Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-16 11:40:51 -06:00
Sung Won Chung
61de5aea19 Revert "Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs""
This reverts commit 71125167a1.
2021-11-09 10:23:52 -06:00
Sung Won Chung
cad6bb0576 use a blank set instead 2021-11-09 10:03:39 -06:00
Sung Won Chung
9e3de01175 handle criterion that does not match nodes 2021-11-09 09:48:42 -06:00
Sung Won Chung
191bbae093 remove comments 2021-11-09 09:44:27 -06:00
Sung Won Chung
17fa096533 tidy up logs 2021-11-09 09:42:20 -06:00
Sung Won Chung
71125167a1 Revert "Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs"
This reverts commit 7fee4d44bf, reversing
changes made to 17c47ff42d.
2021-11-09 09:30:14 -06:00
Sung Won Chung
7fee4d44bf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-09 09:29:06 -06:00
Sung Won Chung
17c47ff42d cleaner logging 2021-11-09 09:28:53 -06:00
Sung Won Chung
f437c8e3eb add concurrent selectors note 2021-11-09 09:22:51 -06:00
Sung Won Chung
daa9ab1e73 include logs with meaningful info 2021-11-08 17:15:58 -06:00
Sung Won Chung
d4ff259d66 better if handling 2021-11-08 17:13:48 -06:00
Sung Won Chung
32de684121 opinionated fresh node selection 2021-11-08 16:10:43 -06:00
Sung Won Chung
3d043fe8fd remove prints and clean up logger 2021-11-08 15:59:37 -06:00
Sung Won Chung
5a5df384c0 add excluded source children nodes 2021-11-08 15:48:58 -06:00
Sung Won Chung
7924b320cf Merge branch 'main' of https://github.com/dbt-labs/dbt into feature/smart-source-freshness-runs 2021-11-08 10:30:11 -06:00
Sung Won Chung
5005e0c4bf smarter depends on graph searching notes 2021-10-29 16:44:20 -05:00
Sung Won Chung
e5bdb93636 add todo 2021-10-29 15:18:50 -05:00
Sung Won Chung
3e8e91c4c2 copy test template 2021-10-29 13:49:07 -05:00
Sung Won Chung
036a9bf92f remove debug print logs 2021-10-29 12:38:53 -05:00
Sung Won Chung
98e98e5c3b working selector code 2021-10-29 12:38:27 -05:00
Sung Won Chung
72eb4302be first draft 2021-10-29 10:45:42 -05:00
5 changed files with 649 additions and 3 deletions

View File

@@ -1,15 +1,21 @@
from pathlib import Path from pathlib import Path
from .graph.manifest import WritableManifest from .graph.manifest import WritableManifest
from .results import RunResultsArtifact from .results import RunResultsArtifact
from .results import FreshnessExecutionResultArtifact
from typing import Optional from typing import Optional
from dbt.exceptions import IncompatibleSchemaException from dbt.exceptions import IncompatibleSchemaException
from dbt.path_utils import PathUtils
class PreviousState: class PreviousState:
def __init__(self, path: Path): def __init__(self, path: Path):
self.path: Path = path self.path: Path = path
self.manifest: Optional[WritableManifest] = None self.manifest: Optional[WritableManifest] = None
self.results: Optional[RunResultsArtifact] = None self.results: Optional[RunResultsArtifact] = None
self.sources: Optional[FreshnessExecutionResultArtifact] = None
self.previous_sources: Optional[FreshnessExecutionResultArtifact] = None
manifest_path = self.path / 'manifest.json' manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file(): if manifest_path.exists() and manifest_path.is_file():
@@ -26,3 +32,40 @@ class PreviousState:
except IncompatibleSchemaException as exc: except IncompatibleSchemaException as exc:
exc.add_filename(str(results_path)) exc.add_filename(str(results_path))
raise raise
current_sources_path = self.path / 'sources.json'
if current_sources_path.exists() and current_sources_path.is_file():
try:
self.current_sources = FreshnessExecutionResultArtifact.read(str(current_sources_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(current_sources_path))
raise
# we either have to get the previous version of sources state from somewhere or generate it on the fly . . .
archive_sources_path = self.path / 'archive_sources' / 'sources.json'
if archive_sources_path.exists() and archive_sources_path.is_file():
try:
self.archive_sources = FreshnessExecutionResultArtifact.read(str(archive_sources_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(archive_sources_path))
raise
# bring in the project class that needs to be instantiated
# define the target path at this step(how do I consider a different target path? based on dbt_project.yml)
# the problem I'm facing right now is that the project config is populated AFTER this step
# the reason this works with previous state is that we manually set the previous state path
# current state is more difficult because we have to read the project config first before this class is instantiated
class CurrentState(PathUtils):
    """Reads the current run's freshness results artifact (sources.json)
    from the project's target path, which is resolved via PathUtils.

    NOTE(review): instantiation eagerly reads the file; when no sources.json
    exists yet, self.sources stays None.
    """
    def __init__(self):
        super().__init__()
        # parsed sources.json artifact, or None when the file is absent
        self.sources: Optional[FreshnessExecutionResultArtifact] = None
        target_path = self.get_target_path()
        sources_path = target_path / 'sources.json'
        if sources_path.exists() and sources_path.is_file():
            try:
                self.sources = FreshnessExecutionResultArtifact.read(str(sources_path))
            except IncompatibleSchemaException as exc:
                # attach the offending filename before propagating
                exc.add_filename(str(sources_path))
                raise

View File

@@ -22,7 +22,7 @@ from dbt.contracts.graph.parsed import (
ParsedGenericTestNode, ParsedGenericTestNode,
ParsedSourceDefinition, ParsedSourceDefinition,
) )
from dbt.contracts.state import PreviousState from dbt.contracts.state import PreviousState, CurrentState
from dbt.exceptions import ( from dbt.exceptions import (
InternalException, InternalException,
RuntimeException, RuntimeException,
@@ -48,6 +48,7 @@ class MethodName(StrEnum):
Exposure = 'exposure' Exposure = 'exposure'
Metric = 'metric' Metric = 'metric'
Result = 'result' Result = 'result'
SourceFresh = 'source_fresh'
def is_selected_node(fqn: List[str], node_selector: str): def is_selected_node(fqn: List[str], node_selector: str):
@@ -577,6 +578,48 @@ class ResultSelectorMethod(SelectorMethod):
if node in matches: if node in matches:
yield node yield node
class SourceFreshSelectorMethod(SelectorMethod):
    # TODO: this requires SelectorMethod to have current_state as an
    # argument. Currently this works because it's all hard-coded.
    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        """Yield nodes selected by comparing source freshness snapshots.

        Compares max_loaded_at per source between the current sources.json
        and the archived (previous-run) sources.json on previous_state:
        - selector 'yes': sources whose data advanced since the archive
        - selector 'no':  sources whose data did not advance
        Sources with no archived entry are always selected.

        Raises InternalException when either snapshot is unavailable,
        naming the specific missing artifact.
        """
        # self.current_state = CurrentState()  # TODO: fix this by importing target_path later
        if self.previous_state is None:
            raise InternalException(
                'No previous state available for source freshness comparison'
            )
        if self.previous_state.current_sources is None:
            raise InternalException(
                'No current freshness results found in sources.json'
            )
        if self.previous_state.archive_sources is None:
            raise InternalException(
                'No archived freshness results found in archive_sources/sources.json'
            )
        current_state_sources = {
            result.unique_id: result.max_loaded_at
            for result in self.previous_state.current_sources.results
        }
        previous_state_sources = {
            result.unique_id: result.max_loaded_at
            for result in self.previous_state.archive_sources.results
        }
        matches = set()
        for unique_id, max_loaded_at in current_state_sources.items():
            previous_loaded_at = previous_state_sources.get(unique_id)
            if previous_loaded_at is None:
                # brand-new source: nothing archived to compare against,
                # so select it regardless of the selector value
                matches.add(unique_id)
            elif selector == 'yes' and max_loaded_at > previous_loaded_at:
                matches.add(unique_id)
            elif selector == 'no' and max_loaded_at <= previous_loaded_at:
                matches.add(unique_id)
        for node, real_node in self.all_nodes(included_nodes):
            if node in matches:
                yield node
class MethodManager: class MethodManager:
SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = { SELECTOR_METHODS: Dict[MethodName, Type[SelectorMethod]] = {
@@ -593,6 +636,7 @@ class MethodManager:
MethodName.Exposure: ExposureSelectorMethod, MethodName.Exposure: ExposureSelectorMethod,
MethodName.Metric: MetricSelectorMethod, MethodName.Metric: MetricSelectorMethod,
MethodName.Result: ResultSelectorMethod, MethodName.Result: ResultSelectorMethod,
MethodName.SourceFresh: SourceFreshSelectorMethod,
} }
def __init__( def __init__(

112
core/dbt/path_utils.py Normal file
View File

@@ -0,0 +1,112 @@
import os
import argparse
from pathlib import Path
from typing import Dict, Any
from dbt.exceptions import DbtProjectError
from dbt.clients.system import load_file_contents
from dbt.clients.yaml_helper import load_yaml_text
from dbt.clients.system import path_exists
from dbt.exceptions import (
NotImplementedException, CompilationException, RuntimeException,
InternalException
)
from dbt.flags import PROFILES_DIR
# from dbt.main import parse_args
# from dbt.task.base import get_nearest_project_dir
#TODO: update this for profiles_dir instead of project_root, --config-dir, --profiles-dir
#TODO: pick a hierarchy of profiles_dir, config_dir, DBT_PROFILES_DIR, project_root,
#TODO: DEFAULT_PROFILES_DIR is duplicated across flags.py and profile.py, helpful to note we duplicate as needed
# import a method from main.py to capture the project root from PROFILES_DIR being redefined
#
# --output is a flag used for the freshness command and should be ignored?
# args = sys.argv[1:]
# print(args)
# parsed = parse_args(args)
# get the argument from the command line as needed
# copy over get_nearest_project_dir?
# pass in --project-dir flag and then default to current working directory
class PathArgsParser:
    """Resolves the dbt project directory, either from an explicit
    --project-dir flag or by walking up from the current working
    directory until a dbt_project.yml is found."""

    def path_args_subparser(self):
        """Build an argparse parser that only knows the --project-dir flag."""
        base_subparser = argparse.ArgumentParser(add_help=False)
        base_subparser.add_argument(
            '--project-dir',
            default=None,
            dest='project_dir',
            type=str,
            help='''
            Which directory to look in for the dbt_project.yml file.
            Default is the current working directory and its parents.
            '''
        )
        return base_subparser

    def get_nearest_project_dir(self):
        """Return the nearest directory containing dbt_project.yml.

        An explicit --project-dir is honored without searching parent
        directories; otherwise the search walks upward from the cwd.
        Raises RuntimeException when no project file can be found.
        """
        base_subparser = self.path_args_subparser()
        # bug fix: the flag value lives on the parsed namespace, not on the
        # parser object itself (getattr on the parser always returned None);
        # parse_known_args tolerates unrelated argv entries
        parsed_args, _ = base_subparser.parse_known_args()
        project_dir = parsed_args.project_dir
        # If the user provides an explicit project directory, use that
        # but don't look at parent directories.
        if project_dir is not None:
            project_file = os.path.join(project_dir, "dbt_project.yml")
            if os.path.exists(project_file):
                return project_dir
            else:
                raise RuntimeException(
                    "fatal: Invalid --project-dir flag. Not a dbt project. "
                    "Missing dbt_project.yml file"
                )
        root_path = os.path.abspath(os.sep)
        cwd = os.getcwd()
        # walk upward toward the filesystem root looking for dbt_project.yml
        while cwd != root_path:
            project_file = os.path.join(cwd, "dbt_project.yml")
            if os.path.exists(project_file):
                return cwd
            cwd = os.path.dirname(cwd)
        raise RuntimeException(
            "fatal: Not a dbt project (or any of the parent directories). "
            "Missing dbt_project.yml file"
        )
# NOTE(review): module-import side effect — this instantiates the parser and
# resolves (and validates) the project dir as soon as the module is imported,
# raising if no dbt_project.yml is found; consider moving behind an explicit
# call. The resolved value is also discarded here — confirm intent.
PathArgs = PathArgsParser()
PathArgs.get_nearest_project_dir()
class PathUtils:
    """Resolves project-level paths (currently the target path) by reading
    dbt_project.yml from a project root directory."""

    def __init__(self, project_root: str = PROFILES_DIR):
        # Generalized: callers may now supply an explicit project root;
        # the default preserves the original behavior.
        # TODO: defaulting to PROFILES_DIR is suspicious (profiles dir is
        # not the project dir) — check if this is "allowed"
        self.project_root = project_root

    def _load_yaml(self, path):
        """Load and parse a YAML file into Python objects."""
        contents = load_file_contents(path)
        return load_yaml_text(contents)

    def _raw_project_from(self, project_root: str) -> Dict[str, Any]:
        """Read dbt_project.yml under project_root as a dict.

        Raises DbtProjectError when the file is missing or does not parse
        to a dictionary.
        """
        project_root = os.path.normpath(project_root)
        project_yaml_filepath = os.path.join(project_root, 'dbt_project.yml')
        if not path_exists(project_yaml_filepath):
            raise DbtProjectError(
                'no dbt_project.yml found at expected path {}'
                .format(project_yaml_filepath)
            )
        project_dict = self._load_yaml(project_yaml_filepath)
        if not isinstance(project_dict, dict):
            raise DbtProjectError(
                'dbt_project.yml does not parse to a dictionary'
            )
        return project_dict

    def get_target_path(self) -> Path:
        """Return the project's configured target path ('target' when unset)."""
        project_dict = self._raw_project_from(self.project_root)
        return Path(project_dict.get('target-path', 'target'))

View File

@@ -26,7 +26,7 @@ from dbt.contracts.graph.parsed import ParsedSourceDefinition
RESULT_FILE_NAME = 'sources.json' RESULT_FILE_NAME = 'sources.json'
RESULT_ARCHIVE_DIR_NAME = 'archive_sources'
class FreshnessRunner(BaseRunner): class FreshnessRunner(BaseRunner):
def on_skip(self): def on_skip(self):
@@ -191,6 +191,15 @@ class FreshnessTask(GraphRunnableTask):
else: else:
return os.path.join(self.config.target_path, RESULT_FILE_NAME) return os.path.join(self.config.target_path, RESULT_FILE_NAME)
def archive_path(self):
    """Return the file path where the previous run's sources.json is archived.

    Mirrors result_path(): honors --output when given, otherwise uses the
    configured target path. Ensures the archive directory exists so the
    caller can os.rename() the current results into it.
    """
    if self.args.output:
        archive_dir = os.path.join(
            os.path.realpath(self.args.output), RESULT_ARCHIVE_DIR_NAME
        )
    else:
        archive_dir = os.path.join(self.config.target_path, RESULT_ARCHIVE_DIR_NAME)
    # create the archive dir on first use (previously only done in the
    # non---output branch)
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    # bug fix: the --output branch used to return the directory itself,
    # with no file name, unlike the default branch
    return os.path.join(archive_dir, RESULT_FILE_NAME)
def raise_on_first_error(self): def raise_on_first_error(self):
return False return False
@@ -211,6 +220,8 @@ class FreshnessTask(GraphRunnableTask):
def write_result(self, result): def write_result(self, result):
artifact = FreshnessExecutionResultArtifact.from_result(result) artifact = FreshnessExecutionResultArtifact.from_result(result)
if os.path.exists(self.result_path()):
os.rename(self.result_path(), self.archive_path())
artifact.write(self.result_path()) artifact.write(self.result_path())
def get_result(self, results, elapsed_time, generated_at): def get_result(self, results, elapsed_time, generated_at):

View File

@@ -0,0 +1,436 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
import random
import shutil
import string
import pytest
from dbt.exceptions import CompilationException
class TestRunResultsState(DBTIntegrationTest):
@property
def schema(self):
    """Unique schema suffix used for this test case's database objects."""
    schema_name = "run_results_state_062"
    return schema_name
@property
def models(self):
    """Directory containing this test's model files."""
    models_dir = "models"
    return models_dir
@property
def project_config(self):
    """Project configuration: v2 config with a macros path and quoted seed columns."""
    seed_config = {
        'test': {
            'quote_columns': True,
        }
    }
    return {
        'config-version': 2,
        'macro-paths': ['macros'],
        'seeds': seed_config,
    }
def _symlink_test_folders(self):
    """Copy mutable test folders instead of symlinking them.

    dbt's normal symlink behavior breaks this test: the folders we modify
    during the test are deep-copied so we can freely edit them, while
    everything else keeps the usual symlink treatment.
    """
    copied_dirs = {'models', 'seeds', 'macros'}
    for entry in os.listdir(self.test_original_source_path):
        source_path = os.path.join(self.test_original_source_path, entry)
        target_path = os.path.join(self.test_root_dir, entry)
        if entry in copied_dirs:
            shutil.copytree(source_path, target_path)
        elif os.path.isdir(entry) or entry.endswith('.sql'):
            os.symlink(source_path, target_path)
def copy_state(self):
    """Snapshot the current target artifacts into ./state for --state comparisons."""
    assert not os.path.exists('state')
    os.makedirs('state')
    for artifact in ('manifest.json', 'run_results.json'):
        shutil.copyfile(
            os.path.join('target', artifact),
            os.path.join('state', artifact),
        )
def setUp(self):
    """Run a full build and snapshot its artifacts as the baseline state."""
    super().setUp()
    self.run_dbt(['build'])
    self.copy_state()
def rebuild_run_dbt(self, expect_pass=True):
    """Drop the saved state, rebuild the project, and re-snapshot state."""
    shutil.rmtree('./state')
    self.run_dbt(['build'], expect_pass=expect_pass)
    self.copy_state()
@use_profile('postgres')
def test_postgres_seed_run_results_state(self):
    """Exercise result:success / result:error selectors against seed runs."""
    # rebuild state from a seed-only run so run_results contains just the seed
    shutil.rmtree('./state')
    self.run_dbt(['seed'])
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    # + operator expands to everything downstream of the seed
    results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    # append a row with unbalanced quoting so the next seed run errors
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    with open('seeds/seed.csv', 'a') as fp:
        fp.write(f'\"\'\'3,carl{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    # grow the seed to ~1 MiB — presumably trips a large-seed error path
    # (expect_pass=False below); confirm which limit this exercises
    with open('seeds/seed.csv') as fp:
        fp.readline()
        newline = fp.newlines
    with open('seeds/seed.csv', 'a') as fp:
        # assume each line is ~2 bytes + len(name)
        target_size = 1*1024*1024
        line_size = 64
        num_lines = target_size // line_size
        maxlines = num_lines + 4
        for idx in range(4, maxlines):
            value = ''.join(random.choices(string.ascii_letters, k=62))
            fp.write(f'{idx},{value}{newline}')
    shutil.rmtree('./state')
    self.run_dbt(['seed'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.seed'
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 7
    assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
@use_profile('postgres')
def test_postgres_build_run_results_state(self):
    """Exercise result:error/fail/warn selectors (and the + operator) with dbt build."""
    # baseline: nothing errored yet, so the selector matches nothing
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0
    # capture the file's newline convention, then break the view model
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'not_null_view_model_id', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state'])
    assert len(results) == 3
    assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'}
    results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 4
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'view_model', 'not_null_view_model_id', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state'])
    assert len(results) == 6  # includes exposure
    assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'}
    # test failure on build tests
    # fail the unique test
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select 1 as id union all select 1 as id")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'
    results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
    # change the unique test severity from error to warn and reuse the same
    # view_model.sql changes above (idiom fix: context managers instead of
    # manual open/close pairs)
    with open('models/schema.yml', 'r') as f:
        filedata = f.read()
    newdata = filedata.replace('error', 'warn')
    with open('models/schema.yml', 'w') as f:
        f.write(newdata)
    self.rebuild_run_dbt(expect_pass=True)
    results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state'])
    assert len(results) == 1
    assert results[0] == 'test.unique_view_model_id'
    results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 2  # includes table_model to be run
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model', 'unique_view_model_id'}
    results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state'])
    assert len(results) == 2
    assert set(results) == {'test.table_model', 'test.unique_view_model_id'}
@use_profile('postgres')
def test_postgres_run_run_results_state(self):
    """Exercise result:success/error/skipped selectors with dbt run."""
    results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # clear state and rerun upstream view model to test + operator
    shutil.rmtree('./state')
    self.run_dbt(['run', '--select', 'view_model'], expect_pass=True)
    self.copy_state()
    results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # check we are starting from a place with 0 errors
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'])
    assert len(results) == 0
    # force an error in the view model to test error and skipped states
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    # test single result selector on error
    results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'view_model'
    # test + operator selection on error
    results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 2
    assert results[0].node.name == 'view_model'
    assert results[1].node.name == 'table_model'
    # single result selector on skipped. Expect this to pass because the
    # underlying view was already defined above
    results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'table_model'
    # add a downstream model that depends on table_model for skipped+ selector
    with open('models/table_model_downstream.sql', 'w') as fp:
        fp.write("select * from {{ref('table_model')}}")
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    assert results[0].node.name == 'table_model'
    assert results[1].node.name == 'table_model_downstream'
@use_profile('postgres')
def test_postgres_test_run_results_state(self):
    """Exercise result:pass/fail/warn selectors with dbt test."""
    # run passed nodes
    results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
    # run passed nodes with + operator
    results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True)
    assert len(results) == 2
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id', 'not_null_view_model_id'}
    # update view model to generate a failure case
    os.remove('./models/view_model.sql')
    with open('models/view_model.sql', 'w') as fp:
        fp.write("select 1 as id union all select 1 as id")
    self.rebuild_run_dbt(expect_pass=False)
    # test with failure selector
    results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # test with failure selector and + operator
    results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # change the unique test severity from error to warn and reuse the same view_model.sql changes above
    with open('models/schema.yml', 'r+') as f:
        filedata = f.read()
        newdata = filedata.replace('error','warn')
        f.seek(0)
        f.write(newdata)
        f.truncate()
    # rebuild - expect_pass = True because we changed the error to a warning this time around
    self.rebuild_run_dbt(expect_pass=True)
    # test with warn selector
    results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
    # test with warn selector and + operator
    results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True)
    assert len(results) == 1
    assert results[0].node.name == 'unique_view_model_id'
@use_profile('postgres')
def test_postgres_concurrent_selectors_run_run_results_state(self):
    """Combine state:modified+ and result:error+ selectors in one dbt run."""
    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0
    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    shutil.rmtree('./state')
    self.run_dbt(['run'], expect_pass=False)
    self.copy_state()
    # modify another dbt model (after the state snapshot, so it counts as modified)
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 3
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'view_model', 'table_model_modified_example', 'table_model'}
@use_profile('postgres')
def test_postgres_concurrent_selectors_test_run_results_state(self):
    """Combine the result:fail selector with --exclude in one dbt test run."""
    # create failure test case for result:fail selector
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id union all select null as id')
    # run dbt build again to trigger test errors
    self.rebuild_run_dbt(expect_pass=False)
    # get the failures, excluding the not_null failure
    results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False)
    assert len(results) == 1
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'unique_view_model_id'}
@use_profile('postgres')
def test_postgres_concurrent_selectors_build_run_results_state(self):
    """Combine state:modified+, result:error+ and result:fail+ with dbt build."""
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'])
    assert len(results) == 0
    # force an error on a dbt model
    with open('models/view_model.sql') as fp:
        fp.readline()
        newline = fp.newlines
    with open('models/view_model.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    self.rebuild_run_dbt(expect_pass=False)
    # modify another dbt model
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_error")
        fp.write(newline)
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'}
    # create failure test case for result:fail selector
    os.remove('./models/view_model.sql')
    with open('./models/view_model.sql', 'w') as f:
        f.write('select 1 as id union all select 1 as id')
    # create error model case for result:error selector
    with open('./models/error_model.sql', 'w') as f:
        f.write('select 1 as id from not_exists')
    # create something downstream from the error model to rerun
    # NOTE(review): the trailing ')' leaves this SQL unbalanced — presumably
    # harmless because the model is skipped when error_model errors, but
    # confirm it is intentional
    with open('./models/downstream_of_error_model.sql', 'w') as f:
        f.write('select * from {{ ref("error_model") }} )')
    # regenerate build state
    self.rebuild_run_dbt(expect_pass=False)
    # modify model again to trigger the state:modified selector
    with open('models/table_model_modified_example.sql', 'w') as fp:
        fp.write(newline)
        fp.write("select * from forced_another_error")
        fp.write(newline)
    results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False)
    assert len(results) == 5
    nodes = set([elem.node.name for elem in results])
    assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'}