mirror of https://github.com/dbt-labs/dbt-core (synced 2025-12-19 18:11:28 +00:00)

Compare commits: jerco/upda... vs v1.0.0 (24 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ec1f609f3e | |
| | b4ea003559 | |
| | 23e1a9aa4f | |
| | 9882d08a24 | |
| | 79cc811a68 | |
| | c82572f745 | |
| | 42a38e4deb | |
| | ecf0ffe68c | |
| | e9f26ef494 | |
| | c77dc59af8 | |
| | a5ebe4ff59 | |
| | 5c01f9006c | |
| | c92e1ed9f2 | |
| | 85dee41a9f | |
| | a4456feff0 | |
| | 8d27764b0f | |
| | e56256d968 | |
| | 86cb3ba6fa | |
| | 4d0d2d0d6f | |
| | f8a3c27fb8 | |
| | 30f05b0213 | |
| | f1bebb3629 | |
| | e7a40345ad | |
| | ba94b8212c | |
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.0.0rc3
+current_version = 1.0.0
 parse = (?P<major>\d+)
 	\.(?P<minor>\d+)
 	\.(?P<patch>\d+)

.github/workflows/release.yml (vendored): 4 lines changed
@@ -95,7 +95,9 @@ jobs:
       - uses: actions/upload-artifact@v2
         with:
           name: dist
-          path: dist/
+          path: |
+            dist/
+            !dist/dbt-${{github.event.inputs.version_number}}.tar.gz

   test-build:
     name: verify packages

.github/workflows/structured-logging-schema-check.yml (vendored, new file): 71 lines added
@@ -0,0 +1,71 @@
+# This Action checks makes a dbt run to sample json structured logs
+# and checks that they conform to the currently documented schema.
+#
+# If this action fails it either means we have unintentionally deviated
+# from our documented structured logging schema, or we need to bump the
+# version of our structured logging and add new documentation to
+# communicate these changes.
+
+name: Structured Logging Schema Check
+on:
+  push:
+    branches:
+      - "main"
+      - "*.latest"
+      - "releases/*"
+  pull_request:
+  workflow_dispatch:
+
+permissions: read-all
+
+jobs:
+  # run the performance measurements on the current or default branch
+  test-schema:
+    name: Test Log Schema
+    runs-on: ubuntu-latest
+    env:
+      # turns warnings into errors
+      RUSTFLAGS: "-D warnings"
+      # points tests to the log file
+      LOG_DIR: "/home/runner/work/dbt-core/dbt-core/logs"
+      # tells integration tests to output into json format
+      DBT_LOG_FORMAT: 'json'
+    steps:
+
+      - name: checkout dev
+        uses: actions/checkout@v2
+        with:
+          persist-credentials: false
+
+      - name: Setup Python
+        uses: actions/setup-python@v2.2.2
+        with:
+          python-version: "3.8"
+
+      - uses: actions-rs/toolchain@v1
+        with:
+          profile: minimal
+          toolchain: stable
+          override: true
+
+      - name: install dbt
+        run: pip install -r dev-requirements.txt -r editable-requirements.txt
+
+      - name: Set up postgres
+        uses: ./.github/actions/setup-postgres-linux
+
+      - name: ls
+        run: ls
+
+      # integration tests generate a ton of logs in different files. the next step will find them all.
+      # we actually care if these pass, because the normal test run doesn't usually include many json log outputs
+      - name: Run integration tests
+        run: tox -e py38-postgres -- -nauto
+
+      # apply our schema tests to every log event from the previous step
+      # skips any output that isn't valid json
+      - uses: actions-rs/cargo@v1
+        with:
+          command: run
+          args: --manifest-path test/interop/log_parsing/Cargo.toml

CHANGELOG.md: 26 lines changed
@@ -1,4 +1,28 @@
-## dbt-core 1.0.0 (Release TBD)
+## dbt-core 1.0.0 (December 3, 2021)
+
+### Fixes
+- Configure the CLI logger destination to use stdout instead of stderr ([#4368](https://github.com/dbt-labs/dbt-core/pull/4368))
+- Make the size of `EVENT_HISTORY` configurable, via `EVENT_BUFFER_SIZE` global config ([#4411](https://github.com/dbt-labs/dbt-core/pull/4411), [#4416](https://github.com/dbt-labs/dbt-core/pull/4416))
+- Change type of `log_format` in `profiles.yml` user config to be string, not boolean ([#4394](https://github.com/dbt-labs/dbt-core/pull/4394))
+
+### Under the hood
+- Only log cache events if `LOG_CACHE_EVENTS` is enabled, and disable by default. This restores previous behavior ([#4369](https://github.com/dbt-labs/dbt-core/pull/4369))
+- Move event codes to be a top-level attribute of JSON-formatted logs, rather than nested in `data` ([#4381](https://github.com/dbt-labs/dbt-core/pull/4381))
+- Fix failing integration test on Windows ([#4380](https://github.com/dbt-labs/dbt-core/pull/4380))
+- Clean up warning messages for `clean` + `deps` ([#4366](https://github.com/dbt-labs/dbt-core/pull/4366))
+- Use RFC3339 timestamps for log messages ([#4384](https://github.com/dbt-labs/dbt-core/pull/4384))
+- Different text output for console (info) and file (debug) logs ([#4379](https://github.com/dbt-labs/dbt-core/pull/4379), [#4418](https://github.com/dbt-labs/dbt-core/pull/4418))
+- Remove unused events. More structured `ConcurrencyLine`. Replace `\n` message starts/ends with `EmptyLine` events, and exclude `EmptyLine` from JSON-formatted output ([#4388](https://github.com/dbt-labs/dbt-core/pull/4388))
+- Update `events` module README ([#4395](https://github.com/dbt-labs/dbt-core/pull/4395))
+- Rework approach to JSON serialization for events with non-standard properties ([#4396](https://github.com/dbt-labs/dbt-core/pull/4396))
+- Update legacy logger file name to `dbt.log.legacy` ([#4402](https://github.com/dbt-labs/dbt-core/pull/4402))
+- Rollover `dbt.log` at 10 MB, and keep up to 5 backups, restoring previous behavior ([#4405](https://github.com/dbt-labs/dbt-core/pull/4405))
+- Use reference keys instead of full relation objects in cache events ([#4410](https://github.com/dbt-labs/dbt-core/pull/4410))
+- Add `node_type` contextual info to more events ([#4378](https://github.com/dbt-labs/dbt-core/pull/4378))
+- Make `materialized` config optional in `node_type` ([#4417](https://github.com/dbt-labs/dbt-core/pull/4417))
+- Stringify exception in `GenericExceptionOnRun` to support JSON serialization ([#4424](https://github.com/dbt-labs/dbt-core/pull/4424))
+- Add "interop" tests for machine consumption of structured log output ([#4327](https://github.com/dbt-labs/dbt-core/pull/4327))
+- Relax version specifier for `dbt-extractor` to `~=0.4.0`, to support compiled wheels for additional architectures when available ([#4427](https://github.com/dbt-labs/dbt-core/pull/4427))
+
 ## dbt-core 1.0.0rc3 (November 30, 2021)

@@ -39,7 +39,7 @@ from dbt.adapters.base.relation import (
     ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
 )
 from dbt.adapters.base import Column as BaseColumn
-from dbt.adapters.cache import RelationsCache
+from dbt.adapters.cache import RelationsCache, _make_key


 SeedModel = Union[ParsedSeedNode, CompiledSeedNode]
@@ -291,7 +291,7 @@ class BaseAdapter(metaclass=AdapterMeta):
         if (database, schema) not in self.cache:
             fire_event(
                 CacheMiss(
-                    conn_name=self.nice_connection_name,
+                    conn_name=self.nice_connection_name(),
                     database=database,
                     schema=schema
                 )
@@ -676,7 +676,11 @@ class BaseAdapter(metaclass=AdapterMeta):
         relations = self.list_relations_without_caching(
             schema_relation
         )
-        fire_event(ListRelations(database=database, schema=schema, relations=relations))
+        fire_event(ListRelations(
+            database=database,
+            schema=schema,
+            relations=[_make_key(x) for x in relations]
+        ))

         return relations

@@ -1,8 +1,8 @@
 import threading
-from collections import namedtuple
 from copy import deepcopy
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

+from dbt.adapters.reference_keys import _make_key, _ReferenceKey
 import dbt.exceptions
 from dbt.events.functions import fire_event
 from dbt.events.types import (
@@ -22,18 +22,6 @@ from dbt.events.types import (
 )
 from dbt.utils import lowercase

-_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
-
-
-def _make_key(relation) -> _ReferenceKey:
-    """Make _ReferenceKeys with lowercase values for the cache so we don't have
-    to keep track of quoting
-    """
-    # databases and schemas can both be None
-    return _ReferenceKey(lowercase(relation.database),
-                         lowercase(relation.schema),
-                         lowercase(relation.identifier))
-
-
 def dot_separated(key: _ReferenceKey) -> str:
     """Return the key in dot-separated string form.
@@ -334,7 +322,7 @@ class RelationsCache:
         :param BaseRelation relation: The underlying relation.
         """
         cached = _CachedRelation(relation)
-        fire_event(AddRelation(relation=cached))
+        fire_event(AddRelation(relation=_make_key(cached)))
         fire_event(DumpBeforeAddGraph(dump=self.dump_graph()))

         with self.lock:

core/dbt/adapters/reference_keys.py (new file): 24 lines added
@@ -0,0 +1,24 @@
+# this module exists to resolve circular imports with the events module
+
+from collections import namedtuple
+from typing import Optional
+
+
+_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
+
+
+def lowercase(value: Optional[str]) -> Optional[str]:
+    if value is None:
+        return None
+    else:
+        return value.lower()
+
+
+def _make_key(relation) -> _ReferenceKey:
+    """Make _ReferenceKeys with lowercase values for the cache so we don't have
+    to keep track of quoting
+    """
+    # databases and schemas can both be None
+    return _ReferenceKey(lowercase(relation.database),
+                         lowercase(relation.schema),
+                         lowercase(relation.identifier))
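To make the new module concrete, here is a minimal, self-contained sketch of the behaviour `_make_key` is aiming for. `FakeRelation` is a hypothetical stand-in for a relation-like object (anything with `database`, `schema`, and `identifier` attributes); the helper bodies simply mirror the file above so the snippet runs on its own:

```python
from collections import namedtuple
from typing import Optional

# local stand-ins mirroring reference_keys.py above, so this sketch runs by itself
_ReferenceKey = namedtuple('_ReferenceKey', 'database schema identifier')
FakeRelation = namedtuple('FakeRelation', 'database schema identifier')  # hypothetical relation-like object


def lowercase(value: Optional[str]) -> Optional[str]:
    # databases and schemas can both be None
    return None if value is None else value.lower()


def _make_key(relation) -> _ReferenceKey:
    # lowercase every part so cache lookups don't have to track quoting
    return _ReferenceKey(lowercase(relation.database),
                         lowercase(relation.schema),
                         lowercase(relation.identifier))


key = _make_key(FakeRelation(database='ANALYTICS', schema='Jaffle_Shop', identifier='Orders'))
print(key)  # _ReferenceKey(database='analytics', schema='jaffle_shop', identifier='orders')
```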
@@ -75,7 +75,8 @@ class SQLConnectionManager(BaseConnectionManager):

         fire_event(
             SQLQueryStatus(
-                status=str(self.get_response(cursor)), elapsed=round((time.time() - pre), 2)
+                status=self.get_response(cursor)._message,
+                elapsed=round((time.time() - pre), 2)
             )
         )

@@ -5,6 +5,7 @@ import dbt.clients.agate_helper
 from dbt.contracts.connection import Connection
 import dbt.exceptions
 from dbt.adapters.base import BaseAdapter, available
+from dbt.adapters.cache import _make_key
 from dbt.adapters.sql import SQLConnectionManager
 from dbt.events.functions import fire_event
 from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop
@@ -182,7 +183,7 @@ class SQLAdapter(BaseAdapter):

     def create_schema(self, relation: BaseRelation) -> None:
         relation = relation.without_identifier()
-        fire_event(SchemaCreation(relation=relation))
+        fire_event(SchemaCreation(relation=_make_key(relation)))
         kwargs = {
             'relation': relation,
         }
@@ -193,7 +194,7 @@ class SQLAdapter(BaseAdapter):

     def drop_schema(self, relation: BaseRelation) -> None:
         relation = relation.without_identifier()
-        fire_event(SchemaDrop(relation=relation))
+        fire_event(SchemaDrop(relation=_make_key(relation)))
         kwargs = {
             'relation': relation,
         }
@@ -545,8 +545,12 @@ class UnsetProfileConfig(RuntimeConfig):
                 args, profile_renderer, profile_name
             )
         except (DbtProjectError, DbtProfileError) as exc:
+            selected_profile_name = Profile.pick_profile_name(
+                args_profile_name=getattr(args, 'profile', None),
+                project_profile_name=profile_name
+            )
             fire_event(ProfileLoadError(exc=exc))
-            fire_event(ProfileNotFound(profile_name=profile_name))
+            fire_event(ProfileNotFound(profile_name=selected_profile_name))
             # return the poisoned form
             profile = UnsetProfile()
             # disable anonymous usage statistics
@@ -231,7 +231,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
     printer_width: Optional[int] = None
     write_json: Optional[bool] = None
     warn_error: Optional[bool] = None
-    log_format: Optional[bool] = None
+    log_format: Optional[str] = None
     debug: Optional[bool] = None
     version_check: Optional[bool] = None
     fail_fast: Optional[bool] = None
@@ -36,9 +36,9 @@ class DBTDeprecation:
         if self.name not in active_deprecations:
             desc = self.description.format(**kwargs)
             msg = ui.line_wrap_message(
-                desc, prefix='* Deprecation Warning:\n\n'
+                desc, prefix='Deprecated functionality\n\n'
             )
-            dbt.exceptions.warn_or_error(msg)
+            dbt.exceptions.warn_or_error(msg, log_fmt=ui.warning_tag('{}'))
             self.track_deprecation_warn()
             active_deprecations.add(self.name)

@@ -62,7 +62,7 @@ class PackageInstallPathDeprecation(DBTDeprecation):

 class ConfigPathDeprecation(DBTDeprecation):
     _description = '''\
-    The `{deprecated_path}` config has been deprecated in favor of `{exp_path}`.
+    The `{deprecated_path}` config has been renamed to `{exp_path}`.
     Please update your `dbt_project.yml` configuration to reflect this change.
     '''

@@ -6,7 +6,53 @@ The Events module is the implmentation for structured logging. These events repr
 The event module provides types that represent what is happening in dbt in `events.types`. These types are intended to represent an exhaustive list of all things happening within dbt that will need to be logged, streamed, or printed. To fire an event, `events.functions::fire_event` is the entry point to the module from everywhere in dbt.

 # Adding a New Event
-In `events.types` add a new class that represents the new event. This may be a simple class with no values, or it may be a dataclass with some values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `CliEventABC`) as well as the loglevel this event belongs to.
+In `events.types` add a new class that represents the new event. All events must be a dataclass with, at minimum, a code. You may also include some other values to construct downstream messaging. Only include the data necessary to construct this message within this class. You must extend all destinations (e.g. - if your log message belongs on the cli, extend `Cli`) as well as the loglevel this event belongs to. This system has been designed to take full advantage of mypy so running it will catch anything you may miss.
+
+## Required for Every Event
+
+- a string attribute `code`, that's unique across events
+- assign a log level by extending `DebugLevel`, `InfoLevel`, `WarnLevel`, or `ErrorLevel`
+- a message()
+- extend `File` and/or `Cli` based on where it should output
+
+Example
+```
+@dataclass
+class PartialParsingDeletedExposure(DebugLevel, Cli, File):
+    unique_id: str
+    code: str = "I049"
+
+    def message(self) -> str:
+        return f"Partial parsing: deleted exposure {self.unique_id}"
+
+```
+
+## Optional (based on your event)
+
+- Events associated with node status changes must have `report_node_data` passed in and be extended with `NodeInfo`
+- define `asdict` if your data is not serializable to json
+
+Example
+```
+@dataclass
+class SuperImportantNodeEvent(InfoLevel, File, NodeInfo):
+    node_name: str
+    run_result: RunResult
+    report_node_data: ParsedModelNode  # may vary
+    code: str = "Q036"
+
+    def message(self) -> str:
+        return f"{self.node_name} had overly verbose result of {run_result}"
+
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)
+
+```
+
+All values other than `code` and `report_node_data` will be included in the `data` node of the json log output.
+
+Once your event has been added, add a dummy call to your new event at the bottom of `types.py` and also add your new Event to the list `sample_values` in `test/unit/test_events.py'.

 # Adapter Maintainers
 To integrate existing log messages from adapters, you likely have a line of code like this in your adapter already:
@@ -1,7 +1,6 @@
 from abc import ABCMeta, abstractmethod, abstractproperty
 from dataclasses import dataclass
 from datetime import datetime
-import json
 import os
 import threading
 from typing import Any, Optional
@@ -38,6 +37,11 @@ class ErrorLevel():
         return "error"


+class Cache():
+    # Events with this class will only be logged when the `--log-cache-events` flag is passed
+    pass
+
+
 @dataclass
 class Node():
     node_path: str
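A minimal sketch of how such a marker mixin gates logging. The names mirror this diff, but the flag is hard-coded here purely for illustration; the real check lives in `fire_event()` further down:

```python
LOG_CACHE_EVENTS = False  # stand-in for flags.LOG_CACHE_EVENTS


class Cache:
    # marker mixin: events carrying this class are only logged when --log-cache-events is passed
    pass


class AddLink(Cache):
    def message(self) -> str:
        return "adding link between two cached relations"


def fire_event(e) -> None:
    # mirrors the early-return gate added to fire_event() later in this diff
    if isinstance(e, Cache) and not LOG_CACHE_EVENTS:
        return
    print(e.message())


fire_event(AddLink())  # prints nothing unless LOG_CACHE_EVENTS is True
```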
@@ -70,6 +74,7 @@ class Event(metaclass=ABCMeta):
     # fields that should be on all events with their default implementations
     log_version: int = 1
     ts: Optional[datetime] = None  # use getter for non-optional
+    ts_rfc3339: Optional[str] = None  # use getter for non-optional
     pid: Optional[int] = None  # use getter for non-optional
     node_info: Optional[Node]

@@ -91,32 +96,20 @@ class Event(metaclass=ABCMeta):
     def message(self) -> str:
         raise Exception("msg not implemented for Event")

-    # override this method to convert non-json serializable fields to json.
-    # for override examples, see existing concrete types.
-    #
-    # there is no type-level mechanism to have mypy enforce json serializability, so we just try
-    # to serialize and raise an exception at runtime when that fails. This safety mechanism
-    # only works if we have attempted to serialize every concrete event type in our tests.
-    def fields_to_json(self, field_value: Any) -> Any:
-        try:
-            json.dumps(field_value, sort_keys=True)
-            return field_value
-        except TypeError:
-            val_type = type(field_value).__name__
-            event_type = type(self).__name__
-            return Exception(
-                f"type {val_type} is not serializable to json."
-                f" First make sure that the call sites for {event_type} match the type hints"
-                f" and if they do, you can override Event::fields_to_json in {event_type} in"
-                " types.py to define your own serialization function to any valid json type"
-            )
-
     # exactly one time stamp per concrete event
     def get_ts(self) -> datetime:
         if not self.ts:
-            self.ts = datetime.now()
+            self.ts = datetime.utcnow()
+            self.ts_rfc3339 = self.ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
         return self.ts

+    # preformatted time stamp
+    def get_ts_rfc3339(self) -> str:
+        if not self.ts_rfc3339:
+            # get_ts() creates the formatted string too so all time logic is centralized
+            self.get_ts()
+        return self.ts_rfc3339  # type: ignore
+
     # exactly one pid per concrete event
     def get_pid(self) -> int:
         if not self.pid:
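The RFC3339-style formatting added here is plain `strftime` on a UTC timestamp. A stand-alone sketch of the same formatting (not dbt code, just the stdlib call):

```python
from datetime import datetime

ts = datetime.utcnow()
# same format string as the diff above: RFC3339-style, UTC, microsecond precision
print(ts.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))  # e.g. 2021-12-03T10:15:30.123456Z
```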
@@ -132,6 +125,21 @@ class Event(metaclass=ABCMeta):
         from dbt.events.functions import get_invocation_id
         return get_invocation_id()

+    # default dict factory for all events. can override on concrete classes.
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            # stringify all exceptions
+            if isinstance(v, Exception) or isinstance(v, BaseException):
+                d[k] = str(v)
+            # skip all binary data
+            elif isinstance(v, bytes):
+                continue
+            else:
+                d[k] = v
+        return d
+

 @dataclass  # type: ignore
 class NodeInfo(Event, metaclass=ABCMeta):
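The `asdict` classmethod added above is meant to be handed to `dataclasses.asdict()` as its `dict_factory`, which calls it with a list of `(field_name, value)` tuples. A self-contained sketch of that mechanism, using a made-up event class rather than a real dbt event:

```python
import dataclasses
from dataclasses import dataclass


@dataclass
class FakeEvent:
    # made-up fields purely to exercise the dict_factory behaviour
    path: str
    exc: Exception
    code: str = "Z999"

    # dataclasses.asdict() passes a list of (field_name, value) tuples to dict_factory
    @classmethod
    def asdict(cls, data: list) -> dict:
        return {k: (str(v) if isinstance(v, BaseException) else v) for k, v in data}


e = FakeEvent(path="models/orders.sql", exc=ValueError("boom"))
print(dataclasses.asdict(e, dict_factory=type(e).asdict))
# {'path': 'models/orders.sql', 'exc': 'boom', 'code': 'Z999'}
```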
@@ -143,7 +151,7 @@ class NodeInfo(Event, metaclass=ABCMeta):
             node_name=self.report_node_data.name,
             unique_id=self.report_node_data.unique_id,
             resource_type=self.report_node_data.resource_type.value,
-            materialized=self.report_node_data.config.materialized,
+            materialized=self.report_node_data.config.get('materialized'),
             node_status=str(self.report_node_data._event_status.get('node_status')),
             node_started_at=self.report_node_data._event_status.get("started_at"),
             node_finished_at=self.report_node_data._event_status.get("finished_at")
@@ -2,8 +2,8 @@
 from colorama import Style
 from datetime import datetime
 import dbt.events.functions as this  # don't worry I hate it too.
-from dbt.events.base_types import Cli, Event, File, ShowException, NodeInfo
+from dbt.events.base_types import Cli, Event, File, ShowException, NodeInfo, Cache
-from dbt.events.types import EventBufferFull, T_Event
+from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
 import dbt.flags as flags
 # TODO this will need to move eventually
 from dbt.logger import SECRET_ENV_PREFIX, make_log_dir_if_missing, GLOBAL_LOGGER
@@ -13,19 +13,21 @@ from io import StringIO, TextIOWrapper
 import logbook
 import logging
 from logging import Logger
+import sys
 from logging.handlers import RotatingFileHandler
 import os
 import uuid
+import threading
 from typing import Any, Callable, Dict, List, Optional, Union
 import dataclasses
 from collections import deque


-# create the global event history buffer with a max size of 100k records
+# create the global event history buffer with the default max size (10k)
 # python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
-# TODO: make the maxlen something configurable from the command line via args(?)
+# TODO the flags module has not yet been resolved when this is created
 global EVENT_HISTORY
-EVENT_HISTORY = deque(maxlen=100000)  # type: ignore
+EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)  # type: ignore

 # create the global file logger with no configuration
 global FILE_LOG
@@ -38,7 +40,7 @@ FILE_LOG.addHandler(null_handler)
 global STDOUT_LOG
 STDOUT_LOG = logging.getLogger('default_stdout')
 STDOUT_LOG.setLevel(logging.INFO)
-stdout_handler = logging.StreamHandler()
+stdout_handler = logging.StreamHandler(sys.stdout)
 stdout_handler.setLevel(logging.INFO)
 STDOUT_LOG.addHandler(stdout_handler)

@@ -48,6 +50,10 @@ invocation_id: Optional[str] = None


 def setup_event_logger(log_path, level_override=None):
+    # flags have been resolved, and log_path is known
+    global EVENT_HISTORY
+    EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE)  # type: ignore
+
     make_log_dir_if_missing(log_path)
     this.format_json = flags.LOG_FORMAT == 'json'
     # USE_COLORS can be None if the app just started and the cli flags
@@ -64,7 +70,7 @@ def setup_event_logger(log_path, level_override=None):
     FORMAT = "%(message)s"
     stdout_passthrough_formatter = logging.Formatter(fmt=FORMAT)

-    stdout_handler = logging.StreamHandler()
+    stdout_handler = logging.StreamHandler(sys.stdout)
     stdout_handler.setFormatter(stdout_passthrough_formatter)
     stdout_handler.setLevel(level)
     # clear existing stdout TextIOWrapper stream handlers
@@ -80,7 +86,12 @@ def setup_event_logger(log_path, level_override=None):

     file_passthrough_formatter = logging.Formatter(fmt=FORMAT)

-    file_handler = RotatingFileHandler(filename=log_dest, encoding='utf8')
+    file_handler = RotatingFileHandler(
+        filename=log_dest,
+        encoding='utf8',
+        maxBytes=10 * 1024 * 1024,  # 10 mb
+        backupCount=5
+    )
     file_handler.setFormatter(file_passthrough_formatter)
     file_handler.setLevel(logging.DEBUG)  # always debug regardless of user input
     this.FILE_LOG.handlers.clear()
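The event history buffer is now sized from `flags.EVENT_BUFFER_SIZE` instead of a hard-coded 100k. A small, self-contained sketch of the bounded-deque behaviour this relies on; the buffer size and event values here are made up for illustration:

```python
from collections import deque

EVENT_BUFFER_SIZE = 5  # stand-in for flags.EVENT_BUFFER_SIZE; the real default is much larger

# a deque with maxlen silently drops the oldest entry once it is full
EVENT_HISTORY = deque(maxlen=EVENT_BUFFER_SIZE)

for i in range(8):
    if len(EVENT_HISTORY) == EVENT_BUFFER_SIZE - 1:
        print("buffer about to fill; a real run would fire EventBufferFull() here")
    EVENT_HISTORY.append(f"event-{i}")

print(list(EVENT_HISTORY))  # only the 5 most recent events survive
```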
@@ -130,17 +141,25 @@ def event_to_serializable_dict(
 ) -> Dict[str, Any]:
     data = dict()
     node_info = dict()
-    if hasattr(e, '__dataclass_fields__'):
-        for field, value in dataclasses.asdict(e).items():  # type: ignore[attr-defined]
-            _json_value = e.fields_to_json(value)
+    log_line = dict()
+    try:
+        log_line = dataclasses.asdict(e, dict_factory=type(e).asdict)
+    except AttributeError:
+        event_type = type(e).__name__
+        raise Exception(  # TODO this may hang async threads
+            f"type {event_type} is not serializable to json."
+            f" First make sure that the call sites for {event_type} match the type hints"
+            f" and if they do, you can override the dataclass method `asdict` in {event_type} in"
+            " types.py to define your own serialization function to a dictionary of valid json"
+            " types"
+        )

     if isinstance(e, NodeInfo):
         node_info = dataclasses.asdict(e.get_node_info())

-            if not isinstance(_json_value, Exception):
-                data[field] = _json_value
-            else:
-                data[field] = f"JSON_SERIALIZE_FAILED: {type(value).__name__, 'NA'}"
+    for field, value in log_line.items():  # type: ignore[attr-defined]
+        if field not in ["code", "report_node_data"]:
+            data[field] = value

     event_dict = {
         'type': 'log_line',
@@ -152,7 +171,8 @@ def event_to_serializable_dict(
         'data': data,
         'invocation_id': e.get_invocation_id(),
         'thread_name': e.get_thread_name(),
-        'node_info': node_info
+        'node_info': node_info,
+        'code': e.code
     }

     return event_dict
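For orientation, here is a rough, hand-written illustration of the dictionary this function now builds. Only the keys visible in the hunk above are shown, the values are invented, and the real payload carries additional fields (message, level, timestamp) that fall outside this hunk:

```python
import json

# Illustrative only: keys mirror the event_dict literal above, values are made up.
example_event_dict = {
    "type": "log_line",
    "code": "E013",  # the event code is now a top-level attribute, not nested in "data"
    "data": {        # everything else from the dataclass, minus "code"/"report_node_data"
        "conn_name": "master",
        "database": "analytics",
        "schema": "jaffle_shop",
    },
    "invocation_id": "c6ee05c9",  # hypothetical value
    "thread_name": "MainThread",
    "node_info": {},
}

print(json.dumps(example_event_dict, sort_keys=True))
```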
@@ -161,35 +181,64 @@ def event_to_serializable_dict(
 # translates an Event to a completely formatted text-based log line
 # you have to specify which message you want. (i.e. - e.message, e.cli_msg(), e.file_msg())
 # type hinting everything as strings so we don't get any unintentional string conversions via str()
-def create_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
+def create_info_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
     color_tag: str = '' if this.format_color else Style.RESET_ALL
     ts: str = e.get_ts().strftime("%H:%M:%S")
     scrubbed_msg: str = scrub_secrets(msg_fn(e), env_secrets())
+    log_line: str = f"{color_tag}{ts} {scrubbed_msg}"
+    return log_line
+
+
+def create_debug_text_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
+    log_line: str = ''
+    # Create a separator if this is the beginning of an invocation
+    if type(e) == MainReportVersion:
+        separator = 30 * '='
+        log_line = f'\n\n{separator} {e.get_ts()} | {get_invocation_id()} {separator}\n'
+    color_tag: str = '' if this.format_color else Style.RESET_ALL
+    ts: str = e.get_ts().strftime("%H:%M:%S.%f")
+    scrubbed_msg: str = scrub_secrets(msg_fn(e), env_secrets())
     level: str = e.level_tag() if len(e.level_tag()) == 5 else f"{e.level_tag()} "
-    log_line: str = f"{color_tag}{ts} | [ {level} ] | {scrubbed_msg}"
+    thread = ''
+    if threading.current_thread().getName():
+        thread_name = threading.current_thread().getName()
+        thread_name = thread_name[:10]
+        thread_name = thread_name.ljust(10, ' ')
+        thread = f' [{thread_name}]:'
+    log_line = log_line + f"{color_tag}{ts} [{level}]{thread} {scrubbed_msg}"
     return log_line


 # translates an Event to a completely formatted json log line
 # you have to specify which message you want. (i.e. - e.message(), e.cli_msg(), e.file_msg())
-def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
-    values = event_to_serializable_dict(e, lambda dt: dt.isoformat(), lambda x: msg_fn(x))
+def create_json_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> Optional[str]:
+    if type(e) == EmptyLine:
+        return None  # will not be sent to logger
+    # using preformatted string instead of formatting it here to be extra careful about timezone
+    values = event_to_serializable_dict(e, lambda _: e.get_ts_rfc3339(), lambda x: msg_fn(x))
     raw_log_line = json.dumps(values, sort_keys=True)
     return scrub_secrets(raw_log_line, env_secrets())


-# calls create_text_log_line() or create_json_log_line() according to logger config
-def create_log_line(e: T_Event, msg_fn: Callable[[T_Event], str]) -> str:
-    return (
-        create_json_log_line(e, msg_fn)
-        if this.format_json else
-        create_text_log_line(e, msg_fn)
-    )
+# calls create_stdout_text_log_line() or create_json_log_line() according to logger config
+def create_log_line(
+    e: T_Event,
+    msg_fn: Callable[[T_Event], str],
+    file_output=False
+) -> Optional[str]:
+    if this.format_json:
+        return create_json_log_line(e, msg_fn)  # json output, both console and file
+    elif file_output is True or flags.DEBUG:
+        return create_debug_text_log_line(e, msg_fn)  # default file output
+    else:
+        return create_info_text_log_line(e, msg_fn)  # console output


 # allows for resuse of this obnoxious if else tree.
 # do not use for exceptions, it doesn't pass along exc_info, stack_info, or extra
 def send_to_logger(l: Union[Logger, logbook.Logger], level_tag: str, log_line: str):
+    if not log_line:
+        return
     if level_tag == 'test':
         # TODO after implmenting #3977 send to new test level
         l.debug(log_line)
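A rough mock-up of the two text formats these helpers produce: the console (info) line is terse, while the file (debug) line adds microseconds, a level tag, and a fixed-width thread name. This only mimics the f-strings above and is not the dbt implementation; the message is invented:

```python
from datetime import datetime
import threading

msg = "Concurrency: 4 threads (target='dev')"  # invented message
now = datetime.utcnow()
level = "debug"

# console/info style: "HH:MM:SS  message"
info_line = f"{now.strftime('%H:%M:%S')} {msg}"

# file/debug style: "HH:MM:SS.ffffff [level] [thread    ]: message"
level_tag = level if len(level) == 5 else f"{level} "
thread_name = threading.current_thread().getName()[:10].ljust(10, ' ')
debug_line = f"{now.strftime('%H:%M:%S.%f')} [{level_tag}] [{thread_name}]: {msg}"

print(info_line)
print(debug_line)
```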
@@ -262,28 +311,34 @@ def send_exc_to_logger(
 # (i.e. - mutating the event history, printing to stdout, logging
 # to files, etc.)
 def fire_event(e: Event) -> None:
+    # skip logs when `--log-cache-events` is not passed
+    if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
+        return
+
     # if and only if the event history deque will be completely filled by this event
     # fire warning that old events are now being dropped
     global EVENT_HISTORY
-    if len(EVENT_HISTORY) == ((EVENT_HISTORY.maxlen or 100000) - 1):
+    if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
+        EVENT_HISTORY.append(e)
         fire_event(EventBufferFull())
+    else:
         EVENT_HISTORY.append(e)

     # backwards compatibility for plugins that require old logger (dbt-rpc)
     if flags.ENABLE_LEGACY_LOGGER:
         # using Event::message because the legacy logger didn't differentiate messages by
         # destination
         log_line = create_log_line(e, msg_fn=lambda x: x.message())
+        if log_line:
             send_to_logger(GLOBAL_LOGGER, e.level_tag(), log_line)
         return  # exit the function to avoid using the current logger as well

     # always logs debug level regardless of user input
     if isinstance(e, File):
-        log_line = create_log_line(e, msg_fn=lambda x: x.file_msg())
+        log_line = create_log_line(e, msg_fn=lambda x: x.file_msg(), file_output=True)
         # doesn't send exceptions to exception logger
-        send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line)
+        if log_line:
+            send_to_logger(FILE_LOG, level_tag=e.level_tag(), log_line=log_line)

     if isinstance(e, Cli):
         # explicitly checking the debug flag here so that potentially expensive-to-construct
@@ -292,18 +347,19 @@ def fire_event(e: Event) -> None:
             return  # eat the message in case it was one of the expensive ones

         log_line = create_log_line(e, msg_fn=lambda x: x.cli_msg())
-        if not isinstance(e, ShowException):
-            send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
-        # CliEventABC and ShowException
-        else:
-            send_exc_to_logger(
-                STDOUT_LOG,
-                level_tag=e.level_tag(),
-                log_line=log_line,
-                exc_info=e.exc_info,
-                stack_info=e.stack_info,
-                extra=e.extra
-            )
+        if log_line:
+            if not isinstance(e, ShowException):
+                send_to_logger(STDOUT_LOG, level_tag=e.level_tag(), log_line=log_line)
+            # CliEventABC and ShowException
+            else:
+                send_exc_to_logger(
+                    STDOUT_LOG,
+                    level_tag=e.level_tag(),
+                    log_line=log_line,
+                    exc_info=e.exc_info,
+                    stack_info=e.stack_info,
+                    extra=e.extra
+                )


 def get_invocation_id() -> str:
@@ -1,16 +1,16 @@
 import argparse
 from dataclasses import dataclass
+from dbt.adapters.reference_keys import _make_key, _ReferenceKey
 from dbt.events.stubs import (
     _CachedRelation,
     BaseRelation,
-    ParsedModelNode,
     ParsedHookNode,
-    _ReferenceKey,
+    ParsedModelNode,
     RunResult
 )
 from dbt import ui
 from dbt.events.base_types import (
-    Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo
+    Cli, Event, File, DebugLevel, InfoLevel, WarnLevel, ErrorLevel, ShowException, NodeInfo, Cache
 )
 from dbt.events.format import format_fancy_output_line, pluralize
 from dbt.node_types import NodeType
@@ -115,14 +115,6 @@ class MainEncounteredError(ErrorLevel, Cli):
     def message(self) -> str:
         return f"Encountered an error:\n{str(self.e)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        # equality on BaseException is not good enough of a comparison here
-        if isinstance(val, BaseException):
-            return str(val)
-
-        return val
-

 @dataclass
 class MainStackTrace(DebugLevel, Cli):
@@ -150,12 +142,9 @@ class MainReportArgs(DebugLevel, Cli, File):
     def message(self):
         return f"running dbt with arguments {str(self.args)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        if isinstance(val, argparse.Namespace):
-            return str(val)
-
-        return val
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)


 @dataclass
@@ -354,13 +343,6 @@ class SystemCouldNotWrite(DebugLevel, Cli, File):
             f"{self.reason}\nexception: {self.exc}"
         )

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-
-        return val
-

 @dataclass
 class SystemExecutingCmd(DebugLevel, Cli, File):
@@ -397,40 +379,6 @@ class SystemReportReturnCode(DebugLevel, Cli, File):
     def message(self) -> str:
         return f"command return code={self.returncode}"

-# TODO remove?? Not called outside of this file
-
-
-@dataclass
-class SelectorAlertUpto3UnusedNodes(InfoLevel, Cli, File):
-    node_names: List[str]
-    code: str = "I_NEED_A_CODE_5"
-
-    def message(self) -> str:
-        summary_nodes_str = ("\n - ").join(self.node_names[:3])
-        and_more_str = (
-            f"\n - and {len(self.node_names) - 3} more" if len(self.node_names) > 4 else ""
-        )
-        return (
-            f"\nSome tests were excluded because at least one parent is not selected. "
-            f"Use the --greedy flag to include them."
-            f"\n - {summary_nodes_str}{and_more_str}"
-        )
-
-
-# TODO remove?? Not called outside of this file
-
-
-@dataclass
-class SelectorAlertAllUnusedNodes(DebugLevel, Cli, File):
-    node_names: List[str]
-    code: str = "I_NEED_A_CODE_6"
-
-    def message(self) -> str:
-        debug_nodes_str = ("\n - ").join(self.node_names)
-        return (
-            f"Full list of tests that were excluded:"
-            f"\n - {debug_nodes_str}"
-        )
-

 @dataclass
 class SelectorReportInvalidSelector(InfoLevel, Cli, File):
@@ -542,7 +490,7 @@ class Rollback(DebugLevel, Cli, File):

 @dataclass
 class CacheMiss(DebugLevel, Cli, File):
-    conn_name: Any  # TODO mypy says this is `Callable[[], str]`?? ¯\_(ツ)_/¯
+    conn_name: str
     database: Optional[str]
     schema: str
     code: str = "E013"
@@ -558,12 +506,20 @@ class CacheMiss(DebugLevel, Cli, File):
 class ListRelations(DebugLevel, Cli, File):
     database: Optional[str]
     schema: str
-    relations: List[BaseRelation]
+    relations: List[_ReferenceKey]
     code: str = "E014"

     def message(self) -> str:
         return f"with database={self.database}, schema={self.schema}, relations={self.relations}"

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            if type(v) == list:
+                d[k] = [str(x) for x in v]
+        return d
+

 @dataclass
 class ConnectionUsed(DebugLevel, Cli, File):
@@ -587,7 +543,7 @@ class SQLQuery(DebugLevel, Cli, File):

 @dataclass
 class SQLQueryStatus(DebugLevel, Cli, File):
-    status: str  # could include AdapterResponse if we resolve circular imports
+    status: str
     elapsed: float
     code: str = "E017"

@@ -617,7 +573,7 @@ class ColTypeChange(DebugLevel, Cli, File):

 @dataclass
 class SchemaCreation(DebugLevel, Cli, File):
-    relation: BaseRelation
+    relation: _ReferenceKey
     code: str = "E020"

     def message(self) -> str:
@@ -626,17 +582,21 @@ class SchemaCreation(DebugLevel, Cli, File):

 @dataclass
 class SchemaDrop(DebugLevel, Cli, File):
-    relation: BaseRelation
+    relation: _ReferenceKey
     code: str = "E021"

     def message(self) -> str:
         return f'Dropping schema "{self.relation}".'

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)
+
+
 # TODO pretty sure this is only ever called in dead code
 # see: core/dbt/adapters/cache.py _add_link vs add_link
 @dataclass
-class UncachedRelation(DebugLevel, Cli, File):
+class UncachedRelation(DebugLevel, Cli, File, Cache):
     dep_key: _ReferenceKey
     ref_key: _ReferenceKey
     code: str = "E022"
@@ -650,7 +610,7 @@ class UncachedRelation(DebugLevel, Cli, File):


 @dataclass
-class AddLink(DebugLevel, Cli, File):
+class AddLink(DebugLevel, Cli, File, Cache):
     dep_key: _ReferenceKey
     ref_key: _ReferenceKey
     code: str = "E023"
@@ -660,23 +620,16 @@ class AddLink(DebugLevel, Cli, File):


 @dataclass
-class AddRelation(DebugLevel, Cli, File):
-    relation: _CachedRelation
+class AddRelation(DebugLevel, Cli, File, Cache):
+    relation: _ReferenceKey
     code: str = "E024"

     def message(self) -> str:
         return f"Adding relation: {str(self.relation)}"

-    # overriding default json serialization for this event
-    def fields_to_json(self, val: Any) -> Any:
-        if isinstance(val, _CachedRelation):
-            return str(val)
-
-        return val
-

 @dataclass
-class DropMissingRelation(DebugLevel, Cli, File):
+class DropMissingRelation(DebugLevel, Cli, File, Cache):
     relation: _ReferenceKey
     code: str = "E025"

@@ -685,7 +638,7 @@ class DropMissingRelation(DebugLevel, Cli, File):


 @dataclass
-class DropCascade(DebugLevel, Cli, File):
+class DropCascade(DebugLevel, Cli, File, Cache):
     dropped: _ReferenceKey
     consequences: Set[_ReferenceKey]
     code: str = "E026"
@@ -693,9 +646,19 @@ class DropCascade(DebugLevel, Cli, File):
     def message(self) -> str:
         return f"drop {self.dropped} is cascading to {self.consequences}"

+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        d = dict()
+        for k, v in data:
+            if isinstance(v, list):
+                d[k] = [str(x) for x in v]
+            else:
+                d[k] = str(v)  # type: ignore
+        return d
+

 @dataclass
-class DropRelation(DebugLevel, Cli, File):
+class DropRelation(DebugLevel, Cli, File, Cache):
     dropped: _ReferenceKey
     code: str = "E027"

@@ -704,7 +667,7 @@ class DropRelation(DebugLevel, Cli, File):


 @dataclass
-class UpdateReference(DebugLevel, Cli, File):
+class UpdateReference(DebugLevel, Cli, File, Cache):
     old_key: _ReferenceKey
     new_key: _ReferenceKey
     cached_key: _ReferenceKey
@@ -716,7 +679,7 @@ class UpdateReference(DebugLevel, Cli, File):


 @dataclass
-class TemporaryRelation(DebugLevel, Cli, File):
+class TemporaryRelation(DebugLevel, Cli, File, Cache):
     key: _ReferenceKey
     code: str = "E029"

@@ -725,7 +688,7 @@ class TemporaryRelation(DebugLevel, Cli, File):


 @dataclass
-class RenameSchema(DebugLevel, Cli, File):
+class RenameSchema(DebugLevel, Cli, File, Cache):
     old_key: _ReferenceKey
     new_key: _ReferenceKey
     code: str = "E030"
@@ -735,7 +698,7 @@ class RenameSchema(DebugLevel, Cli, File):


 @dataclass
-class DumpBeforeAddGraph(DebugLevel, Cli, File):
+class DumpBeforeAddGraph(DebugLevel, Cli, File, Cache):
     # large value. delay not necessary since every debug level message is logged anyway.
     dump: Dict[str, List[str]]
     code: str = "E031"
@@ -745,7 +708,7 @@ class DumpBeforeAddGraph(DebugLevel, Cli, File):


 @dataclass
-class DumpAfterAddGraph(DebugLevel, Cli, File):
+class DumpAfterAddGraph(DebugLevel, Cli, File, Cache):
     # large value. delay not necessary since every debug level message is logged anyway.
     dump: Dict[str, List[str]]
     code: str = "E032"
@@ -755,7 +718,7 @@ class DumpAfterAddGraph(DebugLevel, Cli, File):


 @dataclass
-class DumpBeforeRenameSchema(DebugLevel, Cli, File):
+class DumpBeforeRenameSchema(DebugLevel, Cli, File, Cache):
     # large value. delay not necessary since every debug level message is logged anyway.
     dump: Dict[str, List[str]]
     code: str = "E033"
@@ -765,7 +728,7 @@ class DumpBeforeRenameSchema(DebugLevel, Cli, File):


 @dataclass
-class DumpAfterRenameSchema(DebugLevel, Cli, File):
+class DumpAfterRenameSchema(DebugLevel, Cli, File, Cache):
     # large value. delay not necessary since every debug level message is logged anyway.
     dump: Dict[str, List[str]]
     code: str = "E034"
@@ -782,11 +745,9 @@ class AdapterImportError(InfoLevel, Cli, File):
     def message(self) -> str:
         return f"Error importing adapter: {self.exc}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val())
-
-        return val
+    @classmethod
+    def asdict(cls, data: list) -> dict:
+        return dict((k, str(v)) for k, v in data)


 @dataclass
@@ -842,12 +803,6 @@ class ProfileLoadError(ShowException, DebugLevel, Cli, File):
     def message(self) -> str:
         return f"Profile not loaded due to error: {self.exc}"

-    def fields_to_json(self, val: Any) -> Any:
-        if val == self.exc:
-            return str(val)
-
-        return val
-

 @dataclass
 class ProfileNotFound(InfoLevel, Cli, File):
@@ -863,85 +818,7 @@ class InvalidVarsYAML(ErrorLevel, Cli, File):
|
|||||||
code: str = "A008"
|
code: str = "A008"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return "The YAML provided in the --vars argument is not valid.\n"
|
return "The YAML provided in the --vars argument is not valid."
|
||||||
|
|
||||||
|
|
||||||
# TODO: Remove? (appears to be uncalled)
|
|
||||||
@dataclass
|
|
||||||
class CatchRunException(ShowException, DebugLevel, Cli, File):
|
|
||||||
build_path: Any
|
|
||||||
exc: Exception
|
|
||||||
code: str = "I_NEED_A_CODE_1"
|
|
||||||
|
|
||||||
def message(self) -> str:
|
|
||||||
INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If the \
|
|
||||||
error persists, open an issue at https://github.com/dbt-labs/dbt-core
|
|
||||||
""".strip()
|
|
||||||
prefix = f'Internal error executing {self.build_path}'
|
|
||||||
error = "{prefix}\n{error}\n\n{note}".format(
|
|
||||||
prefix=ui.red(prefix),
|
|
||||||
error=str(self.exc).strip(),
|
|
||||||
note=INTERNAL_ERROR_STRING
|
|
||||||
)
|
|
||||||
return error
|
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: Remove? (appears to be uncalled)
|
|
||||||
@dataclass
|
|
||||||
class HandleInternalException(ShowException, DebugLevel, Cli, File):
|
|
||||||
exc: Exception
|
|
||||||
code: str = "I_NEED_A_CODE_2"
|
|
||||||
|
|
||||||
def message(self) -> str:
|
|
||||||
return str(self.exc)
|
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
# TODO: Remove? (appears to be uncalled)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class MessageHandleGenericException(ErrorLevel, Cli, File):
|
|
||||||
build_path: str
|
|
||||||
unique_id: str
|
|
||||||
exc: Exception
|
|
||||||
code: str = "I_NEED_A_CODE_3"
|
|
||||||
|
|
||||||
def message(self) -> str:
|
|
||||||
node_description = self.build_path
|
|
||||||
if node_description is None:
|
|
||||||
node_description = self.unique_id
|
|
||||||
prefix = "Unhandled error while executing {}".format(node_description)
|
|
||||||
return "{prefix}\n{error}".format(
|
|
||||||
prefix=ui.red(prefix),
|
|
||||||
error=str(self.exc).strip()
|
|
||||||
)
|
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
# TODO: Remove? (appears to be uncalled)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class DetailsHandleGenericException(ShowException, DebugLevel, Cli, File):
|
|
||||||
code: str = "I_NEED_A_CODE_4"
|
|
||||||
|
|
||||||
def message(self) -> str:
|
|
||||||
return ''
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -1110,12 +987,6 @@ class ParsedFileLoadFailed(ShowException, DebugLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f"Failed to load parsed file from disk at {self.path}: {self.exc}"
|
return f"Failed to load parsed file from disk at {self.path}: {self.exc}"
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class PartialParseSaveFileNotFound(InfoLevel, Cli, File):
|
class PartialParseSaveFileNotFound(InfoLevel, Cli, File):
|
||||||
@@ -1329,12 +1200,6 @@ class RunningOperationCaughtError(ErrorLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f'Encountered an error while running operation: {self.exc}'
|
return f'Encountered an error while running operation: {self.exc}'
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RunningOperationUncaughtError(ErrorLevel, Cli, File):
|
class RunningOperationUncaughtError(ErrorLevel, Cli, File):
|
||||||
@@ -1344,12 +1209,6 @@ class RunningOperationUncaughtError(ErrorLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f'Encountered an error while running operation: {self.exc}'
|
return f'Encountered an error while running operation: {self.exc}'
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DbtProjectError(ErrorLevel, Cli, File):
|
class DbtProjectError(ErrorLevel, Cli, File):
|
||||||
@@ -1367,12 +1226,6 @@ class DbtProjectErrorException(ErrorLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f" ERROR: {str(self.exc)}"
|
return f" ERROR: {str(self.exc)}"
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DbtProfileError(ErrorLevel, Cli, File):
|
class DbtProfileError(ErrorLevel, Cli, File):
|
||||||
@@ -1390,12 +1243,6 @@ class DbtProfileErrorException(ErrorLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f" ERROR: {str(self.exc)}"
|
return f" ERROR: {str(self.exc)}"
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ProfileListTitle(InfoLevel, Cli, File):
|
class ProfileListTitle(InfoLevel, Cli, File):
|
||||||
@@ -1443,12 +1290,6 @@ class CatchableExceptionOnRun(ShowException, DebugLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return str(self.exc)
|
return str(self.exc)
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class InternalExceptionOnRun(DebugLevel, Cli, File):
|
class InternalExceptionOnRun(DebugLevel, Cli, File):
|
||||||
@@ -1469,12 +1310,6 @@ the error persists, open an issue at https://github.com/dbt-labs/dbt-core
|
|||||||
note=INTERNAL_ERROR_STRING
|
note=INTERNAL_ERROR_STRING
|
||||||
)
|
)
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
# This prints the stack trace at the debug level while allowing just the nice exception message
|
# This prints the stack trace at the debug level while allowing just the nice exception message
|
||||||
# at the error level - or whatever other level chosen. Used in multiple places.
|
# at the error level - or whatever other level chosen. Used in multiple places.
|
||||||
@@ -1488,9 +1323,9 @@ class PrintDebugStackTrace(ShowException, DebugLevel, Cli, File):
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class GenericExceptionOnRun(ErrorLevel, Cli, File):
|
class GenericExceptionOnRun(ErrorLevel, Cli, File):
|
||||||
build_path: str
|
build_path: Optional[str]
|
||||||
unique_id: str
|
unique_id: str
|
||||||
exc: Exception
|
exc: str # TODO: make this the actual exception once we have a better searilization strategy
|
||||||
code: str = "W004"
|
code: str = "W004"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
@@ -1503,12 +1338,6 @@ class GenericExceptionOnRun(ErrorLevel, Cli, File):
|
|||||||
error=str(self.exc).strip()
|
error=str(self.exc).strip()
|
||||||
)
|
)
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File):
|
class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File):
|
||||||
@@ -1520,12 +1349,6 @@ class NodeConnectionReleaseError(ShowException, DebugLevel, Cli, File):
|
|||||||
return ('Error releasing connection for node {}: {!s}'
|
return ('Error releasing connection for node {}: {!s}'
|
||||||
.format(self.node_name, self.exc))
|
.format(self.node_name, self.exc))
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class CheckCleanPath(InfoLevel, Cli):
|
class CheckCleanPath(InfoLevel, Cli):
|
||||||
@@ -1639,7 +1462,7 @@ class DepsNotifyUpdatesAvailable(InfoLevel, Cli, File):
|
|||||||
code: str = "M019"
|
code: str = "M019"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return ('\nUpdates available for packages: {} \
|
return ('Updates available for packages: {} \
|
||||||
\nUpdate your versions in packages.yml, then run dbt deps'.format(self.packages))
|
\nUpdate your versions in packages.yml, then run dbt deps'.format(self.packages))
|
||||||
|
|
||||||
|
|
||||||
@@ -1756,7 +1579,7 @@ class ServingDocsExitInfo(InfoLevel, Cli, File):
|
|||||||
code: str = "Z020"
|
code: str = "Z020"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return "Press Ctrl+C to exit.\n\n"
|
return "Press Ctrl+C to exit."
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -1807,7 +1630,7 @@ class StatsLine(InfoLevel, Cli, File):
|
|||||||
code: str = "Z023"
|
code: str = "Z023"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
stats_line = ("\nDone. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}")
|
stats_line = ("Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}")
|
||||||
return stats_line.format(**self.stats)
|
return stats_line.format(**self.stats)
|
||||||
|
|
||||||
|
|
||||||
@@ -1846,12 +1669,6 @@ class SQlRunnerException(ShowException, DebugLevel, Cli, File):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f"Got an exception: {self.exc}"
|
return f"Got an exception: {self.exc}"
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class CheckNodeTestFailure(InfoLevel, Cli, File):
|
class CheckNodeTestFailure(InfoLevel, Cli, File):
|
||||||
@@ -1910,7 +1727,7 @@ class PrintStartLine(InfoLevel, Cli, File, NodeInfo):
|
|||||||
index: int
|
index: int
|
||||||
total: int
|
total: int
|
||||||
report_node_data: ParsedModelNode
|
report_node_data: ParsedModelNode
|
||||||
code: str = "Z031"
|
code: str = "Q033"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
msg = f"START {self.description}"
|
msg = f"START {self.description}"
|
||||||
@@ -1928,8 +1745,8 @@ class PrintHookStartLine(InfoLevel, Cli, File, NodeInfo):
|
|||||||
index: int
|
index: int
|
||||||
total: int
|
total: int
|
||||||
truncate: bool
|
truncate: bool
|
||||||
report_node_data: Any # TODO use ParsedHookNode here
|
report_node_data: Any # TODO: resolve ParsedHookNode circular import
|
||||||
code: str = "Z032"
|
code: str = "Q032"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
msg = f"START hook: {self.statement}"
|
msg = f"START hook: {self.statement}"
|
||||||
@@ -1948,7 +1765,7 @@ class PrintHookEndLine(InfoLevel, Cli, File, NodeInfo):
|
|||||||
total: int
|
total: int
|
||||||
execution_time: int
|
execution_time: int
|
||||||
truncate: bool
|
truncate: bool
|
||||||
report_node_data: Any # TODO use ParsedHookNode here
|
report_node_data: Any # TODO: resolve ParsedHookNode circular import
|
||||||
code: str = "Q007"
|
code: str = "Q007"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
@@ -1969,7 +1786,7 @@ class SkippingDetails(InfoLevel, Cli, File, NodeInfo):
|
|||||||
index: int
|
index: int
|
||||||
total: int
|
total: int
|
||||||
report_node_data: ParsedModelNode
|
report_node_data: ParsedModelNode
|
||||||
code: str = "Z033"
|
code: str = "Q034"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
if self.resource_type in NodeType.refable():
|
if self.resource_type in NodeType.refable():
|
||||||
@@ -2084,7 +1901,7 @@ class PrintModelErrorResultLine(ErrorLevel, Cli, File, NodeInfo):
|
|||||||
total: int
|
total: int
|
||||||
execution_time: int
|
execution_time: int
|
||||||
report_node_data: ParsedModelNode
|
report_node_data: ParsedModelNode
|
||||||
code: str = "Z035"
|
code: str = "Q035"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
info = "ERROR creating"
|
info = "ERROR creating"
|
||||||
@@ -2322,6 +2139,10 @@ class NodeFinished(DebugLevel, Cli, File, NodeInfo):
|
|||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return f"Finished running node {self.unique_id}"
|
return f"Finished running node {self.unique_id}"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def asdict(cls, data: list) -> dict:
|
||||||
|
return dict((k, str(v)) for k, v in data)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class QueryCancelationUnsupported(InfoLevel, Cli, File):
|
class QueryCancelationUnsupported(InfoLevel, Cli, File):
|
||||||
@@ -2337,11 +2158,12 @@ class QueryCancelationUnsupported(InfoLevel, Cli, File):
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ConcurrencyLine(InfoLevel, Cli, File):
|
class ConcurrencyLine(InfoLevel, Cli, File):
|
||||||
concurrency_line: str
|
num_threads: int
|
||||||
|
target_name: str
|
||||||
code: str = "Q026"
|
code: str = "Q026"
|
||||||
|
|
||||||
def message(self) -> str:
|
def message(self) -> str:
|
||||||
return self.concurrency_line
|
return f"Concurrency: {self.num_threads} threads (target='{self.target_name}')"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -2606,12 +2428,6 @@ class GeneralWarningException(WarnLevel, Cli, File):
|
|||||||
return self.log_fmt.format(str(self.exc))
|
return self.log_fmt.format(str(self.exc))
|
||||||
return str(self.exc)
|
return str(self.exc)
|
||||||
|
|
||||||
def fields_to_json(self, val: Any) -> Any:
|
|
||||||
if val == self.exc:
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
return val
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class EventBufferFull(WarnLevel, Cli, File):
|
class EventBufferFull(WarnLevel, Cli, File):
|
||||||
@@ -2675,8 +2491,8 @@ if 1 == 0:
|
|||||||
SQLQueryStatus(status="", elapsed=0.1)
|
SQLQueryStatus(status="", elapsed=0.1)
|
||||||
SQLCommit(conn_name="")
|
SQLCommit(conn_name="")
|
||||||
ColTypeChange(orig_type="", new_type="", table="")
|
ColTypeChange(orig_type="", new_type="", table="")
|
||||||
SchemaCreation(relation=BaseRelation())
|
SchemaCreation(relation=_make_key(BaseRelation()))
|
||||||
SchemaDrop(relation=BaseRelation())
|
SchemaDrop(relation=_make_key(BaseRelation()))
|
||||||
UncachedRelation(
|
UncachedRelation(
|
||||||
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||||
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||||
@@ -2685,7 +2501,7 @@ if 1 == 0:
|
|||||||
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
dep_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||||
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
ref_key=_ReferenceKey(database="", schema="", identifier=""),
|
||||||
)
|
)
|
||||||
AddRelation(relation=_CachedRelation())
|
AddRelation(relation=_make_key(_CachedRelation()))
|
||||||
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier=""))
|
DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier=""))
|
||||||
DropCascade(
|
DropCascade(
|
||||||
dropped=_ReferenceKey(database="", schema="", identifier=""),
|
dropped=_ReferenceKey(database="", schema="", identifier=""),
|
||||||
@@ -2708,8 +2524,6 @@ if 1 == 0:
|
|||||||
AdapterImportError(ModuleNotFoundError())
|
AdapterImportError(ModuleNotFoundError())
|
||||||
PluginLoadError()
|
PluginLoadError()
|
||||||
SystemReportReturnCode(returncode=0)
|
SystemReportReturnCode(returncode=0)
|
||||||
SelectorAlertUpto3UnusedNodes(node_names=[])
|
|
||||||
SelectorAlertAllUnusedNodes(node_names=[])
|
|
||||||
NewConnectionOpening(connection_state='')
|
NewConnectionOpening(connection_state='')
|
||||||
TimingInfoCollected()
|
TimingInfoCollected()
|
||||||
MergedFromState(nbr_merged=0, sample=[])
|
MergedFromState(nbr_merged=0, sample=[])
|
||||||
@@ -2755,8 +2569,6 @@ if 1 == 0:
|
|||||||
PartialParsingDeletedExposure(unique_id='')
|
PartialParsingDeletedExposure(unique_id='')
|
||||||
InvalidDisabledSourceInTestNode(msg='')
|
InvalidDisabledSourceInTestNode(msg='')
|
||||||
InvalidRefInTestNode(msg='')
|
InvalidRefInTestNode(msg='')
|
||||||
MessageHandleGenericException(build_path='', unique_id='', exc=Exception(''))
|
|
||||||
DetailsHandleGenericException()
|
|
||||||
RunningOperationCaughtError(exc=Exception(''))
|
RunningOperationCaughtError(exc=Exception(''))
|
||||||
RunningOperationUncaughtError(exc=Exception(''))
|
RunningOperationUncaughtError(exc=Exception(''))
|
||||||
DbtProjectError()
|
DbtProjectError()
|
||||||
@@ -2769,7 +2581,7 @@ if 1 == 0:
|
|||||||
ProfileHelpMessage()
|
ProfileHelpMessage()
|
||||||
CatchableExceptionOnRun(exc=Exception(''))
|
CatchableExceptionOnRun(exc=Exception(''))
|
||||||
InternalExceptionOnRun(build_path='', exc=Exception(''))
|
InternalExceptionOnRun(build_path='', exc=Exception(''))
|
||||||
GenericExceptionOnRun(build_path='', unique_id='', exc=Exception(''))
|
GenericExceptionOnRun(build_path='', unique_id='', exc='')
|
||||||
NodeConnectionReleaseError(node_name='', exc=Exception(''))
|
NodeConnectionReleaseError(node_name='', exc=Exception(''))
|
||||||
CheckCleanPath(path='')
|
CheckCleanPath(path='')
|
||||||
ConfirmCleanPath(path='')
|
ConfirmCleanPath(path='')
|
||||||
@@ -2952,7 +2764,7 @@ if 1 == 0:
|
|||||||
NodeStart(report_node_data=ParsedModelNode(), unique_id='')
|
NodeStart(report_node_data=ParsedModelNode(), unique_id='')
|
||||||
NodeFinished(report_node_data=ParsedModelNode(), unique_id='', run_result=RunResult())
|
NodeFinished(report_node_data=ParsedModelNode(), unique_id='', run_result=RunResult())
|
||||||
QueryCancelationUnsupported(type='')
|
QueryCancelationUnsupported(type='')
|
||||||
ConcurrencyLine(concurrency_line='')
|
ConcurrencyLine(num_threads=0, target_name='')
|
||||||
NodeCompiling(report_node_data=ParsedModelNode(), unique_id='')
|
NodeCompiling(report_node_data=ParsedModelNode(), unique_id='')
|
||||||
NodeExecuting(report_node_data=ParsedModelNode(), unique_id='')
|
NodeExecuting(report_node_data=ParsedModelNode(), unique_id='')
|
||||||
StarterProjectPath(dir='')
|
StarterProjectPath(dir='')
|
||||||
|
|||||||
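The `asdict` overrides added above exist because cache events carry `_ReferenceKey` tuples (and sets of them), which default dataclass serialization cannot turn into JSON. A minimal standalone sketch of the idea, not the dbt implementation (the `ReferenceKey` type and `serialize_event` helper here are illustrative only):

```python
from dataclasses import dataclass, asdict
from typing import NamedTuple, Set


class ReferenceKey(NamedTuple):
    database: str
    schema: str
    identifier: str


@dataclass
class DropCascade:
    dropped: ReferenceKey
    consequences: Set[ReferenceKey]

    @classmethod
    def asdict(cls, data: list) -> dict:
        # stringify every value so sets of named tuples become JSON-safe
        return {k: [str(x) for x in v] if isinstance(v, (list, set)) else str(v)
                for k, v in data}


def serialize_event(event) -> dict:
    # dataclasses.asdict lets a dict_factory decide how field pairs become a dict
    return asdict(event, dict_factory=type(event).asdict)


print(serialize_event(DropCascade(
    dropped=ReferenceKey("db", "schema", "old_table"),
    consequences={ReferenceKey("db", "schema", "dependent_view")},
)))
```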
@@ -33,6 +33,8 @@ SEND_ANONYMOUS_USAGE_STATS = None
PRINTER_WIDTH = 80
WHICH = None
INDIRECT_SELECTION = None
+LOG_CACHE_EVENTS = None
+EVENT_BUFFER_SIZE = 100000

# Global CLI defaults. These flags are set from three places:
# CLI args, environment variables, and user_config (profiles.yml).

@@ -51,7 +53,9 @@ flag_defaults = {
    "FAIL_FAST": False,
    "SEND_ANONYMOUS_USAGE_STATS": True,
    "PRINTER_WIDTH": 80,
-    "INDIRECT_SELECTION": 'eager'
+    "INDIRECT_SELECTION": 'eager',
+    "LOG_CACHE_EVENTS": False,
+    "EVENT_BUFFER_SIZE": 100000
}


@@ -99,7 +103,7 @@ def set_from_args(args, user_config):
        USE_EXPERIMENTAL_PARSER, STATIC_PARSER, WRITE_JSON, PARTIAL_PARSE, \
        USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT, INDIRECT_SELECTION, \
        VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS, PRINTER_WIDTH, \
-        WHICH
+        WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE

    STRICT_MODE = False  # backwards compatibility
    # cli args without user_config or env var option

@@ -122,6 +126,8 @@ def set_from_args(args, user_config):
    SEND_ANONYMOUS_USAGE_STATS = get_flag_value('SEND_ANONYMOUS_USAGE_STATS', args, user_config)
    PRINTER_WIDTH = get_flag_value('PRINTER_WIDTH', args, user_config)
    INDIRECT_SELECTION = get_flag_value('INDIRECT_SELECTION', args, user_config)
+    LOG_CACHE_EVENTS = get_flag_value('LOG_CACHE_EVENTS', args, user_config)
+    EVENT_BUFFER_SIZE = get_flag_value('EVENT_BUFFER_SIZE', args, user_config)


def get_flag_value(flag, args, user_config):

@@ -134,7 +140,13 @@ def get_flag_value(flag, args, user_config):
    if env_value is not None and env_value != '':
        env_value = env_value.lower()
        # non Boolean values
-        if flag in ['LOG_FORMAT', 'PRINTER_WIDTH', 'PROFILES_DIR', 'INDIRECT_SELECTION']:
+        if flag in [
+            'LOG_FORMAT',
+            'PRINTER_WIDTH',
+            'PROFILES_DIR',
+            'INDIRECT_SELECTION',
+            'EVENT_BUFFER_SIZE'
+        ]:
            flag_value = env_value
        else:
            flag_value = env_set_bool(env_value)

@@ -142,7 +154,7 @@ def get_flag_value(flag, args, user_config):
        flag_value = getattr(user_config, lc_flag)
    else:
        flag_value = flag_defaults[flag]
-    if flag == 'PRINTER_WIDTH':  # printer_width must be an int or it hangs
+    if flag in ['PRINTER_WIDTH', 'EVENT_BUFFER_SIZE']:  # must be ints
        flag_value = int(flag_value)
    if flag == 'PROFILES_DIR':
        flag_value = os.path.abspath(flag_value)

@@ -165,5 +177,7 @@ def get_flag_dict():
        "fail_fast": FAIL_FAST,
        "send_anonymous_usage_stats": SEND_ANONYMOUS_USAGE_STATS,
        "printer_width": PRINTER_WIDTH,
-        "indirect_selection": INDIRECT_SELECTION
+        "indirect_selection": INDIRECT_SELECTION,
+        "log_cache_events": LOG_CACHE_EVENTS,
+        "event_buffer_size": EVENT_BUFFER_SIZE
    }

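The new `LOG_CACHE_EVENTS` and `EVENT_BUFFER_SIZE` flags follow the same precedence as the existing ones: CLI value, then environment variable, then user config, then the hard-coded default, with the buffer size coerced to an int. A rough standalone sketch of that resolution order (names and the boolean handling are simplified; this is not the dbt implementation):

```python
import os

flag_defaults = {"LOG_CACHE_EVENTS": False, "EVENT_BUFFER_SIZE": 100000}


def resolve_flag(name, cli_value=None, user_config=None):
    # 1. an explicit CLI value wins
    if cli_value is not None:
        value = cli_value
    else:
        # 2. fall back to a DBT_<FLAG> environment variable
        env_value = os.getenv(f"DBT_{name}")
        if env_value not in (None, ""):
            value = env_value
        # 3. then the user config, 4. then the default
        elif user_config and name.lower() in user_config:
            value = user_config[name.lower()]
        else:
            value = flag_defaults[name]
    # EVENT_BUFFER_SIZE must be an int, like PRINTER_WIDTH
    return int(value) if name == "EVENT_BUFFER_SIZE" else value


print(resolve_flag("EVENT_BUFFER_SIZE"))           # 100000
print(resolve_flag("EVENT_BUFFER_SIZE", "50000"))  # 50000
```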
@@ -424,7 +424,7 @@ class DelayedFileHandler(logbook.RotatingFileHandler, FormatterMixin):
            return

        make_log_dir_if_missing(log_dir)
-        log_path = os.path.join(log_dir, 'dbt.log.old')  # TODO hack for now
+        log_path = os.path.join(log_dir, 'dbt.log.legacy')  # TODO hack for now
        self._super_init(log_path)
        self._replay_buffered()
        self._log_path = log_path

@@ -221,24 +221,22 @@ def track_run(task):
def run_from_args(parsed):
    log_cache_events(getattr(parsed, 'log_cache_events', False))

-    # we can now use the logger for stdout
-    # set log_format in the logger
-    # if 'list' task: set stdout to WARN instead of INFO
-    level_override = parsed.cls.pre_init_hook(parsed)
-
-    fire_event(MainReportVersion(v=str(dbt.version.installed)))
-
    # this will convert DbtConfigErrors into RuntimeExceptions
    # task could be any one of the task objects
    task = parsed.cls.from_args(args=parsed)
-    fire_event(MainReportArgs(args=parsed))

+    # Set up logging
    log_path = None
    if task.config is not None:
        log_path = getattr(task.config, 'log_path', None)
-    # we can finally set the file logger up
    log_manager.set_path(log_path)
+    # if 'list' task: set stdout to WARN instead of INFO
+    level_override = parsed.cls.pre_init_hook(parsed)
    setup_event_logger(log_path or 'logs', level_override)

+    fire_event(MainReportVersion(v=str(dbt.version.installed)))
+    fire_event(MainReportArgs(args=parsed))
+
    if dbt.tracking.active_user is not None:  # mypy appeasement, always true
        fire_event(MainTrackingUserState(dbt.tracking.active_user.state()))

@@ -1078,6 +1076,14 @@ def parse_args(args, cls=DBTArgumentParser):
        '''
    )

+    p.add_argument(
+        '--event-buffer-size',
+        dest='event_buffer_size',
+        help='''
+        Sets the max number of events to buffer in EVENT_HISTORY
+        '''
+    )
+
    subs = p.add_subparsers(title="Available sub-commands")

    base_subparser = _build_base_subparser()

@@ -960,10 +960,9 @@ class MacroPatchParser(NonSourceParser[UnparsedMacroUpdate, ParsedMacroPatch]):
        unique_id = f'macro.{patch.package_name}.{patch.name}'
        macro = self.manifest.macros.get(unique_id)
        if not macro:
-            warn_or_error(
-                f'WARNING: Found patch for macro "{patch.name}" '
-                f'which was not found'
-            )
+            msg = f'Found patch for macro "{patch.name}" ' \
+                f'which was not found'
+            warn_or_error(msg, log_fmt=warning_tag('{}'))
            return
        if macro.patch_path:
            package_name, existing_file_path = macro.patch_path.split('://')

@@ -334,7 +334,7 @@ class BaseRunner(metaclass=ABCMeta):
            GenericExceptionOnRun(
                build_path=self.node.build_path,
                unique_id=self.node.unique_id,
-                exc=e
+                exc=str(e)  # TODO: unstring this when serialization is fixed
            )
        )
        fire_event(PrintDebugStackTrace())

@@ -38,7 +38,7 @@ class CleanTask(BaseTask):
        """
        move_to_nearest_project_dir(self.args)
        if ('dbt_modules' in self.config.clean_targets and
-                self.config.packages_install_path != 'dbt_modules'):
+                self.config.packages_install_path not in self.config.clean_targets):
            deprecations.warn('install-packages-path')
        for path in self.config.clean_targets:
            fire_event(CheckCleanPath(path=path))

@@ -10,7 +10,7 @@ from dbt.deps.resolver import resolve_packages
from dbt.events.functions import fire_event
from dbt.events.types import (
    DepsNoPackagesFound, DepsStartPackageInstall, DepsUpdateAvailable, DepsUTD,
-    DepsInstallInfo, DepsListSubdirectory, DepsNotifyUpdatesAvailable
+    DepsInstallInfo, DepsListSubdirectory, DepsNotifyUpdatesAvailable, EmptyLine
)
from dbt.clients import system

@@ -63,7 +63,7 @@ class DepsTask(BaseTask):
            source_type = package.source_type()
            version = package.get_version()

-            fire_event(DepsStartPackageInstall(package=package))
+            fire_event(DepsStartPackageInstall(package=package.nice_version_name()))
            package.install(self.config, renderer)
            fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
            if source_type == 'hub':

@@ -81,6 +81,7 @@ class DepsTask(BaseTask):
                    source_type=source_type,
                    version=version)
        if packages_to_upgrade:
+            fire_event(EmptyLine())
            fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))

    @classmethod

@@ -65,6 +66,8 @@ def print_run_status_line(results) -> None:
        stats[result_type] += 1
        stats['total'] += 1

+    with TextOnly():
+        fire_event(EmptyLine())
    fire_event(StatsLine(stats=stats))


@@ -11,7 +11,7 @@ from .printer import (
    print_run_end_messages,
    get_counts,
)
+from datetime import datetime
from dbt import tracking
from dbt import utils
from dbt.adapters.base import BaseRelation

@@ -21,7 +21,7 @@ from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
-from dbt.contracts.results import NodeStatus, RunResult, RunStatus
+from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.exceptions import (
    CompilationException,
    InternalException,

@@ -342,6 +342,8 @@ class RunTask(CompileTask):
        finishctx = TimestampNamed('node_finished_at')

        for idx, hook in enumerate(ordered_hooks, start=1):
+            hook._event_status['started_at'] = datetime.utcnow().isoformat()
+            hook._event_status['node_status'] = RunningStatus.Started
            sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
                                    extra_context)

@@ -360,19 +362,21 @@ class RunTask(CompileTask):
                )
            )

-            status = 'OK'
-
            with Timer() as timer:
                if len(sql.strip()) > 0:
-                    status, _ = adapter.execute(sql, auto_begin=False,
-                                                fetch=False)
-            self.ran_hooks.append(hook)
+                    response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
+                    status = response._message
+                else:
+                    status = 'OK'

+            self.ran_hooks.append(hook)
+            hook._event_status['finished_at'] = datetime.utcnow().isoformat()
            with finishctx, DbtModelState({'node_status': 'passed'}):
+                hook._event_status['node_status'] = RunStatus.Success
                fire_event(
                    PrintHookEndLine(
                        statement=hook_text,
-                        status=str(status),
+                        status=status,
                        index=idx,
                        total=num_hooks,
                        execution_time=timer.elapsed,

@@ -380,6 +384,11 @@ class RunTask(CompileTask):
                        report_node_data=hook
                    )
                )
+            # `_event_status` dict is only used for logging. Make sure
+            # it gets deleted when we're done with it
+            del hook._event_status["started_at"]
+            del hook._event_status["finished_at"]
+            del hook._event_status["node_status"]

        self._total_executed += len(ordered_hooks)

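The hook-run changes above thread a transient `_event_status` dict through the node purely for logging: mark the hook started, execute, record the adapter response message as the status, mark it finished, fire the structured event, then delete the temporary keys. A stripped-down sketch of that lifecycle with stand-in names (not the dbt implementation):

```python
from datetime import datetime


def run_hook(hook: dict, adapter, sql: str) -> str:
    status_info = hook.setdefault("_event_status", {})
    status_info["started_at"] = datetime.utcnow().isoformat()
    status_info["node_status"] = "started"

    if sql.strip():
        # in dbt the adapter response's message becomes the printed status
        response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
        status = response
    else:
        status = "OK"

    status_info["finished_at"] = datetime.utcnow().isoformat()
    status_info["node_status"] = "success"
    # fire_event(PrintHookEndLine(status=status, ...)) would happen here

    # the dict exists only for logging, so clean it up afterwards
    for key in ("started_at", "finished_at", "node_status"):
        del status_info[key]
    return status


class _StubAdapter:
    def execute(self, sql, auto_begin=False, fetch=False):
        return (f"OK executed: {sql}", None)


print(run_hook({}, _StubAdapter(), "select 1"))
```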
@@ -56,6 +56,7 @@ from dbt.parser.manifest import ManifestLoader
import dbt.exceptions
from dbt import flags
import dbt.utils
+from dbt.ui import warning_tag

RESULT_FILE_NAME = 'run_results.json'
MANIFEST_FILE_NAME = 'manifest.json'

@@ -208,7 +209,7 @@ class GraphRunnableTask(ManifestTask):
        with RUNNING_STATE, uid_context:
            startctx = TimestampNamed('node_started_at')
            index = self.index_offset(runner.node_index)
-            runner.node._event_status['dbt_internal__started_at'] = datetime.utcnow().isoformat()
+            runner.node._event_status['started_at'] = datetime.utcnow().isoformat()
            runner.node._event_status['node_status'] = RunningStatus.Started
            extended_metadata = ModelMetadata(runner.node, index)

@@ -224,8 +225,7 @@ class GraphRunnableTask(ManifestTask):
                result = runner.run_with_hooks(self.manifest)
                status = runner.get_result_status(result)
                runner.node._event_status['node_status'] = result.status
-                runner.node._event_status['dbt_internal__finished_at'] = \
-                    datetime.utcnow().isoformat()
+                runner.node._event_status['finished_at'] = datetime.utcnow().isoformat()
            finally:
                finishctx = TimestampNamed('finished_at')
                with finishctx, DbtModelState(status):

@@ -238,8 +238,8 @@ class GraphRunnableTask(ManifestTask):
                    )
                # `_event_status` dict is only used for logging. Make sure
                # it gets deleted when we're done with it
-                del runner.node._event_status["dbt_internal__started_at"]
-                del runner.node._event_status["dbt_internal__finished_at"]
+                del runner.node._event_status["started_at"]
+                del runner.node._event_status["finished_at"]
                del runner.node._event_status["node_status"]

        fail_fast = flags.FAIL_FAST

@@ -377,10 +377,8 @@ class GraphRunnableTask(ManifestTask):
        num_threads = self.config.threads
        target_name = self.config.target_name

-        text = "Concurrency: {} threads (target='{}')"
-        concurrency_line = text.format(num_threads, target_name)
        with NodeCount(self.num_nodes):
-            fire_event(ConcurrencyLine(concurrency_line=concurrency_line))
+            fire_event(ConcurrencyLine(num_threads=num_threads, target_name=target_name))
        with TextOnly():
            fire_event(EmptyLine())

@@ -461,8 +459,11 @@ class GraphRunnableTask(ManifestTask):
        )

        if len(self._flattened_nodes) == 0:
-            warn_or_error("\nWARNING: Nothing to do. Try checking your model "
-                          "configs and model specification args")
+            with TextOnly():
+                fire_event(EmptyLine())
+            msg = "Nothing to do. Try checking your model " \
+                  "configs and model specification args"
+            warn_or_error(msg, log_fmt=warning_tag('{}'))
            result = self.get_result(
                results=[],
                generated_at=datetime.utcnow(),

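`ConcurrencyLine` now carries the raw `num_threads` and `target_name` fields and formats its own message, instead of being handed a preformatted string by the task. A small sketch of both sides of that contract (the event base classes and `fire_event` are simplified stand-ins, not dbt's):

```python
from dataclasses import dataclass


@dataclass
class ConcurrencyLine:
    num_threads: int
    target_name: str
    code: str = "Q026"

    def message(self) -> str:
        return f"Concurrency: {self.num_threads} threads (target='{self.target_name}')"


def fire_event(event) -> None:
    # stand-in for dbt.events.functions.fire_event: just render the message
    print(f"[{event.code}] {event.message()}")


fire_event(ConcurrencyLine(num_threads=4, target_name="dev"))
```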
@@ -6,7 +6,7 @@ from dbt.include.global_project import DOCS_INDEX_FILE_PATH
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer
from dbt.events.functions import fire_event
-from dbt.events.types import ServingDocsPort, ServingDocsAccessInfo, ServingDocsExitInfo
+from dbt.events.types import ServingDocsPort, ServingDocsAccessInfo, ServingDocsExitInfo, EmptyLine

from dbt.task.base import ConfiguredTask

@@ -22,6 +22,8 @@ class ServeTask(ConfiguredTask):

        fire_event(ServingDocsPort(address=address, port=port))
        fire_event(ServingDocsAccessInfo(port=port))
+        fire_event(EmptyLine())
+        fire_event(EmptyLine())
        fire_event(ServingDocsExitInfo())

        # mypy doesn't think SimpleHTTPRequestHandler is ok here, but it is

@@ -66,6 +66,4 @@ def line_wrap_message(


def warning_tag(msg: str) -> str:
-    # no longer needed, since new logging includes colorized log level
-    # return f'[{yellow("WARNING")}]: {msg}'
-    return msg
+    return f'[{yellow("WARNING")}]: {msg}'

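`warning_tag` once again prefixes the message, and the parser and runnable tasks above pass it in as `log_fmt=warning_tag('{}')` so the tag is applied when the warning is rendered. A rough sketch of how those pieces compose (the `yellow` and `warn_or_error` bodies here are simplified stand-ins, not dbt's):

```python
def yellow(text: str) -> str:
    # stand-in for dbt.ui's color helper; the real one wraps ANSI escape codes
    return f"\033[33m{text}\033[0m"


def warning_tag(msg: str) -> str:
    return f'[{yellow("WARNING")}]: {msg}'


def warn_or_error(msg: str, log_fmt: str = None, warn_error: bool = False) -> None:
    # with --warn-error dbt raises instead of logging; here we just print
    if warn_error:
        raise RuntimeError(msg)
    print(log_fmt.format(msg) if log_fmt else msg)


warn_or_error(
    "Nothing to do. Try checking your model configs and model specification args",
    log_fmt=warning_tag("{}"),
)
```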
@@ -96,5 +96,5 @@ def _get_dbt_plugins_info():
        yield plugin_name, mod.version


-__version__ = '1.0.0rc3'
+__version__ = '1.0.0'
installed = get_installed_version()

@@ -284,12 +284,12 @@ def parse_args(argv=None):
    parser.add_argument('adapter')
    parser.add_argument('--title-case', '-t', default=None)
    parser.add_argument('--dependency', action='append')
-    parser.add_argument('--dbt-core-version', default='1.0.0rc3')
+    parser.add_argument('--dbt-core-version', default='1.0.0')
    parser.add_argument('--email')
    parser.add_argument('--author')
    parser.add_argument('--url')
    parser.add_argument('--sql', action='store_true')
-    parser.add_argument('--package-version', default='1.0.0rc3')
+    parser.add_argument('--package-version', default='1.0.0')
    parser.add_argument('--project-version', default='1.0')
    parser.add_argument(
        '--no-dependency', action='store_false', dest='set_dependency'

@@ -25,7 +25,7 @@ with open(os.path.join(this_directory, 'README.md')) as f:


package_name = "dbt-core"
-package_version = "1.0.0rc3"
+package_version = "1.0.0"
description = """With dbt, data analysts and engineers can build analytics \
the way engineers build applications."""

@@ -6,9 +6,9 @@ cffi==1.15.0
charset-normalizer==2.0.8
click==8.0.3
colorama==0.4.4
-dbt-core==1.0.0rc3
+dbt-core==1.0.0
dbt-extractor==0.4.0
-dbt-postgres==1.0.0rc3
+dbt-postgres==1.0.0
future==0.18.2
hologram==0.0.14
idna==3.3

@@ -1 +1 @@
-version = '1.0.0rc3'
+version = '1.0.0'

@@ -41,7 +41,7 @@ def _dbt_psycopg2_name():


package_name = "dbt-postgres"
-package_version = "1.0.0rc3"
+package_version = "1.0.0"
description = """The postgres adpter plugin for dbt (data build tool)"""

this_directory = os.path.abspath(os.path.dirname(__file__))

2 setup.py

@@ -50,7 +50,7 @@ with open(os.path.join(this_directory, 'README.md')) as f:


package_name = "dbt"
-package_version = "1.0.0rc3"
+package_version = "1.0.0"
description = """With dbt, data analysts and engineers can build analytics \
the way engineers build applications."""

@@ -43,7 +43,7 @@ class TestConfigPathDeprecation(BaseTestDeprecations):
        with self.assertRaises(dbt.exceptions.CompilationException) as exc:
            self.run_dbt(['--warn-error', 'debug'])
        exc_str = ' '.join(str(exc.exception).split())  # flatten all whitespace
-        expected = "The `data-paths` config has been deprecated"
+        expected = "The `data-paths` config has been renamed"
        assert expected in exc_str

@@ -41,9 +41,17 @@ def temporary_working_directory() -> str:
    out : str
        The temporary working directory.
    """
-    with tempfile.TemporaryDirectory() as tmpdir:
-        with change_working_directory(tmpdir):
-            yield tmpdir
+    # N.B: supressing the OSError is necessary for older (pre 3.10) versions of python
+    # which do not support the `ignore_cleanup_errors` in tempfile::TemporaryDirectory.
+    # See: https://github.com/python/cpython/pull/24793
+    #
+    # In our case the cleanup is redundent since windows handles clearing
+    # Appdata/Local/Temp at the os level anyway.
+
+    with contextlib.suppress(OSError):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with change_working_directory(tmpdir):
+                yield tmpdir


def get_custom_profiles_config(database_host, custom_schema):

@@ -34,15 +34,3 @@ class TestStatements(DBTIntegrationTest):
        self.assertEqual(len(results), 1)

        self.assertTablesEqual("statement_actual", "statement_expected")
-
-    @use_profile("presto")
-    def test_presto_statements(self):
-        self.use_default_project({"seed-paths": [self.dir("seed")]})
-
-        results = self.run_dbt(["seed"])
-        self.assertEqual(len(results), 2)
-        results = self.run_dbt()
-        self.assertEqual(len(results), 1)
-
-        self.assertTablesEqual("statement_actual", "statement_expected")

204 test/interop/log_parsing/Cargo.lock generated Normal file

@@ -0,0 +1,204 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "autocfg"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
+
+[[package]]
+name = "chrono"
+version = "0.4.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
+dependencies = [
+ "libc",
+ "num-integer",
+ "num-traits",
+ "serde",
+ "time",
+ "winapi",
+]
+
+[[package]]
+name = "itoa"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
+
+[[package]]
+name = "libc"
+version = "0.2.108"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
+
+[[package]]
+name = "log_parsing"
+version = "0.1.0"
+dependencies = [
+ "chrono",
+ "serde",
+ "serde_json",
+ "walkdir",
+]
+
+[[package]]
+name = "num-integer"
+version = "0.1.44"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
+dependencies = [
+ "autocfg",
+ "num-traits",
+]
+
+[[package]]
+name = "num-traits"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43"
+dependencies = [
+ "unicode-xid",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "ryu"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568"
+
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
+name = "serde"
+version = "1.0.130"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.130"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "serde_json"
+version = "1.0.72"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
+dependencies = [
+ "itoa",
+ "ryu",
+ "serde",
+]
+
+[[package]]
+name = "syn"
+version = "1.0.82"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-xid",
+]
+
+[[package]]
+name = "time"
+version = "0.1.44"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
+dependencies = [
+ "libc",
+ "wasi",
+ "winapi",
+]
+
+[[package]]
+name = "unicode-xid"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
+
+[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
+[[package]]
+name = "wasi"
+version = "0.10.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
+dependencies = [
+ "winapi",
+]
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

10
test/interop/log_parsing/Cargo.toml
Normal file
10
test/interop/log_parsing/Cargo.toml
Normal file
@@ -0,0 +1,10 @@
[package]
name = "log_parsing"
version = "0.1.0"
edition = "2018"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
chrono = { version = "0.4", features = ["serde"] }
walkdir = "2"
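As an aside, here is a minimal sketch of the file-discovery step this crate relies on, assuming only the walkdir dependency declared above and the LOG_DIR environment variable that the CI workflow exports; src/main.rs below layers the JSON schema checks on top of this walk.

use std::env;
use walkdir::WalkDir;

fn main() {
    // LOG_DIR points at the directory a dbt run wrote its logs into.
    let root = env::var("LOG_DIR").unwrap_or_else(|_| ".".to_string());
    let log_files: Vec<_> = WalkDir::new(&root)
        .follow_links(true)
        .into_iter()
        .filter_map(|entry| entry.ok()) // silently skip unreadable entries
        .filter(|entry| entry.file_name().to_string_lossy().ends_with("dbt.log"))
        .map(|entry| entry.path().to_path_buf())
        .collect();
    println!("found {} candidate log files under {}", log_files.len(), root);
}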
260
test/interop/log_parsing/src/main.rs
Normal file
@@ -0,0 +1,260 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::env;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufRead};
use walkdir::WalkDir;


// Applies schema tests to file input
// if these fail, we either have a problem in dbt that needs to be resolved
// or we have changed our interface and the log_version should be bumped in dbt,
// modeled appropriately here, and publish new docs for the new log_version.
fn main() -> Result<(), Box<dyn Error>> {
    let log_name = "dbt.log";
    let path = env::var("LOG_DIR").expect("must pass absolute log path to tests with env var `LOG_DIR=/logs/live/here/`");

    println!("Looking for files named `{}` in {}", log_name, path);
    let lines: Vec<String> = get_input(&path, log_name)?;
    println!("collected {} log lines.", lines.len());

    println!("");

    println!("testing type-level schema compliance by deserializing each line...");
    let log_lines: Vec<LogLine> = deserialized_input(&lines)
        .map_err(|e| format!("schema test failure: json doesn't match type definition\n{}", e))?;
    println!("Done.");

    println!("");
    println!("because we skip non-json log lines, there are {} collected values to test.", log_lines.len());
    println!("");

    // make sure when we read a string in then output it back to a string the two strings
    // contain all the same key-value pairs.
    println!("testing serialization loop to make sure all key-value pairs are accounted for");
    test_deserialize_serialize_is_unchanged(&lines);
    println!("Done.");

    println!("");

    // make sure each log_line contains the values we expect
    println!("testing that the field values in each log line are expected");
    for log_line in log_lines {
        log_line.value_test()
    }
    println!("Done.");

    Ok(())
}


// each nested type of LogLine should define its own value_test function
// that asserts values are within an expected set of values when possible.
trait ValueTest {
    fn value_test(&self) -> ();
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct LogLine {
    log_version: isize,
    r#type: String,
    code: String,
    #[serde(with = "custom_date_format")]
    ts: DateTime<Utc>,
    pid: isize,
    msg: String,
    level: String,
    invocation_id: String,
    thread_name: String,
    data: serde_json::Value, // TODO be more specific
    node_info: serde_json::Value, // TODO be more specific
}

impl ValueTest for LogLine {
    fn value_test(&self){
        assert_eq!(
            self.log_version, 1,
            "The log version changed. Be sure this was intentional."
        );

        assert_eq!(
            self.r#type,
            "log_line".to_owned(),
            "The type value has changed. If this is intentional, bump the log version"
        );

        assert!(
            ["debug", "info", "warn", "error"]
                .iter()
                .any(|level| **level == self.level),
            "log level had unexpected value {}",
            self.level
        );
    }
}

// logs output timestamps like this: "2021-11-30T12:31:04.312814Z"
// which is so close to the default except for the decimal.
// this requires handling the date with "%Y-%m-%dT%H:%M:%S%.6f" which requires this
// boilerplate-looking module.
mod custom_date_format {
    use chrono::{NaiveDateTime, DateTime, Utc};
    use serde::{self, Deserialize, Deserializer, Serializer};

    const FORMAT: &'static str = "%Y-%m-%dT%H:%M:%S%.6fZ";

    pub fn serialize<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let s = format!("{}", date.format(FORMAT));
        serializer.serialize_str(&s)
    }

    pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        Ok(DateTime::<Utc>::from_utc(NaiveDateTime::parse_from_str(&s, FORMAT).map_err(serde::de::Error::custom)?, Utc))
    }
}

// finds all files in any subdirectory of this path with this name. returns the contents
// of each file line by line as one continuous structure. No distinction between files.
fn get_input(path: &str, file_name: &str) -> Result<Vec<String>, String> {
    WalkDir::new(path)
        .follow_links(true)
        .into_iter()
        // filters out all the exceptions encountered on this walk silently
        .filter_map(|e| e.ok())
        // walks through each file and returns the contents if the filename matches
        .filter_map(|e| {
            let f_name = e.file_name().to_string_lossy();
            if f_name.ends_with(file_name) {
                let contents = File::open(e.path())
                    .map_err(|e| format!("Something went wrong opening the log file {}\n{}", f_name, e))
                    .and_then(|file| {
                        io::BufReader::new(file)
                            .lines()
                            .map(|l| {
                                l.map_err(|e| format!("Something went wrong reading lines of the log file {}\n{}", f_name, e))
                            })
                            .collect::<Result<Vec<String>, String>>()
                    });

                Some(contents)
            } else {
                None
            }
        })
        .collect::<Result<Vec<Vec<String>>, String>>()
        .map(|vv| vv.concat())
}

// attemps to deserialize the strings into LogLines. If the string isn't valid
// json it skips it instead of failing. This is so that any tests that generate
// non-json logs won't break the schema test.
fn deserialized_input(log_lines: &[String]) -> serde_json::Result<Vec<LogLine>> {
    log_lines
        .into_iter()
        // if the log line isn't valid json format, toss it
        .filter(|log_line| serde_json::from_str::<serde_json::Value>(log_line).is_ok())
        // attempt to deserialize into our LogLine type
        .map(|log_line| serde_json::from_str::<LogLine>(log_line))
        .collect()
}

// turn a String into a LogLine and back into a String returning both Strings so
// they can be compared
fn deserialize_serialize_loop(
    log_lines: &[String],
) -> serde_json::Result<Vec<(String, String)>> {
    log_lines
        .into_iter()
        .map(|log_line| {
            serde_json::from_str::<LogLine>(log_line).and_then(|parsed| {
                serde_json::to_string(&parsed).map(|json| (log_line.clone(), json))
            })
        })
        .collect()
}

// make sure when we read a string in then output it back to a string the two strings
// contain all the same key-value pairs.
fn test_deserialize_serialize_is_unchanged(lines: &[String]) {
    let objects: Result<Vec<(serde_json::Value, serde_json::Value)>, serde_json::Error> =
        deserialize_serialize_loop(lines).and_then(|v| {
            v.into_iter()
                .map(|(s0, s1)| {
                    serde_json::from_str::<serde_json::Value>(&s0).and_then(|s0v| {
                        serde_json::from_str::<serde_json::Value>(&s1).map(|s1v| (s0v, s1v))
                    })
                })
                .collect()
        });

    match objects {
        Err(e) => assert!(false, "{}", e),
        Ok(v) => {
            for pair in v {
                match pair {
                    (
                        serde_json::Value::Object(original),
                        serde_json::Value::Object(looped),
                    ) => {
                        // looping through each key of each json value gives us meaningful failure messages
                        // instead of "this big string" != "this other big string"
                        for (key, value) in original.clone() {
                            let looped_val = looped.get(&key);
                            assert_eq!(
                                looped_val,
                                Some(&value),
                                "original key value ({}, {}) expected in re-serialized result",
                                key,
                                value
                            )
                        }
                        for (key, value) in looped.clone() {
                            let original_val = original.get(&key);
                            assert_eq!(
                                original_val,
                                Some(&value),
                                "looped key value ({}, {}) not found in original result",
                                key,
                                value
                            )
                        }
                    }
                    _ => assert!(false, "not comparing json objects"),
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::*;

    const LOG_LINE: &str = r#"{"code": "Z023", "data": {"stats": {"error": 0, "pass": 3, "skip": 0, "total": 3, "warn": 0}}, "invocation_id": "f1e1557c-4f9d-4053-bb50-572cbbf2ca64", "level": "info", "log_version": 1, "msg": "Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3", "node_info": {}, "pid": 75854, "thread_name": "MainThread", "ts": "2021-12-03T01:32:38.334601Z", "type": "log_line"}"#;

    #[test]
    fn test_basic_loop() {
        assert!(deserialize_serialize_loop(&[LOG_LINE.to_owned()]).is_ok())
    }

    #[test]
    fn test_values() {
        assert!(deserialized_input(&[LOG_LINE.to_owned()]).map(|v| {
            v.into_iter().map(|ll| ll.value_test())
        }).is_ok())
    }

    #[test]
    fn test_values_loop() {
        test_deserialize_serialize_is_unchanged(&[LOG_LINE.to_owned()]);
    }
}
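For reference, here is a minimal standalone sketch of why the custom_date_format module above carries its own format string, assuming the chrono 0.4 API pinned in Cargo.lock; the sample timestamp is the one quoted in the module's comment.

use chrono::{DateTime, NaiveDateTime, Utc};

fn main() {
    // A timestamp in the shape the structured logs emit.
    let ts = "2021-11-30T12:31:04.312814Z";
    // Six fractional digits, so second-level parsing alone is not enough.
    let format = "%Y-%m-%dT%H:%M:%S%.6fZ";
    let naive = NaiveDateTime::parse_from_str(ts, format).expect("timestamp should match the log format");
    let utc: DateTime<Utc> = DateTime::from_utc(naive, Utc);
    // Formatting with the same pattern round-trips to the original string.
    assert_eq!(utc.format(format).to_string(), ts);
    println!("parsed and re-serialized: {}", utc.format(format));
}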
@@ -1,19 +1,20 @@
-from dbt import events
-from dbt.events.functions import EVENT_HISTORY, fire_event
 from dbt.events.test_types import UnitTestInfo
 from argparse import Namespace
 from dbt.events import AdapterLogger
 from dbt.events.functions import event_to_serializable_dict
 from dbt.events.types import *
 from dbt.events.test_types import *
-from dbt.events.base_types import Event, Node
+from dbt.events.base_types import Event
 from dbt.events.stubs import _CachedRelation, BaseRelation, _ReferenceKey, ParsedModelNode
+from importlib import reload
+import dbt.events.functions as event_funcs
+import dbt.flags as flags
 import inspect
 import json
-import datetime
 from unittest import TestCase
 from dbt.contracts.graph.parsed import (
-    ParsedModelNode, NodeConfig, DependsOn, ParsedMacro
+    ParsedModelNode, NodeConfig, DependsOn
 )
 from dbt.contracts.files import FileHash
@@ -88,29 +89,29 @@ class TestEventCodes(TestCase):

 class TestEventBuffer(TestCase):

+    def setUp(self) -> None:
+        flags.EVENT_BUFFER_SIZE = 10
+        reload(event_funcs)
+
     # ensure events are populated to the buffer exactly once
     def test_buffer_populates(self):
-        fire_event(UnitTestInfo(msg="Test Event 1"))
-        fire_event(UnitTestInfo(msg="Test Event 2"))
+        event_funcs.fire_event(UnitTestInfo(msg="Test Event 1"))
+        event_funcs.fire_event(UnitTestInfo(msg="Test Event 2"))
         self.assertTrue(
-            EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
+            event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 1
         )

     # ensure events drop from the front of the buffer when buffer maxsize is reached
-    # TODO commenting out till we can make this not spit out 100k log lines.
-    # def test_buffer_FIFOs(self):
-    # for n in range(0,100001):
-    # fire_event(UnitTestInfo(msg=f"Test Event {n}"))
-    # self.assertTrue(
-    # EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
-    # )
-    # self.assertTrue(
-    # EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
-    # )
-
-def dump_callable():
-    return dict()
-
+    def test_buffer_FIFOs(self):
+        for n in range(0,(flags.EVENT_BUFFER_SIZE + 1)):
+            event_funcs.fire_event(UnitTestInfo(msg=f"Test Event {n}"))
+        self.assertTrue(
+            event_funcs.EVENT_HISTORY.count(EventBufferFull(code='Z048')) == 1
+        )
+        self.assertTrue(
+            event_funcs.EVENT_HISTORY.count(UnitTestInfo(msg='Test Event 1', code='T006')) == 0
+        )

 def MockNode():
     return ParsedModelNode(
@@ -195,8 +196,8 @@ sample_values = [
     SQLQueryStatus(status="", elapsed=0.1),
     SQLCommit(conn_name=""),
     ColTypeChange(orig_type="", new_type="", table=""),
-    SchemaCreation(relation=BaseRelation()),
-    SchemaDrop(relation=BaseRelation()),
+    SchemaCreation(relation=_ReferenceKey(database="", schema="", identifier="")),
+    SchemaDrop(relation=_ReferenceKey(database="", schema="", identifier="")),
     UncachedRelation(
         dep_key=_ReferenceKey(database="", schema="", identifier=""),
         ref_key=_ReferenceKey(database="", schema="", identifier=""),
@@ -205,7 +206,7 @@ sample_values = [
         dep_key=_ReferenceKey(database="", schema="", identifier=""),
         ref_key=_ReferenceKey(database="", schema="", identifier=""),
     ),
-    AddRelation(relation=_CachedRelation()),
+    AddRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
     DropMissingRelation(relation=_ReferenceKey(database="", schema="", identifier="")),
     DropCascade(
         dropped=_ReferenceKey(database="", schema="", identifier=""),
@@ -221,15 +222,13 @@ sample_values = [
         old_key=_ReferenceKey(database="", schema="", identifier=""),
         new_key=_ReferenceKey(database="", schema="", identifier="")
     ),
-    DumpBeforeAddGraph(dump_callable),
-    DumpAfterAddGraph(dump_callable),
-    DumpBeforeRenameSchema(dump_callable),
-    DumpAfterRenameSchema(dump_callable),
+    DumpBeforeAddGraph(dict()),
+    DumpAfterAddGraph(dict()),
+    DumpBeforeRenameSchema(dict()),
+    DumpAfterRenameSchema(dict()),
     AdapterImportError(ModuleNotFoundError()),
     PluginLoadError(),
     SystemReportReturnCode(returncode=0),
-    SelectorAlertUpto3UnusedNodes(node_names=[]),
-    SelectorAlertAllUnusedNodes(node_names=[]),
     NewConnectionOpening(connection_state=''),
     TimingInfoCollected(),
     MergedFromState(nbr_merged=0, sample=[]),
@@ -275,8 +274,6 @@ sample_values = [
     PartialParsingDeletedExposure(unique_id=''),
     InvalidDisabledSourceInTestNode(msg=''),
     InvalidRefInTestNode(msg=''),
-    MessageHandleGenericException(build_path='', unique_id='', exc=Exception('')),
-    DetailsHandleGenericException(),
     RunningOperationCaughtError(exc=Exception('')),
     RunningOperationUncaughtError(exc=Exception('')),
     DbtProjectError(),
@@ -289,7 +286,7 @@ sample_values = [
     ProfileHelpMessage(),
     CatchableExceptionOnRun(exc=Exception('')),
     InternalExceptionOnRun(build_path='', exc=Exception('')),
-    GenericExceptionOnRun(build_path='', unique_id='', exc=Exception('')),
+    GenericExceptionOnRun(build_path='', unique_id='', exc=''),
     NodeConnectionReleaseError(node_name='', exc=Exception('')),
     CheckCleanPath(path=''),
     ConfirmCleanPath(path=''),
@@ -359,7 +356,7 @@ sample_values = [
     NodeExecuting(unique_id='', report_node_data=MockNode()),
     NodeFinished(unique_id='', report_node_data=MockNode(), run_result=''),
     QueryCancelationUnsupported(type=''),
-    ConcurrencyLine(concurrency_line=''),
+    ConcurrencyLine(num_threads=0, target_name=''),
     StarterProjectPath(dir=''),
     ConfigFolderDirectory(dir=''),
     NoSampleProfileFound(adapter=''),
@@ -395,8 +392,6 @@ sample_values = [
     MainReportArgs(Namespace()),
     RegistryProgressMakingGETRequest(''),
     DepsUTD(),
-    CatchRunException('', Exception('')),
-    HandleInternalException(Exception('')),
     PartialParsingNotEnabled(),
     SQlRunnerException(Exception('')),
     DropRelation(''),
@@ -429,7 +424,7 @@ class TestEventJSONSerialization(TestCase):

         # if we have everything we need to test, try to serialize everything
         for event in sample_values:
-            d = event_to_serializable_dict(event, lambda dt: dt.isoformat(), lambda x: x.message())
+            d = event_to_serializable_dict(event, lambda _: event.get_ts_rfc3339(), lambda x: x.message())
             try:
                 json.dumps(d)
             except TypeError as e: