Compare commits

...

18 Commits

Author SHA1 Message Date
Gerda Shank     b60e30eb27   add _event_status to SourceDefinition   2021-11-29 15:29:32 -05:00
Gerda Shank     cc46f85616   Add NodeCompiling and NodeExecuting events, switch to _event_status dict   2021-11-29 15:09:23 -05:00
Gerda Shank     dc08df9543   Add RunningStatus and set in node   2021-11-29 15:06:56 -05:00
Emily Rockman   fc0583cc3c   adding node_status logging to more events   2021-11-29 13:45:21 -06:00
Emily Rockman   f524aaea17   add start/end tiemstamps   2021-11-29 11:34:24 -06:00
Emily Rockman   8c8bfddf5e   formatting   2021-11-29 08:30:38 -06:00
Emily Rockman   18cffeed69   working state with lots of todos   2021-11-29 08:30:38 -06:00
Emily Rockman   8828557791   saving broken state   2021-11-29 08:30:14 -06:00
Emily Rockman   8affc26182   added in ststus, organized a bit   2021-11-29 08:30:14 -06:00
Emily Rockman   fffef72bca   more tweaks to basic implementation   2021-11-29 08:30:14 -06:00
Emily Rockman   024a560429   convert to classes   2021-11-29 08:30:14 -06:00
Emily Rockman   e7a670abc2   fixed failures   2021-11-29 08:29:53 -06:00
Emily Rockman   19907edca2   another pass at node info   2021-11-29 08:27:24 -06:00
Emily Rockman   4199f6e877   add node details to one more   2021-11-29 08:27:00 -06:00
Emily Rockman   6c1fbf3e57   first pass with node_status logging   2021-11-29 08:27:00 -06:00
Emily Rockman   d4c3331f64   WIP   2021-11-29 08:26:32 -06:00
Emily Rockman   96aae6f79a   fixed some merg issues   2021-11-29 08:26:32 -06:00
Emily Rockman   ecef2c5457   WIP   2021-11-29 08:26:32 -06:00
13 changed files with 207 additions and 40 deletions

View File

@@ -194,6 +194,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
_event_status: Dict[str, Any] = field(default_factory=dict)
def write_node(self, target_path: str, subdirectory: str, payload: str):
if (os.path.basename(self.path) ==
@@ -223,6 +224,8 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
def __post_serialize__(self, dct):
if 'config_call_dict' in dct:
del dct['config_call_dict']
if '_event_status' in dct:
del dct['_event_status']
return dct
@classmethod
@@ -626,6 +629,12 @@ class ParsedSourceDefinition(
unrendered_config: Dict[str, Any] = field(default_factory=dict)
relation_name: Optional[str] = None
created_at: float = field(default_factory=lambda: time.time())
_event_status: Dict[str, Any] = field(default_factory=dict)
def __post_serialize__(self, dct):
if '_event_status' in dct:
del dct['_event_status']
return dct
def same_database_representation(
self, other: 'ParsedSourceDefinition'
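
The new `_event_status` field gives each parsed node a mutable scratch dict for logging state, and the `__post_serialize__` hook keeps it out of serialized artifacts such as manifest.json. A minimal standalone sketch of that pattern, using a plain dataclass and a hand-rolled `to_dict` in place of dbt's serialization mixins:

from dataclasses import asdict, dataclass, field
from typing import Any, Dict

@dataclass
class SketchNode:
    # stand-in for a parsed node; only the relevant pieces are modeled here
    unique_id: str
    # mutable scratch space written by the runner, read by logging events
    _event_status: Dict[str, Any] = field(default_factory=dict)

    def __post_serialize__(self, dct: dict) -> dict:
        # mirror of the hook above: drop the private field from the payload
        dct.pop('_event_status', None)
        return dct

    def to_dict(self) -> dict:
        # stand-in for dbt's dbtClassMixin serialization entry point
        return self.__post_serialize__(asdict(self))

node = SketchNode(unique_id='model.proj.my_model')
node._event_status['node_status'] = 'started'
assert node.to_dict() == {'unique_id': 'model.proj.my_model'}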

View File

@@ -58,6 +58,12 @@ class collect_timing_info:
fire_event(TimingInfoCollected())
class RunningStatus(StrEnum):
Started = 'started'
Compiling = 'compiling'
Executing = 'executing'
class NodeStatus(StrEnum):
Success = "success"
Error = "error"
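
The task changes further down record these values with `str(RunningStatus.Compiling)` and `str(result.status)`; with dbt's `StrEnum` base this yields the bare lowercase value, so `_event_status['node_status']` ends up holding plain strings. A minimal sketch, assuming a simple stand-in for that helper:

from enum import Enum

class StrEnum(str, Enum):
    # stand-in for dbt's StrEnum base class
    def __str__(self) -> str:
        return self.value

class RunningStatus(StrEnum):
    Started = 'started'
    Compiling = 'compiling'
    Executing = 'executing'

assert str(RunningStatus.Compiling) == 'compiling'
assert RunningStatus.Executing == 'executing'  # str mix-in allows plain comparisons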

View File

@@ -4,7 +4,6 @@ from datetime import datetime
import os
from typing import Any, Optional
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# These base types define the _required structure_ for the concrete event #
# types defined in types.py #
@@ -37,6 +36,19 @@ class ErrorLevel():
return "error"
@dataclass
class Node():
node_path: str
node_name: str
unique_id: str
resource_type: str
materialized: str
node_status: str
node_started_at: datetime
node_finished_at: datetime
type: str = 'node_status'
@dataclass
class ShowException():
# N.B.:
@@ -57,6 +69,7 @@ class Event(metaclass=ABCMeta):
log_version: int = 1
ts: Optional[datetime] = None # use getter for non-optional
pid: Optional[int] = None # use getter for non-optional
node_info: Optional[Node]
# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
@@ -93,6 +106,27 @@ class Event(metaclass=ABCMeta):
from dbt.events.functions import get_invocation_id
return get_invocation_id()
def get_node_info(self):
return None
@dataclass # type: ignore
class NodeInfo(Event, metaclass=ABCMeta):
report_node_data: Any # Union[ParsedModelNode, ...] TODO: resolve circular imports
def get_node_info(self):
node_info = Node(
node_path=self.report_node_data.path,
node_name=self.report_node_data.name,
unique_id=self.report_node_data.unique_id,
resource_type=self.report_node_data.resource_type,
materialized=self.report_node_data.config.materialized,
node_status=self.report_node_data._event_status.get('node_status'),
node_started_at=self.report_node_data._event_status.get("started_at"),
node_finished_at=self.report_node_data._event_status.get("finished_at")
)
return vars(node_info) # TODO: can just return node_info after #4326 is merged in
class File(Event, metaclass=ABCMeta):
# Solely the human readable message. Timestamps and formatting will be added by the logger.
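
For illustration, a standalone sketch of what `get_node_info()` assembles: it flattens the node's identity plus whatever the runner has written into `_event_status`. The stand-in classes below are hypothetical simplifications of `ParsedModelNode` and its config:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict

@dataclass
class FakeConfig:
    # hypothetical stand-in for the node's config object
    materialized: str = 'view'

@dataclass
class FakeParsedNode:
    # hypothetical stand-in for ParsedModelNode / report_node_data
    path: str = 'models/my_model.sql'
    name: str = 'my_model'
    unique_id: str = 'model.proj.my_model'
    resource_type: str = 'model'
    config: FakeConfig = field(default_factory=FakeConfig)
    _event_status: Dict[str, Any] = field(default_factory=dict)

report_node_data = FakeParsedNode()
report_node_data._event_status['node_status'] = 'compiling'
report_node_data._event_status['started_at'] = datetime.utcnow().isoformat()

# roughly what get_node_info() returns (as a plain dict, per the TODO above)
node_info = dict(
    node_path=report_node_data.path,
    node_name=report_node_data.name,
    unique_id=report_node_data.unique_id,
    resource_type=report_node_data.resource_type,
    materialized=report_node_data.config.materialized,
    node_status=report_node_data._event_status.get('node_status'),
    node_started_at=report_node_data._event_status.get('started_at'),
    node_finished_at=report_node_data._event_status.get('finished_at'),  # None until finished
)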

View File

@@ -146,7 +146,8 @@ def scrub_dict_secrets(values: Dict) -> Dict:
# the usage site.
def event_to_dict(e: T_Event, msg_fn: Callable[[T_Event], str]) -> dict:
level = e.level_tag()
return {
event_dict = {
'type': 'log_line',
'log_version': e.log_version,
'ts': e.get_ts(),
'pid': e.get_pid(),
@@ -154,9 +155,14 @@ def event_to_dict(e: T_Event, msg_fn: Callable[[T_Event], str]) -> dict:
'level': level,
'data': Optional[Dict[str, Any]],
'event_data_serialized': True,
'invocation_id': e.get_invocation_id()
'invocation_id': e.get_invocation_id(),
'node_info': e.get_node_info()  # TODO: update to just `node_info` and move calling this to
# `event_to_serializable_dict` after #4326 gets merged in.
# also remove reference to node_info in Event class
}
return event_dict
# translates an Event to a completely formatted text-based log line
# you have to specify which message you want. (i.e. - e.message, e.cli_msg(), e.file_msg())
@@ -181,6 +187,7 @@ def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
in e.__dataclass_fields__.items() # type: ignore[attr-defined]
if type(y._field_type) == _FIELD_BASE
}
values['data']['type'] = 'dataclass_attr'
else:
values['data'] = None
@@ -282,6 +289,8 @@ def send_exc_to_logger(
# to files, etc.)
def fire_event(e: Event) -> None:
# TODO manage history in phase 2: EVENT_HISTORY.append(e)
# if e.code == "Q012":
# breakpoint()
# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
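
The net effect on structured (JSON) log output is one extra top-level key per line. An illustrative sketch of the resulting shape; all values here are made up, and only the fields visible in this diff are shown:

log_line = {
    'type': 'log_line',
    'log_version': 1,
    'ts': '2021-11-29T20:29:32.000000Z',   # illustrative
    'pid': 12345,                          # illustrative
    'level': 'debug',
    'data': {},                            # event-specific payload
    'event_data_serialized': True,
    'invocation_id': 'illustrative-uuid',
    'node_info': {                         # new: built by Event.get_node_info()
        'node_path': 'models/my_model.sql',
        'node_name': 'my_model',
        'unique_id': 'model.proj.my_model',
        'resource_type': 'model',
        'materialized': 'view',
        'node_status': 'compiling',
        'node_started_at': '2021-11-29T20:29:32.000000',
        'node_finished_at': None,
    },
}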

View File

@@ -1,5 +1,6 @@
from typing import (
Any,
List,
NamedTuple,
Optional,
)
@@ -42,3 +43,18 @@ class BaseRelation:
class InformationSchema(BaseRelation):
information_schema_view: Optional[str]
class CompiledNode():
compiled_sql: Optional[str]
extra_ctes_injected: bool
extra_ctes: List[Any]
relation_name: Optional[str]
class CompiledModelNode(CompiledNode):
resource_type: Any
class ParsedModelNode():
resource_type: Any

core/dbt/events/trash.py (new empty file, 0 lines)
View File

View File

@@ -1,9 +1,15 @@
import argparse
from dataclasses import dataclass
from dbt.events.stubs import _CachedRelation, AdapterResponse, BaseRelation, _ReferenceKey
from dbt.events.stubs import (
_CachedRelation,
AdapterResponse,
BaseRelation,
ParsedModelNode,
_ReferenceKey
)
from dbt import ui
from dbt.events.base_types import (
Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException
Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo
)
from dbt.events.format import format_fancy_output_line, pluralize
from dbt.node_types import NodeType
@@ -1784,23 +1790,30 @@ class EndOfRunSummary(InfoLevel, Cli, File):
@dataclass
class PrintStartLine(InfoLevel, Cli, File):
class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
description: str
index: int
total: int
report_node_data: Any # TODO: be explicit
code: str = "Z031"
def message(self) -> str:
msg = f"START {self.description}"
return format_fancy_output_line(msg=msg, status='RUN', index=self.index, total=self.total)
return format_fancy_output_line(
msg=msg,
status='RUN',
index=self.index,
total=self.total
)
@dataclass
class PrintHookStartLine(InfoLevel, Cli, File):
class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
statement: str
index: int
total: int
truncate: bool
report_node_data: Any # TODO: be explicit
code: str = "Z032"
def message(self) -> str:
@@ -1813,13 +1826,14 @@ class PrintHookStartLine(InfoLevel, Cli, File):
@dataclass
class PrintHookEndLine(InfoLevel, Cli, File):
class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
statement: str
status: str
index: int
total: int
execution_time: int
truncate: bool
report_node_data: Any # TODO: be explicit
code: str = "Q007"
def message(self) -> str:
@@ -1833,12 +1847,13 @@ class PrintHookEndLine(InfoLevel, Cli, File):
@dataclass
class SkippingDetails(InfoLevel, Cli, File):
class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
resource_type: str
schema: str
node_name: str
index: int
total: int
report_node_data: Any # TODO: be explicit
code: str = "Z033"
def message(self) -> str:
@@ -1853,11 +1868,12 @@ class SkippingDetails(InfoLevel, Cli, File):
@dataclass
class PrintErrorTestResult(ErrorLevel, Cli, File):
class PrintErrorTestResult(ErrorLevel, Cli, File, NodeInfo):
name: str
index: int
num_models: int
execution_time: int
report_node_data: Any # TODO: be explicit
code: str = "Q008"
def message(self) -> str:
@@ -1871,11 +1887,12 @@ class PrintErrorTestResult(ErrorLevel, Cli, File):
@dataclass
class PrintPassTestResult(InfoLevel, Cli, File):
class PrintPassTestResult(InfoLevel, Cli, File, NodeInfo):
name: str
index: int
num_models: int
execution_time: int
report_node_data: Any # TODO: be explicit
code: str = "Q009"
def message(self) -> str:
@@ -1889,12 +1906,13 @@ class PrintPassTestResult(InfoLevel, Cli, File):
@dataclass
class PrintWarnTestResult(WarnLevel, Cli, File):
class PrintWarnTestResult(WarnLevel, Cli, File, NodeInfo):
name: str
index: int
num_models: int
execution_time: int
failures: List[str]
report_node_data: Any # TODO: be explicit
code: str = "Q010"
def message(self) -> str:
@@ -1908,12 +1926,13 @@ class PrintWarnTestResult(WarnLevel, Cli, File):
@dataclass
class PrintFailureTestResult(ErrorLevel, Cli, File):
class PrintFailureTestResult(ErrorLevel, Cli, File, NodeInfo):
name: str
index: int
num_models: int
execution_time: int
failures: List[str]
report_node_data: Any # TODO: be explicit
code: str = "Q011"
def message(self) -> str:
@@ -1943,12 +1962,13 @@ class PrintSkipBecauseError(ErrorLevel, Cli, File):
@dataclass
class PrintModelErrorResultLine(ErrorLevel, Cli, File):
class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
description: str
status: str
index: int
total: int
execution_time: int
report_node_data: Any # TODO: be explicit
code: str = "Z035"
def message(self) -> str:
@@ -1962,12 +1982,13 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File):
@dataclass
class PrintModelResultLine(InfoLevel, Cli, File):
class PrintModelResultLine(InfoLevel, Cli, File, NodeInfo):
description: str
status: str
index: int
total: int
execution_time: int
report_node_data: Any # TODO: be explicit
code: str = "Q012"
def message(self) -> str:
@@ -2159,8 +2180,9 @@ class DefaultSelector(InfoLevel, Cli, File):
@dataclass
class NodeStart(DebugLevel, Cli, File):
class NodeStart(DebugLevel, Cli, File, NodeInfo):
unique_id: str
report_node_data: ParsedModelNode
code: str = "Q023"
def message(self) -> str:
@@ -2168,8 +2190,10 @@ class NodeStart(DebugLevel, Cli, File):
@dataclass
class NodeFinished(DebugLevel, Cli, File):
class NodeFinished(DebugLevel, Cli, File, NodeInfo):
unique_id: str
report_node_data: ParsedModelNode
# TODO: possibly pass entire RunResult
code: str = "Q024"
def message(self) -> str:
@@ -2197,6 +2221,26 @@ class ConcurrencyLine(InfoLevel, Cli, File):
return self.concurrency_line
@dataclass
class NodeCompiling(DebugLevel, Cli, File, NodeInfo):
unique_id: str
report_node_data: ParsedModelNode
code: str = "Q030"
def message(self) -> str:
return f"Began compiling node {self.unique_id}"
@dataclass
class NodeExecuting(DebugLevel, Cli, File, NodeInfo):
unique_id: str
report_node_data: ParsedModelNode
code: str = "Q031"
def message(self) -> str:
return f"Began executing node {self.unique_id}"
@dataclass
class StarterProjectPath(DebugLevel, Cli, File):
dir: str
@@ -2628,7 +2672,7 @@ if 1 == 0:
FirstRunResultError(msg='')
AfterFirstRunResultError(msg='')
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False)
PrintStartLine(description='', index=0, total=0)
PrintStartLine(description='', index=0, total=0, report_node_data='')
PrintHookStartLine(statement='', index=0, total=0, truncate=False)
PrintHookEndLine(statement='', status='', index=0, total=0, execution_time=0, truncate=False)
SkippingDetails(resource_type='', schema='', node_name='', index=0, total=0)
@@ -2654,10 +2698,12 @@ if 1 == 0:
PrintHookEndPassLine(source_name='', table_name='', index=0, total=0, execution_time=0)
PrintCancelLine(conn_name='')
DefaultSelector(name='')
NodeStart(unique_id='')
NodeFinished(unique_id='')
NodeStart(report_node_data=ParsedModelNode(), unique_id='')
NodeFinished(report_node_data=ParsedModelNode(), unique_id='')
QueryCancelationUnsupported(type='')
ConcurrencyLine(concurrency_line='')
NodeCompiling(report_node_data=ParsedModelNode(), unique_id='')
NodeExecuting(report_node_data=ParsedModelNode(), unique_id='')
StarterProjectPath(dir='')
ConfigFolderDirectory(dir='')
NoSampleProfileFound(adapter='')

View File

@@ -8,7 +8,7 @@ from dbt import tracking
from dbt import flags
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import (
NodeStatus, RunResult, collect_timing_info, RunStatus
NodeStatus, RunResult, collect_timing_info, RunStatus, RunningStatus
)
from dbt.exceptions import (
NotImplementedException, CompilationException, RuntimeException,
@@ -20,7 +20,8 @@ from dbt.events.types import (
DbtProjectError, DbtProjectErrorException, DbtProfileError, DbtProfileErrorException,
ProfileListTitle, ListSingleProfile, NoDefinedProfiles, ProfileHelpMessage,
CatchableExceptionOnRun, InternalExceptionOnRun, GenericExceptionOnRun,
NodeConnectionReleaseError, PrintDebugStackTrace, SkippingDetails, PrintSkipBecauseError
NodeConnectionReleaseError, PrintDebugStackTrace, SkippingDetails, PrintSkipBecauseError,
NodeCompiling, NodeExecuting
)
from .printer import print_run_result_error
@@ -279,6 +280,13 @@ class BaseRunner(metaclass=ABCMeta):
def compile_and_execute(self, manifest, ctx):
result = None
with self.adapter.connection_for(self.node):
ctx.node._event_status['node_status'] = str(RunningStatus.Compiling)
fire_event(
NodeCompiling(
report_node_data=ctx.node,
unique_id=ctx.node.unique_id,
)
)
with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
@@ -288,6 +296,13 @@ class BaseRunner(metaclass=ABCMeta):
# for ephemeral nodes, we only want to compile, not run
if not ctx.node.is_ephemeral_model:
ctx.node._event_status['node_status'] = str(RunningStatus.Executing)
fire_event(
NodeExecuting(
report_node_data=ctx.node,
unique_id=ctx.node.unique_id,
)
)
with collect_timing_info('execute') as timing_info:
result = self.run(ctx.node, manifest)
ctx.node = result.node
@@ -425,7 +440,8 @@ class BaseRunner(metaclass=ABCMeta):
schema=schema_name,
node_name=node_name,
index=self.node_index,
total=self.num_nodes
total=self.num_nodes,
report_node_data=self.node
)
)

View File

@@ -40,7 +40,8 @@ class FreshnessRunner(BaseRunner):
PrintStartLine(
description=description,
index=self.node_index,
total=self.num_nodes
total=self.num_nodes,
report_node_data=self.node
)
)

View File

@@ -177,7 +177,8 @@ class ModelRunner(CompileRunner):
PrintStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes
total=self.num_nodes,
report_node_data=self.node
)
)
@@ -190,7 +191,8 @@ class ModelRunner(CompileRunner):
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time
execution_time=result.execution_time,
report_node_data=self.node
)
)
else:
@@ -200,7 +202,8 @@ class ModelRunner(CompileRunner):
status=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time
execution_time=result.execution_time,
report_node_data=self.node
)
)
@@ -352,7 +355,8 @@ class RunTask(CompileTask):
statement=hook_text,
index=idx,
total=num_hooks,
truncate=True
truncate=True,
report_node_data=hook
)
)
@@ -372,7 +376,8 @@ class RunTask(CompileTask):
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
truncate=True
truncate=True,
report_node_data=hook
)
)

View File

@@ -34,7 +34,7 @@ from dbt.events.types import (
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.results import NodeStatus, RunExecutionResult
from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus
from dbt.contracts.state import PreviousState
from dbt.exceptions import (
InternalException,
@@ -206,17 +206,35 @@ class GraphRunnableTask(ManifestTask):
with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at')
index = self.index_offset(runner.node_index)
runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['node_status'] = str(RunningStatus.Started)
extended_metadata = ModelMetadata(runner.node, index)
with startctx, extended_metadata:
fire_event(NodeStart(unique_id=runner.node.unique_id))
fire_event(
NodeStart(
report_node_data=runner.node,
unique_id=runner.node.unique_id,
)
)
status: Dict[str, str]
try:
result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result)
runner.node._event_status['node_status'] = str(result.status)
runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
finally:
finishctx = TimestampNamed('node_finished_at')
finishctx = TimestampNamed('finished_at')
with finishctx, DbtModelState(status):
fire_event(NodeFinished(unique_id=runner.node.unique_id))
fire_event(
NodeFinished(
report_node_data=runner.node,
unique_id=runner.node.unique_id,
)
)
del runner.node._event_status["started_at"]
del runner.node._event_status["finished_at"]
del runner.node._event_status["node_status"]
fail_fast = flags.FAIL_FAST
@@ -278,6 +296,7 @@ class GraphRunnableTask(ManifestTask):
runner = self.get_runner(node)
# we finally know what we're running! Make sure we haven't decided
# to skip it due to upstream failures
# breakpoint()
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)
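
Taken together, the runner walks each node's `_event_status` through a fixed lifecycle and clears it once the terminal event has fired, so stale state does not leak into later log lines. A condensed sketch of the ordering, with a hypothetical node object standing in for the real runner machinery (the fire_event calls are shown as comments):

from datetime import datetime

class FakeNode:
    # hypothetical stand-in for runner.node / ctx.node
    def __init__(self, unique_id: str):
        self.unique_id = unique_id
        self._event_status = {}

node = FakeNode('model.proj.my_model')

# runnable.py, before the node runs
node._event_status['started_at'] = datetime.utcnow().isoformat()
node._event_status['node_status'] = 'started'      # str(RunningStatus.Started)
# fire_event(NodeStart(report_node_data=node, unique_id=node.unique_id))

# base.py, compile_and_execute
node._event_status['node_status'] = 'compiling'    # str(RunningStatus.Compiling)
# fire_event(NodeCompiling(report_node_data=node, unique_id=node.unique_id))
node._event_status['node_status'] = 'executing'    # only for non-ephemeral nodes
# fire_event(NodeExecuting(report_node_data=node, unique_id=node.unique_id))

# runnable.py, after run_with_hooks returns
node._event_status['node_status'] = 'success'      # str(result.status), illustrative value
node._event_status['finished_at'] = datetime.utcnow().isoformat()
# fire_event(NodeFinished(report_node_data=node, unique_id=node.unique_id))

# cleanup so stale status never appears on later, unrelated log lines
del node._event_status['started_at']
del node._event_status['finished_at']
del node._event_status['node_status']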

View File

@@ -27,7 +27,8 @@ class SeedRunner(ModelRunner):
PrintStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes
total=self.num_nodes,
report_node_data=self.node
)
)

View File

@@ -74,7 +74,8 @@ class TestRunner(CompileRunner):
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time
execution_time=result.execution_time,
report_node_data=model
)
)
elif result.status == TestStatus.Pass:
@@ -83,7 +84,8 @@ class TestRunner(CompileRunner):
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time
execution_time=result.execution_time,
report_node_data=model
)
)
elif result.status == TestStatus.Warn:
@@ -93,7 +95,8 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures
failures=result.failures,
report_node_data=model
)
)
elif result.status == TestStatus.Fail:
@@ -103,7 +106,8 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures
failures=result.failures,
report_node_data=model
)
)
else:
@@ -114,7 +118,8 @@ class TestRunner(CompileRunner):
PrintStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes
total=self.num_nodes,
report_node_data=self.node
)
)