mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-18 23:01:27 +00:00
Compare commits
7 Commits
code-quali
...
feature/so
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
002c3c4088 | ||
|
|
1bd8aeb518 | ||
|
|
dd5377ff25 | ||
|
|
870b1d3c69 | ||
|
|
0fd1381d7f | ||
|
|
73ef35f306 | ||
|
|
fe66aba2a5 |
6
.changes/unreleased/Features-20231231-171205.yaml
Normal file
6
.changes/unreleased/Features-20231231-171205.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Features
|
||||
body: Added hook support for `dbt source freshness`
|
||||
time: 2023-12-31T17:12:05.587185+02:00
|
||||
custom:
|
||||
Author: ofek1weiss
|
||||
Issue: "5609"
|
||||
@@ -236,8 +236,8 @@ class Flags:
|
||||
# Add entire invocation command to flags
|
||||
object.__setattr__(self, "INVOCATION_COMMAND", "dbt " + " ".join(sys.argv[1:]))
|
||||
|
||||
# Overwrite default assignments with user config if available.
|
||||
if project_flags:
|
||||
# Overwrite default assignments with project flags if available.
|
||||
param_assigned_from_default_copy = params_assigned_from_default.copy()
|
||||
for param_assigned_from_default in params_assigned_from_default:
|
||||
project_flags_param_value = getattr(
|
||||
@@ -252,6 +252,13 @@ class Flags:
|
||||
param_assigned_from_default_copy.remove(param_assigned_from_default)
|
||||
params_assigned_from_default = param_assigned_from_default_copy
|
||||
|
||||
# Add project-level flags that are not available as CLI options / env vars
|
||||
for (
|
||||
project_level_flag_name,
|
||||
project_level_flag_value,
|
||||
) in project_flags.project_only_flags.items():
|
||||
object.__setattr__(self, project_level_flag_name.upper(), project_level_flag_value)
|
||||
|
||||
# Set hard coded flags.
|
||||
object.__setattr__(self, "WHICH", invoked_subcommand_name or ctx.info_name)
|
||||
|
||||
|
||||
@@ -307,6 +307,7 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
|
||||
populate_cache: Optional[bool] = None
|
||||
printer_width: Optional[int] = None
|
||||
send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS
|
||||
source_freshness_run_project_hooks: bool = False
|
||||
static_parser: Optional[bool] = None
|
||||
use_colors: Optional[bool] = None
|
||||
use_colors_file: Optional[bool] = None
|
||||
@@ -316,6 +317,10 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
|
||||
warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
|
||||
write_json: Optional[bool] = None
|
||||
|
||||
@property
|
||||
def project_only_flags(self) -> Dict[str, Any]:
|
||||
return {"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProfileConfig(dbtClassMixin, Replaceable):
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
from typing import Optional, List
|
||||
|
||||
from .base import BaseRunner
|
||||
from .printer import (
|
||||
print_run_result_error,
|
||||
)
|
||||
from .runnable import GraphRunnableTask
|
||||
from .run import RunTask
|
||||
|
||||
from dbt.artifacts.freshness import (
|
||||
FreshnessResult,
|
||||
@@ -23,11 +23,12 @@ from dbt.events.types import (
|
||||
LogStartLine,
|
||||
LogFreshnessResult,
|
||||
)
|
||||
from dbt.node_types import NodeType
|
||||
from dbt.contracts.results import RunStatus
|
||||
from dbt.node_types import NodeType, RunHookType
|
||||
|
||||
from dbt.adapters.capability import Capability
|
||||
from dbt.adapters.contracts.connection import AdapterResponse
|
||||
from dbt.contracts.graph.nodes import SourceDefinition
|
||||
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
|
||||
from dbt_common.events.base_types import EventLevel
|
||||
from dbt.graph import ResourceTypeSelector
|
||||
|
||||
@@ -170,7 +171,7 @@ class FreshnessSelector(ResourceTypeSelector):
|
||||
return node.has_freshness
|
||||
|
||||
|
||||
class FreshnessTask(GraphRunnableTask):
|
||||
class FreshnessTask(RunTask):
|
||||
def result_path(self):
|
||||
if self.args.output:
|
||||
return os.path.realpath(self.args.output)
|
||||
@@ -200,7 +201,17 @@ class FreshnessTask(GraphRunnableTask):
|
||||
|
||||
def task_end_messages(self, results):
|
||||
for result in results:
|
||||
if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr):
|
||||
if result.status in (
|
||||
FreshnessStatus.Error,
|
||||
FreshnessStatus.RuntimeErr,
|
||||
RunStatus.Error,
|
||||
):
|
||||
print_run_result_error(result)
|
||||
|
||||
fire_event(FreshnessCheckComplete())
|
||||
|
||||
def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
|
||||
if self.args.source_freshness_run_project_hooks:
|
||||
return super().get_hooks_by_type(hook_type)
|
||||
else:
|
||||
return []
|
||||
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from dbt.tests.util import run_dbt
|
||||
from dbt.tests.util import run_dbt, run_dbt_and_capture
|
||||
from tests.functional.sources.fixtures import (
|
||||
models_schema_yml,
|
||||
models_view_model_sql,
|
||||
@@ -57,10 +57,17 @@ class BaseSourcesTest:
|
||||
},
|
||||
}
|
||||
|
||||
def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
|
||||
def _extend_cmd_with_vars(self, project, cmd):
|
||||
vars_dict = {
|
||||
"test_run_schema": project.test_schema,
|
||||
"test_loaded_at": project.adapter.quote("updated_at"),
|
||||
}
|
||||
cmd.extend(["--vars", yaml.safe_dump(vars_dict)])
|
||||
|
||||
def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
|
||||
self._extend_cmd_with_vars(project, cmd)
|
||||
return run_dbt(cmd, *args, **kwargs)
|
||||
|
||||
def run_dbt_and_capture_with_vars(self, project, cmd, *args, **kwargs):
|
||||
self._extend_cmd_with_vars(project, cmd)
|
||||
return run_dbt_and_capture(cmd, *args, **kwargs)
|
||||
|
||||
@@ -400,3 +400,85 @@ class TestMetadataFreshnessFails:
|
||||
runner.invoke(["parse"])
|
||||
|
||||
assert got_warning
|
||||
|
||||
|
||||
class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"config-version": 2,
|
||||
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
|
||||
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
|
||||
"flags": {
|
||||
"source_freshness_run_project_hooks": True,
|
||||
},
|
||||
}
|
||||
|
||||
def test_hooks_do_run_for_source_freshness(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
_, log_output = self.run_dbt_and_capture_with_vars(
|
||||
project,
|
||||
[
|
||||
"source",
|
||||
"freshness",
|
||||
],
|
||||
expect_pass=False,
|
||||
)
|
||||
assert "on-run-start" in log_output
|
||||
assert "on-run-end" in log_output
|
||||
|
||||
|
||||
class TestHooksInSourceFreshnessDisabled(SuccessfulSourceFreshnessTest):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"config-version": 2,
|
||||
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
|
||||
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
|
||||
"flags": {
|
||||
"source_freshness_run_project_hooks": False,
|
||||
},
|
||||
}
|
||||
|
||||
def test_hooks_do_run_for_source_freshness(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
_, log_output = self.run_dbt_and_capture_with_vars(
|
||||
project,
|
||||
[
|
||||
"source",
|
||||
"freshness",
|
||||
],
|
||||
expect_pass=False,
|
||||
)
|
||||
assert "on-run-start" not in log_output
|
||||
assert "on-run-end" not in log_output
|
||||
|
||||
|
||||
class TestHooksInSourceFreshnessDefault(SuccessfulSourceFreshnessTest):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self):
|
||||
return {
|
||||
"config-version": 2,
|
||||
"on-run-start": ["{{ log('on-run-start hooks called') }}"],
|
||||
"on-run-end": ["{{ log('on-run-end hooks called') }}"],
|
||||
}
|
||||
|
||||
def test_hooks_do_run_for_source_freshness(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
_, log_output = self.run_dbt_and_capture_with_vars(
|
||||
project,
|
||||
[
|
||||
"source",
|
||||
"freshness",
|
||||
],
|
||||
expect_pass=False,
|
||||
)
|
||||
# default behaviour - no hooks run in source freshness
|
||||
assert "on-run-start" not in log_output
|
||||
assert "on-run-end" not in log_output
|
||||
|
||||
@@ -366,6 +366,14 @@ class TestFlags:
|
||||
|
||||
assert flags_a.USE_COLORS == flags_b.USE_COLORS
|
||||
|
||||
def test_set_project_only_flags(self, project_flags, run_context):
|
||||
flags = Flags(run_context, project_flags)
|
||||
|
||||
for project_only_flag, project_only_flag_value in project_flags.project_only_flags.items():
|
||||
assert getattr(flags, project_only_flag) == project_only_flag_value
|
||||
# sanity check: ensure project_only_flag is not part of the click context
|
||||
assert project_only_flag not in run_context.params
|
||||
|
||||
def _create_flags_from_dict(self, cmd, d):
|
||||
write_file("", "profiles.yml")
|
||||
result = Flags.from_dict(cmd, d)
|
||||
|
||||
@@ -16,6 +16,7 @@ import dbt.parser.manifest
|
||||
from dbt import tracking
|
||||
from dbt.contracts.files import SourceFile, FileHash, FilePath
|
||||
from dbt.contracts.graph.manifest import MacroManifest, ManifestStateCheck
|
||||
from dbt.contracts.project import ProjectFlags
|
||||
from dbt.graph import NodeSelector, parse_difference
|
||||
from dbt.events.logging import setup_event_logger
|
||||
from dbt.mp_context import get_mp_context
|
||||
@@ -130,7 +131,7 @@ class GraphTest(unittest.TestCase):
|
||||
cfg.update(extra_cfg)
|
||||
|
||||
config = config_from_parts_or_dicts(project=cfg, profile=self.profile)
|
||||
dbt.flags.set_from_args(Namespace(), config)
|
||||
dbt.flags.set_from_args(Namespace(), ProjectFlags())
|
||||
setup_event_logger(dbt.flags.get_flags())
|
||||
object.__setattr__(dbt.flags.get_flags(), "PARTIAL_PARSE", False)
|
||||
return config
|
||||
|
||||
Reference in New Issue
Block a user