Mirror of https://github.com/dbt-labs/dbt-core (synced 2025-12-18 22:21:27 +00:00)

Compare commits: enable-pos...ct-986-pro (25 commits)
Commits (SHA1): 55b0c0953e, 7bcf4ddabd, fd9f254e63, 464a6abc16, fa20de142e, c81a1f8fa7, 64b891ab9a, e1262d6a75, a77e75a491, 8b865b599c, 82f2d165af, 0a64aae0e2, 6f0106c7cd, b6af9fd6bf, 6a9471f472, 89e276d9b4, 47a6f9a24e, 853e9c710c, deb95efe68, 001536a438, d21cfc5976, 8a892dbec6, 9e2ce9427e, 4a2fcc1307, fd3a4e7ad2
.changes/unreleased/Features-20220817-154857.yaml — new file (7 lines)
@@ -0,0 +1,7 @@
kind: Features
body: Proto logging messages
time: 2022-08-17T15:48:57.225267-04:00
custom:
  Author: gshank
  Issue: "5610"
  PR: "5643"
@@ -46,12 +46,6 @@ jobs:
with:
python-version: "3.8"

- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true

- name: Install python dependencies
run: |
pip install --user --upgrade pip

@@ -69,10 +63,3 @@ jobs:
# we actually care if these pass, because the normal test run doesn't usually include many json log outputs
- name: Run integration tests
run: tox -e integration -- -nauto

# apply our schema tests to every log event from the previous step
# skips any output that isn't valid json
- uses: actions-rs/cargo@v1
with:
command: run
args: --manifest-path test/interop/log_parsing/Cargo.toml
@@ -2,6 +2,7 @@ import abc
import os
from time import sleep
import sys
import traceback

# multiprocessing.RLock is a function returning this type
from multiprocessing.synchronize import RLock

@@ -48,6 +49,7 @@ from dbt.events.types import (
RollbackFailed,
)
from dbt import flags
from dbt.utils import cast_to_str

SleepTime = Union[int, float]  # As taken by time.sleep.
AdapterHandle = Any  # Adapter connection handle objects can be any class.

@@ -304,9 +306,9 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
with self.lock:
for connection in self.thread_connections.values():
if connection.state not in {"closed", "init"}:
fire_event(ConnectionLeftOpen(conn_name=connection.name))
fire_event(ConnectionLeftOpen(conn_name=cast_to_str(connection.name)))
else:
fire_event(ConnectionClosed(conn_name=connection.name))
fire_event(ConnectionClosed(conn_name=cast_to_str(connection.name)))
self.close(connection)

# garbage collect these connections

@@ -332,17 +334,18 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
try:
connection.handle.rollback()
except Exception:
fire_event(RollbackFailed(conn_name=connection.name))
conn_name = connection.name or ""
fire_event(RollbackFailed(conn_name=conn_name, exc_info=traceback.format_exc()))

@classmethod
def _close_handle(cls, connection: Connection) -> None:
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, "close"):
fire_event(ConnectionClosed2(conn_name=connection.name))
fire_event(ConnectionClosed2(conn_name=cast_to_str(connection.name)))
connection.handle.close()
else:
fire_event(ConnectionLeftOpen2(conn_name=connection.name))
fire_event(ConnectionLeftOpen2(conn_name=cast_to_str(connection.name)))

@classmethod
def _rollback(cls, connection: Connection) -> None:

@@ -353,7 +356,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
f'"{connection.name}", but it does not have one open!'
)

fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)

connection.transaction_open = False

@@ -365,7 +368,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
return connection

if connection.transaction_open and connection.handle:
fire_event(Rollback(conn_name=connection.name))
fire_event(Rollback(conn_name=cast_to_str(connection.name)))
cls._rollback_handle(connection)
connection.transaction_open = False
@@ -49,7 +49,7 @@ from dbt.events.types import (
CodeExecution,
CodeExecutionStatus,
)
from dbt.utils import filter_null_values, executor
from dbt.utils import filter_null_values, executor, cast_to_str

from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available

@@ -60,7 +60,7 @@ from dbt.adapters.base.relation import (
SchemaSearchMap,
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache, _make_key
from dbt.adapters.cache import RelationsCache, _make_key_msg

SeedModel = Union[ParsedSeedNode, CompiledSeedNode]

@@ -313,7 +313,7 @@ class BaseAdapter(metaclass=AdapterMeta):
fire_event(
CacheMiss(
conn_name=self.nice_connection_name(),
database=database,
database=cast_to_str(database),
schema=schema,
)
)

@@ -696,9 +696,9 @@ class BaseAdapter(metaclass=AdapterMeta):
relations = self.list_relations_without_caching(schema_relation)
fire_event(
ListRelations(
database=database,
database=cast_to_str(database),
schema=schema,
relations=[_make_key(x) for x in relations],
relations=[_make_key_msg(x) for x in relations],
)
)
@@ -2,9 +2,9 @@ import threading
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt.adapters.reference_keys import _make_key, _ReferenceKey
from dbt.adapters.reference_keys import _make_key, _make_key_msg, _make_msg_from_key, _ReferenceKey
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.functions import fire_event, fire_event_if
from dbt.events.types import (
AddLink,
AddRelation,

@@ -20,8 +20,8 @@ from dbt.events.types import (
UncachedRelation,
UpdateReference,
)
import dbt.flags as flags
from dbt.utils import lowercase
from dbt.helper_types import Lazy

def dot_separated(key: _ReferenceKey) -> str:

@@ -299,7 +299,11 @@ class RelationsCache:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
fire_event(
UncachedRelation(
dep_key=_make_msg_from_key(dep_key), ref_key=_make_msg_from_key(ref_key)
)
)
return
if ref_key not in self.relations:
# Insert a dummy "external" relation.

@@ -309,7 +313,9 @@ class RelationsCache:
# Insert a dummy "external" relation.
dependent = dependent.replace(type=referenced.External)
self.add(dependent)
fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
fire_event(
AddLink(dep_key=_make_msg_from_key(dep_key), ref_key=_make_msg_from_key(ref_key))
)
with self.lock:
self._add_link(ref_key, dep_key)

@@ -320,12 +326,12 @@ class RelationsCache:
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(AddRelation(relation=_make_key(cached)))
fire_event(DumpBeforeAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event(AddRelation(relation=_make_key_msg(cached)))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpBeforeAddGraph(dump=self.dump_graph()))

with self.lock:
self._setdefault(cached)
fire_event(DumpAfterAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpAfterAddGraph(dump=self.dump_graph()))

def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not

@@ -340,19 +346,6 @@ class RelationsCache:
for cached in self.relations.values():
cached.release_references(keys)

def _drop_cascade_relation(self, dropped_key):
"""Drop the given relation and cascade it appropriately to all
dependent relations.

:param _CachedRelation dropped: An existing _CachedRelation to drop.
"""
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key))
return
consequences = self.relations[dropped_key].collect_consequences()
fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
self._remove_refs(consequences)

def drop(self, relation):
"""Drop the named relation and cascade it appropriately to all
dependent relations.

@@ -365,9 +358,18 @@ class RelationsCache:
:param str identifier: The identifier of the relation to drop.
"""
dropped_key = _make_key(relation)
fire_event(DropRelation(dropped=dropped_key))
dropped_key_msg = _make_key_msg(relation)
fire_event(DropRelation(dropped=dropped_key_msg))
with self.lock:
self._drop_cascade_relation(dropped_key)
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key_msg))
return
consequences = self.relations[dropped_key].collect_consequences()
# convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
consequence_msgs = [_make_msg_from_key(key) for key in consequences]

fire_event(DropCascade(dropped=dropped_key_msg, consequences=consequence_msgs))
self._remove_refs(consequences)

def _rename_relation(self, old_key, new_relation):
"""Rename a relation named old_key to new_key, updating references.

@@ -420,7 +422,7 @@ class RelationsCache:
)

if old_key not in self.relations:
fire_event(TemporaryRelation(key=old_key))
fire_event(TemporaryRelation(key=_make_msg_from_key(old_key)))
return False
return True

@@ -438,9 +440,13 @@ class RelationsCache:
"""
old_key = _make_key(old)
new_key = _make_key(new)
fire_event(RenameSchema(old_key=old_key, new_key=new_key))
fire_event(
RenameSchema(old_key=_make_msg_from_key(old_key), new_key=_make_msg_from_key(new))
)

fire_event(DumpBeforeRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpBeforeRenameSchema(dump=self.dump_graph())
)

with self.lock:
if self._check_rename_constraints(old_key, new_key):

@@ -448,7 +454,9 @@ class RelationsCache:
else:
self._setdefault(_CachedRelation(new))

fire_event(DumpAfterRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))
fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpAfterRenameSchema(dump=self.dump_graph())
)

def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
"""Case-insensitively yield all relations matching the given schema.
@@ -1,4 +1,5 @@
import threading
import traceback
from pathlib import Path
from importlib import import_module
from typing import Type, Dict, Any, List, Optional, Set

@@ -64,12 +65,12 @@ class AdapterContainer:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == "dbt.adapters." + name:
fire_event(AdapterImportError(exc=exc))
fire_event(AdapterImportError(exc=str(exc)))
raise RuntimeException(f"Could not find adapter type {name}!")
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.

fire_event(PluginLoadError())
fire_event(PluginLoadError(exc_info=traceback.format_exc()))
raise
plugin: AdapterPlugin = mod.Plugin
plugin_type = plugin.adapter.type()
@@ -2,6 +2,7 @@

from collections import namedtuple
from typing import Any, Optional
from dbt.events.proto_types import ReferenceKeyMsg


_ReferenceKey = namedtuple("_ReferenceKey", "database schema identifier")

@@ -22,3 +23,13 @@ def _make_key(relation: Any) -> _ReferenceKey:
return _ReferenceKey(
lowercase(relation.database), lowercase(relation.schema), lowercase(relation.identifier)
)


def _make_key_msg(relation: Any):
return _make_msg_from_key(_make_key(relation))


def _make_msg_from_key(ref_key: _ReferenceKey) -> ReferenceKeyMsg:
return ReferenceKeyMsg(
database=ref_key.database, schema=ref_key.schema, identifier=ref_key.identifier
)
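The new helpers above convert the internal `_ReferenceKey` tuple into the proto-generated `ReferenceKeyMsg` before it is attached to an event. A minimal illustrative sketch of that conversion, assuming a simple object with `database`/`schema`/`identifier` attributes (the `SimpleNamespace` stand-in is not part of the diff):

```python
from types import SimpleNamespace

from dbt.adapters.reference_keys import _make_key, _make_key_msg

# Stand-in for a BaseRelation; only these three attributes matter here.
relation = SimpleNamespace(database="Analytics", schema="Staging", identifier="Orders")

key = _make_key(relation)      # _ReferenceKey with lowercased fields
msg = _make_key_msg(relation)  # ReferenceKeyMsg(database="analytics", schema="staging", identifier="orders")
```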
@@ -10,6 +10,7 @@ from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection, ConnectionState, AdapterResponse
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus
from dbt.utils import cast_to_str


class SQLConnectionManager(BaseConnectionManager):

@@ -55,7 +56,7 @@ class SQLConnectionManager(BaseConnectionManager):
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=connection.name))
fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=cast_to_str(connection.name)))

with self.exception_handler(sql):
if abridge_sql_log:

@@ -63,7 +64,7 @@ class SQLConnectionManager(BaseConnectionManager):
else:
log_sql = sql

fire_event(SQLQuery(conn_name=connection.name, sql=log_sql))
fire_event(SQLQuery(conn_name=cast_to_str(connection.name), sql=log_sql))
pre = time.time()

cursor = connection.handle.cursor()
@@ -5,7 +5,7 @@ import dbt.clients.agate_helper
from dbt.contracts.connection import Connection
import dbt.exceptions
from dbt.adapters.base import BaseAdapter, available
from dbt.adapters.cache import _make_key
from dbt.adapters.cache import _make_key_msg
from dbt.adapters.sql import SQLConnectionManager
from dbt.events.functions import fire_event
from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop

@@ -110,7 +110,7 @@ class SQLAdapter(BaseAdapter):
ColTypeChange(
orig_type=target_column.data_type,
new_type=new_type,
table=_make_key(current),
table=_make_key_msg(current),
)
)

@@ -155,7 +155,7 @@ class SQLAdapter(BaseAdapter):

def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaCreation(relation=_make_key(relation)))
fire_event(SchemaCreation(relation=_make_key_msg(relation)))
kwargs = {
"relation": relation,
}

@@ -166,7 +166,7 @@ class SQLAdapter(BaseAdapter):

def drop_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()
fire_event(SchemaDrop(relation=_make_key(relation)))
fire_event(SchemaDrop(relation=_make_key_msg(relation)))
kwargs = {
"relation": relation,
}
@@ -3,9 +3,9 @@ from typing import Any, Dict, List
import requests
from dbt.events.functions import fire_event
from dbt.events.types import (
RegistryProgressMakingGETRequest,
RegistryProgressGETRequest,
RegistryProgressGETResponse,
RegistryIndexProgressMakingGETRequest,
RegistryIndexProgressGETRequest,
RegistryIndexProgressGETResponse,
RegistryResponseUnexpectedType,
RegistryResponseMissingTopKeys,

@@ -37,7 +37,7 @@ def _get_with_retries(package_name, registry_base_url=None):

def _get(package_name, registry_base_url=None):
url = _get_url(package_name, registry_base_url)
fire_event(RegistryProgressMakingGETRequest(url=url))
fire_event(RegistryProgressGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))

@@ -134,7 +134,7 @@ def get_available_versions(package_name) -> List["str"]:
def _get_index(registry_base_url=None):

url = _get_url("index", registry_base_url)
fire_event(RegistryIndexProgressMakingGETRequest(url=url))
fire_event(RegistryIndexProgressGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code))
@@ -164,7 +164,7 @@ def write_file(path: str, contents: str = "") -> bool:
reason = "Path was possibly too long"
# all our hard work and the path was still too long. Log and
# continue.
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=exc))
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=str(exc)))
else:
raise
return True
@@ -1020,7 +1020,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):

# log up to 5 items
sample = list(islice(merged, 5))
fire_event(MergedFromState(nbr_merged=len(merged), sample=sample))
fire_event(MergedFromState(num_merged=len(merged), sample=sample))

# Methods that were formerly in ParseResult
@@ -39,6 +39,7 @@ from dbt.contracts.graph.unparsed import (
MetricFilter,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
from dbt.exceptions import warn_or_error
from dbt import flags
from dbt.node_types import NodeType

@@ -189,7 +190,8 @@ class NodeInfoMixin:
"node_started_at": self._event_status.get("started_at"),
"node_finished_at": self._event_status.get("finished_at"),
}
return node_info
node_info_msg = NodeInfo(**node_info)
return node_info_msg


@dataclass
@@ -11,11 +11,12 @@ from dbt.contracts.util import (
from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import TimingInfoCollected
from dbt.events.proto_types import RunResultMsg
from dbt.logger import (
TimingProcessor,
JsonOnly,
)
from dbt.utils import lowercase
from dbt.utils import lowercase, cast_to_str, cast_to_int
from dbt.dataclass_schema import dbtClassMixin, StrEnum

import agate

@@ -119,6 +120,17 @@ class BaseResult(dbtClassMixin):
data["failures"] = None
return data

def to_msg(self):
# TODO: add more fields
msg = RunResultMsg()
msg.status = str(self.status)
msg.message = cast_to_str(self.message)
msg.thread = self.thread_id
msg.execution_time = self.execution_time
msg.num_failures = cast_to_int(self.failures)
# timing_info, adapter_response, message
return msg


@dataclass
class NodeResult(BaseResult):
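The new `to_msg()` helper turns a `BaseResult` into the proto-generated `RunResultMsg` so results can ride along on events such as `NodeFinished` (see the runnable task changes later in this diff). A rough sketch of how it is consumed, assuming a `result` and its parsed `node` are already in hand:

```python
from dbt.events.functions import fire_event
from dbt.events.types import NodeFinished

# `node` and `result` are assumed to exist (produced by a runner); this only
# shows where the RunResultMsg ends up.
fire_event(
    NodeFinished(
        node_info=node.node_info,    # NodeInfo proto message built by NodeInfoMixin
        unique_id=node.unique_id,
        run_result=result.to_msg(),  # status, message, thread, execution_time, num_failures
    )
)
```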
@@ -44,8 +44,6 @@ class PartialParsingDeletedExposure(DebugLevel, Cli, File):

## Optional (based on your event)

- Events associated with node status changes must be extended with `NodeInfo` which contains a node_info attribute

All values other than `code` and `node_info` will be included in the `data` node of the json log output.

@@ -63,3 +61,7 @@ from dbt.events import AdapterLogger
logger = AdapterLogger("<database name>")
# e.g. AdapterLogger("Snowflake")
```

## Compiling types.proto

In the core/dbt/events directory: ```protoc --python_betterproto_out . types.proto```
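The README hunks above describe the move to proto-message-backed events. As a rough sketch of the resulting pattern (mirroring the test-only event classes further down in this diff, not a definitive template), a concrete event now subclasses a level mixin plus its generated proto class and implements `code()` and `message()`:

```python
from dataclasses import dataclass

from dbt.events import proto_types as pl
from dbt.events.types import InfoLevel


@dataclass
class IntegrationTestInfo(InfoLevel, pl.IntegrationTestInfo):
    # Four-digit code that uniquely identifies this event type (enforced by tests).
    def code(self):
        return "T001"

    # Human-readable message; timestamps and formatting are added by the logger.
    def message(self) -> str:
        return f"Integration Test: {self.msg}"
```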
@@ -1,3 +1,4 @@
import traceback
from dataclasses import dataclass
from dbt.events.functions import fire_event
from dbt.events.types import (

@@ -12,57 +13,28 @@ from dbt.events.types import (
class AdapterLogger:
name: str

def debug(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def debug(self, msg, *args):
event = AdapterEventDebug(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

fire_event(event)

def info(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def info(self, msg, *args):
event = AdapterEventInfo(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

fire_event(event)

def warning(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def warning(self, msg, *args):
event = AdapterEventWarning(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

fire_event(event)

def error(self, msg, *args, exc_info=None, extra=None, stack_info=False):
def error(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

fire_event(event)

# The default exc_info=True is what makes this method different
def exception(self, msg, *args, exc_info=True, extra=None, stack_info=False):
def exception(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

event.exc_info = traceback.format_exc()
fire_event(event)

def critical(self, msg, *args, exc_info=False, extra=None, stack_info=False):
def critical(self, msg, *args):
event = AdapterEventError(name=self.name, base_msg=msg, args=args)

event.exc_info = exc_info
event.extra = extra
event.stack_info = stack_info

fire_event(event)
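After this change the `AdapterLogger` methods accept only a message plus positional args; the `exc_info`/`extra`/`stack_info` keyword arguments are gone, and `exception()` attaches `traceback.format_exc()` on its own. A minimal usage sketch, assuming an illustrative adapter name:

```python
from dbt.events import AdapterLogger

logger = AdapterLogger("MyAdapter")  # illustrative; e.g. AdapterLogger("Snowflake")

logger.debug("opening connection")
logger.info("connection opened")

try:
    raise ValueError("boom")
except ValueError:
    # exception() records the current traceback on the event automatically.
    logger.exception("query failed")
```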
@@ -1,9 +1,7 @@
from abc import ABCMeta, abstractproperty, abstractmethod
from dataclasses import dataclass
from dbt.events.serialization import EventSerialization
import os
import threading
from typing import Any, Dict
from datetime import datetime


# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

@@ -17,89 +15,80 @@ class Cache:
pass


def get_invocation_id() -> str:
from dbt.events.functions import get_invocation_id

return get_invocation_id()


# exactly one pid per concrete event
def get_pid() -> int:
return os.getpid()


# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = datetime.utcnow()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339


# in theory threads can change so we don't cache them.
def get_thread_name() -> str:
return threading.current_thread().name


@dataclass
class ShowException:
# N.B.:
# As long as we stick with the current convention of setting the member vars in the
# `message` method of subclasses, this is a safe operation.
# If that ever changes we'll want to reassess.
class BaseEvent:
"""BaseEvent for proto message generated python events"""

def __post_init__(self):
self.exc_info: Any = True
self.stack_info: Any = None
self.extra: Any = None
super().__post_init__()
self.info.level = self.level_tag()
if not hasattr(self.info, "msg") or not self.info.msg:
self.info.msg = self.message()
self.info.invocation_id = get_invocation_id()
self.info.ts = datetime.utcnow()
self.info.pid = get_pid()
self.info.thread = get_thread_name()
self.info.code = self.code()
self.info.name = type(self).__name__

def level_tag(self):
raise Exception("level_tag() not implemented for event")

def message(self):
raise Exception("message() not implemented for event")


# TODO add exhaustiveness checking for subclasses
# top-level superclass for all events
class Event(metaclass=ABCMeta):
# Do not define fields with defaults here
@dataclass
class TestLevel(BaseEvent):
__test__ = False

# four digit string code that uniquely identifies this type of event
# uniqueness and valid characters are enforced by tests
@abstractproperty
@staticmethod
def code() -> str:
raise Exception("code() not implemented for event")

# The 'to_dict' method is added by mashumaro via the EventSerialization.
# It should be in all subclasses that are to record actual events.
@abstractmethod
def to_dict(self):
raise Exception("to_dict not implemented for Event")

# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
raise Exception("level_tag not implemented for Event")

# Solely the human readable message. Timestamps and formatting will be added by the logger.
# Must override yourself
@abstractmethod
def message(self) -> str:
raise Exception("msg not implemented for Event")

# exactly one pid per concrete event
def get_pid(self) -> int:
return os.getpid()

# in theory threads can change so we don't cache them.
def get_thread_name(self) -> str:
return threading.current_thread().name

@classmethod
def get_invocation_id(cls) -> str:
from dbt.events.functions import get_invocation_id

return get_invocation_id()


# in preparation for #3977
@dataclass  # type: ignore[misc]
class TestLevel(EventSerialization, Event):
def level_tag(self) -> str:
return "test"


@dataclass  # type: ignore[misc]
class DebugLevel(EventSerialization, Event):
class DebugLevel(BaseEvent):
def level_tag(self) -> str:
return "debug"


@dataclass  # type: ignore[misc]
class InfoLevel(EventSerialization, Event):
class InfoLevel(BaseEvent):
def level_tag(self) -> str:
return "info"


@dataclass  # type: ignore[misc]
class WarnLevel(EventSerialization, Event):
class WarnLevel(BaseEvent):
def level_tag(self) -> str:
return "warn"


@dataclass  # type: ignore[misc]
class ErrorLevel(EventSerialization, Event):
class ErrorLevel(BaseEvent):
def level_tag(self) -> str:
return "error"

@@ -114,10 +103,3 @@ class NoFile:
# prevents an event from going to stdout
class NoStdOut:
pass


# This class represents the node_info which is generated
# by the NodeInfoMixin class in dbt.contracts.graph.parsed
@dataclass
class NodeInfo:
node_info: Dict[str, Any]
@@ -1,7 +1,8 @@
import betterproto
from colorama import Style
import dbt.events.functions as this  # don't worry I hate it too.
from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache
from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
from dbt.events.base_types import NoStdOut, BaseEvent, NoFile, Cache
from dbt.events.types import EventBufferFull, MainReportVersion, EmptyLine
import dbt.flags as flags
from dbt.constants import SECRET_ENV_PREFIX

@@ -19,11 +20,11 @@ from logging.handlers import RotatingFileHandler
import os
import uuid
import threading
from typing import Any, Dict, List, Optional, Union
from typing import List, Optional, Union, Callable
from collections import deque

global LOG_VERSION
LOG_VERSION = 2
LOG_VERSION = 3

# create the global event history buffer with the default max size (10k)
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.

@@ -131,37 +132,23 @@ def scrub_secrets(msg: str, secrets: List[str]) -> str:

# returns a dictionary representation of the event fields.
# the message may contain secrets which must be scrubbed at the usage site.
def event_to_serializable_dict(
e: T_Event,
) -> Dict[str, Any]:
def event_to_json(
event: BaseEvent,
) -> str:
event_dict = event_to_dict(event)
raw_log_line = json.dumps(event_dict, sort_keys=True)
return raw_log_line

log_line = dict()
code: str

def event_to_dict(event: BaseEvent) -> dict:
event_dict = dict()
try:
log_line = e.to_dict()
# We could use to_json here, but it wouldn't sort the keys.
# The 'to_json' method just does json.dumps on the dict anyway.
event_dict = event.to_dict(casing=betterproto.Casing.SNAKE, include_default_values=True)  # type: ignore
except AttributeError as exc:
event_type = type(e).__name__
raise Exception(  # TODO this may hang async threads
f"type {event_type} is not serializable. {str(exc)}"
)

# We get the code from the event object, so we don't need it in the data
if "code" in log_line:
del log_line["code"]

event_dict = {
"type": "log_line",
"log_version": LOG_VERSION,
"ts": get_ts_rfc3339(),
"pid": e.get_pid(),
"msg": e.message(),
"level": e.level_tag(),
"data": log_line,
"invocation_id": e.get_invocation_id(),
"thread_name": e.get_thread_name(),
"code": e.code,
}

event_type = type(event).__name__
raise Exception(f"type {event_type} is not serializable. {str(exc)}")
return event_dict

@@ -171,15 +158,15 @@ def reset_color() -> str:
return "" if not this.format_color else Style.RESET_ALL

def create_info_text_log_line(e: T_Event) -> str:
def create_info_text_log_line(e: BaseEvent) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S")
ts: str = get_ts().strftime("%H:%M:%S")  # TODO: get this from the event.ts?
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
return log_line

def create_debug_text_log_line(e: T_Event) -> str:
def create_debug_text_log_line(e: BaseEvent) -> str:
log_line: str = ""
# Create a separator if this is the beginning of an invocation
if type(e) == MainReportVersion:

@@ -188,7 +175,8 @@ def create_debug_text_log_line(e: T_Event) -> str:
color_tag: str = reset_color()
ts: str = get_ts().strftime("%H:%M:%S.%f")
scrubbed_msg: str = scrub_secrets(e.message(), env_secrets())
level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} "
# Make the levels all 5 characters so they line up
level: str = f"{e.level_tag():<5}"
thread = ""
if threading.current_thread().name:
thread_name = threading.current_thread().name

@@ -200,18 +188,17 @@ def create_debug_text_log_line(e: T_Event) -> str:

# translates an Event to a completely formatted json log line
def create_json_log_line(e: T_Event) -> Optional[str]:
def create_json_log_line(e: BaseEvent) -> Optional[str]:
if type(e) == EmptyLine:
return None  # will not be sent to logger
# using preformatted ts string instead of formatting it here to be extra careful about timezone
values = event_to_serializable_dict(e)
raw_log_line = json.dumps(values, sort_keys=True)
raw_log_line = event_to_json(e)
return scrub_secrets(raw_log_line, env_secrets())

# calls create_stdout_text_log_line() or create_json_log_line() according to logger config
def create_log_line(e: T_Event, file_output=False) -> Optional[str]:
def create_log_line(e: BaseEvent, file_output=False) -> Optional[str]:
if this.format_json:
# TODO: Do we want to skip EmptyLine() like the TextOnly for logbook?
return create_json_log_line(e)  # json output, both console and file
elif file_output is True or flags.DEBUG:
return create_debug_text_log_line(e)  # default file output

@@ -241,31 +228,18 @@ def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: s
)

def send_exc_to_logger(
l: Logger, level_tag: str, log_line: str, exc_info=True, stack_info=False, extra=False
):
if level_tag == "test":
# TODO after implmenting #3977 send to new test level
l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "debug":
l.debug(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "info":
l.info(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "warn":
l.warning(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
elif level_tag == "error":
l.error(log_line, exc_info=exc_info, stack_info=stack_info, extra=extra)
else:
raise AssertionError(
f"While attempting to log {log_line}, encountered the unhandled level: {level_tag}"
)
# an alternative to fire_event which only creates and logs the event value
# if the condition is met. Does nothing otherwise.
def fire_event_if(conditional: bool, lazy_e: Callable[[], BaseEvent]) -> None:
if conditional:
fire_event(lazy_e())


# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
def fire_event(e: BaseEvent) -> None:
# skip logs when `--log-cache-events` is not passed
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return

@@ -305,17 +279,7 @@ def fire_event(e: Event) -> None:

log_line = create_log_line(e)
if log_line:
if not isinstance(e, ShowException):
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
else:
send_exc_to_logger(
STDOUT_LOG,
level_tag=e.level_tag(),
log_line=log_line,
exc_info=e.exc_info,
stack_info=e.stack_info,
extra=e.extra,
)
send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)


def get_invocation_id() -> str:

@@ -336,10 +300,3 @@ def set_invocation_id() -> None:
def get_ts() -> datetime:
ts = datetime.utcnow()
return ts


# preformatted time stamp
def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339
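The new `fire_event_if` helper defers building an event (and anything expensive inside it, such as a cache graph dump) until the gating condition is actually true, while `event_to_json` serializes an event's betterproto fields with snake_case keys. A rough usage sketch based on the cache changes earlier in this diff (`cache` is assumed to be an existing RelationsCache instance):

```python
import dbt.flags as flags
from dbt.events.functions import fire_event_if
from dbt.events.types import DumpBeforeAddGraph

# The lambda - and therefore cache.dump_graph() - only runs when LOG_CACHE_EVENTS is set.
fire_event_if(
    flags.LOG_CACHE_EVENTS,
    lambda: DumpBeforeAddGraph(dump=cache.dump_graph()),
)
```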
core/dbt/events/proto_types.py — normal file (2139 lines); file diff suppressed because it is too large.
@@ -1,55 +0,0 @@
from dbt.helper_types import Lazy
from mashumaro import DataClassDictMixin
from mashumaro.config import BaseConfig as MashBaseConfig
from mashumaro.types import SerializationStrategy
from typing import Dict, List


# The dbtClassMixin serialization class has a DateTime serialization strategy
# class. If a datetime ends up in an event class, we could use a similar class
# here to serialize it in our preferred format.


class ExceptionSerialization(SerializationStrategy):
def serialize(self, value):
out = str(value)
return out

def deserialize(self, value):
return Exception(value)


class BaseExceptionSerialization(SerializationStrategy):
def serialize(self, value):
return str(value)

def deserialize(self, value):
return BaseException(value)


# This is an explicit deserializer for the type Lazy[Dict[str, List[str]]]
# mashumaro does not support composing serialization strategies, so all
# future uses of Lazy will need to register a unique serialization class like this one.
class LazySerialization1(SerializationStrategy):
def serialize(self, value) -> Dict[str, List[str]]:
return value.force()

# we _can_ deserialize into a lazy value, but that defers running the deserialization
# function till the value is used which can raise errors at very unexpected times.
# It's best practice to do strict deserialization unless you're in a very special case.
def deserialize(self, value):
raise Exception("Don't deserialize into a Lazy value. Try just using the value itself.")


# This class is the equivalent of dbtClassMixin that's used for serialization
# in other parts of the code. That class did extra things which we didn't want
# to use for events, so this class is a simpler version of dbtClassMixin.
class EventSerialization(DataClassDictMixin):

# This is where we register serializtion strategies per type.
class Config(MashBaseConfig):
serialization_strategy = {
Exception: ExceptionSerialization(),
BaseException: ExceptionSerialization(),
Lazy[Dict[str, List[str]]]: LazySerialization1(),
}
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from dbt.events.types import InfoLevel, DebugLevel, WarnLevel, ErrorLevel, ShowException
from dbt.events.types import InfoLevel, DebugLevel, WarnLevel, ErrorLevel
from dbt.events.base_types import NoFile
from dbt.events import proto_types as pl
from dbt.events.proto_types import EventInfo  # noqa


# Keeping log messages for testing separate since they are used for debugging.

@@ -8,54 +10,54 @@ from dbt.events.base_types import NoFile

@dataclass
class IntegrationTestInfo(InfoLevel, NoFile):
msg: str
code: str = "T001"
class IntegrationTestInfo(InfoLevel, NoFile, pl.IntegrationTestInfo):
def code(self):
return "T001"

def message(self) -> str:
return f"Integration Test: {self.msg}"


@dataclass
class IntegrationTestDebug(DebugLevel, NoFile):
msg: str
code: str = "T002"
class IntegrationTestDebug(DebugLevel, NoFile, pl.IntegrationTestDebug):
def code(self):
return "T002"

def message(self) -> str:
return f"Integration Test: {self.msg}"


@dataclass
class IntegrationTestWarn(WarnLevel, NoFile):
msg: str
code: str = "T003"
class IntegrationTestWarn(WarnLevel, NoFile, pl.IntegrationTestWarn):
def code(self):
return "T003"

def message(self) -> str:
return f"Integration Test: {self.msg}"


@dataclass
class IntegrationTestError(ErrorLevel, NoFile):
msg: str
code: str = "T004"
class IntegrationTestError(ErrorLevel, NoFile, pl.IntegrationTestError):
def code(self):
return "T004"

def message(self) -> str:
return f"Integration Test: {self.msg}"


@dataclass
class IntegrationTestException(ShowException, ErrorLevel, NoFile):
msg: str
code: str = "T005"
class IntegrationTestException(ErrorLevel, NoFile, pl.IntegrationTestException):
def code(self):
return "T005"

def message(self) -> str:
return f"Integration Test: {self.msg}"


@dataclass
class UnitTestInfo(InfoLevel, NoFile):
msg: str
code: str = "T006"
class UnitTestInfo(InfoLevel, NoFile, pl.UnitTestInfo):
def code(self):
return "T006"

def message(self) -> str:
return f"Unit Test: {self.msg}"
core/dbt/events/types.proto — normal file (1653 lines); file diff suppressed because it is too large.
File diff suppressed because it is too large.
@@ -1063,7 +1063,7 @@ def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))
fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt))


def warn(msg, node=None):
@@ -10,7 +10,7 @@ from contextlib import contextmanager
from pathlib import Path

import dbt.version
from dbt.events.functions import fire_event, setup_event_logger
from dbt.events.functions import fire_event, setup_event_logger, LOG_VERSION
from dbt.events.types import (
MainEncounteredError,
MainKeyboardInterrupt,

@@ -142,7 +142,7 @@ def main(args=None):
exit_code = e.code

except BaseException as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
fire_event(MainStackTrace(stack_trace=traceback.format_exc()))
exit_code = ExitCodes.UnhandledError.value

@@ -201,7 +201,7 @@ def track_run(task):
yield
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
except Exception:
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")

@@ -226,7 +226,7 @@ def run_from_args(parsed):
level_override = parsed.cls.pre_init_hook(parsed)
setup_event_logger(log_path or "logs", level_override)

fire_event(MainReportVersion(v=str(dbt.version.installed)))
fire_event(MainReportVersion(version=str(dbt.version.installed), log_version=LOG_VERSION))
fire_event(MainReportArgs(args=args_to_dict(parsed)))

if dbt.tracking.active_user is not None:  # mypy appeasement, always true
@@ -291,8 +291,7 @@ class ManifestLoader:
if source_file:
parse_file_type = source_file.parse_file_type
fire_event(PartialParsingExceptionFile(file=file_id))
file_dict = source_file.to_dict()
fire_event(PartialParsingFile(file_dict=file_dict))
fire_event(PartialParsingFile(file_id=source_file.file_id))
exc_info["parse_file_type"] = parse_file_type
fire_event(PartialParsingException(exc_info=exc_info))

@@ -658,7 +657,9 @@ class ManifestLoader:
manifest.metadata.invocation_id = get_invocation_id()
return manifest
except Exception as exc:
fire_event(ParsedFileLoadFailed(path=path, exc=exc))
fire_event(
ParsedFileLoadFailed(path=path, exc=str(exc), exc_info=traceback.format_exc())
)
reparse_reason = ReparseReason.load_file_failure
else:
fire_event(PartialParseSaveFileNotFound())
@@ -299,7 +299,7 @@ class PartialParsing:
else:
# It's not clear when this would actually happen.
# Logging in case there are other associated errors.
fire_event(PartialParsingNodeMissingInSourceFile(source_file=old_source_file))
fire_event(PartialParsingNodeMissingInSourceFile(file_id=old_source_file.file_id))

# replace source_file in saved and add to parsing list
file_id = new_source_file.file_id

@@ -912,7 +912,7 @@ class PartialParsing:
unique_id
)
schema_file.metrics.remove(unique_id)
fire_event(PartialParsingDeletedMetric(id=unique_id))
fire_event(PartialParsingDeletedMetric(unique_id=unique_id))

def get_schema_element(self, elem_list, elem_name):
for element in elem_list:
@@ -1,6 +1,7 @@
import os
import threading
import time
import traceback
from abc import ABCMeta, abstractmethod
from typing import Type, Union, Dict, Any, Optional

@@ -108,13 +109,13 @@ class BaseTask(metaclass=ABCMeta):
config = cls.ConfigType.from_args(args)
except dbt.exceptions.DbtProjectError as exc:
fire_event(DbtProjectError())
fire_event(DbtProjectErrorException(exc=exc))
fire_event(DbtProjectErrorException(exc=str(exc)))

tracking.track_invalid_invocation(args=args, result_type=exc.result_type)
raise dbt.exceptions.RuntimeException("Could not run dbt") from exc
except dbt.exceptions.DbtProfileError as exc:
fire_event(DbtProfileError())
fire_event(DbtProfileErrorException(exc=exc))
fire_event(DbtProfileErrorException(exc=str(exc)))

all_profiles = read_profiles(flags.PROFILES_DIR).keys()

@@ -345,11 +346,11 @@ class BaseRunner(metaclass=ABCMeta):
if e.node is None:
e.add_node(ctx.node)

fire_event(CatchableExceptionOnRun(exc=e))
fire_event(CatchableExceptionOnRun(exc=str(e), exc_info=traceback.format_exc()))
return str(e)

def _handle_internal_exception(self, e, ctx):
fire_event(InternalExceptionOnRun(build_path=self.node.build_path, exc=e))
fire_event(InternalExceptionOnRun(build_path=self.node.build_path, exc=str(e)))
return str(e)

def _handle_generic_exception(self, e, ctx):

@@ -357,10 +358,10 @@ class BaseRunner(metaclass=ABCMeta):
GenericExceptionOnRun(
build_path=self.node.build_path,
unique_id=self.node.unique_id,
exc=str(e),  # TODO: unstring this when serialization is fixed
exc=str(e),
)
)
fire_event(PrintDebugStackTrace())
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))

return str(e)

@@ -413,7 +414,11 @@ class BaseRunner(metaclass=ABCMeta):
try:
self.adapter.release_connection()
except Exception as exc:
fire_event(NodeConnectionReleaseError(node_name=self.node.name, exc=exc))
fire_event(
NodeConnectionReleaseError(
node_name=self.node.name, exc=str(exc), exc_info=traceback.format_exc()
)
)
return str(exc)

return None
@@ -12,7 +12,7 @@ from dbt.events.types import (
DepsNoPackagesFound,
DepsStartPackageInstall,
DepsUpdateAvailable,
DepsUTD,
DepsUpToDate,
DepsInstallInfo,
DepsListSubdirectory,
DepsNotifyUpdatesAvailable,

@@ -72,7 +72,7 @@ class DepsTask(BaseTask):
packages_to_upgrade.append(package_name)
fire_event(DepsUpdateAvailable(version_latest=version_latest))
else:
fire_event(DepsUTD())
fire_event(DepsUpToDate())
if package.get_subdirectory():
fire_event(DepsListSubdirectory(subdirectory=package.get_subdirectory()))
@@ -285,6 +285,7 @@ class GenerateTask(CompileTask):
errors=errors,
)

@classmethod
def interpret_results(self, results: Optional[CatalogResults]) -> bool:
if results is None:
return False
@@ -28,7 +28,7 @@ from dbt.events.types import (
SettingUpProfile,
InvalidProfileTemplateYAML,
ProjectNameAlreadyExists,
GetAddendum,
ProjectCreated,
)

from dbt.include.starter_project import PACKAGE_PATH as starter_project_directory

@@ -41,22 +41,6 @@ SLACK_URL = "https://community.getdbt.com/"
# This file is not needed for the starter project but exists for finding the resource path
IGNORE_FILES = ["__init__.py", "__pycache__"]

ON_COMPLETE_MESSAGE = """
Your new dbt project "{project_name}" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

{docs_url}

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

{slack_url}

Happy modeling!
"""

# https://click.palletsprojects.com/en/8.0.x/api/#types
# click v7.0 has UNPROCESSED, STRING, INT, FLOAT, BOOL, and UUID available.

@@ -114,17 +98,6 @@ class InitTask(BaseTask):
ProfileWrittenWithSample(name=profile_name, path=str(profiles_filepath))
)

def get_addendum(self, project_name: str, profiles_path: str) -> str:
open_cmd = dbt.clients.system.open_dir_cmd()

return ON_COMPLETE_MESSAGE.format(
open_cmd=open_cmd,
project_name=project_name,
profiles_path=profiles_path,
docs_url=DOCS_URL,
slack_url=SLACK_URL,
)

def generate_target_from_input(self, profile_template: dict, target: dict = {}) -> dict:
"""Generate a target configuration from profile_template and user input."""
profile_template_local = copy.deepcopy(profile_template)

@@ -324,5 +297,10 @@ class InitTask(BaseTask):
return
adapter = self.ask_for_adapter_choice()
self.create_profile_from_target(adapter, profile_name=project_name)
msg = self.get_addendum(project_name, profiles_dir)
fire_event(GetAddendum(msg=msg))
fire_event(
ProjectCreated(
project_name=project_name,
docs_url=DOCS_URL,
slack_url=SLACK_URL,
)
)
@@ -17,11 +17,11 @@ from dbt.events.types import (
ManifestLoaded,
ManifestChecked,
ManifestFlatGraphBuilt,
ParsingStart,
ParsingCompiling,
ParsingWritingManifest,
ParsingDone,
ReportPerformancePath,
ParseCmdStart,
ParseCmdCompiling,
ParseCmdWritingManifest,
ParseCmdDone,
ParseCmdPerfInfoPath,
)
from dbt.events.functions import fire_event
from dbt.graph import Graph

@@ -50,7 +50,7 @@ class ParseTask(ConfiguredTask):
def write_perf_info(self):
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self.loader._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
fire_event(ReportPerformancePath(path=path))
fire_event(ParseCmdPerfInfoPath(path=path))

# This method takes code that normally exists in other files
# and pulls it in here, to simplify logging and make the

@@ -89,14 +89,14 @@ class ParseTask(ConfiguredTask):
self.graph = compiler.compile(self.manifest)

def run(self):
fire_event(ParsingStart())
fire_event(ParseCmdStart())
self.get_full_manifest()
if self.args.compile:
fire_event(ParsingCompiling())
fire_event(ParseCmdCompiling())
self.compile_manifest()
if self.args.write_manifest:
fire_event(ParsingWritingManifest())
fire_event(ParseCmdWritingManifest())
self.write_manifest()

self.write_perf_info()
fire_event(ParsingDone())
fire_event(ParseCmdDone())
@@ -29,7 +29,7 @@ from dbt.exceptions import (
)
from dbt.events.functions import fire_event, get_invocation_id
from dbt.events.types import (
DatabaseErrorRunning,
DatabaseErrorRunningHook,
EmptyLine,
HooksRunning,
HookFinished,

@@ -389,7 +389,7 @@ class RunTask(CompileTask):
try:
self.run_hooks(adapter, hook_type, extra_context)
except RuntimeException:
fire_event(DatabaseErrorRunning(hook_type=hook_type.value))
fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value))
raise

def print_results_line(self, results, execution_time):
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Dict, Any
import traceback

import agate

@@ -55,12 +56,12 @@ class RunOperationTask(ManifestTask):
try:
self._run_unsafe()
except dbt.exceptions.Exception as exc:
fire_event(RunningOperationCaughtError(exc=exc))
fire_event(PrintDebugStackTrace())
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=exc))
fire_event(PrintDebugStackTrace())
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(PrintDebugStackTrace(exc_info=traceback.format_exc()))
success = False
else:
success = True
@@ -35,6 +35,7 @@ from dbt.events.types (
NodeFinished,
QueryCancelationUnsupported,
ConcurrencyLine,
EndRunResult,
)
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import Manifest
@@ -232,7 +233,7 @@ class GraphRunnableTask(ManifestTask):
NodeFinished(
node_info=runner.node.node_info,
unique_id=runner.node.unique_id,
run_result=result.to_dict(),
run_result=result.to_msg(),
)
)
# `_event_status` dict is only used for logging. Make sure
@@ -471,6 +472,18 @@ class GraphRunnableTask(ManifestTask):
selected_uids = frozenset(n.unique_id for n in self._flattened_nodes)
result = self.execute_with_hooks(selected_uids)

# We have other result types here too, including FreshnessResult
if isinstance(result, RunExecutionResult):
result_msgs = [result.to_msg() for result in result.results]
fire_event(
EndRunResult(
results=result_msgs,
generated_at=result.generated_at,
elapsed_time=result.elapsed_time,
success=GraphRunnableTask.interpret_results(result.results),
)
)

if flags.WRITE_JSON:
self.write_manifest()
self.write_result(result)
@@ -478,7 +491,8 @@ class GraphRunnableTask(ManifestTask):
self.task_end_messages(result.results)
return result

def interpret_results(self, results):
@classmethod
def interpret_results(cls, results):
if results is None:
return False

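Making interpret_results a classmethod is what lets the EndRunResult block above compute the success flag without a task instance; an illustrative one-liner under that assumption:

    # interpret_results can now be called on the class itself
    success = GraphRunnableTask.interpret_results(result.results)  # False when results is None
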
@@ -1,6 +1,7 @@
from abc import abstractmethod
from datetime import datetime
from typing import Generic, TypeVar
import traceback

import dbt.exceptions
from dbt.contracts.sql import (
@@ -10,7 +11,7 @@ from dbt.contracts.sql import (
ResultTable,
)
from dbt.events.functions import fire_event
from dbt.events.types import SQlRunnerException
from dbt.events.types import SQLRunnerException
from dbt.task.compile import CompileRunner


@@ -22,7 +23,7 @@ class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
CompileRunner.__init__(self, config, adapter, node, node_index, num_nodes)

def handle_exception(self, e, ctx):
fire_event(SQlRunnerException(exc=e))
fire_event(SQLRunnerException(exc=str(e), exc_info=traceback.format_exc()))
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt.exceptions.RuntimeException):
e.add_node(ctx.node)

@@ -94,7 +94,7 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures,
num_failures=result.failures,
node_info=model.node_info,
)
)
@@ -105,7 +105,7 @@ class TestRunner(CompileRunner):
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
failures=result.failures,
num_failures=result.failures,
node_info=model.node_info,
)
)

@@ -1,4 +1,5 @@
from typing import Optional
import traceback

from dbt.clients.yaml_helper import ( # noqa:F401
yaml,
@@ -452,7 +453,7 @@ def initialize_tracking(cookie_dir):
try:
active_user.initialize()
except Exception:
fire_event(TrackingInitializeFailure())
fire_event(TrackingInitializeFailure(exc_info=traceback.format_exc()))
active_user = User(None)


@@ -619,7 +619,7 @@ def _connection_exception_retry(fn, max_attempts: int, attempt: int = 0):
ReadError,
) as exc:
if attempt <= max_attempts - 1:
fire_event(RecordRetryException(exc=exc))
fire_event(RecordRetryException(exc=str(exc)))
fire_event(RetryExternalCall(attempt=attempt, max=max_attempts))
time.sleep(1)
return _connection_exception_retry(fn, max_attempts, attempt + 1)
@@ -666,3 +666,20 @@ def args_to_dict(args):
var_args[key] = str(var_args[key])
dict_args[key] = var_args[key]
return dict_args


# This is useful for proto generated classes in particular, since
# the default for protobuf for strings is the empty string, so
# Optional[str] types don't work for generated Python classes.
def cast_to_str(string: Optional[str]) -> str:
if string is None:
return ""
else:
return string


def cast_to_int(integer: Optional[int]) -> int:
if integer is None:
return 0
else:
return integer

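These helpers normalize optional values before they are copied into proto-generated events, whose string and int fields cannot hold None. A minimal usage sketch, assuming the two functions above land in dbt.utils as shown:

    from typing import Optional
    from dbt.utils import cast_to_str, cast_to_int

    maybe_name: Optional[str] = None
    assert cast_to_str(maybe_name) == ""   # proto string fields default to "" rather than None
    assert cast_to_int(None) == 0          # proto int fields default to 0
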
@@ -1,3 +1,4 @@
betterproto[compiler]
black==22.6.0
bumpversion
flake8
@@ -7,6 +8,7 @@ ipdb
mypy==0.971
pip-tools
pre-commit
protobuf
pytest
pytest-cov
pytest-csv
@@ -22,6 +24,7 @@ types-PyYAML
types-freezegun
types-Jinja2
types-mock
types-protobuf
types-python-dateutil
types-pytz
types-requests

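The new dev requirements (betterproto[compiler], protobuf, types-protobuf) support the generated dbt.events.proto_types module used throughout this change. betterproto messages serialize with bytes() and deserialize with .parse(), the same calls the new proto test exercises further below; a minimal sketch using the ReferenceKeyMsg message that appears elsewhere in this diff:

    from dbt.events.proto_types import ReferenceKeyMsg

    msg = ReferenceKeyMsg(database="analytics", schema="dbt", identifier="my_model")
    payload = bytes(msg)                         # betterproto serializes via bytes()
    roundtrip = ReferenceKeyMsg().parse(payload)
    assert roundtrip.identifier == "my_model"
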
@@ -8,6 +8,13 @@ import dbt.exceptions
from dbt.version import __version__ as dbt_version
from dbt.logger import log_manager

# Temporary for while we're converting logs to a different format
def get_log_msg(log):
if "msg" in log:
return log["msg"]
if "info" in log:
if "msg" in log["info"]:
return log["info"]["msg"]

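get_log_msg lets the integration test read a message from either log shape while the two formats coexist: legacy records keep "msg" at the top level, while proto-backed records nest it under "info". A small sketch combining the helper above with the timestamp-stripping regex used in query_comment below (the sample messages are illustrative only):

    import re

    legacy = {"msg": "12:34:56 | On model_one: select 1"}
    structured = {"info": {"msg": "On model_one: select 1"}}

    pattern = r"(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )"
    for log in (legacy, structured):
        # get_log_msg is the helper defined above
        msg = re.sub(pattern, "", get_log_msg(log))
        assert msg == "On model_one: select 1"
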
class TestDefaultQueryComments(DBTIntegrationTest):
def matches_comment(self, msg) -> bool:
@@ -74,7 +81,7 @@ class TestDefaultQueryComments(DBTIntegrationTest):
def query_comment(self, model_name, log):
# N.B: a temporary string replacement regex to strip the HH:MM:SS from the log line if present.
# TODO: make this go away when structured logging is stable
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg'])
log_msg = re.sub("(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", get_log_msg(log))
prefix = 'On {}: '.format(model_name)
if log_msg.startswith(prefix):
msg = log_msg[len(prefix):]
@@ -92,7 +99,7 @@ class TestDefaultQueryComments(DBTIntegrationTest):
if msg is not None and self.matches_comment(msg):
seen = True

self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs)))
self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(get_log_msg(logline) for logline in logs)))

@use_profile('postgres')
def test_postgres_comments(self):

@@ -1,607 +0,0 @@
|
||||
from dbt.adapters.reference_keys import _ReferenceKey
|
||||
from dbt.events.test_types import UnitTestInfo
|
||||
from dbt.events import AdapterLogger
|
||||
from dbt.events.functions import event_to_serializable_dict
|
||||
from dbt.events.base_types import NodeInfo
|
||||
from dbt.events.types import *
|
||||
from dbt.events.test_types import *
|
||||
# from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
|
||||
from dbt.events.base_types import Event, TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel
|
||||
from importlib import reload
|
||||
import dbt.events.functions as event_funcs
|
||||
import dbt.flags as flags
|
||||
from dbt.helper_types import Lazy
|
||||
import inspect
|
||||
import json
|
||||
from unittest import TestCase
|
||||
from dbt.contracts.graph.parsed import (
|
||||
ParsedModelNode, NodeConfig, DependsOn
|
||||
)
|
||||
from dbt.contracts.files import FileHash
|
||||
from mashumaro.types import SerializableType
|
||||
from typing import Generic, TypeVar
|
||||
|
||||
# takes in a class and finds any subclasses for it
|
||||
def get_all_subclasses(cls):
|
||||
all_subclasses = []
|
||||
for subclass in cls.__subclasses__():
|
||||
# If the test breaks because of abcs this list might have to be updated.
|
||||
if subclass in [NodeInfo, AdapterEventBase, TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel]:
|
||||
continue
|
||||
all_subclasses.append(subclass)
|
||||
all_subclasses.extend(get_all_subclasses(subclass))
|
||||
return set(all_subclasses)
|
||||
|
||||
|
||||
class TestAdapterLogger(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
# this interface is documented for adapter maintainers to plug into
|
||||
# so we should test that it at the very least doesn't explode.
|
||||
def test_basic_adapter_logging_interface(self):
|
||||
logger = AdapterLogger("dbt_tests")
|
||||
logger.debug("debug message")
|
||||
logger.info("info message")
|
||||
logger.warning("warning message")
|
||||
logger.error("error message")
|
||||
logger.exception("exception message")
|
||||
logger.critical("exception message")
|
||||
self.assertTrue(True)
|
||||
|
||||
# python loggers allow deferring string formatting via this signature:
|
||||
def test_formatting(self):
|
||||
logger = AdapterLogger("dbt_tests")
|
||||
# tests that it doesn't throw
|
||||
logger.debug("hello {}", 'world')
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="hello {}", args=('world',))
|
||||
self.assertTrue("hello world" in event.message())
|
||||
|
||||
# tests that it doesn't throw
|
||||
logger.debug("1 2 {}", 3)
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="1 2 {}", args=(3,))
|
||||
self.assertTrue("1 2 3" in event.message())
|
||||
|
||||
# tests that it doesn't throw
|
||||
logger.debug("boop{x}boop")
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
# in this case it's that we didn't attempt to replace anything since there
|
||||
# were no args passed after the initial message
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="boop{x}boop", args=())
|
||||
self.assertTrue("boop{x}boop" in event.message())
|
||||
|
||||
class TestEventCodes(TestCase):
|
||||
|
||||
# checks to see if event codes are duplicated to keep codes singluar and clear.
|
||||
# also checks that event codes follow correct namming convention ex. E001
|
||||
def test_event_codes(self):
|
||||
all_concrete = get_all_subclasses(Event)
|
||||
all_codes = set()
|
||||
|
||||
for event in all_concrete:
|
||||
if not inspect.isabstract(event):
|
||||
# must be in the form 1 capital letter, 3 digits
|
||||
self.assertTrue('^[A-Z][0-9]{3}', event.code)
|
||||
# cannot have been used already
|
||||
self.assertFalse(event.code in all_codes, f'{event.code} is assigned more than once. Check types.py for duplicates.')
|
||||
all_codes.add(event.code)
|
||||
|
||||
|
||||
class TestEventBuffer(TestCase):
|
||||
|
||||
def setUp(self) -> None:
|
||||
flags.EVENT_BUFFER_SIZE = 10
|
||||
reload(event_funcs)
|
||||
|
||||
# ensure events are populated to the buffer exactly once
|
||||
def test_buffer_populates(self):
|
||||
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
|
||||
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
|
||||
event1 = event_funcs.EVENT_HISTORY[-2]
|
||||
self.assertTrue(
|
||||
event_funcs.EVENT_HISTORY.count(event1) == 1
|
||||
)
|
||||
|
||||
# ensure events drop from the front of the buffer when buffer maxsize is reached
|
||||
def test_buffer_FIFOs(self):
|
||||
event_funcs.EVENT_HISTORY.clear()
|
||||
for n in range(1,(flags.EVENT_BUFFER_SIZE + 1)):
|
||||
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
|
||||
|
||||
event_full = event_funcs.EVENT_HISTORY[-1]
|
||||
self.assertEqual(event_full.code, 'Z048')
|
||||
self.assertTrue(
|
||||
event_funcs.EVENT_HISTORY.count(event_full) == 1
|
||||
)
|
||||
self.assertTrue(
|
||||
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
|
||||
)
|
||||
|
||||
def MockNode():
|
||||
return ParsedModelNode(
|
||||
alias='model_one',
|
||||
name='model_one',
|
||||
database='dbt',
|
||||
schema='analytics',
|
||||
resource_type=NodeType.Model,
|
||||
unique_id='model.root.model_one',
|
||||
fqn=['root', 'model_one'],
|
||||
package_name='root',
|
||||
original_file_path='model_one.sql',
|
||||
root_path='/usr/src/app',
|
||||
refs=[],
|
||||
sources=[],
|
||||
depends_on=DependsOn(),
|
||||
config=NodeConfig.from_dict({
|
||||
'enabled': True,
|
||||
'materialized': 'view',
|
||||
'persist_docs': {},
|
||||
'post-hook': [],
|
||||
'pre-hook': [],
|
||||
'vars': {},
|
||||
'quoting': {},
|
||||
'column_types': {},
|
||||
'tags': [],
|
||||
}),
|
||||
tags=[],
|
||||
path='model_one.sql',
|
||||
raw_code='',
|
||||
description='',
|
||||
columns={},
|
||||
checksum=FileHash.from_contents(''),
|
||||
)
|
||||
|
||||
|
||||
sample_values = [
|
||||
MainReportVersion(v=''),
|
||||
MainKeyboardInterrupt(),
|
||||
MainEncounteredError(e=BaseException('')),
|
||||
MainStackTrace(stack_trace=''),
|
||||
MainTrackingUserState(user_state=''),
|
||||
ParsingStart(),
|
||||
ParsingCompiling(),
|
||||
ParsingWritingManifest(),
|
||||
ParsingDone(),
|
||||
ManifestDependenciesLoaded(),
|
||||
ManifestLoaderCreated(),
|
||||
ManifestLoaded(),
|
||||
ManifestChecked(),
|
||||
ManifestFlatGraphBuilt(),
|
||||
ReportPerformancePath(path=""),
|
||||
GitSparseCheckoutSubdirectory(subdir=""),
|
||||
GitProgressCheckoutRevision(revision=""),
|
||||
GitProgressUpdatingExistingDependency(dir=""),
|
||||
GitProgressPullingNewDependency(dir=""),
|
||||
GitNothingToDo(sha=""),
|
||||
GitProgressUpdatedCheckoutRange(start_sha="", end_sha=""),
|
||||
GitProgressCheckedOutAt(end_sha=""),
|
||||
SystemErrorRetrievingModTime(path=""),
|
||||
SystemCouldNotWrite(path="", reason="", exc=""),
|
||||
SystemExecutingCmd(cmd=[""]),
|
||||
SystemStdOutMsg(bmsg=b""),
|
||||
SystemStdErrMsg(bmsg=b""),
|
||||
SelectorReportInvalidSelector(
|
||||
valid_selectors="", spec_method="", raw_spec=""
|
||||
),
|
||||
MacroEventInfo(msg=""),
|
||||
MacroEventDebug(msg=""),
|
||||
NewConnection(conn_type="", conn_name=""),
|
||||
ConnectionReused(conn_name=""),
|
||||
ConnectionLeftOpen(conn_name=""),
|
||||
ConnectionClosed(conn_name=""),
|
||||
RollbackFailed(conn_name=""),
|
||||
ConnectionClosed2(conn_name=""),
|
||||
ConnectionLeftOpen2(conn_name=""),
|
||||
Rollback(conn_name=""),
|
||||
CacheMiss(conn_name="", database="", schema=""),
|
||||
ListRelations(database="", schema="", relations=[]),
|
||||
ConnectionUsed(conn_type="", conn_name=""),
|
||||
SQLQuery(conn_name="", sql=""),
|
||||
SQLQueryStatus(status="", elapsed=0.1),
|
||||
CodeExecution(conn_name="", code_content=""),
|
||||
CodeExecutionStatus(status="", elapsed=0.1),
|
||||
SQLCommit(conn_name=""),
|
||||
ColTypeChange(
|
||||
orig_type="", new_type="",
|
||||
table=_ReferenceKey(database="", schema="", identifier=""),
|
||||
),
|
||||
SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier="")),
|
||||
SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier="")),
|
||||
UncachedRelation(
|
||||
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
),
|
||||
AddLink(
|
||||
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
),
|
||||
AddRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
|
||||
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
|
||||
DropCascade(
|
||||
dropped=_ReferenceKey(database="", schema="", identifier=""),
|
||||
consequences={_ReferenceKey(database="", schema="", identifier="")},
|
||||
),
|
||||
UpdateReference(
|
||||
old_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
new_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
cached_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
),
|
||||
TemporaryRelation(key=_ReferenceKey(database="", schema="", identifier="")),
|
||||
RenameSchema(
|
||||
old_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||
new_key=_ReferenceKey(database="", schema="", identifier="")
|
||||
),
|
||||
DumpBeforeAddGraph(Lazy.defer(lambda: dict())),
|
||||
DumpAfterAddGraph(Lazy.defer(lambda: dict())),
|
||||
DumpBeforeRenameSchema(Lazy.defer(lambda: dict())),
|
||||
DumpAfterRenameSchema(Lazy.defer(lambda: dict())),
|
||||
AdapterImportError(exc=ModuleNotFoundError()),
|
||||
PluginLoadError(),
|
||||
SystemReportReturnCode(returncode=0),
|
||||
NewConnectionOpening(connection_state=''),
|
||||
TimingInfoCollected(),
|
||||
MergedFromState(nbr_merged=0, sample=[]),
|
||||
MissingProfileTarget(profile_name='', target_name=''),
|
||||
InvalidVarsYAML(),
|
||||
GenericTestFileParse(path=''),
|
||||
MacroFileParse(path=''),
|
||||
PartialParsingFullReparseBecauseOfError(),
|
||||
PartialParsingFile(file_dict={}),
|
||||
PartialParsingExceptionFile(file=''),
|
||||
PartialParsingException(exc_info={}),
|
||||
PartialParsingSkipParsing(),
|
||||
PartialParsingMacroChangeStartFullParse(),
|
||||
ManifestWrongMetadataVersion(version=''),
|
||||
PartialParsingVersionMismatch(saved_version='', current_version=''),
|
||||
PartialParsingFailedBecauseConfigChange(),
|
||||
PartialParsingFailedBecauseProfileChange(),
|
||||
PartialParsingFailedBecauseNewProjectDependency(),
|
||||
PartialParsingFailedBecauseHashChanged(),
|
||||
PartialParsingDeletedMetric(id=''),
|
||||
ParsedFileLoadFailed(path='', exc=''),
|
||||
PartialParseSaveFileNotFound(),
|
||||
StaticParserCausedJinjaRendering(path=''),
|
||||
UsingExperimentalParser(path=''),
|
||||
SampleFullJinjaRendering(path=''),
|
||||
StaticParserFallbackJinjaRendering(path=''),
|
||||
StaticParsingMacroOverrideDetected(path=''),
|
||||
StaticParserSuccess(path=''),
|
||||
StaticParserFailure(path=''),
|
||||
ExperimentalParserSuccess(path=''),
|
||||
ExperimentalParserFailure(path=''),
|
||||
PartialParsingEnabled(deleted=0, added=0, changed=0),
|
||||
PartialParsingAddedFile(file_id=''),
|
||||
PartialParsingDeletedFile(file_id=''),
|
||||
PartialParsingUpdatedFile(file_id=''),
|
||||
PartialParsingNodeMissingInSourceFile(source_file=''),
|
||||
PartialParsingMissingNodes(file_id=''),
|
||||
PartialParsingChildMapMissingUniqueID(unique_id=''),
|
||||
PartialParsingUpdateSchemaFile(file_id=''),
|
||||
PartialParsingDeletedSource(unique_id=''),
|
||||
PartialParsingDeletedExposure(unique_id=''),
|
||||
InvalidDisabledSourceInTestNode(msg=''),
|
||||
InvalidRefInTestNode(msg=''),
|
||||
RunningOperationCaughtError(exc=''),
|
||||
RunningOperationUncaughtError(exc=Exception('')),
|
||||
DbtProjectError(),
|
||||
DbtProjectErrorException(exc=Exception('')),
|
||||
DbtProfileError(),
|
||||
DbtProfileErrorException(exc=Exception('')),
|
||||
ProfileListTitle(),
|
||||
ListSingleProfile(profile=''),
|
||||
NoDefinedProfiles(),
|
||||
ProfileHelpMessage(),
|
||||
CatchableExceptionOnRun(exc=Exception('')),
|
||||
InternalExceptionOnRun(build_path='', exc=Exception('')),
|
||||
GenericExceptionOnRun(build_path='', unique_id='', exc=''),
|
||||
NodeConnectionReleaseError(node_name='', exc=Exception('')),
|
||||
CheckCleanPath(path=''),
|
||||
ConfirmCleanPath(path=''),
|
||||
ProtectedCleanPath(path=''),
|
||||
FinishedCleanPaths(),
|
||||
OpenCommand(open_cmd='', profiles_dir=''),
|
||||
DepsNoPackagesFound(),
|
||||
DepsStartPackageInstall(package_name=''),
|
||||
DepsInstallInfo(version_name=''),
|
||||
DepsUpdateAvailable(version_latest=''),
|
||||
DepsListSubdirectory(subdirectory=''),
|
||||
DepsNotifyUpdatesAvailable(packages=[]),
|
||||
DatabaseErrorRunning(hook_type=''),
|
||||
EmptyLine(),
|
||||
HooksRunning(num_hooks=0, hook_type=''),
|
||||
HookFinished(stat_line='', execution='', execution_time=0),
|
||||
WriteCatalogFailure(num_exceptions=0),
|
||||
CatalogWritten(path=''),
|
||||
CannotGenerateDocs(),
|
||||
BuildingCatalog(),
|
||||
CompileComplete(),
|
||||
FreshnessCheckComplete(),
|
||||
ServingDocsPort(address='', port=0),
|
||||
ServingDocsAccessInfo(port=''),
|
||||
ServingDocsExitInfo(),
|
||||
SeedHeader(header=''),
|
||||
SeedHeaderSeparator(len_header=0),
|
||||
RunResultWarning(resource_type='', node_name='', path=''),
|
||||
RunResultFailure(resource_type='', node_name='', path=''),
|
||||
StatsLine(stats={'pass':0, 'warn':0, 'error':0, 'skip':0, 'total':0}),
|
||||
RunResultError(msg=''),
|
||||
RunResultErrorNoMessage(status=''),
|
||||
SQLCompiledPath(path=''),
|
||||
CheckNodeTestFailure(relation_name=''),
|
||||
FirstRunResultError(msg=''),
|
||||
AfterFirstRunResultError(msg=''),
|
||||
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
|
||||
PrintStartLine(description='', index=0, total=0, node_info={}),
|
||||
PrintHookStartLine(statement='', index=0, total=0, node_info={}),
|
||||
PrintHookEndLine(statement='', status='', index=0, total=0, execution_time=0, node_info={}),
|
||||
SkippingDetails(resource_type='', schema='', node_name='', index=0, total=0, node_info={}),
|
||||
PrintErrorTestResult(name='', index=0, num_models=0, execution_time=0, node_info={}),
|
||||
PrintPassTestResult(name='', index=0, num_models=0, execution_time=0, node_info={}),
|
||||
PrintWarnTestResult(name='', index=0, num_models=0, execution_time=0, failures=0, node_info={}),
|
||||
PrintFailureTestResult(name='', index=0, num_models=0, execution_time=0, failures=0, node_info={}),
|
||||
PrintSkipBecauseError(schema='', relation='', index=0, total=0),
|
||||
PrintModelErrorResultLine(description='', status='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintModelResultLine(description='', status='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintSnapshotErrorResultLine(status='',
|
||||
description='',
|
||||
cfg={},
|
||||
index=0,
|
||||
total=0,
|
||||
execution_time=0,
|
||||
node_info={}),
|
||||
PrintSnapshotResultLine(status='', description='', cfg={}, index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintSeedErrorResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='', node_info={}),
|
||||
PrintSeedResultLine(status='', index=0, total=0, execution_time=0, schema='', relation='', node_info={}),
|
||||
PrintHookEndErrorLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintHookEndErrorStaleLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintHookEndWarnLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintHookEndPassLine(source_name='', table_name='', index=0, total=0, execution_time=0, node_info={}),
|
||||
PrintCancelLine(conn_name=''),
|
||||
DefaultSelector(name=''),
|
||||
NodeStart(unique_id='', node_info={}),
|
||||
NodeCompiling(unique_id='', node_info={}),
|
||||
NodeExecuting(unique_id='', node_info={}),
|
||||
NodeFinished(unique_id='', node_info={}, run_result={}),
|
||||
QueryCancelationUnsupported(type=''),
|
||||
ConcurrencyLine(num_threads=0, target_name=''),
|
||||
StarterProjectPath(dir=''),
|
||||
ConfigFolderDirectory(dir=''),
|
||||
NoSampleProfileFound(adapter=''),
|
||||
ProfileWrittenWithSample(name='', path=''),
|
||||
ProfileWrittenWithTargetTemplateYAML(name='', path=''),
|
||||
ProfileWrittenWithProjectTemplateYAML(name='', path=''),
|
||||
SettingUpProfile(),
|
||||
InvalidProfileTemplateYAML(),
|
||||
ProjectNameAlreadyExists(name=''),
|
||||
GetAddendum(msg=''),
|
||||
DepsSetDownloadDirectory(path=''),
|
||||
EnsureGitInstalled(),
|
||||
DepsCreatingLocalSymlink(),
|
||||
DepsSymlinkNotAvailable(),
|
||||
FoundStats(stat_line=''),
|
||||
CompilingNode(unique_id=''),
|
||||
WritingInjectedSQLForNode(unique_id=''),
|
||||
DisableTracking(),
|
||||
SendingEvent(kwargs=''),
|
||||
SendEventFailure(),
|
||||
FlushEvents(),
|
||||
FlushEventsFailure(),
|
||||
TrackingInitializeFailure(),
|
||||
RetryExternalCall(attempt=0, max=0),
|
||||
GeneralWarningMsg(msg='', log_fmt=''),
|
||||
GeneralWarningException(exc=Exception(''), log_fmt=''),
|
||||
PartialParsingProfileEnvVarsChanged(),
|
||||
AdapterEventDebug(name='', base_msg='', args=()),
|
||||
AdapterEventInfo(name='', base_msg='', args=()),
|
||||
AdapterEventWarning(name='', base_msg='', args=()),
|
||||
AdapterEventError(name='', base_msg='', args=()),
|
||||
PrintDebugStackTrace(),
|
||||
MainReportArgs(args={}),
|
||||
RegistryProgressMakingGETRequest(url=''),
|
||||
RegistryIndexProgressMakingGETRequest(url=""),
|
||||
RegistryIndexProgressGETResponse(url="", resp_code=1),
|
||||
RegistryResponseUnexpectedType(response=""),
|
||||
RegistryResponseMissingTopKeys(response=""),
|
||||
RegistryResponseMissingNestedKeys(response=""),
|
||||
RegistryResponseExtraNestedKeys(response=""),
|
||||
DepsUTD(),
|
||||
PartialParsingNotEnabled(),
|
||||
SQlRunnerException(exc=Exception('')),
|
||||
DropRelation(dropped=_ReferenceKey(database="", schema="", identifier="")),
|
||||
PartialParsingProjectEnvVarsChanged(),
|
||||
RegistryProgressGETResponse(url='', resp_code=1),
|
||||
IntegrationTestDebug(msg=''),
|
||||
IntegrationTestInfo(msg=''),
|
||||
IntegrationTestWarn(msg=''),
|
||||
IntegrationTestError(msg=''),
|
||||
IntegrationTestException(msg=''),
|
||||
EventBufferFull(),
|
||||
RecordRetryException(exc=Exception('')),
|
||||
UnitTestInfo(msg=''),
|
||||
]
|
||||
|
||||
|
||||
class TestEventJSONSerialization(TestCase):
|
||||
|
||||
# attempts to test that every event is serializable to json.
|
||||
# event types that take `Any` are not possible to test in this way since some will serialize
|
||||
# just fine and others won't.
|
||||
def test_all_serializable(self):
|
||||
no_test = [DummyCacheEvent]
|
||||
|
||||
all_non_abstract_events = set(filter(lambda x: not inspect.isabstract(x) and x not in no_test, get_all_subclasses(Event)))
|
||||
all_event_values_list = list(map(lambda x: x.__class__, sample_values))
|
||||
diff = all_non_abstract_events.difference(set(all_event_values_list))
|
||||
self.assertFalse(diff, f"test is missing concrete values in `sample_values`. Please add the values for the aforementioned event classes")
|
||||
|
||||
# make sure everything in the list is a value not a type
|
||||
for event in sample_values:
|
||||
self.assertFalse(type(event) == type)
|
||||
|
||||
# if we have everything we need to test, try to serialize everything
|
||||
for event in sample_values:
|
||||
d = event_to_serializable_dict(event)
|
||||
try:
|
||||
json.dumps(d)
|
||||
except TypeError as e:
|
||||
raise Exception(f"{event} is not serializable to json. Originating exception: {e}")
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
@dataclass
|
||||
class Counter(Generic[T], SerializableType):
|
||||
dummy_val: T
|
||||
count: int = 0
|
||||
|
||||
def next(self) -> T:
|
||||
self.count = self.count + 1
|
||||
return self.dummy_val
|
||||
|
||||
# mashumaro serializer
|
||||
def _serialize() -> Dict[str, int]:
|
||||
return {'count': count}
|
||||
|
||||
|
||||
@dataclass
|
||||
class DummyCacheEvent(InfoLevel, Cache, SerializableType):
|
||||
code = 'X999'
|
||||
counter: Counter
|
||||
|
||||
def message(self) -> str:
|
||||
return f"state: {self.counter.next()}"
|
||||
|
||||
# mashumaro serializer
|
||||
def _serialize() -> str:
|
||||
return "DummyCacheEvent"
|
||||
|
||||
|
||||
# tests that if a cache event uses lazy evaluation for its message
|
||||
# creation, the evaluation will not be forced for cache events when
|
||||
# running without `--log-cache-events`.
|
||||
def skip_cache_event_message_rendering(x: TestCase):
|
||||
# a dummy event that extends `Cache`
|
||||
e = DummyCacheEvent(Counter("some_state"))
|
||||
|
||||
# counter of zero means this potentially expensive function
|
||||
# (emulating dump_graph) has never been called
|
||||
x.assertEqual(e.counter.count, 0)
|
||||
|
||||
# call fire_event
|
||||
event_funcs.fire_event(e)
|
||||
|
||||
# assert that the expensive function has STILL not been called
|
||||
x.assertEqual(e.counter.count, 0)
|
||||
|
||||
# this test checks that every subclass of `Cache` uses the same lazy evaluation
|
||||
# strategy. This ensures that potentially expensive cache event values are not
|
||||
# built unless they are needed for logging purposes. It also checks that these
|
||||
# potentially expensive values are cached, and not evaluated more than once.
|
||||
def all_cache_events_are_lazy(x):
|
||||
cache_events = get_all_subclasses(Cache)
|
||||
matching_classes = []
|
||||
for clazz in cache_events:
|
||||
# this body is only testing subclasses of `Cache` that take a param called "dump"
|
||||
|
||||
# initialize the counter to return a dictionary (emulating dump_graph)
|
||||
counter = Counter(dict())
|
||||
|
||||
# assert that the counter starts at 0
|
||||
x.assertEqual(counter.count, 0)
|
||||
|
||||
# try to create the cache event to use this counter type
|
||||
# fails for cache events that don't have a "dump" param
|
||||
try:
|
||||
clazz()
|
||||
except TypeError as e:
|
||||
print(clazz)
|
||||
# hack that roughly detects attribute names without an instance of the class
|
||||
if 'dump' in str(e):
|
||||
matching_classes.append(clazz)
|
||||
|
||||
# make the class. If this throws, maybe your class didn't use Lazy when it should have
|
||||
e = clazz(dump = Lazy.defer(lambda: counter.next()))
|
||||
|
||||
# assert that initializing the event with the counter
|
||||
# did not evaluate the lazy value
|
||||
x.assertEqual(counter.count, 0)
|
||||
|
||||
# log an event which should trigger evaluation and up
|
||||
# the counter
|
||||
event_funcs.fire_event(e)
|
||||
|
||||
# assert that the counter increased
|
||||
x.assertEqual(counter.count, 1)
|
||||
|
||||
# fire another event which should reuse the previous value
|
||||
# not evaluate the function again
|
||||
event_funcs.fire_event(e)
|
||||
|
||||
# assert that the counter did not increase
|
||||
x.assertEqual(counter.count, 1)
|
||||
|
||||
# if the init function doesn't require something named "dump"
|
||||
# we can just continue
|
||||
else:
|
||||
pass
|
||||
|
||||
# other exceptions are issues and should be thrown
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
# we should have exactly 4 matching classes (raise this threshold if we add more)
|
||||
x.assertEqual(len(matching_classes), 4, f"matching classes:\n{len(matching_classes)}: {matching_classes}")
|
||||
|
||||
|
||||
class SkipsRenderingCacheEventsTEXT(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
flags.LOG_FORMAT = 'text'
|
||||
|
||||
def test_skip_cache_event_message_rendering_TEXT(self):
|
||||
skip_cache_event_message_rendering(self)
|
||||
|
||||
|
||||
class SkipsRenderingCacheEventsJSON(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
flags.LOG_FORMAT = 'json'
|
||||
|
||||
def tearDown(self):
|
||||
flags.LOG_FORMAT = 'text'
|
||||
|
||||
def test_skip_cache_event_message_rendering_JSON(self):
|
||||
skip_cache_event_message_rendering(self)
|
||||
|
||||
|
||||
class TestLazyMemoizationInCacheEventsTEXT(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
flags.LOG_FORMAT = 'text'
|
||||
flags.LOG_CACHE_EVENTS = True
|
||||
|
||||
def tearDown(self):
|
||||
flags.LOG_CACHE_EVENTS = False
|
||||
|
||||
def test_all_cache_events_are_lazy_TEXT(self):
|
||||
all_cache_events_are_lazy(self)
|
||||
|
||||
|
||||
class TestLazyMemoizationInCacheEventsJSON(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
flags.LOG_FORMAT = 'json'
|
||||
flags.LOG_CACHE_EVENTS = True
|
||||
|
||||
def tearDown(self):
|
||||
flags.LOG_FORMAT = 'text'
|
||||
flags.LOG_CACHE_EVENTS = False
|
||||
|
||||
def test_all_cache_events_are_lazy_JSON(self):
|
||||
all_cache_events_are_lazy(self)
|
||||
511
tests/unit/test_events.py
Normal file
511
tests/unit/test_events.py
Normal file
@@ -0,0 +1,511 @@
|
||||
# flake8: noqa
|
||||
from dbt.events.test_types import UnitTestInfo
|
||||
from dbt.events import AdapterLogger
|
||||
from dbt.events.functions import event_to_json, LOG_VERSION
|
||||
from dbt.events.types import *
|
||||
from dbt.events.test_types import *
|
||||
|
||||
from dbt.events.base_types import (
|
||||
BaseEvent,
|
||||
DebugLevel,
|
||||
WarnLevel,
|
||||
InfoLevel,
|
||||
ErrorLevel,
|
||||
TestLevel,
|
||||
)
|
||||
from dbt.events.proto_types import NodeInfo, RunResultMsg, ReferenceKeyMsg
|
||||
from importlib import reload
|
||||
import dbt.events.functions as event_funcs
|
||||
import dbt.flags as flags
|
||||
import inspect
|
||||
import json
|
||||
from dbt.contracts.graph.parsed import ParsedModelNode, NodeConfig, DependsOn
|
||||
from dbt.contracts.files import FileHash
|
||||
from mashumaro.types import SerializableType
|
||||
from typing import Generic, TypeVar, Dict
|
||||
import re
|
||||
|
||||
# takes in a class and finds any subclasses for it
|
||||
def get_all_subclasses(cls):
|
||||
all_subclasses = []
|
||||
for subclass in cls.__subclasses__():
|
||||
# If the test breaks because of abcs this list might have to be updated.
|
||||
if subclass in [TestLevel, DebugLevel, WarnLevel, InfoLevel, ErrorLevel]:
|
||||
continue
|
||||
all_subclasses.append(subclass)
|
||||
all_subclasses.extend(get_all_subclasses(subclass))
|
||||
return set(all_subclasses)
|
||||
|
||||
|
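get_all_subclasses drives the coverage checks in this module: every concrete event type reachable from BaseEvent must carry a well-formed, unique code and appear in sample_values. An illustrative sketch of how the tests below use it, assuming only the names already imported in this file:

    # mirrors TestEventCodes / TestEventJSONSerialization below
    concrete_events = [e for e in get_all_subclasses(BaseEvent) if not inspect.isabstract(e)]
    assert all(re.match(r"^[A-Z][0-9]{3}$", e.code) for e in concrete_events)
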
||||
class TestAdapterLogger:
|
||||
# this interface is documented for adapter maintainers to plug into
|
||||
# so we should test that it at the very least doesn't explode.
|
||||
def test_basic_adapter_logging_interface(self):
|
||||
logger = AdapterLogger("dbt_tests")
|
||||
logger.debug("debug message")
|
||||
logger.info("info message")
|
||||
logger.warning("warning message")
|
||||
logger.error("error message")
|
||||
logger.exception("exception message")
|
||||
logger.critical("exception message")
|
||||
|
||||
# python loggers allow deferring string formatting via this signature:
|
||||
def test_formatting(self):
|
||||
logger = AdapterLogger("dbt_tests")
|
||||
# tests that it doesn't throw
|
||||
logger.debug("hello {}", "world")
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="hello {}", args=("world",))
|
||||
assert "hello world" in event.message()
|
||||
|
||||
# tests that it doesn't throw
|
||||
logger.debug("1 2 {}", 3)
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="1 2 {}", args=(3,))
|
||||
assert "1 2 3" in event.message()
|
||||
|
||||
# tests that it doesn't throw
|
||||
logger.debug("boop{x}boop")
|
||||
|
||||
# enters lower in the call stack to test that it formats correctly
|
||||
# in this case it's that we didn't attempt to replace anything since there
|
||||
# were no args passed after the initial message
|
||||
event = AdapterEventDebug(name="dbt_tests", base_msg="boop{x}boop", args=())
|
||||
assert "boop{x}boop" in event.message()
|
||||
|
||||
|
||||
class TestEventCodes:
|
||||
|
||||
# checks to see if event codes are duplicated to keep codes singluar and clear.
|
||||
# also checks that event codes follow correct namming convention ex. E001
|
||||
def test_event_codes(self):
|
||||
all_concrete = get_all_subclasses(BaseEvent)
|
||||
all_codes = set()
|
||||
|
||||
for event in all_concrete:
|
||||
if not inspect.isabstract(event):
|
||||
# must be in the form 1 capital letter, 3 digits
|
||||
assert re.match("^[A-Z][0-9]{3}", event.code)
|
||||
# cannot have been used already
|
||||
assert (
|
||||
event.info.code not in all_codes
|
||||
), f"{event.code} is assigned more than once. Check types.py for duplicates."
|
||||
all_codes.add(event.info.code)
|
||||
|
||||
|
||||
class TestEventBuffer:
|
||||
def setUp(self) -> None:
|
||||
flags.EVENT_BUFFER_SIZE = 10
|
||||
reload(event_funcs)
|
||||
|
||||
# ensure events are populated to the buffer exactly once
|
||||
def test_buffer_populates(self):
|
||||
self.setUp()
|
||||
event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
|
||||
event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
|
||||
event1 = event_funcs.EVENT_HISTORY[-2]
|
||||
assert event_funcs.EVENT_HISTORY.count(event1) == 1
|
||||
|
||||
# ensure events drop from the front of the buffer when buffer maxsize is reached
|
||||
def test_buffer_FIFOs(self):
|
||||
self.setUp()
|
||||
event_funcs.EVENT_HISTORY.clear()
|
||||
for n in range(1, (flags.EVENT_BUFFER_SIZE + 1)):
|
||||
event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
|
||||
|
||||
event_full = event_funcs.EVENT_HISTORY[-1]
|
||||
assert event_full.info.code == "Z048"
|
||||
assert event_funcs.EVENT_HISTORY.count(event_full) == 1
|
||||
assert event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg="Test Event 1")) == 0
|
||||
|
||||
|
||||
def MockNode():
|
||||
return ParsedModelNode(
|
||||
alias="model_one",
|
||||
name="model_one",
|
||||
database="dbt",
|
||||
schema="analytics",
|
||||
resource_type=NodeType.Model,
|
||||
unique_id="model.root.model_one",
|
||||
fqn=["root", "model_one"],
|
||||
package_name="root",
|
||||
original_file_path="model_one.sql",
|
||||
root_path="/usr/src/app",
|
||||
refs=[],
|
||||
sources=[],
|
||||
depends_on=DependsOn(),
|
||||
config=NodeConfig.from_dict(
|
||||
{
|
||||
"enabled": True,
|
||||
"materialized": "view",
|
||||
"persist_docs": {},
|
||||
"post-hook": [],
|
||||
"pre-hook": [],
|
||||
"vars": {},
|
||||
"quoting": {},
|
||||
"column_types": {},
|
||||
"tags": [],
|
||||
}
|
||||
),
|
||||
tags=[],
|
||||
path="model_one.sql",
|
||||
raw_code="",
|
||||
description="",
|
||||
columns={},
|
||||
checksum=FileHash.from_contents(""),
|
||||
)
|
||||
|
||||
|
||||
sample_values = [
|
||||
MainReportVersion(version="", log_version=LOG_VERSION),
|
||||
MainKeyboardInterrupt(),
|
||||
MainEncounteredError(exc=""),
|
||||
MainStackTrace(stack_trace=""),
|
||||
MainTrackingUserState(user_state=""),
|
||||
ParseCmdStart(),
|
||||
ParseCmdCompiling(),
|
||||
ParseCmdWritingManifest(),
|
||||
ParseCmdDone(),
|
||||
ManifestDependenciesLoaded(),
|
||||
ManifestLoaderCreated(),
|
||||
ManifestLoaded(),
|
||||
ManifestChecked(),
|
||||
ManifestFlatGraphBuilt(),
|
||||
ParseCmdPerfInfoPath(path=""),
|
||||
GitSparseCheckoutSubdirectory(subdir=""),
|
||||
GitProgressCheckoutRevision(revision=""),
|
||||
GitProgressUpdatingExistingDependency(dir=""),
|
||||
GitProgressPullingNewDependency(dir=""),
|
||||
GitNothingToDo(sha=""),
|
||||
GitProgressUpdatedCheckoutRange(start_sha="", end_sha=""),
|
||||
GitProgressCheckedOutAt(end_sha=""),
|
||||
SystemErrorRetrievingModTime(path=""),
|
||||
SystemCouldNotWrite(path="", reason="", exc=""),
|
||||
SystemExecutingCmd(cmd=[""]),
|
||||
SystemStdOutMsg(bmsg=b""),
|
||||
SystemStdErrMsg(bmsg=b""),
|
||||
SelectorReportInvalidSelector(valid_selectors="", spec_method="", raw_spec=""),
|
||||
MacroEventInfo(msg=""),
|
||||
MacroEventDebug(msg=""),
|
||||
NewConnection(conn_type="", conn_name=""),
|
||||
ConnectionReused(conn_name=""),
|
||||
ConnectionLeftOpen(conn_name=""),
|
||||
ConnectionClosed(conn_name=""),
|
||||
RollbackFailed(conn_name=""),
|
||||
ConnectionClosed2(conn_name=""),
|
||||
ConnectionLeftOpen2(conn_name=""),
|
||||
Rollback(conn_name=""),
|
||||
CacheMiss(conn_name="", database="", schema=""),
|
||||
ListRelations(database="", schema="", relations=[]),
|
||||
ConnectionUsed(conn_type="", conn_name=""),
|
||||
SQLQuery(conn_name="", sql=""),
|
||||
SQLQueryStatus(status="", elapsed=0.1),
|
||||
CodeExecution(conn_name="", code_content=""),
|
||||
CodeExecutionStatus(status="", elapsed=0.1),
|
||||
SQLCommit(conn_name=""),
|
||||
ColTypeChange(
|
||||
orig_type="",
|
||||
new_type="",
|
||||
table=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
),
|
||||
SchemaCreation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
SchemaDrop(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
UncachedRelation(
|
||||
dep_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
ref_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
),
|
||||
AddLink(
|
||||
dep_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
ref_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
),
|
||||
AddRelation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
DropMissingRelation(relation=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
DropCascade(
|
||||
dropped=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
consequences=[ReferenceKeyMsg(database="", schema="", identifier="")],
|
||||
),
|
||||
UpdateReference(
|
||||
old_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
new_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
cached_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
),
|
||||
TemporaryRelation(key=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
RenameSchema(
|
||||
old_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
new_key=ReferenceKeyMsg(database="", schema="", identifier=""),
|
||||
),
|
||||
DumpBeforeAddGraph(dump=dict()),
|
||||
DumpAfterAddGraph(dump=dict()),
|
||||
DumpBeforeRenameSchema(dump=dict()),
|
||||
DumpAfterRenameSchema(dump=dict()),
|
||||
AdapterImportError(exc=""),
|
||||
PluginLoadError(),
|
||||
SystemReportReturnCode(returncode=0),
|
||||
NewConnectionOpening(connection_state=""),
|
||||
TimingInfoCollected(),
|
||||
MergedFromState(num_merged=0, sample=[]),
|
||||
MissingProfileTarget(profile_name="", target_name=""),
|
||||
InvalidVarsYAML(),
|
||||
GenericTestFileParse(path=""),
|
||||
MacroFileParse(path=""),
|
||||
PartialParsingFullReparseBecauseOfError(),
|
||||
PartialParsingFile(file_id=""),
|
||||
PartialParsingExceptionFile(file=""),
|
||||
PartialParsingException(exc_info={}),
|
||||
PartialParsingSkipParsing(),
|
||||
PartialParsingMacroChangeStartFullParse(),
|
||||
ManifestWrongMetadataVersion(version=""),
|
||||
PartialParsingVersionMismatch(saved_version="", current_version=""),
|
||||
PartialParsingFailedBecauseConfigChange(),
|
||||
PartialParsingFailedBecauseProfileChange(),
|
||||
PartialParsingFailedBecauseNewProjectDependency(),
|
||||
PartialParsingFailedBecauseHashChanged(),
|
||||
PartialParsingDeletedMetric(unique_id=""),
|
||||
ParsedFileLoadFailed(path="", exc=""),
|
||||
PartialParseSaveFileNotFound(),
|
||||
StaticParserCausedJinjaRendering(path=""),
|
||||
UsingExperimentalParser(path=""),
|
||||
SampleFullJinjaRendering(path=""),
|
||||
StaticParserFallbackJinjaRendering(path=""),
|
||||
StaticParsingMacroOverrideDetected(path=""),
|
||||
StaticParserSuccess(path=""),
|
||||
StaticParserFailure(path=""),
|
||||
ExperimentalParserSuccess(path=""),
|
||||
ExperimentalParserFailure(path=""),
|
||||
PartialParsingEnabled(deleted=0, added=0, changed=0),
|
||||
PartialParsingAddedFile(file_id=""),
|
||||
PartialParsingDeletedFile(file_id=""),
|
||||
PartialParsingUpdatedFile(file_id=""),
|
||||
PartialParsingNodeMissingInSourceFile(file_id=""),
|
||||
PartialParsingMissingNodes(file_id=""),
|
||||
PartialParsingChildMapMissingUniqueID(unique_id=""),
|
||||
PartialParsingUpdateSchemaFile(file_id=""),
|
||||
PartialParsingDeletedSource(unique_id=""),
|
||||
PartialParsingDeletedExposure(unique_id=""),
|
||||
InvalidDisabledSourceInTestNode(msg=""),
|
||||
InvalidRefInTestNode(msg=""),
|
||||
RunningOperationCaughtError(exc=""),
|
||||
RunningOperationUncaughtError(exc=""),
|
||||
DbtProjectError(),
|
||||
DbtProjectErrorException(exc=""),
|
||||
DbtProfileError(),
|
||||
DbtProfileErrorException(exc=""),
|
||||
ProfileListTitle(),
|
||||
ListSingleProfile(profile=""),
|
||||
NoDefinedProfiles(),
|
||||
ProfileHelpMessage(),
|
||||
CatchableExceptionOnRun(exc=""),
|
||||
InternalExceptionOnRun(build_path="", exc=""),
|
||||
GenericExceptionOnRun(build_path="", unique_id="", exc=""),
|
||||
NodeConnectionReleaseError(node_name="", exc=""),
|
||||
CheckCleanPath(path=""),
|
||||
ConfirmCleanPath(path=""),
|
||||
ProtectedCleanPath(path=""),
|
||||
FinishedCleanPaths(),
|
||||
OpenCommand(open_cmd="", profiles_dir=""),
|
||||
DepsNoPackagesFound(),
|
||||
DepsStartPackageInstall(package_name=""),
|
||||
DepsInstallInfo(version_name=""),
|
||||
DepsUpdateAvailable(version_latest=""),
|
||||
DepsListSubdirectory(subdirectory=""),
|
||||
DepsNotifyUpdatesAvailable(packages=[]),
|
||||
DatabaseErrorRunningHook(hook_type=""),
|
||||
EmptyLine(),
|
||||
HooksRunning(num_hooks=0, hook_type=""),
|
||||
HookFinished(stat_line="", execution="", execution_time=0),
|
||||
WriteCatalogFailure(num_exceptions=0),
|
||||
CatalogWritten(path=""),
|
||||
CannotGenerateDocs(),
|
||||
BuildingCatalog(),
|
||||
CompileComplete(),
|
||||
FreshnessCheckComplete(),
|
||||
ServingDocsPort(address="", port=0),
|
||||
ServingDocsAccessInfo(port=""),
|
||||
ServingDocsExitInfo(),
|
||||
SeedHeader(header=""),
|
||||
SeedHeaderSeparator(len_header=0),
|
||||
RunResultWarning(resource_type="", node_name="", path=""),
|
||||
RunResultFailure(resource_type="", node_name="", path=""),
|
||||
StatsLine(stats={"pass": 0, "warn": 0, "error": 0, "skip": 0, "total": 0}),
|
||||
RunResultError(msg=""),
|
||||
RunResultErrorNoMessage(status=""),
|
||||
SQLCompiledPath(path=""),
|
||||
CheckNodeTestFailure(relation_name=""),
|
||||
FirstRunResultError(msg=""),
|
||||
AfterFirstRunResultError(msg=""),
|
||||
EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
|
||||
PrintStartLine(description="", index=0, total=0, node_info=NodeInfo()),
|
||||
PrintHookStartLine(statement="", index=0, total=0, node_info=NodeInfo()),
|
||||
PrintHookEndLine(
|
||||
statement="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
SkippingDetails(
|
||||
resource_type="", schema="", node_name="", index=0, total=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintErrorTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
|
||||
PrintPassTestResult(name="", index=0, num_models=0, execution_time=0, node_info=NodeInfo()),
|
||||
PrintWarnTestResult(
|
||||
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintFailureTestResult(
|
||||
name="", index=0, num_models=0, execution_time=0, num_failures=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintSkipBecauseError(schema="", relation="", index=0, total=0),
|
||||
PrintModelErrorResultLine(
|
||||
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintModelResultLine(
|
||||
description="", status="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintSnapshotErrorResultLine(
|
||||
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintSnapshotResultLine(
|
||||
status="", description="", cfg={}, index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintSeedErrorResultLine(
|
||||
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
|
||||
),
|
||||
PrintSeedResultLine(
|
||||
status="", index=0, total=0, execution_time=0, schema="", relation="", node_info=NodeInfo()
|
||||
),
|
||||
PrintHookEndErrorLine(
|
||||
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintHookEndErrorStaleLine(
|
||||
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintHookEndWarnLine(
|
||||
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintHookEndPassLine(
|
||||
source_name="", table_name="", index=0, total=0, execution_time=0, node_info=NodeInfo()
|
||||
),
|
||||
PrintCancelLine(conn_name=""),
|
||||
DefaultSelector(name=""),
|
||||
NodeStart(unique_id="", node_info=NodeInfo()),
|
||||
NodeCompiling(unique_id="", node_info=NodeInfo()),
|
||||
NodeExecuting(unique_id="", node_info=NodeInfo()),
|
||||
NodeFinished(unique_id="", node_info=NodeInfo(), run_result=RunResultMsg()),
|
||||
QueryCancelationUnsupported(type=""),
|
||||
ConcurrencyLine(num_threads=0, target_name=""),
|
||||
StarterProjectPath(dir=""),
|
||||
ConfigFolderDirectory(dir=""),
|
||||
NoSampleProfileFound(adapter=""),
|
||||
ProfileWrittenWithSample(name="", path=""),
|
||||
ProfileWrittenWithTargetTemplateYAML(name="", path=""),
|
||||
ProfileWrittenWithProjectTemplateYAML(name="", path=""),
|
||||
SettingUpProfile(),
|
||||
InvalidProfileTemplateYAML(),
|
||||
ProjectNameAlreadyExists(name=""),
|
||||
ProjectCreated(project_name="", docs_url="", slack_url=""),
|
||||
DepsSetDownloadDirectory(path=""),
|
||||
EnsureGitInstalled(),
|
||||
DepsCreatingLocalSymlink(),
|
||||
DepsSymlinkNotAvailable(),
|
||||
FoundStats(stat_line=""),
|
||||
CompilingNode(unique_id=""),
|
||||
WritingInjectedSQLForNode(unique_id=""),
|
||||
DisableTracking(),
|
||||
SendingEvent(kwargs=""),
|
||||
SendEventFailure(),
|
||||
FlushEvents(),
|
||||
FlushEventsFailure(),
|
||||
TrackingInitializeFailure(),
|
||||
RetryExternalCall(attempt=0, max=0),
|
||||
GeneralWarningMsg(msg="", log_fmt=""),
|
||||
GeneralWarningException(exc="", log_fmt=""),
|
||||
PartialParsingProfileEnvVarsChanged(),
|
||||
AdapterEventDebug(name="", base_msg="", args=()),
|
||||
AdapterEventInfo(name="", base_msg="", args=()),
|
||||
AdapterEventWarning(name="", base_msg="", args=()),
|
||||
AdapterEventError(name="", base_msg="", args=()),
|
||||
PrintDebugStackTrace(),
|
||||
MainReportArgs(args={}),
|
||||
RegistryProgressGETRequest(url=""),
|
||||
RegistryIndexProgressGETRequest(url=""),
|
||||
RegistryIndexProgressGETResponse(url="", resp_code=1),
|
||||
RegistryResponseUnexpectedType(response=""),
|
||||
RegistryResponseMissingTopKeys(response=""),
|
||||
RegistryResponseMissingNestedKeys(response=""),
|
||||
RegistryResponseExtraNestedKeys(response=""),
|
||||
DepsUpToDate(),
|
||||
PartialParsingNotEnabled(),
|
||||
SQLRunnerException(exc=""),
|
||||
DropRelation(dropped=ReferenceKeyMsg(database="", schema="", identifier="")),
|
||||
PartialParsingProjectEnvVarsChanged(),
|
||||
RegistryProgressGETResponse(url="", resp_code=1),
|
||||
IntegrationTestDebug(msg=""),
|
||||
IntegrationTestInfo(msg=""),
|
||||
IntegrationTestWarn(msg=""),
|
||||
IntegrationTestError(msg=""),
|
||||
IntegrationTestException(msg=""),
|
||||
EventBufferFull(),
|
||||
RecordRetryException(exc=""),
|
||||
UnitTestInfo(msg=""),
|
||||
]
|
||||
|
||||
|
||||
class TestEventJSONSerialization:
|
||||
|
||||
# attempts to test that every event is serializable to json.
|
||||
# event types that take `Any` are not possible to test in this way since some will serialize
|
||||
# just fine and others won't.
|
||||
def test_all_serializable(self):
|
||||
no_test = [DummyCacheEvent]
|
||||
|
||||
all_non_abstract_events = set(
|
||||
filter(
|
||||
lambda x: not inspect.isabstract(x) and x not in no_test,
|
||||
get_all_subclasses(BaseEvent),
|
||||
)
|
||||
)
|
||||
all_event_values_list = list(map(lambda x: x.__class__, sample_values))
|
||||
diff = all_non_abstract_events.difference(set(all_event_values_list))
|
||||
assert (
|
||||
not diff
|
||||
), f"test is missing concrete values in `sample_values`. Please add the values for the aforementioned event classes"
|
||||
|
||||
# make sure everything in the list is a value not a type
|
||||
for event in sample_values:
|
||||
assert type(event) != type
|
||||
|
||||
# if we have everything we need to test, try to serialize everything
|
||||
for event in sample_values:
|
||||
event_dict = event.to_dict()
|
||||
try:
|
||||
event_json = event_to_json(event)
|
||||
except Exception as e:
|
||||
raise Exception(f"{event} is not serializable to json. Originating exception: {e}")
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Counter(Generic[T], SerializableType):
|
||||
dummy_val: T
|
||||
count: int = 0
|
||||
|
||||
def next(self) -> T:
|
||||
self.count = self.count + 1
|
||||
return self.dummy_val
|
||||
|
||||
# mashumaro serializer
|
||||
def _serialize() -> Dict[str, int]:
|
||||
return {"count": count}
|
||||
|
||||
|
||||
@dataclass
|
||||
class DummyCacheEvent(InfoLevel, Cache, SerializableType):
|
||||
code = "X999"
|
||||
counter: Counter
|
||||
|
||||
def message(self) -> str:
|
||||
return f"state: {self.counter.next()}"
|
||||
|
||||
# mashumaro serializer
|
||||
def _serialize() -> str:
|
||||
return "DummyCacheEvent"
|
||||
99
tests/unit/test_proto_events.py
Normal file
99
tests/unit/test_proto_events.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import sys
|
||||
from dbt.events.types import (
|
||||
MainReportVersion,
|
||||
MainReportArgs,
|
||||
RollbackFailed,
|
||||
MainEncounteredError,
|
||||
PluginLoadError,
|
||||
PrintStartLine,
|
||||
)
|
||||
from dbt.events.functions import event_to_dict, LOG_VERSION
|
||||
from dbt.events import proto_types as pl
|
||||
from dbt.version import installed
|
||||
|
||||
|
||||
info_keys = {"name", "code", "msg", "level", "invocation_id", "pid", "thread", "ts", "extra"}
|
||||
|
||||
|
||||
def test_events():
|
||||
|
||||
# A001 event
|
||||
event = MainReportVersion(version=str(installed), log_version=LOG_VERSION)
|
||||
event_dict = event_to_dict(event)
|
||||
event_json = event.to_json()
|
||||
serialized = bytes(event)
|
||||
assert "Running with dbt=" in str(serialized)
|
||||
assert set(event_dict.keys()) == {"version", "info", "log_version"}
|
||||
assert set(event_dict["info"].keys()) == info_keys
|
||||
assert event_json
|
||||
assert event.info.code == "A001"
|
||||
|
||||
# Extract EventInfo from serialized message
|
||||
generic_event = pl.GenericMessage().parse(serialized)
|
||||
assert generic_event.info.code == "A001"
|
||||
# get the message class for the real message from the generic message
|
||||
message_class = getattr(sys.modules["dbt.events.proto_types"], generic_event.info.name)
|
||||
new_event = message_class().parse(serialized)
|
||||
assert new_event.info.code == event.info.code
|
||||
assert new_event.version == event.version
|
||||
|
||||
# A002 event
|
||||
event = MainReportArgs(args={"one": "1", "two": "2"})
|
||||
event_dict = event_to_dict(event)
|
||||
event_json = event.to_json()
|
||||
|
||||
assert set(event_dict.keys()) == {"info", "args"}
|
||||
assert set(event_dict["info"].keys()) == info_keys
|
||||
assert event_json
|
||||
assert event.info.code == "A002"
|
||||
|
||||
|
||||
def test_exception_events():
|
||||
event = RollbackFailed(conn_name="test", exc_info="something failed")
|
||||
event_dict = event_to_dict(event)
|
||||
event_json = event.to_json()
|
||||
assert set(event_dict.keys()) == {"info", "conn_name", "exc_info"}
|
||||
assert set(event_dict["info"].keys()) == info_keys
|
||||
assert event_json
|
||||
assert event.info.code == "E009"
|
||||
|
||||
event = PluginLoadError(exc_info="something failed")
|
||||
event_dict = event_to_dict(event)
|
||||
event_json = event.to_json()
|
||||
assert set(event_dict.keys()) == {"info", "exc_info"}
|
||||
assert set(event_dict["info"].keys()) == info_keys
|
||||
assert event_json
|
||||
assert event.info.code == "E036"
|
||||
# This event has no "msg"/"message"
|
||||
assert event.info.msg is None
|
||||
|
||||
# Z002 event
|
||||
event = MainEncounteredError(exc="Rollback failed")
|
||||
event_dict = event_to_dict(event)
|
||||
event_json = event.to_json()
|
||||
|
||||
assert set(event_dict.keys()) == {"info", "exc"}
|
||||
assert set(event_dict["info"].keys()) == info_keys
|
||||
assert event_json
|
||||
assert event.info.code == "Z002"
|
||||
|
||||
|
||||
def test_node_info_events():
|
||||
node_info = {
|
||||
"node_path": "some_path",
|
||||
"node_name": "some_name",
|
||||
"unique_id": "some_id",
|
||||
"resource_type": "model",
|
||||
"materialized": "table",
|
||||
"node_status": "started",
|
||||
"node_started_at": "some_time",
|
||||
"node_finished_at": "another_time",
|
||||
}
|
||||
event = PrintStartLine(
|
||||
description="some description",
|
||||
index=123,
|
||||
total=111,
|
||||
node_info=pl.NodeInfo(**node_info),
|
||||
)
|
||||
assert event
|
||||
assert event.node_info.node_path == "some_path"
|
||||
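Every proto-backed event embeds the shared EventInfo envelope, which is what makes the GenericMessage round trip in test_events work: a consumer can decode just the envelope, look up the concrete class by info.name, and re-parse the full payload. A minimal sketch of that dispatch pattern, using only the calls shown in the test above:

    import sys
    from dbt.events import proto_types as pl

    def decode_event(serialized: bytes):
        # parse only the shared `info` envelope first
        generic = pl.GenericMessage().parse(serialized)
        # then resolve and parse the concrete message type by name
        message_class = getattr(sys.modules["dbt.events.proto_types"], generic.info.name)
        return message_class().parse(serialized)
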
@@ -2,3 +2,4 @@ from mashumaro.exceptions import MissingField as MissingField
|
||||
from mashumaro.helper import field_options as field_options, pass_through as pass_through
|
||||
from mashumaro.mixins.dict import DataClassDictMixin as DataClassDictMixin
|
||||
from mashumaro.mixins.msgpack import DataClassMessagePackMixin as DataClassMessagePackMixin
|
||||
from mashumaro.mixins.json import DataClassJSONMixin as DataClassJSONMixin
|
||||
|
||||
4
tox.ini
4
tox.ini
@@ -6,7 +6,9 @@ envlist = unit,integration
|
||||
description = unit testing
|
||||
skip_install = true
|
||||
passenv = DBT_* PYTEST_ADDOPTS
|
||||
commands = {envpython} -m pytest --cov=core {posargs} test/unit
|
||||
commands =
|
||||
{envpython} -m pytest --cov=core {posargs} test/unit
|
||||
{envpython} -m pytest --cov=core {posargs} tests/unit
|
||||
deps =
|
||||
-rdev-requirements.txt
|
||||
-reditable-requirements.txt