Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
e34e6fe368 Make freshness end-of-task summary like the others 2022-11-15 15:19:56 +01:00
4 changed files with 35 additions and 24 deletions

View File

@@ -184,6 +184,11 @@ class Time(dbtClassMixin, Mergeable):
difference = timedelta(**kwargs).total_seconds()
return actual_age > difference
def human_friendly(self) -> str:
count = self.count
period = str(self.period) + ("s" if count != 1 else "")
return f"{count} {period}"
def __bool__(self):
return self.count is not None and self.period is not None
@@ -204,6 +209,21 @@ class FreshnessThreshold(dbtClassMixin, Mergeable):
else:
return FreshnessStatus.Pass
# human-readable message for artifacts & logs, including end-of-freshness check summary
def message(self, age: float) -> Optional[str]:
from dbt.contracts.results import FreshnessStatus
age_pretty = str(timedelta(seconds=age))
expected = None
if self.status(age) == FreshnessStatus.Error:
expected = self.error_after.human_friendly()
elif self.status(age) == FreshnessStatus.Warn:
expected = self.warn_after.human_friendly()
if expected:
return f"Last updated {age_pretty} ago. Expected no more than {expected}."
else:
return None # match the 'message' for passing tests
def __bool__(self):
return bool(self.warn_after) or bool(self.error_after)

View File

@@ -1847,15 +1847,6 @@ class CompileComplete(InfoLevel, pt.CompileComplete):
return "Done."
@dataclass
class FreshnessCheckComplete(InfoLevel, pt.FreshnessCheckComplete):
def code(self):
return "Q003"
def message(self) -> str:
return "Done."
@dataclass
class SeedHeader(InfoLevel, pt.SeedHeader):
def code(self):
@@ -2955,7 +2946,6 @@ if 1 == 0:
RunningOperationCaughtError(exc="")
CompileComplete()
FreshnessCheckComplete()
SeedHeader(header="")
SeedHeaderSeparator(len_header=0)
SQLRunnerException(exc="")

View File

@@ -3,9 +3,7 @@ import threading
import time
from .base import BaseRunner
from .printer import (
print_run_result_error,
)
from .printer import print_run_end_messages
from .runnable import GraphRunnableTask
from dbt.contracts.results import (
@@ -18,7 +16,6 @@ from dbt.contracts.results import (
from dbt.exceptions import RuntimeException, InternalException
from dbt.events.functions import fire_event, info
from dbt.events.types import (
FreshnessCheckComplete,
LogStartLine,
LogFreshnessResult,
)
@@ -116,6 +113,7 @@ class FreshnessRunner(BaseRunner):
)
status = compiled_node.freshness.status(freshness["age"])
message = compiled_node.freshness.message(freshness["age"])
return SourceFreshnessResult(
node=compiled_node,
@@ -123,7 +121,7 @@ class FreshnessRunner(BaseRunner):
thread_id=threading.current_thread().name,
timing=[],
execution_time=0,
message=None,
message=message,
adapter_response={},
failures=None,
**freshness,
@@ -178,9 +176,7 @@ class FreshnessTask(GraphRunnableTask):
elapsed_time=elapsed_time, generated_at=generated_at, results=results
)
# this should match the logic in other tasks
def task_end_messages(self, results):
for result in results:
if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr):
print_run_result_error(result)
fire_event(FreshnessCheckComplete())
if results:
print_run_end_messages(results)

View File

@@ -80,8 +80,11 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
if newline:
with TextOnly():
fire_event(EmptyLine())
if result.status == NodeStatus.Fail or (is_warning and result.status == NodeStatus.Warn):
if (
result.status == NodeStatus.Fail
or (is_warning and result.status == NodeStatus.Warn)
or (result.node.resource_type == NodeType.Source and result.status == NodeStatus.Error)
):
if is_warning:
fire_event(
RunResultWarning(
@@ -107,12 +110,14 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
else:
fire_event(RunResultErrorNoMessage(status=result.status))
if result.node.build_path is not None:
# SourceFreshnessResult doesn't have build_path
if hasattr(result.node, "build_path") and result.node.build_path is not None:
with TextOnly():
fire_event(EmptyLine())
fire_event(SQLCompiledPath(path=result.node.compiled_path))
if result.node.should_store_failures:
# SourceFreshnessResult doesn't have should_store_failures
if hasattr(result.node, "should_store_failures") and result.node.should_store_failures:
with TextOnly():
fire_event(EmptyLine())
fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name))