Compare commits

...

22 Commits

Author SHA1 Message Date
Nathaniel May
1e2bcef0ac use concrete stdlib loggers 2021-11-05 12:28:30 -04:00
Nathaniel May
a546e79d06 inline common processors 2021-11-05 10:38:22 -04:00
Nathaniel May
b8c9555347 logging works now but too many times. 2021-11-05 10:21:23 -04:00
Nathaniel May
2c8f1c9a78 split out loggers into values 2021-11-04 16:55:07 -04:00
Nathaniel May
b1db4d7978 move two log messages to event system 2021-11-03 15:54:53 -04:00
Nathaniel May
d1b9fbb7a3 bug fix for logging 2021-11-03 12:29:43 -04:00
Nathaniel May
29f34769df remove print 2021-11-03 11:57:45 -04:00
Nathaniel May
6ca5fa8f4a first pass at json logging. fails. 2021-11-03 11:57:04 -04:00
Nathaniel May
9febe38781 add adapter logging interface, and change postgres adapter to use it. 2021-11-02 13:53:07 -04:00
Nathaniel May
a517375c6c add comment 2021-11-02 10:47:01 -04:00
Nathaniel May
a9758297d5 make logger global 2021-11-02 10:46:33 -04:00
Nathaniel May
c087d3b2dc failed attempt at file logging 2021-11-01 16:27:01 -04:00
Nathaniel May
55b33031fc use structlog configs 2021-11-01 13:22:02 -04:00
Nathaniel May
593b562611 refactor for cleaner if else tree 2021-11-01 12:21:28 -04:00
Nathaniel May
57d364212d move datetime into event type 2021-11-01 11:32:24 -04:00
Nathaniel May
415cc9c702 add structlog to event module 2021-11-01 10:19:42 -04:00
Nathaniel May
d2f0e2d1e1 Change Graph logger call sites (#4165)
graph call sites for structured logging
2021-10-29 17:08:30 -04:00
Nathaniel May
e29db5897f Client call sites (#4163)
update log call sites with new event system
2021-10-29 16:35:48 -04:00
Nathaniel May
87b8ca9615 Handle exec info (#4168)
handle exec info
2021-10-29 16:01:04 -04:00
Emily Rockman
a3dc5efda7 context call sites (#4164)
* updated context dir to new structured logging
2021-10-29 10:12:09 -05:00
Nathaniel May
1015b89dbf Initial structured logging work with fire_event (#4137)
add event type modeling and fire_event calls
2021-10-29 09:16:06 -04:00
Nathaniel May
5c9fd07050 init 2021-10-26 13:57:30 -04:00
19 changed files with 832 additions and 68 deletions

View File

@@ -2,7 +2,12 @@ import re
import os.path
from dbt.clients.system import run_cmd, rmdir
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    GitSparseCheckoutSubdirectory, GitProgressCheckoutRevision,
+    GitProgressUpdatingExistingDependency, GitProgressPullingNewDependency,
+    GitNothingToDo, GitProgressUpdatedCheckoutRange, GitProgressCheckedOutAt
+)
import dbt.exceptions
from packaging import version
@@ -18,7 +23,7 @@ def clone(repo, cwd, dirname=None, remove_git_dir=False, revision=None, subdirec
clone_cmd = ['git', 'clone', '--depth', '1']
if subdirectory:
-logger.debug(' Subdirectory specified: {}, using sparse checkout.'.format(subdirectory))
+fire_event(GitSparseCheckoutSubdirectory(subdir=subdirectory))
out, _ = run_cmd(cwd, ['git', '--version'], env={'LC_ALL': 'C'})
git_version = version.parse(re.search(r"\d+\.\d+\.\d+", out.decode("utf-8")).group(0))
if not git_version >= version.parse("2.25.0"):
@@ -54,7 +59,7 @@ def list_tags(cwd):
def _checkout(cwd, repo, revision):
-logger.debug(' Checking out revision {}.'.format(revision))
+fire_event(GitProgressCheckoutRevision(revision=revision))
fetch_cmd = ["git", "fetch", "origin", "--depth", "1"]
@@ -118,7 +123,7 @@ def clone_and_checkout(repo, cwd, dirname=None, remove_git_dir=False,
start_sha = None
if exists:
directory = exists.group(1)
-logger.debug('Updating existing dependency {}.', directory)
+fire_event(GitProgressUpdatingExistingDependency(dir=directory))
else:
matches = re.match("Cloning into '(.+)'", err.decode('utf-8'))
if matches is None:
@@ -126,17 +131,18 @@ def clone_and_checkout(repo, cwd, dirname=None, remove_git_dir=False,
f'Error cloning {repo} - never saw "Cloning into ..." from git'
)
directory = matches.group(1)
-logger.debug('Pulling new dependency {}.', directory)
+fire_event(GitProgressPullingNewDependency(dir=directory))
full_path = os.path.join(cwd, directory)
start_sha = get_current_sha(full_path)
checkout(full_path, repo, revision)
end_sha = get_current_sha(full_path)
if exists:
if start_sha == end_sha:
-logger.debug(' Already at {}, nothing to do.', start_sha[:7])
+fire_event(GitNothingToDo(sha=start_sha[:7]))
else:
-logger.debug(' Updated checkout from {} to {}.',
-    start_sha[:7], end_sha[:7])
+fire_event(GitProgressUpdatedCheckoutRange(
+    start_sha=start_sha[:7], end_sha=end_sha[:7]
+))
else:
-logger.debug(' Checked out at {}.', end_sha[:7])
+fire_event(GitProgressCheckedOutAt(end_sha=end_sha[:7]))
return os.path.join(directory, subdirectory or '')

View File

@@ -33,7 +33,6 @@ from dbt.exceptions import (
UndefinedMacroException
)
from dbt import flags
-from dbt.logger import GLOBAL_LOGGER as logger # noqa
def _linecache_inject(source, write):

View File

@@ -1,7 +1,11 @@
import functools
import requests
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    RegistryProgressMakingGETRequest,
+    RegistryProgressGETResponse
+)
from dbt.utils import memoized, _connection_exception_retry as connection_exception_retry
-from dbt.logger import GLOBAL_LOGGER as logger
from dbt import deprecations
import os
@@ -25,10 +29,9 @@ def _get_with_retries(path, registry_base_url=None):
def _get(path, registry_base_url=None):
url = _get_url(path, registry_base_url)
-logger.debug('Making package registry request: GET {}'.format(url))
+fire_event(RegistryProgressMakingGETRequest(url=url))
resp = requests.get(url, timeout=30)
-logger.debug('Response from registry: GET {} {}'.format(url,
-    resp.status_code))
+fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()
return resp.json()

View File

@@ -15,8 +15,12 @@ from typing import (
Type, NoReturn, List, Optional, Dict, Any, Tuple, Callable, Union
)
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    SystemErrorRetrievingModTime, SystemCouldNotWrite, SystemExecutingCmd, SystemStdOutMsg,
+    SystemStdErrMsg, SystemReportReturnCode
+)
import dbt.exceptions
-from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import _connection_exception_retry as connection_exception_retry
if sys.platform == 'win32':
@@ -65,9 +69,7 @@ def find_matching(
try:
modification_time = os.path.getmtime(absolute_path)
except OSError:
-logger.exception(
-    f"Error retrieving modification time for file {absolute_path}"
-)
+fire_event(SystemErrorRetrievingModTime(path=absolute_path))
if reobj.match(local_file):
matching.append({
'searched_path': relative_path_to_search,
@@ -161,10 +163,7 @@ def write_file(path: str, contents: str = '') -> bool:
reason = 'Path was possibly too long'
# all our hard work and the path was still too long. Log and
# continue.
-logger.debug(
-    f'Could not write to path {path}({len(path)} characters): '
-    f'{reason}\nexception: {exc}'
-)
+fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=exc))
else:
raise
return True
@@ -412,7 +411,7 @@ def _interpret_oserror(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn:
def run_cmd(
cwd: str, cmd: List[str], env: Optional[Dict[str, Any]] = None
) -> Tuple[bytes, bytes]:
-logger.debug('Executing "{}"'.format(' '.join(cmd)))
+fire_event(SystemExecutingCmd(cmd=cmd))
if len(cmd) == 0:
raise dbt.exceptions.CommandError(cwd, cmd)
@@ -438,11 +437,11 @@ def run_cmd(
except OSError as exc:
_interpret_oserror(exc, cwd, cmd)
-logger.debug('STDOUT: "{!s}"'.format(out))
-logger.debug('STDERR: "{!s}"'.format(err))
+fire_event(SystemStdOutMsg(bmsg=out))
+fire_event(SystemStdErrMsg(bmsg=err))
if proc.returncode != 0:
-logger.debug('command return code={}'.format(proc.returncode))
+fire_event(SystemReportReturnCode(code=proc.returncode))
raise dbt.exceptions.CommandResultError(cwd, cmd, proc.returncode,
out, err)

View File

@@ -12,7 +12,8 @@ from dbt.clients.yaml_helper import ( # noqa: F401
)
from dbt.contracts.graph.compiled import CompiledResource
from dbt.exceptions import raise_compiler_error, MacroReturn
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import MacroEventInfo, MacroEventDebug
from dbt.version import __version__ as dbt_version
# These modules are added to the context. Consider alternative
@@ -443,9 +444,9 @@ class BaseContext(metaclass=ContextMeta):
{% endmacro %}"
"""
if info:
-logger.info(msg)
+fire_event(MacroEventInfo(msg))
else:
-logger.debug(msg)
+fire_event(MacroEventDebug(msg))
return ''
@contextproperty

View File

@@ -49,7 +49,6 @@ from dbt.exceptions import (
wrapped_exports,
)
from dbt.config import IsFQNResource
-from dbt.logger import GLOBAL_LOGGER as logger # noqa
from dbt.node_types import NodeType
from dbt.utils import (

core/dbt/events/README.md (new file, 22 lines)
View File

@@ -0,0 +1,22 @@
# Events Module
The Events module is the implementation of structured logging. These events represent both a programmatic interface to dbt processes and human-readable messaging in one centralized place. The centralization allows mypy to enforce interface invariants across all dbt events, and the distinct type layer decouples events from libraries such as loggers.
# Using the Events Module
The events module provides types in `events.types` that represent what is happening in dbt. These types are intended to be an exhaustive list of everything happening within dbt that needs to be logged, streamed, or printed. To fire an event from anywhere in dbt, use `events.functions::fire_event`, the single entry point to the module.
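For example, firing one of the existing parse events looks like this (a minimal sketch using the `ParsingStart` type defined in `core/dbt/events/types.py` below):
```python
from dbt.events.functions import fire_event
from dbt.events.types import ParsingStart

# appends the event to EVENT_HISTORY and routes its message
# to the stdout and file loggers at the event's level
fire_event(ParsingStart())
```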
# Adding a New Event
In `events.types`, add a new class that represents the new event. This may be a plain class with no values, or a dataclass carrying the values needed to construct downstream messaging; include only the data necessary to construct that message. You must extend the superclass for every destination the event targets (e.g., if your log message belongs on the CLI, extend `CliEventABC`) as well as the log level the event belongs to.
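As an illustration, a hypothetical debug-level CLI event would follow the same pattern as the Git and registry events further down in this diff (`ConnectionOpened` and its field are invented names, not existing types):
```python
from dataclasses import dataclass
from dbt.events.types import CliEventABC, DebugLevel


@dataclass
class ConnectionOpened(DebugLevel, CliEventABC):  # hypothetical example event
    conn_name: str

    def cli_msg(self) -> str:
        # solely the human-readable message; timestamps and
        # formatting are added by the logger
        return f"Opened connection {self.conn_name}."
```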
# Adapter Maintainers
To integrate existing log messages from adapters, you likely have a line of code like this in your adapter already:
```python
from dbt.logger import GLOBAL_LOGGER as logger
```
Simply change it to the following two lines, using your adapter's database name, and all your existing call sites will use the new system for v1.0:
```python
from dbt.events import AdapterLogger
logger = AdapterLogger("<database name>")
# e.g. AdapterLogger("Snowflake")
```
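The resulting logger exposes the familiar stdlib-style methods, so existing call sites keep working unchanged (these calls mirror `test/unit/test_events.py` at the bottom of this diff):
```python
logger.debug("debug message")
logger.info("info message")
logger.warning("warning message")
logger.error("error message")
logger.exception("exception message")  # like error(), but exc_info defaults to True
```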

core/dbt/events/__init__.py (new file, 1 line)
View File

@@ -0,0 +1 @@
from .adapter_endpoint import AdapterLogger # noqa: F401

core/dbt/events/adapter_endpoint.py (new file, 86 lines)
View File

@@ -0,0 +1,86 @@
from dataclasses import dataclass
from dbt.events.functions import fire_event
from dbt.events.types import (
AdapterEventDebug, AdapterEventInfo, AdapterEventWarning, AdapterEventError
)
from typing import Any
@dataclass
class AdapterLogger():
name: str
def debug(
self,
msg: str,
exc_info: Any = None,
stack_info: Any = None,
extra: Any = None
):
event = AdapterEventDebug(name=self.name, raw_msg=msg)
event.exc_info = exc_info
event.stack_info = stack_info
event.extra = extra
fire_event(event)
def info(
self,
msg: str,
exc_info: Any = None,
stack_info: Any = None,
extra: Any = None
):
event = AdapterEventInfo(name=self.name, raw_msg=msg)
event.exc_info = exc_info
event.stack_info = stack_info
event.extra = extra
fire_event(event)
def warning(
self,
msg: str,
exc_info: Any = None,
stack_info: Any = None,
extra: Any = None
):
event = AdapterEventWarning(name=self.name, raw_msg=msg)
event.exc_info = exc_info
event.stack_info = stack_info
event.extra = extra
fire_event(event)
def error(
self,
msg: str,
exc_info: Any = None,
stack_info: Any = None,
extra: Any = None
):
event = AdapterEventError(name=self.name, raw_msg=msg)
event.exc_info = exc_info
event.stack_info = stack_info
event.extra = extra
fire_event(event)
def exception(
self,
msg: str,
exc_info: Any = True, # this default is what makes this method different
stack_info: Any = None,
extra: Any = None
):
event = AdapterEventError(name=self.name, raw_msg=msg)
event.exc_info = exc_info
event.stack_info = stack_info
event.extra = extra
fire_event(event)

core/dbt/events/functions.py (new file, 218 lines)
View File

@@ -0,0 +1,218 @@
from dbt.events.history import EVENT_HISTORY
from dbt.events.types import CliEventABC, Event, ShowException
import dbt.logger as logger # TODO remove references to this logger
import dbt.flags as flags
import logging.config
import logging
import os
import structlog
import sys
# these two loggers are set up with CLI inputs via setup_event_logger
# DO NOT IMPORT AND USE THESE DIRECTLY
global STDOUT_LOGGER
STDOUT_LOGGER = structlog.get_logger()
global FILE_LOGGER
FILE_LOGGER = structlog.get_logger()
def setup_event_logger(log_path):
logger.make_log_dir_if_missing(log_path)
json: bool = flags.LOG_FORMAT == 'json'
# USE_COLORS can be None if the app just started and the CLI flags
# haven't been applied yet
colors: bool = True if flags.USE_COLORS else False
# TODO this default should live somewhere better
log_dest = os.path.join(logger.LOG_DIR, 'dbt.log')
# see: https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
# logging.config.dictConfig({
# "version": 1,
# "disable_existing_loggers": False,
# "formatters": {
# "plain": {
# "()": structlog.stdlib.ProcessorFormatter,
# "processor": structlog.dev.ConsoleRenderer(colors=False),
# "foreign_pre_chain": pre_chain,
# },
# "colored": {
# "()": structlog.stdlib.ProcessorFormatter,
# "processor": structlog.dev.ConsoleRenderer(colors=True),
# "foreign_pre_chain": pre_chain,
# },
# "json": {
# "()": structlog.stdlib.ProcessorFormatter,
# "processor": structlog.processors.JSONRenderer(),
# "foreign_pre_chain": pre_chain,
# },
# },
# "handlers": {
# "console": {
# "level": "DEBUG",
# "class": "logging.StreamHandler",
# "formatter": "colored",
# },
# "file": {
# "level": "DEBUG",
# "class": "logging.handlers.WatchedFileHandler",
# # TODO this default should live somewhere better
# "filename": os.path.join(logger.LOG_DIR, 'dbt.log'),
# "formatter": "plain",
# },
# "json-console": {
# "level": "DEBUG",
# "class": "logging.StreamHandler",
# "formatter": "json",
# },
# "json-file": {
# "level": "DEBUG",
# "class": "logging.handlers.WatchedFileHandler",
# # TODO this default should live somewhere better
# "filename": os.path.join(logger.LOG_DIR, 'dbt.log.json'),
# "formatter": "json",
# },
# },
# "loggers": {
# "": {
# "handlers": ["json-console", "json-file"] if json else ["console", "file"],
# "level": "DEBUG" if flags.DEBUG else "INFO",
# "propagate": True,
# },
# }
# })
# set-up global logging configurations
structlog.configure(
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=False,
)
# configure the stdout logger
STDOUT_LOGGER = structlog.wrap_logger(
logger=logging.Logger('console logger'),
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper("%H:%M:%S"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
]
)
formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.dev.ConsoleRenderer(colors=colors),
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
STDOUT_LOGGER.addHandler(handler)
# configure the json file handler
if json:
FILE_LOGGER = structlog.wrap_logger(
logger=logging.Logger('json file logger'),
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
]
)
formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.processors.JSONRenderer(),
)
handler = logging.handlers.WatchedFileHandler(filename=log_dest)
handler.setFormatter(formatter)
FILE_LOGGER.addHandler(handler)
# configure the plaintext file handler
else:
# TODO follow pattern from above ^^
FILE_LOGGER = structlog.wrap_logger(
logger=logging.Logger('plaintext file logger'),
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper("%H:%M:%S"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
]
)
formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.dev.ConsoleRenderer(colors=False),
)
handler = logging.handlers.WatchedFileHandler(filename=log_dest)
handler.setFormatter(formatter)
FILE_LOGGER.addHandler(handler)
# top-level method for accessing the new eventing system
# this is where all the side effects happen branched by event type
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
EVENT_HISTORY.append(e)
level_tag = e.level_tag()
if isinstance(e, CliEventABC):
log_line = e.cli_msg()
if isinstance(e, ShowException):
event_dict = {
'exc_info': e.exc_info,
'stack_info': e.stack_info,
'extra': e.extra
}
if level_tag == 'test':
# TODO after implementing #3977 send to new test level
STDOUT_LOGGER.debug(log_line, event_dict)
FILE_LOGGER.debug(log_line, event_dict)
elif level_tag == 'debug':
STDOUT_LOGGER.debug(log_line, event_dict)
FILE_LOGGER.debug(log_line, event_dict)
elif level_tag == 'info':
STDOUT_LOGGER.info(log_line, event_dict)
FILE_LOGGER.info(log_line, event_dict)
elif level_tag == 'warn':
STDOUT_LOGGER.warning(log_line, event_dict)
FILE_LOGGER.warning(log_line, event_dict)
elif level_tag == 'error':
STDOUT_LOGGER.error(log_line, event_dict)
FILE_LOGGER.error(log_line, event_dict)
else:
raise AssertionError(
f"Event type {type(e).__name__} has unhandled level: {e.level_tag()}"
)
# CliEventABC but not ShowException
else:
if level_tag == 'test':
# TODO after implementing #3977 send to new test level
STDOUT_LOGGER.debug(log_line)
FILE_LOGGER.debug(log_line)
elif level_tag == 'debug':
STDOUT_LOGGER.debug(log_line)
FILE_LOGGER.debug(log_line)
elif level_tag == 'info':
STDOUT_LOGGER.info(log_line)
FILE_LOGGER.info(log_line)
elif level_tag == 'warn':
STDOUT_LOGGER.warning(log_line)
FILE_LOGGER.warning(log_line)
elif level_tag == 'error':
STDOUT_LOGGER.error(log_line)
FILE_LOGGER.error(log_line)
else:
raise AssertionError(
f"Event type {type(e).__name__} has unhandled level: {e.level_tag()}"
)

core/dbt/events/history.py (new file, 7 lines)
View File

@@ -0,0 +1,7 @@
from dbt.events.types import Event
from typing import List
# the global history of events for this session
# TODO this is naive and the memory footprint is likely far too large.
EVENT_HISTORY: List[Event] = []
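Because `fire_event` appends every event here, downstream code can inspect the session history programmatically; a minimal sketch (the filtering shown is illustrative, not an existing API):
```python
from dbt.events.history import EVENT_HISTORY
from dbt.events.types import ErrorLevel

# count the error-level events fired so far in this session
errors = [e for e in EVENT_HISTORY if isinstance(e, ErrorLevel)]
print(f"{len(errors)} error events fired")
```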

core/dbt/events/types.py (new file, 393 lines)
View File

@@ -0,0 +1,393 @@
from abc import ABCMeta, abstractmethod
import argparse
from dataclasses import dataclass
import datetime
from dbt.semver import Matchers, VersionSpecifier
from typing import Any, List
# types to represent log levels
# in preparation for #3977
class TestLevel():
def level_tag(self) -> str:
return "test"
class DebugLevel():
def level_tag(self) -> str:
return "debug"
class InfoLevel():
def level_tag(self) -> str:
return "info"
class WarnLevel():
def level_tag(self) -> str:
return "warn"
class ErrorLevel():
def level_tag(self) -> str:
return "error"
@dataclass
class ShowException():
exc_info: Any = None
stack_info: Any = None
extra: Any = None
# The following classes represent the data necessary to describe a
# particular event to both human-readable logs and machine-reliable
# event streams. Classes extend superclasses that indicate which
# destinations they are intended for, which mypy uses to enforce
# that the necessary methods are defined.
# top-level superclass for all events
class Event(metaclass=ABCMeta):
ts: datetime.datetime
def __init__(self):
self.ts = datetime.datetime.now()
# do not define this yourself. inherit it from one of the above level types.
@abstractmethod
def level_tag(self) -> str:
raise Exception("level_tag not implemented for event")
class CliEventABC(Event, metaclass=ABCMeta):
# Solely the human readable message. Timestamps and formatting will be added by the logger.
@abstractmethod
def cli_msg(self) -> str:
raise Exception("cli_msg not implemented for cli event")
@dataclass
class AdapterEventBase():
name: str
raw_msg: str
def cli_msg(self) -> str:
return f"{self.name} adapter: {self.raw_msg}"
class AdapterEventDebug(DebugLevel, AdapterEventBase, CliEventABC, ShowException):
pass
class AdapterEventInfo(InfoLevel, AdapterEventBase, CliEventABC, ShowException):
pass
class AdapterEventWarning(WarnLevel, AdapterEventBase, CliEventABC, ShowException):
pass
class AdapterEventError(ErrorLevel, AdapterEventBase, CliEventABC, ShowException):
pass
@dataclass
class ReportVersion(InfoLevel, CliEventABC):
v: VersionSpecifier
def cli_msg(self):
return f"Running with dbt{str(self.v)}"
@dataclass
class ReportArgs(DebugLevel, CliEventABC):
args: argparse.Namespace
def cli_msg(self):
return f"running dbt with arguments {str(self.args)}"
class ParsingStart(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Start parsing."
class ParsingCompiling(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Compiling."
class ParsingWritingManifest(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Writing manifest."
class ParsingDone(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Done."
class ManifestDependenciesLoaded(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Dependencies loaded"
class ManifestLoaderCreated(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "ManifestLoader created"
class ManifestLoaded(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Manifest loaded"
class ManifestChecked(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Manifest checked"
class ManifestFlatGraphBuilt(InfoLevel, CliEventABC):
def cli_msg(self) -> str:
return "Flat graph built"
@dataclass
class ReportPerformancePath(InfoLevel, CliEventABC):
path: str
def cli_msg(self) -> str:
return f"Performance info: {self.path}"
@dataclass
class GitSparseCheckoutSubdirectory(DebugLevel, CliEventABC):
subdir: str
def cli_msg(self) -> str:
return f" Subdirectory specified: {self.subdir}, using sparse checkout."
@dataclass
class GitProgressCheckoutRevision(DebugLevel, CliEventABC):
revision: str
def cli_msg(self) -> str:
return f" Checking out revision {self.revision}."
@dataclass
class GitProgressUpdatingExistingDependency(DebugLevel, CliEventABC):
dir: str
def cli_msg(self) -> str:
return f"Updating existing dependency {self.dir}."
@dataclass
class GitProgressPullingNewDependency(DebugLevel, CliEventABC):
dir: str
def cli_msg(self) -> str:
return f"Pulling new dependency {self.dir}."
@dataclass
class GitNothingToDo(DebugLevel, CliEventABC):
sha: str
def cli_msg(self) -> str:
return f"Already at {self.sha}, nothing to do."
@dataclass
class GitProgressUpdatedCheckoutRange(DebugLevel, CliEventABC):
start_sha: str
end_sha: str
def cli_msg(self) -> str:
return f" Updated checkout from {self.start_sha} to {self.end_sha}."
@dataclass
class GitProgressCheckedOutAt(DebugLevel, CliEventABC):
end_sha: str
def cli_msg(self) -> str:
return f" Checked out at {self.end_sha}."
@dataclass
class RegistryProgressMakingGETRequest(DebugLevel, CliEventABC):
url: str
def cli_msg(self) -> str:
return f"Making package registry request: GET {self.url}"
@dataclass
class RegistryProgressGETResponse(DebugLevel, CliEventABC):
url: str
resp_code: int
def cli_msg(self) -> str:
return f"Response from registry: GET {self.url} {self.resp_code}"
# TODO this was actually `logger.exception(...)` not `logger.error(...)`
@dataclass
class SystemErrorRetrievingModTime(ErrorLevel, CliEventABC):
path: str
def cli_msg(self) -> str:
return f"Error retrieving modification time for file {self.path}"
@dataclass
class SystemCouldNotWrite(DebugLevel, CliEventABC):
path: str
reason: str
exc: Exception
def cli_msg(self) -> str:
return (
f"Could not write to path {self.path}({len(self.path)} characters): "
f"{self.reason}\nexception: {self.exc}"
)
@dataclass
class SystemExecutingCmd(DebugLevel, CliEventABC):
cmd: List[str]
def cli_msg(self) -> str:
return f'Executing "{" ".join(self.cmd)}"'
@dataclass
class SystemStdOutMsg(DebugLevel, CliEventABC):
bmsg: bytes
def cli_msg(self) -> str:
return f'STDOUT: "{str(self.bmsg)}"'
@dataclass
class SystemStdErrMsg(DebugLevel, CliEventABC):
bmsg: bytes
def cli_msg(self) -> str:
return f'STDERR: "{str(self.bmsg)}"'
@dataclass
class SystemReportReturnCode(DebugLevel, CliEventABC):
code: int
def cli_msg(self) -> str:
return f"command return code={self.code}"
@dataclass
class SelectorAlertUpto3UnusedNodes(InfoLevel, CliEventABC):
node_names: List[str]
def cli_msg(self) -> str:
summary_nodes_str = ("\n - ").join(self.node_names[:3])
and_more_str = (
f"\n - and {len(self.node_names) - 3} more" if len(self.node_names) > 4 else ""
)
return (
f"\nSome tests were excluded because at least one parent is not selected. "
f"Use the --greedy flag to include them."
f"\n - {summary_nodes_str}{and_more_str}"
)
@dataclass
class SelectorAlertAllUnusedNodes(DebugLevel, CliEventABC):
node_names: List[str]
def cli_msg(self) -> str:
debug_nodes_str = ("\n - ").join(self.node_names)
return (
f"Full list of tests that were excluded:"
f"\n - {debug_nodes_str}"
)
@dataclass
class SelectorReportInvalidSelector(InfoLevel, CliEventABC):
selector_methods: dict
spec_method: str
raw_spec: str
def cli_msg(self) -> str:
valid_selectors = ", ".join(self.selector_methods)
return (
f"The '{self.spec_method}' selector specified in {self.raw_spec} is "
f"invalid. Must be one of [{valid_selectors}]"
)
@dataclass
class MacroEventInfo(InfoLevel, CliEventABC):
msg: str
def cli_msg(self) -> str:
return self.msg
@dataclass
class MacroEventDebug(DebugLevel, CliEventABC):
msg: str
def cli_msg(self) -> str:
return self.msg
# since mypy doesn't run on every file we need to suggest to mypy that every
# class gets instantiated. But we don't actually want to run this code.
# making the conditional `if False` causes mypy to skip it as dead code so
# we need to skirt around that by computing something it doesn't check statically.
#
# TODO remove these lines once we run mypy everywhere.
if 1 == 0:
ReportVersion(VersionSpecifier(
build=None,
major='0',
matcher=Matchers['='],
minor='1',
patch='2',
prerelease=None,
))
ParsingStart()
ParsingCompiling()
ParsingWritingManifest()
ParsingDone()
ManifestDependenciesLoaded()
ManifestLoaderCreated()
ManifestLoaded()
ManifestChecked()
ManifestFlatGraphBuilt()
ReportPerformancePath(path='')
GitSparseCheckoutSubdirectory(subdir='')
GitProgressCheckoutRevision(revision='')
GitProgressUpdatingExistingDependency(dir='')
GitProgressPullingNewDependency(dir='')
GitNothingToDo(sha='')
GitProgressUpdatedCheckoutRange(start_sha='', end_sha='')
GitProgressCheckedOutAt(end_sha='')
SystemErrorRetrievingModTime(path='')
SystemCouldNotWrite(path='', reason='', exc=Exception(''))
SystemExecutingCmd(cmd=[''])
SystemStdOutMsg(bmsg=b'')
SystemStdErrMsg(bmsg=b'')
SystemReportReturnCode(code=0)
SelectorAlertUpto3UnusedNodes(node_names=[])
SelectorAlertAllUnusedNodes(node_names=[])
SelectorReportInvalidSelector(selector_methods={'': ''}, spec_method='', raw_spec='')
MacroEventInfo(msg='')
MacroEventDebug(msg='')

View File

@@ -5,7 +5,10 @@ from .queue import GraphQueue
from .selector_methods import MethodManager
from .selector_spec import SelectionCriteria, SelectionSpec
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events.functions import fire_event
+from dbt.events.types import (
+    SelectorAlertUpto3UnusedNodes, SelectorAlertAllUnusedNodes, SelectorReportInvalidSelector
+)
from dbt.node_types import NodeType
from dbt.exceptions import (
InternalException,
@@ -30,21 +33,9 @@ def alert_non_existence(raw_spec, nodes):
def alert_unused_nodes(raw_spec, node_names):
-summary_nodes_str = ("\n - ").join(node_names[:3])
-debug_nodes_str = ("\n - ").join(node_names)
-and_more_str = f"\n - and {len(node_names) - 3} more" if len(node_names) > 4 else ""
-summary_msg = (
-    f"\nSome tests were excluded because at least one parent is not selected. "
-    f"Use the --greedy flag to include them."
-    f"\n - {summary_nodes_str}{and_more_str}"
-)
-logger.info(summary_msg)
+fire_event(SelectorAlertUpto3UnusedNodes(node_names=node_names))
if len(node_names) > 4:
-debug_msg = (
-    f"Full list of tests that were excluded:"
-    f"\n - {debug_nodes_str}"
-)
-logger.debug(debug_msg)
+fire_event(SelectorAlertAllUnusedNodes(node_names=node_names))
def can_select_indirectly(node):
@@ -103,11 +94,11 @@ class NodeSelector(MethodManager):
try:
collected = self.select_included(nodes, spec)
except InvalidSelectorException:
-valid_selectors = ", ".join(self.SELECTOR_METHODS)
-logger.info(
-    f"The '{spec.method}' selector specified in {spec.raw} is "
-    f"invalid. Must be one of [{valid_selectors}]"
-)
+fire_event(SelectorReportInvalidSelector(
+    selector_methods=self.SELECTOR_METHODS,
+    spec_method=spec.method,
+    raw_spec=spec.raw
+))
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)

View File

@@ -45,6 +45,10 @@ DEBUG_LOG_FORMAT = (
SECRET_ENV_PREFIX = 'DBT_ENV_SECRET_'
+# TODO this is a terrible place for a default like this
+# fix via issue #4179
+LOG_DIR: str = "logs"
def get_secret_env() -> List[str]:
return [
@@ -655,8 +659,12 @@ def get_timestamp():
return time.strftime("%H:%M:%S")
+def timestamped_line(msg: str) -> str:
+    return "{} | {}".format(get_timestamp(), msg)
def print_timestamped_line(msg: str, use_color: Optional[str] = None):
if use_color is not None:
msg = dbt.ui.color(msg, use_color)
GLOBAL_LOGGER.info("{} | {}".format(get_timestamp(), msg))
GLOBAL_LOGGER.info(timestamped_line(msg))

View File

@@ -9,6 +9,8 @@ from contextlib import contextmanager
from pathlib import Path
import dbt.version
+from dbt.events.functions import fire_event, setup_event_logger
+from dbt.events.types import ReportArgs, ReportVersion
import dbt.flags as flags
import dbt.task.build as build_task
import dbt.task.clean as clean_task
@@ -228,18 +230,18 @@ def run_from_args(parsed):
# set log_format in the logger
parsed.cls.pre_init_hook(parsed)
logger.info("Running with dbt{}".format(dbt.version.installed))
# this will convert DbtConfigErrors into RuntimeExceptions
# task could be any one of the task objects
task = parsed.cls.from_args(args=parsed)
logger.debug("running dbt with arguments {parsed}", parsed=str(parsed))
log_path = None
if task.config is not None:
log_path = getattr(task.config, 'log_path', None)
+# we can finally set the file logger up
+# TODO move as a part of #4179
+setup_event_logger(log_path or 'logs')
+fire_event(ReportVersion(v=dbt.version.installed))
+fire_event(ReportArgs(args=parsed))
log_manager.set_path(log_path)
if dbt.tracking.active_user is not None: # mypy appeasement, always true
logger.debug("Tracking: {}".format(dbt.tracking.active_user.state()))

View File

@@ -11,8 +11,14 @@ from dbt.adapters.factory import get_adapter
from dbt.parser.manifest import (
Manifest, ManifestLoader, _check_manifest
)
-from dbt.logger import DbtProcessState, print_timestamped_line
+from dbt.logger import DbtProcessState
from dbt.clients.system import write_file
+from dbt.events.types import (
+    ManifestDependenciesLoaded, ManifestLoaderCreated, ManifestLoaded, ManifestChecked,
+    ManifestFlatGraphBuilt, ParsingStart, ParsingCompiling, ParsingWritingManifest, ParsingDone,
+    ReportPerformancePath
+)
+from dbt.events.functions import fire_event
from dbt.graph import Graph
import time
from typing import Optional
@@ -40,7 +46,7 @@ class ParseTask(ConfiguredTask):
path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self.loader._perf_info,
cls=dbt.utils.JSONEncoder, indent=4))
print_timestamped_line(f"Performance info: {path}")
fire_event(ReportPerformancePath(path=path))
# This method takes code that normally exists in other files
# and pulls it in here, to simplify logging and make the
@@ -58,22 +64,22 @@ class ParseTask(ConfiguredTask):
with PARSING_STATE:
start_load_all = time.perf_counter()
projects = root_config.load_dependencies()
print_timestamped_line("Dependencies loaded")
fire_event(ManifestDependenciesLoaded())
loader = ManifestLoader(root_config, projects, macro_hook)
print_timestamped_line("ManifestLoader created")
fire_event(ManifestLoaderCreated())
manifest = loader.load()
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())
_check_manifest(manifest, root_config)
print_timestamped_line("Manifest checked")
fire_event(ManifestChecked())
manifest.build_flat_graph()
print_timestamped_line("Flat graph built")
fire_event(ManifestFlatGraphBuilt())
loader._perf_info.load_all_elapsed = (
time.perf_counter() - start_load_all
)
self.loader = loader
self.manifest = manifest
print_timestamped_line("Manifest loaded")
fire_event(ManifestLoaded())
def compile_manifest(self):
adapter = get_adapter(self.config)
@@ -81,14 +87,14 @@ class ParseTask(ConfiguredTask):
self.graph = compiler.compile(self.manifest)
def run(self):
-print_timestamped_line('Start parsing.')
+fire_event(ParsingStart())
self.get_full_manifest()
if self.args.compile:
-print_timestamped_line('Compiling.')
+fire_event(ParsingCompiling())
self.compile_manifest()
if self.args.write_manifest:
-print_timestamped_line('Writing manifest.')
+fire_event(ParsingWritingManifest())
self.write_manifest()
self.write_perf_info()
-print_timestamped_line('Done.')
+fire_event(ParsingDone())

View File

@@ -65,6 +65,7 @@ setup(
'dbt-extractor==0.4.0',
'typing-extensions>=3.7.4,<3.11',
'werkzeug>=1,<3',
+'structlog>=21.2.0,<21.3.0',
# the following are all to match snowflake-connector-python
'requests<3.0.0',
'idna>=2.5,<4',

View File

@@ -6,13 +6,16 @@ import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterResponse
-from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.events import AdapterLogger
from dbt.helper_types import Port
from dataclasses import dataclass
from typing import Optional
logger = AdapterLogger("Postgres")
@dataclass
class PostgresCredentials(Credentials):
host: str

test/unit/test_events.py (new file, 19 lines)
View File

@@ -0,0 +1,19 @@
from unittest import mock, TestCase
class TestEvents(TestCase):
def setUp(self):
pass
# this interface is documented for adapter maintainers to plug into
# so we should test that it at the very least doesn't explode.
def test_adapter_logging_interface(self):
from dbt.events import AdapterLogger
logger = AdapterLogger("dbt_tests")
logger.debug("debug message")
logger.info("info message")
logger.warning("warning message")
logger.error("error message")
logger.exception("exception message")
self.assertTrue(True)