Compare commits

...

3 Commits

Author SHA1 Message Date
Gerda Shank
023e3e16f6 Write out parsed_manifest.json when writing out partial parsing manifest 2022-10-25 16:11:29 -04:00
Gerda Shank
1570108b8d Create a minimal model node message 2022-10-23 08:40:40 -05:00
Gerda Shank
7c18ebf0b1 Combine various print result log events with different levels 2022-10-18 16:10:40 -05:00
20 changed files with 359 additions and 725 deletions

View File

@@ -0,0 +1,23 @@
syntax = "proto3";
package proto_nodes;
message DependsOn {
repeated string macros = 1;
repeated string nodes = 2;
}
message ModelNode {
string resource_type = 1;
string unique_id = 2;
string name = 3;
string package_name = 4;
string alias = 5;
DependsOn depends_on = 6;
string database = 7;
string schema = 8;
string language = 9;
string original_file_path = 10;
repeated string tags = 11;
string description = 12;
}

View File

@@ -353,6 +353,24 @@ class ParsedHookNode(ParsedNode):
class ParsedModelNode(ParsedNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Model]})
def to_msg(self):
from dbt.contracts.graph.proto_nodes import ModelNode
node = ModelNode(
name=self.name,
resource_type=str(self.resource_type),
unique_id=self.unique_id,
package_name=self.package_name,
alias=self.alias,
# depends_on=...
database=self.database,
schema=self.schema,
language=self.language,
tags=self.tags,
description=self.description,
)
return node
# TODO: rm?
@dataclass

View File

@@ -0,0 +1,29 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: nodes.proto
# plugin: python-betterproto
from dataclasses import dataclass
from typing import List
import betterproto
@dataclass
class DependsOn(betterproto.Message):
macros: List[str] = betterproto.string_field(1)
nodes: List[str] = betterproto.string_field(2)
@dataclass
class ModelNode(betterproto.Message):
resource_type: str = betterproto.string_field(1)
unique_id: str = betterproto.string_field(2)
name: str = betterproto.string_field(3)
package_name: str = betterproto.string_field(4)
alias: str = betterproto.string_field(5)
depends_on: "DependsOn" = betterproto.message_field(6)
database: str = betterproto.string_field(7)
schema: str = betterproto.string_field(8)
language: str = betterproto.string_field(9)
original_file_path: str = betterproto.string_field(10)
tags: List[str] = betterproto.string_field(11)
description: str = betterproto.string_field(12)

View File

@@ -49,7 +49,8 @@ class BaseEvent:
def __post_init__(self):
super().__post_init__()
self.info.level = self.level_tag()
if not self.info.level:
self.info.level = self.level_tag()
if not hasattr(self.info, "msg") or not self.info.msg:
self.info.msg = self.message()
self.info.invocation_id = get_invocation_id()

View File

@@ -2,6 +2,7 @@ import betterproto
from colorama import Style
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.types import EventBufferFull, MainReportVersion, EmptyLine
from dbt.events.proto_types import EventInfo
import dbt.flags as flags
from dbt.constants import SECRET_ENV_PREFIX, METADATA_ENV_PREFIX
@@ -325,3 +326,11 @@ def add_to_event_history(event):
def reset_event_history():
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)
# Currently used to set the level in EventInfo, so logging events can
# provide more than one "level". Might be used in the future to set
# more fields in EventInfo, once some of that information is no longer global
def info(level="info"):
info = EventInfo(level=level)
return info

View File

@@ -52,7 +52,6 @@ class NodeInfo(betterproto.Message):
class RunResultMsg(betterproto.Message):
"""RunResult"""
# status: Union[RunStatus, TestStatus, FreshnessStatus]
status: str = betterproto.string_field(1)
message: str = betterproto.string_field(2)
timing_info: List["TimingInfoMsg"] = betterproto.message_field(3)
@@ -1357,57 +1356,21 @@ class SQLRunnerException(betterproto.Message):
@dataclass
class PrintErrorTestResult(betterproto.Message):
class LogTestResult(betterproto.Message):
"""Q007"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
num_models: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
num_failures: int = betterproto.int32_field(8)
@dataclass
class PrintPassTestResult(betterproto.Message):
"""Q008"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
@dataclass
class PrintWarnTestResult(betterproto.Message):
"""Q009"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
num_failures: int = betterproto.int32_field(7)
@dataclass
class PrintFailureTestResult(betterproto.Message):
"""Q010"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
name: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
num_models: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
num_failures: int = betterproto.int32_field(7)
@dataclass
class PrintStartLine(betterproto.Message):
class LogStartLine(betterproto.Message):
"""Q011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1418,7 +1381,7 @@ class PrintStartLine(betterproto.Message):
@dataclass
class PrintModelResultLine(betterproto.Message):
class LogModelResult(betterproto.Message):
"""Q012"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1427,40 +1390,11 @@ class PrintModelResultLine(betterproto.Message):
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
execution_time: int = betterproto.int32_field(7)
@dataclass
class PrintModelErrorResultLine(betterproto.Message):
"""Q013"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
description: str = betterproto.string_field(3)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
@dataclass
class PrintSnapshotErrorResultLine(betterproto.Message):
"""Q014"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
description: str = betterproto.string_field(3)
status: str = betterproto.string_field(4)
index: int = betterproto.int32_field(5)
total: int = betterproto.int32_field(6)
execution_time: float = betterproto.float_field(7)
cfg: Dict[str, str] = betterproto.map_field(
8, betterproto.TYPE_STRING, betterproto.TYPE_STRING
)
@dataclass
class PrintSnapshotResultLine(betterproto.Message):
class LogSnapshotResult(betterproto.Message):
"""Q015"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1476,7 +1410,7 @@ class PrintSnapshotResultLine(betterproto.Message):
@dataclass
class PrintSeedErrorResultLine(betterproto.Message):
class LogSeedResult(betterproto.Message):
"""Q016"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1490,73 +1424,21 @@ class PrintSeedErrorResultLine(betterproto.Message):
@dataclass
class PrintSeedResultLine(betterproto.Message):
"""Q017"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
status: str = betterproto.string_field(3)
index: int = betterproto.int32_field(4)
total: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
schema: str = betterproto.string_field(7)
relation: str = betterproto.string_field(8)
@dataclass
class PrintFreshnessErrorLine(betterproto.Message):
class LogFreshnessResult(betterproto.Message):
"""Q018"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
status: str = betterproto.string_field(2)
node_info: "NodeInfo" = betterproto.message_field(3)
index: int = betterproto.int32_field(4)
total: int = betterproto.int32_field(5)
execution_time: float = betterproto.float_field(6)
source_name: str = betterproto.string_field(7)
table_name: str = betterproto.string_field(8)
@dataclass
class PrintFreshnessErrorStaleLine(betterproto.Message):
"""Q019"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintFreshnessWarnLine(betterproto.Message):
"""Q020"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintFreshnessPassLine(betterproto.Message):
"""Q021"""
info: "EventInfo" = betterproto.message_field(1)
node_info: "NodeInfo" = betterproto.message_field(2)
index: int = betterproto.int32_field(3)
total: int = betterproto.int32_field(4)
execution_time: float = betterproto.float_field(5)
source_name: str = betterproto.string_field(6)
table_name: str = betterproto.string_field(7)
@dataclass
class PrintCancelLine(betterproto.Message):
class LogCancelLine(betterproto.Message):
"""Q022"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1642,7 +1524,7 @@ class NodeExecuting(betterproto.Message):
@dataclass
class PrintHookStartLine(betterproto.Message):
class LogHookStartLine(betterproto.Message):
"""Q032"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1653,7 +1535,7 @@ class PrintHookStartLine(betterproto.Message):
@dataclass
class PrintHookEndLine(betterproto.Message):
class LogHookEndLine(betterproto.Message):
"""Q033"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1824,7 +1706,7 @@ class TimingInfoCollected(betterproto.Message):
@dataclass
class PrintDebugStackTrace(betterproto.Message):
class LogDebugStackTrace(betterproto.Message):
"""Z011"""
info: "EventInfo" = betterproto.message_field(1)
@@ -1991,7 +1873,7 @@ class EndOfRunSummary(betterproto.Message):
@dataclass
class PrintSkipBecauseError(betterproto.Message):
class LogSkipBecauseError(betterproto.Message):
"""Z034"""
info: "EventInfo" = betterproto.message_field(1)

View File

@@ -38,7 +38,6 @@ message NodeInfo {
// RunResult
message RunResultMsg {
// status: Union[RunStatus, TestStatus, FreshnessStatus]
string status = 1;
string message = 2;
repeated TimingInfoMsg timing_info = 3;
@@ -1030,49 +1029,23 @@ message SQLRunnerException {
}
// Q007
message PrintErrorTestResult {
message LogTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
string status = 4;
int32 index = 5;
int32 num_models = 6;
float execution_time = 7;
int32 num_failures = 8;
}
// Q008
message PrintPassTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
}
// Q009
message PrintWarnTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
}
// Skipped Q008, Q009, Q010
// Q010
message PrintFailureTestResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string name = 3;
int32 index = 4;
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
}
// Q011
message PrintStartLine {
message LogStartLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
@@ -1081,41 +1054,20 @@ message PrintStartLine {
}
// Q012
message PrintModelResultLine {
message LogModelResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
int32 execution_time = 7;
}
// Q013
message PrintModelErrorResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
}
// Q014
message PrintSnapshotErrorResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
string status = 4;
int32 index = 5;
int32 total = 6;
float execution_time = 7;
map<string, string> cfg = 8;
}
// skipped Q013, Q014
// Q015
message PrintSnapshotResultLine {
message LogSnapshotResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string description = 3;
@@ -1127,7 +1079,7 @@ message PrintSnapshotResultLine {
}
// Q016
message PrintSeedErrorResultLine {
message LogSeedResult {
EventInfo info = 1;
NodeInfo node_info = 2;
string status = 3;
@@ -1138,64 +1090,26 @@ message PrintSeedErrorResultLine {
string relation = 8;
}
// Q017
message PrintSeedResultLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string status = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string schema = 7;
string relation = 8;
}
// Skipped Q017
// Q018
message PrintFreshnessErrorLine {
message LogFreshnessResult {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
string status = 2;
NodeInfo node_info = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string source_name = 7;
string table_name = 8;
}
// Q019
message PrintFreshnessErrorStaleLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Q020
message PrintFreshnessWarnLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Skipped Q019, Q020, Q021
// Q021
message PrintFreshnessPassLine {
EventInfo info = 1;
NodeInfo node_info = 2;
int32 index = 3;
int32 total = 4;
float execution_time = 5;
string source_name = 6;
string table_name = 7;
}
// Q022
message PrintCancelLine {
message LogCancelLine {
EventInfo info = 1;
string conn_name = 2;
}
@@ -1261,7 +1175,7 @@ message NodeExecuting {
}
// Q032
message PrintHookStartLine {
message LogHookStartLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string statement = 3;
@@ -1270,7 +1184,7 @@ message PrintHookStartLine {
}
// Q033
message PrintHookEndLine {
message LogHookEndLine {
EventInfo info = 1;
NodeInfo node_info = 2;
string statement = 3;
@@ -1411,7 +1325,7 @@ message TimingInfoCollected {
}
// Z011
message PrintDebugStackTrace {
message LogDebugStackTrace {
EventInfo info = 1;
string exc_info = 2;
}
@@ -1538,7 +1452,7 @@ message EndOfRunSummary {
// Skipped Z031, Z032, Z033
// Z034
message PrintSkipBecauseError {
message LogSkipBecauseError {
EventInfo info = 1;
string schema = 2;
string relation = 3;

View File

@@ -1562,75 +1562,43 @@ class SQLRunnerException(DebugLevel, pt.SQLRunnerException): # noqa
@dataclass
@dataclass
class PrintErrorTestResult(ErrorLevel, pt.PrintErrorTestResult):
class LogTestResult(ErrorLevel, pt.LogTestResult):
def code(self):
return "Q007"
def message(self) -> str:
info = "ERROR"
if self.status == "error":
info = "ERROR"
status = ui.red(info)
elif self.status == "pass":
info = "PASS"
status = ui.green(info)
elif self.status == "warn":
info = f"WARN {self.num_failures}"
status = ui.yellow(info)
else: # self.status == "fail":
info = f"FAIL {self.num_failures}"
status = ui.red(info)
msg = f"{info} {self.name}"
return format_fancy_output_line(
msg=msg,
status=ui.red(info),
status=status,
index=self.index,
total=self.num_models,
execution_time=self.execution_time,
)
@dataclass
class PrintPassTestResult(InfoLevel, pt.PrintPassTestResult):
def code(self):
return "Q008"
def message(self) -> str:
info = "PASS"
msg = f"{info} {self.name}"
return format_fancy_output_line(
msg=msg,
status=ui.green(info),
index=self.index,
total=self.num_models,
execution_time=self.execution_time,
)
@classmethod
def status_to_level(cls, status):
if status == "fail":
return "error"
else:
return status
@dataclass
class PrintWarnTestResult(WarnLevel, pt.PrintWarnTestResult):
def code(self):
return "Q009"
def message(self) -> str:
info = f"WARN {self.num_failures}"
msg = f"{info} {self.name}"
return format_fancy_output_line(
msg=msg,
status=ui.yellow(info),
index=self.index,
total=self.num_models,
execution_time=self.execution_time,
)
@dataclass
class PrintFailureTestResult(ErrorLevel, pt.PrintFailureTestResult):
def code(self):
return "Q010"
def message(self) -> str:
info = f"FAIL {self.num_failures}"
msg = f"{info} {self.name}"
return format_fancy_output_line(
msg=msg,
status=ui.red(info),
index=self.index,
total=self.num_models,
execution_time=self.execution_time,
)
@dataclass
class PrintStartLine(InfoLevel, pt.PrintStartLine): # noqa
class LogStartLine(InfoLevel, pt.LogStartLine): # noqa
def code(self):
return "Q011"
@@ -1640,67 +1608,48 @@ class PrintStartLine(InfoLevel, pt.PrintStartLine): # noqa
@dataclass
class PrintModelResultLine(InfoLevel, pt.PrintModelResultLine):
class LogModelResult(InfoLevel, pt.LogModelResult):
def code(self):
return "Q012"
def message(self) -> str:
info = "OK created"
if self.status == "error":
info = "ERROR creating"
status = ui.red(self.status.upper())
else:
info = "OK created"
status = ui.green(self.status)
msg = f"{info} {self.description}"
return format_fancy_output_line(
msg=msg,
status=ui.green(self.status),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintModelErrorResultLine(ErrorLevel, pt.PrintModelErrorResultLine):
def code(self):
return "Q013"
def message(self) -> str:
info = "ERROR creating"
msg = f"{info} {self.description}"
return format_fancy_output_line(
msg=msg,
status=ui.red(self.status.upper()),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
# Skipped Q013, Q014
@dataclass
class PrintSnapshotErrorResultLine(ErrorLevel, pt.PrintSnapshotErrorResultLine):
def code(self):
return "Q014"
def message(self) -> str:
info = "ERROR snapshotting"
msg = "{info} {description}".format(info=info, description=self.description, **self.cfg)
return format_fancy_output_line(
msg=msg,
status=ui.red(self.status.upper()),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintSnapshotResultLine(InfoLevel, pt.PrintSnapshotResultLine):
class LogSnapshotResult(ErrorLevel, pt.LogSnapshotResult):
def code(self):
return "Q015"
def message(self) -> str:
info = "OK snapshotted"
if self.status == "error":
info = "ERROR snapshotting"
status = ui.red(self.status.upper())
else:
info = "OK snapshotted"
status = ui.green(self.status)
msg = "{info} {description}".format(info=info, description=self.description, **self.cfg)
return format_fancy_output_line(
msg=msg,
status=ui.green(self.status),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
@@ -1708,109 +1657,69 @@ class PrintSnapshotResultLine(InfoLevel, pt.PrintSnapshotResultLine):
@dataclass
class PrintSeedErrorResultLine(ErrorLevel, pt.PrintSeedErrorResultLine):
class LogSeedResult(InfoLevel, pt.LogSeedResult):
def code(self):
return "Q016"
def message(self) -> str:
info = "ERROR loading"
if self.status == "error":
info = "ERROR loading"
status = ui.red(self.status.upper())
else:
info = "OK loaded"
status = ui.green(self.status)
msg = f"{info} seed file {self.schema}.{self.relation}"
return format_fancy_output_line(
msg=msg,
status=ui.red(self.status.upper()),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintSeedResultLine(InfoLevel, pt.PrintSeedResultLine):
def code(self):
return "Q017"
def message(self) -> str:
info = "OK loaded"
msg = f"{info} seed file {self.schema}.{self.relation}"
return format_fancy_output_line(
msg=msg,
status=ui.green(self.status),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
# Skipped Q017
@dataclass
class PrintFreshnessErrorLine(ErrorLevel, pt.PrintFreshnessErrorLine):
class LogFreshnessResult(InfoLevel, pt.LogFreshnessResult):
def code(self):
return "Q018"
def message(self) -> str:
info = "ERROR"
if self.status == "runtimeerr":
info = "ERROR"
status = ui.red(info)
elif self.status == "error":
info = "ERROR STALE"
status = ui.red(info)
elif self.status == "warn":
info = "WARN"
status = ui.yellow(info)
else:
info = "PASS"
status = ui.green(info)
msg = f"{info} freshness of {self.source_name}.{self.table_name}"
return format_fancy_output_line(
msg=msg,
status=ui.red(info),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintFreshnessErrorStaleLine(ErrorLevel, pt.PrintFreshnessErrorStaleLine):
def code(self):
return "Q019"
def message(self) -> str:
info = "ERROR STALE"
msg = f"{info} freshness of {self.source_name}.{self.table_name}"
return format_fancy_output_line(
msg=msg,
status=ui.red(info),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@classmethod
def status_to_level(cls, status):
if status == "runtimeerr":
return "error"
elif status == "pass":
return "info"
else: # warn, error
return status
@dataclass
class PrintFreshnessWarnLine(WarnLevel, pt.PrintFreshnessWarnLine):
def code(self):
return "Q020"
def message(self) -> str:
info = "WARN"
msg = f"{info} freshness of {self.source_name}.{self.table_name}"
return format_fancy_output_line(
msg=msg,
status=ui.yellow(info),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintFreshnessPassLine(InfoLevel, pt.PrintFreshnessPassLine):
def code(self):
return "Q021"
def message(self) -> str:
info = "PASS"
msg = f"{info} freshness of {self.source_name}.{self.table_name}"
return format_fancy_output_line(
msg=msg,
status=ui.green(info),
index=self.index,
total=self.total,
execution_time=self.execution_time,
)
@dataclass
class PrintCancelLine(ErrorLevel, pt.PrintCancelLine):
class LogCancelLine(ErrorLevel, pt.LogCancelLine):
def code(self):
return "Q022"
@@ -1906,7 +1815,7 @@ class NodeExecuting(DebugLevel, pt.NodeExecuting):
@dataclass
class PrintHookStartLine(InfoLevel, pt.PrintHookStartLine): # noqa
class LogHookStartLine(InfoLevel, pt.LogHookStartLine): # noqa
def code(self):
return "Q032"
@@ -1918,7 +1827,7 @@ class PrintHookStartLine(InfoLevel, pt.PrintHookStartLine): # noqa
@dataclass
class PrintHookEndLine(InfoLevel, pt.PrintHookEndLine): # noqa
class LogHookEndLine(InfoLevel, pt.LogHookEndLine): # noqa
def code(self):
return "Q033"
@@ -2135,7 +2044,7 @@ class TimingInfoCollected(DebugLevel, pt.TimingInfoCollected):
# This prints the stack trace at the debug level while allowing just the nice exception message
# at the error level - or whatever other level chosen. Used in multiple places.
@dataclass
class PrintDebugStackTrace(DebugLevel, pt.PrintDebugStackTrace): # noqa
class LogDebugStackTrace(DebugLevel, pt.LogDebugStackTrace): # noqa
def code(self):
return "Z011"
@@ -2339,7 +2248,7 @@ class EndOfRunSummary(InfoLevel, pt.EndOfRunSummary):
@dataclass
class PrintSkipBecauseError(ErrorLevel, pt.PrintSkipBecauseError):
class LogSkipBecauseError(ErrorLevel, pt.LogSkipBecauseError):
def code(self):
return "Z034"
@@ -2676,48 +2585,22 @@ if 1 == 0:
SeedHeader(header="")
SeedHeaderSeparator(len_header=0)
SQLRunnerException(exc="")
PrintErrorTestResult(
name="",
index=0,
num_models=0,
execution_time=0,
)
PrintPassTestResult(
name="",
index=0,
num_models=0,
execution_time=0,
)
PrintWarnTestResult(
LogTestResult(
name="",
index=0,
num_models=0,
execution_time=0,
num_failures=0,
)
PrintFailureTestResult(
name="",
index=0,
num_models=0,
execution_time=0,
num_failures=0,
)
PrintStartLine(description="", index=0, total=0, node_info=NodeInfo())
PrintModelResultLine(
LogStartLine(description="", index=0, total=0, node_info=NodeInfo())
LogModelResult(
description="",
status="",
index=0,
total=0,
execution_time=0,
)
PrintModelErrorResultLine(
description="",
status="",
index=0,
total=0,
execution_time=0,
)
PrintSnapshotErrorResultLine(
LogSnapshotResult(
status="",
description="",
cfg={},
@@ -2725,15 +2608,7 @@ if 1 == 0:
total=0,
execution_time=0,
)
PrintSnapshotResultLine(
status="",
description="",
cfg={},
index=0,
total=0,
execution_time=0,
)
PrintSeedErrorResultLine(
LogSeedResult(
status="",
index=0,
total=0,
@@ -2741,43 +2616,14 @@ if 1 == 0:
schema="",
relation="",
)
PrintSeedResultLine(
status="",
index=0,
total=0,
execution_time=0,
schema="",
relation="",
)
PrintFreshnessErrorLine(
LogFreshnessResult(
source_name="",
table_name="",
index=0,
total=0,
execution_time=0,
)
PrintFreshnessErrorStaleLine(
source_name="",
table_name="",
index=0,
total=0,
execution_time=0,
)
PrintFreshnessWarnLine(
source_name="",
table_name="",
index=0,
total=0,
execution_time=0,
)
PrintFreshnessPassLine(
source_name="",
table_name="",
index=0,
total=0,
execution_time=0,
)
PrintCancelLine(conn_name="")
LogCancelLine(conn_name="")
DefaultSelector(name="")
NodeStart(unique_id="")
NodeFinished(unique_id="")
@@ -2787,12 +2633,12 @@ if 1 == 0:
WritingInjectedSQLForNode(unique_id="")
NodeCompiling(unique_id="")
NodeExecuting(unique_id="")
PrintHookStartLine(
LogHookStartLine(
statement="",
index=0,
total=0,
)
PrintHookEndLine(
LogHookEndLine(
statement="",
status="",
index=0,
@@ -2829,7 +2675,7 @@ if 1 == 0:
SystemStdErrMsg(bmsg=b"")
SystemReportReturnCode(returncode=0)
TimingInfoCollected()
PrintDebugStackTrace()
LogDebugStackTrace()
CheckCleanPath(path="")
ConfirmCleanPath(path="")
ProtectedCleanPath(path="")
@@ -2849,7 +2695,7 @@ if 1 == 0:
FirstRunResultError(msg="")
AfterFirstRunResultError(msg="")
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False)
PrintSkipBecauseError(schema="", relation="", index=0, total=0)
LogSkipBecauseError(schema="", relation="", index=0, total=0)
EnsureGitInstalled()
DepsCreatingLocalSymlink()
DepsSymlinkNotAvailable()

View File

@@ -96,6 +96,7 @@ from dbt.version import __version__
from dbt.dataclass_schema import StrEnum, dbtClassMixin
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
PARSED_MANIFEST_FILE_NAME = "parsed_manifest.json"
PARSING_STATE = DbtProcessState("parsing")
@@ -405,6 +406,11 @@ class ManifestLoader:
# write out the fully parsed manifest
self.write_manifest_for_partial_parse()
# write out parsed_manifest.json
parsed_manifest_path = os.path.join(
self.root_project.target_path, PARSED_MANIFEST_FILE_NAME
)
self.manifest.write(parsed_manifest_path)
return self.manifest

View File

@@ -37,9 +37,9 @@ from dbt.events.types import (
InternalExceptionOnRun,
GenericExceptionOnRun,
NodeConnectionReleaseError,
PrintDebugStackTrace,
LogDebugStackTrace,
SkippingDetails,
PrintSkipBecauseError,
LogSkipBecauseError,
NodeCompiling,
NodeExecuting,
)
@@ -362,7 +362,7 @@ class BaseRunner(metaclass=ABCMeta):
exc=str(e),
)
)
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
return str(e)
@@ -451,7 +451,7 @@ class BaseRunner(metaclass=ABCMeta):
# failure, print a special 'error skip' message.
if self._skip_caused_by_ephemeral_failure():
fire_event(
PrintSkipBecauseError(
LogSkipBecauseError(
schema=schema_name,
relation=node_name,
index=self.node_index,

View File

@@ -16,14 +16,11 @@ from dbt.contracts.results import (
FreshnessStatus,
)
from dbt.exceptions import RuntimeException, InternalException
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
FreshnessCheckComplete,
PrintStartLine,
PrintFreshnessErrorLine,
PrintFreshnessErrorStaleLine,
PrintFreshnessWarnLine,
PrintFreshnessPassLine,
LogStartLine,
LogFreshnessResult,
)
from dbt.node_types import NodeType
@@ -41,7 +38,7 @@ class FreshnessRunner(BaseRunner):
def before_execute(self):
description = "freshness of {0.source_name}.{0.name}".format(self.node)
fire_event(
PrintStartLine(
LogStartLine(
description=description,
index=self.node_index,
total=self.num_nodes,
@@ -56,50 +53,19 @@ class FreshnessRunner(BaseRunner):
else:
source_name = result.source_name
table_name = result.table_name
if result.status == FreshnessStatus.RuntimeErr:
fire_event(
PrintFreshnessErrorLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Error:
fire_event(
PrintFreshnessErrorStaleLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
elif result.status == FreshnessStatus.Warn:
fire_event(
PrintFreshnessWarnLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
else:
fire_event(
PrintFreshnessPassLine(
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
level = LogFreshnessResult.status_to_level(str(result.status))
fire_event(
LogFreshnessResult(
info=info(level=level),
status=result.status,
source_name=source_name,
table_name=table_name,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
def error_result(self, node, message, start_time, timing_info):
return self._build_run_result(

View File

@@ -28,17 +28,16 @@ from dbt.exceptions import (
ValidationException,
missing_materialization,
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.functions import fire_event, get_invocation_id, info
from dbt.events.types import (
DatabaseErrorRunningHook,
EmptyLine,
HooksRunning,
HookFinished,
PrintModelErrorResultLine,
PrintModelResultLine,
PrintStartLine,
PrintHookEndLine,
PrintHookStartLine,
LogModelResult,
LogStartLine,
LogHookEndLine,
LogHookStartLine,
)
from dbt.logger import (
TextOnly,
@@ -176,7 +175,7 @@ class ModelRunner(CompileRunner):
def print_start_line(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
@@ -187,27 +186,22 @@ class ModelRunner(CompileRunner):
def print_result_line(self, result):
description = self.describe_node()
if result.status == NodeStatus.Error:
fire_event(
PrintModelErrorResultLine(
description=description,
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
)
status = result.status
level = "error"
else:
fire_event(
PrintModelResultLine(
description=description,
status=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
)
status = result.message
level = "info"
fire_event(
LogModelResult(
description=description,
status=status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
info=info(level=level),
)
)
def before_execute(self):
self.print_start_line()
@@ -355,7 +349,7 @@ class RunTask(CompileTask):
with UniqueID(hook.unique_id):
with hook_meta_ctx, startctx:
fire_event(
PrintHookStartLine(
LogHookStartLine(
statement=hook_text,
index=idx,
total=num_hooks,
@@ -375,7 +369,7 @@ class RunTask(CompileTask):
with finishctx, DbtModelState({"node_status": "passed"}):
hook._event_status["node_status"] = RunStatus.Success
fire_event(
PrintHookEndLine(
LogHookEndLine(
statement=hook_text,
status=status,
index=idx,

View File

@@ -15,7 +15,7 @@ from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
PrintDebugStackTrace,
LogDebugStackTrace,
)
@@ -57,11 +57,11 @@ class RunOperationTask(ManifestTask):
self._run_unsafe()
except dbt.exceptions.Exception as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
else:
success = True

View File

@@ -29,7 +29,7 @@ from dbt.logger import (
from dbt.events.functions import fire_event
from dbt.events.types import (
EmptyLine,
PrintCancelLine,
LogCancelLine,
DefaultSelector,
NodeStart,
NodeFinished,
@@ -364,7 +364,7 @@ class GraphRunnableTask(ManifestTask):
continue
# if we don't have a manifest/don't have a node, print
# anyway.
fire_event(PrintCancelLine(conn_name=conn_name))
fire_event(LogCancelLine(conn_name=conn_name))
pool.join()

View File

@@ -9,14 +9,13 @@ from dbt.contracts.results import RunStatus
from dbt.exceptions import InternalException
from dbt.graph import ResourceTypeSelector
from dbt.logger import TextOnly
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
SeedHeader,
SeedHeaderSeparator,
EmptyLine,
PrintSeedErrorResultLine,
PrintSeedResultLine,
PrintStartLine,
LogSeedResult,
LogStartLine,
)
from dbt.node_types import NodeType
from dbt.contracts.results import NodeStatus
@@ -28,7 +27,7 @@ class SeedRunner(ModelRunner):
def before_execute(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,
@@ -47,30 +46,19 @@ class SeedRunner(ModelRunner):
def print_result_line(self, result):
model = result.node
if result.status == NodeStatus.Error:
fire_event(
PrintSeedErrorResultLine(
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
)
else:
fire_event(
PrintSeedResultLine(
status=result.message,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
level = "error" if result.status == NodeStatus.Error else "info"
fire_event(
LogSeedResult(
info=info(level=level),
status=result.status,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
schema=self.node.schema,
relation=model.alias,
node_info=model.node_info,
)
)
class SeedTask(RunTask):

View File

@@ -1,8 +1,8 @@
from .run import ModelRunner, RunTask
from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import PrintSnapshotErrorResultLine, PrintSnapshotResultLine
from dbt.events.functions import fire_event, info
from dbt.events.types import LogSnapshotResult
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.contracts.results import NodeStatus
@@ -15,30 +15,19 @@ class SnapshotRunner(ModelRunner):
def print_result_line(self, result):
model = result.node
cfg = model.config.to_dict(omit_none=True)
if result.status == NodeStatus.Error:
fire_event(
PrintSnapshotErrorResultLine(
status=result.status,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
else:
fire_event(
PrintSnapshotResultLine(
status=result.message,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
level = "error" if result.status == NodeStatus.Error else "info"
fire_event(
LogSnapshotResult(
info=info(level=level),
status=result.status,
description=self.get_node_representation(),
cfg=cfg,
index=self.node_index,
total=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
class SnapshotTask(RunTask):

View File

@@ -19,13 +19,10 @@ from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import TestStatus, PrimitiveDict, RunResult
from dbt.context.providers import generate_runtime_model_context
from dbt.clients.jinja import MacroGenerator
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, info
from dbt.events.types import (
PrintErrorTestResult,
PrintPassTestResult,
PrintWarnTestResult,
PrintFailureTestResult,
PrintStartLine,
LogTestResult,
LogStartLine,
)
from dbt.exceptions import InternalException, invalid_bool_error, missing_materialization
from dbt.graph import (
@@ -67,54 +64,22 @@ class TestRunner(CompileRunner):
def print_result_line(self, result):
model = result.node
if result.status == TestStatus.Error:
fire_event(
PrintErrorTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
fire_event(
LogTestResult(
name=model.name,
info=info(level=LogTestResult.status_to_level(str(result.status))),
status=str(result.status),
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
num_failures=result.failures,
)
elif result.status == TestStatus.Pass:
fire_event(
PrintPassTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
)
)
elif result.status == TestStatus.Warn:
fire_event(
PrintWarnTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
num_failures=result.failures,
node_info=model.node_info,
)
)
elif result.status == TestStatus.Fail:
fire_event(
PrintFailureTestResult(
name=model.name,
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
num_failures=result.failures,
node_info=model.node_info,
)
)
else:
raise RuntimeError("unexpected status: {}".format(result.status))
)
def print_start_line(self):
fire_event(
PrintStartLine(
LogStartLine(
description=self.describe_node(),
index=self.node_index,
total=self.num_nodes,

View File

@@ -332,54 +332,31 @@ sample_values = [
FirstRunResultError(msg=""),
AfterFirstRunResultError(msg=""),
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
PrintStartLine(description="", index=0, total=0, node_info=NodeInfo()),
PrintHookStartLine(statement="", index=0, total=0, node_info=NodeInfo()),
PrintHookEndLine(
LogStartLine(description="", index=0, total=0, node_info=NodeInfo()),
LogHookStartLine(statement="", index=0, total=0, node_info=NodeInfo()),
LogHookEndLine(
statement="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
SkippingDetails(
resource_type="", schema="", node_name="", index=0, total=0, node_info=NodeInfo()
),
PrintErrorTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
PrintPassTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
PrintWarnTestResult(
LogTestResult(
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
),
PrintFailureTestResult(
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
),
PrintSkipBecauseError(schema="", relation="", index=0, total=0),
PrintModelErrorResultLine(
LogSkipBecauseError(schema="", relation="", index=0, total=0),
LogModelResult(
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintModelResultLine(
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSnapshotErrorResultLine(
LogSnapshotResult(
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSnapshotResultLine(
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSeedErrorResultLine(
LogSeedResult(
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
),
PrintSeedResultLine(
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
),
PrintFreshnessErrorLine(
LogFreshnessResult(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintFreshnessErrorStaleLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintFreshnessWarnLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintFreshnessPassLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintCancelLine(conn_name=""),
LogCancelLine(conn_name=""),
DefaultSelector(name=""),
NodeStart(unique_id="", node_info=NodeInfo()),
NodeCompiling(unique_id="", node_info=NodeInfo()),
@@ -418,7 +395,7 @@ sample_values = [
AdapterEventInfo(name="", base_msg="", args=()),
AdapterEventWarning(name="", base_msg="", args=()),
AdapterEventError(name="", base_msg="", args=()),
PrintDebugStackTrace(),
LogDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressGETRequest(url=""),
RegistryIndexProgressGETRequest(url=""),

View File

@@ -5,7 +5,7 @@ from dbt.events.types import (
RollbackFailed,
MainEncounteredError,
PluginLoadError,
PrintStartLine,
LogStartLine,
)
from dbt.events.functions import event_to_dict, LOG_VERSION, reset_metadata_vars
from dbt.events import proto_types as pl
@@ -89,7 +89,7 @@ def test_node_info_events():
"node_started_at": "some_time",
"node_finished_at": "another_time",
}
event = PrintStartLine(
event = LogStartLine(
description="some description",
index=123,
total=111,

View File

@@ -0,0 +1,27 @@
from dbt.contracts.graph.parsed import ParsedModelNode
from dbt.node_types import NodeType
from dbt.contracts.files import FileHash
def test_nodes():
# Create a dummy model node
model_node = ParsedModelNode(
database="testdb",
schema="testschema",
fqn=["my", "test"],
unique_id="test.model.my_node",
raw_code="select 1 from fun",
language="sql",
package_name="test",
root_path="my/path",
path="my_node.sql",
original_file_path="models/my_node.sql",
name="my_node",
resource_type=NodeType.Model,
alias="my_node",
checksum=FileHash.from_contents("select 1 from fun"),
)
assert model_node
# Get a matching proto message
proto_model_msg = model_node.to_msg()
assert proto_model_msg