mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
4 Commits
v1.7.1
...
add-except
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d342165f6d | ||
|
|
a3dc5efda7 | ||
|
|
1015b89dbf | ||
|
|
5c9fd07050 |
@@ -12,7 +12,8 @@ from dbt.clients.yaml_helper import ( # noqa: F401
|
||||
)
|
||||
from dbt.contracts.graph.compiled import CompiledResource
|
||||
from dbt.exceptions import raise_compiler_error, MacroReturn
|
||||
from dbt.logger import GLOBAL_LOGGER as logger
|
||||
from dbt.events.functions import fire_event
|
||||
from dbt.events.types import MacroEventInfo, MacroEventDebug
|
||||
from dbt.version import __version__ as dbt_version
|
||||
|
||||
# These modules are added to the context. Consider alternative
|
||||
@@ -443,9 +444,9 @@ class BaseContext(metaclass=ContextMeta):
|
||||
{% endmacro %}"
|
||||
"""
|
||||
if info:
|
||||
logger.info(msg)
|
||||
fire_event(MacroEventInfo(msg))
|
||||
else:
|
||||
logger.debug(msg)
|
||||
fire_event(MacroEventDebug(msg))
|
||||
return ''
|
||||
|
||||
@contextproperty
|
||||
|
||||
@@ -49,7 +49,6 @@ from dbt.exceptions import (
|
||||
wrapped_exports,
|
||||
)
|
||||
from dbt.config import IsFQNResource
|
||||
from dbt.logger import GLOBAL_LOGGER as logger # noqa
|
||||
from dbt.node_types import NodeType
|
||||
|
||||
from dbt.utils import (
|
||||
|
||||
9
core/dbt/events/README.md
Normal file
9
core/dbt/events/README.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# Events Module
|
||||
|
||||
The Events module is the implmentation for structured logging. These events represent both a programatic interface to dbt processes as well as human-readable messaging in one centralized place. The centralization allows for leveraging mypy to enforce interface invariants across all dbt events, and the distinct type layer allows for decoupling events and libraries such as loggers.
|
||||
|
||||
# Using the Events Module
|
||||
The event module provides types that represent what is happening in dbt in `events.types`. These types are intended to represent an exhaustive list of all things happening within dbt that will need to be logged, streamed, or printed. To fire an event, `events.functions::fire_event` is the entry point to the module from everywhere in dbt.
|
||||
|
||||
# Adding a New Event
|
||||
In `events.types` add a new class that represents the new event. This may be a simple class with no values, or it may be a dataclass with some values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `CliEventABC`) as well as the loglevel this event belongs to.
|
||||
0
core/dbt/events/__init__.py
Normal file
0
core/dbt/events/__init__.py
Normal file
30
core/dbt/events/functions.py
Normal file
30
core/dbt/events/functions.py
Normal file
@@ -0,0 +1,30 @@
|
||||
|
||||
import dbt.logger as logger # type: ignore # TODO eventually remove dependency on this logger
|
||||
from dbt.events.history import EVENT_HISTORY
|
||||
from dbt.events.types import CliEventABC, Event
|
||||
|
||||
|
||||
# top-level method for accessing the new eventing system
|
||||
# this is where all the side effects happen branched by event type
|
||||
# (i.e. - mutating the event history, printing to stdout, logging
|
||||
# to files, etc.)
|
||||
def fire_event(e: Event) -> None:
|
||||
EVENT_HISTORY.append(e)
|
||||
if isinstance(e, CliEventABC):
|
||||
if e.level_tag() == 'test':
|
||||
# TODO after implmenting #3977 send to new test level
|
||||
logger.GLOBAL_LOGGER.debug(logger.timestamped_line(e.cli_msg()))
|
||||
elif e.level_tag() == 'debug':
|
||||
logger.GLOBAL_LOGGER.debug(logger.timestamped_line(e.cli_msg()))
|
||||
elif e.level_tag() == 'info':
|
||||
logger.GLOBAL_LOGGER.info(logger.timestamped_line(e.cli_msg()))
|
||||
elif e.level_tag() == 'warn':
|
||||
logger.GLOBAL_LOGGER.warning()(logger.timestamped_line(e.cli_msg()))
|
||||
elif e.level_tag() == 'error':
|
||||
logger.GLOBAL_LOGGER.error(logger.timestamped_line(e.cli_msg()))
|
||||
elif e.level_tag() == 'exception':
|
||||
logger.GLOBAL_LOGGER.exception(logger.timestamped_line(e.cli_msg()))
|
||||
else:
|
||||
raise AssertionError(
|
||||
f"Event type {type(e).__name__} has unhandled level: {e.level_tag()}"
|
||||
)
|
||||
7
core/dbt/events/history.py
Normal file
7
core/dbt/events/history.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from dbt.events.types import Event
|
||||
from typing import List
|
||||
|
||||
|
||||
# the global history of events for this session
|
||||
# TODO this is naive and the memory footprint is likely far too large.
|
||||
EVENT_HISTORY: List[Event] = []
|
||||
147
core/dbt/events/types.py
Normal file
147
core/dbt/events/types.py
Normal file
@@ -0,0 +1,147 @@
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
# types to represent log levels
|
||||
|
||||
# in preparation for #3977
|
||||
class TestLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "test"
|
||||
|
||||
|
||||
class DebugLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "debug"
|
||||
|
||||
|
||||
class InfoLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "info"
|
||||
|
||||
|
||||
class WarnLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "warn"
|
||||
|
||||
|
||||
class ErrorLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "error"
|
||||
|
||||
|
||||
class ExceptionLevel():
|
||||
def level_tag(self) -> str:
|
||||
return "exception"
|
||||
|
||||
|
||||
# The following classes represent the data necessary to describe a
|
||||
# particular event to both human readable logs, and machine reliable
|
||||
# event streams. classes extend superclasses that indicate what
|
||||
# destinations they are intended for, which mypy uses to enforce
|
||||
# that the necessary methods are defined.
|
||||
|
||||
|
||||
# top-level superclass for all events
|
||||
class Event(metaclass=ABCMeta):
|
||||
# do not define this yourself. inherit it from one of the above level types.
|
||||
@abstractmethod
|
||||
def level_tag(self) -> str:
|
||||
raise Exception("level_tag not implemented for event")
|
||||
|
||||
|
||||
class CliEventABC(Event, metaclass=ABCMeta):
|
||||
# Solely the human readable message. Timestamps and formatting will be added by the logger.
|
||||
@abstractmethod
|
||||
def cli_msg(self) -> str:
|
||||
raise Exception("cli_msg not implemented for cli event")
|
||||
|
||||
|
||||
class ParsingStart(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Start parsing."
|
||||
|
||||
|
||||
class ParsingCompiling(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Compiling."
|
||||
|
||||
|
||||
class ParsingWritingManifest(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Writing manifest."
|
||||
|
||||
|
||||
class ParsingDone(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Done."
|
||||
|
||||
|
||||
class ManifestDependenciesLoaded(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Dependencies loaded"
|
||||
|
||||
|
||||
class ManifestLoaderCreated(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "ManifestLoader created"
|
||||
|
||||
|
||||
class ManifestLoaded(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Manifest loaded"
|
||||
|
||||
|
||||
class ManifestChecked(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Manifest checked"
|
||||
|
||||
|
||||
class ManifestFlatGraphBuilt(InfoLevel, CliEventABC):
|
||||
def cli_msg(self) -> str:
|
||||
return "Flat graph built"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReportPerformancePath(InfoLevel, CliEventABC):
|
||||
path: str
|
||||
|
||||
def cli_msg(self) -> str:
|
||||
return f"Performance info: {self.path}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class MacroEventInfo(InfoLevel, CliEventABC):
|
||||
msg: str
|
||||
|
||||
def cli_msg(self) -> str:
|
||||
return self.msg
|
||||
|
||||
|
||||
@dataclass
|
||||
class MacroEventDebug(DebugLevel, CliEventABC):
|
||||
msg: str
|
||||
|
||||
def cli_msg(self) -> str:
|
||||
return self.msg
|
||||
|
||||
|
||||
# since mypy doesn't run on every file we need to suggest to mypy that every
|
||||
# class gets instantiated. But we don't actually want to run this code.
|
||||
# making the conditional `if False` causes mypy to skip it as dead code so
|
||||
# we need to skirt around that by computing something it doesn't check statically.
|
||||
#
|
||||
# TODO remove these lines once we run mypy everywhere.
|
||||
if 1 == 0:
|
||||
ParsingStart()
|
||||
ParsingCompiling()
|
||||
ParsingWritingManifest()
|
||||
ParsingDone()
|
||||
ManifestDependenciesLoaded()
|
||||
ManifestLoaderCreated()
|
||||
ManifestLoaded()
|
||||
ManifestChecked()
|
||||
ManifestFlatGraphBuilt()
|
||||
ReportPerformancePath(path='')
|
||||
MacroEventInfo(msg='')
|
||||
MacroEventDebug(msg='')
|
||||
@@ -655,8 +655,12 @@ def get_timestamp():
|
||||
return time.strftime("%H:%M:%S")
|
||||
|
||||
|
||||
def timestamped_line(msg: str) -> str:
|
||||
return "{} | {}".format(get_timestamp(), msg)
|
||||
|
||||
|
||||
def print_timestamped_line(msg: str, use_color: Optional[str] = None):
|
||||
if use_color is not None:
|
||||
msg = dbt.ui.color(msg, use_color)
|
||||
|
||||
GLOBAL_LOGGER.info("{} | {}".format(get_timestamp(), msg))
|
||||
GLOBAL_LOGGER.info(timestamped_line(msg))
|
||||
|
||||
@@ -11,8 +11,14 @@ from dbt.adapters.factory import get_adapter
|
||||
from dbt.parser.manifest import (
|
||||
Manifest, ManifestLoader, _check_manifest
|
||||
)
|
||||
from dbt.logger import DbtProcessState, print_timestamped_line
|
||||
from dbt.logger import DbtProcessState
|
||||
from dbt.clients.system import write_file
|
||||
from dbt.events.types import (
|
||||
ManifestDependenciesLoaded, ManifestLoaderCreated, ManifestLoaded, ManifestChecked,
|
||||
ManifestFlatGraphBuilt, ParsingStart, ParsingCompiling, ParsingWritingManifest, ParsingDone,
|
||||
ReportPerformancePath
|
||||
)
|
||||
from dbt.events.functions import fire_event
|
||||
from dbt.graph import Graph
|
||||
import time
|
||||
from typing import Optional
|
||||
@@ -40,7 +46,7 @@ class ParseTask(ConfiguredTask):
|
||||
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
|
||||
write_file(path, json.dumps(self.loader._perf_info,
|
||||
cls=dbt.utils.JSONEncoder, indent=4))
|
||||
print_timestamped_line(f"Performance info: {path}")
|
||||
fire_event(ReportPerformancePath(path=path))
|
||||
|
||||
# This method takes code that normally exists in other files
|
||||
# and pulls it in here, to simplify logging and make the
|
||||
@@ -58,22 +64,22 @@ class ParseTask(ConfiguredTask):
|
||||
with PARSING_STATE:
|
||||
start_load_all = time.perf_counter()
|
||||
projects = root_config.load_dependencies()
|
||||
print_timestamped_line("Dependencies loaded")
|
||||
fire_event(ManifestDependenciesLoaded())
|
||||
loader = ManifestLoader(root_config, projects, macro_hook)
|
||||
print_timestamped_line("ManifestLoader created")
|
||||
fire_event(ManifestLoaderCreated())
|
||||
manifest = loader.load()
|
||||
print_timestamped_line("Manifest loaded")
|
||||
fire_event(ManifestLoaded())
|
||||
_check_manifest(manifest, root_config)
|
||||
print_timestamped_line("Manifest checked")
|
||||
fire_event(ManifestChecked())
|
||||
manifest.build_flat_graph()
|
||||
print_timestamped_line("Flat graph built")
|
||||
fire_event(ManifestFlatGraphBuilt())
|
||||
loader._perf_info.load_all_elapsed = (
|
||||
time.perf_counter() - start_load_all
|
||||
)
|
||||
|
||||
self.loader = loader
|
||||
self.manifest = manifest
|
||||
print_timestamped_line("Manifest loaded")
|
||||
fire_event(ManifestLoaded())
|
||||
|
||||
def compile_manifest(self):
|
||||
adapter = get_adapter(self.config)
|
||||
@@ -81,14 +87,14 @@ class ParseTask(ConfiguredTask):
|
||||
self.graph = compiler.compile(self.manifest)
|
||||
|
||||
def run(self):
|
||||
print_timestamped_line('Start parsing.')
|
||||
fire_event(ParsingStart())
|
||||
self.get_full_manifest()
|
||||
if self.args.compile:
|
||||
print_timestamped_line('Compiling.')
|
||||
fire_event(ParsingCompiling())
|
||||
self.compile_manifest()
|
||||
if self.args.write_manifest:
|
||||
print_timestamped_line('Writing manifest.')
|
||||
fire_event(ParsingWritingManifest())
|
||||
self.write_manifest()
|
||||
|
||||
self.write_perf_info()
|
||||
print_timestamped_line('Done.')
|
||||
fire_event(ParsingDone())
|
||||
|
||||
Reference in New Issue
Block a user