mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
18 Commits
remove-ssl
...
add_runnin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b60e30eb27 | ||
|
|
cc46f85616 | ||
|
|
dc08df9543 | ||
|
|
fc0583cc3c | ||
|
|
f524aaea17 | ||
|
|
8c8bfddf5e | ||
|
|
18cffeed69 | ||
|
|
8828557791 | ||
|
|
8affc26182 | ||
|
|
fffef72bca | ||
|
|
024a560429 | ||
|
|
e7a670abc2 | ||
|
|
19907edca2 | ||
|
|
4199f6e877 | ||
|
|
6c1fbf3e57 | ||
|
|
d4c3331f64 | ||
|
|
96aae6f79a | ||
|
|
ecef2c5457 |
@@ -194,6 +194,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
|
||||
unrendered_config: Dict[str, Any] = field(default_factory=dict)
|
||||
created_at: float = field(default_factory=lambda: time.time())
|
||||
config_call_dict: Dict[str, Any] = field(default_factory=dict)
|
||||
_event_status: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def write_node(self, target_path: str, subdirectory: str, payload: str):
|
||||
if (os.path.basename(self.path) ==
|
||||
@@ -223,6 +224,8 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
|
||||
def __post_serialize__(self, dct):
|
||||
if 'config_call_dict' in dct:
|
||||
del dct['config_call_dict']
|
||||
if '_event_status' in dct:
|
||||
del dct['_event_status']
|
||||
return dct
|
||||
|
||||
@classmethod
|
||||
@@ -626,6 +629,12 @@ class ParsedSourceDefinition(
|
||||
unrendered_config: Dict[str, Any] = field(default_factory=dict)
|
||||
relation_name: Optional[str] = None
|
||||
created_at: float = field(default_factory=lambda: time.time())
|
||||
_event_status: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_serialize__(self, dct):
|
||||
if '_event_status' in dct:
|
||||
del dct['_event_status']
|
||||
return dct
|
||||
|
||||
def same_database_representation(
|
||||
self, other: 'ParsedSourceDefinition'
|
||||
|
||||
@@ -58,6 +58,12 @@ class collect_timing_info:
|
||||
fire_event(TimingInfoCollected())
|
||||
|
||||
|
||||
class RunningStatus(StrEnum):
|
||||
Started = 'started'
|
||||
Compiling = 'compiling'
|
||||
Executing = 'executing'
|
||||
|
||||
|
||||
class NodeStatus(StrEnum):
|
||||
Success = "success"
|
||||
Error = "error"
|
||||
|
||||
@@ -4,7 +4,6 @@ from datetime import datetime
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
||||
# These base types define the _required structure_ for the concrete event #
|
||||
# types defined in types.py #
|
||||
@@ -37,6 +36,19 @@ class ErrorLevel():
|
||||
return "error"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Node():
|
||||
node_path: str
|
||||
node_name: str
|
||||
unique_id: str
|
||||
resource_type: str
|
||||
materialized: str
|
||||
node_status: str
|
||||
node_started_at: datetime
|
||||
node_finished_at: datetime
|
||||
type: str = 'node_status'
|
||||
|
||||
|
||||
@dataclass
|
||||
class ShowException():
|
||||
# N.B.:
|
||||
@@ -57,6 +69,7 @@ class Event(metaclass=ABCMeta):
|
||||
log_version: int = 1
|
||||
ts: Optional[datetime] = None # use getter for non-optional
|
||||
pid: Optional[int] = None # use getter for non-optional
|
||||
node_info: Optional[Node]
|
||||
|
||||
# four digit string code that uniquely identifies this type of event
|
||||
# uniqueness and valid characters are enforced by tests
|
||||
@@ -93,6 +106,27 @@ class Event(metaclass=ABCMeta):
|
||||
from dbt.events.functions import get_invocation_id
|
||||
return get_invocation_id()
|
||||
|
||||
def get_node_info(self):
|
||||
return None
|
||||
|
||||
|
||||
@dataclass # type: ignore
|
||||
class NodeInfo(Event, metaclass=ABCMeta):
|
||||
report_node_data: Any # Union[ParsedModelNode, ...] TODO: resolve circular imports
|
||||
|
||||
def get_node_info(self):
|
||||
node_info = Node(
|
||||
node_path=self.report_node_data.path,
|
||||
node_name=self.report_node_data.name,
|
||||
unique_id=self.report_node_data.unique_id,
|
||||
resource_type=self.report_node_data.resource_type,
|
||||
materialized=self.report_node_data.config.materialized,
|
||||
node_status=self.report_node_data._event_status.get('node_status'),
|
||||
node_started_at=self.report_node_data._event_status.get("started_at"),
|
||||
node_finished_at=self.report_node_data._event_status.get("finished_at")
|
||||
)
|
||||
return vars(node_info) # TODO: can just return node_info after #4326 is merged in
|
||||
|
||||
|
||||
class File(Event, metaclass=ABCMeta):
|
||||
# Solely the human readable message. Timestamps and formatting will be added by the logger.
|
||||
|
||||
@@ -146,7 +146,8 @@ def scrub_dict_secrets(values: Dict) -> Dict:
|
||||
# the usage site.
|
||||
def event_to_dict(e: T_Event, msg_fn: Callable[[T_Event], str]) -> dict:
|
||||
level = e.level_tag()
|
||||
return {
|
||||
event_dict = {
|
||||
'type': 'log_line',
|
||||
'log_version': e.log_version,
|
||||
'ts': e.get_ts(),
|
||||
'pid': e.get_pid(),
|
||||
@@ -154,9 +155,14 @@ def event_to_dict(e: T_Event, msg_fn: Callable[[T_Event], str]) -> dict:
|
||||
'level': level,
|
||||
'data': Optional[Dict[str, Any]],
|
||||
'event_data_serialized': True,
|
||||
'invocation_id': e.get_invocation_id()
|
||||
'invocation_id': e.get_invocation_id(),
|
||||
'node_info': e.get_node_info() # TODO: update to just `node_info` and more calling this to
|
||||
# `event_to_serializable_dict` after #4326 gets merged in.
|
||||
# also remove refrence to node_info in Event class
|
||||
}
|
||||
|
||||
return event_dict
|
||||
|
||||
|
||||
# translates an Event to a completely formatted text-based log line
|
||||
# you have to specify which message you want. (i.e. - e.message, e.cli_msg(), e.file_msg())
|
||||
@@ -181,6 +187,7 @@ def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
|
||||
in e.__dataclass_fields__.items() # type: ignore[attr-defined]
|
||||
if type(y._field_type) == _FIELD_BASE
|
||||
}
|
||||
values['data']['type'] = 'dataclass_attr'
|
||||
else:
|
||||
values['data'] = None
|
||||
|
||||
@@ -282,6 +289,8 @@ def send_exc_to_logger(
|
||||
# to files, etc.)
|
||||
def fire_event(e: Event) -> None:
|
||||
# TODO manage history in phase 2: EVENT_HISTORY.append(e)
|
||||
# if e.code == "Q012":
|
||||
# breakpoint()
|
||||
|
||||
# backwards compatibility for plugins that require old logger (dbt-rpc)
|
||||
if flags.ENABLE_LEGACY_LOGGER:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import (
|
||||
Any,
|
||||
List,
|
||||
NamedTuple,
|
||||
Optional,
|
||||
)
|
||||
@@ -42,3 +43,18 @@ class BaseRelation:
|
||||
|
||||
class InformationSchema(BaseRelation):
|
||||
information_schema_view: Optional[str]
|
||||
|
||||
|
||||
class CompiledNode():
|
||||
compiled_sql: Optional[str]
|
||||
extra_ctes_injected: bool
|
||||
extra_ctes: List[Any]
|
||||
relation_name: Optional[str]
|
||||
|
||||
|
||||
class CompiledModelNode(CompiledNode):
|
||||
resource_type: Any
|
||||
|
||||
|
||||
class ParsedModelNode():
|
||||
resource_type: Any
|
||||
|
||||
0
core/dbt/events/trash.py
Normal file
0
core/dbt/events/trash.py
Normal file
@@ -1,9 +1,15 @@
|
||||
import argparse
|
||||
from dataclasses import dataclass
|
||||
from dbt.events.stubs import _CachedRelation, AdapterResponse, BaseRelation, _ReferenceKey
|
||||
from dbt.events.stubs import (
|
||||
_CachedRelation,
|
||||
AdapterResponse,
|
||||
BaseRelation,
|
||||
ParsedModelNode,
|
||||
_ReferenceKey
|
||||
)
|
||||
from dbt import ui
|
||||
from dbt.events.base_types import (
|
||||
Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException
|
||||
Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo
|
||||
)
|
||||
from dbt.events.format import format_fancy_output_line, pluralize
|
||||
from dbt.node_types import NodeType
|
||||
@@ -1784,23 +1790,30 @@ class EndOfRunSummary(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintStartLine(InfoLevel, Cli, File):
|
||||
class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
|
||||
description: str
|
||||
index: int
|
||||
total: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Z031"
|
||||
|
||||
def message(self) -> str:
|
||||
msg = f"START {self.description}"
|
||||
return format_fancy_output_line(msg=msg, status='RUN', index=self.index, total=self.total)
|
||||
return format_fancy_output_line(
|
||||
msg=msg,
|
||||
status='RUN',
|
||||
index=self.index,
|
||||
total=self.total
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintHookStartLine(InfoLevel, Cli, File):
|
||||
class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
|
||||
statement: str
|
||||
index: int
|
||||
total: int
|
||||
truncate: bool
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Z032"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1813,13 +1826,14 @@ class PrintHookStartLine(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintHookEndLine(InfoLevel, Cli, File):
|
||||
class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
|
||||
statement: str
|
||||
status: str
|
||||
index: int
|
||||
total: int
|
||||
execution_time: int
|
||||
truncate: bool
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q007"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1833,12 +1847,13 @@ class PrintHookEndLine(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class SkippingDetails(InfoLevel, Cli, File):
|
||||
class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
|
||||
resource_type: str
|
||||
schema: str
|
||||
node_name: str
|
||||
index: int
|
||||
total: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Z033"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1853,11 +1868,12 @@ class SkippingDetails(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintErrorTestResult(ErrorLevel, Cli, File):
|
||||
class PrintErrorTestResult(ErrorLevel, Cli, File, NodeInfo):
|
||||
name: str
|
||||
index: int
|
||||
num_models: int
|
||||
execution_time: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q008"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1871,11 +1887,12 @@ class PrintErrorTestResult(ErrorLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintPassTestResult(InfoLevel, Cli, File):
|
||||
class PrintPassTestResult(InfoLevel, Cli, File, NodeInfo):
|
||||
name: str
|
||||
index: int
|
||||
num_models: int
|
||||
execution_time: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q009"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1889,12 +1906,13 @@ class PrintPassTestResult(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintWarnTestResult(WarnLevel, Cli, File):
|
||||
class PrintWarnTestResult(WarnLevel, Cli, File, NodeInfo):
|
||||
name: str
|
||||
index: int
|
||||
num_models: int
|
||||
execution_time: int
|
||||
failures: List[str]
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q010"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1908,12 +1926,13 @@ class PrintWarnTestResult(WarnLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintFailureTestResult(ErrorLevel, Cli, File):
|
||||
class PrintFailureTestResult(ErrorLevel, Cli, File, NodeInfo):
|
||||
name: str
|
||||
index: int
|
||||
num_models: int
|
||||
execution_time: int
|
||||
failures: List[str]
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q011"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1943,12 +1962,13 @@ class PrintSkipBecauseError(ErrorLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintModelErrorResultLine(ErrorLevel, Cli, File):
|
||||
class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
|
||||
description: str
|
||||
status: str
|
||||
index: int
|
||||
total: int
|
||||
execution_time: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Z035"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -1962,12 +1982,13 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrintModelResultLine(InfoLevel, Cli, File):
|
||||
class PrintModelResultLine(InfoLevel, Cli, File, NodeInfo):
|
||||
description: str
|
||||
status: str
|
||||
index: int
|
||||
total: int
|
||||
execution_time: int
|
||||
report_node_data: Any # TODO: be explicit
|
||||
code: str = "Q012"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -2159,8 +2180,9 @@ class DefaultSelector(InfoLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeStart(DebugLevel, Cli, File):
|
||||
class NodeStart(DebugLevel, Cli, File, NodeInfo):
|
||||
unique_id: str
|
||||
report_node_data: ParsedModelNode
|
||||
code: str = "Q023"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -2168,8 +2190,10 @@ class NodeStart(DebugLevel, Cli, File):
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeFinished(DebugLevel, Cli, File):
|
||||
class NodeFinished(DebugLevel, Cli, File, NodeInfo):
|
||||
unique_id: str
|
||||
report_node_data: ParsedModelNode
|
||||
# TODO: possibly pass entire RunResult
|
||||
code: str = "Q024"
|
||||
|
||||
def message(self) -> str:
|
||||
@@ -2197,6 +2221,26 @@ class ConcurrencyLine(InfoLevel, Cli, File):
|
||||
return self.concurrency_line
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeCompiling(DebugLevel, Cli, File, NodeInfo):
|
||||
unique_id: str
|
||||
report_node_data: ParsedModelNode
|
||||
code: str = "Q030"
|
||||
|
||||
def message(self) -> str:
|
||||
return f"Began compiling node {self.unique_id}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeExecuting(DebugLevel, Cli, File, NodeInfo):
|
||||
unique_id: str
|
||||
report_node_data: ParsedModelNode
|
||||
code: str = "Q031"
|
||||
|
||||
def message(self) -> str:
|
||||
return f"Began executing node {self.unique_id}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class StarterProjectPath(DebugLevel, Cli, File):
|
||||
dir: str
|
||||
@@ -2628,7 +2672,7 @@ if 1 == 0:
|
||||
FirstRunResultError(msg='')
|
||||
AfterFirstRunResultError(msg='')
|
||||
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False)
|
||||
PrintStartLine(description='', index=0, total=0)
|
||||
PrintStartLine(description='', index=0, total=0, report_node_data='')
|
||||
PrintHookStartLine(statement='', index=0, total=0, truncate=False)
|
||||
PrintHookEndLine(statement='', status='', index=0, total=0, execution_time=0, truncate=False)
|
||||
SkippingDetails(resource_type='', schema='', node_name='', index=0, total=0)
|
||||
@@ -2654,10 +2698,12 @@ if 1 == 0:
|
||||
PrintHookEndPassLine(source_name='', table_name='', index=0, total=0, execution_time=0)
|
||||
PrintCancelLine(conn_name='')
|
||||
DefaultSelector(name='')
|
||||
NodeStart(unique_id='')
|
||||
NodeFinished(unique_id='')
|
||||
NodeStart(report_node_data=ParsedModelNode(), unique_id='')
|
||||
NodeFinished(report_node_data=ParsedModelNode(), unique_id='')
|
||||
QueryCancelationUnsupported(type='')
|
||||
ConcurrencyLine(concurrency_line='')
|
||||
NodeCompiling(report_node_data=ParsedModelNode(), unique_id='')
|
||||
NodeExecuting(report_node_data=ParsedModelNode(), unique_id='')
|
||||
StarterProjectPath(dir='')
|
||||
ConfigFolderDirectory(dir='')
|
||||
NoSampleProfileFound(adapter='')
|
||||
|
||||
@@ -8,7 +8,7 @@ from dbt import tracking
|
||||
from dbt import flags
|
||||
from dbt.contracts.graph.manifest import Manifest
|
||||
from dbt.contracts.results import (
|
||||
NodeStatus, RunResult, collect_timing_info, RunStatus
|
||||
NodeStatus, RunResult, collect_timing_info, RunStatus, RunningStatus
|
||||
)
|
||||
from dbt.exceptions import (
|
||||
NotImplementedException, CompilationException, RuntimeException,
|
||||
@@ -20,7 +20,8 @@ from dbt.events.types import (
|
||||
DbtProjectError, DbtProjectErrorException, DbtProfileError, DbtProfileErrorException,
|
||||
ProfileListTitle, ListSingleProfile, NoDefinedProfiles, ProfileHelpMessage,
|
||||
CatchableExceptionOnRun, InternalExceptionOnRun, GenericExceptionOnRun,
|
||||
NodeConnectionReleaseError, PrintDebugStackTrace, SkippingDetails, PrintSkipBecauseError
|
||||
NodeConnectionReleaseError, PrintDebugStackTrace, SkippingDetails, PrintSkipBecauseError,
|
||||
NodeCompiling, NodeExecuting
|
||||
)
|
||||
from .printer import print_run_result_error
|
||||
|
||||
@@ -279,6 +280,13 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
def compile_and_execute(self, manifest, ctx):
|
||||
result = None
|
||||
with self.adapter.connection_for(self.node):
|
||||
ctx.node._event_status['node_status'] = str(RunningStatus.Compiling)
|
||||
fire_event(
|
||||
NodeCompiling(
|
||||
report_node_data=ctx.node,
|
||||
unique_id=ctx.node.unique_id,
|
||||
)
|
||||
)
|
||||
with collect_timing_info('compile') as timing_info:
|
||||
# if we fail here, we still have a compiled node to return
|
||||
# this has the benefit of showing a build path for the errant
|
||||
@@ -288,6 +296,13 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
|
||||
# for ephemeral nodes, we only want to compile, not run
|
||||
if not ctx.node.is_ephemeral_model:
|
||||
ctx.node._event_status['node_status'] = str(RunningStatus.Executing)
|
||||
fire_event(
|
||||
NodeExecuting(
|
||||
report_node_data=ctx.node,
|
||||
unique_id=ctx.node.unique_id,
|
||||
)
|
||||
)
|
||||
with collect_timing_info('execute') as timing_info:
|
||||
result = self.run(ctx.node, manifest)
|
||||
ctx.node = result.node
|
||||
@@ -425,7 +440,8 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
schema=schema_name,
|
||||
node_name=node_name,
|
||||
index=self.node_index,
|
||||
total=self.num_nodes
|
||||
total=self.num_nodes,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -40,7 +40,8 @@ class FreshnessRunner(BaseRunner):
|
||||
PrintStartLine(
|
||||
description=description,
|
||||
index=self.node_index,
|
||||
total=self.num_nodes
|
||||
total=self.num_nodes,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -177,7 +177,8 @@ class ModelRunner(CompileRunner):
|
||||
PrintStartLine(
|
||||
description=self.describe_node(),
|
||||
index=self.node_index,
|
||||
total=self.num_nodes
|
||||
total=self.num_nodes,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
@@ -190,7 +191,8 @@ class ModelRunner(CompileRunner):
|
||||
status=result.status,
|
||||
index=self.node_index,
|
||||
total=self.num_nodes,
|
||||
execution_time=result.execution_time
|
||||
execution_time=result.execution_time,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -200,7 +202,8 @@ class ModelRunner(CompileRunner):
|
||||
status=result.message,
|
||||
index=self.node_index,
|
||||
total=self.num_nodes,
|
||||
execution_time=result.execution_time
|
||||
execution_time=result.execution_time,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
@@ -352,7 +355,8 @@ class RunTask(CompileTask):
|
||||
statement=hook_text,
|
||||
index=idx,
|
||||
total=num_hooks,
|
||||
truncate=True
|
||||
truncate=True,
|
||||
report_node_data=hook
|
||||
)
|
||||
)
|
||||
|
||||
@@ -372,7 +376,8 @@ class RunTask(CompileTask):
|
||||
index=idx,
|
||||
total=num_hooks,
|
||||
execution_time=timer.elapsed,
|
||||
truncate=True
|
||||
truncate=True,
|
||||
report_node_data=hook
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ from dbt.events.types import (
|
||||
from dbt.contracts.graph.compiled import CompileResultNode
|
||||
from dbt.contracts.graph.manifest import Manifest
|
||||
from dbt.contracts.graph.parsed import ParsedSourceDefinition
|
||||
from dbt.contracts.results import NodeStatus, RunExecutionResult
|
||||
from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus
|
||||
from dbt.contracts.state import PreviousState
|
||||
from dbt.exceptions import (
|
||||
InternalException,
|
||||
@@ -206,17 +206,35 @@ class GraphRunnableTask(ManifestTask):
|
||||
with RUNNING_STATE, uid_context:
|
||||
startctx = TimestampNamed('node_started_at')
|
||||
index = self.index_offset(runner.node_index)
|
||||
runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
|
||||
runner.node._event_status['node_status'] = str(RunningStatus.Started)
|
||||
extended_metadata = ModelMetadata(runner.node, index)
|
||||
|
||||
with startctx, extended_metadata:
|
||||
fire_event(NodeStart(unique_id=runner.node.unique_id))
|
||||
fire_event(
|
||||
NodeStart(
|
||||
report_node_data=runner.node,
|
||||
unique_id=runner.node.unique_id,
|
||||
)
|
||||
)
|
||||
status: Dict[str, str]
|
||||
try:
|
||||
result = runner.run_with_hooks(self.manifest)
|
||||
status = runner.get_result_status(result)
|
||||
runner.node._event_status['node_status'] = str(result.status)
|
||||
runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
|
||||
finally:
|
||||
finishctx = TimestampNamed('node_finished_at')
|
||||
finishctx = TimestampNamed('finished_at')
|
||||
with finishctx, DbtModelState(status):
|
||||
fire_event(NodeFinished(unique_id=runner.node.unique_id))
|
||||
fire_event(
|
||||
NodeFinished(
|
||||
report_node_data=runner.node,
|
||||
unique_id=runner.node.unique_id,
|
||||
)
|
||||
)
|
||||
del runner.node._event_status["started_at"]
|
||||
del runner.node._event_status["finished_at"]
|
||||
del runner.node._event_status["node_status"]
|
||||
|
||||
fail_fast = flags.FAIL_FAST
|
||||
|
||||
@@ -278,6 +296,7 @@ class GraphRunnableTask(ManifestTask):
|
||||
runner = self.get_runner(node)
|
||||
# we finally know what we're running! Make sure we haven't decided
|
||||
# to skip it due to upstream failures
|
||||
# breakpoint()
|
||||
if runner.node.unique_id in self._skipped_children:
|
||||
cause = self._skipped_children.pop(runner.node.unique_id)
|
||||
runner.do_skip(cause=cause)
|
||||
|
||||
@@ -27,7 +27,8 @@ class SeedRunner(ModelRunner):
|
||||
PrintStartLine(
|
||||
description=self.describe_node(),
|
||||
index=self.node_index,
|
||||
total=self.num_nodes
|
||||
total=self.num_nodes,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -74,7 +74,8 @@ class TestRunner(CompileRunner):
|
||||
name=model.name,
|
||||
index=self.node_index,
|
||||
num_models=self.num_nodes,
|
||||
execution_time=result.execution_time
|
||||
execution_time=result.execution_time,
|
||||
report_node_data=model
|
||||
)
|
||||
)
|
||||
elif result.status == TestStatus.Pass:
|
||||
@@ -83,7 +84,8 @@ class TestRunner(CompileRunner):
|
||||
name=model.name,
|
||||
index=self.node_index,
|
||||
num_models=self.num_nodes,
|
||||
execution_time=result.execution_time
|
||||
execution_time=result.execution_time,
|
||||
report_node_data=model
|
||||
)
|
||||
)
|
||||
elif result.status == TestStatus.Warn:
|
||||
@@ -93,7 +95,8 @@ class TestRunner(CompileRunner):
|
||||
index=self.node_index,
|
||||
num_models=self.num_nodes,
|
||||
execution_time=result.execution_time,
|
||||
failures=result.failures
|
||||
failures=result.failures,
|
||||
report_node_data=model
|
||||
)
|
||||
)
|
||||
elif result.status == TestStatus.Fail:
|
||||
@@ -103,7 +106,8 @@ class TestRunner(CompileRunner):
|
||||
index=self.node_index,
|
||||
num_models=self.num_nodes,
|
||||
execution_time=result.execution_time,
|
||||
failures=result.failures
|
||||
failures=result.failures,
|
||||
report_node_data=model
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -114,7 +118,8 @@ class TestRunner(CompileRunner):
|
||||
PrintStartLine(
|
||||
description=self.describe_node(),
|
||||
index=self.node_index,
|
||||
total=self.num_nodes
|
||||
total=self.num_nodes,
|
||||
report_node_data=self.node
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user