Compare commits

...

18 Commits

Author SHA1 Message Date
Nathaniel May
e7268243b9 update postgres adapter to use new logger 2021-11-02 12:33:32 -04:00
Nathaniel May
e35106d30b integrate new exception logging model 2021-10-29 16:14:27 -04:00
Nathaniel May
3a495a7187 add exception method to interface 2021-10-29 16:05:07 -04:00
Nathaniel May
1e07443b88 add test for documented adapter maintainer interface 2021-10-29 16:05:07 -04:00
Nathaniel May
222af3045d fix rebase mistakes 2021-10-29 16:04:48 -04:00
Nathaniel May
c750a06abb add adapter logging interface 2021-10-29 16:04:48 -04:00
Nathaniel May
4e4e81a02e flake8 fixes 2021-10-29 16:04:48 -04:00
Nathaniel May
f203e0d861 add one more event that actually has data 2021-10-29 16:04:48 -04:00
Nathaniel May
e31ff4aeab move comment block 2021-10-29 16:04:23 -04:00
Nathaniel May
9caf39d928 flake8 fixes 2021-10-29 16:04:23 -04:00
Nathaniel May
16c63b77ed revert flake8 command change 2021-10-29 16:03:25 -04:00
Nathaniel May
1718831ca6 flake8 fixes 2021-10-29 16:03:25 -04:00
Nathaniel May
d627701989 update flake8 command with per-file-ignore 2021-10-29 16:03:25 -04:00
Nathaniel May
3453bb37ef add event type modeling and fire_event calls 2021-10-29 16:03:25 -04:00
Nathaniel May
87b8ca9615 Handle exec info (#4168)
handle exec info
2021-10-29 16:01:04 -04:00
Emily Rockman
a3dc5efda7 context call sites (#4164)
* updated context dir to new structured logging
2021-10-29 10:12:09 -05:00
Nathaniel May
1015b89dbf Initial structured logging work with fire_event (#4137)
add event type modeling and fire_event calls
2021-10-29 09:16:06 -04:00
Nathaniel May
5c9fd07050 init 2021-10-26 13:57:30 -04:00
12 changed files with 405 additions and 18 deletions

View File

@@ -12,7 +12,8 @@ from dbt.clients.yaml_helper import ( # noqa: F401
)
from dbt.contracts.graph.compiled import CompiledResource
from dbt.exceptions import raise_compiler_error, MacroReturn
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import MacroEventInfo, MacroEventDebug
from dbt.version import __version__ as dbt_version
# These modules are added to the context. Consider alternative
@@ -443,9 +444,9 @@ class BaseContext(metaclass=ContextMeta):
{% endmacro %}"
"""
if info:
logger.info(msg)
fire_event(MacroEventInfo(msg))
else:
logger.debug(msg)
fire_event(MacroEventDebug(msg))
return ''
@contextproperty

View File

@@ -49,7 +49,6 @@ from dbt.exceptions import (
wrapped_exports,
)
from dbt.config import IsFQNResource
from dbt.logger import GLOBAL_LOGGER as logger # noqa
from dbt.node_types import NodeType
from dbt.utils import (

22
core/dbt/events/README.md Normal file
View File

@@ -0,0 +1,22 @@
# Events Module
The Events module is the implementation for structured logging. These events represent both a programmatic interface to dbt processes as well as human-readable messaging in one centralized place. The centralization allows for leveraging mypy to enforce interface invariants across all dbt events, and the distinct type layer allows for decoupling events and libraries such as loggers.
# Using the Events Module
The event module provides types that represent what is happening in dbt in `events.types`. These types are intended to represent an exhaustive list of all things happening within dbt that will need to be logged, streamed, or printed. To fire an event, `events.functions::fire_event` is the entry point to the module from everywhere in dbt.
# Adding a New Event
In `events.types` add a new class that represents the new event. This may be a simple class with no values, or it may be a dataclass with some values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `CliEventABC`) as well as the loglevel this event belongs to.
# Adapter Maintainers
To integrate existing log messages from adapters, you likely have a line of code like this in your adapter already:
```python
from dbt.logger import GLOBAL_LOGGER as logger
```
Simply change it to these two lines with your adapter's database name, and all your existing call sites will now use the new system for v1.0:
```python
from dbt.events import AdapterLogger
logger = AdapterLogger("<database name>")
# e.g. AdapterLogger("Snowflake")
```

View File

@@ -0,0 +1 @@
from .adapter_endpoint import AdapterLogger # noqa: F401

View File

@@ -0,0 +1,86 @@
from dataclasses import dataclass
from dbt.events.functions import fire_event
from dbt.events.types import (
AdapterEventDebug, AdapterEventInfo, AdapterEventWarning, AdapterEventError
)
from typing import Any
@dataclass
class AdapterLogger():
    """Stable logging interface for adapter plugin maintainers.

    Mirrors the stdlib ``logging`` method names (``debug``/``info``/
    ``warning``/``error``/``exception``) so existing adapter call sites keep
    working unchanged, but routes every message through the structured event
    system via ``fire_event``.
    """
    # adapter/database name prefixed onto every message (e.g. "Postgres")
    name: str

    def _log(
        self,
        event_type: Any,
        msg: str,
        exc_info: Any,
        stack_info: Any,
        extra: Any
    ) -> None:
        # Shared plumbing for every level: build the event carrying the
        # adapter name and raw message, attach the optional exception
        # context (the ShowException fields), and hand it to fire_event.
        event = event_type(name=self.name, raw_msg=msg)
        event.exc_info = exc_info
        event.stack_info = stack_info
        event.extra = extra
        fire_event(event)

    def debug(
        self,
        msg: str,
        exc_info: Any = None,
        stack_info: Any = None,
        extra: Any = None
    ):
        self._log(AdapterEventDebug, msg, exc_info, stack_info, extra)

    def info(
        self,
        msg: str,
        exc_info: Any = None,
        stack_info: Any = None,
        extra: Any = None
    ):
        self._log(AdapterEventInfo, msg, exc_info, stack_info, extra)

    def warning(
        self,
        msg: str,
        exc_info: Any = None,
        stack_info: Any = None,
        extra: Any = None
    ):
        self._log(AdapterEventWarning, msg, exc_info, stack_info, extra)

    def error(
        self,
        msg: str,
        exc_info: Any = None,
        stack_info: Any = None,
        extra: Any = None
    ):
        self._log(AdapterEventError, msg, exc_info, stack_info, extra)

    def exception(
        self,
        msg: str,
        exc_info: Any = True,  # this default is what makes this method different
        stack_info: Any = None,
        extra: Any = None
    ):
        # same destination as error(), but exc_info defaults to True so the
        # active exception is captured, matching logging.Logger.exception
        self._log(AdapterEventError, msg, exc_info, stack_info, extra)

View File

@@ -0,0 +1,64 @@
import dbt.logger as logger # type: ignore # TODO eventually remove dependency on this logger
from dbt.events.history import EVENT_HISTORY
from dbt.events.types import CliEventABC, Event, ShowException
# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
    """Record an event in the session history and route it to its destinations.

    This is the single entry point for the eventing system: all side effects
    (mutating EVENT_HISTORY, writing to the legacy CLI logger) happen here,
    branched by event type and level.

    :param e: the event to record and emit
    :raises AssertionError: if a CLI event reports an unhandled level tag
    """
    EVENT_HISTORY.append(e)
    if not isinstance(e, CliEventABC):
        # non-CLI events are only recorded in the history for now
        return
    # map each level tag to the legacy logger method that renders it.
    # TODO after implementing #3977 send 'test' to a new test level
    # instead of reusing debug.
    log_method = {
        'test': logger.GLOBAL_LOGGER.debug,
        'debug': logger.GLOBAL_LOGGER.debug,
        'info': logger.GLOBAL_LOGGER.info,
        'warn': logger.GLOBAL_LOGGER.warning,
        'error': logger.GLOBAL_LOGGER.error,
    }.get(e.level_tag())
    if log_method is None:
        raise AssertionError(
            f"Event type {type(e).__name__} has unhandled level: {e.level_tag()}"
        )
    line = logger.timestamped_line(e.cli_msg())
    if isinstance(e, ShowException):
        # ShowException events carry optional exception/stack context that
        # the underlying logger knows how to render.
        log_method(
            line,
            exc_info=e.exc_info,
            stack_info=e.stack_info,
            extra=e.extra
        )
    else:
        log_method(line)

View File

@@ -0,0 +1,7 @@
from dbt.events.types import Event
from typing import List
# the global history of events for this session; fire_event appends every
# event it receives here before routing it to any other destination.
# TODO this is naive and the memory footprint is likely far too large.
EVENT_HISTORY: List[Event] = []

175
core/dbt/events/types.py Normal file
View File

@@ -0,0 +1,175 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any
# types to represent log levels
# in preparation for #3977
# Mixin classes that give an event its log level; fire_event dispatches on
# the string returned by level_tag().
class TestLevel():
    # 'test' is currently routed to debug output by fire_event (see #3977)
    def level_tag(self) -> str:
        return "test"
class DebugLevel():
    def level_tag(self) -> str:
        return "debug"
class InfoLevel():
    def level_tag(self) -> str:
        return "info"
class WarnLevel():
    def level_tag(self) -> str:
        return "warn"
class ErrorLevel():
    def level_tag(self) -> str:
        return "error"
@dataclass
class ShowException():
    # Mixin for events that may carry python-logging exception context.
    # fire_event forwards these three fields as keyword arguments to the
    # underlying logger call when an event is an instance of this class.
    exc_info: Any = None
    stack_info: Any = None
    extra: Any = None
# The following classes represent the data necessary to describe a
# particular event to both human readable logs, and machine reliable
# event streams. classes extend superclasses that indicate what
# destinations they are intended for, which mypy uses to enforce
# that the necessary methods are defined.
# top-level superclass for all events
class Event(metaclass=ABCMeta):
    # do not define this yourself. inherit it from one of the above level types
    # (TestLevel / DebugLevel / InfoLevel / WarnLevel / ErrorLevel).
    @abstractmethod
    def level_tag(self) -> str:
        raise Exception("level_tag not implemented for event")
class CliEventABC(Event, metaclass=ABCMeta):
    # Events destined for CLI/log output must implement cli_msg.
    # Solely the human readable message. Timestamps and formatting will be added by the logger.
    @abstractmethod
    def cli_msg(self) -> str:
        raise Exception("cli_msg not implemented for cli event")
@dataclass
class AdapterEventBase():
    # Shared payload for all adapter-originated events: the adapter's
    # database name plus the unformatted message from the call site.
    name: str
    raw_msg: str

    def cli_msg(self) -> str:
        """Prefix the raw message with the adapter name for CLI display."""
        return "{} adapter: {}".format(self.name, self.raw_msg)
# Concrete adapter events: each combines a level mixin, the shared
# name/raw_msg payload, the CLI destination, and optional exception context.
class AdapterEventDebug(DebugLevel, AdapterEventBase, CliEventABC, ShowException):
    pass
class AdapterEventInfo(InfoLevel, AdapterEventBase, CliEventABC, ShowException):
    pass
class AdapterEventWarning(WarnLevel, AdapterEventBase, CliEventABC, ShowException):
    pass
# note: AdapterLogger.exception() also fires AdapterEventError, with
# exc_info defaulting to True
class AdapterEventError(ErrorLevel, AdapterEventBase, CliEventABC, ShowException):
    pass
# Progress events fired by the parse task; each carries no data beyond its
# fixed human-readable message.
class ParsingStart(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Start parsing."
class ParsingCompiling(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Compiling."
class ParsingWritingManifest(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Writing manifest."
class ParsingDone(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Done."
class ManifestDependenciesLoaded(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Dependencies loaded"
class ManifestLoaderCreated(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "ManifestLoader created"
class ManifestLoaded(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Manifest loaded"
class ManifestChecked(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Manifest checked"
class ManifestFlatGraphBuilt(InfoLevel, CliEventABC):
    def cli_msg(self) -> str:
        return "Flat graph built"
@dataclass
class ReportPerformancePath(InfoLevel, CliEventABC):
    # filesystem path where the parse task wrote its performance info
    path: str
    def cli_msg(self) -> str:
        return f"Performance info: {self.path}"
@dataclass
class MacroEventInfo(InfoLevel, CliEventABC):
    # pre-formatted message emitted from a macro context (info level);
    # passed through verbatim
    msg: str
    def cli_msg(self) -> str:
        return self.msg
@dataclass
class MacroEventDebug(DebugLevel, CliEventABC):
    # pre-formatted message emitted from a macro context (debug level);
    # passed through verbatim
    msg: str
    def cli_msg(self) -> str:
        return self.msg
# since mypy doesn't run on every file we need to suggest to mypy that every
# class gets instantiated. But we don't actually want to run this code.
# making the conditional `if False` causes mypy to skip it as dead code so
# we need to skirt around that by computing something it doesn't check statically.
#
# TODO remove these lines once we run mypy everywhere.
if 1 == 0:  # deliberately not `if False`: mypy skips a literal-False branch as dead code
    ParsingStart()
    ParsingCompiling()
    ParsingWritingManifest()
    ParsingDone()
    ManifestDependenciesLoaded()
    ManifestLoaderCreated()
    ManifestLoaded()
    ManifestChecked()
    ManifestFlatGraphBuilt()
    ReportPerformancePath(path='')
    MacroEventInfo(msg='')
    MacroEventDebug(msg='')

View File

@@ -655,8 +655,12 @@ def get_timestamp():
return time.strftime("%H:%M:%S")
def timestamped_line(msg: str) -> str:
    """Return *msg* prefixed with the current HH:MM:SS timestamp."""
    return f"{get_timestamp()} | {msg}"
def print_timestamped_line(msg: str, use_color: Optional[str] = None):
if use_color is not None:
msg = dbt.ui.color(msg, use_color)
GLOBAL_LOGGER.info("{} | {}".format(get_timestamp(), msg))
GLOBAL_LOGGER.info(timestamped_line(msg))

View File

@@ -11,8 +11,14 @@ from dbt.adapters.factory import get_adapter
from dbt.parser.manifest import (
Manifest, ManifestLoader, _check_manifest
)
from dbt.logger import DbtProcessState, print_timestamped_line
from dbt.logger import DbtProcessState
from dbt.clients.system import write_file
from dbt.events.types import (
ManifestDependenciesLoaded, ManifestLoaderCreated, ManifestLoaded, ManifestChecked,
ManifestFlatGraphBuilt, ParsingStart, ParsingCompiling, ParsingWritingManifest, ParsingDone,
ReportPerformancePath
)
from dbt.events.functions import fire_event
from dbt.graph import Graph
import time
from typing import Optional
@@ -40,7 +46,7 @@ class ParseTask(ConfiguredTask):
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self.loader._perf_info,
cls=dbt.utils.JSONEncoder, indent=4))
print_timestamped_line(f"Performance info: {path}")
fire_event(ReportPerformancePath(path=path))
# This method takes code that normally exists in other files
# and pulls it in here, to simplify logging and make the
@@ -58,22 +64,22 @@ class ParseTask(ConfiguredTask):
with PARSING_STATE:
start_load_all = time.perf_counter()
projects = root_config.load_dependencies()
print_timestamped_line("Dependencies loaded")
fire_event(ManifestDependenciesLoaded())
loader = ManifestLoader(root_config, projects, macro_hook)
print_timestamped_line("ManifestLoader created")
fire_event(ManifestLoaderCreated())
manifest = loader.load()
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())
_check_manifest(manifest, root_config)
print_timestamped_line("Manifest checked")
fire_event(ManifestChecked())
manifest.build_flat_graph()
print_timestamped_line("Flat graph built")
fire_event(ManifestFlatGraphBuilt())
loader._perf_info.load_all_elapsed = (
time.perf_counter() - start_load_all
)
self.loader = loader
self.manifest = manifest
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())
def compile_manifest(self):
adapter = get_adapter(self.config)
@@ -81,14 +87,14 @@ class ParseTask(ConfiguredTask):
self.graph = compiler.compile(self.manifest)
def run(self):
print_timestamped_line('Start parsing.')
fire_event(ParsingStart())
self.get_full_manifest()
if self.args.compile:
print_timestamped_line('Compiling.')
fire_event(ParsingCompiling())
self.compile_manifest()
if self.args.write_manifest:
print_timestamped_line('Writing manifest.')
fire_event(ParsingWritingManifest())
self.write_manifest()
self.write_perf_info()
print_timestamped_line('Done.')
fire_event(ParsingDone())

View File

@@ -6,13 +6,16 @@ import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterResponse
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events import AdapterLogger
from dbt.helper_types import Port
from dataclasses import dataclass
from typing import Optional
logger = AdapterLogger("Postgres")
@dataclass
class PostgresCredentials(Credentials):
host: str

19
test/unit/test_events.py Normal file
View File

@@ -0,0 +1,19 @@
from unittest import mock, TestCase
class TestFlags(TestCase):
    # NOTE(review): the class name looks like a copy-paste from a flags test —
    # it exercises the events module. Kept as-is so test discovery and any
    # external references to the name are unaffected.
    def setUp(self):
        pass

    # this interface is documented for adapter maintainers to plug into
    # so we should test that it at the very least doesn't explode.
    def test_adapter_logging_interface(self):
        from dbt.events import AdapterLogger
        logger = AdapterLogger("dbt_tests")
        logger.debug("debug message")
        logger.info("info message")
        logger.warning("warning message")
        logger.error("error message")
        logger.exception("exception message")
        # reaching this point without raising is the success condition;
        # no explicit assertion is needed.