Compare commits


1 Commit

Author: Chenyu Li
SHA1: db0535f64c
Message: Interface for Execution queue
Date: 2024-08-09 18:20:37 -07:00
78 changed files with 517 additions and 1282 deletions

View File

@@ -1,6 +0,0 @@
kind: Breaking Changes
body: Fix changing the current working directory when using dbt deps, clean and init.
time: 2023-12-06T19:24:42.575372+09:00
custom:
Author: rariyama
Issue: "8997"

View File

@@ -1,7 +0,0 @@
kind: Dependencies
body: Increase supported version range for dbt-semantic-interfaces. Needed to support
custom calendar features.
time: 2024-08-20T13:19:09.015225-07:00
custom:
Author: courtneyholcomb
Issue: "9265"

View File

@@ -1,6 +0,0 @@
kind: Features
body: Warning message for snapshot timestamp data types
time: 2024-06-21T14:16:35.717637-04:00
custom:
Author: gshank
Issue: "10234"

View File

@@ -1,6 +0,0 @@
kind: Features
body: Add support for behavior flags
time: 2024-08-29T13:53:20.16122-04:00
custom:
Author: mikealfare
Issue: "10618"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: Do not update varchar column definitions if a contract exists
time: 2024-07-28T22:14:21.67712-04:00
custom:
Author: gshank
Issue: "10362"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: Fix state:modified check for exports
time: 2024-08-13T15:42:35.471685-07:00
custom:
Author: aliceliu
Issue: "10138"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: Filter out empty nodes after graph selection to support consistent selection of nodes that depend on upstream public models
time: 2024-08-16T14:08:07.426235-07:00
custom:
Author: jtcohen6
Issue: "8987"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: Late render pre- and post-hooks configs in properties / schema YAML files
time: 2024-08-24T21:09:03.252733-06:00
custom:
Author: dbeatty10
Issue: "10603"

View File

@@ -1,7 +0,0 @@
kind: Fixes
body: Allow the use of the env_var function in certain macros in which it was
previously unavailable.
time: 2024-08-29T10:57:01.160613-04:00
custom:
Author: peterallenwebb
Issue: "10609"

View File

@@ -1,6 +0,0 @@
kind: Fixes
body: 'Remove deprecation for tests: to data_tests: change'
time: 2024-09-05T18:02:48.086421-04:00
custom:
Author: gshank
Issue: "10564"

View File

@@ -1,7 +0,0 @@
kind: Under the Hood
body: Add group info to RunResultError, RunResultFailure, RunResultWarning log lines
time: 2024-08-07T15:56:52.171199-05:00
custom:
Author: aranke
Issue: ""
JiraID: "364"

View File

@@ -1,6 +0,0 @@
kind: Under the Hood
body: Add test for sources tables with quotes
time: 2024-08-21T09:55:16.038101-04:00
custom:
Author: gshank
Issue: "10582"

View File

@@ -1,6 +0,0 @@
kind: Under the Hood
body: Additional type hints for `core/dbt/version.py`
time: 2024-08-27T10:50:14.047859-05:00
custom:
Author: QMalcolm
Issue: "10612"

View File

@@ -1,6 +0,0 @@
kind: Under the Hood
body: Fix typing issues in core/dbt/contracts/sql.py
time: 2024-08-27T11:31:23.749912-05:00
custom:
Author: QMalcolm
Issue: "10614"

View File

@@ -1,6 +0,0 @@
kind: Under the Hood
body: Fix type errors in `dbt/core/task/clean.py`
time: 2024-08-27T11:48:10.438173-05:00
custom:
Author: QMalcolm
Issue: "10616"

View File

@@ -165,18 +165,6 @@ jobs:
os: [ubuntu-20.04]
split-group: ${{ fromJson(needs.integration-metadata.outputs.split-groups) }}
include: ${{ fromJson(needs.integration-metadata.outputs.include) }}
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
env:
TOXENV: integration
DBT_INVOCATION_ENV: github-actions
@@ -201,32 +189,14 @@ jobs:
- name: Set up postgres (linux)
if: runner.os == 'Linux'
uses: ./.github/actions/setup-postgres-linux
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (macos)
if: runner.os == 'macOS'
uses: ./.github/actions/setup-postgres-macos
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Set up postgres (windows)
if: runner.os == 'Windows'
uses: ./.github/actions/setup-postgres-windows
env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: postgres
PGDATABASE: postgres
- name: Install python tools
run: |

View File

@@ -34,7 +34,6 @@ class Export(dbtClassMixin):
name: str
config: ExportConfig
unrendered_config: Dict[str, str] = field(default_factory=dict)
@dataclass

View File

@@ -1,10 +1,7 @@
from typing import IO, List, Optional, Union
from typing import IO, Optional
from click.exceptions import ClickException
from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import RunExecutionResult
from dbt.utils import ExitCodes
@@ -26,7 +23,7 @@ class CliException(ClickException):
# the typing of _file is to satisfy the signature of ClickException.show
# overriding this method prevents click from printing any exceptions to stdout
def show(self, _file: Optional[IO] = None) -> None: # type: ignore[type-arg]
def show(self, _file: Optional[IO] = None) -> None:
pass
@@ -34,17 +31,7 @@ class ResultExit(CliException):
"""This class wraps any exception that contains results while invoking dbt, or the
results of an invocation that did not succeed but did not throw any exceptions."""
def __init__(
self,
result: Union[
bool, # debug
CatalogArtifact, # docs generate
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None,
) -> None:
def __init__(self, result) -> None:
super().__init__(ExitCodes.ModelError)
self.result = result

View File

@@ -218,9 +218,10 @@ def clean(ctx, **kwargs):
"""Delete all folders in the clean-targets list (usually the dbt_packages and target directories.)"""
from dbt.task.clean import CleanTask
with CleanTask(ctx.obj["flags"], ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
task = CleanTask(ctx.obj["flags"], ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -436,9 +437,9 @@ def deps(ctx, **kwargs):
message=f"Version is required in --add-package when a package when source is {flags.SOURCE}",
option_name="--add-package",
)
with DepsTask(flags, ctx.obj["project"]) as task:
results = task.run()
success = task.interpret_results(results)
task = DepsTask(flags, ctx.obj["project"])
results = task.run()
success = task.interpret_results(results)
return results, success
@@ -458,9 +459,10 @@ def init(ctx, **kwargs):
"""Initialize a new dbt project."""
from dbt.task.init import InitTask
with InitTask(ctx.obj["flags"]) as task:
results = task.run()
success = task.interpret_results(results)
task = InitTask(ctx.obj["flags"])
results = task.run()
success = task.interpret_results(results)
return results, success
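
Context for these hunks: the `with ... as task:` form being removed here relies on BaseTask acting as a context manager that snapshots and restores the caller's working directory (its `__enter__`/`__exit__` are removed in the base.py diff further down; see also the "8997" changelog entry above). A minimal, self-contained sketch of that pattern — `DemoTask` is an illustrative stand-in, not one of dbt's actual task classes:

import os
import tempfile

class BaseTask:
    """Sketch: a task that restores the caller's working directory on exit."""

    def __enter__(self):
        self.orig_dir = os.getcwd()  # snapshot before run() may chdir
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        os.chdir(self.orig_dir)  # restore even if run() raised

    def run(self):
        raise NotImplementedError

class DemoTask(BaseTask):  # stand-in for CleanTask / DepsTask / InitTask
    def run(self):
        os.chdir(tempfile.gettempdir())  # simulate moving into a project dir
        return "ok"

before = os.getcwd()
with DemoTask() as task:
    task.run()
assert os.getcwd() == before  # caller's directory is restored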

View File

@@ -547,8 +547,6 @@ class Compiler:
the node's raw_code into compiled_code, and then calls the
recursive method to "prepend" the ctes.
"""
# REVIEW: UnitTestDefinition shouldn't be possible here because of the
# type of node, and it is likewise an invalid return type.
if isinstance(node, UnitTestDefinition):
return node

View File

@@ -480,7 +480,6 @@ class PartialProject(RenderComponents):
rendered.selectors_dict["selectors"]
)
dbt_cloud = cfg.dbt_cloud
flags: Dict[str, Any] = cfg.flags
project = Project(
project_name=name,
@@ -525,7 +524,6 @@ class PartialProject(RenderComponents):
project_env_vars=project_env_vars,
restrict_access=cfg.restrict_access,
dbt_cloud=dbt_cloud,
flags=flags,
)
# sanity check - this means an internal issue
project.validate()
@@ -570,6 +568,11 @@ class PartialProject(RenderComponents):
) = package_and_project_data_from_root(project_root)
selectors_dict = selector_data_from_root(project_root)
if "flags" in project_dict:
# We don't want to include "flags" in the Project,
# it goes in ProjectFlags
project_dict.pop("flags")
return cls.from_dicts(
project_root=project_root,
project_dict=project_dict,
@@ -642,7 +645,6 @@ class Project:
project_env_vars: Dict[str, Any]
restrict_access: bool
dbt_cloud: Dict[str, Any]
flags: Dict[str, Any]
@property
def all_source_paths(self) -> List[str]:
@@ -722,7 +724,6 @@ class Project:
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
"restrict-access": self.restrict_access,
"dbt-cloud": self.dbt_cloud,
"flags": self.flags,
}
)
if self.query_comment:

View File

@@ -193,7 +193,6 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
log_cache_events=log_cache_events,
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
)
# Called by 'load_projects' in this class

View File

@@ -8,7 +8,7 @@ from dbt.adapters.exceptions import (
RelationWrongTypeError,
)
from dbt.adapters.exceptions.cache import CacheInconsistencyError
from dbt.events.types import JinjaLogWarning, SnapshotTimestampWarning
from dbt.events.types import JinjaLogWarning
from dbt.exceptions import (
AmbiguousAliasError,
AmbiguousCatalogMatchError,
@@ -116,17 +116,6 @@ def raise_fail_fast_error(msg, node=None) -> NoReturn:
raise FailFastError(msg, node=node)
def warn_snapshot_timestamp_data_types(
snapshot_time_data_type: str, updated_at_data_type: str
) -> None:
warn_or_error(
SnapshotTimestampWarning(
snapshot_time_data_type=snapshot_time_data_type,
updated_at_data_type=updated_at_data_type,
)
)
# Update this when a new function should be added to the
# dbt context's `exceptions` key!
CONTEXT_EXPORTS = {
@@ -152,7 +141,6 @@ CONTEXT_EXPORTS = {
raise_contract_error,
column_type_missing,
raise_fail_fast_error,
warn_snapshot_timestamp_data_types,
]
}

View File

@@ -975,7 +975,7 @@ class ProviderContext(ManifestContext):
except ValueError as e:
raise LoadAgateTableValueError(e, node=self.model)
# this is used by some adapters
table.original_abspath = os.path.abspath(path) # type: ignore
table.original_abspath = os.path.abspath(path)
return table
@contextproperty()

View File

@@ -18,6 +18,7 @@ from typing import (
from mashumaro.types import SerializableType
from dbt import deprecations
from dbt.adapters.base import ConstraintSupport
from dbt.adapters.factory import get_adapter_constraint_support
from dbt.artifacts.resources import Analysis as AnalysisResource
@@ -1147,6 +1148,12 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if self.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
self.data_tests.extend(self.tests)
self.tests.clear()
@@ -1157,6 +1164,12 @@ class UnpatchedSourceDefinition(BaseNode):
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if column.tests:
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
column.data_tests.extend(column.tests)
column.tests.clear()
@@ -1454,13 +1467,6 @@ class Group(GroupResource, BaseNode):
def resource_class(cls) -> Type[GroupResource]:
return GroupResource
def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
}
# ====================================
# SemanticModel node
@@ -1566,12 +1572,13 @@ class SavedQuery(NodeInfoMixin, GraphNode, SavedQueryResource):
# exports should be in the same order, so we zip them for easy iteration
for old_export, new_export in zip(old.exports, self.exports):
if not (old_export.name == new_export.name):
if not (
old_export.name == new_export.name
and old_export.config.export_as == new_export.config.export_as
and old_export.config.schema_name == new_export.config.schema_name
and old_export.config.alias == new_export.config.alias
):
return False
keys = ["export_as", "schema", "alias"]
for key in keys:
if old_export.unrendered_config.get(key) != new_export.unrendered_config.get(key):
return False
return True

View File

@@ -155,7 +155,7 @@ class SemanticManifest:
raise ParsingError(
"The semantic layer requires a time spine model with granularity DAY or smaller in the project, "
"but none was found. Guidance on creating this model can be found on our docs site "
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)."
"(https://docs.getdbt.com/docs/build/metricflow-time-spine)." # TODO: update docs link when available!
)
# For backward compatibility: if legacy time spine exists, include it in the manifest.

View File

@@ -5,6 +5,7 @@ from mashumaro.jsonschema.annotations import Pattern
from mashumaro.types import SerializableType
from typing_extensions import Annotated
from dbt import deprecations
from dbt.adapters.contracts.connection import QueryComment
from dbt.contracts.util import Identifier, list_str
from dbt_common.contracts.util import Mergeable
@@ -258,7 +259,6 @@ class Project(dbtClassMixin):
query_comment: Optional[Union[QueryComment, NoValue, str]] = field(default_factory=NoValue)
restrict_access: bool = False
dbt_cloud: Optional[Dict[str, Any]] = None
flags: Dict[str, Any] = field(default_factory=dict)
class Config(dbtMashConfig):
# These tell mashumaro to use aliases for jsonschema and for "from_dict"
@@ -312,6 +312,10 @@ class Project(dbtClassMixin):
raise ValidationError(
"Invalid project config: cannot have both 'tests' and 'data_tests' defined"
)
if "tests" in data:
deprecations.warn(
"project-test-config", deprecated_path="tests", exp_path="data_tests"
)
@dataclass

View File

@@ -29,8 +29,7 @@ class RemoteCompileResult(RemoteCompileResultMixin):
generated_at: datetime = field(default_factory=datetime.utcnow)
@property
def error(self) -> None:
# TODO: Can we delete this? It's never set anywhere else and never accessed
def error(self):
return None
@@ -41,7 +40,7 @@ class RemoteExecutionResult(ExecutionResult):
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)
def write(self, path: str) -> None:
def write(self, path: str):
writable = RunResultsArtifact.from_execution_results(
generated_at=self.generated_at,
results=self.results,

View File

@@ -98,6 +98,11 @@ class CollectFreshnessReturnSignature(DBTDeprecation):
_event = "CollectFreshnessReturnSignature"
class TestsConfigDeprecation(DBTDeprecation):
_name = "project-test-config"
_event = "TestsConfigDeprecation"
class ProjectFlagsMovedDeprecation(DBTDeprecation):
_name = "project-flags-moved"
_event = "ProjectFlagsMovedDeprecation"
@@ -133,7 +138,7 @@ def renamed_env_var(old_name: str, new_name: str):
return cb
def warn(name: str, *args, **kwargs) -> None:
def warn(name, *args, **kwargs):
if name not in deprecations:
# this should (hopefully) never happen
raise RuntimeError("Error showing deprecation warning: {}".format(name))
@@ -162,6 +167,7 @@ deprecations_list: List[DBTDeprecation] = [
ConfigLogPathDeprecation(),
ConfigTargetPathDeprecation(),
CollectFreshnessReturnSignature(),
TestsConfigDeprecation(),
ProjectFlagsMovedDeprecation(),
PackageMaterializationOverrideDeprecation(),
ResourceNamesWithSpacesDeprecation(),
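
Aside: each deprecation class registers under its `_name`, and `warn()` dispatches through that registry. A stripped-down sketch of the pattern — simplified here, omitting the structured event types and one-time firing details of the real module:

from typing import Set

class DBTDeprecation:
    _name: str = ""

    def show(self, *args, **kwargs) -> None:
        # the real class fires a structured event; a print stands in here
        print(f"[DEPRECATION] {self._name}", args, kwargs)

class TestsConfigDeprecation(DBTDeprecation):
    _name = "project-test-config"

deprecations = {d._name: d for d in [TestsConfigDeprecation()]}
active_deprecations: Set[str] = set()

def warn(name, *args, **kwargs):
    if name not in deprecations:
        # this should (hopefully) never happen
        raise RuntimeError("Error showing deprecation warning: {}".format(name))
    deprecations[name].show(*args, **kwargs)
    active_deprecations.add(name)

warn("project-test-config", deprecated_path="tests", exp_path="data_tests")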

View File

@@ -1610,17 +1610,6 @@ message CompiledNodeMsg {
CompiledNode data = 2;
}
// Q043
message SnapshotTimestampWarning {
string snapshot_time_data_type = 1;
string updated_at_data_type = 2;
}
message SnapshotTimestampWarningMsg {
CoreEventInfo info = 1;
SnapshotTimestampWarning data = 2;
}
// W - Node testing
// Skipped W001
@@ -1820,19 +1809,12 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}
message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}
// Z021
message RunResultWarning {
string resource_type = 1;
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultWarningMsg {
@@ -1846,7 +1828,6 @@ message RunResultFailure {
string node_name = 2;
string path = 3;
NodeInfo node_info = 4;
Group group = 5;
}
message RunResultFailureMsg {
@@ -1868,7 +1849,6 @@ message StatsLineMsg {
message RunResultError {
string msg = 1;
NodeInfo node_info = 2;
Group group = 3;
}
message RunResultErrorMsg {

File diff suppressed because one or more lines are too long

View File

@@ -388,9 +388,6 @@ class ConfigTargetPathDeprecation(WarnLevel):
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
# Note: this deprecation has been removed, but we are leaving
# the event class here, because users may have specified it in
# warn_error_options.
class TestsConfigDeprecation(WarnLevel):
def code(self) -> str:
return "D012"
@@ -1617,18 +1614,6 @@ class CompiledNode(InfoLevel):
return f"Compiled node '{self.node_name}' is:\n{self.compiled}"
class SnapshotTimestampWarning(WarnLevel):
def code(self) -> str:
return "Q043"
def message(self) -> str:
return (
f"Data type of snapshot table timestamp columns ({self.snapshot_time_data_type}) "
f"doesn't match derived column 'updated_at' ({self.updated_at_data_type}). "
"Please update snapshot config 'updated_at'."
)
# =======================================================
# W - Node testing
# =======================================================

View File

@@ -4,13 +4,17 @@ from typing import Dict, Generator, List, Optional, Set
import networkx as nx # type: ignore
from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
Exposure,
GraphMemberNode,
Metric,
ModelNode,
SourceDefinition,
)
from dbt.contracts.state import PreviousState
from dbt.graph.selector_spec import SelectionSpec
from dbt.node_types import NodeType
from .graph import UniqueId
@@ -212,3 +216,80 @@ class GraphQueue:
with self.lock:
self.some_task_done.wait()
return self.inner.unfinished_tasks
class ExecutionQueue:
"""
ExecutionQueue manages which nodes to execute and in what order, based on the supplied inputs.
It is responsible for managing the queue of nodes to execute and for marking nodes as
done once they have been executed.
"""
def __init__(
self,
manifest: Manifest,
previous_state: PreviousState,
resource_types: List[NodeType],
include_empty_nodes: Optional[bool] = False,
selection_spec: Optional[SelectionSpec] = None,
fail_fast: Optional[bool] = False,
) -> None:
"""Create a new ExecutionQueue.
Nodes to execute are selected based on the manifest, previous state, selection spec, include_empty_nodes, and resource_types.
See Args for more details.
Example usage:
pool = ThreadPool(4)
queue = ExecutionQueue(manifest, previous_state, [NodeType.Model, NodeType.Test])
def callback(result: RunResult):
queue.handle_node_result(result)
def run(node: GraphMemberNode):
result = node.run()
return result
while queue.count() > 0:
node = queue.get()
pool.apply_async(run, args=(node,), callback=callback)
results = queue.join()
Args:
manifest (Manifest): the manifest of the project
previous_state (PreviousState): the previous state of the project, used in state selection.
resource_types (List[NodeType]): the types of resources to include in the selection.
include_empty_nodes (Optional[bool]): whether to include empty nodes (nodes with nothing to execute) in the selection. Defaults to False.
selection_spec (Optional[SelectionSpec]): the selection spec to use. Defaults to None.
fail_fast (Optional[bool]): when set to True, the queue will stop execution after the first error. Defaults to False.
"""
pass
def count(self) -> int:
"""
Returns:
int: the number of nodes in the queue (excluding in-progress nodes)
"""
return 0
def handle_node_result(self, result: RunResult) -> None:
"""Given a RunResult, mark the node as done and update the queue to make more nodes avaliable.
Args:
result (RunResult): the result of the node that finished executing.
"""
pass
def get(self, block: bool = True) -> GraphMemberNode:
"""
Get the next node to execute.
Args:
block (bool, optional): whether to block until a node is available. Defaults to True.
"""
return ModelNode() # type: ignore
def join(self) -> List[RunResult]:
"""Wait for all nodes to finish executing, and return the results of all nodes.
Returns:
List[RunResult]: the results of all nodes.
"""
return []
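
Every method in this new interface is still a stub, so a self-contained toy may help make the intended contract concrete: nodes are released for execution once all of their dependencies have completed, and join() blocks until everything has finished. This is an illustrative sketch only — ToyExecutionQueue and the plain strings standing in for GraphMemberNode and RunResult are made up here, not the planned dbt implementation:

import queue
import threading
from typing import Dict, List, Set

class ToyExecutionQueue:
    """Toy model of the contract above: a node becomes available once all
    of its dependencies have reported a result."""

    def __init__(self, deps: Dict[str, Set[str]]) -> None:
        # deps maps each node to the set of nodes it depends on;
        # every referenced node must also appear as a key.
        self._lock = threading.Condition()
        self._waiting = {node: set(parents) for node, parents in deps.items()}
        self._children: Dict[str, Set[str]] = {node: set() for node in deps}
        for node, parents in deps.items():
            for parent in parents:
                self._children[parent].add(node)
        self._ready: "queue.Queue[str]" = queue.Queue()
        self._results: List[str] = []
        self._unfinished = len(deps)
        for node, parents in deps.items():
            if not parents:
                self._ready.put(node)

    def count(self) -> int:
        # Approximation of "nodes in the queue, excluding in-progress ones".
        return self._ready.qsize()

    def get(self, block: bool = True) -> str:
        return self._ready.get(block=block)

    def handle_node_result(self, result: str) -> None:
        # Mark the node done and release children whose parents are all done.
        with self._lock:
            self._results.append(result)
            self._unfinished -= 1
            for child in self._children[result]:
                self._waiting[child].discard(result)
                if not self._waiting[child]:
                    self._ready.put(child)
            self._lock.notify_all()

    def join(self) -> List[str]:
        with self._lock:
            while self._unfinished:
                self._lock.wait()
            return list(self._results)

# Single-threaded walk-through: a -> {b, c} -> d
q = ToyExecutionQueue({"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}})
order = []
while q.count():
    node = q.get()
    order.append(node)
    q.handle_node_result(node)  # pretend the node ran successfully
assert q.join() == order  # e.g. ['a', 'b', 'c', 'd']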

View File

@@ -87,15 +87,12 @@ class NodeSelector(MethodManager):
)
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)
selected = collected | neighbors
# if --indirect-selection EMPTY, do not expand to adjacent tests
if spec.indirect_selection == IndirectSelection.Empty:
return selected, set()
return collected, set()
else:
neighbors = self.collect_specified_neighbors(spec, collected)
direct_nodes, indirect_nodes = self.expand_selection(
selected=selected, indirect_selection=spec.indirect_selection
selected=(collected | neighbors), indirect_selection=spec.indirect_selection
)
return direct_nodes, indirect_nodes
@@ -180,14 +177,10 @@ class NodeSelector(MethodManager):
node = self.manifest.nodes[unique_id]
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
return node.empty
if self.include_empty_nodes:
return node.config.enabled
else:
return False
return not node.empty and node.config.enabled
def node_is_match(self, node: GraphMemberNode) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
@@ -219,12 +212,7 @@ class NodeSelector(MethodManager):
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {
unique_id
for unique_id in selected
if self._is_match(unique_id)
and (self.include_empty_nodes or not self._is_empty_node(unique_id))
}
return {unique_id for unique_id in selected if self._is_match(unique_id)}
def expand_selection(
self,

View File

@@ -1028,11 +1028,12 @@ class ManifestLoader:
return state_check
def save_macros_to_adapter(self, adapter):
adapter.set_macro_resolver(self.manifest)
macro_manifest = MacroManifest(self.manifest.macros)
adapter.set_macro_resolver(macro_manifest)
# This executes the callable macro_hook and sets the
# query headers
# This executes the callable macro_hook and sets the query headers
query_header_context = generate_query_header_context(adapter.config, self.manifest)
query_header_context = generate_query_header_context(adapter.config, macro_manifest)
self.macro_hook(query_header_context)
# This creates a MacroManifest which contains the macros in

View File

@@ -11,7 +11,6 @@ from dbt.config.renderer import BaseRenderer, Keypath
# keyword args are rendered to capture refs in render_test_update.
# Keyword args are finally rendered at compilation time.
# Descriptions are not rendered until 'process_docs'.
# Pre- and post-hooks in configs are late-rendered.
class SchemaYamlRenderer(BaseRenderer):
def __init__(self, context: Dict[str, Any], key: str) -> None:
super().__init__(context)
@@ -44,14 +43,6 @@ class SchemaYamlRenderer(BaseRenderer):
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
return True
# pre- and post-hooks
if (
len(keypath) >= 2
and keypath[0] == "config"
and keypath[1] in ("pre_hook", "post_hook")
):
return True
# versions
if len(keypath) == 5 and keypath[4] == "description":
return True

View File

@@ -778,9 +778,7 @@ class SavedQueryParser(YamlReader):
self, unparsed: UnparsedExport, saved_query_config: SavedQueryConfig
) -> Export:
return Export(
name=unparsed.name,
config=self._get_export_config(unparsed.config, saved_query_config),
unrendered_config=unparsed.config,
name=unparsed.name, config=self._get_export_config(unparsed.config, saved_query_config)
)
def _get_query_params(self, unparsed: UnparsedQueryParams) -> QueryParams:

View File

@@ -4,6 +4,7 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar
from dbt import deprecations
from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import TimeSpine
from dbt.clients.jinja_static import statically_parse_ref_or_source
@@ -567,6 +568,12 @@ class PatchParser(YamlReader, Generic[NonSourceTarget, Parsed]):
raise ValidationError(
"Invalid test config: cannot have both 'tests' and 'data_tests' defined"
)
if is_root_project:
deprecations.warn(
"project-test-config",
deprecated_path="tests",
exp_path="data_tests",
)
data["data_tests"] = data.pop("tests")
# model-level tests

View File

@@ -17,5 +17,5 @@ class PluginNodes:
def add_model(self, model_args: ModelNodeArgs) -> None:
self.models[model_args.unique_id] = model_args
def update(self, other: "PluginNodes") -> None:
def update(self, other: "PluginNodes"):
self.models.update(other.models)

View File

@@ -44,10 +44,15 @@ from dbt.graph import Graph
from dbt.task.printer import print_run_result_error
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError, NotImplementedError
from dbt_common.exceptions import (
CompilationError,
DbtInternalError,
DbtRuntimeError,
NotImplementedError,
)
def read_profiles(profiles_dir: Optional[str] = None) -> Dict[str, Any]:
def read_profiles(profiles_dir=None):
"""This is only used for some error handling"""
if profiles_dir is None:
profiles_dir = get_flags().PROFILES_DIR
@@ -66,13 +71,6 @@ class BaseTask(metaclass=ABCMeta):
def __init__(self, args: Flags) -> None:
self.args = args
def __enter__(self):
self.orig_dir = os.getcwd()
return self
def __exit__(self, exc_type, exc_value, traceback):
os.chdir(self.orig_dir)
@abstractmethod
def run(self):
raise dbt_common.exceptions.base.NotImplementedError("Not Implemented")
@@ -125,7 +123,7 @@ class ConfiguredTask(BaseTask):
self.manifest = manifest
self.compiler = Compiler(self.config)
def compile_manifest(self) -> None:
def compile_manifest(self):
if self.manifest is None:
raise DbtInternalError("compile_manifest called before manifest was loaded")
@@ -167,7 +165,7 @@ class ExecutionContext:
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
self.config = config
self.compiler = Compiler(config)
self.adapter = adapter
@@ -274,7 +272,7 @@ class BaseRunner(metaclass=ABCMeta):
failures=result.failures,
)
def compile_and_execute(self, manifest: Manifest, ctx: ExecutionContext):
def compile_and_execute(self, manifest, ctx):
result = None
with (
self.adapter.connection_named(self.node.unique_id, self.node)
@@ -307,7 +305,7 @@ class BaseRunner(metaclass=ABCMeta):
return result
def _handle_catchable_exception(self, e: DbtRuntimeError, ctx: ExecutionContext) -> str:
def _handle_catchable_exception(self, e, ctx):
if e.node is None:
e.add_node(ctx.node)
@@ -318,7 +316,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
def _handle_internal_exception(self, e, ctx):
fire_event(
InternalErrorOnRun(
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
@@ -326,7 +324,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
def _handle_generic_exception(self, e, ctx):
fire_event(
GenericExceptionOnRun(
build_path=self._node_build_path(),
@@ -339,8 +337,9 @@ class BaseRunner(metaclass=ABCMeta):
return str(e)
def handle_exception(self, e: Exception, ctx: ExecutionContext) -> str:
if isinstance(e, DbtRuntimeError):
def handle_exception(self, e, ctx):
catchable_errors = (CompilationError, DbtRuntimeError)
if isinstance(e, catchable_errors):
error = self._handle_catchable_exception(e, ctx)
elif isinstance(e, DbtInternalError):
error = self._handle_internal_exception(e, ctx)
@@ -348,7 +347,7 @@ class BaseRunner(metaclass=ABCMeta):
error = self._handle_generic_exception(e, ctx)
return error
def safe_run(self, manifest: Manifest):
def safe_run(self, manifest):
started = time.time()
ctx = ExecutionContext(self.node)
error = None
@@ -395,19 +394,19 @@ class BaseRunner(metaclass=ABCMeta):
return None
def before_execute(self) -> None:
raise NotImplementedError("before_execute is not implemented")
def before_execute(self):
raise NotImplementedError()
def execute(self, compiled_node, manifest):
raise NotImplementedError("execute is not implemented")
raise NotImplementedError()
def run(self, compiled_node, manifest):
return self.execute(compiled_node, manifest)
def after_execute(self, result) -> None:
raise NotImplementedError("after_execute is not implemented")
def after_execute(self, result):
raise NotImplementedError()
def _skip_caused_by_ephemeral_failure(self) -> bool:
def _skip_caused_by_ephemeral_failure(self):
if self.skip_cause is None or self.skip_cause.node is None:
return False
return self.skip_cause.node.is_ephemeral_model
@@ -462,7 +461,7 @@ class BaseRunner(metaclass=ABCMeta):
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
return node_result
def do_skip(self, cause=None) -> None:
def do_skip(self, cause=None):
self.skip = True
self.skip_cause = cause

View File

@@ -1,5 +1,5 @@
import threading
from typing import Dict, List, Optional, Set, Type
from typing import Dict, List, Set
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.run import RunResult
@@ -24,16 +24,16 @@ from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
def description(self):
return f"saved query {self.node.name}"
def before_execute(self) -> None:
def before_execute(self):
pass
def compile(self, manifest: Manifest):
def compile(self, manifest):
return self.node
def after_execute(self, result) -> None:
def after_execute(self, result):
fire_event(
LogNodeNoOpResult(
description=self.description,
@@ -83,7 +83,7 @@ class BuildTask(RunTask):
self.selected_unit_tests: Set = set()
self.model_to_unit_test_map: Dict[str, List] = {}
def resource_types(self, no_unit_tests: bool = False) -> List[NodeType]:
def resource_types(self, no_unit_tests=False):
resource_types = resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
)
@@ -210,7 +210,7 @@ class BuildTask(RunTask):
resource_types=resource_types,
)
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, node):
return self.RUNNER_MAP.get(node.resource_type)
# Special build compile_manifest method to pass add_test_edges to the compiler

View File

@@ -16,7 +16,7 @@ class CleanTask(BaseTask):
self.config = config
self.project = config
def run(self) -> None:
def run(self):
"""
This function takes all the paths in the target file
and cleans the project paths that are not protected.

View File

@@ -1,8 +1,7 @@
import threading
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type
from typing import AbstractSet, Any, Iterable, List, Optional, Set
from dbt.adapters.base import BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
from dbt.context.providers import generate_runtime_model_context
@@ -17,10 +16,10 @@ from dbt_common.exceptions import CompilationError, DbtInternalError
class CloneRunner(BaseRunner):
def before_execute(self) -> None:
def before_execute(self):
pass
def after_execute(self, result) -> None:
def after_execute(self, result):
pass
def _build_run_model_result(self, model, context):
@@ -45,7 +44,7 @@ class CloneRunner(BaseRunner):
failures=None,
)
def compile(self, manifest: Manifest):
def compile(self, manifest):
# no-op
return self.node
@@ -92,7 +91,7 @@ class CloneRunner(BaseRunner):
class CloneTask(GraphRunnableTask):
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_run_mode(self) -> GraphRunnableMode:
@@ -134,8 +133,8 @@ class CloneTask(GraphRunnableTask):
self.populate_adapter_cache(adapter, schemas_to_cache)
@property
def resource_types(self) -> List[NodeType]:
resource_types: Collection[NodeType] = resource_types_from_args(
def resource_types(self):
resource_types = resource_types_from_args(
self.args, set(REFABLE_NODE_TYPES), set(REFABLE_NODE_TYPES)
)
@@ -155,5 +154,5 @@ class CloneTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return CloneRunner

View File

@@ -1,8 +1,6 @@
import threading
from typing import Optional, Type
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import CompiledNode, ParseInlineNodeError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import EXECUTABLE_NODE_TYPES, NodeType
@@ -19,10 +17,10 @@ from dbt_common.exceptions import DbtInternalError
class CompileRunner(BaseRunner):
def before_execute(self) -> None:
def before_execute(self):
pass
def after_execute(self, result) -> None:
def after_execute(self, result):
pass
def execute(self, compiled_node, manifest):
@@ -37,7 +35,7 @@ class CompileRunner(BaseRunner):
failures=None,
)
def compile(self, manifest: Manifest):
def compile(self, manifest):
return self.compiler.compile_node(self.node, manifest, {})
@@ -46,7 +44,7 @@ class CompileTask(GraphRunnableTask):
# it should be removed before the task is complete
_inline_node_id = None
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return True
def get_node_selector(self) -> ResourceTypeSelector:
@@ -64,10 +62,10 @@ class CompileTask(GraphRunnableTask):
resource_types=resource_types,
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return CompileRunner
def task_end_messages(self, results) -> None:
def task_end_messages(self, results):
is_inline = bool(getattr(self.args, "inline", None))
output_format = getattr(self.args, "output", "text")
@@ -129,14 +127,14 @@ class CompileTask(GraphRunnableTask):
raise DbtException("Error parsing inline query")
super()._runtime_initialize()
def after_run(self, adapter, results) -> None:
def after_run(self, adapter, results):
# remove inline node from manifest
if self._inline_node_id:
self.manifest.nodes.pop(self._inline_node_id)
self._inline_node_id = None
super().after_run(adapter, results)
def _handle_result(self, result) -> None:
def _handle_result(self, result):
super()._handle_result(result)
if (

View File

@@ -481,7 +481,7 @@ class DebugTask(BaseTask):
return status
@classmethod
def validate_connection(cls, target_dict) -> None:
def validate_connection(cls, target_dict):
"""Validate a connection dictionary. On error, raises a DbtConfigError."""
target_name = "test"
# make a fake profile that we can parse

View File

@@ -96,6 +96,8 @@ class DepsTask(BaseTask):
# See GH-7615
project.project_root = str(Path(project.project_root).resolve())
self.project = project
move_to_nearest_project_dir(project.project_root)
self.cli_vars = args.vars
def track_package_install(
@@ -200,7 +202,6 @@ class DepsTask(BaseTask):
fire_event(DepsLockUpdating(lock_filepath=lock_filepath))
def run(self) -> None:
move_to_nearest_project_dir(self.args.project_dir)
if self.args.add_package:
self.add()

View File

@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import AbstractSet, Dict, List, Optional, Type
from typing import AbstractSet, Dict, List, Optional
from dbt import deprecations
from dbt.adapters.base.impl import FreshnessResponse
@@ -14,7 +14,6 @@ from dbt.artifacts.schemas.freshness import (
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
from dbt.events.types import FreshnessCheckComplete, LogFreshnessResult, LogStartLine
@@ -45,7 +44,7 @@ class FreshnessRunner(BaseRunner):
def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")
def before_execute(self) -> None:
def before_execute(self):
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
LogStartLine(
@@ -56,7 +55,7 @@ class FreshnessRunner(BaseRunner):
)
)
def after_execute(self, result) -> None:
def after_execute(self, result):
if hasattr(result, "node"):
source_name = result.node.source_name
table_name = result.node.name
@@ -163,7 +162,7 @@ class FreshnessRunner(BaseRunner):
**freshness,
)
def compile(self, manifest: Manifest):
def compile(self, manifest):
if self.node.resource_type != NodeType.Source:
# should be unreachable...
raise DbtRuntimeError("freshness runner: got a non-Source")
@@ -185,13 +184,13 @@ class FreshnessTask(RunTask):
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}
def result_path(self) -> str:
def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
else:
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_node_selector(self):
@@ -215,7 +214,7 @@ class FreshnessTask(RunTask):
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return FreshnessRunner
def get_result(self, results, elapsed_time, generated_at):
@@ -223,7 +222,7 @@ class FreshnessTask(RunTask):
elapsed_time=elapsed_time, generated_at=generated_at, results=results
)
def task_end_messages(self, results) -> None:
def task_end_messages(self, results):
for result in results:
if result.status in (
FreshnessStatus.Error,

View File

@@ -1,5 +1,4 @@
import json
from typing import Iterator, List
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
@@ -146,7 +145,7 @@ class ListTask(GraphRunnableTask):
}
)
def generate_paths(self) -> Iterator[str]:
def generate_paths(self):
for node in self._iterate_selected_nodes():
yield node.original_file_path
@@ -178,7 +177,7 @@ class ListTask(GraphRunnableTask):
return self.node_results
@property
def resource_types(self) -> List[NodeType]:
def resource_types(self):
if self.args.models:
return [NodeType.Model]

View File

@@ -1,7 +1,6 @@
from typing import Dict, Optional
from typing import Dict
from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
@@ -69,9 +68,7 @@ def print_run_status_line(results) -> None:
fire_event(StatsLine(stats=stats))
def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
) -> None:
def print_run_result_error(result, newline: bool = True, is_warning: bool = False) -> None:
# set node_info for logging events
node_info = None
if hasattr(result, "node") and result.node:
@@ -80,25 +77,21 @@ def print_run_result_error(
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
)
)
@@ -106,10 +99,7 @@ def print_run_result_error(
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
fire_event(RunResultError(msg=result.message, node_info=node_info))
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))
@@ -129,13 +119,10 @@ def print_run_result_error(
elif result.message is not None:
if newline:
fire_event(Formatting(""))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
fire_event(RunResultError(msg=result.message, node_info=node_info))
def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
errors, warnings = [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
@@ -157,11 +144,9 @@ def print_run_end_messages(
)
for error in errors:
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)
print_run_result_error(error, is_warning=False)
for warning in warnings:
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)
print_run_result_error(warning, is_warning=True)
print_run_status_line(results)

View File

@@ -2,7 +2,7 @@ import functools
import threading
import time
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple
from dbt import tracking, utils
from dbt.adapters.base import BaseRelation
@@ -36,7 +36,6 @@ from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.node_types import NodeType, RunHookType
from dbt.task.base import BaseRunner
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.base_types import EventLevel
from dbt_common.events.contextvars import log_contextvars
@@ -180,7 +179,7 @@ class ModelRunner(CompileRunner):
relation = relation.include(database=False)
return str(relation)
def describe_node(self) -> str:
def describe_node(self):
# TODO CL 'language' will be moved to node level when we change representation
return f"{self.node.language} {self.node.get_materialization()} model {self.get_node_representation()}"
@@ -214,10 +213,10 @@ class ModelRunner(CompileRunner):
level=level,
)
def before_execute(self) -> None:
def before_execute(self):
self.print_start_line()
def after_execute(self, result) -> None:
def after_execute(self, result):
track_model_run(self.node_index, self.num_nodes, result)
self.print_result_line(result)
@@ -473,20 +472,9 @@ class RunTask(CompileTask):
resource_types=[NodeType.Model],
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return ModelRunner
def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}
return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}
def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])
if results:
print_run_end_messages(results, groups=groups)
print_run_end_messages(results)

View File

@@ -5,7 +5,7 @@ from concurrent.futures import as_completed
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool
from pathlib import Path
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Union
import dbt.exceptions
import dbt.tracking
@@ -181,13 +181,13 @@ class GraphRunnableTask(ConfiguredTask):
self.num_nodes = len([n for n in self._flattened_nodes if not n.is_ephemeral_model])
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, node):
raise NotImplementedError("Not Implemented")
def result_path(self) -> str:
def result_path(self):
return os.path.join(self.config.project_target_path, RESULT_FILE_NAME)
def get_runner(self, node) -> BaseRunner:
@@ -204,10 +204,6 @@ class GraphRunnableTask(ConfiguredTask):
num_nodes = self.num_nodes
cls = self.get_runner_type(node)
if cls is None:
raise DbtInternalError("Could not find runner type for node.")
return cls(self.config, adapter, node, run_count, num_nodes)
def call_runner(self, runner: BaseRunner) -> RunResult:
@@ -338,7 +334,7 @@ class GraphRunnableTask(ConfiguredTask):
args = [runner]
self._submit(pool, args, callback)
def _handle_result(self, result: RunResult) -> None:
def _handle_result(self, result: RunResult):
"""Mark the result as completed, insert the `CompileResultNode` into
the manifest, and mark any descendants (potentially with a 'cause' if
the result was an ephemeral model) as skipped.
@@ -483,7 +479,7 @@ class GraphRunnableTask(ConfiguredTask):
self.defer_to_manifest()
self.populate_adapter_cache(adapter)
def after_run(self, adapter, results) -> None:
def after_run(self, adapter, results):
pass
def print_results_line(self, node_results, elapsed):
@@ -663,7 +659,7 @@ class GraphRunnableTask(ConfiguredTask):
args=dbt.utils.args_to_dict(self.args),
)
def task_end_messages(self, results) -> None:
def task_end_messages(self, results):
print_run_end_messages(results)
def _get_previous_state(self) -> Optional[Manifest]:

View File

@@ -1,12 +1,9 @@
import random
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogSeedResult, LogStartLine, SeedHeader
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.events.types import Formatting
@@ -17,10 +14,10 @@ from .run import ModelRunner, RunTask
class SeedRunner(ModelRunner):
def describe_node(self) -> str:
def describe_node(self):
return "seed file {}".format(self.get_node_representation())
def before_execute(self) -> None:
def before_execute(self):
fire_event(
LogStartLine(
description=self.describe_node(),
@@ -36,7 +33,7 @@ class SeedRunner(ModelRunner):
result.agate_table = agate_result.table
return result
def compile(self, manifest: Manifest):
def compile(self, manifest):
return self.node
def print_result_line(self, result):
@@ -58,7 +55,7 @@ class SeedRunner(ModelRunner):
class SeedTask(RunTask):
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_node_selector(self):
@@ -71,10 +68,10 @@ class SeedTask(RunTask):
resource_types=[NodeType.Seed],
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return SeedRunner
def task_end_messages(self, results) -> None:
def task_end_messages(self, results):
if self.args.show:
self.show_tables(results)

View File

@@ -67,7 +67,7 @@ class ShowTask(CompileTask):
else:
return ShowRunner
def task_end_messages(self, results) -> None:
def task_end_messages(self, results):
is_inline = bool(getattr(self.args, "inline", None))
if is_inline:
@@ -108,7 +108,7 @@ class ShowTask(CompileTask):
)
)
def _handle_result(self, result) -> None:
def _handle_result(self, result):
super()._handle_result(result)
if (

View File

@@ -1,10 +1,7 @@
from typing import Optional, Type
from dbt.artifacts.schemas.results import NodeStatus
from dbt.events.types import LogSnapshotResult
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError
@@ -14,7 +11,7 @@ from .run import ModelRunner, RunTask
class SnapshotRunner(ModelRunner):
def describe_node(self) -> str:
def describe_node(self):
return "snapshot {}".format(self.get_node_representation())
def print_result_line(self, result):
@@ -37,7 +34,7 @@ class SnapshotRunner(ModelRunner):
class SnapshotTask(RunTask):
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_node_selector(self):
@@ -50,5 +47,5 @@ class SnapshotTask(RunTask):
resource_types=[NodeType.Snapshot],
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return SnapshotRunner

View File

@@ -5,7 +5,6 @@ from typing import Generic, TypeVar
import dbt.exceptions
import dbt_common.exceptions.base
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.sql import (
RemoteCompileResult,
RemoteCompileResultMixin,
@@ -29,19 +28,18 @@ class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
exc=str(e), exc_info=traceback.format_exc(), node_info=self.node.node_info
)
)
# REVIEW: This code is invalid and will always throw.
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt_common.exceptions.DbtRuntimeError):
e.add_node(ctx.node)
return e
def before_execute(self) -> None:
def before_execute(self):
pass
def after_execute(self, result) -> None:
def after_execute(self, result):
pass
def compile(self, manifest: Manifest):
def compile(self, manifest):
return self.compiler.compile_node(self.node, manifest, {}, write=False)
@abstractmethod

View File

@@ -3,7 +3,7 @@ import json
import re
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import daff
@@ -27,7 +27,6 @@ from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.parser.unit_tests import UnitTestManifestLoader
from dbt.task.base import BaseRunner
from dbt.utils import _coerce_decimal, strtobool
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.format import pluralize
@@ -85,14 +84,14 @@ class TestRunner(CompileRunner):
_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_LOG_TEST_RESULT_EVENTS = LogTestResult
def describe_node_name(self) -> str:
def describe_node_name(self):
if self.node.resource_type == NodeType.Unit:
name = f"{self.node.model}::{self.node.versioned_name}"
return name
else:
return self.node.name
def describe_node(self) -> str:
def describe_node(self):
return f"{self.node.resource_type} {self.describe_node_name()}"
def print_result_line(self, result):
@@ -121,7 +120,7 @@ class TestRunner(CompileRunner):
)
)
def before_execute(self) -> None:
def before_execute(self):
self.print_start_line()
def execute_data_test(self, data_test: TestNode, manifest: Manifest) -> TestResultData:
@@ -335,7 +334,7 @@ class TestRunner(CompileRunner):
failures=failures,
)
def after_execute(self, result) -> None:
def after_execute(self, result):
self.print_result_line(result)
def _get_unit_test_agate_table(self, result_table, actual_or_expected: str):
@@ -394,7 +393,7 @@ class TestTask(RunTask):
__test__ = False
def raise_on_first_error(self) -> bool:
def raise_on_first_error(self):
return False
def get_node_selector(self) -> TestSelector:
@@ -406,7 +405,7 @@ class TestTask(RunTask):
previous_state=self.previous_state,
)
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
def get_runner_type(self, _):
return TestRunner

View File

@@ -91,7 +91,6 @@ def run_dbt(
if profiles_dir and "--profiles-dir" not in args:
args.extend(["--profiles-dir", profiles_dir])
dbt = dbtRunner()
res = dbt.invoke(args)
# the exception is immediately raised to be caught in tests
@@ -149,7 +148,7 @@ def get_manifest(project_root) -> Optional[Manifest]:
if os.path.exists(path):
with open(path, "rb") as fp:
manifest_mp = fp.read()
manifest: Manifest = Manifest.from_msgpack(manifest_mp) # type: ignore[attr-defined]
manifest: Manifest = Manifest.from_msgpack(manifest_mp)
return manifest
else:
return None

View File

@@ -49,10 +49,7 @@ def get_latest_version(
return semver.VersionSpecifier.from_version_string(version_string)
def _get_core_msg_lines(
installed: semver.VersionSpecifier,
latest: Optional[semver.VersionSpecifier],
) -> Tuple[List[List[str]], str]:
def _get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]:
installed_s = installed.to_version_string(skip_matcher=True)
installed_line = ["installed", installed_s, ""]
update_info = ""
@@ -211,7 +208,7 @@ def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]:
except ImportError:
# not an adapter
continue
yield plugin_name, mod.version
yield plugin_name, mod.version # type: ignore
def _get_adapter_plugin_names() -> Iterator[str]:

View File

@@ -69,7 +69,7 @@ setup(
# These are major-version-0 packages also maintained by dbt-labs.
# Accept patches but avoid automatically updating past a set minor version range.
"dbt-extractor>=0.5.0,<=0.6",
"dbt-semantic-interfaces>=0.7.0,<0.8",
"dbt-semantic-interfaces>=0.6.11,<0.7",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.3.0,<2.0",

View File

@@ -8,7 +8,7 @@ RUN apt-get update \
build-essential=12.9 \
ca-certificates=20210119 \
git=1:2.30.2-1+deb11u2 \
libpq-dev=13.16-0+deb11u1 \
libpq-dev=13.14-0+deb11u1 \
make=4.3-4.1 \
openssh-client=1:8.4p1-5+deb11u3 \
software-properties-common=0.96.20.2-2.1 \

View File

@@ -19164,15 +19164,6 @@
"required": [
"export_as"
]
},
"unrendered_config": {
"type": "object",
"additionalProperties": {
"type": "string"
},
"propertyNames": {
"type": "string"
}
}
},
"additionalProperties": false,
@@ -20698,15 +20689,6 @@
"required": [
"export_as"
]
},
"unrendered_config": {
"type": "object",
"additionalProperties": {
"type": "string"
},
"propertyNames": {
"type": "string"
}
}
},
"additionalProperties": false,

View File

@@ -27,9 +27,9 @@ select 1 as col
"""
macros__before_and_after = """
{% macro custom_run_hook(state, target, run_started_at, invocation_id, table_name="on_run_hook") %}
{% macro custom_run_hook(state, target, run_started_at, invocation_id) %}
insert into {{ target.schema }}.{{ table_name }} (
insert into {{ target.schema }}.on_run_hook (
test_state,
target_dbname,
target_host,
@@ -355,26 +355,6 @@ snapshots:
- not_null
"""
properties__model_hooks = """
version: 2
models:
- name: hooks
config:
pre_hook: "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook: "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
properties__model_hooks_list = """
version: 2
models:
- name: hooks
config:
pre_hook:
- "{{ custom_run_hook('start', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
post_hook:
- "{{ custom_run_hook('end', target, run_started_at, invocation_id, table_name='on_model_hook') }}"
"""
seeds__example_seed_csv = """a,b,c
1,2,3
4,5,6

View File

@@ -6,7 +6,6 @@ from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt, write_file
from dbt_common.exceptions import CompilationError
from tests.functional.adapter.hooks.fixtures import (
macros__before_and_after,
models__hooked,
models__hooks,
models__hooks_configured,
@@ -14,8 +13,6 @@ from tests.functional.adapter.hooks.fixtures import (
models__hooks_kwargs,
models__post,
models__pre,
properties__model_hooks,
properties__model_hooks_list,
properties__seed_models,
properties__test_snapshot_models,
seeds__example_seed_csv,
@@ -260,27 +257,6 @@ class TestPrePostModelHooksOnSeeds(object):
assert len(res) == 1, "Expected exactly one item"
class TestPrePostModelHooksWithMacros(BaseTestPrePost):
@pytest.fixture(scope="class")
def macros(self):
return {"before-and-after.sql": macros__before_and_after}
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks, "hooks.sql": models__hooks}
def test_pre_and_post_run_hooks(self, project, dbt_profile_target):
run_dbt()
self.check_hooks("start", project, dbt_profile_target.get("host", None))
self.check_hooks("end", project, dbt_profile_target.get("host", None))
class TestPrePostModelHooksListWithMacros(TestPrePostModelHooksWithMacros):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": properties__model_hooks_list, "hooks.sql": models__hooks}
class TestHooksRefsOnSeeds:
"""
This should not succeed, and raise an explicit error

View File

@@ -193,36 +193,3 @@ class TestEnvVars:
assert not ("secret_variable" in log_output)
assert "regular_variable" in log_output
del os.environ["DBT_DEBUG"]
class TestEnvVarInCreateSchema:
"""Test that the env_var() method works in overrides of the create_schema
macro, which is called during a different phase of execution than most
macros, causing problems."""
@pytest.fixture(scope="class", autouse=True)
def setup(self):
os.environ["DBT_TEST_ENV_VAR"] = "1"
@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": """
{% macro create_schema(relation) %}
{%- call statement('create_schema') -%}
SELECT {{ env_var('DBT_TEST_ENV_VAR') }} as TEST
{% endcall %}
{% endmacro %}
"""
}
@pytest.fixture(scope="class")
def models(self):
return {
"mymodel.sql": """
SELECT 1 as TEST -- {%- do adapter.create_schema(this) -%}
"""
}
def test_env_var_in_create_schema(self, project):
run_dbt(["run"])

View File

@@ -1,4 +1,3 @@
import os
from unittest import mock
import pytest
@@ -104,21 +103,6 @@ class TestDbtRunner:
dbt.invoke(args)
assert args == args_before
def test_directory_does_not_change(self, project, dbt: dbtRunner) -> None:
project_dir = os.getcwd() # The directory where dbt_project.yml exists.
os.chdir("../")
cmd_execution_dir = os.getcwd() # The directory where dbt command will be run
commands = ["init", "deps", "clean"]
for command in commands:
args = [command, "--project-dir", project_dir]
if command == "init":
args.append("--skip-profile-setup")
res = dbt.invoke(args)
after_dir = os.getcwd()
assert res.success is True
assert cmd_execution_dir == after_dir
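# A hedged companion sketch (hypothetical fixture, not part of this diff): the
# removed test above chdirs out of the project and relies on dbt to change
# back; a fixture like this restores the working directory even on failure.
import os
import pytest
@pytest.fixture
def restore_cwd():
    start = os.getcwd()
    try:
        yield
    finally:
        os.chdir(start)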
class TestDbtRunnerQueryComments:
@pytest.fixture(scope="class")

View File

@@ -33,14 +33,16 @@ class TestDepsOptions(object):
assert os.path.exists("package-lock.yml")
with open("package-lock.yml") as fp:
contents = fp.read()
fivetran_package = "- package: fivetran/fivetran_utils\n version: 0.4.7"
# dbt-utils is a dependency of fivetran_utils, so we can't check for a specific version; otherwise this test would fail every time a new dbt-utils version comes out
dbt_labs_package = "- package: dbt-labs/dbt_utils"
package_sha = "sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8"
assert fivetran_package in contents
assert dbt_labs_package in contents
assert package_sha in contents
assert (
contents
== """packages:
- package: fivetran/fivetran_utils
version: 0.4.7
- package: dbt-labs/dbt_utils
version: 1.2.0
sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8
"""
)
def test_deps_default(self, clean_start):
run_dbt(["deps"])
@@ -48,13 +50,16 @@ class TestDepsOptions(object):
assert os.path.exists("package-lock.yml")
with open("package-lock.yml") as fp:
contents = fp.read()
fivetran_package = "- package: fivetran/fivetran_utils\n version: 0.4.7"
# dbt-utils is a dependency of fivetran_utils, so we can't check for a specific version; otherwise this test would fail every time a new dbt-utils version comes out
dbt_labs_package = "- package: dbt-labs/dbt_utils"
package_sha = "sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8"
assert fivetran_package in contents
assert dbt_labs_package in contents
assert package_sha in contents
assert (
contents
== """packages:
- package: fivetran/fivetran_utils
version: 0.4.7
- package: dbt-labs/dbt_utils
version: 1.2.0
sha1_hash: 71304bca2138cf8004070b3573a1e17183c0c1a8
"""
)
def test_deps_add(self, clean_start):
run_dbt(["deps", "--add-package", "dbt-labs/audit_helper@0.9.0"])

View File

@@ -32,7 +32,7 @@ class TestTestsConfigDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = set()
expected = {"project-test-config"}
assert expected == deprecations.active_deprecations
def test_project_tests_config_fail(self, project):
@@ -41,7 +41,7 @@ class TestTestsConfigDeprecation:
with pytest.raises(CompilationError) as exc:
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
exc_str = " ".join(str(exc.value).split()) # flatten all whitespace
expected_msg = "Configuration paths exist in your dbt_project.yml file which do not apply to any resources. There are 1 unused configuration paths: - data_tests"
expected_msg = "The `tests` config has been renamed to `data_tests`"
assert expected_msg in exc_str
@@ -62,13 +62,17 @@ class TestSchemaTestDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = set()
expected = {"project-test-config"}
assert expected == deprecations.active_deprecations
def test_generic_tests_fail(self, project):
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
with pytest.raises(CompilationError) as exc:
run_dbt(["--warn-error", "--no-partial-parse", "parse"])
exc_str = " ".join(str(exc.value).split()) # flatten all whitespace
expected_msg = "The `tests` config has been renamed to `data_tests`"
assert expected_msg in exc_str
def test_generic_data_test_parsing(self, project):
results = run_dbt(["list", "--resource-type", "test"])
@@ -94,7 +98,7 @@ class TestSourceSchemaTestDeprecation:
deprecations.reset_deprecations()
assert deprecations.active_deprecations == set()
run_dbt(["parse"])
expected = set()
expected = {"project-test-config"}
assert expected == deprecations.active_deprecations
def test_generic_data_tests(self, project):

View File

@@ -5,7 +5,6 @@ import pytest
from dbt.events.types import InvalidOptionYAML
from dbt.tests.util import get_manifest, read_file, run_dbt
from dbt_common.events import EventLevel
from dbt_common.events.functions import fire_event
my_model_sql = """
@@ -104,176 +103,37 @@ def test_invalid_event_value(project, logs_dir):
assert str(excinfo.value) == "[InvalidOptionYAML]: Unable to parse dict {'option_name': 1}"
groups_yml = """
groups:
- name: my_group
owner:
name: my_name
email: my.email@gmail.com
slack: my_slack
other_property: something_else
models:
- name: my_model
group: my_group
access: public
"""
class TestRunResultErrorNodeInfo:
class TestNodeInfo:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": "select not_found as id",
}
return {"my_model.sql": "select not_found as id"}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
# get log file
log_file = read_file(logs_dir, "dbt.log")
task_printer_events = [
"RunResultWarning",
"RunResultFailure",
"RunResultWarningMessage",
"RunResultError",
"RunResultErrorNoMessage",
"SQLCompiledPath",
"CheckNodeTestFailure",
]
count = 0
for log_line in log_file.split("\n"):
if not log_line:
# skip empty lines
if len(log_line) == 0:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
# The adapter logging also shows up, so skip non-json lines
if "[debug]" in log_line:
continue
if log_json["info"]["name"] == "RunResultError":
assert "node_info" in log_json["data"]
assert log_json["data"]["node_info"]["unique_id"] == "model.test.my_model"
assert "Database Error" in log_json["data"]["msg"]
def assert_group_data(group_data):
assert group_data["name"] == "my_group"
assert group_data["owner"] == {
"name": "my_name",
"email": "my.email@gmail.com",
"slack": "my_slack",
"other_property": "something_else",
}
class TestRunResultErrorGroup:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": "select not_found as id",
"groups.yml": groups_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "run"], expect_pass=False)
assert len(results) == 1
log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1
assert run_result_error_count == 1
class TestRunResultFailureGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"], expect_pass=False)
assert len(results) == 2
log_file = read_file(logs_dir, "dbt.log")
run_result_error_count = 0
run_result_failure_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultError":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_error_count += 1
if log_json["info"]["name"] == "RunResultFailure":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_failure_count += 1
assert run_result_error_count == 1
assert run_result_failure_count == 1
class TestRunResultWarningGroup:
@pytest.fixture(scope="class")
def models(self):
schema_yml = (
groups_yml
+ """
columns:
- name: my_column
tests:
- not_null:
config:
severity: warn
"""
)
print(schema_yml)
return {
"my_model.sql": "select 1 as id, null as my_column",
"groups.yml": schema_yml,
}
def test_node_info_on_results(self, project, logs_dir):
results = run_dbt(["--log-format=json", "build"])
assert len(results) == 2
log_file = read_file(logs_dir, "dbt.log")
run_result_warning_count = 0
for log_line in log_file.split("\n"):
if not log_line:
continue
log_json = json.loads(log_line)
if log_json["info"]["level"] == EventLevel.DEBUG:
continue
if log_json["info"]["name"] == "RunResultWarning":
assert "group" in log_json["data"]
assert_group_data(log_json["data"]["group"])
run_result_warning_count += 1
assert run_result_warning_count == 1
log_dct = json.loads(log_line)
log_data = log_dct["data"]
log_event = log_dct["info"]["name"]
if log_event in task_printer_events:
assert "node_info" in log_data
count += 1
assert count > 0
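The three group-aware tests above and the surviving TestNodeInfo all repeat the same scan over dbt.log; a hedged refactoring sketch (hypothetical helper; assumes debug-level entries serialize their level as the string "debug"):
import json
def matching_events(log_file: str, event_name: str):
    # Yield the data payload of every non-debug event with the given name.
    for line in log_file.split("\n"):
        if not line:
            continue
        entry = json.loads(line)
        if entry["info"]["level"] == "debug":
            continue
        if entry["info"]["name"] == event_name:
            yield entry["data"]
# e.g.: assert sum(1 for _ in matching_events(log_file, "RunResultError")) == 1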

View File

@@ -1,131 +0,0 @@
import pytest
from dbt.tests.util import (
check_relations_equal,
get_relation_columns,
relation_from_name,
run_dbt,
)
seeds_base_csv = """
id,name_xxx,some_date
1,Easton,1981-05-20T06:46:51
2,Lillian,1978-09-03T18:10:33
3,Jeremiah,1982-03-11T03:59:51
4,Nolan,1976-05-06T20:21:35
5,Hannah,1982-06-23T05:41:26
6,Eleanor,1991-08-10T23:12:21
7,Lily,1971-03-29T14:58:02
8,Jonathan,1988-02-26T02:55:24
9,Adrian,1994-02-09T13:14:23
10,Nora,1976-03-01T16:51:39
""".lstrip()
seeds_added_csv = (
seeds_base_csv
+ """
11,Mateo,2014-09-07T17:04:27
12,Julian,2000-02-04T11:48:30
13,Gabriel,2001-07-10T07:32:52
14,Isaac,2002-11-24T03:22:28
15,Levi,2009-11-15T11:57:15
16,Elizabeth,2005-04-09T03:50:11
17,Grayson,2019-08-06T19:28:17
18,Dylan,2014-03-01T11:50:41
19,Jayden,2009-06-06T07:12:49
20,Luke,2003-12-05T21:42:18
""".lstrip()
)
incremental_not_schema_change_sql = """
{{ config(materialized="incremental", unique_key="user_id_current_time", on_schema_change="sync_all_columns") }}
select
1 || '-' || current_timestamp as user_id_current_time,
{% if is_incremental() %}
'thisis18characters' as platform
{% else %}
'okthisis20characters' as platform
{% endif %}
"""
incremental_sql = """
{{ config(materialized="incremental") }}
select * from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
"""
schema_base_yml = """
sources:
- name: raw
schema: "{{ target.schema }}"
tables:
- name: seed
identifier: "{{ var('seed_name', 'base') }}"
models:
- name: incremental
config:
contract:
enforced: true
on_schema_change: append_new_columns
columns:
- name: id
data_type: int
- name: name_xxx
data_type: character varying(10)
- name: some_date
data_type: timestamp
"""
class TestIncremental:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"name": "incremental"}
@pytest.fixture(scope="class")
def models(self):
return {"incremental.sql": incremental_sql, "schema.yml": schema_base_yml}
@pytest.fixture(scope="class")
def seeds(self):
return {"base.csv": seeds_base_csv, "added.csv": seeds_added_csv}
def test_incremental(self, project):
# seed command
results = run_dbt(["seed"])
assert len(results) == 2
# base table rowcount
relation = relation_from_name(project.adapter, "base")
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 10
# added table rowcount
relation = relation_from_name(project.adapter, "added")
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 20
# run command
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--vars", "seed_name: base"])
assert len(results) == 1
# check relations equal
check_relations_equal(project.adapter, ["base", "incremental"])
# change seed_name var
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--debug", "--vars", "seed_name: added"])
assert len(results) == 1
# Error before fix: Changing col type from character varying(10) to character varying(256) in table:
# "dbt"."test<...>_test_incremental_with_contract"."incremental"
columns = get_relation_columns(project.adapter, "incremental")
# [('id', 'integer', None), ('name_xxx', 'character varying', 10), ('some_date', 'timestamp without time zone', None)]
for column in columns:
if column[0] == "name_xxx":
assert column[2] == 10
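The seed_name comments in the removed test above lean on dbt's var() defaulting in the schema file; a hedged standalone illustration (plain jinja2 standing in for dbt's renderer; fake_var is hypothetical):
from jinja2 import Template
def fake_var(cli_vars):
    # Stands in for dbt's var(): CLI value if given, else the declared default.
    return lambda name, default=None: cli_vars.get(name, default)
template = Template("{{ var('seed_name', 'base') }}")
assert template.render(var=fake_var({})) == "base"
assert template.render(var=fake_var({"seed_name": "added"})) == "added"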

View File

@@ -11,10 +11,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Metric('txn_revenue', ['id']) }} > 1"
exports:
- name: my_export
@@ -33,10 +33,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Metric('txn_revenue', ['id']) }} > 1"
exports:
- name: my_export
@@ -54,10 +54,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -72,8 +72,8 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
where: "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "Dimension('user__ds')"
where: "{{ Dimension('user__ds', 'DAY') }} <= now()"
"""
saved_query_with_extra_config_attributes_yml = """
@@ -85,10 +85,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -108,10 +108,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:
@@ -129,10 +129,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
"""
@@ -149,10 +149,10 @@ saved_queries:
metrics:
- simple_metric
group_by:
- "Dimension('id__ds')"
- "Dimension('user__ds')"
where:
- "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
- "{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'"
- "{{ Dimension('user__ds', 'DAY') }} <= now()"
- "{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'"
exports:
- name: my_export
config:

View File

@@ -12,7 +12,7 @@ from tests.functional.semantic_models.fixtures import (
)
class TestSavedQueryBuild:
class TestSavedQueryBuildNoOp:
@pytest.fixture(scope="class")
def models(self):
return {
@@ -31,8 +31,7 @@ packages:
version: 1.1.1
"""
def test_build_saved_queries_no_op(self, project) -> None:
"""Test building saved query exports with no flag, so should be no-op."""
def test_build_saved_queries(self, project):
run_dbt(["deps"])
result = run_dbt(["build"])
assert len(result.results) == 3

View File

@@ -1,6 +1,5 @@
import os
import shutil
from copy import deepcopy
from typing import List
import pytest
@@ -37,17 +36,6 @@ class TestSavedQueryParsing:
"docs.md": saved_query_description,
}
@pytest.fixture(scope="class")
def other_schema(self, unique_schema):
return unique_schema + "_other"
@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema):
outputs = {"default": dbt_profile_target, "prod": deepcopy(dbt_profile_target)}
outputs["default"]["schema"] = unique_schema
outputs["prod"]["schema"] = other_schema
return {"test": {"outputs": outputs, "target": "default"}}
def copy_state(self):
if not os.path.exists("state"):
os.makedirs("state")
@@ -72,11 +60,6 @@ class TestSavedQueryParsing:
assert saved_query.exports[0].config.alias == "my_export_alias"
assert saved_query.exports[0].config.export_as == ExportDestinationType.TABLE
assert saved_query.exports[0].config.schema_name == "my_export_schema_name"
assert saved_query.exports[0].unrendered_config == {
"alias": "my_export_alias",
"export_as": "table",
"schema": "my_export_schema_name",
}
# Save state
self.copy_state()
@@ -103,86 +86,6 @@ class TestSavedQueryParsing:
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
def test_semantic_model_parsing_change_export(self, project, other_schema):
runner = dbtTestRunner()
result = runner.invoke(["parse", "--no-partial-parse"])
assert result.success
assert isinstance(result.result, Manifest)
manifest = result.result
assert len(manifest.saved_queries) == 1
saved_query = manifest.saved_queries["saved_query.test.test_saved_query"]
assert saved_query.name == "test_saved_query"
assert saved_query.exports[0].name == "my_export"
# Save state
self.copy_state()
# Nothing has changed, so no state:modified results
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 0
# Change export name
write_file(
saved_queries_yml.replace("name: my_export", "name: my_expor2"),
project.project_root,
"models",
"saved_queries.yml",
)
# State modified finds changed saved_query
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
# Change export schema
write_file(
saved_queries_yml.replace(
"schema: my_export_schema_name", "schema: my_export_schema_name2"
),
project.project_root,
"models",
"saved_queries.yml",
)
# State modified finds changed saved_query
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 1
def test_semantic_model_parsing_with_default_schema(self, project, other_schema):
write_file(
saved_queries_with_defaults_yml, project.project_root, "models", "saved_queries.yml"
)
runner = dbtTestRunner()
result = runner.invoke(["parse", "--no-partial-parse", "--target", "prod"])
assert result.success
assert isinstance(result.result, Manifest)
manifest = result.result
assert len(manifest.saved_queries) == 1
saved_query = manifest.saved_queries["saved_query.test.test_saved_query"]
assert saved_query.name == "test_saved_query"
assert len(saved_query.query_params.metrics) == 1
assert len(saved_query.query_params.group_by) == 1
assert len(saved_query.query_params.where.where_filters) == 3
assert len(saved_query.depends_on.nodes) == 1
assert saved_query.description == "My SavedQuery Description"
assert len(saved_query.exports) == 1
assert saved_query.exports[0].name == "my_export"
assert saved_query.exports[0].config.alias == "my_export_alias"
assert saved_query.exports[0].config.export_as == ExportDestinationType.TABLE
assert saved_query.exports[0].config.schema_name == other_schema
assert saved_query.exports[0].unrendered_config == {
"alias": "my_export_alias",
"export_as": "table",
}
# Save state
self.copy_state()
# Nothing has changed, so no state:modified results
results = run_dbt(
["ls", "--select", "state:modified", "--state", "./state", "--target", "prod"]
)
assert len(results) == 0
# There should also be no state:modified results when using the default schema
results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"])
assert len(results) == 0
def test_saved_query_error(self, project):
error_schema_yml = saved_queries_yml.replace("simple_metric", "metric_not_found")
write_file(error_schema_yml, project.project_root, "models", "saved_queries.yml")
@@ -222,14 +125,14 @@ class TestSavedQueryPartialParsing:
where_filter.where_sql_template
for where_filter in saved_query1.query_params.where.where_filters
} == {
"{{ TimeDimension('id__ds', 'DAY') }} <= now()",
"{{ TimeDimension('id__ds', 'DAY') }} >= '2023-01-01'",
"{{ Dimension('user__ds', 'DAY') }} <= now()",
"{{ Dimension('user__ds', 'DAY') }} >= '2023-01-01'",
}
# String filter
assert len(saved_query2.query_params.where.where_filters) == 1
assert (
saved_query2.query_params.where.where_filters[0].where_sql_template
== "{{ TimeDimension('id__ds', 'DAY') }} <= now()"
== "{{ Dimension('user__ds', 'DAY') }} <= now()"
)
def test_saved_query_metrics_changed(self, project):

View File

@@ -1,72 +0,0 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture
create_source_sql = """
create table {database}.{schema}.source_users (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
updated_time TIMESTAMP WITH TIME ZONE
);
insert into {database}.{schema}.source_users (id, first_name, last_name, email, gender, ip_address, updated_time) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30');
"""
model_users_sql = """
select * from {{ source('test_source', 'source_users') }}
"""
snapshot_sql = """
{% snapshot users_snapshot %}
select * from {{ ref('users') }}
{% endsnapshot %}
"""
source_schema_yml = """
sources:
- name: test_source
loader: custom
schema: "{{ target.schema }}"
tables:
- name: source_users
loaded_at_field: updated_time
"""
snapshot_schema_yml = """
snapshots:
- name: users_snapshot
config:
target_schema: "{{ target.schema }}"
strategy: timestamp
unique_key: id
updated_at: updated_time
"""
class TestSnapshotConfig:
@pytest.fixture(scope="class")
def models(self):
return {
"users.sql": model_users_sql,
"source_schema.yml": source_schema_yml,
"snapshot_schema.yml": snapshot_schema_yml,
}
@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot.sql": snapshot_sql}
def test_timestamp_snapshot(self, project):
project.run_sql(create_source_sql)
run_dbt(["run"])
results, log_output = run_dbt_and_capture(["snapshot"])
assert len(results) == 1
assert "Please update snapshot config" in log_output

View File

@@ -1,31 +0,0 @@
from dbt.tests.util import get_manifest, run_dbt, write_file
from tests.fixtures.jaffle_shop import JaffleShopProject
# Note: in an actual file (as opposed to a string that we write into a file)
# there would only be a single backslash.
sources_yml = """
sources:
- name: something_else
database: raw
schema: jaffle_shop
tables:
- name: "\\"/test/orders\\""
- name: customers
"""
class TestNameChars(JaffleShopProject):
def test_quotes_in_table_names(self, project):
# Write out a sources definition that includes a table name with quotes and a forward slash
# Note: forward slashes are not legal in filenames in Linux (or Windows),
# so we won't see forward slashes in model names, because they come from file names.
write_file(sources_yml, project.project_root, "models", "sources.yml")
manifest = run_dbt(["parse"])
assert len(manifest.sources) == 2
assert 'source.jaffle_shop.something_else."/test/orders"' in manifest.sources.keys()
# We've written out the manifest.json artifact, we want to ensure
# that it can be read in again (the json is valid).
# Note: the key in the json actually looks like: "source.jaffle_shop.something_else.\"/test/orders\""
new_manifest = get_manifest(project.project_root)
assert new_manifest
assert 'source.jaffle_shop.something_else."/test/orders"' in new_manifest.sources.keys()
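A hedged illustration of the escaping note in the removed file above (plain Python plus PyYAML): the doubled backslashes in the fixture string become single backslashes on disk, and YAML then yields a table name that itself contains quote characters.
import yaml
# What ends up on disk has single backslashes; doubled here for the literal.
line = '- name: "\\"/test/orders\\""'
assert yaml.safe_load(line) == [{"name": '"/test/orders"'}]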

View File

@@ -43,7 +43,7 @@ class TestProjectMethods:
def test__str__(self, project: Project):
assert (
str(project)
== "{'name': 'test_project', 'version': 1.0, 'project-root': 'doesnt/actually/exist', 'profile': 'test_profile', 'model-paths': ['models'], 'macro-paths': ['macros'], 'seed-paths': ['seeds'], 'test-paths': ['tests'], 'analysis-paths': ['analyses'], 'docs-paths': ['docs'], 'asset-paths': ['assets'], 'target-path': 'target', 'snapshot-paths': ['snapshots'], 'clean-targets': ['target'], 'log-path': 'path/to/project/logs', 'quoting': {}, 'models': {}, 'on-run-start': [], 'on-run-end': [], 'dispatch': [{'macro_namespace': 'dbt_utils', 'search_order': ['test_project', 'dbt_utils']}], 'seeds': {}, 'snapshots': {}, 'sources': {}, 'data_tests': {}, 'unit_tests': {}, 'metrics': {}, 'semantic-models': {}, 'saved-queries': {}, 'exposures': {}, 'vars': {}, 'require-dbt-version': ['=0.0.0'], 'restrict-access': False, 'dbt-cloud': {}, 'flags': {}, 'query-comment': {'comment': \"\\n{%- set comment_dict = {} -%}\\n{%- do comment_dict.update(\\n app='dbt',\\n dbt_version=dbt_version,\\n profile_name=target.get('profile_name'),\\n target_name=target.get('target_name'),\\n) -%}\\n{%- if node is not none -%}\\n {%- do comment_dict.update(\\n node_id=node.unique_id,\\n ) -%}\\n{% else %}\\n {# in the node context, the connection name is the node_id #}\\n {%- do comment_dict.update(connection_name=connection_name) -%}\\n{%- endif -%}\\n{{ return(tojson(comment_dict)) }}\\n\", 'append': False, 'job-label': False}, 'packages': []}"
== "{'name': 'test_project', 'version': 1.0, 'project-root': 'doesnt/actually/exist', 'profile': 'test_profile', 'model-paths': ['models'], 'macro-paths': ['macros'], 'seed-paths': ['seeds'], 'test-paths': ['tests'], 'analysis-paths': ['analyses'], 'docs-paths': ['docs'], 'asset-paths': ['assets'], 'target-path': 'target', 'snapshot-paths': ['snapshots'], 'clean-targets': ['target'], 'log-path': 'path/to/project/logs', 'quoting': {}, 'models': {}, 'on-run-start': [], 'on-run-end': [], 'dispatch': [{'macro_namespace': 'dbt_utils', 'search_order': ['test_project', 'dbt_utils']}], 'seeds': {}, 'snapshots': {}, 'sources': {}, 'data_tests': {}, 'unit_tests': {}, 'metrics': {}, 'semantic-models': {}, 'saved-queries': {}, 'exposures': {}, 'vars': {}, 'require-dbt-version': ['=0.0.0'], 'restrict-access': False, 'dbt-cloud': {}, 'query-comment': {'comment': \"\\n{%- set comment_dict = {} -%}\\n{%- do comment_dict.update(\\n app='dbt',\\n dbt_version=dbt_version,\\n profile_name=target.get('profile_name'),\\n target_name=target.get('target_name'),\\n) -%}\\n{%- if node is not none -%}\\n {%- do comment_dict.update(\\n node_id=node.unique_id,\\n ) -%}\\n{% else %}\\n {# in the node context, the connection name is the node_id #}\\n {%- do comment_dict.update(connection_name=connection_name) -%}\\n{%- endif -%}\\n{{ return(tojson(comment_dict)) }}\\n\", 'append': False, 'job-label': False}, 'packages': []}"
)
def test_get_selector(self, project: Project):

View File

@@ -297,29 +297,3 @@ class TestNodeSelector:
queue.get(block=False)
queue.mark_done(got.unique_id)
assert queue.empty()
def test_select_downstream_of_empty_model(self, runtime_config: RuntimeConfig):
# empty model
model_one = make_model(pkg="other", name="model_one", code="")
# non-empty model
model_two = make_model(
pkg="pkg",
name="model_two",
code="""select * from {{ref('model_one')}}""",
refs=[model_one],
)
models = [model_one, model_two]
manifest = make_manifest(nodes=models)
# Get the graph
compiler = dbt.compilation.Compiler(runtime_config)
graph = compiler.compile(manifest)
# Ensure that model_two is selected as downstream of model_one
selector = NodeSelector(graph, manifest)
spec = graph_selector.SelectionCriteria.from_single_spec("model_one+")
assert selector.get_selected(spec) == {"model.pkg.model_two"}
# Ensure that --indirect-selection empty returns the same result
spec.indirect_selection = graph_selector.IndirectSelection.Empty
assert selector.get_selected(spec) == {"model.pkg.model_two"}

View File

@@ -412,9 +412,6 @@ sample_values = [
core_types.CompiledNode(
node_name="", compiled="", is_inline=True, unique_id="model.test.my_model"
),
core_types.SnapshotTimestampWarning(
snapshot_time_data_type="DATETIME", updated_at_data_type="DATETIMEZ"
),
# W - Node testing ======================
core_types.CatchableExceptionOnRun(exc=""),
core_types.InternalErrorOnRun(build_path="", exc=""),

View File

@@ -70,7 +70,6 @@ def project(selector_config: SelectorConfig) -> Project:
project_env_vars={},
restrict_access=False,
dbt_cloud={},
flags={},
)