Compare commits


25 Commits

Author SHA1 Message Date
Gerda Shank 55b0c0953e Limit EndRunResult, make event_to_dict function 2022-08-31 17:11:34 -04:00
Gerda Shank 7bcf4ddabd Remove rust checks from structured logging test 2022-08-30 15:27:08 -04:00
Gerda Shank fd9f254e63 More cleanup and organizing 2022-08-30 15:26:50 -04:00
Gerda Shank 464a6abc16 Fix error in event, add EndResult 2022-08-26 11:04:35 -04:00
Gerda Shank fa20de142e Convert remaining logging events to proto messages 2022-08-25 12:59:33 -04:00
Gerda Shank c81a1f8fa7 Update NodeInfo events 2022-08-22 14:34:38 -04:00
Gerda Shank 64b891ab9a Switch to using class names (instead of codes) for proto messages 2022-08-19 12:06:04 -04:00
Gerda Shank e1262d6a75 Move test_events.py to tests/unit from test/unit 2022-08-19 09:47:53 -04:00
Gerda Shank a77e75a491 Remove types_pb2.py and exclusion in pre-commit 2022-08-18 16:46:55 -04:00
Gerda Shank 8b865b599c Create GenericMessage for deserializing 2022-08-18 16:42:24 -04:00
Gerda Shank 82f2d165af Update more events 2022-08-18 14:36:29 -04:00
Gerda Shank 0a64aae0e2 Fix test_query_comments test 2022-08-17 17:50:09 -04:00
Gerda Shank 6f0106c7cd turn off flake8 for splat import 2022-08-17 17:21:37 -04:00
Gerda Shank b6af9fd6bf misc flake8, tweak test 2022-08-17 16:26:34 -04:00
Gerda Shank 6a9471f472 Changie 2022-08-17 15:49:05 -04:00
Gerda Shank 89e276d9b4 rename thread_name to thread; misc cleanup 2022-08-17 15:47:52 -04:00
Gerda Shank 47a6f9a24e Include betterproto in dev-requirements 2022-08-17 09:52:55 -04:00
Gerda Shank 853e9c710c Use betterproto 2022-08-17 09:50:19 -04:00
Gerda Shank deb95efe68 Add a couple more proto messages 2022-08-16 12:46:01 -04:00
Gerda Shank 001536a438 correct typo in SQLRunnerException. comment out kwarg setting in AdapterLogger 2022-08-12 08:09:25 -04:00
Gerda Shank d21cfc5976 Cleanup logging and various events and tests 2022-08-11 18:48:17 -04:00
Gerda Shank 8a892dbec6 exclude _pb2.py in pre-commit 2022-08-11 11:53:23 -04:00
Gerda Shank 9e2ce9427e install types-protobuf 2022-08-10 23:55:01 -04:00
Gerda Shank 4a2fcc1307 Change import of DataClassJSONMixin 2022-08-10 20:06:43 -04:00
Gerda Shank fd3a4e7ad2 First test proto files 2022-08-10 19:44:13 -04:00
46 changed files with 6552 additions and 2822 deletions

View File

@@ -0,0 +1,7 @@
kind: Features
body: Proto logging messages
time: 2022-08-17T15:48:57.225267-04:00
custom:
Author: gshank
Issue: "5610"
PR: "5643"

View File

@@ -46,12 +46,6 @@ jobs:
with:
python-version: "3.8"
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Install python dependencies
run: |
pip install --user --upgrade pip
@@ -69,10 +63,3 @@ jobs:
# we actually care if these pass, because the normal test run doesn't usually include many json log outputs
- name: Run integration tests
run: tox -e integration -- -nauto
# apply our schema tests to every log event from the previous step
# skips any output that isn't valid json
- uses: actions-rs/cargo@v1
with:
command: run
args: --manifest-path test/interop/log_parsing/Cargo.toml

View File

@@ -2,6 +2,7 @@ import abc
import os
from time import sleep
import sys
import traceback
# multiprocessing.RLock is a function returning this type
from multiprocessing.synchronize import RLock
@@ -48,6 +49,7 @@ from dbt.events.types import (
RollbackFailed,
)
from dbt import flags
from dbt.utils import cast_to_str
SleepTime = Union[int, float] # As taken by time.sleep.
AdapterHandle = Any # Adapter connection handle objects can be any class.
@@ -304,9 +306,9 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {"closed", "init"}:
fire_event(ConnectionLeftOpen(conn_name=connection.name))
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
else:
fire_event(ConnectionClosed(conn_name=connection.name))
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
self.close(connection)
# garbage collect these connections
@@ -332,17 +334,18 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
try:
connection.handle.rollback()
except Exception:
fire_event(RollbackFailed(conn_name=connection.name))
conn_name = connection.name or ""
fire_event(RollbackFailed(conn_name=conn_name, exc_info=traceback.format_exc()))
@classmethod
def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, "close"):
fire_event(ConnectionClosed2(conn_name=connection.name))
fire_event(ConnectionClosed2(conn_name=cast_to_str(connection.name)))
connection.handle.close()
else:
fire_event(ConnectionLeftOpen2(conn_name=connection.name))
fire_event(ConnectionLeftOpen2(conn_name=cast_to_str(connection.name)))
@classmethod
def _rollback(cls, connection: Connection) -> None:
@@ -353,7 +356,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
f'"{connection.name}", but it does not have one open!'
)
fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False
@@ -365,7 +368,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
return connection
if connection.transaction_open and connection.handle:
fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False

View File

@@ -49,7 +49,7 @@ from dbt.events.types import (
CodeExecution,
CodeExecutionStatus,
)
from dbt.utils import filter_null_values, executor
from dbt.utils import filter_null_values, executor, cast_to_str
from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
@@ -60,7 +60,7 @@ from dbt.adapters.base.relation import (
SchemaSearchMap,
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache, _make_key
from dbt.adapters.cache import RelationsCache, _make_key_msg
SeedModel = Union[ParsedSeedNode, CompiledSeedNode]
@@ -313,7 +313,7 @@ class BaseAdapter(metaclass=AdapterMeta):
fire_event(
CacheMiss(
conn_name=self.nice_connection_name(),
database=database,
database=cast_to_str(database),
schema=schema,
)
)
@@ -696,9 +696,9 @@ class BaseAdapter(metaclass=AdapterMeta):
relations = self.list_relations_without_caching(schema_relation)
fire_event(
ListRelations(
database=database,
database=cast_to_str(database),
schema=schema,
relations=[_make_key(x) for x in relations],
relations=[_make_key_msg(x) for x in relations],
)
)

View File

@@ -2,9 +2,9 @@ import threading
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from dbt.adapters.reference_keys import _make_key, _ReferenceKey
from dbt.adapters.reference_keys import _make_key, _make_key_msg, _make_msg_from_key, _ReferenceKey
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, fire_event_if
from dbt.events.types import (
AddLink,
AddRelation,
@@ -20,8 +20,8 @@ from dbt.events.types import (
UncachedRelation,
UpdateReference,
)
import dbt.flags as flags
from dbt.utils import lowercase
from dbt.helper_types import Lazy
def dot_separated(key: _ReferenceKey) -> str:
@@ -299,7 +299,11 @@ class RelationsCache:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
fire_event(
UncachedRelation(
dep_key=_make_msg_from_key(dep_key), ref_key=_make_msg_from_key(ref_key)
)
)
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.
@@ -309,7 +313,9 @@ class RelationsCache:
# Insert a dummy "external" relation.
dependent = dependent.replace(type=referenced.External)
self.add(dependent)
fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
fire_event(
AddLink(dep_key=_make_msg_from_key(dep_key), ref_key=_make_msg_from_key(ref_key))
)
with self.lock:
self._add_link(ref_key, dep_key)
@@ -320,12 +326,12 @@ class RelationsCache:
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(AddRelation(relation=_make_key(cached)))
fire_event(DumpBeforeAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event(AddRelation(relation=_make_key_msg(cached)))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpBeforeAddGraph(dump=self.dump_graph()))
with self.lock:
self._setdefault(cached)
fire_event(DumpAfterAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpAfterAddGraph(dump=self.dump_graph()))
def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not
@@ -340,19 +346,6 @@ class RelationsCache:
for cached in self.relations.values():
cached.release_references(keys)
def _drop_cascade_relation(self, dropped_key):
"""Drop the given relation and cascade it appropriately to all
dependent relations.
:param _CachedRelation dropped: An existing _CachedRelation to drop.
"""
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key))
return
consequences = self.relations[dropped_key].collect_consequences()
fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
self._remove_refs(consequences)
def drop(self, relation):
"""Drop the named relation and cascade it appropriately to all
dependent relations.
@@ -365,9 +358,18 @@ class RelationsCache:
:param str identifier: The identifier of the relation to drop.
"""
dropped_key = _make_key(relation)
fire_event(DropRelation(dropped=dropped_key))
dropped_key_msg = _make_key_msg(relation)
fire_event(DropRelation(dropped=dropped_key_msg))
with self.lock:
self._drop_cascade_relation(dropped_key)
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key_msg))
return
consequences = self.relations[dropped_key].collect_consequences()
# convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
consequence_msgs = [_make_msg_from_key(key) for key in consequences]
fire_event(DropCascade(dropped=dropped_key_msg, consequences=consequence_msgs))
self._remove_refs(consequences)
def _rename_relation(self, old_key, new_relation):
"""Rename a relation named old_key to new_key, updating references.
@@ -420,7 +422,7 @@ class RelationsCache:
)
if old_key not in self.relations:
fire_event(TemporaryRelation(key=old_key))
fire_event(TemporaryRelation(key=_make_msg_from_key(old_key)))
return False
return True
@@ -438,9 +440,13 @@ class RelationsCache:
"""
old_key = _make_key(old)
new_key = _make_key(new)
fire_event(RenameSchema(old_key=old_key, new_key=new_key))
fire_event(
RenameSchema(old_key=_make_msg_from_key(old_key), new_key=_make_msg_from_key(new_key))
)
fire_event(DumpBeforeRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpBeforeRenameSchema(dump=self.dump_graph())
)
with self.lock:
if self._check_rename_constraints(old_key, new_key):
@@ -448,7 +454,9 @@ class RelationsCache:
else:
self._setdefault(_CachedRelation(new))
fire_event(DumpAfterRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpAfterRenameSchema(dump=self.dump_graph())
)
def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
"""Case-insensitively yield all relations matching the given schema.

View File

@@ -1,4 +1,5 @@
import threading
import traceback
from pathlib import Path
from importlib import import_module
from typing import Type, Dict, Any, List, Optional, Set
@@ -64,12 +65,12 @@ class AdapterContainer:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == "dbt.adapters." + name:
fire_event(AdapterImportError(exc=exc))
fire_event(AdapterImportError(exc=str(exc)))
raise RuntimeException(f"Could not find adapter type {name}!")
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
fire_event(PluginLoadError())
fire_event(PluginLoadError(exc_info=traceback.format_exc()))
raise
plugin: AdapterPlugin = mod.Plugin
plugin_type = plugin.adapter.type()

View File

@@ -2,6 +2,7 @@
from collections import namedtuple
from typing import Any, Optional
from dbt.events.proto_types import ReferenceKeyMsg
_ReferenceKey = namedtuple("_ReferenceKey", "database schema identifier")
@@ -22,3 +23,13 @@ def _make_key(relation: Any) -> _ReferenceKey:
return _ReferenceKey(
lowercase(relation.database), lowercase(relation.schema), lowercase(relation.identifier)
)
def _make_key_msg(relation: Any):
return _make_msg_from_key(_make_key(relation))
def _make_msg_from_key(ref_key: _ReferenceKey) -> ReferenceKeyMsg:
return ReferenceKeyMsg(
database=ref_key.database, schema=ref_key.schema, identifier=ref_key.identifier
)
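Taken together, these helpers form the conversion path from a relation-like object to a serializable message: relation -> _ReferenceKey -> ReferenceKeyMsg. A self-contained sketch of that flow, assuming a plain dataclass as a stand-in for the betterproto-generated ReferenceKeyMsg and a hypothetical FakeRelation as input:

```python
from collections import namedtuple
from dataclasses import dataclass
from typing import Any, Optional

_ReferenceKey = namedtuple("_ReferenceKey", "database schema identifier")


def lowercase(value: Optional[str]) -> Optional[str]:
    # mirrors dbt.utils.lowercase: a None-safe str.lower()
    return value.lower() if value is not None else None


@dataclass
class ReferenceKeyMsg:  # stand-in for the betterproto-generated message
    database: str = ""
    schema: str = ""
    identifier: str = ""


def _make_key(relation: Any) -> _ReferenceKey:
    # normalize to lowercase so cache lookups are case-insensitive
    return _ReferenceKey(
        lowercase(relation.database), lowercase(relation.schema), lowercase(relation.identifier)
    )


def _make_msg_from_key(ref_key: _ReferenceKey) -> ReferenceKeyMsg:
    return ReferenceKeyMsg(
        database=ref_key.database, schema=ref_key.schema, identifier=ref_key.identifier
    )


def _make_key_msg(relation: Any) -> ReferenceKeyMsg:
    # relation -> _ReferenceKey -> ReferenceKeyMsg, as used at fire_event call sites
    return _make_msg_from_key(_make_key(relation))


@dataclass
class FakeRelation:  # hypothetical input object, for illustration only
    database: str
    schema: str
    identifier: str


print(_make_key_msg(FakeRelation("DBT", "Analytics", "My_Table")))
# ReferenceKeyMsg(database='dbt', schema='analytics', identifier='my_table')
```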

View File

@@ -10,6 +10,7 @@ from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection, ConnectionState, AdapterResponse
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus
from dbt.utils import cast_to_str
class SQLConnectionManager(BaseConnectionManager):
@@ -55,7 +56,7 @@ class SQLConnectionManager(BaseConnectionManager):
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=cast_to_str(connection.name)))
with self.exception_handler(sql):
if abridge_sql_log:
@@ -63,7 +64,7 @@ class SQLConnectionManager(BaseConnectionManager):
else:
log_sql = sql
fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
fire_event(SQLQuery(conn_name=cast_to_str(connection.name), sql=log_sql))
pre = time.time()
cursor = connection.handle.cursor()

View File

@@ -5,7 +5,7 @@ import dbt.clients.agate_helper
from dbt.contracts.connection import Connection
import dbt.exceptions
from dbt.adapters.base import BaseAdapter, available
from dbt.adapters.cache import _make_key
from dbt.adapters.cache import _make_key_msg
from dbt.adapters.sql import SQLConnectionManager
from dbt.events.functions import fire_event
from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop
@@ -110,7 +110,7 @@ class SQLAdapter(BaseAdapter):
ColTypeChange(
orig_type=target_column.data_type,
new_type=new_type,
table=_make_key(current),
table=_make_key_msg(current),
)
)
@@ -155,7 +155,7 @@ class SQLAdapter(BaseAdapter):
def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaCreation(relation=_make_key(relation)))
fire_event(SchemaCreation(relation=_make_key_msg(relation)))
kwargs = {
"relation": relation,
}
@@ -166,7 +166,7 @@ class SQLAdapter(BaseAdapter):
def drop_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaDrop(relation=_make_key(relation)))
fire_event(SchemaDrop(relation=_make_key_msg(relation)))
kwargs = {
"relation": relation,
}

View File

@@ -3,9 +3,9 @@ from typing import Any, Dict, List
import requests
from dbt.events.functions import fire_event
from dbt.events.types import (
RegistryProgressMakingGETRequest,
RegistryProgressGETRequest,
RegistryProgressGETResponse,
RegistryIndexProgressMakingGETRequest,
RegistryIndexProgressGETRequest,
RegistryIndexProgressGETResponse,
RegistryResponseUnexpectedType,
RegistryResponseMissingTopKeys,
@@ -37,7 +37,7 @@ def _get_with_retries(package_name, registry_base_url=None):
def _get(package_name, registry_base_url=None):
url = _get_url(package_name, registry_base_url)
fire_event(RegistryProgressMakingGETRequest(url=url))
fire_event(RegistryProgressGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))
@@ -134,7 +134,7 @@ def get_available_versions(package_name) -> List["str"]:
def _get_index(registry_base_url=None):
url = _get_url("index", registry_base_url)
fire_event(RegistryIndexProgressMakingGETRequest(url=url))
fire_event(RegistryIndexProgressGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code))

View File

@@ -164,7 +164,7 @@ def write_file(path: str, contents: str = "") -> bool:
reason = "Path was possibly too long"
# all our hard work and the path was still too long. Log and
# continue.
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=exc))
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=str(exc)))
else:
raise
return True

View File

@@ -1020,7 +1020,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
# log up to 5 items
sample = list(islice(merged, 5))
fire_event(MergedFromState(nbr_merged=len(merged), sample=sample))
fire_event(MergedFromState(num_merged=len(merged), sample=sample))
# Methods that were formerly in ParseResult

View File

@@ -39,6 +39,7 @@ from dbt.contracts.graph.unparsed import (
MetricFilter,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
from dbt.exceptions import warn_or_error
from dbt import flags
from dbt.node_types import NodeType
@@ -189,7 +190,8 @@ class NodeInfoMixin:
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at"),
}
return node_info
node_info_msg = NodeInfo(**node_info)
return node_info_msg
@dataclass

View File

@@ -11,11 +11,12 @@ from dbt.contracts.util import (
from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import TimingInfoCollected
from dbt.events.proto_types import RunResultMsg
from dbt.logger import (
TimingProcessor,
JsonOnly,
)
from dbt.utils import lowercase
from dbt.utils import lowercase, cast_to_str, cast_to_int
from dbt.dataclass_schema import dbtClassMixin, StrEnum
import agate
@@ -119,6 +120,17 @@ class BaseResult(dbtClassMixin):
data["failures"] = None
return data
def to_msg(self):
# TODO: add more fields
msg = RunResultMsg()
msg.status = str(self.status)
msg.message = cast_to_str(self.message)
msg.thread = self.thread_id
msg.execution_time = self.execution_time
msg.num_failures = cast_to_int(self.failures)
# timing_info, adapter_response, message
return msg
@dataclass
class NodeResult(BaseResult):

View File

@@ -44,8 +44,6 @@ class PartialParsingDeletedExposure(DebugLevel, Cli, File):
## Optional (based on your event)
- Events associated with node status changes must be extended with `NodeInfo` which contains a node_info attribute
All values other than `code` and `node_info` will be included in the `data` node of the json log output.
@@ -63,3 +61,7 @@ from dbt.events import AdapterLogger
logger = AdapterLogger("<database name>")
# e.g. AdapterLogger("Snowflake")
```
## Compiling types.proto
In the core/dbt/events directory: ```protoc --python_betterproto_out . types.proto```
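For orientation, betterproto emits plain Python dataclasses rather than `_pb2`-style classes, which is what makes the mixin pattern used throughout this PR possible. A rough sketch of the shape of a generated message and how it serializes; the field number is an assumption and the real messages also carry a shared EventInfo field, so this is not copied from the actual types.proto output:

```python
from dataclasses import dataclass

import betterproto


@dataclass
class ConnectionClosed(betterproto.Message):
    # betterproto maps each proto field to a typed dataclass field;
    # the field number (1) here is illustrative only
    conn_name: str = betterproto.string_field(1)


event = ConnectionClosed(conn_name="master")
# the same to_dict() call that event_to_dict() in functions.py relies on
print(event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True))
# {'conn_name': 'master'}
```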

View File

@@ -1,3 +1,4 @@
import traceback
from dataclasses import dataclass
from dbt.events.functions import fire_event
from dbt.events.types import (
@@ -12,57 +13,28 @@ from dbt.events.types import (
class AdapterLogger:
name: str
def debug(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def debug(self, msg, *args):
event = AdapterEventDebug(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
fire_event(event)
def info(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def info(self, msg, *args):
event = AdapterEventInfo(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
fire_event(event)
def warning(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def warning(self, msg, *args):
event = AdapterEventWarning(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
fire_event(event)
def error(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def error(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
fire_event(event)
# The default exc_info=True is what makes this method different
def exception(self, msg, *args, exc_info=True, extra=None, stack_info=False):
def exception(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
event.exc_info = traceback.format_exc()
fire_event(event)
def critical(self, msg, *args, exc_info=False, extra=None, stack_info=False):
def critical(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)
event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info
fire_event(event)

View File

@@ -1,9 +1,7 @@
from abc import ABCMeta, abstractproperty, abstractmethod
from dataclasses import dataclass
from dbt.events.serialization import EventSerialization
import os
import threading
from typing import Any, Dict
from datetime import datetime
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@@ -17,89 +15,80 @@ class Cache:
pass
def get_invocation_id() -> str:
from dbt.events.functions import get_invocation_id
return get_invocation_id()
# exactly one pid per concrete event
def get_pid() -> int:
return os.getpid()
# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = datetime.utcnow()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
# in theory threads can change so we don't cache them.
def get_thread_name() -> str:
return threading.current_thread().name
@dataclass
class ShowException:
# N.B.:
# As long as we stick with the current convention of setting the member vars in the
# `message` method of subclasses, this is a safe operation.
# If that ever changes we'll want to reassess.
class BaseEvent:
"""BaseEvent for proto message generated python events"""
def __post_init__(self):
self.exc_info: Any = True
self.stack_info: Any = None
self.extra: Any = None
super().__post_init__()
self.info.level = self.level_tag()
if not hasattr(self.info, "msg") or not self.info.msg:
self.info.msg = self.message()
self.info.invocation_id = get_invocation_id()
self.info.ts = datetime.utcnow()
self.info.pid = get_pid()
self.info.thread = get_thread_name()
self.info.code = self.code()
self.info.name = type(self).__name__
def level_tag(self):
raise Exception("level_tag() not implemented for event")
def message(self):
raise Exception("message() not implemented for event")
# TODO add exhaustiveness checking for subclasses
# top-level superclass for all events
class Event(metaclass=ABCMeta):
# Do not define fields with defaults here
@dataclass
class TestLevel(BaseEvent):
__test__ = False
# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
@abstractproperty
@staticmethod
def code() -> str:
raise Exception("code() not implemented for event")
# The 'to_dict' method is added by mashumaro via the EventSerialization.
# It should be in all subclasses that are to record actual events.
@abstractmethod
def to_dict(self):
raise Exception("to_dict not implemented for Event")
# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
raise Exception("level_tag not implemented for Event")
# Solely the human readable message. Timestamps and formatting will be added by the logger.
# Must override yourself
@abstractmethod
def message(self) -> str:
raise Exception("msg not implemented for Event")
# exactly one pid per concrete event
def get_pid(self) -> int:
return os.getpid()
# in theory threads can change so we don't cache them.
def get_thread_name(self) -> str:
return threading.current_thread().name
@classmethod
def get_invocation_id(cls) -> str:
from dbt.events.functions import get_invocation_id
return get_invocation_id()
# in preparation for #3977
@dataclass # type: ignore[misc]
class TestLevel(EventSerialization, Event):
def level_tag(self) -> str:
return "test"
@dataclass # type: ignore[misc]
class DebugLevel(EventSerialization, Event):
class DebugLevel(BaseEvent):
def level_tag(self) -> str:
return "debug"
@dataclass # type: ignore[misc]
class InfoLevel(EventSerialization, Event):
class InfoLevel(BaseEvent):
def level_tag(self) -> str:
return "info"
@dataclass # type: ignore[misc]
class WarnLevel(EventSerialization, Event):
class WarnLevel(BaseEvent):
def level_tag(self) -> str:
return "warn"
@dataclass # type: ignore[misc]
class ErrorLevel(EventSerialization, Event):
class ErrorLevel(BaseEvent):
def level_tag(self) -> str:
return "error"
@@ -114,10 +103,3 @@ class NoFile:
# prevents an event from going to stdout
class NoStdOut:
pass
# This class represents the node_info which is generated
# by the NodeInfoMixin class in dbt.contracts.graph.parsed
@dataclass
class NodeInfo:
node_info: Dict[str, Any]
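With this change, BaseEvent.__post_init__ becomes the single place where every event's metadata is stamped onto its generated EventInfo sub-message. A minimal self-contained sketch of that stamping, with a plain dataclass standing in for EventInfo, the level/code/message hooks inlined, and invocation_id elided:

```python
import os
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class EventInfo:  # stand-in for the betterproto-generated EventInfo message
    level: str = ""
    msg: str = ""
    ts: Optional[datetime] = None
    pid: int = 0
    thread: str = ""
    code: str = ""
    name: str = ""


@dataclass
class SketchEvent:
    info: EventInfo = field(default_factory=EventInfo)

    def __post_init__(self):
        # mirrors BaseEvent.__post_init__ above
        self.info.level = self.level_tag()
        if not self.info.msg:
            self.info.msg = self.message()
        self.info.ts = datetime.utcnow()
        self.info.pid = os.getpid()
        self.info.thread = threading.current_thread().name
        self.info.code = self.code()
        self.info.name = type(self).__name__

    # the real hooks raise unless overridden by a concrete event class
    def level_tag(self) -> str:
        return "info"

    def code(self) -> str:
        return "Z999"  # illustrative code, not taken from the real types.py

    def message(self) -> str:
        return "hello from the sketch"


print(SketchEvent().info)
```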

View File

@@ -1,7 +1,8 @@
import betterproto
from colorama import Style
import dbt.events.functions as this # don't worry I hate it too.
from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache
from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.types import EventBufferFull, MainReportVersion, EmptyLine
import dbt.flags as flags
from dbt.constants import SECRET_ENV_PREFIX
@@ -19,11 +20,11 @@ from logging.handlers import RotatingFileHandler
import os
import uuid
import threading
from typing import Any, Dict, List, Optional, Union
from typing import List, Optional, Union, Callable
from collections import deque
global LOG_VERSION
LOG_VERSION = 2
LOG_VERSION = 3
# create the global event history buffer with the default max size (10k)
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
@@ -131,37 +132,23 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str:
# returns a dictionary representation of the event fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_serializable_dict(
e: T_Event,
) -> Dict[str, Any]:
def event_to_json(
event: BaseEvent,
) -> str:
event_dict = event_to_dict(event)
raw_log_line = json.dumps(event_dict, sort_keys=True)
return raw_log_line
log_line = dict()
code: str
def event_to_dict(event: BaseEvent) -> dict:
event_dict = dict()
try:
log_line = e.to_dict()
# We could use to_json here, but it wouldn't sort the keys.
# The 'to_json' method just does json.dumps on the dict anyway.
event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True) # type: ignore
except AttributeError as exc:
event_type = type(e).__name__
raise Exception( # TODO this may hang async threads
f"type {event_type} is not serializable. {str(exc)}"
)
# We get the code from the event object, so we don't need it in the data
if "code" in log_line:
del log_line["code"]
event_dict = {
"type": "log_line",
"log_version": LOG_VERSION,
"ts": get_ts_rfc3339(),
"pid": e.get_pid(),
"msg": e.message(),
"level": e.level_tag(),
"data": log_line,
"invocation_id": e.get_invocation_id(),
"thread_name": e.get_thread_name(),
"code": e.code,
}
event_type = type(event).__name__
raise Exception(f"type {event_type} is not serializable. {str(exc)}")
return event_dict
@@ -171,15 +158,15 @@ def reset_color() -> str:
return "" if not this.format_color else Style.RESET_ALL
def create_info_text_log_line(e: T_Event) -> str:
def create_info_text_log_line(e: BaseEvent) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S")
ts: str = get_ts().strftime("%H:%M:%S") # TODO: get this from the event.ts?
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
return log_line
def create_debug_text_log_line(e: T_Event) -> str:
def create_debug_text_log_line(e: BaseEvent) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
if type(e) == MainReportVersion:
@@ -188,7 +175,8 @@ def create_debug_text_log_line(e: T_Event) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} "
# Make the levels all 5 characters so they line up
level: str = f"{e.level_tag():<5}"
thread = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name
@@ -200,18 +188,17 @@ def create_debug_text_log_line(e: T_Event) -> str:
# translates an Event to a completely formatted json log line
def create_json_log_line(e: T_Event) -> Optional[str]:
def create_json_log_line(e: BaseEvent) -> Optional[str]:
if type(e) == EmptyLine:
return None # will not be sent to logger
# using preformatted ts string instead of formatting it here to be extra careful about timezone
values = event_to_serializable_dict(e)
raw_log_line = json.dumps(values, sort_keys=True)
raw_log_line = event_to_json(e)
return scrub_secrets(raw_log_line, env_secrets())
# calls create_stdout_text_log_line() or create_json_log_line() according to logger config
def create_log_line(e: T_Event, file_output=False) -> Optional[str]:
def create_log_line(e: BaseEvent, file_output=False) -> Optional[str]:
if this.format_json:
# TODO: Do we want to skip EmptyLine() like the TextOnly for logbook?
return create_json_log_line(e) # json output, both console and file
elif file_output is True or flags.DEBUG:
return create_debug_text_log_line(e) # default file output
@@ -241,31 +228,18 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s
)
def send_exc_to_logger(
l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False
):
if level_tag == "test":
# TODO after implementing #3977 send to new test level
l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "debug":
l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "info":
l.info(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "warn":
l.warning(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "error":
l.error(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}"
)
# an alternative to fire_event which only creates and logs the event value
# if the condition is met. Does nothing otherwise.
def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
if conditional:
fire_event(lazy_e())
# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
def fire_event(e: BaseEvent) -> None:
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return
@@ -305,17 +279,7 @@ def fire_event(e: Event) -> None:
log_line = create_log_line(e)
if log_line:
if not isinstance(e, ShowException):
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
else:
send_exc_to_logger(
STDOUT_LOG,
level_tag=e.level_tag(),
log_line=log_line,
exc_info=e.exc_info,
stack_info=e.stack_info,
extra=e.extra,
)
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
def get_invocation_id() -> str:
@@ -336,10 +300,3 @@ def set_invocation_id() -> None:
def get_ts() -> datetime:
ts = datetime.utcnow()
return ts
# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
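The motivation for fire_event_if shows up in the cache.py hunks earlier in this diff: dump_graph() is expensive, so the event construction sits behind a lambda and only executes when --log-cache-events is set. A runnable sketch of the pattern, with stand-ins for the flag, the event class, and the logging side effect:

```python
from typing import Callable

LOG_CACHE_EVENTS = False  # stand-in for dbt.flags.LOG_CACHE_EVENTS


class DumpBeforeAddGraph:  # stand-in; the real event is a betterproto dataclass
    def __init__(self, dump: dict):
        self.dump = dump


def expensive_dump_graph() -> dict:
    print("building graph dump...")  # never printed while the flag is off
    return {"model.a": ["model.b"]}


def fire_event(e) -> None:
    print(f"fired {type(e).__name__}")


def fire_event_if(conditional: bool, lazy_e: Callable[[], object]) -> None:
    if conditional:
        fire_event(lazy_e())


# the lambda defers both the dump and the event construction entirely
fire_event_if(LOG_CACHE_EVENTS, lambda: DumpBeforeAddGraph(dump=expensive_dump_graph()))
```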

File diff suppressed because it is too large.

View File

@@ -1,55 +0,0 @@
from dbt.helper_types import Lazy
from mashumaro import DataClassDictMixin
from mashumaro.config import BaseConfig as MashBaseConfig
from mashumaro.types import SerializationStrategy
from typing import Dict, List
# The dbtClassMixin serialization class has a DateTime serialization strategy
# class. If a datetime ends up in an event class, we could use a similar class
# here to serialize it in our preferred format.
class ExceptionSerialization(SerializationStrategy):
def serialize(self, value):
out = str(value)
return out
def deserialize(self, value):
return Exception(value)
class BaseExceptionSerialization(SerializationStrategy):
def serialize(self, value):
return str(value)
def deserialize(self, value):
return BaseException(value)
# This is an explicit deserializer for the type Lazy[Dict[str, List[str]]]
# mashumaro does not support composing serialization strategies, so all
# future uses of Lazy will need to register a unique serialization class like this one.
class LazySerialization1(SerializationStrategy):
def serialize(self, value) -> Dict[str, List[str]]:
return value.force()
# we _can_ deserialize into a lazy value, but that defers running the deserialization
# function till the value is used which can raise errors at very unexpected times.
# It's best practice to do strict deserialization unless you're in a very special case.
def deserialize(self, value):
raise Exception("Don't deserialize into a Lazy value. Try just using the value itself.")
# This class is the equivalent of dbtClassMixin that's used for serialization
# in other parts of the code. That class did extra things which we didn't want
# to use for events, so this class is a simpler version of dbtClassMixin.
class EventSerialization(DataClassDictMixin):
# This is where we register serialization strategies per type.
class Config(MashBaseConfig):
serialization_strategy = {
Exception: ExceptionSerialization(),
BaseException: ExceptionSerialization(),
Lazy[Dict[str, List[str]]]: LazySerialization1(),
}

View File

@@ -1,6 +1,8 @@
from dataclasses import dataclass
from dbt.events.types import InfoLevel, DebugLevel, WarnLevel, ErrorLevel, ShowException
from dbt.events.types import InfoLevel, DebugLevel, WarnLevel, ErrorLevel
from dbt.events.base_types import NoFile
from dbt.events import proto_types as pl
from dbt.events.proto_types import EventInfo # noqa
# Keeping log messages for testing separate since they are used for debugging.
@@ -8,54 +10,54 @@ from dbt.events.base_types import NoFile
@dataclass
class IntegrationTestInfo(InfoLevel, NoFile):
msg: str
code: str = "T001"
class IntegrationTestInfo(InfoLevel, NoFile, pl.IntegrationTestInfo):
def code(self):
return "T001"
def message(self) -> str:
return f"Integration Test: {self.msg}"
@dataclass
class IntegrationTestDebug(DebugLevel, NoFile):
msg: str
code: str = "T002"
class IntegrationTestDebug(DebugLevel, NoFile, pl.IntegrationTestDebug):
def code(self):
return "T002"
def message(self) -> str:
return f"Integration Test: {self.msg}"
@dataclass
class IntegrationTestWarn(WarnLevel, NoFile):
msg: str
code: str = "T003"
class IntegrationTestWarn(WarnLevel, NoFile, pl.IntegrationTestWarn):
def code(self):
return "T003"
def message(self) -> str:
return f"Integration Test: {self.msg}"
@dataclass
class IntegrationTestError(ErrorLevel, NoFile):
msg: str
code: str = "T004"
class IntegrationTestError(ErrorLevel, NoFile, pl.IntegrationTestError):
def code(self):
return "T004"
def message(self) -> str:
return f"Integration Test: {self.msg}"
@dataclass
class IntegrationTestException(ShowException, ErrorLevel, NoFile):
msg: str
code: str = "T005"
class IntegrationTestException(ErrorLevel, NoFile, pl.IntegrationTestException):
def code(self):
return "T005"
def message(self) -> str:
return f"Integration Test: {self.msg}"
@dataclass
class UnitTestInfo(InfoLevel, NoFile):
msg: str
code: str = "T006"
class UnitTestInfo(InfoLevel, NoFile, pl.UnitTestInfo):
def code(self):
return "T006"
def message(self) -> str:
return f"Unit Test: {self.msg}"

core/dbt/events/types.proto (new file, 1653 lines)

File diff suppressed because it is too large.

File diff suppressed because it is too large.

View File

@@ -1063,7 +1063,7 @@ def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))
fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt))
def warn(msg, node=None):

View File

@@ -10,7 +10,7 @@ from contextlib import contextmanager
from pathlib import Path
import dbt.version
from dbt.events.functions import fire_event, setup_event_logger
from dbt.events.functions import fire_event, setup_event_logger, LOG_VERSION
from dbt.events.types import (
MainEncounteredError,
MainKeyboardInterrupt,
@@ -142,7 +142,7 @@ def main(args=None):
exit_code = e.code
except BaseException as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
fire_event(MainStackTrace(stack_trace=traceback.format_exc()))
exit_code = ExitCodes.UnhandledError.value
@@ -201,7 +201,7 @@ def track_run(task):
yield
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
except Exception:
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
@@ -226,7 +226,7 @@ def run_from_args(parsed):
level_override = parsed.cls.pre_init_hook(parsed)
setup_event_logger(log_path or "logs", level_override)
fire_event(MainReportVersion(v=str(dbt.version.installed)))
fire_event(MainReportVersion(version=str(dbt.version.installed), log_version=LOG_VERSION))
fire_event(MainReportArgs(args=args_to_dict(parsed)))
if dbt.tracking.active_user is not None: # mypy appeasement, always true

View File

@@ -291,8 +291,7 @@ class ManifestLoader:
if source_file:
parse_file_type = source_file.parse_file_type
fire_event(PartialParsingExceptionFile(file=file_id))
file_dict = source_file.to_dict()
fire_event(PartialParsingFile(file_dict=file_dict))
fire_event(PartialParsingFile(file_id=source_file.file_id))
exc_info["parse_file_type"] = parse_file_type
fire_event(PartialParsingException(exc_info=exc_info))
@@ -658,7 +657,9 @@ class ManifestLoader:
manifest.metadata.invocation_id = get_invocation_id()
return manifest
except Exception as exc:
fire_event(ParsedFileLoadFailed(path=path, exc=exc))
fire_event(
ParsedFileLoadFailed(path=path, exc=str(exc), exc_info=traceback.format_exc())
)
reparse_reason = ReparseReason.load_file_failure
else:
fire_event(PartialParseSaveFileNotFound())

View File

@@ -299,7 +299,7 @@ class PartialParsing:
else:
# It's not clear when this would actually happen.
# Logging in case there are other associated errors.
fire_event(PartialParsingNodeMissingInSourceFile(source_file=old_source_file))
fire_event(PartialParsingNodeMissingInSourceFile(file_id=old_source_file.file_id))
# replace source_file in saved and add to parsing list
file_id = new_source_file.file_id
@@ -912,7 +912,7 @@ class PartialParsing:
unique_id
)
schema_file.metrics.remove(unique_id)
fire_event(PartialParsingDeletedMetric(id=unique_id))
fire_event(PartialParsingDeletedMetric(unique_id=unique_id))
def get_schema_element(self, elem_list, elem_name):
for element in elem_list:

View File

@@ -1,6 +1,7 @@
import os
import threading
import time
import traceback
from abc import ABCMeta, abstractmethod
from typing import Type, Union, Dict, Any, Optional
@@ -108,13 +109,13 @@ class BaseTask(metaclass=ABCMeta):
config = cls.ConfigType.from_args(args)
except dbt.exceptions.DbtProjectError as exc:
fire_event(DbtProjectError())
fire_event(DbtProjectErrorException(exc=exc))
fire_event(DbtProjectErrorException(exc=str(exc)))
tracking.track_invalid_invocation(args=args, result_type=exc.result_type)
raise dbt.exceptions.RuntimeException("Could not run dbt") from exc
except dbt.exceptions.DbtProfileError as exc:
fire_event(DbtProfileError())
fire_event(DbtProfileErrorException(exc=exc))
fire_event(DbtProfileErrorException(exc=str(exc)))
all_profiles = read_profiles(flags.PROFILES_DIR).keys()
@@ -345,11 +346,11 @@ class BaseRunner(metaclass=ABCMeta):
if e.node is None:
e.add_node(ctx.node)
fire_event(CatchableExceptionOnRun(exc=e))
fire_event(CatchableExceptionOnRun(exc=str(e), exc_info=traceback.format_exc()))
return str(e)
def _handle_internal_exception(self, e, ctx):
fire_event(InternalExceptionOnRun(build_path=self.node.build_path, exc=e))
fire_event(InternalExceptionOnRun(build_path=self.node.build_path, exc=str(e)))
return str(e)
def _handle_generic_exception(self, e, ctx):
@@ -357,10 +358,10 @@ class BaseRunner(metaclass=ABCMeta):
GenericExceptionOnRun(
build_path=self.node.build_path,
unique_id=self.node.unique_id,
exc=str(e), # TODO: unstring this when serialization is fixed
exc=str(e),
)
)
fire_event(PrintDebugStackTrace())
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
return str(e)
@@ -413,7 +414,11 @@ class BaseRunner(metaclass=ABCMeta):
try:
self.adapter.release_connection()
except Exception as exc:
fire_event(NodeConnectionReleaseError(node_name=self.node.name, exc=exc))
fire_event(
NodeConnectionReleaseError(
node_name=self.node.name, exc=str(exc), exc_info=traceback.format_exc()
)
)
return str(exc)
return None

View File

@@ -12,7 +12,7 @@ from dbt.events.types import (
DepsNoPackagesFound,
DepsStartPackageInstall,
DepsUpdateAvailable,
DepsUTD,
DepsUpToDate,
DepsInstallInfo,
DepsListSubdirectory,
DepsNotifyUpdatesAvailable,
@@ -72,7 +72,7 @@ class DepsTask(BaseTask):
packages_to_upgrade.append(package_name)
fire_event(DepsUpdateAvailable(version_latest=version_latest))
else:
fire_event(DepsUTD())
fire_event(DepsUpToDate())
if package.get_subdirectory():
fire_event(DepsListSubdirectory(subdirectory=package.get_subdirectory()))

View File

@@ -285,6 +285,7 @@ class GenerateTask(CompileTask):
errors=errors,
)
@classmethod
def interpret_results(self, results: Optional[CatalogResults]) -> bool:
if results is None:
return False

View File

@@ -28,7 +28,7 @@ from dbt.events.types import (
SettingUpProfile,
InvalidProfileTemplateYAML,
ProjectNameAlreadyExists,
GetAddendum,
ProjectCreated,
)
from dbt.include.starter_project import PACKAGE_PATH as starter_project_directory
@@ -41,22 +41,6 @@ SLACK_URL = "https://community.getdbt.com/"
# This file is not needed for the starter project but exists for finding the resource path
IGNORE_FILES = ["__init__.py", "__pycache__"]
ON_COMPLETE_MESSAGE = """
Your new dbt project "{project_name}" was created!
For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:
{docs_url}
One more thing:
Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:
{slack_url}
Happy modeling!
"""
# https://click.palletsprojects.com/en/8.0.x/api/#types
# click v7.0 has UNPROCESSED, STRING, INT, FLOAT, BOOL, and UUID available.
@@ -114,17 +98,6 @@ class InitTask(BaseTask):
ProfileWrittenWithSample(name=profile_name, path=str(profiles_filepath))
)
def get_addendum(self, project_name: str, profiles_path: str) -> str:
open_cmd = dbt.clients.system.open_dir_cmd()
return ON_COMPLETE_MESSAGE.format(
open_cmd=open_cmd,
project_name=project_name,
profiles_path=profiles_path,
docs_url=DOCS_URL,
slack_url=SLACK_URL,
)
def generate_target_from_input(self, profile_template: dict, target: dict = {}) -> dict:
"""Generate a target configuration from profile_template and user input."""
profile_template_local = copy.deepcopy(profile_template)
@@ -324,5 +297,10 @@ class InitTask(BaseTask):
return
adapter = self.ask_for_adapter_choice()
self.create_profile_from_target(adapter, profile_name=project_name)
msg = self.get_addendum(project_name, profiles_dir)
fire_event(GetAddendum(msg=msg))
fire_event(
ProjectCreated(
project_name=project_name,
docs_url=DOCS_URL,
slack_url=SLACK_URL,
)
)

View File

@@ -17,11 +17,11 @@ from dbt.events.types import (
ManifestLoaded,
ManifestChecked,
ManifestFlatGraphBuilt,
ParsingStart,
ParsingCompiling,
ParsingWritingManifest,
ParsingDone,
ReportPerformancePath,
ParseCmdStart,
ParseCmdCompiling,
ParseCmdWritingManifest,
ParseCmdDone,
ParseCmdPerfInfoPath,
)
from dbt.events.functions import fire_event
from dbt.graph import Graph
@@ -50,7 +50,7 @@ class ParseTask(ConfiguredTask):
def write_perf_info(self):
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self.loader._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
fire_event(ReportPerformancePath(path=path))
fire_event(ParseCmdPerfInfoPath(path=path))
# This method takes code that normally exists in other files
# and pulls it in here, to simplify logging and make the
@@ -89,14 +89,14 @@ class ParseTask(ConfiguredTask):
self.graph = compiler.compile(self.manifest)
def run(self):
fire_event(ParsingStart())
fire_event(ParseCmdStart())
self.get_full_manifest()
if self.args.compile:
fire_event(ParsingCompiling())
fire_event(ParseCmdCompiling())
self.compile_manifest()
if self.args.write_manifest:
fire_event(ParsingWritingManifest())
fire_event(ParseCmdWritingManifest())
self.write_manifest()
self.write_perf_info()
fire_event(ParsingDone())
fire_event(ParseCmdDone())

View File

@@ -29,7 +29,7 @@ from dbt.exceptions import (
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import (
DatabaseErrorRunning,
DatabaseErrorRunningHook,
EmptyLine,
HooksRunning,
HookFinished,
@@ -389,7 +389,7 @@ class RunTask(CompileTask):
try:
self.run_hooks(adapter, hook_type, extra_context)
except RuntimeException:
fire_event(DatabaseErrorRunning(hook_type=hook_type.value))
fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value))
raise
def print_results_line(self, results, execution_time):

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Dict, Any
import traceback
import agate
@@ -55,12 +56,12 @@ class RunOperationTask(ManifestTask):
try:
self._run_unsafe()
except dbt.exceptions.Exception as exc:
fire_event(RunningOperationCaughtError(exc=exc))
fire_event(PrintDebugStackTrace())
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=exc))
fire_event(PrintDebugStackTrace())
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
success = False
else:
success = True

View File

@@ -35,6 +35,7 @@ from dbt.events.types import (
NodeFinished,
QueryCancelationUnsupported,
ConcurrencyLine,
EndRunResult,
)
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
@@ -232,7 +233,7 @@ class GraphRunnableTask(ManifestTask):
NodeFinished(
node_info=runner.node.node_info,
unique_id=runner.node.unique_id,
run_result=result.to_dict(),
run_result=result.to_msg(),
)
)
# `_event_status` dict is only used for logging. Make sure
@@ -471,6 +472,18 @@ class GraphRunnableTask(ManifestTask):
selected_uids = frozenset(n.unique_id for n in self._flattened_nodes)
result = self.execute_with_hooks(selected_uids)
# We have other result types here too, including FreshnessResult
if isinstance(result, RunExecutionResult):
result_msgs = [result.to_msg() for result in result.results]
fire_event(
EndRunResult(
results=result_msgs,
generated_at=result.generated_at,
elapsed_time=result.elapsed_time,
success=GraphRunnableTask.interpret_results(result.results),
)
)
if flags.WRITE_JSON:
self.write_manifest()
self.write_result(result)
@@ -478,7 +491,8 @@ class GraphRunnableTask(ManifestTask):
self.task_end_messages(result.results)
return result
def interpret_results(self, results):
@classmethod
def interpret_results(cls, results):
if results is None:
return False

View File

@@ -1,6 +1,7 @@
from abc import abstractmethod
from datetime import datetime
from typing import Generic, TypeVar
import traceback
import dbt.exceptions
from dbt.contracts.sql import (
@@ -10,7 +11,7 @@ from dbt.contracts.sql import (
ResultTable,
)
from dbt.events.functions import fire_event
from dbt.events.types import SQlRunnerException
from dbt.events.types import SQLRunnerException
from dbt.task.compile import CompileRunner
@@ -22,7 +23,7 @@ class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
CompileRunner.__init__(self, config, adapter, node, node_index, num_nodes)
def handle_exception(self, e, ctx):
fire_event(SQlRunnerException(exc=e))
fire_event(SQLRunnerException(exc=str(e), exc_info=traceback.format_exc()))
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt.exceptions.RuntimeException):
e.add_node(ctx.node)

View File

@@ -94,7 +94,7 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures,
num_failures=result.failures,
node_info=model.node_info,
)
)
@@ -105,7 +105,7 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures,
num_failures=result.failures,
node_info=model.node_info,
)
)

View File

@@ -1,4 +1,5 @@
from typing import Optional
import traceback
from dbt.clients.yaml_helper import ( # noqa:F401
yaml,
@@ -452,7 +453,7 @@ def initialize_tracking(cookie_dir):
try:
active_user.initialize()
except Exception:
fire_event(TrackingInitializeFailure())
fire_event(TrackingInitializeFailure(exc_info=traceback.format_exc()))
active_user = User(None)

View File

@@ -619,7 +619,7 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=exc))
fire_event(RecordRetryException(exc=str(exc)))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
@@ -666,3 +666,20 @@ def args_to_dict(args):
var_args[key] = str(var_args[key])
dict_args[key] = var_args[key]
return dict_args
# This is useful for proto generated classes in particular, since
# the default for protobuf for strings is the empty string, so
# Optional[str] types don't work for generated Python classes.
def cast_to_str(string: Optional[str]) -> str:
if string is None:
return ""
else:
return string
def cast_to_int(integer: Optional[int]) -> int:
if integer is None:
return 0
else:
return integer
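Proto scalar fields always hold a concrete value ("" for strings, 0 for ints), so call sites that hold Optional values must collapse None before constructing a message; that is all these helpers do. A short illustration (connection.name, for instance, is Optional[str] in dbt's contracts, which is why the fire_event call sites earlier in this diff wrap it):

```python
from typing import Optional


def cast_to_str(string: Optional[str]) -> str:
    # proto string fields default to "" and cannot represent None
    return "" if string is None else string


def cast_to_int(integer: Optional[int]) -> int:
    # proto int fields default to 0 and cannot represent None
    return 0 if integer is None else integer


assert cast_to_str(None) == ""
assert cast_to_str("master") == "master"
assert cast_to_int(None) == 0
```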

View File

@@ -1,3 +1,4 @@
betterproto[compiler]
black==22.6.0
bumpversion
flake8
@@ -7,6 +8,7 @@ ipdb
mypy==0.971
pip-tools
pre-commit
protobuf
pytest
pytest-cov
pytest-csv
@@ -22,6 +24,7 @@ types-PyYAML
types-freezegun
types-Jinja2
types-mock
types-protobuf
types-python-dateutil
types-pytz
types-requests

View File

@@ -8,6 +8,13 @@ import dbt.exceptions
from dbt.version import __version__ as dbt_version
from dbt.logger import log_manager
# Temporary for while we're converting logs to a different format
def get_log_msg(log):
if "msg" in log:
return log["msg"]
if "info" in log:
if "msg" in log["info"]:
return log["info"]["msg"]
class TestDefaultQueryComments(DBTIntegrationTest):
def matches_comment(self, msg) -> bool:
@@ -74,7 +81,7 @@ class TestDefaultQueryComments(DBTIntegrationTest):
def query_comment(self, model_name, log):
# N.B: a temporary string replacement regex to strip the HH:MM:SS from the log line if present.
# TODO: make this go away when structured logging is stable
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg'])
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", get_log_msg(log))
prefix = 'On {}: '.format(model_name)
if log_msg.startswith(prefix):
msg = log_msg[len(prefix):]
@@ -92,7 +99,7 @@ class TestDefaultQueryComments(DBTIntegrationTest):
if msg is not None and self.matches_comment(msg):
seen = True
self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs)))
self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(get_log_msg(logline) for logline in logs)))
@use_profile('postgres')
def test_postgres_comments(self):

View File

@@ -1,607 +0,0 @@
from dbt.adapters.reference_keys import _ReferenceKey
from dbt.events.test_types import UnitTestInfo
from dbt.events import AdapterLogger
from dbt.events.functions import event_to_serializable_dict
from dbt.events.base_types import NodeInfo
from dbt.events.types import *
from dbt.events.test_types import *
# from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
from dbt.events.base_types import Event, TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel
from importlib import reload
import dbt.events.functions as event_funcs
import dbt.flags as flags
from dbt.helper_types import Lazy
import inspect
import json
from unittest import TestCase
from dbt.contracts.graph.parsed import (
ParsedModelNode, NodeConfig, DependsOn
)
from dbt.contracts.files import FileHash
from mashumaro.types import SerializableType
from typing import Generic, TypeVar
# takes in a class and finds any subclasses for it
def get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
# If the test breaks because of ABCs this list might have to be updated.
if subclass in [NodeInfo, AdapterEventBase, TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel]:
continue
all_subclasses.append(subclass)
all_subclasses.extend(get_all_subclasses(subclass))
return set(all_subclasses)
class TestAdapterLogger(TestCase):
def setUp(self):
pass
# this interface is documented for adapter maintainers to plug into
# so we should test that it at the very least doesn't explode.
def test_basic_adapter_logging_interface(self):
logger = AdapterLogger("dbt_tests")
logger.debug("debug message")
logger.info("info message")
logger.warning("warning message")
logger.error("error message")
logger.exception("exception message")
logger.critical("exception message")
self.assertTrue(True)
# python loggers allow deferring string formatting via this signature:
def test_formatting(self):
logger = AdapterLogger("dbt_tests")
# tests that it doesn't throw
logger.debug("hello {}", 'world')
# enters lower in the call stack to test that it formats correctly
event = AdapterEventDebug(name="dbt_tests", base_msg="hello {}", args=('world',))
self.assertTrue("hello world" in event.message())
# tests that it doesn't throw
logger.debug("1 2 {}", 3)
# enters lower in the call stack to test that it formats correctly
event = AdapterEventDebug(name="dbt_tests", base_msg="1 2 {}", args=(3,))
self.assertTrue("1 2 3" in event.message())
# tests that it doesn't throw
logger.debug("boop{x}boop")
# enters lower in the call stack to test that it formats correctly
# in this case it's that we didn't attempt to replace anything since there
# were no args passed after the initial message
event = AdapterEventDebug(name="dbt_tests", base_msg="boop{x}boop", args=())
self.assertTrue("boop{x}boop" in event.message())
class TestEventCodes(TestCase):
# checks to see if event codes are duplicated to keep codes singular and clear.
# also checks that event codes follow the correct naming convention, e.g. E001
def test_event_codes(self):
all_concrete = get_all_subclasses(Event)
all_codes = set()
for event in all_concrete:
if not inspect.isabstract(event):
# must be in the form 1 capital letter, 3 digits
self.assertTrue('^[A-Z][0-9]{3}', event.code)
# cannot have been used already
self.assertFalse(event.code in all_codes, f'{event.code} is assigned more than once. Check types.py for duplicates.')
all_codes.add(event.code)
class TestEventBuffer(TestCase):
def setUp(self) -> None:
flags.EVENT_BUFFER_SIZE = 10
reload(event_funcs)
# ensure events are populated to the buffer exactly once
def test_buffer_populates(self):
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
event1 = event_funcs.EVENT_HISTORY[-2]
self.assertTrue(
event_funcs.EVENT_HISTORY.count(event1) == 1
)
# ensure events drop from the front of the buffer when buffer maxsize is reached
def test_buffer_FIFOs(self):
event_funcs.EVENT_HISTORY.clear()
for n in range(1,(flags.EVENT_BUFFER_SIZE + 1)):
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
event_full = event_funcs.EVENT_HISTORY[-1]
self.assertEqual(event_full.code, 'Z048')
self.assertTrue(
event_funcs.EVENT_HISTORY.count(event_full) == 1
)
self.assertTrue(
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
)
def MockNode():
return ParsedModelNode(
alias='model_one',
name='model_one',
database='dbt',
schema='analytics',
resource_type=NodeType.Model,
unique_id='model.root.model_one',
fqn=['root', 'model_one'],
package_name='root',
original_file_path='model_one.sql',
root_path='/usr/src/app',
refs=[],
sources=[],
depends_on=DependsOn(),
config=NodeConfig.from_dict({
'enabled': True,
'materialized': 'view',
'persist_docs': {},
'post-hook': [],
'pre-hook': [],
'vars': {},
'quoting': {},
'column_types': {},
'tags': [],
}),
tags=[],
path='model_one.sql',
raw_code='',
description='',
columns={},
checksum=FileHash.from_contents(''),
)
sample_values = [
MainReportVersion(v=''),
MainKeyboardInterrupt(),
MainEncounteredError(e=BaseException('')),
MainStackTrace(stack_trace=''),
MainTrackingUserState(user_state=''),
ParsingStart(),
ParsingCompiling(),
ParsingWritingManifest(),
ParsingDone(),
ManifestDependenciesLoaded(),
ManifestLoaderCreated(),
ManifestLoaded(),
ManifestChecked(),
ManifestFlatGraphBuilt(),
ReportPerformancePath(path=""),
GitSparseCheckoutSubdirectory(subdir=""),
GitProgressCheckoutRevision(revision=""),
GitProgressUpdatingExistingDependency(dir=""),
GitProgressPullingNewDependency(dir=""),
GitNothingToDo(sha=""),
GitProgressUpdatedCheckoutRange(start_sha="", end_sha=""),
GitProgressCheckedOutAt(end_sha=""),
SystemErrorRetrievingModTime(path=""),
SystemCouldNotWrite(path="", reason="", exc=""),
SystemExecutingCmd(cmd=[""]),
SystemStdOutMsg(bmsg=b""),
SystemStdErrMsg(bmsg=b""),
SelectorReportInvalidSelector(
valid_selectors="", spec_method="", raw_spec=""
),
MacroEventInfo(msg=""),
MacroEventDebug(msg=""),
NewConnection(conn_type="", conn_name=""),
ConnectionReused(conn_name=""),
ConnectionLeftOpen(conn_name=""),
ConnectionClosed(conn_name=""),
RollbackFailed(conn_name=""),
ConnectionClosed2(conn_name=""),
ConnectionLeftOpen2(conn_name=""),
Rollback(conn_name=""),
CacheMiss(conn_name="", database="", schema=""),
ListRelations(database="", schema="", relations=[]),
ConnectionUsed(conn_type="", conn_name=""),
SQLQuery(conn_name="", sql=""),
SQLQueryStatus(status="", elapsed=0.1),
CodeExecution(conn_name="", code_content=""),
CodeExecutionStatus(status="", elapsed=0.1),
SQLCommit(conn_name=""),
ColTypeChange(
orig_type="", new_type="",
table=_ReferenceKey(database="", schema="", identifier=""),
),
SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier="")),
SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier="")),
UncachedRelation(
dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""),
),
AddLink(
dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""),
),
AddRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
DropCascade(
dropped=_ReferenceKey(database="", schema="", identifier=""),
consequences={_ReferenceKey(database="", schema="", identifier="")},
),
UpdateReference(
old_key=_ReferenceKey(database="", schema="", identifier=""),
new_key=_ReferenceKey(database="", schema="", identifier=""),
cached_key=_ReferenceKey(database="", schema="", identifier=""),
),
TemporaryRelation(key=_ReferenceKey(database="", schema="", identifier="")),
RenameSchema(
old_key=_ReferenceKey(database="", schema="", identifier=""),
new_key=_ReferenceKey(database="", schema="", identifier="")
),
DumpBeforeAddGraph(Lazy.defer(lambda: dict())),
DumpAfterAddGraph(Lazy.defer(lambda: dict())),
DumpBeforeRenameSchema(Lazy.defer(lambda: dict())),
DumpAfterRenameSchema(Lazy.defer(lambda: dict())),
AdapterImportError(exc=ModuleNotFoundError()),
PluginLoadError(),
SystemReportReturnCode(returncode=0),
NewConnectionOpening(connection_state=''),
TimingInfoCollected(),
MergedFromState(nbr_merged=0, sample=[]),
MissingProfileTarget(profile_name='', target_name=''),
InvalidVarsYAML(),
GenericTestFileParse(path=''),
MacroFileParse(path=''),
PartialParsingFullReparseBecauseOfError(),
PartialParsingFile(file_dict={}),
PartialParsingExceptionFile(file=''),
PartialParsingException(exc_info={}),
PartialParsingSkipParsing(),
PartialParsingMacroChangeStartFullParse(),
ManifestWrongMetadataVersion(version=''),
PartialParsingVersionMismatch(saved_version='', current_version=''),
PartialParsingFailedBecauseConfigChange(),
PartialParsingFailedBecauseProfileChange(),
PartialParsingFailedBecauseNewProjectDependency(),
PartialParsingFailedBecauseHashChanged(),
PartialParsingDeletedMetric(id=''),
ParsedFileLoadFailed(path='', exc=''),
PartialParseSaveFileNotFound(),
StaticParserCausedJinjaRendering(path=''),
UsingExperimentalParser(path=''),
SampleFullJinjaRendering(path=''),
StaticParserFallbackJinjaRendering(path=''),
StaticParsingMacroOverrideDetected(path=''),
StaticParserSuccess(path=''),
StaticParserFailure(path=''),
ExperimentalParserSuccess(path=''),
ExperimentalParserFailure(path=''),
PartialParsingEnabled(deleted=0, added=0, changed=0),
PartialParsingAddedFile(file_id=''),
PartialParsingDeletedFile(file_id=''),
PartialParsingUpdatedFile(file_id=''),
PartialParsingNodeMissingInSourceFile(source_file=''),
PartialParsingMissingNodes(file_id=''),
PartialParsingChildMapMissingUniqueID(unique_id=''),
PartialParsingUpdateSchemaFile(file_id=''),
PartialParsingDeletedSource(unique_id=''),
PartialParsingDeletedExposure(unique_id=''),
InvalidDisabledSourceInTestNode(msg=''),
InvalidRefInTestNode(msg=''),
RunningOperationCaughtError(exc=''),
RunningOperationUncaughtError(exc=Exception('')),
DbtProjectError(),
DbtProjectErrorException(exc=Exception('')),
DbtProfileError(),
DbtProfileErrorException(exc=Exception('')),
ProfileListTitle(),
ListSingleProfile(profile=''),
NoDefinedProfiles(),
ProfileHelpMessage(),
CatchableExceptionOnRun(exc=Exception('')),
InternalExceptionOnRun(build_path='', exc=Exception('')),
GenericExceptionOnRun(build_path='', unique_id='', exc=''),
NodeConnectionReleaseError(node_name='', exc=Exception('')),
CheckCleanPath(path=''),
ConfirmCleanPath(path=''),
ProtectedCleanPath(path=''),
FinishedCleanPaths(),
OpenCommand(open_cmd='', profiles_dir=''),
DepsNoPackagesFound(),
DepsStartPackageInstall(package_name=''),
DepsInstallInfo(version_name=''),
DepsUpdateAvailable(version_latest=''),
DepsListSubdirectory(subdirectory=''),
DepsNotifyUpdatesAvailable(packages=[]),
DatabaseErrorRunning(hook_type=''),
EmptyLine(),
HooksRunning(num_hooks=0, hook_type=''),
HookFinished(stat_line='', execution='', execution_time=0),
WriteCatalogFailure(num_exceptions=0),
CatalogWritten(path=''),
CannotGenerateDocs(),
BuildingCatalog(),
CompileComplete(),
FreshnessCheckComplete(),
ServingDocsPort(address='', port=0),
ServingDocsAccessInfo(port=''),
ServingDocsExitInfo(),
SeedHeader(header=''),
SeedHeaderSeparator(len_header=0),
RunResultWarning(resource_type='', node_name='', path=''),
RunResultFailure(resource_type='', node_name='', path=''),
StatsLine(stats={'pass':0, 'warn':0, 'error':0, 'skip':0, 'total':0}),
RunResultError(msg=''),
RunResultErrorNoMessage(status=''),
SQLCompiledPath(path=''),
CheckNodeTestFailure(relation_name=''),
FirstRunResultError(msg=''),
AfterFirstRunResultError(msg=''),
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
PrintStartLine(description='', index=0, total=0, node_info={}),
PrintHookStartLine(statement='', index=0, total=0, node_info={}),
PrintHookEndLine(statement='', status='', index=0, total=0, execution_time=0, node_info={}),
SkippingDetails(resource_type='', schema='', node_name='', index=0, total=0, node_info={}),
PrintErrorTestResult(name='', index=0, num_models=0, execution_time=0, node_info={}),
PrintPassTestResult(name='', index=0, num_models=0, execution_time=0, node_info={}),
PrintWarnTestResult(name='', index=0, num_models=0, execution_time=0, failures=0, node_info={}),
PrintFailureTestResult(name='', index=0, num_models=0, execution_time=0, failures=0, node_info={}),
PrintSkipBecauseError(schema='', relation='', index=0, total=0),
PrintModelErrorResultLine(description='', status='', index=0, total=0, execution_time=0, node_info={}),
PrintModelResultLine(description='', status='', index=0, total=0, execution_time=0, node_info={}),
PrintSnapshotErrorResultLine(status='',
description='',
cfg={},
index=0,
total=0,
execution_time=0,
node_info={}),
PrintSnapshotResultLine(status='', description='', cfg={}, index=0, total=0, execution_time=0, node_info={}),
PrintSeedErrorResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='', node_info={}),
PrintSeedResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='', node_info={}),
PrintHookEndErrorLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
PrintHookEndErrorStaleLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
PrintHookEndWarnLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
PrintHookEndPassLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
PrintCancelLine(conn_name=''),
DefaultSelector(name=''),
NodeStart(unique_id='', node_info={}),
NodeCompiling(unique_id='', node_info={}),
NodeExecuting(unique_id='', node_info={}),
NodeFinished(unique_id='', node_info={}, run_result={}),
QueryCancelationUnsupported(type=''),
ConcurrencyLine(num_threads=0, target_name=''),
StarterProjectPath(dir=''),
ConfigFolderDirectory(dir=''),
NoSampleProfileFound(adapter=''),
ProfileWrittenWithSample(name='', path=''),
ProfileWrittenWithTargetTemplateYAML(name='', path=''),
ProfileWrittenWithProjectTemplateYAML(name='', path=''),
SettingUpProfile(),
InvalidProfileTemplateYAML(),
ProjectNameAlreadyExists(name=''),
GetAddendum(msg=''),
DepsSetDownloadDirectory(path=''),
EnsureGitInstalled(),
DepsCreatingLocalSymlink(),
DepsSymlinkNotAvailable(),
FoundStats(stat_line=''),
CompilingNode(unique_id=''),
WritingInjectedSQLForNode(unique_id=''),
DisableTracking(),
SendingEvent(kwargs=''),
SendEventFailure(),
FlushEvents(),
FlushEventsFailure(),
TrackingInitializeFailure(),
RetryExternalCall(attempt=0, max=0),
GeneralWarningMsg(msg='', log_fmt=''),
GeneralWarningException(exc=Exception(''), log_fmt=''),
PartialParsingProfileEnvVarsChanged(),
AdapterEventDebug(name='', base_msg='', args=()),
AdapterEventInfo(name='', base_msg='', args=()),
AdapterEventWarning(name='', base_msg='', args=()),
AdapterEventError(name='', base_msg='', args=()),
PrintDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressMakingGETRequest(url=''),
RegistryIndexProgressMakingGETRequest(url=""),
RegistryIndexProgressGETResponse(url="", resp_code=1),
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
DepsUTD(),
PartialParsingNotEnabled(),
SQlRunnerException(exc=Exception('')),
DropRelation(dropped=_ReferenceKey(database="", schema="", identifier="")),
PartialParsingProjectEnvVarsChanged(),
RegistryProgressGETResponse(url='', resp_code=1),
IntegrationTestDebug(msg=''),
IntegrationTestInfo(msg=''),
IntegrationTestWarn(msg=''),
IntegrationTestError(msg=''),
IntegrationTestException(msg=''),
EventBufferFull(),
RecordRetryException(exc=Exception('')),
UnitTestInfo(msg=''),
]
class TestEventJSONSerialization(TestCase):
# attempts to test that every event is serializable to json.
# event types that take `Any` are not possible to test in this way since some will serialize
# just fine and others won't.
def test_all_serializable(self):
no_test = [DummyCacheEvent]
all_non_abstract_events = set(filter(lambda x: not inspect.isabstract(x) and x not in no_test, get_all_subclasses(Event)))
all_event_values_list = list(map(lambda x: x.__class__, sample_values))
diff = all_non_abstract_events.difference(set(all_event_values_list))
self.assertFalse(diff, f"test is missing concrete values in `sample_values`. Please add the values for the aforementioned event classes")
# make sure everything in the list is a value not a type
for event in sample_values:
self.assertFalse(type(event) == type)
# if we have everything we need to test, try to serialize everything
for event in sample_values:
d = event_to_serializable_dict(event)
try:
json.dumps(d)
except TypeError as e:
raise Exception(f"{event} is not serializable to json. Originating exception: {e}")
T = TypeVar('T')
@dataclass
class Counter(Generic[T], SerializableType):
dummy_val: T
count: int = 0
def next(self) -> T:
self.count = self.count + 1
return self.dummy_val
# mashumaro serializer
    def _serialize(self) -> Dict[str, int]:
        return {'count': self.count}
@dataclass
class DummyCacheEvent(InfoLevel, Cache, SerializableType):
code = 'X999'
counter: Counter
def message(self) -> str:
return f"state: {self.counter.next()}"
# mashumaro serializer
    def _serialize(self) -> str:
return "DummyCacheEvent"
# tests that if a cache event uses lazy evaluation for its message
# creation, the evaluation will not be forced for cache events when
# running without `--log-cache-events`.
def skip_cache_event_message_rendering(x: TestCase):
# a dummy event that extends `Cache`
e = DummyCacheEvent(Counter("some_state"))
# counter of zero means this potentially expensive function
# (emulating dump_graph) has never been called
x.assertEqual(e.counter.count, 0)
# call fire_event
event_funcs.fire_event(e)
# assert that the expensive function has STILL not been called
x.assertEqual(e.counter.count, 0)
# this test checks that every subclass of `Cache` uses the same lazy evaluation
# strategy. This ensures that potentially expensive cache event values are not
# built unless they are needed for logging purposes. It also checks that these
# potentially expensive values are cached, and not evaluated more than once.
def all_cache_events_are_lazy(x):
cache_events = get_all_subclasses(Cache)
matching_classes = []
for clazz in cache_events:
# this body is only testing subclasses of `Cache` that take a param called "dump"
# initialize the counter to return a dictionary (emulating dump_graph)
counter = Counter(dict())
# assert that the counter starts at 0
x.assertEqual(counter.count, 0)
# try to create the cache event to use this counter type
# fails for cache events that don't have a "dump" param
try:
clazz()
except TypeError as e:
print(clazz)
# hack that roughly detects attribute names without an instance of the class
if 'dump' in str(e):
matching_classes.append(clazz)
# make the class. If this throws, maybe your class didn't use Lazy when it should have
e = clazz(dump = Lazy.defer(lambda: counter.next()))
# assert that initializing the event with the counter
# did not evaluate the lazy value
x.assertEqual(counter.count, 0)
# log an event which should trigger evaluation and up
# the counter
event_funcs.fire_event(e)
# assert that the counter increased
x.assertEqual(counter.count, 1)
# fire another event which should reuse the previous value
# not evaluate the function again
event_funcs.fire_event(e)
# assert that the counter did not increase
x.assertEqual(counter.count, 1)
# if the init function doesn't require something named "dump"
# we can just continue
else:
pass
# other exceptions are issues and should be thrown
except Exception as e:
raise e
# we should have exactly 4 matching classes (raise this threshold if we add more)
x.assertEqual(len(matching_classes), 4, f"matching classes:\n{len(matching_classes)}: {matching_classes}")
class SkipsRenderingCacheEventsTEXT(TestCase):
def setUp(self):
flags.LOG_FORMAT = 'text'
def test_skip_cache_event_message_rendering_TEXT(self):
skip_cache_event_message_rendering(self)
class SkipsRenderingCacheEventsJSON(TestCase):
def setUp(self):
flags.LOG_FORMAT = 'json'
def tearDown(self):
flags.LOG_FORMAT = 'text'
def test_skip_cache_event_message_rendering_JSON(self):
skip_cache_event_message_rendering(self)
class TestLazyMemoizationInCacheEventsTEXT(TestCase):
def setUp(self):
flags.LOG_FORMAT = 'text'
flags.LOG_CACHE_EVENTS = True
def tearDown(self):
flags.LOG_CACHE_EVENTS = False
def test_all_cache_events_are_lazy_TEXT(self):
all_cache_events_are_lazy(self)
class TestLazyMemoizationInCacheEventsJSON(TestCase):
def setUp(self):
flags.LOG_FORMAT = 'json'
flags.LOG_CACHE_EVENTS = True
def tearDown(self):
flags.LOG_FORMAT = 'text'
flags.LOG_CACHE_EVENTS = False
def test_all_cache_events_are_lazy_JSON(self):
all_cache_events_are_lazy(self)
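The cache-event tests above rely on `Lazy.defer` so that potentially expensive dump_graph-style values are never evaluated unless a log line actually needs them, and are evaluated at most once. A minimal memoizing-lazy sketch of that pattern (illustrative only; LazySketch is a hypothetical stand-in, not dbt's Lazy from dbt.helper_types):

# A deferred thunk that runs at most once, on first force().
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazySketch(Generic[T]):
    def __init__(self, thunk: Callable[[], T]) -> None:
        self._thunk = thunk
        self._value: Optional[T] = None
        self._forced = False

    @classmethod
    def defer(cls, thunk: Callable[[], T]) -> "LazySketch[T]":
        return cls(thunk)

    def force(self) -> T:
        # Evaluate on first access only; cache the result so repeated
        # fire_event calls cannot re-run an expensive dump.
        if not self._forced:
            self._value = self._thunk()
            self._forced = True
        return self._value  # type: ignore[return-value]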

tests/unit/test_events.py Normal file

@@ -0,0 +1,511 @@
# flake8: noqa
from dbt.events.test_types import UnitTestInfo
from dbt.events import AdapterLogger
from dbt.events.functions import event_to_json, LOG_VERSION
from dbt.events.types import *
from dbt.events.test_types import *
from dbt.events.base_types import (
BaseEvent,
DebugLevel,
WarnLevel,
InfoLevel,
ErrorLevel,
TestLevel,
)
from dbt.events.proto_types import NodeInfo, RunResultMsg, ReferenceKeyMsg
from importlib import reload
import dbt.events.functions as event_funcs
import dbt.flags as flags
import inspect
import json
from dbt.contracts.graph.parsed import ParsedModelNode, NodeConfig, DependsOn
from dbt.contracts.files import FileHash
from mashumaro.types import SerializableType
from typing import Generic, TypeVar, Dict
import re
# takes in a class and finds any subclasses for it
def get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
# If the test breaks because of abcs this list might have to be updated.
if subclass in [TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel]:
continue
all_subclasses.append(subclass)
all_subclasses.extend(get_all_subclasses(subclass))
return set(all_subclasses)
class TestAdapterLogger:
# this interface is documented for adapter maintainers to plug into
# so we should test that it at the very least doesn't explode.
def test_basic_adapter_logging_interface(self):
logger = AdapterLogger("dbt_tests")
logger.debug("debug message")
logger.info("info message")
logger.warning("warning message")
logger.error("error message")
logger.exception("exception message")
logger.critical("exception message")
# python loggers allow deferring string formatting via this signature:
def test_formatting(self):
logger = AdapterLogger("dbt_tests")
# tests that it doesn't throw
logger.debug("hello {}", "world")
# enters lower in the call stack to test that it formats correctly
event = AdapterEventDebug(name="dbt_tests", base_msg="hello {}", args=("world",))
assert "hello world" in event.message()
# tests that it doesn't throw
logger.debug("1 2 {}", 3)
# enters lower in the call stack to test that it formats correctly
event = AdapterEventDebug(name="dbt_tests", base_msg="1 2 {}", args=(3,))
assert "1 2 3" in event.message()
# tests that it doesn't throw
logger.debug("boop{x}boop")
# enters lower in the call stack to test that it formats correctly
# in this case it's that we didn't attempt to replace anything since there
# were no args passed after the initial message
event = AdapterEventDebug(name="dbt_tests", base_msg="boop{x}boop", args=())
assert "boop{x}boop" in event.message()
class TestEventCodes:
    # checks to see if event codes are duplicated to keep codes singular and clear.
    # also checks that event codes follow the correct naming convention, e.g. E001
def test_event_codes(self):
all_concrete = get_all_subclasses(BaseEvent)
all_codes = set()
for event in all_concrete:
if not inspect.isabstract(event):
# must be in the form 1 capital letter, 3 digits
assert re.match("^[A-Z][0-9]{3}", event.code)
# cannot have been used already
assert (
event.info.code not in all_codes
), f"{event.code} is assigned more than once. Check types.py for duplicates."
all_codes.add(event.info.code)
class TestEventBuffer:
def setUp(self) -> None:
flags.EVENT_BUFFER_SIZE = 10
reload(event_funcs)
# ensure events are populated to the buffer exactly once
def test_buffer_populates(self):
self.setUp()
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
event1 = event_funcs.EVENT_HISTORY[-2]
assert event_funcs.EVENT_HISTORY.count(event1) == 1
# ensure events drop from the front of the buffer when buffer maxsize is reached
def test_buffer_FIFOs(self):
self.setUp()
event_funcs.EVENT_HISTORY.clear()
for n in range(1, (flags.EVENT_BUFFER_SIZE + 1)):
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
event_full = event_funcs.EVENT_HISTORY[-1]
assert event_full.info.code == "Z048"
assert event_funcs.EVENT_HISTORY.count(event_full) == 1
assert event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg="Test Event 1")) == 0
def MockNode():
return ParsedModelNode(
alias="model_one",
name="model_one",
database="dbt",
schema="analytics",
resource_type=NodeType.Model,
unique_id="model.root.model_one",
fqn=["root", "model_one"],
package_name="root",
original_file_path="model_one.sql",
root_path="/usr/src/app",
refs=[],
sources=[],
depends_on=DependsOn(),
config=NodeConfig.from_dict(
{
"enabled": True,
"materialized": "view",
"persist_docs": {},
"post-hook": [],
"pre-hook": [],
"vars": {},
"quoting": {},
"column_types": {},
"tags": [],
}
),
tags=[],
path="model_one.sql",
raw_code="",
description="",
columns={},
checksum=FileHash.from_contents(""),
)
sample_values = [
MainReportVersion(version="", log_version=LOG_VERSION),
MainKeyboardInterrupt(),
MainEncounteredError(exc=""),
MainStackTrace(stack_trace=""),
MainTrackingUserState(user_state=""),
ParseCmdStart(),
ParseCmdCompiling(),
ParseCmdWritingManifest(),
ParseCmdDone(),
ManifestDependenciesLoaded(),
ManifestLoaderCreated(),
ManifestLoaded(),
ManifestChecked(),
ManifestFlatGraphBuilt(),
ParseCmdPerfInfoPath(path=""),
GitSparseCheckoutSubdirectory(subdir=""),
GitProgressCheckoutRevision(revision=""),
GitProgressUpdatingExistingDependency(dir=""),
GitProgressPullingNewDependency(dir=""),
GitNothingToDo(sha=""),
GitProgressUpdatedCheckoutRange(start_sha="", end_sha=""),
GitProgressCheckedOutAt(end_sha=""),
SystemErrorRetrievingModTime(path=""),
SystemCouldNotWrite(path="", reason="", exc=""),
SystemExecutingCmd(cmd=[""]),
SystemStdOutMsg(bmsg=b""),
SystemStdErrMsg(bmsg=b""),
SelectorReportInvalidSelector(valid_selectors="", spec_method="", raw_spec=""),
MacroEventInfo(msg=""),
MacroEventDebug(msg=""),
NewConnection(conn_type="", conn_name=""),
ConnectionReused(conn_name=""),
ConnectionLeftOpen(conn_name=""),
ConnectionClosed(conn_name=""),
RollbackFailed(conn_name=""),
ConnectionClosed2(conn_name=""),
ConnectionLeftOpen2(conn_name=""),
Rollback(conn_name=""),
CacheMiss(conn_name="", database="", schema=""),
ListRelations(database="", schema="", relations=[]),
ConnectionUsed(conn_type="", conn_name=""),
SQLQuery(conn_name="", sql=""),
SQLQueryStatus(status="", elapsed=0.1),
CodeExecution(conn_name="", code_content=""),
CodeExecutionStatus(status="", elapsed=0.1),
SQLCommit(conn_name=""),
ColTypeChange(
orig_type="",
new_type="",
table=ReferenceKeyMsg(database="", schema="", identifier=""),
),
SchemaCreation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
SchemaDrop(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
UncachedRelation(
dep_key=ReferenceKeyMsg(database="", schema="", identifier=""),
ref_key=ReferenceKeyMsg(database="", schema="", identifier=""),
),
AddLink(
dep_key=ReferenceKeyMsg(database="", schema="", identifier=""),
ref_key=ReferenceKeyMsg(database="", schema="", identifier=""),
),
AddRelation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
DropMissingRelation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
DropCascade(
dropped=ReferenceKeyMsg(database="", schema="", identifier=""),
consequences=[ReferenceKeyMsg(database="", schema="", identifier="")],
),
UpdateReference(
old_key=ReferenceKeyMsg(database="", schema="", identifier=""),
new_key=ReferenceKeyMsg(database="", schema="", identifier=""),
cached_key=ReferenceKeyMsg(database="", schema="", identifier=""),
),
TemporaryRelation(key=ReferenceKeyMsg(database="", schema="", identifier="")),
RenameSchema(
old_key=ReferenceKeyMsg(database="", schema="", identifier=""),
new_key=ReferenceKeyMsg(database="", schema="", identifier=""),
),
DumpBeforeAddGraph(dump=dict()),
DumpAfterAddGraph(dump=dict()),
DumpBeforeRenameSchema(dump=dict()),
DumpAfterRenameSchema(dump=dict()),
AdapterImportError(exc=""),
PluginLoadError(),
SystemReportReturnCode(returncode=0),
NewConnectionOpening(connection_state=""),
TimingInfoCollected(),
MergedFromState(num_merged=0, sample=[]),
MissingProfileTarget(profile_name="", target_name=""),
InvalidVarsYAML(),
GenericTestFileParse(path=""),
MacroFileParse(path=""),
PartialParsingFullReparseBecauseOfError(),
PartialParsingFile(file_id=""),
PartialParsingExceptionFile(file=""),
PartialParsingException(exc_info={}),
PartialParsingSkipParsing(),
PartialParsingMacroChangeStartFullParse(),
ManifestWrongMetadataVersion(version=""),
PartialParsingVersionMismatch(saved_version="", current_version=""),
PartialParsingFailedBecauseConfigChange(),
PartialParsingFailedBecauseProfileChange(),
PartialParsingFailedBecauseNewProjectDependency(),
PartialParsingFailedBecauseHashChanged(),
PartialParsingDeletedMetric(unique_id=""),
ParsedFileLoadFailed(path="", exc=""),
PartialParseSaveFileNotFound(),
StaticParserCausedJinjaRendering(path=""),
UsingExperimentalParser(path=""),
SampleFullJinjaRendering(path=""),
StaticParserFallbackJinjaRendering(path=""),
StaticParsingMacroOverrideDetected(path=""),
StaticParserSuccess(path=""),
StaticParserFailure(path=""),
ExperimentalParserSuccess(path=""),
ExperimentalParserFailure(path=""),
PartialParsingEnabled(deleted=0, added=0, changed=0),
PartialParsingAddedFile(file_id=""),
PartialParsingDeletedFile(file_id=""),
PartialParsingUpdatedFile(file_id=""),
PartialParsingNodeMissingInSourceFile(file_id=""),
PartialParsingMissingNodes(file_id=""),
PartialParsingChildMapMissingUniqueID(unique_id=""),
PartialParsingUpdateSchemaFile(file_id=""),
PartialParsingDeletedSource(unique_id=""),
PartialParsingDeletedExposure(unique_id=""),
InvalidDisabledSourceInTestNode(msg=""),
InvalidRefInTestNode(msg=""),
RunningOperationCaughtError(exc=""),
RunningOperationUncaughtError(exc=""),
DbtProjectError(),
DbtProjectErrorException(exc=""),
DbtProfileError(),
DbtProfileErrorException(exc=""),
ProfileListTitle(),
ListSingleProfile(profile=""),
NoDefinedProfiles(),
ProfileHelpMessage(),
CatchableExceptionOnRun(exc=""),
InternalExceptionOnRun(build_path="", exc=""),
GenericExceptionOnRun(build_path="", unique_id="", exc=""),
NodeConnectionReleaseError(node_name="", exc=""),
CheckCleanPath(path=""),
ConfirmCleanPath(path=""),
ProtectedCleanPath(path=""),
FinishedCleanPaths(),
OpenCommand(open_cmd="", profiles_dir=""),
DepsNoPackagesFound(),
DepsStartPackageInstall(package_name=""),
DepsInstallInfo(version_name=""),
DepsUpdateAvailable(version_latest=""),
DepsListSubdirectory(subdirectory=""),
DepsNotifyUpdatesAvailable(packages=[]),
DatabaseErrorRunningHook(hook_type=""),
EmptyLine(),
HooksRunning(num_hooks=0, hook_type=""),
HookFinished(stat_line="", execution="", execution_time=0),
WriteCatalogFailure(num_exceptions=0),
CatalogWritten(path=""),
CannotGenerateDocs(),
BuildingCatalog(),
CompileComplete(),
FreshnessCheckComplete(),
ServingDocsPort(address="", port=0),
ServingDocsAccessInfo(port=""),
ServingDocsExitInfo(),
SeedHeader(header=""),
SeedHeaderSeparator(len_header=0),
RunResultWarning(resource_type="", node_name="", path=""),
RunResultFailure(resource_type="", node_name="", path=""),
StatsLine(stats={"pass": 0, "warn": 0, "error": 0, "skip": 0, "total": 0}),
RunResultError(msg=""),
RunResultErrorNoMessage(status=""),
SQLCompiledPath(path=""),
CheckNodeTestFailure(relation_name=""),
FirstRunResultError(msg=""),
AfterFirstRunResultError(msg=""),
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
PrintStartLine(description="", index=0, total=0, node_info=NodeInfo()),
PrintHookStartLine(statement="", index=0, total=0, node_info=NodeInfo()),
PrintHookEndLine(
statement="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
SkippingDetails(
resource_type="", schema="", node_name="", index=0, total=0, node_info=NodeInfo()
),
PrintErrorTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
PrintPassTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
PrintWarnTestResult(
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
),
PrintFailureTestResult(
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
),
PrintSkipBecauseError(schema="", relation="", index=0, total=0),
PrintModelErrorResultLine(
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintModelResultLine(
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSnapshotErrorResultLine(
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSnapshotResultLine(
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintSeedErrorResultLine(
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
),
PrintSeedResultLine(
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
),
PrintHookEndErrorLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintHookEndErrorStaleLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintHookEndWarnLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintHookEndPassLine(
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
),
PrintCancelLine(conn_name=""),
DefaultSelector(name=""),
NodeStart(unique_id="", node_info=NodeInfo()),
NodeCompiling(unique_id="", node_info=NodeInfo()),
NodeExecuting(unique_id="", node_info=NodeInfo()),
NodeFinished(unique_id="", node_info=NodeInfo(), run_result=RunResultMsg()),
QueryCancelationUnsupported(type=""),
ConcurrencyLine(num_threads=0, target_name=""),
StarterProjectPath(dir=""),
ConfigFolderDirectory(dir=""),
NoSampleProfileFound(adapter=""),
ProfileWrittenWithSample(name="", path=""),
ProfileWrittenWithTargetTemplateYAML(name="", path=""),
ProfileWrittenWithProjectTemplateYAML(name="", path=""),
SettingUpProfile(),
InvalidProfileTemplateYAML(),
ProjectNameAlreadyExists(name=""),
ProjectCreated(project_name="", docs_url="", slack_url=""),
DepsSetDownloadDirectory(path=""),
EnsureGitInstalled(),
DepsCreatingLocalSymlink(),
DepsSymlinkNotAvailable(),
FoundStats(stat_line=""),
CompilingNode(unique_id=""),
WritingInjectedSQLForNode(unique_id=""),
DisableTracking(),
SendingEvent(kwargs=""),
SendEventFailure(),
FlushEvents(),
FlushEventsFailure(),
TrackingInitializeFailure(),
RetryExternalCall(attempt=0, max=0),
GeneralWarningMsg(msg="", log_fmt=""),
GeneralWarningException(exc="", log_fmt=""),
PartialParsingProfileEnvVarsChanged(),
AdapterEventDebug(name="", base_msg="", args=()),
AdapterEventInfo(name="", base_msg="", args=()),
AdapterEventWarning(name="", base_msg="", args=()),
AdapterEventError(name="", base_msg="", args=()),
PrintDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressGETRequest(url=""),
RegistryIndexProgressGETRequest(url=""),
RegistryIndexProgressGETResponse(url="", resp_code=1),
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
DepsUpToDate(),
PartialParsingNotEnabled(),
SQLRunnerException(exc=""),
DropRelation(dropped=ReferenceKeyMsg(database="", schema="", identifier="")),
PartialParsingProjectEnvVarsChanged(),
RegistryProgressGETResponse(url="", resp_code=1),
IntegrationTestDebug(msg=""),
IntegrationTestInfo(msg=""),
IntegrationTestWarn(msg=""),
IntegrationTestError(msg=""),
IntegrationTestException(msg=""),
EventBufferFull(),
RecordRetryException(exc=""),
UnitTestInfo(msg=""),
]
class TestEventJSONSerialization:
# attempts to test that every event is serializable to json.
# event types that take `Any` are not possible to test in this way since some will serialize
# just fine and others won't.
def test_all_serializable(self):
no_test = [DummyCacheEvent]
all_non_abstract_events = set(
filter(
lambda x: not inspect.isabstract(x) and x not in no_test,
get_all_subclasses(BaseEvent),
)
)
all_event_values_list = list(map(lambda x: x.__class__, sample_values))
diff = all_non_abstract_events.difference(set(all_event_values_list))
assert (
not diff
), f"test is missing concrete values in `sample_values`. Please add the values for the aforementioned event classes"
# make sure everything in the list is a value not a type
for event in sample_values:
assert type(event) != type
# if we have everything we need to test, try to serialize everything
for event in sample_values:
event_dict = event.to_dict()
try:
event_json = event_to_json(event)
except Exception as e:
raise Exception(f"{event} is not serializable to json. Originating exception: {e}")
T = TypeVar("T")
@dataclass
class Counter(Generic[T], SerializableType):
dummy_val: T
count: int = 0
def next(self) -> T:
self.count = self.count + 1
return self.dummy_val
# mashumaro serializer
    def _serialize(self) -> Dict[str, int]:
        return {"count": self.count}
@dataclass
class DummyCacheEvent(InfoLevel, Cache, SerializableType):
code = "X999"
counter: Counter
def message(self) -> str:
return f"state: {self.counter.next()}"
# mashumaro serializer
    def _serialize(self) -> str:
return "DummyCacheEvent"


@@ -0,0 +1,99 @@
import sys
from dbt.events.types import (
MainReportVersion,
MainReportArgs,
RollbackFailed,
MainEncounteredError,
PluginLoadError,
PrintStartLine,
)
from dbt.events.functions import event_to_dict, LOG_VERSION
from dbt.events import proto_types as pl
from dbt.version import installed
info_keys = {"name", "code", "msg", "level", "invocation_id", "pid", "thread", "ts", "extra"}
def test_events():
# A001 event
event = MainReportVersion(version=str(installed), log_version=LOG_VERSION)
event_dict = event_to_dict(event)
event_json = event.to_json()
serialized = bytes(event)
assert "Running with dbt=" in str(serialized)
assert set(event_dict.keys()) == {"version", "info", "log_version"}
assert set(event_dict["info"].keys()) == info_keys
assert event_json
assert event.info.code == "A001"
# Extract EventInfo from serialized message
generic_event = pl.GenericMessage().parse(serialized)
assert generic_event.info.code == "A001"
# get the message class for the real message from the generic message
message_class = getattr(sys.modules["dbt.events.proto_types"], generic_event.info.name)
new_event = message_class().parse(serialized)
assert new_event.info.code == event.info.code
assert new_event.version == event.version
# A002 event
event = MainReportArgs(args={"one": "1", "two": "2"})
event_dict = event_to_dict(event)
event_json = event.to_json()
assert set(event_dict.keys()) == {"info", "args"}
assert set(event_dict["info"].keys()) == info_keys
assert event_json
assert event.info.code == "A002"
def test_exception_events():
event = RollbackFailed(conn_name="test", exc_info="something failed")
event_dict = event_to_dict(event)
event_json = event.to_json()
assert set(event_dict.keys()) == {"info", "conn_name", "exc_info"}
assert set(event_dict["info"].keys()) == info_keys
assert event_json
assert event.info.code == "E009"
event = PluginLoadError(exc_info="something failed")
event_dict = event_to_dict(event)
event_json = event.to_json()
assert set(event_dict.keys()) == {"info", "exc_info"}
assert set(event_dict["info"].keys()) == info_keys
assert event_json
assert event.info.code == "E036"
# This event has no "msg"/"message"
assert event.info.msg is None
# Z002 event
event = MainEncounteredError(exc="Rollback failed")
event_dict = event_to_dict(event)
event_json = event.to_json()
assert set(event_dict.keys()) == {"info", "exc"}
assert set(event_dict["info"].keys()) == info_keys
assert event_json
assert event.info.code == "Z002"
def test_node_info_events():
node_info = {
"node_path": "some_path",
"node_name": "some_name",
"unique_id": "some_id",
"resource_type": "model",
"materialized": "table",
"node_status": "started",
"node_started_at": "some_time",
"node_finished_at": "another_time",
}
event = PrintStartLine(
description="some description",
index=123,
total=111,
node_info=pl.NodeInfo(**node_info),
)
assert event
assert event.node_info.node_path == "some_path"


@@ -2,3 +2,4 @@ from mashumaro.exceptions import MissingField as MissingField
from mashumaro.helper import field_options as field_options, pass_through as pass_through
from mashumaro.mixins.dict import DataClassDictMixin as DataClassDictMixin
from mashumaro.mixins.msgpack import DataClassMessagePackMixin as DataClassMessagePackMixin
from mashumaro.mixins.json import DataClassJSONMixin as DataClassJSONMixin
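For context on the stub line added above: DataClassJSONMixin gives a dataclass symmetric to_json/from_json methods. A minimal illustration (Point is a hypothetical example class):

from dataclasses import dataclass

from mashumaro.mixins.json import DataClassJSONMixin


@dataclass
class Point(DataClassJSONMixin):
    x: int
    y: int


assert Point(1, 2).to_json() == '{"x": 1, "y": 2}'
assert Point.from_json('{"x": 1, "y": 2}') == Point(1, 2)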


@@ -6,7 +6,9 @@ envlist = unit,integration
description = unit testing
skip_install = true
passenv = DBT_* PYTEST_ADDOPTS
commands = {envpython} -m pytest --cov=core {posargs} test/unit
commands =
{envpython} -m pytest --cov=core {posargs} test/unit
{envpython} -m pytest --cov=core {posargs} tests/unit
deps =
-rdev-requirements.txt
-reditable-requirements.txt
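With this change the unit env covers both the legacy test/unit suite and the new tests/unit suite. Outside of tox, the equivalent direct invocations would look like this (illustrative; mirrors the commands above with posargs omitted):

python -m pytest --cov=core test/unit
python -m pytest --cov=core tests/unit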