Compare commits

...

25 Commits

Author SHA1 Message Date
Michelle Ark
b9f893a291 postgres as service 2024-09-10 15:09:06 -04:00
Mike Alfare
1d3d315249 Add flags from dbt_project.yml to the Project and RuntimeConfig objects (#10644)
* add flags from dbt_project.yml to the Project and RuntimeConfig objects
2024-09-06 15:42:29 -04:00
Gerda Shank
b35ad46e3f Remove deprecation warning to change "tests:" config to "data_tests:" (#10670) 2024-09-05 20:35:28 -04:00
Gerda Shank
c28cb92af5 Warn if timestamp updated_at field uses incompatible timestamp (#10352)
Co-authored-by: Michelle Ark <michelle.ark@dbtlabs.com>
2024-09-04 14:42:14 -04:00
RyoAriyama
b56d96df5e Fix/changes current working dir when using a dbt project dir (#9596)
* made class changing directory a context manager.

* add change log

* fix conflict

* made base as a context manager

* add assertion

* Remove index.html

* add it test to testDbtRunner

* fix deps args order

* fix test

---------

Co-authored-by: Doug Beatty <doug.beatty@dbtlabs.com>
Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
2024-09-03 13:41:53 -07:00
Jeremy Cohen
37d382c8e7 Filter out empty nodes after graph selection (#10580)
* Add unit test

* Filter out empty nodes after graph selection

* Add changie

* Add --indirect-selection empty check to unit test
2024-09-03 18:48:03 +02:00
Gerda Shank
9b7f4ff842 use full manifest in adapter instead of macro_manifest (#10609)
* use full manifest in adapter instead of macro_manifest

* Add test case

* Add changelog entry

* Remove commented code.

---------

Co-authored-by: Peter Allen Webb <peter.webb@dbtlabs.com>
2024-08-29 11:32:30 -04:00
Emily Rockman
555ff8091f update dep for psycopg (#10633) 2024-08-29 09:44:49 -05:00
Emily Rockman
98fddcf54f rework test to ignore utils version (#10625) 2024-08-28 15:25:09 -05:00
Emily Rockman
d652359c61 add typing (#10619) 2024-08-28 13:26:18 -05:00
Peter Webb
f7d21e012e Add More Typing to the dbt.task Module (#10622)
* Add typing to task module.

* More typing in the task module

* Still more types for task module
2024-08-28 11:18:01 -04:00
Quigley Malcolm
e1fa461186 [TIDY FIRST] Fix typing issues in dbt/core/tasks/clean.py (#10617) 2024-08-27 17:37:16 -05:00
Quigley Malcolm
1153597970 Fix typing errors in core/dbt/contracts/sql.py (#10615) 2024-08-27 17:37:00 -05:00
Quigley Malcolm
09f9febc25 [TIDY FIRST] Fix core/dbt/version.py type hinting (#10613) 2024-08-27 17:36:31 -05:00
Doug Beatty
22181409f6 Enable calling a macro in a pre- or post-hook config in properties.yml (#10603)
* Tests for calling a macro in a pre- or post-hook config in properties.yml

* Late render pre- and post-hooks configs in properties / schema YAML files

* Changelog entry
2024-08-27 11:08:56 -06:00
William Deng
f25a474f75 updated saved query tests and fixtures (#10610) 2024-08-26 17:39:35 -04:00
aliceliu
3c55806203 Fix state:modified check for exports (#10565) 2024-08-23 15:22:38 -04:00
Gerda Shank
bba020fcc0 Add test for source names with quotes (#10588) 2024-08-21 11:57:34 -04:00
Courtney Holcomb
84eb0ff672 Bump DSI version (#10585)
* Bump DSI version

* Changelog
2024-08-20 16:37:52 -04:00
Kshitij Aranke
3695698e22 [CORE-364] Add group info to RunResultError, RunResultFailure, RunResultWarning log lines (#10535) 2024-08-19 11:26:00 -07:00
Courtney Holcomb
9ca1bc5b4c Remove unneeded TODO (#10568) 2024-08-14 14:49:47 -07:00
Gerda Shank
5f66678f6d Incremental models with a contract don't need their columns modified (#10371) 2024-08-14 08:15:25 -07:00
Jean Cochrane
63262e93cb Use model alias for the CTE identifier generated during ephemeral materialization (#10290)
* Use alias instead of name when adding ephemeral model prefixes

* Adjust TestCustomSchemaWithCustomMacroFromModelName to test ephemeral models

* Add changelog entry for ephemeral model CTE identifier fix

* Reference model.identifier and model.name where appropriate to resolve typing errors

* Move test for ephemeral model with alias to dedicated test in test_compile.py
2024-08-09 15:00:55 -07:00
Tobie Tusing
374412af53 Improve tree traversal of select_children (#10526)
* update children search

* update search to include children in original selector

* add changie

* remove unused function

* fix wrong function call

* fix depth
2024-08-09 17:38:15 -04:00
Kshitij Aranke
47848b8ea8 Fix add_ephemeral_prefix to identifier instead of name (#10550) 2024-08-09 13:58:37 -05:00
82 changed files with 1354 additions and 445 deletions

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Fix changing the current working directory when using dpt deps, clean and init.
time: 2023-12-06T19:24:42.575372+09:00
custom:
Author: rariyama
Issue: "8997"

View File

@@ -0,0 +1,7 @@
kind: Dependencies
body: Increase supported version range for dbt-semantic-interfaces. Needed to support
custom calendar features.
time: 2024-08-20T13:19:09.015225-07:00
custom:
Author: courtneyholcomb
Issue: "9265"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Warning message for snapshot timestamp data types
time: 2024-06-21T14:16:35.717637-04:00
custom:
Author: gshank
Issue: "10234"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add support for behavior flags
time: 2024-08-29T13:53:20.16122-04:00
custom:
Author: mikealfare
Issue: "10618"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Use model alias for the CTE identifier generated during ephemeral materialization
time: 2024-06-10T20:05:22.510814008Z
custom:
Author: jeancochrane
Issue: "5273"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Do not update varchar column definitions if a contract exists
time: 2024-07-28T22:14:21.67712-04:00
custom:
Author: gshank
Issue: "10362"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Fix state:modified check for exports
time: 2024-08-13T15:42:35.471685-07:00
custom:
Author: aliceliu
Issue: "10138"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Filter out empty nodes after graph selection to support consistent selection of nodes that depend on upstream public models
time: 2024-08-16T14:08:07.426235-07:00
custom:
Author: jtcohen6
Issue: "8987"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: Late render pre- and post-hooks configs in properties / schema YAML files
time: 2024-08-24T21:09:03.252733-06:00
custom:
Author: dbeatty10
Issue: "10603"

View File

@@ -0,0 +1,7 @@
kind: Fixes
body: Allow the use of env_var function in certain macros in which it was previously
unavailable.
time: 2024-08-29T10:57:01.160613-04:00
custom:
Author: peterallenwebb
Issue: "10609"

View File

@@ -0,0 +1,6 @@
kind: Fixes
body: 'Remove deprecation for tests: to data_tests: change'
time: 2024-09-05T18:02:48.086421-04:00
custom:
Author: gshank
Issue: "10564"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Improve speed of tree traversal when finding children, increasing build speed for some selectors
time: 2024-08-09T13:02:34.759905-07:00
custom:
Author: ttusing
Issue: "10434"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add test for sources tables with quotes
time: 2024-08-21T09:55:16.038101-04:00
custom:
Author: gshank
Issue: "10582"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Additional type hints for `core/dbt/version.py`
time: 2024-08-27T10:50:14.047859-05:00
custom:
Author: QMalcolm
Issue: "10612"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix typing issues in core/dbt/contracts/sql.py
time: 2024-08-27T11:31:23.749912-05:00
custom:
Author: QMalcolm
Issue: "10614"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Fix type errors in `dbt/core/task/clean.py`
time: 2024-08-27T11:48:10.438173-05:00
custom:
Author: QMalcolm
Issue: "10616"

View File

@@ -165,6 +165,18 @@ jobs:
os: [ubuntu-20.04]
split-group: ${{ fromJson(needs.integration-metadata.outputs.split-groups) }}
include: ${{ fromJson(needs.integration-metadata.outputs.include) }}
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
env:
TOXENV: integration
DBT_INVOCATION_ENV: github-actions
@@ -189,14 +201,32 @@ jobs:
- name: Set up postgres (linux)
if: runner.os == 'Linux'
uses: ./.github/actions/setup-postgres-linux
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (macos)
if: runner.os == 'macOS'
uses: ./.github/actions/setup-postgres-macos
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (windows)
if: runner.os == 'Windows'
uses: ./.github/actions/setup-postgres-windows
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Install python tools
run: |

View File

@@ -34,6 +34,7 @@ class Export(dbtClassMixin):
name: str
config: ExportConfig
unrendered_config: Dict[str, str] = field(default_factory=dict)
@dataclass

View File

@@ -1,7 +1,10 @@
from typing import IO, Optional
from typing import IO, List, Optional, Union
from click.exceptions import ClickException
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import RunExecutionResult
from dbt.utils import ExitCodes
@@ -23,7 +26,7 @@ class CliException(ClickException):
# the typing of _file is to satisfy the signature of ClickException.show
# overriding this method prevents click from printing any exceptions to stdout
def show(self, _file: Optional[IO] = None) -> None:
def show(self, _file: Optional[IO] = None) -> None: # type: ignore[type-arg]
pass
@@ -31,7 +34,17 @@ class ResultExit(CliException):
"""This class wraps any exception that contains results while invoking dbt, or the
results of an invocation that did not succeed but did not throw any exceptions."""
def __init__(self, result) -> None:
def __init__(
self,
result: Union[
bool, # debug
CatalogArtifact, # docs generate
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None,
) -> None:
super().__init__(ExitCodes.ModelError)
self.result = result

View File

@@ -218,10 +218,9 @@ def clean(ctx, **kwargs):
"""Delete all folders in the clean-targets list (usually the dbt_packages and target directories.)"""
from dbt.task.clean import CleanTask
task = CleanTask(ctx.obj["flags"], ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with CleanTask(ctx.obj["flags"], ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -437,9 +436,9 @@ def deps(ctx, **kwargs):
message=f"Version is required in --add-package when a package when source is {flags.SOURCE}",
option_name="--add-package",
)
task = DepsTask(flags, ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
with DepsTask(flags, ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -459,10 +458,9 @@ def init(ctx, **kwargs):
"""Initialize a new dbt project."""
from dbt.task.init import InitTask
task = InitTask(ctx.obj["flags"])
results = task.run()
success = task.interpret_results(results)
with InitTask(ctx.obj["flags"]) as task:
results = task.run()
success = task.interpret_results(results)
return results, success

View File

@@ -375,7 +375,7 @@ class Compiler:
_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)
new_cte_name = self.add_ephemeral_prefix(cte_model.name)
new_cte_name = self.add_ephemeral_prefix(cte_model.identifier)
rendered_sql = cte_model._pre_injected_sql or cte_model.compiled_code
sql = f" {new_cte_name} as (\n{rendered_sql}\n)"
@@ -547,6 +547,8 @@ class Compiler:
the node's raw_code into compiled_code, and then calls the
recursive method to "prepend" the ctes.
"""
# REVIEW: UnitTestDefinition shouldn't be possible here because of the
# type of node, and it is likewise an invalid return type.
if isinstance(node, UnitTestDefinition):
return node

View File

@@ -480,6 +480,7 @@ class PartialProject(RenderComponents):
rendered.selectors_dict["selectors"]
)
dbt_cloud = cfg.dbt_cloud
flags: Dict[str, Any] = cfg.flags
project = Project(
project_name=name,
@@ -524,6 +525,7 @@ class PartialProject(RenderComponents):
project_env_vars=project_env_vars,
restrict_access=cfg.restrict_access,
dbt_cloud=dbt_cloud,
flags=flags,
)
# sanity check - this means an internal issue
project.validate()
@@ -568,11 +570,6 @@ class PartialProject(RenderComponents):
) = package_and_project_data_from_root(project_root)
selectors_dict = selector_data_from_root(project_root)
if "flags" in project_dict:
# We don't want to include "flags" in the Project,
# it goes in ProjectFlags
project_dict.pop("flags")
return cls.from_dicts(
project_root=project_root,
project_dict=project_dict,
@@ -645,6 +642,7 @@ class Project:
project_env_vars: Dict[str, Any]
restrict_access: bool
dbt_cloud: Dict[str, Any]
flags: Dict[str, Any]
@property
def all_source_paths(self) -> List[str]:
@@ -724,6 +722,7 @@ class Project:
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
"restrict-access": self.restrict_access,
"dbt-cloud": self.dbt_cloud,
"flags": self.flags,
}
)
if self.query_comment:

View File

@@ -193,6 +193,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
log_cache_events=log_cache_events,
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
)
# Called by 'load_projects' in this class

View File

@@ -8,7 +8,7 @@ from dbt.adapters.exceptions import (
RelationWrongTypeError,
)
from dbt.adapters.exceptions.cache import CacheInconsistencyError
from dbt.events.types import JinjaLogWarning
from dbt.events.types import JinjaLogWarning, SnapshotTimestampWarning
from dbt.exceptions import (
AmbiguousAliasError,
AmbiguousCatalogMatchError,
@@ -116,6 +116,17 @@ def raise_fail_fast_error(msg, node=None) -> NoReturn:
raise FailFastError(msg, node=node)
def warn_snapshot_timestamp_data_types(
snapshot_time_data_type: str, updated_at_data_type: str
) -> None:
warn_or_error(
SnapshotTimestampWarning(
snapshot_time_data_type=snapshot_time_data_type,
updated_at_data_type=updated_at_data_type,
)
)
# Update this when a new function should be added to the
# dbt context's `exceptions` key!
CONTEXT_EXPORTS = {
@@ -141,6 +152,7 @@ CONTEXT_EXPORTS = {
raise_contract_error,
column_type_missing,
raise_fail_fast_error,
warn_snapshot_timestamp_data_types,
]
}

View File

@@ -975,7 +975,7 @@ class ProviderContext(ManifestContext):
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
# this is used by some adapters
table.original_abspath = os.path.abspath(path)
table.original_abspath = os.path.abspath(path) # type: ignore
return table
@contextproperty()

View File

@@ -18,7 +18,6 @@ from typing import (
from mashumaro.types import SerializableType
from dbt import deprecations
from dbt.adapters.base import ConstraintSupport
from dbt.adapters.factory import get_adapter_constraint_support
from dbt.artifacts.resources import Analysis as AnalysisResource
@@ -1148,12 +1147,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if self.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
self.data_tests.extend(self.tests)
self.tests.clear()
@@ -1164,12 +1157,6 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if column.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
column.data_tests.extend(column.tests)
column.tests.clear()
@@ -1467,6 +1454,13 @@ class Group(GroupResource, BaseNode):
def resource_class(cls) -> Type[GroupResource]:
return GroupResource
def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}
# ====================================
# SemanticModel node
@@ -1572,13 +1566,12 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
# exports should be in the same order, so we zip them for easy iteration
for old_export, new_export in zip(old.exports, self.exports):
if not (
old_export.name == new_export.name
and old_export.config.export_as == new_export.config.export_as
and old_export.config.schema_name == new_export.config.schema_name
and old_export.config.alias == new_export.config.alias
):
if not (old_export.name == new_export.name):
return False
keys = ["export_as", "schema", "alias"]
for key in keys:
if old_export.unrendered_config.get(key) != new_export.unrendered_config.get(key):
return False
return True

View File

@@ -155,7 +155,7 @@ class SemanticManifest:
raise ParsingError(
"The semantic layer requires a time spine model with granularity DAY or smaller in the project, "
"but none was found. Guidance on creating this model can be found on our docs site "
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)." # TODO: update docs link when available!
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)."
)
# For backward compatibility: if legacy time spine exists, include it in the manifest.

View File

@@ -5,7 +5,6 @@ from mashumaro.jsonschema.annotations import Pattern
from mashumaro.types import SerializableType
from typing_extensions import Annotated
from dbt import deprecations
from dbt.adapters.contracts.connection import QueryComment
from dbt.contracts.util import Identifier, list_str
from dbt_common.contracts.util import Mergeable
@@ -259,6 +258,7 @@ class Project(dbtClassMixin):
query_comment: Optional[Union[QueryComment, NoValue, str]] = field(default_factory=NoValue)
restrict_access: bool = False
dbt_cloud: Optional[Dict[str, Any]] = None
flags: Dict[str, Any] = field(default_factory=dict)
class Config(dbtMashConfig):
# These tell mashumaro to use aliases for jsonschema and for "from_dict"
@@ -312,10 +312,6 @@ class Project(dbtClassMixin):
raise ValidationError(
"Invalid project config: cannot have both 'tests' and 'data_tests' defined"
)
if "tests" in data:
deprecations.warn(
"project-test-config", deprecated_path="tests", exp_path="data_tests"
)
@dataclass

View File

@@ -29,7 +29,8 @@ class RemoteCompileResult(RemoteCompileResultMixin):
generated_at: datetime = field(default_factory=datetime.utcnow)
@property
def error(self):
def error(self) -> None:
# TODO: Can we delete this? It's never set anywhere else and never accessed
return None
@@ -40,7 +41,7 @@ class RemoteExecutionResult(ExecutionResult):
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)
def write(self, path: str):
def write(self, path: str) -> None:
writable = RunResultsArtifact.from_execution_results(
generated_at=self.generated_at,
results=self.results,

View File

@@ -98,11 +98,6 @@ class CollectFreshnessReturnSignature(DBTDeprecation):
_event = "CollectFreshnessReturnSignature"
class TestsConfigDeprecation(DBTDeprecation):
_name = "project-test-config"
_event = "TestsConfigDeprecation"
class ProjectFlagsMovedDeprecation(DBTDeprecation):
_name = "project-flags-moved"
_event = "ProjectFlagsMovedDeprecation"
@@ -138,7 +133,7 @@ def renamed_env_var(old_name: str, new_name: str):
return cb
def warn(name, *args, **kwargs):
def warn(name: str, *args, **kwargs) -> None:
if name not in deprecations:
# this should (hopefully) never happen
raise RuntimeError("Error showing deprecation warning: {}".format(name))
@@ -167,7 +162,6 @@ deprecations_list: List[DBTDeprecation] = [
ConfigLogPathDeprecation(),
ConfigTargetPathDeprecation(),
CollectFreshnessReturnSignature(),
TestsConfigDeprecation(),
ProjectFlagsMovedDeprecation(),
PackageMaterializationOverrideDeprecation(),
ResourceNamesWithSpacesDeprecation(),

View File

@@ -1610,6 +1610,17 @@ message CompiledNodeMsg {
CompiledNode data = 2;
}
// Q043
message SnapshotTimestampWarning {
string snapshot_time_data_type = 1;
string updated_at_data_type = 2;
}
message SnapshotTimestampWarningMsg {
CoreEventInfo info = 1;
SnapshotTimestampWarning data = 2;
}
// W - Node testing
// Skipped W001
@@ -1809,12 +1820,19 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}
message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}
// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultWarningMsg {
@@ -1828,6 +1846,7 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultFailureMsg {
@@ -1849,6 +1868,7 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}
message RunResultErrorMsg {

File diff suppressed because one or more lines are too long

View File

@@ -388,6 +388,9 @@ class ConfigTargetPathDeprecation(WarnLevel):
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
# Note: this deprecation has been removed, but we are leaving
# the event class here, because users may have specified it in
# warn_error_options.
class TestsConfigDeprecation(WarnLevel):
def code(self) -> str:
return "D012"
@@ -1614,6 +1617,18 @@ class CompiledNode(InfoLevel):
return f"Compiled node '{self.node_name}' is:\n{self.compiled}"
class SnapshotTimestampWarning(WarnLevel):
def code(self) -> str:
return "Q043"
def message(self) -> str:
return (
f"Data type of snapshot table timestamp columns ({self.snapshot_time_data_type}) "
f"doesn't match derived column 'updated_at' ({self.updated_at_data_type}). "
"Please update snapshot config 'updated_at'."
)
# =======================================================
# W - Node testing
# =======================================================

View File

@@ -59,18 +59,40 @@ class Graph:
def select_children(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
descendants: Set[UniqueId] = set()
for node in selected:
descendants.update(self.descendants(node, max_depth))
return descendants
"""Returns all nodes which are descendants of the 'selected' set.
Nodes in the 'selected' set are counted as children only if
they are descendants of other nodes in the 'selected' set."""
children: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.descendants(node, 1))
next_layer = next_layer - children # Avoid re-searching
children.update(next_layer)
selected = next_layer
i += 1
return children
def select_parents(
self, selected: Set[UniqueId], max_depth: Optional[int] = None
) -> Set[UniqueId]:
ancestors: Set[UniqueId] = set()
for node in selected:
ancestors.update(self.ancestors(node, max_depth))
return ancestors
"""Returns all nodes which are ancestors of the 'selected' set.
Nodes in the 'selected' set are counted as parents only if
they are ancestors of other nodes in the 'selected' set."""
parents: Set[UniqueId] = set()
i = 0
while len(selected) > 0 and (max_depth is None or i < max_depth):
next_layer: Set[UniqueId] = set()
for node in selected:
next_layer.update(self.ancestors(node, 1))
next_layer = next_layer - parents # Avoid re-searching
parents.update(next_layer)
selected = next_layer
i += 1
return parents
def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
successors: Set[UniqueId] = set()

View File

@@ -87,12 +87,15 @@ class NodeSelector(MethodManager):
)
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)
selected = collected | neighbors
# if --indirect-selection EMPTY, do not expand to adjacent tests
if spec.indirect_selection == IndirectSelection.Empty:
return collected, set()
return selected, set()
else:
neighbors = self.collect_specified_neighbors(spec, collected)
direct_nodes, indirect_nodes = self.expand_selection(
selected=(collected | neighbors), indirect_selection=spec.indirect_selection
selected=selected, indirect_selection=spec.indirect_selection
)
return direct_nodes, indirect_nodes
@@ -177,10 +180,14 @@ class NodeSelector(MethodManager):
node = self.manifest.nodes[unique_id]
if self.include_empty_nodes:
return node.config.enabled
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
return node.empty
else:
return not node.empty and node.config.enabled
return False
def node_is_match(self, node: GraphMemberNode) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
@@ -212,7 +219,12 @@ class NodeSelector(MethodManager):
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {unique_id for unique_id in selected if self._is_match(unique_id)}
return {
unique_id
for unique_id in selected
if self._is_match(unique_id)
and (self.include_empty_nodes or not self._is_empty_node(unique_id))
}
def expand_selection(
self,

View File

@@ -1028,12 +1028,11 @@ class ManifestLoader:
return state_check
def save_macros_to_adapter(self, adapter):
macro_manifest = MacroManifest(self.manifest.macros)
adapter.set_macro_resolver(macro_manifest)
adapter.set_macro_resolver(self.manifest)
# This executes the callable macro_hook and sets the
# query headers
# This executes the callable macro_hook and sets the query headers
query_header_context = generate_query_header_context(adapter.config, macro_manifest)
query_header_context = generate_query_header_context(adapter.config, self.manifest)
self.macro_hook(query_header_context)
# This creates a MacroManifest which contains the macros in

View File

@@ -11,6 +11,7 @@ from dbt.config.renderer import BaseRenderer, Keypath
# keyword args are rendered to capture refs in render_test_update.
# Keyword args are finally rendered at compilation time.
# Descriptions are not rendered until 'process_docs'.
# Pre- and post-hooks in configs are late-rendered.
class SchemaYamlRenderer(BaseRenderer):
def __init__(self, context: Dict[str, Any], key: str) -> None:
super().__init__(context)
@@ -43,6 +44,14 @@ class SchemaYamlRenderer(BaseRenderer):
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
return True
# pre- and post-hooks
if (
len(keypath) >= 2
and keypath[0] == "config"
and keypath[1] in ("pre_hook", "post_hook")
):
return True
# versions
if len(keypath) == 5 and keypath[4] == "description":
return True

View File

@@ -778,7 +778,9 @@ class SavedQueryParser(YamlReader):
self, unparsed: UnparsedExport, saved_query_config: SavedQueryConfig
) -> Export:
return Export(
name=unparsed.name, config=self._get_export_config(unparsed.config, saved_query_config)
name=unparsed.name,
config=self._get_export_config(unparsed.config, saved_query_config),
unrendered_config=unparsed.config,
)
def _get_query_params(self, unparsed: UnparsedQueryParams) -> QueryParams:

View File

@@ -4,7 +4,6 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar
from dbt import deprecations
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import TimeSpine
from dbt.clients.jinja_static import statically_parse_ref_or_source
@@ -568,12 +567,6 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
raise ValidationError(
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
data["data_tests"] = data.pop("tests")
# model-level tests

View File

@@ -17,5 +17,5 @@ class PluginNodes:
def add_model(self, model_args: ModelNodeArgs) -> None:
self.models[model_args.unique_id] = model_args
def update(self, other: "PluginNodes"):
def update(self, other: "PluginNodes") -> None:
self.models.update(other.models)

View File

@@ -44,15 +44,10 @@ from dbt.graph import Graph
from dbt.task.printer import print_run_result_error
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import (
CompilationError,
DbtInternalError,
DbtRuntimeError,
NotImplementedError,
)
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
def read_profiles(profiles_dir=None):
def read_profiles(profiles_dir: Optional[str] = None) -> Dict[str, Any]:
"""This is only used for some error handling"""
if profiles_dir is None:
profiles_dir = get_flags().PROFILES_DIR
@@ -71,6 +66,13 @@ class BaseTask(metaclass=ABCMeta):
def __init__(self, args: Flags) -> None:
self.args = args
def __enter__(self):
self.orig_dir = os.getcwd()
return self
def __exit__(self, exc_type, exc_value, traceback):
os.chdir(self.orig_dir)
@abstractmethod
def run(self):
raise dbt_common.exceptions.base.NotImplementedError("Not Implemented")
@@ -123,7 +125,7 @@ class ConfiguredTask(BaseTask):
self.manifest = manifest
self.compiler = Compiler(self.config)
def compile_manifest(self):
def compile_manifest(self) -> None:
if self.manifest is None:
raise DbtInternalError("compile_manifest called before manifest was loaded")
@@ -165,7 +167,7 @@ class ExecutionContext:
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
self.config = config
self.compiler = Compiler(config)
self.adapter = adapter
@@ -272,7 +274,7 @@ class BaseRunner(metaclass=ABCMeta):
failures=result.failures,
)
def compile_and_execute(self, manifest, ctx):
def compile_and_execute(self, manifest: Manifest, ctx: ExecutionContext):
result = None
with (
self.adapter.connection_named(self.node.unique_id, self.node)
@@ -305,7 +307,7 @@ class BaseRunner(metaclass=ABCMeta):
return result
def _handle_catchable_exception(self, e, ctx):
def _handle_catchable_exception(self, e: DbtRuntimeError, ctx: ExecutionContext) -> str:
if e.node is None:
e.add_node(ctx.node)
@@ -316,7 +318,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_internal_exception(self, e, ctx):
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
fire_event(
InternalErrorOnRun(
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
@@ -324,7 +326,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_generic_exception(self, e, ctx):
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
fire_event(
GenericExceptionOnRun(
build_path=self._node_build_path(),
@@ -337,9 +339,8 @@ class BaseRunner(metaclass=ABCMeta):
return str(e)
def handle_exception(self, e, ctx):
catchable_errors = (CompilationError, DbtRuntimeError)
if isinstance(e, catchable_errors):
def handle_exception(self, e: Exception, ctx: ExecutionContext) -> str:
if isinstance(e, DbtRuntimeError):
error = self._handle_catchable_exception(e, ctx)
elif isinstance(e, DbtInternalError):
error = self._handle_internal_exception(e, ctx)
@@ -347,7 +348,7 @@ class BaseRunner(metaclass=ABCMeta):
error = self._handle_generic_exception(e, ctx)
return error
def safe_run(self, manifest):
def safe_run(self, manifest: Manifest):
started = time.time()
ctx = ExecutionContext(self.node)
error = None
@@ -394,19 +395,19 @@ class BaseRunner(metaclass=ABCMeta):
return None
def before_execute(self):
raise NotImplementedError()
def before_execute(self) -> None:
raise NotImplementedError("before_execute is not implemented")
def execute(self, compiled_node, manifest):
raise NotImplementedError()
raise NotImplementedError("execute is not implemented")
def run(self, compiled_node, manifest):
return self.execute(compiled_node, manifest)
def after_execute(self, result):
raise NotImplementedError()
def after_execute(self, result) -> None:
raise NotImplementedError("after_execute is not implemented")
def _skip_caused_by_ephemeral_failure(self):
def _skip_caused_by_ephemeral_failure(self) -> bool:
if self.skip_cause is None or self.skip_cause.node is None:
return False
return self.skip_cause.node.is_ephemeral_model
@@ -461,7 +462,7 @@ class BaseRunner(metaclass=ABCMeta):
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
return node_result
def do_skip(self, cause=None):
def do_skip(self, cause=None) -> None:
self.skip = True
self.skip_cause = cause

View File

@@ -1,5 +1,5 @@
import threading
from typing import Dict, List, Set
from typing import Dict, List, Optional, Set, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.run import RunResult
@@ -24,16 +24,16 @@ from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self):
def description(self) -> str:
return f"saved query {self.node.name}"
def before_execute(self):
def before_execute(self) -> None:
pass
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result):
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
@@ -83,7 +83,7 @@ class BuildTask(RunTask):
self.selected_unit_tests: Set = set()
self.model_to_unit_test_map: Dict[str, List] = {}
def resource_types(self, no_unit_tests=False):
def resource_types(self, no_unit_tests: bool = False) -> List[NodeType]:
resource_types = resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
)
@@ -210,7 +210,7 @@ class BuildTask(RunTask):
resource_types=resource_types,
)
def get_runner_type(self, node):
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
return self.RUNNER_MAP.get(node.resource_type)
# Special build compile_manifest method to pass add_test_edges to the compiler

View File

@@ -16,7 +16,7 @@ class CleanTask(BaseTask):
self.config = config
self.project = config
def run(self):
def run(self) -> None:
"""
This function takes all the paths in the target file
and cleans the project paths that are not protected.

View File

@@ -1,7 +1,8 @@
import threading
from typing import AbstractSet, Any, Iterable, List, Optional, Set
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type
from dbt.adapters.base import BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
from dbt.context.providers import generate_runtime_model_context
@@ -16,10 +17,10 @@ from dbt_common.exceptions import CompilationError, DbtInternalError
class CloneRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def _build_run_model_result(self, model, context):
@@ -44,7 +45,7 @@ class CloneRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
# no-op
return self.node
@@ -91,7 +92,7 @@ class CloneRunner(BaseRunner):
class CloneTask(GraphRunnableTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_run_mode(self) -> GraphRunnableMode:
@@ -133,8 +134,8 @@ class CloneTask(GraphRunnableTask):
self.populate_adapter_cache(adapter, schemas_to_cache)
@property
def resource_types(self):
resource_types = resource_types_from_args(
def resource_types(self) -> List[NodeType]:
resource_types: Collection[NodeType] = resource_types_from_args(
self.args, set(REFABLE_NODE_TYPES), set(REFABLE_NODE_TYPES)
)
@@ -154,5 +155,5 @@ class CloneTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CloneRunner

View File

@@ -1,6 +1,8 @@
import threading
from typing import Optional, Type
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import CompiledNode, ParseInlineNodeError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import EXECUTABLE_NODE_TYPES, NodeType
@@ -17,10 +19,10 @@ from dbt_common.exceptions import DbtInternalError
class CompileRunner(BaseRunner):
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def execute(self, compiled_node, manifest):
@@ -35,7 +37,7 @@ class CompileRunner(BaseRunner):
failures=None,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.compiler.compile_node(self.node, manifest, {})
@@ -44,7 +46,7 @@ class CompileTask(GraphRunnableTask):
# it should be removed before the task is complete
_inline_node_id = None
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return True
def get_node_selector(self) -> ResourceTypeSelector:
@@ -62,10 +64,10 @@ class CompileTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return CompileRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
is_inline = bool(getattr(self.args, "inline", None))
output_format = getattr(self.args, "output", "text")
@@ -127,14 +129,14 @@ class CompileTask(GraphRunnableTask):
raise DbtException("Error parsing inline query")
super()._runtime_initialize()
def after_run(self, adapter, results):
def after_run(self, adapter, results) -> None:
# remove inline node from manifest
if self._inline_node_id:
self.manifest.nodes.pop(self._inline_node_id)
self._inline_node_id = None
super().after_run(adapter, results)
def _handle_result(self, result):
def _handle_result(self, result) -> None:
super()._handle_result(result)
if (

View File

@@ -481,7 +481,7 @@ class DebugTask(BaseTask):
return status
@classmethod
def validate_connection(cls, target_dict):
def validate_connection(cls, target_dict) -> None:
"""Validate a connection dictionary. On error, raises a DbtConfigError."""
target_name = "test"
# make a fake profile that we can parse

View File

@@ -96,8 +96,6 @@ class DepsTask(BaseTask):
# See GH-7615
project.project_root = str(Path(project.project_root).resolve())
self.project = project
move_to_nearest_project_dir(project.project_root)
self.cli_vars = args.vars
def track_package_install(
@@ -202,6 +200,7 @@ class DepsTask(BaseTask):
fire_event(DepsLockUpdating(lock_filepath=lock_filepath))
def run(self) -> None:
move_to_nearest_project_dir(self.args.project_dir)
if self.args.add_package:
self.add()

View File

@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import AbstractSet, Dict, List, Optional
from typing import AbstractSet, Dict, List, Optional, Type
from dbt import deprecations
from dbt.adapters.base.impl import FreshnessResponse
@@ -14,6 +14,7 @@ from dbt.artifacts.schemas.freshness import (
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
from dbt.events.types import FreshnessCheckComplete, LogFreshnessResult, LogStartLine
@@ -44,7 +45,7 @@ class FreshnessRunner(BaseRunner):
def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")
def before_execute(self):
def before_execute(self) -> None:
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
LogStartLine(
@@ -55,7 +56,7 @@ class FreshnessRunner(BaseRunner):
)
)
def after_execute(self, result):
def after_execute(self, result) -> None:
if hasattr(result, "node"):
source_name = result.node.source_name
table_name = result.node.name
@@ -162,7 +163,7 @@ class FreshnessRunner(BaseRunner):
**freshness,
)
def compile(self, manifest):
def compile(self, manifest: Manifest):
if self.node.resource_type != NodeType.Source:
# should be unreachable...
raise DbtRuntimeError("freshness runner: got a non-Source")
@@ -184,13 +185,13 @@ class FreshnessTask(RunTask):
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}
def result_path(self):
def result_path(self) -> str:
if self.args.output:
return os.path.realpath(self.args.output)
else:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -214,7 +215,7 @@ class FreshnessTask(RunTask):
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return FreshnessRunner
def get_result(self, results, elapsed_time, generated_at):
@@ -222,7 +223,7 @@ class FreshnessTask(RunTask):
elapsed_time=elapsed_time, generated_at=generated_at, results=results
)
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
for result in results:
if result.status in (
FreshnessStatus.Error,

View File

@@ -1,4 +1,5 @@
import json
from typing import Iterator, List
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
@@ -145,7 +146,7 @@ class ListTask(GraphRunnableTask):
}
)
def generate_paths(self):
def generate_paths(self) -> Iterator[str]:
for node in self._iterate_selected_nodes():
yield node.original_file_path
@@ -177,7 +178,7 @@ class ListTask(GraphRunnableTask):
return self.node_results
@property
def resource_types(self):
def resource_types(self) -> List[NodeType]:
if self.args.models:
return [NodeType.Model]

View File

@@ -1,6 +1,7 @@
from typing import Dict
from typing import Dict, Optional
from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
@@ -68,7 +69,9 @@ def print_run_status_line(results) -> None:
fire_event(StatsLine(stats=stats))
def print_run_result_error(result, newline: bool = True, is_warning: bool = False) -> None:
def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
) -> None:
# set node_info for logging events
node_info = None
if hasattr(result, "node") and result.node:
@@ -77,21 +80,25 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
@@ -99,7 +106,10 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))
@@ -119,10 +129,13 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
elif result.message is not None:
if newline:
fire_event(Formatting(""))
fire_event(RunResultError(msg=result.message, node_info=node_info))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
@@ -144,9 +157,11 @@ def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
)
for error in errors:
print_run_result_error(error, is_warning=False)
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)
for warning in warnings:
print_run_result_error(warning, is_warning=True)
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)
print_run_status_line(results)

View File

@@ -2,7 +2,7 @@ import functools
import threading
import time
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from dbt import tracking, utils
from dbt.adapters.base import BaseRelation
@@ -36,6 +36,7 @@ from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.node_types import NodeType, RunHookType
from dbt.task.base import BaseRunner
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.base_types import EventLevel
from dbt_common.events.contextvars import log_contextvars
@@ -179,7 +180,7 @@ class ModelRunner(CompileRunner):
relation = relation.include(database=False)
return str(relation)
def describe_node(self):
def describe_node(self) -> str:
# TODO CL 'language' will be moved to node level when we change representation
return f"{self.node.language} {self.node.get_materialization()} model {self.get_node_representation()}"
@@ -213,10 +214,10 @@ class ModelRunner(CompileRunner):
level=level,
)
def before_execute(self):
def before_execute(self) -> None:
self.print_start_line()
def after_execute(self, result):
def after_execute(self, result) -> None:
track_model_run(self.node_index, self.num_nodes, result)
self.print_result_line(result)
@@ -472,9 +473,20 @@ class RunTask(CompileTask):
resource_types=[NodeType.Model],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return ModelRunner
def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}
return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}
def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])
if results:
print_run_end_messages(results)
print_run_end_messages(results, groups=groups)

View File

@@ -5,7 +5,7 @@ from concurrent.futures import as_completed
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool
from pathlib import Path
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Union
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union
import dbt.exceptions
import dbt.tracking
@@ -181,13 +181,13 @@ class GraphRunnableTask(ConfiguredTask):
self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_runner_type(self, node):
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
raise NotImplementedError("Not Implemented")
def result_path(self):
def result_path(self) -> str:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def get_runner(self, node) -> BaseRunner:
@@ -204,6 +204,10 @@ class GraphRunnableTask(ConfiguredTask):
num_nodes = self.num_nodes
cls = self.get_runner_type(node)
if cls is None:
raise DbtInternalError("Could not find runner type for node.")
return cls(self.config, adapter, node, run_count, num_nodes)
def call_runner(self, runner: BaseRunner) -> RunResult:
@@ -334,7 +338,7 @@ class GraphRunnableTask(ConfiguredTask):
args = [runner]
self._submit(pool, args, callback)
def _handle_result(self, result: RunResult):
def _handle_result(self, result: RunResult) -> None:
"""Mark the result as completed, insert the `CompileResultNode` into
the manifest, and mark any descendants (potentially with a 'cause' if
the result was an ephemeral model) as skipped.
@@ -479,7 +483,7 @@ class GraphRunnableTask(ConfiguredTask):
self.defer_to_manifest()
self.populate_adapter_cache(adapter)
def after_run(self, adapter, results):
def after_run(self, adapter, results) -> None:
pass
def print_results_line(self, node_results, elapsed):
@@ -659,7 +663,7 @@ class GraphRunnableTask(ConfiguredTask):
args=dbt.utils.args_to_dict(self.args),
)
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
print_run_end_messages(results)
def _get_previous_state(self) -> Optional[Manifest]:

View File

@@ -1,9 +1,12 @@
import random
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogSeedResult, LogStartLine, SeedHeader
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.events.types import Formatting
@@ -14,10 +17,10 @@ from .run import ModelRunner, RunTask
class SeedRunner(ModelRunner):
def describe_node(self):
def describe_node(self) -> str:
return "seed file {}".format(self.get_node_representation())
def before_execute(self):
def before_execute(self) -> None:
fire_event(
LogStartLine(
description=self.describe_node(),
@@ -33,7 +36,7 @@ class SeedRunner(ModelRunner):
result.agate_table = agate_result.table
return result
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.node
def print_result_line(self, result):
@@ -55,7 +58,7 @@ class SeedRunner(ModelRunner):
class SeedTask(RunTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -68,10 +71,10 @@ class SeedTask(RunTask):
resource_types=[NodeType.Seed],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return SeedRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
if self.args.show:
self.show_tables(results)

View File

@@ -67,7 +67,7 @@ class ShowTask(CompileTask):
else:
return ShowRunner
def task_end_messages(self, results):
def task_end_messages(self, results) -> None:
is_inline = bool(getattr(self.args, "inline", None))
if is_inline:
@@ -108,7 +108,7 @@ class ShowTask(CompileTask):
)
)
def _handle_result(self, result):
def _handle_result(self, result) -> None:
super()._handle_result(result)
if (

View File

@@ -1,7 +1,10 @@
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus
from dbt.events.types import LogSnapshotResult
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError
@@ -11,7 +14,7 @@ from .run import ModelRunner, RunTask
class SnapshotRunner(ModelRunner):
def describe_node(self):
def describe_node(self) -> str:
return "snapshot {}".format(self.get_node_representation())
def print_result_line(self, result):
@@ -34,7 +37,7 @@ class SnapshotRunner(ModelRunner):
class SnapshotTask(RunTask):
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self):
@@ -47,5 +50,5 @@ class SnapshotTask(RunTask):
resource_types=[NodeType.Snapshot],
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return SnapshotRunner

View File

@@ -5,6 +5,7 @@ from typing import Generic, TypeVar
import dbt.exceptions
import dbt_common.exceptions.base
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.sql import (
RemoteCompileResult,
RemoteCompileResultMixin,
@@ -28,18 +29,19 @@ class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
exc=str(e), exc_info=traceback.format_exc(), node_info=self.node.node_info
)
)
# REVIEW: This code is invalid and will always throw.
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt_common.exceptions.DbtRuntimeError):
e.add_node(ctx.node)
return e
def before_execute(self):
def before_execute(self) -> None:
pass
def after_execute(self, result):
def after_execute(self, result) -> None:
pass
def compile(self, manifest):
def compile(self, manifest: Manifest):
return self.compiler.compile_node(self.node, manifest, {}, write=False)
@abstractmethod

View File

@@ -3,7 +3,7 @@ import json
import re
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
import daff
@@ -27,6 +27,7 @@ from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.parser.unit_tests import UnitTestManifestLoader
from dbt.task.base import BaseRunner
from dbt.utils import _coerce_decimal, strtobool
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.format import pluralize
@@ -84,14 +85,14 @@ class TestRunner(CompileRunner):
_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_LOG_TEST_RESULT_EVENTS = LogTestResult
def describe_node_name(self):
def describe_node_name(self) -> str:
if self.node.resource_type == NodeType.Unit:
name = f"{self.node.model}::{self.node.versioned_name}"
return name
else:
return self.node.name
def describe_node(self):
def describe_node(self) -> str:
return f"{self.node.resource_type} {self.describe_node_name()}"
def print_result_line(self, result):
@@ -120,7 +121,7 @@ class TestRunner(CompileRunner):
)
)
def before_execute(self):
def before_execute(self) -> None:
self.print_start_line()
def execute_data_test(self, data_test: TestNode, manifest: Manifest) -> TestResultData:
@@ -334,7 +335,7 @@ class TestRunner(CompileRunner):
failures=failures,
)
def after_execute(self, result):
def after_execute(self, result) -> None:
self.print_result_line(result)
def _get_unit_test_agate_table(self, result_table, actual_or_expected: str):
@@ -393,7 +394,7 @@ class TestTask(RunTask):
__test__ = False
def raise_on_first_error(self):
def raise_on_first_error(self) -> bool:
return False
def get_node_selector(self) -> TestSelector:
@@ -405,7 +406,7 @@ class TestTask(RunTask):
previous_state=self.previous_state,
)
def get_runner_type(self, _):
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return TestRunner

View File

@@ -91,6 +91,7 @@ def run_dbt(
if profiles_dir and "--profiles-dir" not in args:
args.extend(["--profiles-dir", profiles_dir])
dbt = dbtRunner()
res = dbt.invoke(args)
# the exception is immediately raised to be caught in tests
@@ -148,7 +149,7 @@ def get_manifest(project_root) -> Optional[Manifest]:
if os.path.exists(path):
with open(path, "rb") as fp:
manifest_mp = fp.read()
manifest: Manifest = Manifest.from_msgpack(manifest_mp)
manifest: Manifest = Manifest.from_msgpack(manifest_mp) # type: ignore[attr-defined]
return manifest
else:
return None

View File

@@ -49,7 +49,10 @@ def get_latest_version(
return semver.VersionSpecifier.from_version_string(version_string)
def _get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]:
def _get_core_msg_lines(
installed: semver.VersionSpecifier,
latest: Optional[semver.VersionSpecifier],
) -> Tuple[List[List[str]], str]:
installed_s = installed.to_version_string(skip_matcher=True)
installed_line = ["installed", installed_s, ""]
update_info = ""
@@ -208,7 +211,7 @@ def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]:
except ImportError:
# not an adapter
continue
yield plugin_name, mod.version # type: ignore
yield plugin_name, mod.version
def _get_adapter_plugin_names() -> Iterator[str]:

View File

@@ -69,7 +69,7 @@ setup(
# These are major-version-0 packages also maintained by dbt-labs.
# Accept patches but avoid automatically updating past a set minor version range.
"dbt-extractor>=0.5.0,<=0.6",
"dbt-semantic-interfaces>=0.6.11,<0.7",
"dbt-semantic-interfaces>=0.7.0,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.3.0,<2.0",

View File

@@ -8,7 +8,7 @@ RUN apt-get update \
build-essential=12.9 \
ca-certificates=20210119 \
git=1:2.30.2-1+deb11u2 \
libpq-dev=13.14-0+deb11u1 \
libpq-dev=13.16-0+deb11u1 \
make=4.3-4.1 \
openssh-client=1:8.4p1-5+deb11u3 \
software-properties-common=0.96.20.2-2.1 \

View File

@@ -19164,6 +19164,15 @@
"required": [
"export_as"
]
},
"unrendered_config": {
"type": "object",
"additionalProperties": {
"type": "string"
},
"propertyNames": {
"type": "string"
}
}
},
"additionalProperties": false,
@@ -20689,6 +20698,15 @@
"required": [
"export_as"
]
},
"unrendered_config": {
"type": "object",
"additionalProperties": {
"type": "string"
},
"propertyNames": {
"type": "string"
}
}
},
"additionalProperties": false,

View File

@@ -27,9 +27,9 @@ select 1 as col
"""
macros__before_and_after = """
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
{% macro custom_run_hook(state, target, run_started_at, invocation_id, table_name="on_run_hook") %}
insert into {{ target.schema }}.on_run_hook (
insert into {{ target.schema }}.{{ table_name }} (
test_state,
target_dbname,
target_host,
@@ -355,6 +355,26 @@ snapshots:
- not_null
"""
properties__model_hooks = """
version: 2
models:
- name: hooks
config:
pre_hook: "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook: "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
properties__model_hooks_list = """
version: 2
models:
- name: hooks
config:
pre_hook:
- "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook:
- "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
seeds__example_seed_csv = """a,b,c
1,2,3
4,5,6

View File

@@ -6,6 +6,7 @@ from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt, write_file
from dbt_common.exceptions import CompilationError
from tests.functional.adapter.hooks.fixtures import (
macros__before_and_after,
models__hooked,
models__hooks,
models__hooks_configured,
@@ -13,6 +14,8 @@ from tests.functional.adapter.hooks.fixtures import (
models__hooks_kwargs,
models__post,
models__pre,
properties__model_hooks,
properties__model_hooks_list,
properties__seed_models,
properties__test_snapshot_models,
seeds__example_seed_csv,
@@ -257,6 +260,27 @@ class TestPrePostModelHooksOnSeeds(object):
assert len(res) == 1, "Expected exactly one item"
class TestPrePostModelHooksWithMacros(BaseTestPrePost):
@pytest.fixture(scope="class")
def macros(self):
return {"before-and-after.sql": macros__before_and_after}
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks, "hooks.sql": models__hooks}
def test_pre_and_post_run_hooks(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target.get("host", None))
self.check_hooks("end", project, dbt_profile_target.get("host", None))
class TestPrePostModelHooksListWithMacros(TestPrePostModelHooksWithMacros):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks_list, "hooks.sql": models__hooks}
class TestHooksRefsOnSeeds:
"""
This should not succeed, and raise an explicit error

View File

@@ -42,6 +42,15 @@ with recursive t(n) as (
select sum(n) from t;
"""
first_ephemeral_model_with_alias_sql = """
{{ config(materialized = 'ephemeral', alias = 'first_alias') }}
select 1 as fun
"""
second_ephemeral_model_with_alias_sql = """
select * from {{ ref('first_ephemeral_model_with_alias') }}
"""
schema_yml = """
version: 2

View File

@@ -10,10 +10,12 @@ from dbt_common.exceptions import DbtRuntimeError
from tests.functional.assertions.test_runner import dbtTestRunner
from tests.functional.compile.fixtures import (
first_ephemeral_model_sql,
first_ephemeral_model_with_alias_sql,
first_model_sql,
model_multiline_jinja,
schema_yml,
second_ephemeral_model_sql,
second_ephemeral_model_with_alias_sql,
second_model_sql,
third_ephemeral_model_sql,
with_recursive_model_sql,
@@ -128,6 +130,24 @@ class TestEphemeralModels:
]
class TestEphemeralModelWithAlias:
@pytest.fixture(scope="class")
def models(self):
return {
"first_ephemeral_model_with_alias.sql": first_ephemeral_model_with_alias_sql,
"second_ephemeral_model_with_alias.sql": second_ephemeral_model_with_alias_sql,
}
def test_compile(self, project):
run_dbt(["compile"])
assert get_lines("second_ephemeral_model_with_alias") == [
"with __dbt__cte__first_alias as (",
"select 1 as fun",
") select * from __dbt__cte__first_alias",
]
class TestCompile:
@pytest.fixture(scope="class")
def models(self):

View File

@@ -193,3 +193,36 @@ class TestEnvVars:
assert not ("secret_variable" in log_output)
assert "regular_variable" in log_output
del os.environ["DBT_DEBUG"]
class TestEnvVarInCreateSchema:
"""Test that the env_var() method works in overrides of the create_schema
macro, which is called during a different phase of execution than most
macros, causing problems."""
@pytest.fixture(scope="class", autouse=True)
def setup(self):
os.environ["DBT_TEST_ENV_VAR"] = "1"
@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": """
{% macro create_schema(relation) %}
{%- call statement('create_schema') -%}
SELECT {{ env_var('DBT_TEST_ENV_VAR') }} as TEST
{% endcall %}
{% endmacro %}%
"""
}
@pytest.fixture(scope="class")
def models(self):
return {
"mymodel.sql": """
SELECT 1 as TEST -- {%- do adapter.create_schema(this) -%}
"""
}
def test_env_var_in_create_schema(self, project):
run_dbt(["run"])

View File

@@ -1,3 +1,4 @@
import os
from unittest import mock
import pytest
@@ -103,6 +104,21 @@ class TestDbtRunner:
dbt.invoke(args)
assert args == args_before
def test_directory_does_not_change(self, project, dbt: dbtRunner) -> None:
project_dir = os.getcwd() # The directory where dbt_project.yml exists.
os.chdir("../")
cmd_execution_dir = os.getcwd() # The directory where dbt command will be run
commands = ["init", "deps", "clean"]
for command in commands:
args = [command, "--project-dir", project_dir]
if command == "init":
args.append("--skip-profile-setup")
res = dbt.invoke(args)
after_dir = os.getcwd()
assert res.success is True
assert cmd_execution_dir == after_dir
class TestDbtRunnerQueryComments:
@pytest.fixture(scope="class")

View File

@@ -33,16 +33,14 @@ class TestDepsOptions(object):
assert os.path.exists("package-lock.yml")
with open("package-lock.yml") as fp:
contents = fp.read()
assert (
contents
== """packages:
- package: fivetran/fivetran_utils
version: 0.4.7
- package: dbt-labs/dbt_utils
version: 1.2.0
sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8
"""
)
fivetran_package = "- package: fivetran/fivetran_utils\n version: 0.4.7"
# dbt-utils is a dep in fivetran so we can't check for a specific version or this test fails everytime a new dbt-utils version comes out
dbt_labs_package = "- package: dbt-labs/dbt_utils"
package_sha = "sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8"
assert fivetran_package in contents
assert dbt_labs_package in contents
assert package_sha in contents
def test_deps_default(self, clean_start):
run_dbt(["deps"])
@@ -50,16 +48,13 @@ sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8
assert os.path.exists("package-lock.yml")
with open("package-lock.yml") as fp:
contents = fp.read()
assert (
contents
== """packages:
- package: fivetran/fivetran_utils
version: 0.4.7
- package: dbt-labs/dbt_utils
version: 1.2.0
sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8
"""
)
fivetran_package = "- package: fivetran/fivetran_utils\n version: 0.4.7"
# dbt-utils is a dep in fivetran so we can't check for a specific version or this test fails everytime a new dbt-utils version comes out
dbt_labs_package = "- package: dbt-labs/dbt_utils"
package_sha = "sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8"
assert fivetran_package in contents
assert dbt_labs_package in contents
assert package_sha in contents
def test_deps_add(self, clean_start):
run_dbt(["deps", "--add-package", "dbt-labs/audit_helper@0.9.0"])

View File

@@ -32,7 +32,7 @@ class TestTestsConfigDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = {"project-test-config"}
expected = set()
assert expected == deprecations.active_deprecations
def test_project_tests_config_fail(self, project):
@@ -41,7 +41,7 @@ class TestTestsConfigDeprecation:
with pytest.raises(CompilationError) as exc:
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
exc_str = " ".join(str(exc.value).split()) # flatten all whitespace
expected_msg = "The `tests` config has been renamed to `data_tests`"
expected_msg = "Configuration paths exist in your dbt_project.yml file which do not apply to any resources. There are 1 unused configuration paths: - data_tests"
assert expected_msg in exc_str
@@ -62,17 +62,13 @@ class TestSchemaTestDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = {"project-test-config"}
expected = set()
assert expected == deprecations.active_deprecations
def test_generic_tests_fail(self, project):
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
with pytest.raises(CompilationError) as exc:
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
exc_str = " ".join(str(exc.value).split()) # flatten all whitespace
expected_msg = "The `tests` config has been renamed to `data_tests`"
assert expected_msg in exc_str
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
def test_generic_data_test_parsing(self, project):
results = run_dbt(["list", "--resource-type", "test"])
@@ -98,7 +94,7 @@ class TestSourceSchemaTestDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = {"project-test-config"}
expected = set()
assert expected == deprecations.active_deprecations
def test_generic_data_tests(self, project):

View File

@@ -5,6 +5,7 @@ import pytest
from dbt.events.types import InvalidOptionYAML
from dbt.tests.util import get_manifest, read_file, run_dbt
from dbt_common.events import EventLevel
from dbt_common.events.functions import fire_event
my_model_sql = """
@@ -103,37 +104,176 @@ def test_invalid_event_value(project, logs_dir):
assert str(excinfo.value) == "[InvalidOptionYAML]: Unable to parse dict {'option_name': 1}"
class TestNodeInfo:
groups_yml = """
groups:
- name: my_group
owner:
name: my_name
email: my.email@gmail.com
slack: my_slack
other_property: something_else
models:
- name: my_model
group: my_group
access: public
"""
class TestRunResultErrorNodeInfo:
@pytest.fixture(scope="class")
def models(self):
return {"my_model.sql": "select not_found as id"}
return {
"my_model.sql": "select not_found as id",
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
# get log file
log_file = read_file(logs_dir, "dbt.log")
task_printer_events = [
"RunResultWarning",
"RunResultFailure",
"RunResultWarningMessage",
"RunResultError",
"RunResultErrorNoMessage",
"SQLCompiledPath",
"CheckNodeTestFailure",
]
count = 0
for log_line in log_file.split("\n"):
# skip empty lines
if len(log_line) == 0:
if not log_line:
continue
# The adapter logging also shows up, so skip non-json lines
if "[debug]" in log_line:
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
log_dct = json.loads(log_line)
log_data = log_dct["data"]
log_event = log_dct["info"]["name"]
if log_event in task_printer_events:
assert "node_info" in log_data
count += 1
assert count > 0
if log_json["info"]["name"] == "RunResultError":
assert "node_info" in log_json["data"]
assert log_json["data"]["node_info"]["unique_id"] == "model.test.my_model"
assert "Database Error" in log_json["data"]["msg"]
def assert_group_data(group_data):
assert group_data["name"] == "my_group"
assert group_data["owner"] == {
"name": "my_name",
"email": "my.email@gmail.com",
"slack": "my_slack",
"other_property": "something_else",
}
class TestRunResultErrorGroup:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": "select not_found as id",
"groups.yml": groups_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1
assert run_result_error_count == 1
class TestRunResultFailureGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"], expect_pass=False)
assert len(results) == 2
log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
run_result_failure_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1
if log_json["info"]["name"] == "RunResultFailure":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_failure_count += 1
assert run_result_error_count == 1
assert run_result_failure_count == 1
class TestRunResultWarningGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null:
config:
severity: warn
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"])
assert len(results) == 2
log_file = read_file(logs_dir, "dbt.log")
run_result_warning_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultWarning":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_warning_count += 1
assert run_result_warning_count == 1

View File

@@ -0,0 +1,131 @@
import pytest
from dbt.tests.util import (
check_relations_equal,
get_relation_columns,
relation_from_name,
run_dbt,
)
seeds_base_csv = """
id,name_xxx,some_date
1,Easton,1981-05-20T06:46:51
2,Lillian,1978-09-03T18:10:33
3,Jeremiah,1982-03-11T03:59:51
4,Nolan,1976-05-06T20:21:35
5,Hannah,1982-06-23T05:41:26
6,Eleanor,1991-08-10T23:12:21
7,Lily,1971-03-29T14:58:02
8,Jonathan,1988-02-26T02:55:24
9,Adrian,1994-02-09T13:14:23
10,Nora,1976-03-01T16:51:39
""".lstrip()
seeds_added_csv = (
seeds_base_csv
+ """
11,Mateo,2014-09-07T17:04:27
12,Julian,2000-02-04T11:48:30
13,Gabriel,2001-07-10T07:32:52
14,Isaac,2002-11-24T03:22:28
15,Levi,2009-11-15T11:57:15
16,Elizabeth,2005-04-09T03:50:11
17,Grayson,2019-08-06T19:28:17
18,Dylan,2014-03-01T11:50:41
19,Jayden,2009-06-06T07:12:49
20,Luke,2003-12-05T21:42:18
""".lstrip()
)
incremental_not_schema_change_sql = """
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }}
select
1 || '-' || current_timestamp as user_id_current_time,
{% if is_incremental() %}
'thisis18characters' as platform
{% else %}
'okthisis20characters' as platform
{% endif %}
"""
incremental_sql = """
{{ config(materialized="incremental") }}
select * from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
"""
schema_base_yml = """
sources:
- name: raw
schema: "{{ target.schema }}"
tables:
- name: seed
identifier: "{{ var('seed_name', 'base') }}"
models:
- name: incremental
config:
contract:
enforced: true
on_schema_change: append_new_columns
columns:
- name: id
data_type: int
- name: name_xxx
data_type: character varying(10)
- name: some_date
data_type: timestamp
"""
class TestIncremental:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"name": "incremental"}
@pytest.fixture(scope="class")
def models(self):
return {"incremental.sql": incremental_sql, "schema.yml": schema_base_yml}
@pytest.fixture(scope="class")
def seeds(self):
return {"base.csv": seeds_base_csv, "added.csv": seeds_added_csv}
def test_incremental(self, project):
# seed command
results = run_dbt(["seed"])
assert len(results) == 2
# base table rowcount
relation = relation_from_name(project.adapter, "base")
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 10
# added table rowcount
relation = relation_from_name(project.adapter, "added")
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 20
# run command
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--vars", "seed_name: base"])
assert len(results) == 1
# check relations equal
check_relations_equal(project.adapter, ["base", "incremental"])
# change seed_name var
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--debug", "--vars", "seed_name: added"])
assert len(results) == 1
# Error before fix: Changing col type from character varying(10) to character varying(256) in table:
# "dbt"."test<...>_test_incremental_with_contract"."incremental"
columns = get_relation_columns(project.adapter, "incremental")
# [('id', 'integer', None), ('name_xxx', 'character varying', 10), ('some_date', 'timestamp without time zone', None)]
for column in columns:
if column[0] == "name_xxx":
assert column[2] == 10

View File

@@ -11,10 +11,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Metric('txn_revenue', ['id']) }} > 1"
exports:
- name: my_export
@@ -33,10 +33,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Metric('txn_revenue', ['id']) }} > 1"
exports:
- name: my_export
@@ -54,10 +54,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -72,8 +72,8 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
where: "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "Dimension('id__ds')"
where: "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
"""
saved_query_with_extra_config_attributes_yml = """
@@ -85,10 +85,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -108,10 +108,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -129,10 +129,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
"""
@@ -149,10 +149,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('user__ds')"
- "Dimension('id__ds')"
where:
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:

View File

@@ -12,7 +12,7 @@ from tests.functional.semantic_models.fixtures import (
)
class TestSavedQueryBuildNoOp:
class TestSavedQueryBuild:
@pytest.fixture(scope="class")
def models(self):
return {
@@ -31,7 +31,8 @@ packages:
version: 1.1.1
"""
def test_build_saved_queries(self, project):
def test_build_saved_queries_no_op(self, project) -> None:
"""Test building saved query exports with no flag, so should be no-op."""
run_dbt(["deps"])
result = run_dbt(["build"])
assert len(result.results) == 3

View File

@@ -1,5 +1,6 @@
import os
import shutil
from copy import deepcopy
from typing import List
import pytest
@@ -36,6 +37,17 @@ class TestSavedQueryParsing:
"docs.md": saved_query_description,
}
@pytest.fixture(scope="class")
def other_schema(self, unique_schema):
return unique_schema + "_other"
@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema):
outputs = {"default": dbt_profile_target, "prod": deepcopy(dbt_profile_target)}
outputs["default"]["schema"] = unique_schema
outputs["prod"]["schema"] = other_schema
return {"test": {"outputs": outputs, "target": "default"}}
def copy_state(self):
if not os.path.exists("state"):
os.makedirs("state")
@@ -60,6 +72,11 @@ class TestSavedQueryParsing:
assert saved_query.exports[0].config.alias == "my_export_alias"
assert saved_query.exports[0].config.export_as == ExportDestinationType.TABLE
assert saved_query.exports[0].config.schema_name == "my_export_schema_name"
assert saved_query.exports[0].unrendered_config == {
"alias": "my_export_alias",
"export_as": "table",
"schema": "my_export_schema_name",
}
# Save state
self.copy_state()
@@ -86,6 +103,86 @@ class TestSavedQueryParsing:
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
def test_semantic_model_parsing_change_export(self, project, other_schema):
runner = dbtTestRunner()
result = runner.invoke(["parse", "--no-partial-parse"])
assert result.success
assert isinstance(result.result, Manifest)
manifest = result.result
assert len(manifest.saved_queries) == 1
saved_query = manifest.saved_queries["saved_query.test.test_saved_query"]
assert saved_query.name == "test_saved_query"
assert saved_query.exports[0].name == "my_export"
# Save state
self.copy_state()
# Nothing has changed, so no state:modified results
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 0
# Change export name
write_file(
saved_queries_yml.replace("name: my_export", "name: my_expor2"),
project.project_root,
"models",
"saved_queries.yml",
)
# State modified finds changed saved_query
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
# Change export schema
write_file(
saved_queries_yml.replace(
"schema: my_export_schema_name", "schema: my_export_schema_name2"
),
project.project_root,
"models",
"saved_queries.yml",
)
# State modified finds changed saved_query
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
def test_semantic_model_parsing_with_default_schema(self, project, other_schema):
write_file(
saved_queries_with_defaults_yml, project.project_root, "models", "saved_queries.yml"
)
runner = dbtTestRunner()
result = runner.invoke(["parse", "--no-partial-parse", "--target", "prod"])
assert result.success
assert isinstance(result.result, Manifest)
manifest = result.result
assert len(manifest.saved_queries) == 1
saved_query = manifest.saved_queries["saved_query.test.test_saved_query"]
assert saved_query.name == "test_saved_query"
assert len(saved_query.query_params.metrics) == 1
assert len(saved_query.query_params.group_by) == 1
assert len(saved_query.query_params.where.where_filters) == 3
assert len(saved_query.depends_on.nodes) == 1
assert saved_query.description == "My SavedQuery Description"
assert len(saved_query.exports) == 1
assert saved_query.exports[0].name == "my_export"
assert saved_query.exports[0].config.alias == "my_export_alias"
assert saved_query.exports[0].config.export_as == ExportDestinationType.TABLE
assert saved_query.exports[0].config.schema_name == other_schema
assert saved_query.exports[0].unrendered_config == {
"alias": "my_export_alias",
"export_as": "table",
}
# Save state
self.copy_state()
# Nothing has changed, so no state:modified results
results = run_dbt(
["ls", "--select", "state:modified", "--state", "./state", "--target", "prod"]
)
assert len(results) == 0
# There should also be no state:modified results when using the default schema
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 0
def test_saved_query_error(self, project):
error_schema_yml = saved_queries_yml.replace("simple_metric", "metric_not_found")
write_file(error_schema_yml, project.project_root, "models", "saved_queries.yml")
@@ -125,14 +222,14 @@ class TestSavedQueryPartialParsing:
where_filter.where_sql_template
for where_filter in saved_query1.query_params.where.where_filters
} == {
"{{ Dimension('user__ds', 'DAY') }} <= now()",
"{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'",
"{{ TimeDimension('id__ds', 'DAY') }} <= now()",
"{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'",
}
# String filter
assert len(saved_query2.query_params.where.where_filters) == 1
assert (
saved_query2.query_params.where.where_filters[0].where_sql_template
== "{{ Dimension('user__ds', 'DAY') }} <= now()"
== "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
)
def test_saved_query_metrics_changed(self, project):

View File

@@ -0,0 +1,72 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture
create_source_sql = """
create table {database}.{schema}.source_users (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
updated_time TIMESTAMP WITH TIME ZONE
);
insert into {database}.{schema}.source_users (id, first_name, last_name, email, gender, ip_address, updated_time) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30');
"""
model_users_sql = """
select * from {{ source('test_source', 'source_users') }}
"""
snapshot_sql = """
{% snapshot users_snapshot %}
select * from {{ ref('users') }}
{% endsnapshot %}
"""
source_schema_yml = """
sources:
- name: test_source
loader: custom
schema: "{{ target.schema }}"
tables:
- name: source_users
loaded_at_field: updated_time
"""
snapshot_schema_yml = """
snapshots:
- name: users_snapshot
config:
target_schema: "{{ target.schema }}"
strategy: timestamp
unique_key: id
updated_at: updated_time
"""
class TestSnapshotConfig:
@pytest.fixture(scope="class")
def models(self):
return {
"users.sql": model_users_sql,
"source_schema.yml": source_schema_yml,
"snapshot_schema.yml": snapshot_schema_yml,
}
@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot.sql": snapshot_sql}
def test_timestamp_snapshot(self, project):
project.run_sql(create_source_sql)
run_dbt(["run"])
results, log_output = run_dbt_and_capture(["snapshot"])
assert len(results) == 1
assert "Please update snapshot config" in log_output

View File

@@ -0,0 +1,31 @@
from dbt.tests.util import get_manifest, run_dbt, write_file
from tests.fixtures.jaffle_shop import JaffleShopProject
# Note: in an actual file (as opposed to a string that we write into a files)
# there would only be a single backslash.
sources_yml = """
sources:
- name: something_else
database: raw
schema: jaffle_shop
tables:
- name: "\\"/test/orders\\""
- name: customers
"""
class TestNameChars(JaffleShopProject):
def test_quotes_in_table_names(self, project):
# Write out a sources definition that includes a table name with quotes and a forward slash
# Note: forward slashes are not legal in filenames in Linux (or Windows),
# so we won't see forward slashes in model names, because they come from file names.
write_file(sources_yml, project.project_root, "models", "sources.yml")
manifest = run_dbt(["parse"])
assert len(manifest.sources) == 2
assert 'source.jaffle_shop.something_else."/test/orders"' in manifest.sources.keys()
# We've written out the manifest.json artifact, we want to ensure
# that it can be read in again (the json is valid).
# Note: the key in the json actually looks like: "source.jaffle_shop.something_else.\"/test/orders\""
new_manifest = get_manifest(project.project_root)
assert new_manifest
assert 'source.jaffle_shop.something_else."/test/orders"' in new_manifest.sources.keys()

View File

@@ -43,7 +43,7 @@ class TestProjectMethods:
def test__str__(self, project: Project):
assert (
str(project)
== "{'name': 'test_project', 'version': 1.0, 'project-root': 'doesnt/actually/exist', 'profile': 'test_profile', 'model-paths': ['models'], 'macro-paths': ['macros'], 'seed-paths': ['seeds'], 'test-paths': ['tests'], 'analysis-paths': ['analyses'], 'docs-paths': ['docs'], 'asset-paths': ['assets'], 'target-path': 'target', 'snapshot-paths': ['snapshots'], 'clean-targets': ['target'], 'log-path': 'path/to/project/logs', 'quoting': {}, 'models': {}, 'on-run-start': [], 'on-run-end': [], 'dispatch': [{'macro_namespace': 'dbt_utils', 'search_order': ['test_project', 'dbt_utils']}], 'seeds': {}, 'snapshots': {}, 'sources': {}, 'data_tests': {}, 'unit_tests': {}, 'metrics': {}, 'semantic-models': {}, 'saved-queries': {}, 'exposures': {}, 'vars': {}, 'require-dbt-version': ['=0.0.0'], 'restrict-access': False, 'dbt-cloud': {}, 'query-comment': {'comment': \"\\n{%- set comment_dict = {} -%}\\n{%- do comment_dict.update(\\n app='dbt',\\n dbt_version=dbt_version,\\n profile_name=target.get('profile_name'),\\n target_name=target.get('target_name'),\\n) -%}\\n{%- if node is not none -%}\\n {%- do comment_dict.update(\\n node_id=node.unique_id,\\n ) -%}\\n{% else %}\\n {# in the node context, the connection name is the node_id #}\\n {%- do comment_dict.update(connection_name=connection_name) -%}\\n{%- endif -%}\\n{{ return(tojson(comment_dict)) }}\\n\", 'append': False, 'job-label': False}, 'packages': []}"
== "{'name': 'test_project', 'version': 1.0, 'project-root': 'doesnt/actually/exist', 'profile': 'test_profile', 'model-paths': ['models'], 'macro-paths': ['macros'], 'seed-paths': ['seeds'], 'test-paths': ['tests'], 'analysis-paths': ['analyses'], 'docs-paths': ['docs'], 'asset-paths': ['assets'], 'target-path': 'target', 'snapshot-paths': ['snapshots'], 'clean-targets': ['target'], 'log-path': 'path/to/project/logs', 'quoting': {}, 'models': {}, 'on-run-start': [], 'on-run-end': [], 'dispatch': [{'macro_namespace': 'dbt_utils', 'search_order': ['test_project', 'dbt_utils']}], 'seeds': {}, 'snapshots': {}, 'sources': {}, 'data_tests': {}, 'unit_tests': {}, 'metrics': {}, 'semantic-models': {}, 'saved-queries': {}, 'exposures': {}, 'vars': {}, 'require-dbt-version': ['=0.0.0'], 'restrict-access': False, 'dbt-cloud': {}, 'flags': {}, 'query-comment': {'comment': \"\\n{%- set comment_dict = {} -%}\\n{%- do comment_dict.update(\\n app='dbt',\\n dbt_version=dbt_version,\\n profile_name=target.get('profile_name'),\\n target_name=target.get('target_name'),\\n) -%}\\n{%- if node is not none -%}\\n {%- do comment_dict.update(\\n node_id=node.unique_id,\\n ) -%}\\n{% else %}\\n {# in the node context, the connection name is the node_id #}\\n {%- do comment_dict.update(connection_name=connection_name) -%}\\n{%- endif -%}\\n{{ return(tojson(comment_dict)) }}\\n\", 'append': False, 'job-label': False}, 'packages': []}"
)
def test_get_selector(self, project: Project):

View File

@@ -297,3 +297,29 @@ class TestNodeSelector:
queue.get(block=False)
queue.mark_done(got.unique_id)
assert queue.empty()
def test_select_downstream_of_empty_model(self, runtime_config: RuntimeConfig):
# empty model
model_one = make_model(pkg="other", name="model_one", code="")
# non-empty model
model_two = make_model(
pkg="pkg",
name="model_two",
code="""select * from {{ref('model_one')}}""",
refs=[model_one],
)
models = [model_one, model_two]
manifest = make_manifest(nodes=models)
# Get the graph
compiler = dbt.compilation.Compiler(runtime_config)
graph = compiler.compile(manifest)
# Ensure that model_two is selected as downstream of model_one
selector = NodeSelector(graph, manifest)
spec = graph_selector.SelectionCriteria.from_single_spec("model_one+")
assert selector.get_selected(spec) == {"model.pkg.model_two"}
# Ensure that --indirect-selection empty returns the same result
spec.indirect_selection = graph_selector.IndirectSelection.Empty
assert selector.get_selected(spec) == {"model.pkg.model_two"}

View File

@@ -412,6 +412,9 @@ sample_values = [
core_types.CompiledNode(
node_name="", compiled="", is_inline=True, unique_id="model.test.my_model"
),
core_types.SnapshotTimestampWarning(
snapshot_time_data_type="DATETIME", updated_at_data_type="DATETIMEZ"
),
# W - Node testing ======================
core_types.CatchableExceptionOnRun(exc=""),
core_types.InternalErrorOnRun(build_path="", exc=""),

View File

@@ -70,6 +70,7 @@ def project(selector_config: SelectorConfig) -> Project:
project_env_vars={},
restrict_access=False,
dbt_cloud={},
flags={},
)