mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-21 15:01:28 +00:00
Compare commits
1 Commits
adding-sem
...
jerco/fres
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e34e6fe368 |
@@ -184,6 +184,11 @@ class Time(dbtClassMixin, Mergeable):
|
|||||||
difference = timedelta(**kwargs).total_seconds()
|
difference = timedelta(**kwargs).total_seconds()
|
||||||
return actual_age > difference
|
return actual_age > difference
|
||||||
|
|
||||||
|
def human_friendly(self) -> str:
|
||||||
|
count = self.count
|
||||||
|
period = str(self.period) + ("s" if count != 1 else "")
|
||||||
|
return f"{count} {period}"
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
return self.count is not None and self.period is not None
|
return self.count is not None and self.period is not None
|
||||||
|
|
||||||
@@ -204,6 +209,21 @@ class FreshnessThreshold(dbtClassMixin, Mergeable):
|
|||||||
else:
|
else:
|
||||||
return FreshnessStatus.Pass
|
return FreshnessStatus.Pass
|
||||||
|
|
||||||
|
# human-readable message for artifacts & logs, including end-of-freshness check summary
|
||||||
|
def message(self, age: float) -> Optional[str]:
|
||||||
|
from dbt.contracts.results import FreshnessStatus
|
||||||
|
|
||||||
|
age_pretty = str(timedelta(seconds=age))
|
||||||
|
expected = None
|
||||||
|
if self.status(age) == FreshnessStatus.Error:
|
||||||
|
expected = self.error_after.human_friendly()
|
||||||
|
elif self.status(age) == FreshnessStatus.Warn:
|
||||||
|
expected = self.warn_after.human_friendly()
|
||||||
|
if expected:
|
||||||
|
return f"Last updated {age_pretty} ago. Expected no more than {expected}."
|
||||||
|
else:
|
||||||
|
return None # match the 'message' for passing tests
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
return bool(self.warn_after) or bool(self.error_after)
|
return bool(self.warn_after) or bool(self.error_after)
|
||||||
|
|
||||||
|
|||||||
@@ -1847,15 +1847,6 @@ class CompileComplete(InfoLevel, pt.CompileComplete):
|
|||||||
return "Done."
|
return "Done."
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class FreshnessCheckComplete(InfoLevel, pt.FreshnessCheckComplete):
|
|
||||||
def code(self):
|
|
||||||
return "Q003"
|
|
||||||
|
|
||||||
def message(self) -> str:
|
|
||||||
return "Done."
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SeedHeader(InfoLevel, pt.SeedHeader):
|
class SeedHeader(InfoLevel, pt.SeedHeader):
|
||||||
def code(self):
|
def code(self):
|
||||||
@@ -2955,7 +2946,6 @@ if 1 == 0:
|
|||||||
|
|
||||||
RunningOperationCaughtError(exc="")
|
RunningOperationCaughtError(exc="")
|
||||||
CompileComplete()
|
CompileComplete()
|
||||||
FreshnessCheckComplete()
|
|
||||||
SeedHeader(header="")
|
SeedHeader(header="")
|
||||||
SeedHeaderSeparator(len_header=0)
|
SeedHeaderSeparator(len_header=0)
|
||||||
SQLRunnerException(exc="")
|
SQLRunnerException(exc="")
|
||||||
|
|||||||
@@ -3,9 +3,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from .base import BaseRunner
|
from .base import BaseRunner
|
||||||
from .printer import (
|
from .printer import print_run_end_messages
|
||||||
print_run_result_error,
|
|
||||||
)
|
|
||||||
from .runnable import GraphRunnableTask
|
from .runnable import GraphRunnableTask
|
||||||
|
|
||||||
from dbt.contracts.results import (
|
from dbt.contracts.results import (
|
||||||
@@ -18,7 +16,6 @@ from dbt.contracts.results import (
|
|||||||
from dbt.exceptions import RuntimeException, InternalException
|
from dbt.exceptions import RuntimeException, InternalException
|
||||||
from dbt.events.functions import fire_event, info
|
from dbt.events.functions import fire_event, info
|
||||||
from dbt.events.types import (
|
from dbt.events.types import (
|
||||||
FreshnessCheckComplete,
|
|
||||||
LogStartLine,
|
LogStartLine,
|
||||||
LogFreshnessResult,
|
LogFreshnessResult,
|
||||||
)
|
)
|
||||||
@@ -116,6 +113,7 @@ class FreshnessRunner(BaseRunner):
|
|||||||
)
|
)
|
||||||
|
|
||||||
status = compiled_node.freshness.status(freshness["age"])
|
status = compiled_node.freshness.status(freshness["age"])
|
||||||
|
message = compiled_node.freshness.message(freshness["age"])
|
||||||
|
|
||||||
return SourceFreshnessResult(
|
return SourceFreshnessResult(
|
||||||
node=compiled_node,
|
node=compiled_node,
|
||||||
@@ -123,7 +121,7 @@ class FreshnessRunner(BaseRunner):
|
|||||||
thread_id=threading.current_thread().name,
|
thread_id=threading.current_thread().name,
|
||||||
timing=[],
|
timing=[],
|
||||||
execution_time=0,
|
execution_time=0,
|
||||||
message=None,
|
message=message,
|
||||||
adapter_response={},
|
adapter_response={},
|
||||||
failures=None,
|
failures=None,
|
||||||
**freshness,
|
**freshness,
|
||||||
@@ -178,9 +176,7 @@ class FreshnessTask(GraphRunnableTask):
|
|||||||
elapsed_time=elapsed_time, generated_at=generated_at, results=results
|
elapsed_time=elapsed_time, generated_at=generated_at, results=results
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# this should match the logic in other tasks
|
||||||
def task_end_messages(self, results):
|
def task_end_messages(self, results):
|
||||||
for result in results:
|
if results:
|
||||||
if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr):
|
print_run_end_messages(results)
|
||||||
print_run_result_error(result)
|
|
||||||
|
|
||||||
fire_event(FreshnessCheckComplete())
|
|
||||||
|
|||||||
@@ -80,8 +80,11 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
|
|||||||
if newline:
|
if newline:
|
||||||
with TextOnly():
|
with TextOnly():
|
||||||
fire_event(EmptyLine())
|
fire_event(EmptyLine())
|
||||||
|
if (
|
||||||
if result.status == NodeStatus.Fail or (is_warning and result.status == NodeStatus.Warn):
|
result.status == NodeStatus.Fail
|
||||||
|
or (is_warning and result.status == NodeStatus.Warn)
|
||||||
|
or (result.node.resource_type == NodeType.Source and result.status == NodeStatus.Error)
|
||||||
|
):
|
||||||
if is_warning:
|
if is_warning:
|
||||||
fire_event(
|
fire_event(
|
||||||
RunResultWarning(
|
RunResultWarning(
|
||||||
@@ -107,12 +110,14 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
|
|||||||
else:
|
else:
|
||||||
fire_event(RunResultErrorNoMessage(status=result.status))
|
fire_event(RunResultErrorNoMessage(status=result.status))
|
||||||
|
|
||||||
if result.node.build_path is not None:
|
# SourceFreshnessResult doesn't have build_path
|
||||||
|
if hasattr(result.node, "build_path") and result.node.build_path is not None:
|
||||||
with TextOnly():
|
with TextOnly():
|
||||||
fire_event(EmptyLine())
|
fire_event(EmptyLine())
|
||||||
fire_event(SQLCompiledPath(path=result.node.compiled_path))
|
fire_event(SQLCompiledPath(path=result.node.compiled_path))
|
||||||
|
|
||||||
if result.node.should_store_failures:
|
# SourceFreshnessResult doesn't have should_store_failures
|
||||||
|
if hasattr(result.node, "should_store_failures") and result.node.should_store_failures:
|
||||||
with TextOnly():
|
with TextOnly():
|
||||||
fire_event(EmptyLine())
|
fire_event(EmptyLine())
|
||||||
fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name))
|
fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name))
|
||||||
|
|||||||
Reference in New Issue
Block a user