Compare commits

...

24 Commits

Author SHA1 Message Date
leahwicz
ec1f609f3e Bumping version to 1.0.0 (#4431) (#4432)
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Github Build Bot <buildbot@fishtownanalytics.com>
2021-12-03 13:34:41 -05:00
Jeremy Cohen
b4ea003559 Changelog entries for rc3 -> final (#4389) (#4430)
* Changelog entries for rc3 -> final

* More updates

* Final entry

* Last fix, and the date

* These few, these happy few
2021-12-03 19:24:43 +01:00
Jeremy Cohen
23e1a9aa4f relax version specifier for dbt-extractor (#4427) (#4429)
Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-03 19:20:40 +01:00
Jeremy Cohen
9882d08a24 add new interop tests for black-box json log schema testing (#4327) (#4428)
Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-03 19:15:41 +01:00
leahwicz
79cc811a68 stringify generic exceptions (#4424) (#4425)
Co-authored-by: Ian Knox <81931810+iknox-fa@users.noreply.github.com>
2021-12-03 12:36:14 -05:00
leahwicz
c82572f745 Info vs debug text formatting (#4418) (#4421)
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
2021-12-03 09:22:14 -05:00
leahwicz
42a38e4deb Sources aren't materialized (#4417) (#4420)
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
2021-12-03 09:03:24 -05:00
leahwicz
ecf0ffe68c Add flag to main.py. Reinstantiate after flags (#4416) (#4419)
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
2021-12-03 08:54:48 -05:00
leahwicz
e9f26ef494 add node type codes to more events + more hook log data (#4378) (#4415)
* add node type codes to more events + more hook log

* minor fixes

* renames started/finished keys

* made process more clear

* fixed errors

* Put back report_node_data in fresshness.py

Co-authored-by: Gerda Shank <gerda@dbtlabs.com>

Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
2021-12-02 19:31:20 -05:00
leahwicz
c77dc59af8 use reference keys instead of relations (#4410) (#4414)
Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-02 18:41:20 -05:00
leahwicz
a5ebe4ff59 Logging README (#4395) (#4413)
* WIP

* more README cleanup

* readme tweaks

* small tweaks

* wording updates

Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
2021-12-02 18:12:28 -05:00
leahwicz
5c01f9006c user configurable event buffer size (#4411) (#4412)
Co-authored-by: Ian Knox <81931810+iknox-fa@users.noreply.github.com>
2021-12-02 18:05:57 -05:00
Jeremy Cohen
c92e1ed9f2 [Backport] #4388 + #4405 (#4408)
* A few final logging touch-ups (#4388)

* Rm unused events, per #4104

* More structured ConcurrencyLine

* Replace \n prefixes with EmptyLine

* Reimplement ui.warning_tag to centralize logic

* Use warning_tag for deprecations too

* Rm more unused event types

* Exclude EmptyLine from json logs

* loglines are not always created by events (#4406)

Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>

* Rollover + backup for dbt.log (#4405)

Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-02 17:51:08 -05:00
Emily Rockman
85dee41a9f update file name (#4402) (#4407)
Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com>
2021-12-02 17:08:32 -05:00
leahwicz
a4456feff0 change json override strategy (#4396) (#4403)
Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-02 17:05:33 -05:00
leahwicz
8d27764b0f allow log_format to be set in profile configs (#4394) (#4401)
Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
2021-12-02 16:49:41 -05:00
leahwicz
e56256d968 use rfc3339 format for log time stamps (#4384) (#4400)
Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-02 15:42:46 -05:00
leahwicz
86cb3ba6fa [#4354] Different output for console and file logs (#4379) (#4399)
* [#4354] Different output for console and file logs

* Tweak some log formats

* Change loging of thread names

Co-authored-by: Gerda Shank <gerda@fishtownanalytics.com>
2021-12-02 15:39:53 -05:00
leahwicz
4d0d2d0d6f Add windows OS error supressing for temp dir cleanups (#4380) (#4398)
Co-authored-by: Ian Knox <81931810+iknox-fa@users.noreply.github.com>
2021-12-02 15:33:10 -05:00
leahwicz
f8a3c27fb8 move event code up a level (#4381) (#4397)
move event code up a level plus minor fixes

Co-authored-by: Nathaniel May <nathaniel.may@fishtownanalytics.com>
2021-12-02 15:27:04 -05:00
leahwicz
30f05b0213 Fix release process (#4385) (#4393) 2021-12-02 12:33:41 -05:00
Jeremy Cohen
f1bebb3629 Tiny touchups for deps, clean (#4366) (#4387)
* Use actual profile name for log msg

* Raise clean dep warning iff configured path missing
2021-12-02 17:35:51 +01:00
Gerda Shank
e7a40345ad Make the stdout logger actually go to stdout (#4368) (#4376) 2021-12-01 11:13:24 -05:00
Emily Rockman
ba94b8212c only log events in cache.py when flag is set set (#4371)
flag is --log-cache-events
2021-11-30 16:05:20 -06:00
42 changed files with 1008 additions and 470 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 1.0.0rc3
+current_version = 1.0.0
parse = (?P<major>\d+)
	\.(?P<minor>\d+)
	\.(?P<patch>\d+)

View File

@@ -95,7 +95,9 @@ jobs:
      - uses: actions/upload-artifact@v2
        with:
          name: dist
-          path: dist/
+          path: |
+            dist/
+            !dist/dbt-${{github.event.inputs.version_number}}.tar.gz

  test-build:
    name: verify packages

View File

@@ -0,0 +1,71 @@
# This Action checks makes a dbt run to sample json structured logs
# and checks that they conform to the currently documented schema.
#
# If this action fails it either means we have unintentionally deviated
# from our documented structured logging schema, or we need to bump the
# version of our structured logging and add new documentation to
# communicate these changes.
name: Structured Logging Schema Check
on:
push:
branches:
- "main"
- "*.latest"
- "releases/*"
pull_request:
workflow_dispatch:
permissions: read-all
jobs:
# run the performance measurements on the current or default branch
test-schema:
name: Test Log Schema
runs-on: ubuntu-latest
env:
# turns warnings into errors
RUSTFLAGS: "-D warnings"
# points tests to the log file
LOG_DIR: "/home/runner/work/dbt-core/dbt-core/logs"
# tells integration tests to output into json format
DBT_LOG_FORMAT: 'json'
steps:
- name: checkout dev
uses: actions/checkout@v2
with:
persist-credentials: false
- name: Setup Python
uses: actions/setup-python@v2.2.2
with:
python-version: "3.8"
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: install dbt
run: pip install -r dev-requirements.txt -r editable-requirements.txt
- name: Set up postgres
uses: ./.github/actions/setup-postgres-linux
- name: ls
run: ls
# integration tests generate a ton of logs in different files. the next step will find them all.
# we actually care if these pass, because the normal test run doesn't usually include many json log outputs
- name: Run integration tests
run: tox -e py38-postgres -- -nauto
# apply our schema tests to every log event from the previous step
# skips any output that isn't valid json
- uses: actions-rs/cargo@v1
with:
command: run
args: --manifest-path test/interop/log_parsing/Cargo.toml
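
The workflow's comments describe the contract being tested: every JSON-formatted log line dbt emits must conform to the documented structured-logging schema. As a rough illustration only (the real checks are the Rust tests under `test/interop/log_parsing`), a Python sketch of that kind of validation might look like the following; the required-key set is a small subset inferred from this changeset, and the log file path is an assumption, not part of the workflow:

```python
# Hedged sketch: read a log file produced with DBT_LOG_FORMAT=json and assert
# that every JSON line carries a few of the top-level keys visible in this
# changeset ("code", "invocation_id", "data"). Not the actual interop tests.
import json
from pathlib import Path

REQUIRED_KEYS = {"code", "invocation_id", "data"}  # assumed subset of the schema


def check_log_file(path: Path) -> None:
    for line in path.read_text(encoding="utf8").splitlines():
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # the real tests also skip output that isn't valid JSON
        missing = REQUIRED_KEYS - event.keys()
        assert not missing, f"log line missing keys {missing}: {line[:120]}"


if __name__ == "__main__":
    check_log_file(Path("logs/dbt.log"))  # hypothetical path
```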

View File

@@ -1,4 +1,28 @@
-## dbt-core 1.0.0 (Release TBD)
+## dbt-core 1.0.0 (December 3, 2021)
### Fixes
- Configure the CLI logger destination to use stdout instead of stderr ([#4368](https://github.com/dbt-labs/dbt-core/pull/4368))
- Make the size of `EVENT_HISTORY` configurable, via `EVENT_BUFFER_SIZE` global config ([#4411](https://github.com/dbt-labs/dbt-core/pull/4411), [#4416](https://github.com/dbt-labs/dbt-core/pull/4416))
- Change type of `log_format` in `profiles.yml` user config to be string, not boolean ([#4394](https://github.com/dbt-labs/dbt-core/pull/4394))
### Under the hood
- Only log cache events if `LOG_CACHE_EVENTS` is enabled, and disable by default. This restores previous behavior ([#4369](https://github.com/dbt-labs/dbt-core/pull/4369))
- Move event codes to be a top-level attribute of JSON-formatted logs, rather than nested in `data` ([#4381](https://github.com/dbt-labs/dbt-core/pull/4381))
- Fix failing integration test on Windows ([#4380](https://github.com/dbt-labs/dbt-core/pull/4380))
- Clean up warning messages for `clean` + `deps` ([#4366](https://github.com/dbt-labs/dbt-core/pull/4366))
- Use RFC3339 timestamps for log messages ([#4384](https://github.com/dbt-labs/dbt-core/pull/4384))
- Different text output for console (info) and file (debug) logs ([#4379](https://github.com/dbt-labs/dbt-core/pull/4379), [#4418](https://github.com/dbt-labs/dbt-core/pull/4418))
- Remove unused events. More structured `ConcurrencyLine`. Replace `\n` message starts/ends with `EmptyLine` events, and exclude `EmptyLine` from JSON-formatted output ([#4388](https://github.com/dbt-labs/dbt-core/pull/4388))
- Update `events` module README ([#4395](https://github.com/dbt-labs/dbt-core/pull/4395))
- Rework approach to JSON serialization for events with non-standard properties ([#4396](https://github.com/dbt-labs/dbt-core/pull/4396))
- Update legacy logger file name to `dbt.log.legacy` ([#4402](https://github.com/dbt-labs/dbt-core/pull/4402))
- Rollover `dbt.log` at 10 MB, and keep up to 5 backups, restoring previous behavior ([#4405](https://github.com/dbt-labs/dbt-core/pull/4405))
- Use reference keys instead of full relation objects in cache events ([#4410](https://github.com/dbt-labs/dbt-core/pull/4410))
- Add `node_type` contextual info to more events ([#4378](https://github.com/dbt-labs/dbt-core/pull/4378))
- Make `materialized` config optional in `node_type` ([#4417](https://github.com/dbt-labs/dbt-core/pull/4417))
- Stringify exception in `GenericExceptionOnRun` to support JSON serialization ([#4424](https://github.com/dbt-labs/dbt-core/pull/4424))
- Add "interop" tests for machine consumption of structured log output ([#4327](https://github.com/dbt-labs/dbt-core/pull/4327))
- Relax version specifier for `dbt-extractor` to `~=0.4.0`, to support compiled wheels for additional architectures when available ([#4427](https://github.com/dbt-labs/dbt-core/pull/4427))
## dbt-core 1.0.0rc3 (November 30, 2021)

View File

@@ -39,7 +39,7 @@ from dbt.adapters.base.relation import (
    ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
)
from dbt.adapters.base import Column as BaseColumn
-from dbt.adapters.cache import RelationsCache
+from dbt.adapters.cache import RelationsCache, _make_key

SeedModel = Union[ParsedSeedNode, CompiledSeedNode]
@@ -291,7 +291,7 @@ class BaseAdapter(metaclass=AdapterMeta):
        if (database, schema) not in self.cache:
            fire_event(
                CacheMiss(
-                    conn_name=self.nice_connection_name,
+                    conn_name=self.nice_connection_name(),
                    database=database,
                    schema=schema
                )
@@ -676,7 +676,11 @@ class BaseAdapter(metaclass=AdapterMeta):
        relations = self.list_relations_without_caching(
            schema_relation
        )
-        fire_event(ListRelations(database=database, schema=schema, relations=relations))
+        fire_event(ListRelations(
+            database=database,
+            schema=schema,
+            relations=[_make_key(x) for x in relations]
+        ))
        return relations

View File

@@ -1,8 +1,8 @@
import threading
-from collections import namedtuple
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
+from dbt.adapters.reference_keys import _make_key, _ReferenceKey
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.types import (
@@ -22,18 +22,6 @@ from dbt.events.types import (
)
from dbt.utils import lowercase

-_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
-
-
-def _make_key(relation) -> _ReferenceKey:
-    """Make _ReferenceKeys with lowercase values for the cache so we don't have
-    to keep track of quoting
-    """
-    # databases and schemas can both be None
-    return _ReferenceKey(lowercase(relation.database),
-                         lowercase(relation.schema),
-                         lowercase(relation.identifier))

def dot_separated(key: _ReferenceKey) -> str:
    """Return the key in dot-separated string form.
@@ -334,7 +322,7 @@ class RelationsCache:
        :param BaseRelation relation: The underlying relation.
        """
        cached = _CachedRelation(relation)
-        fire_event(AddRelation(relation=cached))
+        fire_event(AddRelation(relation=_make_key(cached)))
        fire_event(DumpBeforeAddGraph(dump=self.dump_graph()))

        with self.lock:

View File

@@ -0,0 +1,24 @@
# this module exists to resolve circular imports with the events module
from collections import namedtuple
from typing import Optional
_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
def lowercase(value: Optional[str]) -> Optional[str]:
if value is None:
return None
else:
return value.lower()
def _make_key(relation) -> _ReferenceKey:
"""Make _ReferenceKeys with lowercase values for the cache so we don't have
to keep track of quoting
"""
# databases and schemas can both be None
return _ReferenceKey(lowercase(relation.database),
lowercase(relation.schema),
lowercase(relation.identifier))
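
The new module's docstring explains why it exists (resolving a circular import with the events module) and what `_make_key` does: lowercase a relation's database, schema, and identifier so cache lookups ignore quoting. A small usage sketch, assuming dbt-core 1.0.0 is installed and using a stand-in relation object rather than a real `BaseRelation`:

```python
# Illustrative only: _make_key just needs .database / .schema / .identifier
# attributes, so a namedtuple stands in for a real relation here.
from collections import namedtuple
from dbt.adapters.reference_keys import _make_key

FakeRelation = namedtuple('FakeRelation', 'database schema identifier')

key = _make_key(FakeRelation('ANALYTICS', 'Jaffle_Shop', 'Orders'))
print(key)
# _ReferenceKey(database='analytics', schema='jaffle_shop', identifier='orders')
```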

View File

@@ -75,7 +75,8 @@ class SQLConnectionManager(BaseConnectionManager):
        fire_event(
            SQLQueryStatus(
-                status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
+                status=self.get_response(cursor)._message,
+                elapsed=round((time.time() - pre), 2)
            )
        )

View File

@@ -5,6 +5,7 @@ import dbt.clients.agate_helper
from dbt.contracts.connection import Connection
import dbt.exceptions
from dbt.adapters.base import BaseAdapter, available
+from dbt.adapters.cache import _make_key
from dbt.adapters.sql import SQLConnectionManager
from dbt.events.functions import fire_event
from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop
@@ -182,7 +183,7 @@ class SQLAdapter(BaseAdapter):
    def create_schema(self, relation: BaseRelation) -> None:
        relation = relation.without_identifier()
-        fire_event(SchemaCreation(relation=relation))
+        fire_event(SchemaCreation(relation=_make_key(relation)))
        kwargs = {
            'relation': relation,
        }
@@ -193,7 +194,7 @@ class SQLAdapter(BaseAdapter):
    def drop_schema(self, relation: BaseRelation) -> None:
        relation = relation.without_identifier()
-        fire_event(SchemaDrop(relation=relation))
+        fire_event(SchemaDrop(relation=_make_key(relation)))
        kwargs = {
            'relation': relation,
        }

View File

@@ -545,8 +545,12 @@ class UnsetProfileConfig(RuntimeConfig):
                args, profile_renderer, profile_name
            )
        except (DbtProjectError, DbtProfileError) as exc:
+            selected_profile_name = Profile.pick_profile_name(
+                args_profile_name=getattr(args, 'profile', None),
+                project_profile_name=profile_name
+            )
            fire_event(ProfileLoadError(exc=exc))
-            fire_event(ProfileNotFound(profile_name=profile_name))
+            fire_event(ProfileNotFound(profile_name=selected_profile_name))
            # return the poisoned form
            profile = UnsetProfile()
            # disable anonymous usage statistics

View File

@@ -231,7 +231,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
    printer_width: Optional[int] = None
    write_json: Optional[bool] = None
    warn_error: Optional[bool] = None
-    log_format: Optional[bool] = None
+    log_format: Optional[str] = None
    debug: Optional[bool] = None
    version_check: Optional[bool] = None
    fail_fast: Optional[bool] = None

View File

@@ -36,9 +36,9 @@ class DBTDeprecation:
        if self.name not in active_deprecations:
            desc = self.description.format(**kwargs)
            msg = ui.line_wrap_message(
-                desc, prefix='* Deprecation Warning:\n\n'
+                desc, prefix='Deprecated functionality\n\n'
            )
-            dbt.exceptions.warn_or_error(msg)
+            dbt.exceptions.warn_or_error(msg, log_fmt=ui.warning_tag('{}'))
            self.track_deprecation_warn()
            active_deprecations.add(self.name)
@@ -62,7 +62,7 @@ class PackageInstallPathDeprecation(DBTDeprecation):
class ConfigPathDeprecation(DBTDeprecation):
    _description = '''\
-The `{deprecated_path}` config has been deprecated in favor of `{exp_path}`.
+The `{deprecated_path}` config has been renamed to `{exp_path}`.
Please update your `dbt_project.yml` configuration to reflect this change.
    '''

View File

@@ -6,7 +6,53 @@ The Events module is the implmentation for structured logging. These events repr
The event module provides types that represent what is happening in dbt in `events.types`. These types are intended to represent an exhaustive list of all things happening within dbt that will need to be logged, streamed, or printed. To fire an event, `events.functions::fire_event` is the entry point to the module from everywhere in dbt.
# Adding a New Event
-In `events.types` add a new class that represents the new event. This may be a simple class with no values, or it may be a dataclass with some values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `CliEventABC`) as well as the loglevel this event belongs to.
+In `events.types` add a new class that represents the new event. All events must be a dataclass with, at minimum, a code. You may also include some other values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `Cli`) as well as the loglevel this event belongs to. This system has been designed to take full advantage of mypy so running it will catch anything you may miss.
## Required for Every Event
- a string attribute `code`, that's unique across events
- assign a log level by extending `DebugLevel`, `InfoLevel`, `WarnLevel`, or `ErrorLevel`
- a message()
- extend `File` and/or `Cli` based on where it should output
Example
```
@dataclass
class PartialParsingDeletedExposure(DebugLevel, Cli, File):
unique_id: str
code: str = "I049"
def message(self) -> str:
return f"Partial parsing: deleted exposure {self.unique_id}"
```
## Optional (based on your event)
- Events associated with node status changes must have `report_node_data` passed in and be extended with `NodeInfo`
- define `asdict` if your data is not serializable to json
Example
```
@dataclass
class SuperImportantNodeEvent(InfoLevel, File, NodeInfo):
node_name: str
run_result: RunResult
report_node_data: ParsedModelNode # may vary
code: str = "Q036"
def message(self) -> str:
return f"{self.node_name} had overly verbose result of {run_result}"
@classmethod
def asdict(cls, data: list) -> dict:
return dict((k, str(v)) for k, v in data)
```
All values other than `code` and `report_node_data` will be included in the `data` node of the json log output.
Once your event has been added, add a dummy call to your new event at the bottom of `types.py` and also add your new Event to the list `sample_values` in `test/unit/test_events.py'.
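
Editor's note: the README names `events.functions::fire_event` as the single entry point for emitting these events but does not show a call site. A hedged sketch of one, reusing the event class from the README's own example; the `unique_id` value is purely illustrative:

```python
from dbt.events.functions import fire_event
from dbt.events.types import PartialParsingDeletedExposure

# Fires the I049 event from the README example; it is routed to the CLI and/or
# file logger according to the destinations the event class extends.
fire_event(PartialParsingDeletedExposure(unique_id="exposure.jaffle_shop.some_exposure"))
```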
# Adapter Maintainers
To integrate existing log messages from adapters, you likely have a line of code like this in your adapter already:

View File

@@ -1,7 +1,6 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from dataclasses import dataclass
from datetime import datetime
-import json
import os
import threading
from typing import Any, Optional
@@ -38,6 +37,11 @@ class ErrorLevel():
return "error" return "error"
class Cache():
# Events with this class will only be logged when the `--log-cache-events` flag is passed
pass
@dataclass @dataclass
class Node(): class Node():
node_path: str node_path: str
@@ -70,6 +74,7 @@ class Event(metaclass=ABCMeta):
# fields that should be on all events with their default implementations # fields that should be on all events with their default implementations
log_version: int = 1 log_version: int = 1
ts: Optional[datetime] = None # use getter for non-optional ts: Optional[datetime] = None # use getter for non-optional
ts_rfc3339: Optional[str] = None # use getter for non-optional
pid: Optional[int] = None # use getter for non-optional pid: Optional[int] = None # use getter for non-optional
node_info: Optional[Node] node_info: Optional[Node]
@@ -91,32 +96,20 @@ class Event(metaclass=ABCMeta):
    def message(self) -> str:
        raise Exception("msg not implemented for Event")

-    # override this method to convert non-json serializable fields to json.
-    # for override examples, see existing concrete types.
-    #
-    # there is no type-level mechanism to have mypy enforce json serializability, so we just try
-    # to serialize and raise an exception at runtime when that fails. This safety mechanism
-    # only works if we have attempted to serialize every concrete event type in our tests.
-    def fields_to_json(self, field_value: Any) -> Any:
-        try:
-            json.dumps(field_value, sort_keys=True)
-            return field_value
-        except TypeError:
-            val_type = type(field_value).__name__
-            event_type = type(self).__name__
-            return Exception(
-                f"type {val_type} is not serializable to json."
-                f" First make sure that the call sites for {event_type} match the type hints"
-                f" and if they do, you can override Event::fields_to_json in {event_type} in"
-                " types.py to define your own serialization function to any valid json type"
-            )

    # exactly one time stamp per concrete event
    def get_ts(self) -> datetime:
        if not self.ts:
-            self.ts = datetime.now()
+            self.ts = datetime.utcnow()
+            self.ts_rfc3339 = self.ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        return self.ts

+    # preformatted time stamp
+    def get_ts_rfc3339(self) -> str:
+        if not self.ts_rfc3339:
+            # get_ts() creates the formatted string too so all time logic is centralized
+            self.get_ts()
+        return self.ts_rfc3339  # type: ignore

    # exactly one pid per concrete event
    def get_pid(self) -> int:
        if not self.pid:
@@ -132,6 +125,21 @@ class Event(metaclass=ABCMeta):
        from dbt.events.functions import get_invocation_id
        return get_invocation_id()

+    # default dict factory for all events. can override on concrete classes.
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            # stringify all exceptions
+            if isinstance(v, Exception) or isinstance(v, BaseException):
+                d[k] = str(v)
+            # skip all binary data
+            elif isinstance(v, bytes):
+                continue
+            else:
+                d[k] = v
+        return d

@dataclass  # type: ignore
class NodeInfo(Event, metaclass=ABCMeta):
@@ -143,7 +151,7 @@ class NodeInfo(Event, metaclass=ABCMeta):
            node_name=self.report_node_data.name,
            unique_id=self.report_node_data.unique_id,
            resource_type=self.report_node_data.resource_type.value,
-            materialized=self.report_node_data.config.materialized,
+            materialized=self.report_node_data.config.get('materialized'),
            node_status=str(self.report_node_data._event_status.get('node_status')),
            node_started_at=self.report_node_data._event_status.get("started_at"),
            node_finished_at=self.report_node_data._event_status.get("finished_at")
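
The new default `asdict` above is a `dict_factory` in the sense of Python's `dataclasses.asdict`: the functions module (next file) calls `dataclasses.asdict(e, dict_factory=type(e).asdict)`, so every key/value pair of an event flows through this hook. A standalone sketch of that mechanism, using a toy dataclass rather than a real dbt event:

```python
import dataclasses


# Toy stand-in for an event: one JSON-friendly field, one exception, one bytes blob.
@dataclasses.dataclass
class ToyEvent:
    path: str
    exc: Exception
    blob: bytes

    # mirrors the default Event.asdict in this changeset: stringify exceptions, drop bytes
    @classmethod
    def asdict(cls, data: list) -> dict:
        d = {}
        for k, v in data:
            if isinstance(v, BaseException):
                d[k] = str(v)
            elif isinstance(v, bytes):
                continue
            else:
                d[k] = v
        return d


e = ToyEvent(path="models/orders.sql", exc=ValueError("boom"), blob=b"\x00")
print(dataclasses.asdict(e, dict_factory=type(e).asdict))
# {'path': 'models/orders.sql', 'exc': 'boom'}
```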

View File

@@ -2,8 +2,8 @@
from colorama import Style
from datetime import datetime
import dbt.events.functions as this  # don't worry I hate it too.
-from dbt.events.base_types import Cli, Event, File, ShowException, NodeInfo
+from dbt.events.base_types import Cli, Event, File, ShowException, NodeInfo, Cache
-from dbt.events.types import EventBufferFull, T_Event
+from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
import dbt.flags as flags
# TODO this will need to move eventually
from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER
@@ -13,19 +13,21 @@ from io import StringIO, TextIOWrapper
import logbook
import logging
from logging import Logger
+import sys
from logging.handlers import RotatingFileHandler
import os
import uuid
+import threading
from typing import Any, Callable, Dict, List, Optional, Union
import dataclasses
from collections import deque

-# create the global event history buffer with a max size of 100k records
+# create the global event history buffer with the default max size (10k)
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
-# TODO: make the maxlen something configurable from the command line via args(?)
+# TODO the flags module has not yet been resolved when this is created
global EVENT_HISTORY
-EVENT_HISTORY = deque(maxlen=100000)  # type: ignore
+EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)  # type: ignore

# create the global file logger with no configuration
global FILE_LOG
@@ -38,7 +40,7 @@ FILE_LOG.addHandler(null_handler)
global STDOUT_LOG
STDOUT_LOG = logging.getLogger('default_stdout')
STDOUT_LOG.setLevel(logging.INFO)
-stdout_handler = logging.StreamHandler()
+stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.INFO)
STDOUT_LOG.addHandler(stdout_handler)
@@ -48,6 +50,10 @@ invocation_id: Optional[str] = None
def setup_event_logger(log_path, level_override=None):
+    # flags have been resolved, and log_path is known
+    global EVENT_HISTORY
+    EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)  # type: ignore
    make_log_dir_if_missing(log_path)
    this.format_json = flags.LOG_FORMAT == 'json'
    # USE_COLORS can be None if the app just started and the cli flags
@@ -64,7 +70,7 @@ def setup_event_logger(log_path, level_override=None):
    FORMAT = "%(message)s"
    stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT)
-    stdout_handler = logging.StreamHandler()
+    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(stdout_passthrough_formatter)
    stdout_handler.setLevel(level)
    # clear existing stdout TextIOWrapper stream handlers
@@ -80,7 +86,12 @@ def setup_event_logger(log_path, level_override=None):
    file_passthrough_formatter = logging.Formatter(fmt=FORMAT)
-    file_handler = RotatingFileHandler(filename=log_dest, encoding='utf8')
+    file_handler = RotatingFileHandler(
+        filename=log_dest,
+        encoding='utf8',
+        maxBytes=10 * 1024 * 1024,  # 10 mb
+        backupCount=5
+    )
    file_handler.setFormatter(file_passthrough_formatter)
    file_handler.setLevel(logging.DEBUG)  # always debug regardless of user input
    this.FILE_LOG.handlers.clear()
@@ -130,17 +141,25 @@ def event_to_serializable_dict(
) -> Dict[str, Any]:
    data = dict()
    node_info = dict()
-    if hasattr(e, '__dataclass_fields__'):
-        for field, value in dataclasses.asdict(e).items():  # type: ignore[attr-defined]
-            _json_value = e.fields_to_json(value)
+    log_line = dict()
+    try:
+        log_line = dataclasses.asdict(e, dict_factory=type(e).asdict)
+    except AttributeError:
+        event_type = type(e).__name__
+        raise Exception(  # TODO this may hang async threads
+            f"type {event_type} is not serializable to json."
+            f" First make sure that the call sites for {event_type} match the type hints"
+            f" and if they do, you can override the dataclass method `asdict` in {event_type} in"
+            " types.py to define your own serialization function to a dictionary of valid json"
+            " types"
+        )
    if isinstance(e, NodeInfo):
        node_info = dataclasses.asdict(e.get_node_info())
-            if not isinstance(_json_value, Exception):
-                data[field] = _json_value
-            else:
-                data[field] = f"JSON_SERIALIZE_FAILED: {type(value).__name__, 'NA'}"
+    for field, value in log_line.items():  # type: ignore[attr-defined]
+        if field not in ["code", "report_node_data"]:
+            data[field] = value

    event_dict = {
        'type': 'log_line',
@@ -152,7 +171,8 @@ def event_to_serializable_dict(
        'data': data,
        'invocation_id': e.get_invocation_id(),
        'thread_name': e.get_thread_name(),
-        'node_info': node_info
+        'node_info': node_info,
+        'code': e.code
    }

    return event_dict
@@ -161,35 +181,64 @@ def event_to_serializable_dict(
# translates an Event to a completely formatted text-based log line
# you have to specify which message you want. (i.e. - e.message, e.cli_msg(), e.file_msg())
# type hinting everything as strings so we don't get any unintentional string conversions via str()
-def create_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
+def create_info_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
    color_tag: str = '' if this.format_color else Style.RESET_ALL
    ts: str = e.get_ts().strftime("%H:%M:%S")
    scrubbed_msg: str = scrub_secrets(msg_fn(e), env_secrets())
+    log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
+    return log_line
+
+
+def create_debug_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
+    log_line: str = ''
+    # Create a separator if this is the beginning of an invocation
+    if type(e) == MainReportVersion:
+        separator = 30 * '='
+        log_line = f'\n\n{separator} {e.get_ts()} | {get_invocation_id()} {separator}\n'
+    color_tag: str = '' if this.format_color else Style.RESET_ALL
+    ts: str = e.get_ts().strftime("%H:%M:%S.%f")
+    scrubbed_msg: str = scrub_secrets(msg_fn(e), env_secrets())
    level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} "
-    log_line: str = f"{color_tag}{ts} | [ {level} ] | {scrubbed_msg}"
+    thread = ''
+    if threading.current_thread().getName():
+        thread_name = threading.current_thread().getName()
+        thread_name = thread_name[:10]
+        thread_name = thread_name.ljust(10, ' ')
+        thread = f' [{thread_name}]:'
+    log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}"
    return log_line


# translates an Event to a completely formatted json log line
# you have to specify which message you want. (i.e. - e.message(), e.cli_msg(), e.file_msg())
-def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
-    values = event_to_serializable_dict(e, lambda dt: dt.isoformat(), lambda x: msg_fn(x))
+def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> Optional[str]:
+    if type(e) == EmptyLine:
+        return None  # will not be sent to logger
+    # using preformatted string instead of formatting it here to be extra careful about timezone
+    values = event_to_serializable_dict(e, lambda _: e.get_ts_rfc3339(), lambda x: msg_fn(x))
    raw_log_line = json.dumps(values, sort_keys=True)
    return scrub_secrets(raw_log_line, env_secrets())


-# calls create_text_log_line() or create_json_log_line() according to logger config
-def create_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
-    return (
-        create_json_log_line(e, msg_fn)
-        if this.format_json else
-        create_text_log_line(e, msg_fn)
-    )
+# calls create_stdout_text_log_line() or create_json_log_line() according to logger config
+def create_log_line(
+    e: T_Event,
+    msg_fn: Callable[[T_Event], str],
+    file_output=False
+) -> Optional[str]:
+    if this.format_json:
+        return create_json_log_line(e, msg_fn)  # json output, both console and file
+    elif file_output is True or flags.DEBUG:
+        return create_debug_text_log_line(e, msg_fn)  # default file output
+    else:
+        return create_info_text_log_line(e, msg_fn)  # console output


# allows for resuse of this obnoxious if else tree.
# do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra
def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str):
+    if not log_line:
+        return
    if level_tag == 'test':
        # TODO after implmenting #3977 send to new test level
        l.debug(log_line)
@@ -262,12 +311,17 @@ def send_exc_to_logger(
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: Event) -> None:
+    # skip logs when `--log-cache-events` is not passed
+    if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
+        return
    # if and only if the event history deque will be completely filled by this event
    # fire warning that old events are now being dropped
    global EVENT_HISTORY
-    if len(EVENT_HISTORY) == ((EVENT_HISTORY.maxlen or 100000) - 1):
+    if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
+        EVENT_HISTORY.append(e)
        fire_event(EventBufferFull())
+    else:
        EVENT_HISTORY.append(e)

    # backwards compatibility for plugins that require old logger (dbt-rpc)
@@ -275,14 +329,15 @@ def fire_event(e: Event) -> None:
        # using Event::message because the legacy logger didn't differentiate messages by
        # destination
        log_line = create_log_line(e, msg_fn=lambda x: x.message())
+        if log_line:
            send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line)
        return  # exit the function to avoid using the current logger as well

    # always logs debug level regardless of user input
    if isinstance(e, File):
-        log_line = create_log_line(e, msg_fn=lambda x: x.file_msg())
+        log_line = create_log_line(e, msg_fn=lambda x: x.file_msg(), file_output=True)
        # doesn't send exceptions to exception logger
+        if log_line:
            send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line)

    if isinstance(e, Cli):
@@ -292,6 +347,7 @@ def fire_event(e: Event) -> None:
            return  # eat the message in case it was one of the expensive ones
        log_line = create_log_line(e, msg_fn=lambda x: x.cli_msg())
+        if log_line:
            if not isinstance(e, ShowException):
                send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
            # CliEventABC and ShowException
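
The rewritten `fire_event` above guards the `EVENT_HISTORY` deque: when the buffer is one short of `EVENT_BUFFER_SIZE` it appends the event and fires `EventBufferFull` once, after which the bounded deque silently drops its oldest entries. A small sketch of that drop-oldest behaviour with a plain `collections.deque` (the buffer size here is arbitrary, not dbt's default):

```python
from collections import deque

BUFFER_SIZE = 5  # arbitrary; dbt reads this from flags.EVENT_BUFFER_SIZE
history = deque(maxlen=BUFFER_SIZE)

for i in range(8):
    if len(history) == BUFFER_SIZE - 1:
        # analogue of firing EventBufferFull: this triggers exactly once,
        # just before the deque reaches capacity
        print("buffer about to be full, warn once")
    history.append(f"event-{i}")

print(list(history))  # only the 5 most recent events remain
```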

View File

@@ -1,16 +1,16 @@
import argparse
from dataclasses import dataclass
+from dbt.adapters.reference_keys import _make_key, _ReferenceKey
from dbt.events.stubs import (
    _CachedRelation,
    BaseRelation,
-    ParsedModelNode,
    ParsedHookNode,
-    _ReferenceKey,
+    ParsedModelNode,
    RunResult
)
from dbt import ui
from dbt.events.base_types import (
-    Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo
+    Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo, Cache
)
from dbt.events.format import format_fancy_output_line, pluralize
from dbt.node_types import NodeType
@@ -115,14 +115,6 @@ class MainEncounteredError(ErrorLevel, Cli):
    def message(self) -> str:
        return f"Encountered an error:\n{str(self.e)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        # equality on BaseException is not good enough of a comparison here
-        if isinstance(val, BaseException):
-            return str(val)
-        return val

@dataclass
class MainStackTrace(DebugLevel, Cli):
@@ -150,12 +142,9 @@ class MainReportArgs(DebugLevel, Cli, File):
    def message(self):
        return f"running dbt with arguments {str(self.args)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        if isinstance(val, argparse.Namespace):
-            return str(val)
-        return val
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)

@dataclass
@@ -354,13 +343,6 @@ class SystemCouldNotWrite(DebugLevel, Cli, File):
f"{self.reason}\nexception: {self.exc}" f"{self.reason}\nexception: {self.exc}"
) )
# overriding default json serialization for this event
def fields_to_json(self, val: Any) -> Any:
if val == self.exc:
return str(val)
return val
@dataclass @dataclass
class SystemExecutingCmd(DebugLevel, Cli, File): class SystemExecutingCmd(DebugLevel, Cli, File):
@@ -397,40 +379,6 @@ class SystemReportReturnCode(DebugLevel, Cli, File):
def message(self) -> str: def message(self) -> str:
return f"command return code={self.returncode}" return f"command return code={self.returncode}"
# TODO remove?? Not called outside of this file
@dataclass
class SelectorAlertUpto3UnusedNodes(InfoLevel, Cli, File):
node_names: List[str]
code: str = "I_NEED_A_CODE_5"
def message(self) -> str:
summary_nodes_str = ("\n - ").join(self.node_names[:3])
and_more_str = (
f"\n - and {len(self.node_names) - 3} more" if len(self.node_names) > 4 else ""
)
return (
f"\nSome tests were excluded because at least one parent is not selected. "
f"Use the --greedy flag to include them."
f"\n - {summary_nodes_str}{and_more_str}"
)
# TODO remove?? Not called outside of this file
@dataclass
class SelectorAlertAllUnusedNodes(DebugLevel, Cli, File):
node_names: List[str]
code: str = "I_NEED_A_CODE_6"
def message(self) -> str:
debug_nodes_str = ("\n - ").join(self.node_names)
return (
f"Full list of tests that were excluded:"
f"\n - {debug_nodes_str}"
)
@dataclass @dataclass
class SelectorReportInvalidSelector(InfoLevel, Cli, File): class SelectorReportInvalidSelector(InfoLevel, Cli, File):
@@ -542,7 +490,7 @@ class Rollback(DebugLevel, Cli, File):
@dataclass @dataclass
class CacheMiss(DebugLevel, Cli, File): class CacheMiss(DebugLevel, Cli, File):
conn_name: Any # TODO mypy says this is `Callable[[], str]`?? ¯\_(ツ)_/¯ conn_name: str
database: Optional[str] database: Optional[str]
schema: str schema: str
code: str = "E013" code: str = "E013"
@@ -558,12 +506,20 @@ class CacheMiss(DebugLevel, Cli, File):
class ListRelations(DebugLevel, Cli, File):
    database: Optional[str]
    schema: str
-    relations: List[BaseRelation]
+    relations: List[_ReferenceKey]
    code: str = "E014"

    def message(self) -> str:
        return f"with database={self.database}, schema={self.schema}, relations={self.relations}"

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            if type(v) == list:
+                d[k] = [str(x) for x in v]
+        return d

@dataclass
class ConnectionUsed(DebugLevel, Cli, File):
@@ -587,7 +543,7 @@ class SQLQuery(DebugLevel, Cli, File):
@dataclass
class SQLQueryStatus(DebugLevel, Cli, File):
-    status: str  # could include AdapterResponse if we resolve circular imports
+    status: str
    elapsed: float
    code: str = "E017"
@@ -617,7 +573,7 @@ class ColTypeChange(DebugLevel, Cli, File):
@dataclass
class SchemaCreation(DebugLevel, Cli, File):
-    relation: BaseRelation
+    relation: _ReferenceKey
    code: str = "E020"

    def message(self) -> str:
@@ -626,17 +582,21 @@ class SchemaCreation(DebugLevel, Cli, File):
@dataclass
class SchemaDrop(DebugLevel, Cli, File):
-    relation: BaseRelation
+    relation: _ReferenceKey
    code: str = "E021"

    def message(self) -> str:
        return f'Dropping schema "{self.relation}".'

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)

# TODO pretty sure this is only ever called in dead code
# see: core/dbt/adapters/cache.py _add_link vs add_link
@dataclass
-class UncachedRelation(DebugLevel, Cli, File):
+class UncachedRelation(DebugLevel, Cli, File, Cache):
    dep_key: _ReferenceKey
    ref_key: _ReferenceKey
    code: str = "E022"
@@ -650,7 +610,7 @@ class UncachedRelation(DebugLevel, Cli, File):
@dataclass
-class AddLink(DebugLevel, Cli, File):
+class AddLink(DebugLevel, Cli, File, Cache):
    dep_key: _ReferenceKey
    ref_key: _ReferenceKey
    code: str = "E023"
@@ -660,23 +620,16 @@ class AddLink(DebugLevel, Cli, File):
@dataclass
-class AddRelation(DebugLevel, Cli, File):
-    relation: _CachedRelation
+class AddRelation(DebugLevel, Cli, File, Cache):
+    relation: _ReferenceKey
    code: str = "E024"

    def message(self) -> str:
        return f"Adding relation: {str(self.relation)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        if isinstance(val, _CachedRelation):
-            return str(val)
-        return val

@dataclass
-class DropMissingRelation(DebugLevel, Cli, File):
+class DropMissingRelation(DebugLevel, Cli, File, Cache):
    relation: _ReferenceKey
    code: str = "E025"
@@ -685,7 +638,7 @@ class DropMissingRelation(DebugLevel, Cli, File):
@dataclass
-class DropCascade(DebugLevel, Cli, File):
+class DropCascade(DebugLevel, Cli, File, Cache):
    dropped: _ReferenceKey
    consequences: Set[_ReferenceKey]
    code: str = "E026"
@@ -693,9 +646,19 @@ class DropCascade(DebugLevel, Cli, File):
    def message(self) -> str:
        return f"drop {self.dropped} is cascading to {self.consequences}"

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            if isinstance(v, list):
+                d[k] = [str(x) for x in v]
+            else:
+                d[k] = str(v)  # type: ignore
+        return d

@dataclass
-class DropRelation(DebugLevel, Cli, File):
+class DropRelation(DebugLevel, Cli, File, Cache):
    dropped: _ReferenceKey
    code: str = "E027"
@@ -704,7 +667,7 @@ class DropRelation(DebugLevel, Cli, File):
@dataclass
-class UpdateReference(DebugLevel, Cli, File):
+class UpdateReference(DebugLevel, Cli, File, Cache):
    old_key: _ReferenceKey
    new_key: _ReferenceKey
    cached_key: _ReferenceKey
@@ -716,7 +679,7 @@ class UpdateReference(DebugLevel, Cli, File):
@dataclass
-class TemporaryRelation(DebugLevel, Cli, File):
+class TemporaryRelation(DebugLevel, Cli, File, Cache):
    key: _ReferenceKey
    code: str = "E029"
@@ -725,7 +688,7 @@ class TemporaryRelation(DebugLevel, Cli, File):
@dataclass
-class RenameSchema(DebugLevel, Cli, File):
+class RenameSchema(DebugLevel, Cli, File, Cache):
    old_key: _ReferenceKey
    new_key: _ReferenceKey
    code: str = "E030"
@@ -735,7 +698,7 @@ class RenameSchema(DebugLevel, Cli, File):
@dataclass
-class DumpBeforeAddGraph(DebugLevel, Cli, File):
+class DumpBeforeAddGraph(DebugLevel, Cli, File, Cache):
    # large value. delay not necessary since every debug level message is logged anyway.
    dump: Dict[str, List[str]]
    code: str = "E031"
@@ -745,7 +708,7 @@ class DumpBeforeAddGraph(DebugLevel, Cli, File):
@dataclass
-class DumpAfterAddGraph(DebugLevel, Cli, File):
+class DumpAfterAddGraph(DebugLevel, Cli, File, Cache):
    # large value. delay not necessary since every debug level message is logged anyway.
    dump: Dict[str, List[str]]
    code: str = "E032"
@@ -755,7 +718,7 @@ class DumpAfterAddGraph(DebugLevel, Cli, File):
@dataclass
-class DumpBeforeRenameSchema(DebugLevel, Cli, File):
+class DumpBeforeRenameSchema(DebugLevel, Cli, File, Cache):
    # large value. delay not necessary since every debug level message is logged anyway.
    dump: Dict[str, List[str]]
    code: str = "E033"
@@ -765,7 +728,7 @@ class DumpBeforeRenameSchema(DebugLevel, Cli, File):
@dataclass
-class DumpAfterRenameSchema(DebugLevel, Cli, File):
+class DumpAfterRenameSchema(DebugLevel, Cli, File, Cache):
    # large value. delay not necessary since every debug level message is logged anyway.
    dump: Dict[str, List[str]]
    code: str = "E034"
@@ -782,11 +745,9 @@ class AdapterImportError(InfoLevel, Cli, File):
    def message(self) -> str:
        return f"Error importing adapter: {self.exc}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val())
-        return val
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)

@dataclass
@@ -842,12 +803,6 @@ class ProfileLoadError(ShowException, DebugLevel, Cli, File):
    def message(self) -> str:
        return f"Profile not loaded due to error: {self.exc}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class ProfileNotFound(InfoLevel, Cli, File):
@@ -863,85 +818,7 @@ class InvalidVarsYAML(ErrorLevel, Cli, File):
    code: str = "A008"

    def message(self) -> str:
-        return "The YAML provided in the --vars argument is not valid.\n"
+        return "The YAML provided in the --vars argument is not valid."

-# TODO: Remove? (appears to be uncalled)
-@dataclass
-class CatchRunException(ShowException, DebugLevel, Cli, File):
-    build_path: Any
-    exc: Exception
-    code: str = "I_NEED_A_CODE_1"
-
-    def message(self) -> str:
-        INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If the \
-error persists, open an issue at https://github.com/dbt-labs/dbt-core
-""".strip()
-        prefix = f'Internal error executing {self.build_path}'
-        error = "{prefix}\n{error}\n\n{note}".format(
-            prefix=ui.red(prefix),
-            error=str(self.exc).strip(),
-            note=INTERNAL_ERROR_STRING
-        )
-        return error
-
-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val
-
-# TODO: Remove? (appears to be uncalled)
-@dataclass
-class HandleInternalException(ShowException, DebugLevel, Cli, File):
-    exc: Exception
-    code: str = "I_NEED_A_CODE_2"
-
-    def message(self) -> str:
-        return str(self.exc)
-
-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val
-
-# TODO: Remove? (appears to be uncalled)
-@dataclass
-class MessageHandleGenericException(ErrorLevel, Cli, File):
-    build_path: str
-    unique_id: str
-    exc: Exception
-    code: str = "I_NEED_A_CODE_3"
-
-    def message(self) -> str:
-        node_description = self.build_path
-        if node_description is None:
-            node_description = self.unique_id
-        prefix = "Unhandled error while executing {}".format(node_description)
-        return "{prefix}\n{error}".format(
-            prefix=ui.red(prefix),
-            error=str(self.exc).strip()
-        )
-
-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val
-
-# TODO: Remove? (appears to be uncalled)
-@dataclass
-class DetailsHandleGenericException(ShowException, DebugLevel, Cli, File):
-    code: str = "I_NEED_A_CODE_4"
-
-    def message(self) -> str:
-        return ''

@dataclass
@@ -1110,12 +987,6 @@ class ParsedFileLoadFailed(ShowException, DebugLevel, Cli, File):
    def message(self) -> str:
        return f"Failed to load parsed file from disk at {self.path}: {self.exc}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class PartialParseSaveFileNotFound(InfoLevel, Cli, File):
@@ -1329,12 +1200,6 @@ class RunningOperationCaughtError(ErrorLevel, Cli, File):
    def message(self) -> str:
        return f'Encountered an error while running operation: {self.exc}'

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class RunningOperationUncaughtError(ErrorLevel, Cli, File):
@@ -1344,12 +1209,6 @@ class RunningOperationUncaughtError(ErrorLevel, Cli, File):
    def message(self) -> str:
        return f'Encountered an error while running operation: {self.exc}'

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class DbtProjectError(ErrorLevel, Cli, File):
@@ -1367,12 +1226,6 @@ class DbtProjectErrorException(ErrorLevel, Cli, File):
    def message(self) -> str:
        return f" ERROR: {str(self.exc)}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class DbtProfileError(ErrorLevel, Cli, File):
@@ -1390,12 +1243,6 @@ class DbtProfileErrorException(ErrorLevel, Cli, File):
    def message(self) -> str:
        return f" ERROR: {str(self.exc)}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class ProfileListTitle(InfoLevel, Cli, File):
@@ -1443,12 +1290,6 @@ class CatchableExceptionOnRun(ShowException, DebugLevel, Cli, File):
    def message(self) -> str:
        return str(self.exc)

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

@dataclass
class InternalExceptionOnRun(DebugLevel, Cli, File):
@@ -1469,12 +1310,6 @@ the error persists, open an issue at https://github.com/dbt-labs/dbt-core
            note=INTERNAL_ERROR_STRING
        )

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-        return val

# This prints the stack trace at the debug level while allowing just the nice exception message
# at the error level - or whatever other level chosen. Used in multiple places.
@@ -1488,9 +1323,9 @@ class PrintDebugStackTrace(ShowException, DebugLevel, Cli, File):
@dataclass @dataclass
class GenericExceptionOnRun(ErrorLevel, Cli, File): class GenericExceptionOnRun(ErrorLevel, Cli, File):
build_path: str build_path: Optional[str]
unique_id: str unique_id: str
exc: Exception exc: str # TODO: make this the actual exception once we have a better serialization strategy
code: str = "W004" code: str = "W004"
def message(self) -> str: def message(self) -> str:
@@ -1503,12 +1338,6 @@ class GenericExceptionOnRun(ErrorLevel, Cli, File):
error=str(self.exc).strip() error=str(self.exc).strip()
) )
def fields_to_json(self, val: Any) -> Any:
if val == self.exc:
return str(val)
return val
@dataclass @dataclass
class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File): class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File):
@@ -1520,12 +1349,6 @@ class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File):
return ('Error releasing connection for node {}: {!s}' return ('Error releasing connection for node {}: {!s}'
.format(self.node_name, self.exc)) .format(self.node_name, self.exc))
def fields_to_json(self, val: Any) -> Any:
if val == self.exc:
return str(val)
return val
@dataclass @dataclass
class CheckCleanPath(InfoLevel, Cli): class CheckCleanPath(InfoLevel, Cli):
@@ -1639,7 +1462,7 @@ class DepsNotifyUpdatesAvailable(InfoLevel, Cli, File):
code: str = "M019" code: str = "M019"
def message(self) -> str: def message(self) -> str:
return ('\nUpdates available for packages: {} \ return ('Updates available for packages: {} \
\nUpdate your versions in packages.yml, then run dbt deps'.format(self.packages)) \nUpdate your versions in packages.yml, then run dbt deps'.format(self.packages))
@@ -1756,7 +1579,7 @@ class ServingDocsExitInfo(InfoLevel, Cli, File):
code: str = "Z020" code: str = "Z020"
def message(self) -> str: def message(self) -> str:
return "Press Ctrl+C to exit.\n\n" return "Press Ctrl+C to exit."
@dataclass @dataclass
@@ -1807,7 +1630,7 @@ class StatsLine(InfoLevel, Cli, File):
code: str = "Z023" code: str = "Z023"
def message(self) -> str: def message(self) -> str:
stats_line = ("\nDone. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}") stats_line = ("Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}")
return stats_line.format(**self.stats) return stats_line.format(**self.stats)
@@ -1846,12 +1669,6 @@ class SQlRunnerException(ShowException, DebugLevel, Cli, File):
def message(self) -> str: def message(self) -> str:
return f"Got an exception: {self.exc}" return f"Got an exception: {self.exc}"
def fields_to_json(self, val: Any) -> Any:
if val == self.exc:
return str(val)
return val
@dataclass @dataclass
class CheckNodeTestFailure(InfoLevel, Cli, File): class CheckNodeTestFailure(InfoLevel, Cli, File):
@@ -1910,7 +1727,7 @@ class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
index: int index: int
total: int total: int
report_node_data: ParsedModelNode report_node_data: ParsedModelNode
code: str = "Z031" code: str = "Q033"
def message(self) -> str: def message(self) -> str:
msg = f"START {self.description}" msg = f"START {self.description}"
@@ -1928,8 +1745,8 @@ class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
index: int index: int
total: int total: int
truncate: bool truncate: bool
report_node_data: Any # TODO use ParsedHookNode here report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Z032" code: str = "Q032"
def message(self) -> str: def message(self) -> str:
msg = f"START hook: {self.statement}" msg = f"START hook: {self.statement}"
@@ -1948,7 +1765,7 @@ class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
total: int total: int
execution_time: int execution_time: int
truncate: bool truncate: bool
report_node_data: Any # TODO use ParsedHookNode here report_node_data: Any # TODO: resolve ParsedHookNode circular import
code: str = "Q007" code: str = "Q007"
def message(self) -> str: def message(self) -> str:
@@ -1969,7 +1786,7 @@ class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
index: int index: int
total: int total: int
report_node_data: ParsedModelNode report_node_data: ParsedModelNode
code: str = "Z033" code: str = "Q034"
def message(self) -> str: def message(self) -> str:
if self.resource_type in NodeType.refable(): if self.resource_type in NodeType.refable():
@@ -2084,7 +1901,7 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
total: int total: int
execution_time: int execution_time: int
report_node_data: ParsedModelNode report_node_data: ParsedModelNode
code: str = "Z035" code: str = "Q035"
def message(self) -> str: def message(self) -> str:
info = "ERROR creating" info = "ERROR creating"
@@ -2322,6 +2139,10 @@ class NodeFinished(DebugLevel, Cli, File, NodeInfo):
def message(self) -> str: def message(self) -> str:
return f"Finished running node {self.unique_id}" return f"Finished running node {self.unique_id}"
@classmethod
def asdict(cls, data: list) -> dict:
return dict((k, str(v)) for k, v in data)
@dataclass @dataclass
class QueryCancelationUnsupported(InfoLevel, Cli, File): class QueryCancelationUnsupported(InfoLevel, Cli, File):
@@ -2337,11 +2158,12 @@ class QueryCancelationUnsupported(InfoLevel, Cli, File):
@dataclass @dataclass
class ConcurrencyLine(InfoLevel, Cli, File): class ConcurrencyLine(InfoLevel, Cli, File):
concurrency_line: str num_threads: int
target_name: str
code: str = "Q026" code: str = "Q026"
def message(self) -> str: def message(self) -> str:
return self.concurrency_line return f"Concurrency: {self.num_threads} threads (target='{self.target_name}')"
@dataclass @dataclass
@@ -2606,12 +2428,6 @@ class GeneralWarningException(WarnLevel, Cli, File):
return self.log_fmt.format(str(self.exc)) return self.log_fmt.format(str(self.exc))
return str(self.exc) return str(self.exc)
def fields_to_json(self, val: Any) -> Any:
if val == self.exc:
return str(val)
return val
@dataclass @dataclass
class EventBufferFull(WarnLevel, Cli, File): class EventBufferFull(WarnLevel, Cli, File):
@@ -2675,8 +2491,8 @@ if 1 == 0:
SQLQueryStatus(status="", elapsed=0.1) SQLQueryStatus(status="", elapsed=0.1)
SQLCommit(conn_name="") SQLCommit(conn_name="")
ColTypeChange(orig_type="", new_type="", table="") ColTypeChange(orig_type="", new_type="", table="")
SchemaCreation(relation=BaseRelation()) SchemaCreation(relation=_make_key(BaseRelation()))
SchemaDrop(relation=BaseRelation()) SchemaDrop(relation=_make_key(BaseRelation()))
UncachedRelation( UncachedRelation(
dep_key=_ReferenceKey(database="", schema="", identifier=""), dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""), ref_key=_ReferenceKey(database="", schema="", identifier=""),
@@ -2685,7 +2501,7 @@ if 1 == 0:
dep_key=_ReferenceKey(database="", schema="", identifier=""), dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""), ref_key=_ReferenceKey(database="", schema="", identifier=""),
) )
AddRelation(relation=_CachedRelation()) AddRelation(relation=_make_key(_CachedRelation()))
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")) DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier=""))
DropCascade( DropCascade(
dropped=_ReferenceKey(database="", schema="", identifier=""), dropped=_ReferenceKey(database="", schema="", identifier=""),
@@ -2708,8 +2524,6 @@ if 1 == 0:
AdapterImportError(ModuleNotFoundError()) AdapterImportError(ModuleNotFoundError())
PluginLoadError() PluginLoadError()
SystemReportReturnCode(returncode=0) SystemReportReturnCode(returncode=0)
SelectorAlertUpto3UnusedNodes(node_names=[])
SelectorAlertAllUnusedNodes(node_names=[])
NewConnectionOpening(connection_state='') NewConnectionOpening(connection_state='')
TimingInfoCollected() TimingInfoCollected()
MergedFromState(nbr_merged=0, sample=[]) MergedFromState(nbr_merged=0, sample=[])
@@ -2755,8 +2569,6 @@ if 1 == 0:
PartialParsingDeletedExposure(unique_id='') PartialParsingDeletedExposure(unique_id='')
InvalidDisabledSourceInTestNode(msg='') InvalidDisabledSourceInTestNode(msg='')
InvalidRefInTestNode(msg='') InvalidRefInTestNode(msg='')
MessageHandleGenericException(build_path='', unique_id='', exc=Exception(''))
DetailsHandleGenericException()
RunningOperationCaughtError(exc=Exception('')) RunningOperationCaughtError(exc=Exception(''))
RunningOperationUncaughtError(exc=Exception('')) RunningOperationUncaughtError(exc=Exception(''))
DbtProjectError() DbtProjectError()
@@ -2769,7 +2581,7 @@ if 1 == 0:
ProfileHelpMessage() ProfileHelpMessage()
CatchableExceptionOnRun(exc=Exception('')) CatchableExceptionOnRun(exc=Exception(''))
InternalExceptionOnRun(build_path='', exc=Exception('')) InternalExceptionOnRun(build_path='', exc=Exception(''))
GenericExceptionOnRun(build_path='', unique_id='', exc=Exception('')) GenericExceptionOnRun(build_path='', unique_id='', exc='')
NodeConnectionReleaseError(node_name='', exc=Exception('')) NodeConnectionReleaseError(node_name='', exc=Exception(''))
CheckCleanPath(path='') CheckCleanPath(path='')
ConfirmCleanPath(path='') ConfirmCleanPath(path='')
@@ -2952,7 +2764,7 @@ if 1 == 0:
NodeStart(report_node_data=ParsedModelNode(), unique_id='') NodeStart(report_node_data=ParsedModelNode(), unique_id='')
NodeFinished(report_node_data=ParsedModelNode(), unique_id='', run_result=RunResult()) NodeFinished(report_node_data=ParsedModelNode(), unique_id='', run_result=RunResult())
QueryCancelationUnsupported(type='') QueryCancelationUnsupported(type='')
ConcurrencyLine(concurrency_line='') ConcurrencyLine(num_threads=0, target_name='')
NodeCompiling(report_node_data=ParsedModelNode(), unique_id='') NodeCompiling(report_node_data=ParsedModelNode(), unique_id='')
NodeExecuting(report_node_data=ParsedModelNode(), unique_id='') NodeExecuting(report_node_data=ParsedModelNode(), unique_id='')
StarterProjectPath(dir='') StarterProjectPath(dir='')
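As a hedged illustration of the restructured ConcurrencyLine above (construction mirrors the sample instantiations in this file; the output shape follows the new message() body):

from dbt.events.types import ConcurrencyLine

# the event now carries structured fields and formats its own text,
# rather than receiving a pre-formatted concurrency_line string
evt = ConcurrencyLine(num_threads=4, target_name='dev')
print(evt.message())  # Concurrency: 4 threads (target='dev')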

View File

@@ -33,6 +33,8 @@ SEND_ANONYMOUS_USAGE_STATS = None
PRINTER_WIDTH = 80 PRINTER_WIDTH = 80
WHICH = None WHICH = None
INDIRECT_SELECTION = None INDIRECT_SELECTION = None
LOG_CACHE_EVENTS = None
EVENT_BUFFER_SIZE = 100000
# Global CLI defaults. These flags are set from three places: # Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml). # CLI args, environment variables, and user_config (profiles.yml).
@@ -51,7 +53,9 @@ flag_defaults = {
"FAIL_FAST": False, "FAIL_FAST": False,
"SEND_ANONYMOUS_USAGE_STATS": True, "SEND_ANONYMOUS_USAGE_STATS": True,
"PRINTER_WIDTH": 80, "PRINTER_WIDTH": 80,
"INDIRECT_SELECTION": 'eager' "INDIRECT_SELECTION": 'eager',
"LOG_CACHE_EVENTS": False,
"EVENT_BUFFER_SIZE": 100000
} }
@@ -99,7 +103,7 @@ def set_from_args(args, user_config):
USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \ USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \
USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \ USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \
VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \ VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \
WHICH WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE
STRICT_MODE = False # backwards compatibility STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option # cli args without user_config or env var option
@@ -122,6 +126,8 @@ def set_from_args(args, user_config):
SEND_ANONYMOUS_USAGE_STATS = get_flag_value('SEND_ANONYMOUS_USAGE_STATS', args, user_config) SEND_ANONYMOUS_USAGE_STATS = get_flag_value('SEND_ANONYMOUS_USAGE_STATS', args, user_config)
PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config) PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config)
INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config) INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config)
LOG_CACHE_EVENTS = get_flag_value('LOG_CACHE_EVENTS', args, user_config)
EVENT_BUFFER_SIZE = get_flag_value('EVENT_BUFFER_SIZE', args, user_config)
def get_flag_value(flag, args, user_config): def get_flag_value(flag, args, user_config):
@@ -134,7 +140,13 @@ def get_flag_value(flag, args, user_config):
if env_value is not None and env_value != '': if env_value is not None and env_value != '':
env_value = env_value.lower() env_value = env_value.lower()
# non Boolean values # non Boolean values
if flag in ['LOG_FORMAT', 'PRINTER_WIDTH', 'PROFILES_DIR', 'INDIRECT_SELECTION']: if flag in [
'LOG_FORMAT',
'PRINTER_WIDTH',
'PROFILES_DIR',
'INDIRECT_SELECTION',
'EVENT_BUFFER_SIZE'
]:
flag_value = env_value flag_value = env_value
else: else:
flag_value = env_set_bool(env_value) flag_value = env_set_bool(env_value)
@@ -142,7 +154,7 @@ def get_flag_value(flag, args, user_config):
flag_value = getattr(user_config, lc_flag) flag_value = getattr(user_config, lc_flag)
else: else:
flag_value = flag_defaults[flag] flag_value = flag_defaults[flag]
if flag == 'PRINTER_WIDTH': # printer_width must be an int or it hangs if flag in ['PRINTER_WIDTH', 'EVENT_BUFFER_SIZE']: # must be ints
flag_value = int(flag_value) flag_value = int(flag_value)
if flag == 'PROFILES_DIR': if flag == 'PROFILES_DIR':
flag_value = os.path.abspath(flag_value) flag_value = os.path.abspath(flag_value)
@@ -165,5 +177,7 @@ def get_flag_dict():
"fail_fast": FAIL_FAST, "fail_fast": FAIL_FAST,
"send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS, "send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS,
"printer_width": PRINTER_WIDTH, "printer_width": PRINTER_WIDTH,
"indirect_selection": INDIRECT_SELECTION "indirect_selection": INDIRECT_SELECTION,
"log_cache_events": LOG_CACHE_EVENTS,
"event_buffer_size": EVENT_BUFFER_SIZE
} }
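A standalone sketch of the resolution order described in the comment above (CLI args, then environment variables, then user_config, then flag_defaults), with the two integer flags coerced at the end; the DBT_ env-var prefix and the dict-based user_config here are illustrative assumptions, not dbt's internals:

import os

defaults = {"PRINTER_WIDTH": 80, "EVENT_BUFFER_SIZE": 100000}

def resolve_flag(flag, cli_value=None, user_config=None):
    env_value = os.getenv("DBT_" + flag)          # assumed env-var naming
    if cli_value is not None:
        value = cli_value
    elif env_value not in (None, ''):
        value = env_value
    elif user_config and flag.lower() in user_config:
        value = user_config[flag.lower()]
    else:
        value = defaults[flag]
    if flag in ('PRINTER_WIDTH', 'EVENT_BUFFER_SIZE'):  # must be ints
        value = int(value)
    return value

print(resolve_flag('EVENT_BUFFER_SIZE'))                    # 100000
print(resolve_flag('EVENT_BUFFER_SIZE', cli_value='5000'))  # 5000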

View File

@@ -424,7 +424,7 @@ class DelayedFileHandler(logbook.RotatingFileHandler, FormatterMixin):
return return
make_log_dir_if_missing(log_dir) make_log_dir_if_missing(log_dir)
log_path = os.path.join(log_dir, 'dbt.log.old') # TODO hack for now log_path = os.path.join(log_dir, 'dbt.log.legacy') # TODO hack for now
self._super_init(log_path) self._super_init(log_path)
self._replay_buffered() self._replay_buffered()
self._log_path = log_path self._log_path = log_path

View File

@@ -221,24 +221,22 @@ def track_run(task):
def run_from_args(parsed): def run_from_args(parsed):
log_cache_events(getattr(parsed, 'log_cache_events', False)) log_cache_events(getattr(parsed, 'log_cache_events', False))
# we can now use the logger for stdout
# set log_format in the logger
# if 'list' task: set stdout to WARN instead of INFO
level_override = parsed.cls.pre_init_hook(parsed)
fire_event(MainReportVersion(v=str(dbt.version.installed)))
# this will convert DbtConfigErrors into RuntimeExceptions # this will convert DbtConfigErrors into RuntimeExceptions
# task could be any one of the task objects # task could be any one of the task objects
task = parsed.cls.from_args(args=parsed) task = parsed.cls.from_args(args=parsed)
fire_event(MainReportArgs(args=parsed))
# Set up logging
log_path = None log_path = None
if task.config is not None: if task.config is not None:
log_path = getattr(task.config, 'log_path', None) log_path = getattr(task.config, 'log_path', None)
# we can finally set the file logger up
log_manager.set_path(log_path) log_manager.set_path(log_path)
# if 'list' task: set stdout to WARN instead of INFO
level_override = parsed.cls.pre_init_hook(parsed)
setup_event_logger(log_path or 'logs', level_override) setup_event_logger(log_path or 'logs', level_override)
fire_event(MainReportVersion(v=str(dbt.version.installed)))
fire_event(MainReportArgs(args=parsed))
if dbt.tracking.active_user is not None: # mypy appeasement, always true if dbt.tracking.active_user is not None: # mypy appeasement, always true
fire_event(MainTrackingUserState(dbt.tracking.active_user.state())) fire_event(MainTrackingUserState(dbt.tracking.active_user.state()))
@@ -1078,6 +1076,14 @@ def parse_args(args, cls=DBTArgumentParser):
''' '''
) )
p.add_argument(
'--event-buffer-size',
dest='event_buffer_size',
help='''
Sets the max number of events to buffer in EVENT_HISTORY
'''
)
subs = p.add_subparsers(title="Available sub-commands") subs = p.add_subparsers(title="Available sub-commands")
base_subparser = _build_base_subparser() base_subparser = _build_base_subparser()
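Because the new argument is registered on the top-level parser before the sub-commands, it is passed ahead of the task name. A small argparse-only sketch of how the value surfaces (simplified; dbt's real parser wires up many more arguments):

import argparse

p = argparse.ArgumentParser(prog="dbt")
p.add_argument('--event-buffer-size', dest='event_buffer_size',
               help='Sets the max number of events to buffer in EVENT_HISTORY')
subs = p.add_subparsers(title="Available sub-commands")
subs.add_parser('run')

parsed = p.parse_args(['--event-buffer-size', '5000', 'run'])
print(parsed.event_buffer_size)  # '5000' -- coerced to an int later, in dbt.flags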

View File

@@ -960,10 +960,9 @@ class MacroPatchParser(NonSourceParser[UnparsedMacroUpdate, ParsedMacroPatch]):
unique_id = f'macro.{patch.package_name}.{patch.name}' unique_id = f'macro.{patch.package_name}.{patch.name}'
macro = self.manifest.macros.get(unique_id) macro = self.manifest.macros.get(unique_id)
if not macro: if not macro:
warn_or_error( msg = f'Found patch for macro "{patch.name}" ' \
f'WARNING: Found patch for macro "{patch.name}" '
f'which was not found' f'which was not found'
) warn_or_error(msg, log_fmt=warning_tag('{}'))
return return
if macro.patch_path: if macro.patch_path:
package_name, existing_file_path = macro.patch_path.split('://') package_name, existing_file_path = macro.patch_path.split('://')

View File

@@ -334,7 +334,7 @@ class BaseRunner(metaclass=ABCMeta):
GenericExceptionOnRun( GenericExceptionOnRun(
build_path=self.node.build_path, build_path=self.node.build_path,
unique_id=self.node.unique_id, unique_id=self.node.unique_id,
exc=e exc=str(e) # TODO: unstring this when serialization is fixed
) )
) )
fire_event(PrintDebugStackTrace()) fire_event(PrintDebugStackTrace())

View File

@@ -38,7 +38,7 @@ class CleanTask(BaseTask):
""" """
move_to_nearest_project_dir(self.args) move_to_nearest_project_dir(self.args)
if ('dbt_modules' in self.config.clean_targets and if ('dbt_modules' in self.config.clean_targets and
self.config.packages_install_path != 'dbt_modules'): self.config.packages_install_path not in self.config.clean_targets):
deprecations.warn('install-packages-path') deprecations.warn('install-packages-path')
for path in self.config.clean_targets: for path in self.config.clean_targets:
fire_event(CheckCleanPath(path=path)) fire_event(CheckCleanPath(path=path))

View File

@@ -10,7 +10,7 @@ from dbt.deps.resolver import resolve_packages
from dbt.events.functions import fire_event from dbt.events.functions import fire_event
from dbt.events.types import ( from dbt.events.types import (
DepsNoPackagesFound, DepsStartPackageInstall, DepsUpdateAvailable, DepsUTD, DepsNoPackagesFound, DepsStartPackageInstall, DepsUpdateAvailable, DepsUTD,
DepsInstallInfo, DepsListSubdirectory, DepsNotifyUpdatesAvailable DepsInstallInfo, DepsListSubdirectory, DepsNotifyUpdatesAvailable, EmptyLine
) )
from dbt.clients import system from dbt.clients import system
@@ -63,7 +63,7 @@ class DepsTask(BaseTask):
source_type = package.source_type() source_type = package.source_type()
version = package.get_version() version = package.get_version()
fire_event(DepsStartPackageInstall(package=package)) fire_event(DepsStartPackageInstall(package=package.nice_version_name()))
package.install(self.config, renderer) package.install(self.config, renderer)
fire_event(DepsInstallInfo(version_name=package.nice_version_name())) fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
if source_type == 'hub': if source_type == 'hub':
@@ -81,6 +81,7 @@ class DepsTask(BaseTask):
source_type=source_type, source_type=source_type,
version=version) version=version)
if packages_to_upgrade: if packages_to_upgrade:
fire_event(EmptyLine())
fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade)) fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))
@classmethod @classmethod

View File

@@ -65,6 +65,8 @@ def print_run_status_line(results) -> None:
stats[result_type] += 1 stats[result_type] += 1
stats['total'] += 1 stats['total'] += 1
with TextOnly():
fire_event(EmptyLine())
fire_event(StatsLine(stats=stats)) fire_event(StatsLine(stats=stats))

View File

@@ -11,7 +11,7 @@ from .printer import (
print_run_end_messages, print_run_end_messages,
get_counts, get_counts,
) )
from datetime import datetime
from dbt import tracking from dbt import tracking
from dbt import utils from dbt import utils
from dbt.adapters.base import BaseRelation from dbt.adapters.base import BaseRelation
@@ -21,7 +21,7 @@ from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.graph.model_config import Hook from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.exceptions import ( from dbt.exceptions import (
CompilationException, CompilationException,
InternalException, InternalException,
@@ -342,6 +342,8 @@ class RunTask(CompileTask):
finishctx = TimestampNamed('node_finished_at') finishctx = TimestampNamed('node_finished_at')
for idx, hook in enumerate(ordered_hooks, start=1): for idx, hook in enumerate(ordered_hooks, start=1):
hook._event_status['started_at'] = datetime.utcnow().isoformat()
hook._event_status['node_status'] = RunningStatus.Started
sql = self.get_hook_sql(adapter, hook, idx, num_hooks, sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
extra_context) extra_context)
@@ -360,19 +362,21 @@ class RunTask(CompileTask):
) )
) )
status = 'OK'
with Timer() as timer: with Timer() as timer:
if len(sql.strip()) > 0: if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False, response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
fetch=False) status = response._message
self.ran_hooks.append(hook) else:
status = 'OK'
self.ran_hooks.append(hook)
hook._event_status['finished_at'] = datetime.utcnow().isoformat()
with finishctx, DbtModelState({'node_status': 'passed'}): with finishctx, DbtModelState({'node_status': 'passed'}):
hook._event_status['node_status'] = RunStatus.Success
fire_event( fire_event(
PrintHookEndLine( PrintHookEndLine(
statement=hook_text, statement=hook_text,
status=str(status), status=status,
index=idx, index=idx,
total=num_hooks, total=num_hooks,
execution_time=timer.elapsed, execution_time=timer.elapsed,
@@ -380,6 +384,11 @@ class RunTask(CompileTask):
report_node_data=hook report_node_data=hook
) )
) )
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
del hook._event_status["started_at"]
del hook._event_status["finished_at"]
del hook._event_status["node_status"]
self._total_executed += len(ordered_hooks) self._total_executed += len(ordered_hooks)
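The hook loop above stamps transient keys onto the node's _event_status dict purely for structured logging and deletes them once the end-of-hook event has fired. A minimal sketch of that lifecycle with a stand-in object (the real code uses RunningStatus/RunStatus values rather than plain strings):

from datetime import datetime

class FakeHook:
    def __init__(self):
        self._event_status = {}

hook = FakeHook()
hook._event_status['started_at'] = datetime.utcnow().isoformat()
hook._event_status['node_status'] = 'started'        # RunningStatus.Started in dbt
# ... run the hook SQL, fire PrintHookStartLine / PrintHookEndLine ...
hook._event_status['finished_at'] = datetime.utcnow().isoformat()
hook._event_status['node_status'] = 'success'        # RunStatus.Success in dbt
# the dict only exists for logging, so clean it up when done
for key in ('started_at', 'finished_at', 'node_status'):
    del hook._event_status[key]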

View File

@@ -56,6 +56,7 @@ from dbt.parser.manifest import ManifestLoader
import dbt.exceptions import dbt.exceptions
from dbt import flags from dbt import flags
import dbt.utils import dbt.utils
from dbt.ui import warning_tag
RESULT_FILE_NAME = 'run_results.json' RESULT_FILE_NAME = 'run_results.json'
MANIFEST_FILE_NAME = 'manifest.json' MANIFEST_FILE_NAME = 'manifest.json'
@@ -208,7 +209,7 @@ class GraphRunnableTask(ManifestTask):
with RUNNING_STATE, uid_context: with RUNNING_STATE, uid_context:
startctx = TimestampNamed('node_started_at') startctx = TimestampNamed('node_started_at')
index = self.index_offset(runner.node_index) index = self.index_offset(runner.node_index)
runner.node._event_status['dbt_internal__started_at'] = datetime.utcnow().isoformat() runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
runner.node._event_status['node_status'] = RunningStatus.Started runner.node._event_status['node_status'] = RunningStatus.Started
extended_metadata = ModelMetadata(runner.node, index) extended_metadata = ModelMetadata(runner.node, index)
@@ -224,8 +225,7 @@ class GraphRunnableTask(ManifestTask):
result = runner.run_with_hooks(self.manifest) result = runner.run_with_hooks(self.manifest)
status = runner.get_result_status(result) status = runner.get_result_status(result)
runner.node._event_status['node_status'] = result.status runner.node._event_status['node_status'] = result.status
runner.node._event_status['dbt_internal__finished_at'] = \ runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
datetime.utcnow().isoformat()
finally: finally:
finishctx = TimestampNamed('finished_at') finishctx = TimestampNamed('finished_at')
with finishctx, DbtModelState(status): with finishctx, DbtModelState(status):
@@ -238,8 +238,8 @@ class GraphRunnableTask(ManifestTask):
) )
# `_event_status` dict is only used for logging. Make sure # `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it # it gets deleted when we're done with it
del runner.node._event_status["dbt_internal__started_at"] del runner.node._event_status["started_at"]
del runner.node._event_status["dbt_internal__finished_at"] del runner.node._event_status["finished_at"]
del runner.node._event_status["node_status"] del runner.node._event_status["node_status"]
fail_fast = flags.FAIL_FAST fail_fast = flags.FAIL_FAST
@@ -377,10 +377,8 @@ class GraphRunnableTask(ManifestTask):
num_threads = self.config.threads num_threads = self.config.threads
target_name = self.config.target_name target_name = self.config.target_name
text = "Concurrency: {} threads (target='{}')"
concurrency_line = text.format(num_threads, target_name)
with NodeCount(self.num_nodes): with NodeCount(self.num_nodes):
fire_event(ConcurrencyLine(concurrency_line=concurrency_line)) fire_event(ConcurrencyLine(num_threads=num_threads, target_name=target_name))
with TextOnly(): with TextOnly():
fire_event(EmptyLine()) fire_event(EmptyLine())
@@ -461,8 +459,11 @@ class GraphRunnableTask(ManifestTask):
) )
if len(self._flattened_nodes) == 0: if len(self._flattened_nodes) == 0:
warn_or_error("\nWARNING: Nothing to do. Try checking your model " with TextOnly():
"configs and model specification args") fire_event(EmptyLine())
msg = "Nothing to do. Try checking your model " \
"configs and model specification args"
warn_or_error(msg, log_fmt=warning_tag('{}'))
result = self.get_result( result = self.get_result(
results=[], results=[],
generated_at=datetime.utcnow(), generated_at=datetime.utcnow(),

View File

@@ -6,7 +6,7 @@ from dbt.include.global_project import DOCS_INDEX_FILE_PATH
from http.server import SimpleHTTPRequestHandler from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer from socketserver import TCPServer
from dbt.events.functions import fire_event from dbt.events.functions import fire_event
from dbt.events.types import ServingDocsPort, ServingDocsAccessInfo, ServingDocsExitInfo from dbt.events.types import ServingDocsPort, ServingDocsAccessInfo, ServingDocsExitInfo, EmptyLine
from dbt.task.base import ConfiguredTask from dbt.task.base import ConfiguredTask
@@ -22,6 +22,8 @@ class ServeTask(ConfiguredTask):
fire_event(ServingDocsPort(address=address, port=port)) fire_event(ServingDocsPort(address=address, port=port))
fire_event(ServingDocsAccessInfo(port=port)) fire_event(ServingDocsAccessInfo(port=port))
fire_event(EmptyLine())
fire_event(EmptyLine())
fire_event(ServingDocsExitInfo()) fire_event(ServingDocsExitInfo())
# mypy doesn't think SimpleHTTPRequestHandler is ok here, but it is # mypy doesn't think SimpleHTTPRequestHandler is ok here, but it is

View File

@@ -66,6 +66,4 @@ def line_wrap_message(
def warning_tag(msg: str) -> str: def warning_tag(msg: str) -> str:
# no longer needed, since new logging includes colorized log level return f'[{yellow("WARNING")}]: {msg}'
# return f'[{yellow("WARNING")}]: {msg}'
return msg
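With the tag restored, callers that pass log_fmt=warning_tag('{}') to warn_or_error (as the runnable and manifest-parsing changes above do) get the bracketed prefix back. A quick sketch of the resulting text (the WARNING word is wrapped in yellow ANSI codes when colors are enabled):

from dbt.ui import warning_tag

msg = ("Nothing to do. Try checking your model "
       "configs and model specification args")
print(warning_tag(msg))
# [WARNING]: Nothing to do. Try checking your model configs and model specification args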

View File

@@ -96,5 +96,5 @@ def _get_dbt_plugins_info():
yield plugin_name, mod.version yield plugin_name, mod.version
__version__ = '1.0.0rc3' __version__ = '1.0.0'
installed = get_installed_version() installed = get_installed_version()

View File

@@ -284,12 +284,12 @@ def parse_args(argv=None):
parser.add_argument('adapter') parser.add_argument('adapter')
parser.add_argument('--title-case', '-t', default=None) parser.add_argument('--title-case', '-t', default=None)
parser.add_argument('--dependency', action='append') parser.add_argument('--dependency', action='append')
parser.add_argument('--dbt-core-version', default='1.0.0rc3') parser.add_argument('--dbt-core-version', default='1.0.0')
parser.add_argument('--email') parser.add_argument('--email')
parser.add_argument('--author') parser.add_argument('--author')
parser.add_argument('--url') parser.add_argument('--url')
parser.add_argument('--sql', action='store_true') parser.add_argument('--sql', action='store_true')
parser.add_argument('--package-version', default='1.0.0rc3') parser.add_argument('--package-version', default='1.0.0')
parser.add_argument('--project-version', default='1.0') parser.add_argument('--project-version', default='1.0')
parser.add_argument( parser.add_argument(
'--no-dependency', action='store_false', dest='set_dependency' '--no-dependency', action='store_false', dest='set_dependency'

View File

@@ -25,7 +25,7 @@ with open(os.path.join(this_directory, 'README.md')) as f:
package_name = "dbt-core" package_name = "dbt-core"
package_version = "1.0.0rc3" package_version = "1.0.0"
description = """With dbt, data analysts and engineers can build analytics \ description = """With dbt, data analysts and engineers can build analytics \
the way engineers build applications.""" the way engineers build applications."""

View File

@@ -6,9 +6,9 @@ cffi==1.15.0
charset-normalizer==2.0.8 charset-normalizer==2.0.8
click==8.0.3 click==8.0.3
colorama==0.4.4 colorama==0.4.4
dbt-core==1.0.0rc3 dbt-core==1.0.0
dbt-extractor==0.4.0 dbt-extractor==0.4.0
dbt-postgres==1.0.0rc3 dbt-postgres==1.0.0
future==0.18.2 future==0.18.2
hologram==0.0.14 hologram==0.0.14
idna==3.3 idna==3.3

View File

@@ -1 +1 @@
version = '1.0.0rc3' version = '1.0.0'

View File

@@ -41,7 +41,7 @@ def _dbt_psycopg2_name():
package_name = "dbt-postgres" package_name = "dbt-postgres"
package_version = "1.0.0rc3" package_version = "1.0.0"
description = """The postgres adpter plugin for dbt (data build tool)""" description = """The postgres adpter plugin for dbt (data build tool)"""
this_directory = os.path.abspath(os.path.dirname(__file__)) this_directory = os.path.abspath(os.path.dirname(__file__))

View File

@@ -50,7 +50,7 @@ with open(os.path.join(this_directory, 'README.md')) as f:
package_name = "dbt" package_name = "dbt"
package_version = "1.0.0rc3" package_version = "1.0.0"
description = """With dbt, data analysts and engineers can build analytics \ description = """With dbt, data analysts and engineers can build analytics \
the way engineers build applications.""" the way engineers build applications."""

View File

@@ -43,7 +43,7 @@ class TestConfigPathDeprecation(BaseTestDeprecations):
with self.assertRaises(dbt.exceptions.CompilationException) as exc: with self.assertRaises(dbt.exceptions.CompilationException) as exc:
self.run_dbt(['--warn-error', 'debug']) self.run_dbt(['--warn-error', 'debug'])
exc_str = ' '.join(str(exc.exception).split()) # flatten all whitespace exc_str = ' '.join(str(exc.exception).split()) # flatten all whitespace
expected = "The `data-paths` config has been deprecated" expected = "The `data-paths` config has been renamed"
assert expected in exc_str assert expected in exc_str

View File

@@ -41,6 +41,14 @@ def temporary_working_directory() -> str:
out : str out : str
The temporary working directory. The temporary working directory.
""" """
# N.B.: suppressing the OSError is necessary for older (pre-3.10) versions of Python
# which do not support the `ignore_cleanup_errors` in tempfile::TemporaryDirectory.
# See: https://github.com/python/cpython/pull/24793
#
# In our case the cleanup is redundant since Windows handles clearing
# AppData/Local/Temp at the OS level anyway.
with contextlib.suppress(OSError):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
with change_working_directory(tmpdir): with change_working_directory(tmpdir):
yield tmpdir yield tmpdir

View File

@@ -34,15 +34,3 @@ class TestStatements(DBTIntegrationTest):
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertTablesEqual("statement_actual", "statement_expected") self.assertTablesEqual("statement_actual", "statement_expected")
@use_profile("presto")
def test_presto_statements(self):
self.use_default_project({"seed-paths": [self.dir("seed")]})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 2)
results = self.run_dbt()
self.assertEqual(len(results), 1)
self.assertTablesEqual("statement_actual", "statement_expected")

204
test/interop/log_parsing/Cargo.lock generated Normal file
View File

@@ -0,0 +1,204 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"winapi",
]
[[package]]
name = "itoa"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "libc"
version = "0.2.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
[[package]]
name = "log_parsing"
version = "0.1.0"
dependencies = [
"chrono",
"serde",
"serde_json",
"walkdir",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "proc-macro2"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43"
dependencies = [
"unicode-xid",
]
[[package]]
name = "quote"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "serde"
version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "syn"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi",
"winapi",
]
[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

View File

@@ -0,0 +1,10 @@
[package]
name = "log_parsing"
version = "0.1.0"
edition = "2018"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
chrono = { version = "0.4", features = ["serde"] }
walkdir = "2"

View File

@@ -0,0 +1,260 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::env;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufRead};
use walkdir::WalkDir;
// Applies schema tests to file input
// if these fail, we either have a problem in dbt that needs to be resolved
// or we have changed our interface and the log_version should be bumped in dbt,
// modeled appropriately here, and new docs published for the new log_version.
fn main() -> Result<(), Box<dyn Error>> {
let log_name = "dbt.log";
let path = env::var("LOG_DIR").expect("must pass absolute log path to tests with env var `LOG_DIR=/logs/live/here/`");
println!("Looking for files named `{}` in {}", log_name, path);
let lines: Vec<String> = get_input(&path, log_name)?;
println!("collected {} log lines.", lines.len());
println!("");
println!("testing type-level schema compliance by deserializing each line...");
let log_lines: Vec<LogLine> = deserialized_input(&lines)
.map_err(|e| format!("schema test failure: json doesn't match type definition\n{}", e))?;
println!("Done.");
println!("");
println!("because we skip non-json log lines, there are {} collected values to test.", log_lines.len());
println!("");
// make sure when we read a string in then output it back to a string the two strings
// contain all the same key-value pairs.
println!("testing serialization loop to make sure all key-value pairs are accounted for");
test_deserialize_serialize_is_unchanged(&lines);
println!("Done.");
println!("");
// make sure each log_line contains the values we expect
println!("testing that the field values in each log line are expected");
for log_line in log_lines {
log_line.value_test()
}
println!("Done.");
Ok(())
}
// each nested type of LogLine should define its own value_test function
// that asserts values are within an expected set of values when possible.
trait ValueTest {
fn value_test(&self) -> ();
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct LogLine {
log_version: isize,
r#type: String,
code: String,
#[serde(with = "custom_date_format")]
ts: DateTime<Utc>,
pid: isize,
msg: String,
level: String,
invocation_id: String,
thread_name: String,
data: serde_json::Value, // TODO be more specific
node_info: serde_json::Value, // TODO be more specific
}
impl ValueTest for LogLine {
fn value_test(&self){
assert_eq!(
self.log_version, 1,
"The log version changed. Be sure this was intentional."
);
assert_eq!(
self.r#type,
"log_line".to_owned(),
"The type value has changed. If this is intentional, bump the log version"
);
assert!(
["debug", "info", "warn", "error"]
.iter()
.any(|level| **level == self.level),
"log level had unexpected value {}",
self.level
);
}
}
// logs output timestamps like this: "2021-11-30T12:31:04.312814Z"
// which is nearly the default serialization except for the fractional seconds.
// Handling that requires parsing with "%Y-%m-%dT%H:%M:%S%.6f", hence this
// boilerplate-looking module.
mod custom_date_format {
use chrono::{NaiveDateTime, DateTime, Utc};
use serde::{self, Deserialize, Deserializer, Serializer};
const FORMAT: &'static str = "%Y-%m-%dT%H:%M:%S%.6fZ";
pub fn serialize<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let s = format!("{}", date.format(FORMAT));
serializer.serialize_str(&s)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(DateTime::<Utc>::from_utc(NaiveDateTime::parse_from_str(&s, FORMAT).map_err(serde::de::Error::custom)?, Utc))
}
}
// finds all files in any subdirectory of this path with this name. returns the contents
// of each file line by line as one continuous structure. No distinction between files.
fn get_input(path: &str, file_name: &str) -> Result<Vec<String>, String> {
WalkDir::new(path)
.follow_links(true)
.into_iter()
// filters out all the exceptions encountered on this walk silently
.filter_map(|e| e.ok())
// walks through each file and returns the contents if the filename matches
.filter_map(|e| {
let f_name = e.file_name().to_string_lossy();
if f_name.ends_with(file_name) {
let contents = File::open(e.path())
.map_err(|e| format!("Something went wrong opening the log file {}\n{}", f_name, e))
.and_then(|file| {
io::BufReader::new(file)
.lines()
.map(|l| {
l.map_err(|e| format!("Something went wrong reading lines of the log file {}\n{}", f_name, e))
})
.collect::<Result<Vec<String>, String>>()
});
Some(contents)
} else {
None
}
})
.collect::<Result<Vec<Vec<String>>, String>>()
.map(|vv| vv.concat())
}
// attempts to deserialize the strings into LogLines. If the string isn't valid
// json it skips it instead of failing. This is so that any tests that generate
// non-json logs won't break the schema test.
fn deserialized_input(log_lines: &[String]) -> serde_json::Result<Vec<LogLine>> {
log_lines
.into_iter()
// if the log line isn't valid json format, toss it
.filter(|log_line| serde_json::from_str::<serde_json::Value>(log_line).is_ok())
// attempt to deserialize into our LogLine type
.map(|log_line| serde_json::from_str::<LogLine>(log_line))
.collect()
}
// turn a String into a LogLine and back into a String returning both Strings so
// they can be compared
fn deserialize_serialize_loop(
log_lines: &[String],
) -> serde_json::Result<Vec<(String, String)>> {
log_lines
.into_iter()
.map(|log_line| {
serde_json::from_str::<LogLine>(log_line).and_then(|parsed| {
serde_json::to_string(&parsed).map(|json| (log_line.clone(), json))
})
})
.collect()
}
// make sure when we read a string in then output it back to a string the two strings
// contain all the same key-value pairs.
fn test_deserialize_serialize_is_unchanged(lines: &[String]) {
let objects: Result<Vec<(serde_json::Value, serde_json::Value)>, serde_json::Error> =
deserialize_serialize_loop(lines).and_then(|v| {
v.into_iter()
.map(|(s0, s1)| {
serde_json::from_str::<serde_json::Value>(&s0).and_then(|s0v| {
serde_json::from_str::<serde_json::Value>(&s1).map(|s1v| (s0v, s1v))
})
})
.collect()
});
match objects {
Err(e) => assert!(false, "{}", e),
Ok(v) => {
for pair in v {
match pair {
(
serde_json::Value::Object(original),
serde_json::Value::Object(looped),
) => {
// looping through each key of each json value gives us meaningful failure messages
// instead of "this big string" != "this other big string"
for (key, value) in original.clone() {
let looped_val = looped.get(&key);
assert_eq!(
looped_val,
Some(&value),
"original key value ({}, {}) expected in re-serialized result",
key,
value
)
}
for (key, value) in looped.clone() {
let original_val = original.get(&key);
assert_eq!(
original_val,
Some(&value),
"looped key value ({}, {}) not found in original result",
key,
value
)
}
}
_ => assert!(false, "not comparing json objects"),
}
}
}
}
}
#[cfg(test)]
mod tests {
use crate::*;
const LOG_LINE: &str = r#"{"code": "Z023", "data": {"stats": {"error": 0, "pass": 3, "skip": 0, "total": 3, "warn": 0}}, "invocation_id": "f1e1557c-4f9d-4053-bb50-572cbbf2ca64", "level": "info", "log_version": 1, "msg": "Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3", "node_info": {}, "pid": 75854, "thread_name": "MainThread", "ts": "2021-12-03T01:32:38.334601Z", "type": "log_line"}"#;
#[test]
fn test_basic_loop() {
assert!(deserialize_serialize_loop(&[LOG_LINE.to_owned()]).is_ok())
}
#[test]
fn test_values() {
assert!(deserialized_input(&[LOG_LINE.to_owned()]).map(|v| {
v.into_iter().map(|ll| ll.value_test())
}).is_ok())
}
#[test]
fn test_values_loop() {
test_deserialize_serialize_is_unchanged(&[LOG_LINE.to_owned()]);
}
}
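For readers more comfortable in Python, a rough equivalent of the per-line value checks above, run against a single illustrative JSON log line (field values are made up but follow the schema the LogLine struct encodes):

import json
from datetime import datetime

line = ('{"log_version": 1, "type": "log_line", "code": "Z023", '
        '"ts": "2021-12-03T01:32:38.334601Z", "pid": 75854, "msg": "Done.", '
        '"level": "info", "invocation_id": "f1e1557c", "thread_name": "MainThread", '
        '"data": {}, "node_info": {}}')

parsed = json.loads(line)
assert parsed["log_version"] == 1, "log version changed; be sure this was intentional"
assert parsed["type"] == "log_line"
assert parsed["level"] in ("debug", "info", "warn", "error")
# timestamps carry microsecond precision and a trailing Z, matching the Rust FORMAT string
datetime.strptime(parsed["ts"], "%Y-%m-%dT%H:%M:%S.%fZ")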

View File

@@ -1,19 +1,20 @@
from dbt import events
from dbt.events.functions import EVENT_HISTORY, fire_event
from dbt.events.test_types import UnitTestInfo from dbt.events.test_types import UnitTestInfo
from argparse import Namespace from argparse import Namespace
from dbt.events import AdapterLogger from dbt.events import AdapterLogger
from dbt.events.functions import event_to_serializable_dict from dbt.events.functions import event_to_serializable_dict
from dbt.events.types import * from dbt.events.types import *
from dbt.events.test_types import * from dbt.events.test_types import *
from dbt.events.base_types import Event, Node from dbt.events.base_types import Event
from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
from importlib import reload
import dbt.events.functions as event_funcs
import dbt.flags as flags
import inspect import inspect
import json import json
import datetime
from unittest import TestCase from unittest import TestCase
from dbt.contracts.graph.parsed import ( from dbt.contracts.graph.parsed import (
ParsedModelNode, NodeConfig, DependsOn, ParsedMacro ParsedModelNode, NodeConfig, DependsOn
) )
from dbt.contracts.files import FileHash from dbt.contracts.files import FileHash
@@ -88,29 +89,29 @@ class TestEventCodes(TestCase):
class TestEventBuffer(TestCase): class TestEventBuffer(TestCase):
def setUp(self) -> None:
flags.EVENT_BUFFER_SIZE = 10
reload(event_funcs)
# ensure events are populated to the buffer exactly once # ensure events are populated to the buffer exactly once
def test_buffer_populates(self): def test_buffer_populates(self):
fire_event(UnitTestInfo(msg="Test Event 1")) event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
fire_event(UnitTestInfo(msg="Test Event 2")) event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
self.assertTrue( self.assertTrue(
EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1 event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
) )
# ensure events drop from the front of the buffer when buffer maxsize is reached # ensure events drop from the front of the buffer when buffer maxsize is reached
# TODO commenting out till we can make this not spit out 100k log lines. def test_buffer_FIFOs(self):
# def test_buffer_FIFOs(self): for n in range(0,(flags.EVENT_BUFFER_SIZE + 1)):
# for n in range(0,100001): event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
# fire_event(UnitTestInfo(msg=f"Test Event {n}"))
# self.assertTrue(
# EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
# )
# self.assertTrue(
# EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
# )
def dump_callable():
return dict()
self.assertTrue(
event_funcs.EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
)
self.assertTrue(
event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
)
def MockNode(): def MockNode():
return ParsedModelNode( return ParsedModelNode(
@@ -195,8 +196,8 @@ sample_values = [
SQLQueryStatus(status="", elapsed=0.1), SQLQueryStatus(status="", elapsed=0.1),
SQLCommit(conn_name=""), SQLCommit(conn_name=""),
ColTypeChange(orig_type="", new_type="", table=""), ColTypeChange(orig_type="", new_type="", table=""),
SchemaCreation(relation=BaseRelation()), SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier="")),
SchemaDrop(relation=BaseRelation()), SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier="")),
UncachedRelation( UncachedRelation(
dep_key=_ReferenceKey(database="", schema="", identifier=""), dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""), ref_key=_ReferenceKey(database="", schema="", identifier=""),
@@ -205,7 +206,7 @@ sample_values = [
dep_key=_ReferenceKey(database="", schema="", identifier=""), dep_key=_ReferenceKey(database="", schema="", identifier=""),
ref_key=_ReferenceKey(database="", schema="", identifier=""), ref_key=_ReferenceKey(database="", schema="", identifier=""),
), ),
AddRelation(relation=_CachedRelation()), AddRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")), DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
DropCascade( DropCascade(
dropped=_ReferenceKey(database="", schema="", identifier=""), dropped=_ReferenceKey(database="", schema="", identifier=""),
@@ -221,15 +222,13 @@ sample_values = [
old_key=_ReferenceKey(database="", schema="", identifier=""), old_key=_ReferenceKey(database="", schema="", identifier=""),
new_key=_ReferenceKey(database="", schema="", identifier="") new_key=_ReferenceKey(database="", schema="", identifier="")
), ),
DumpBeforeAddGraph(dump_callable), DumpBeforeAddGraph(dict()),
DumpAfterAddGraph(dump_callable), DumpAfterAddGraph(dict()),
DumpBeforeRenameSchema(dump_callable), DumpBeforeRenameSchema(dict()),
DumpAfterRenameSchema(dump_callable), DumpAfterRenameSchema(dict()),
AdapterImportError(ModuleNotFoundError()), AdapterImportError(ModuleNotFoundError()),
PluginLoadError(), PluginLoadError(),
SystemReportReturnCode(returncode=0), SystemReportReturnCode(returncode=0),
SelectorAlertUpto3UnusedNodes(node_names=[]),
SelectorAlertAllUnusedNodes(node_names=[]),
NewConnectionOpening(connection_state=''), NewConnectionOpening(connection_state=''),
TimingInfoCollected(), TimingInfoCollected(),
MergedFromState(nbr_merged=0, sample=[]), MergedFromState(nbr_merged=0, sample=[]),
@@ -275,8 +274,6 @@ sample_values = [
PartialParsingDeletedExposure(unique_id=''), PartialParsingDeletedExposure(unique_id=''),
InvalidDisabledSourceInTestNode(msg=''), InvalidDisabledSourceInTestNode(msg=''),
InvalidRefInTestNode(msg=''), InvalidRefInTestNode(msg=''),
MessageHandleGenericException(build_path='', unique_id='', exc=Exception('')),
DetailsHandleGenericException(),
RunningOperationCaughtError(exc=Exception('')), RunningOperationCaughtError(exc=Exception('')),
RunningOperationUncaughtError(exc=Exception('')), RunningOperationUncaughtError(exc=Exception('')),
DbtProjectError(), DbtProjectError(),
@@ -289,7 +286,7 @@ sample_values = [
ProfileHelpMessage(), ProfileHelpMessage(),
CatchableExceptionOnRun(exc=Exception('')), CatchableExceptionOnRun(exc=Exception('')),
InternalExceptionOnRun(build_path='', exc=Exception('')), InternalExceptionOnRun(build_path='', exc=Exception('')),
GenericExceptionOnRun(build_path='', unique_id='', exc=Exception('')), GenericExceptionOnRun(build_path='', unique_id='', exc=''),
NodeConnectionReleaseError(node_name='', exc=Exception('')), NodeConnectionReleaseError(node_name='', exc=Exception('')),
CheckCleanPath(path=''), CheckCleanPath(path=''),
ConfirmCleanPath(path=''), ConfirmCleanPath(path=''),
@@ -359,7 +356,7 @@ sample_values = [
NodeExecuting(unique_id='', report_node_data=MockNode()), NodeExecuting(unique_id='', report_node_data=MockNode()),
NodeFinished(unique_id='', report_node_data=MockNode(), run_result=''), NodeFinished(unique_id='', report_node_data=MockNode(), run_result=''),
QueryCancelationUnsupported(type=''), QueryCancelationUnsupported(type=''),
ConcurrencyLine(concurrency_line=''), ConcurrencyLine(num_threads=0, target_name=''),
StarterProjectPath(dir=''), StarterProjectPath(dir=''),
ConfigFolderDirectory(dir=''), ConfigFolderDirectory(dir=''),
NoSampleProfileFound(adapter=''), NoSampleProfileFound(adapter=''),
@@ -395,8 +392,6 @@ sample_values = [
MainReportArgs(Namespace()), MainReportArgs(Namespace()),
RegistryProgressMakingGETRequest(''), RegistryProgressMakingGETRequest(''),
DepsUTD(), DepsUTD(),
CatchRunException('', Exception('')),
HandleInternalException(Exception('')),
PartialParsingNotEnabled(), PartialParsingNotEnabled(),
SQlRunnerException(Exception('')), SQlRunnerException(Exception('')),
DropRelation(''), DropRelation(''),
@@ -429,7 +424,7 @@ class TestEventJSONSerialization(TestCase):
# if we have everything we need to test, try to serialize everything # if we have everything we need to test, try to serialize everything
for event in sample_values: for event in sample_values:
d = event_to_serializable_dict(event, lambda dt: dt.isoformat(), lambda x: x.message()) d = event_to_serializable_dict(event, lambda _: event.get_ts_rfc3339(), lambda x: x.message())
try: try:
json.dumps(d) json.dumps(d)
except TypeError as e: except TypeError as e: