Compare commits

...

41 Commits

Author SHA1 Message Date
Michelle Ark
60f87411d5 Merge branch 'feature/decouple-adapters-from-core' into macro-resolver-remove-lazy-loading 2023-12-07 08:27:45 +09:00
Michelle Ark
eb96e3deec Add RelationConfig Protocol for use in Relation.create_from (#9210)
* move relation contract to dbt.adapters

* changelog entry

* first pass: clean up relation.create_from

* type ignores

* type ignore

* changelog entry

* update RelationConfig variable names
2023-12-06 10:46:45 -08:00
Michelle Ark
f68af070f3 remove manifest from adapter.execute_macro, replace with MacroResolver + remove lazy loading 2023-12-06 23:27:42 +09:00
Michelle Ark
7ad1accf2b Merge branch 'feature/decouple-adapters-from-core' into relation-create-from-refactor 2023-12-06 11:19:16 +09:00
Michelle Ark
ed8f5d38e4 Remove ResultNode usage from connections (#9211) 2023-12-06 11:04:26 +09:00
Michelle Ark
7ad6aa18da update RelationConfig variable names 2023-12-06 11:02:39 +09:00
Michelle Ark
6796edd66e Merge branch 'feature/decouple-adapters-from-core' into relation-create-from-refactor 2023-12-06 10:58:41 +09:00
Michelle Ark
e01eb30884 Remove usage of dbt.contracts.relation in dbt/adapters (#9207) 2023-12-06 10:34:56 +09:00
Michelle Ark
ba53f053fd changelog entry 2023-12-05 17:07:32 +09:00
Michelle Ark
b8de881ed3 type ignore 2023-12-05 17:06:49 +09:00
Michelle Ark
d7d5e2335c type ignores 2023-12-05 15:41:13 +09:00
Michelle Ark
160d0db238 first pass: clean up relation.create_from 2023-12-05 15:03:16 +09:00
Michelle Ark
2cee8652a6 changelog entry 2023-12-05 12:06:06 +09:00
Michelle Ark
7f777f8a42 move relation contract to dbt.adapters 2023-12-05 12:03:16 +09:00
Michelle Ark
00f49206e9 Decouple macro generator from adapters (#9149) 2023-12-05 10:51:51 +09:00
Michelle Ark
1bca662883 first pass: adapter migration script (#9160) 2023-12-05 08:41:11 +09:00
colin-rogers-dbt
41ac915949 Refactor EventManager setup and interaction (#9180)
* moving types_pb2.py to common/events

* move event manager setup back to core, remove ref to global EVENT_MANAGER and clean up event manager functions

* move invocation_id from events to first class common concept

* move lowercase utils to common

* move lowercase utils to common

* ref CAPTURE_STREAM through method

* add changie
2023-12-01 08:48:19 -08:00
Mila Page
373125ecb8 Move the semver package to common and alter references. (#9166)
* Move the semver package to common and alter references.

* Alter leftover references to dbt.semver, this time using from syntax.

---------

Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
2023-11-29 11:03:13 -08:00
Michelle Ark
294ad82e50 delete accidentally merged types_pb2.py 2023-11-28 20:20:59 -05:00
Michelle Ark
12bd1e87fb Merge branch 'main' into feature/decouple-adapters-from-core 2023-11-28 20:08:33 -05:00
colin-rogers-dbt
8bad75c65b Move adapter logger to adapters (#9165)
* moving types_pb2.py to common/events

* Move AdapterLogger to adapter folder

* add changie
2023-11-28 15:05:30 -08:00
Michelle Ark
220f56d8d2 remove adapter.get_compiler (#9134) 2023-11-28 09:27:00 -05:00
Michelle Ark
615ad1fe2d move include/global_project to adapters (#8930) 2023-11-27 18:14:02 -05:00
Peter Webb
2ab0f7b26b Decouple adapter constraints from core (#9054)
* Move constraints to dbt.common

* Move constraints to contracts folder, per review

* Add a changelog entry.
2023-11-21 12:16:39 -05:00
Michelle Ark
e56a5dae8b Remove usage of dbt.deprecations in dbt/adapters, enable core & adapter-specific (#9051) 2023-11-20 17:26:11 -05:00
Michelle Ark
1d0a3e92c8 Merge branch 'main' into feature/decouple-adapters-from-core 2023-11-16 11:28:51 -05:00
colin-rogers-dbt
51b94b26cc Refactor Base Exceptions (#8989)
* moving types_pb2.py to common/events

* Refactor Base Exceptions

* update make_log_dir_if_missing to handle str

* move remaining adapters exception imports to common/adapters
---------

Co-authored-by: Michelle Ark <michelle.ark@dbtlabs.com>
2023-11-10 09:54:36 -05:00
Michelle Ark
4ee950427a Merge branch 'main' into feature/decouple-adapters-from-core 2023-11-07 11:38:24 -05:00
Michelle Ark
c4ff280436 remove dbt.flags.LOG_CACHE_EVENTS usage in dbt/adapters (#8933) 2023-11-01 18:44:59 -04:00
Michelle Ark
1260782bd2 remove dbt.flags.MP_CONTEXT usage in dbt/adapters (#8931) 2023-11-01 16:13:39 -04:00
Michelle Ark
333120b111 Merge branch 'main' into feature/decouple-adapters-from-core 2023-11-01 12:00:17 -04:00
Michelle Ark
af916666a2 move agate_helper into common (#8911)
* move agate_helper into common

* add changie

---------

Co-authored-by: Colin <colin.rogers@dbtlabs.com>
2023-10-26 16:22:04 -04:00
Michelle Ark
7de8930d1d move agate_helper unit tests under tests/unit/common 2023-10-26 16:19:13 -04:00
Michelle Ark
200bcdcd9f Merge branch 'move-agate-client-to-common' into feature/decouple-adapters-from-core 2023-10-26 16:18:30 -04:00
Michelle Ark
b9a603e3aa remove usage of dbt.config.PartialProject from dbt/adapters (#8909)
* remove usage of dbt.config.PartialProject from dbt/adapters

* add changie

---------

Co-authored-by: Colin <colin.rogers@dbtlabs.com>
2023-10-26 16:04:39 -04:00
colin-rogers-dbt
1a825484fb Add utils module (#8910)
* moving types_pb2.py to common/events

* split out utils into core/common/adapters

* add changie
2023-10-26 15:59:07 -04:00
Michelle Ark
f44d704801 move agate_helper into common 2023-10-25 17:05:08 -04:00
Michelle Ark
dbd02e54c2 move types_pb2.py from events to common/events 2023-10-25 15:46:41 -04:00
Michelle Ark
a89642a6f9 fix import 2023-10-25 15:04:17 -04:00
colin-rogers-dbt
c141148616 Move events to common (#8676)
* Move events to common

* More Type Annotations (#8536)

* Extend use of type annotations in the events module.

* Add return type of None to more __init__ definitions.

* Still more type annotations adding -> None to __init__

* Tweak per review

* Allow adapters to include python package logging in dbt logs (#8643)

* add set_package_log_level functionality

* set package handler

* set package handler

* add logging about setting up logging

* test event log handler

* add event log handler

* add event log level

* rename package and add unit tests

* revert logfile config change

* cleanup and add code comments

* add changie

* swap function for dict

* add additional unit tests

* fix unit test

* update README and protos

* fix formatting

* update precommit

---------

Co-authored-by: Peter Webb <peter.webb@dbtlabs.com>
2023-10-25 14:57:02 -04:00
Michelle Ark
469a9aca06 remove dbt.contracts.connection imports from adapter module 2023-10-25 14:56:55 -04:00
386 changed files with 8558 additions and 7621 deletions

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Remove adapter.get_compiler interface
time: 2023-11-27T11:47:57.443202-05:00
custom:
Author: michelleark
Issue: "9148"

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Move AdapterLogger to adapters folder
time: 2023-11-28T13:43:56.853925-08:00
custom:
Author: colin-rogers-dbt
Issue: "9151"

View File

@@ -0,0 +1,7 @@
kind: Breaking Changes
body: move event manager setup back to core, remove ref to global EVENT_MANAGER and
clean up event manager functions
time: 2023-11-30T13:53:48.645192-08:00
custom:
Author: colin-rogers-dbt
Issue: "9150"

View File

@@ -0,0 +1,6 @@
kind: Features
body: 'Allow adapters to include package logs in dbt standard logging '
time: 2023-09-15T12:37:33.862862-07:00
custom:
Author: colin-rogers-dbt
Issue: "7859"

View File

@@ -0,0 +1,6 @@
kind: Features
body: migrate utils to common and adapters folders
time: 2023-10-26T11:08:21.458709-07:00
custom:
Author: colin-rogers-dbt
Issue: "8924"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Move Agate helper client into common
time: 2023-10-26T12:35:56.538587-07:00
custom:
Author: MichelleArk
Issue: "8926"

View File

@@ -0,0 +1,6 @@
kind: Features
body: remove usage of dbt.config.PartialProject from dbt/adapters
time: 2023-10-26T12:39:13.904116-07:00
custom:
Author: MichelleArk
Issue: "8928"

View File

@@ -0,0 +1,6 @@
kind: Features
body: Remove legacy logger
time: 2023-11-07T13:56:35.186648-08:00
custom:
Author: colin-rogers-dbt
Issue: "8027"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Added more type annotations.
time: 2023-08-31T16:44:35.737954-04:00
custom:
Author: peterallenwebb
Issue: "8537"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usage of dbt.include.global_project in dbt/adapters
time: 2023-10-26T18:49:53.36449-04:00
custom:
Author: michelleark
Issue: "8925"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: remove dbt.flags.MP_CONTEXT usage in dbt/adapters
time: 2023-11-01T10:27:58.790153-04:00
custom:
Author: michelleark
Issue: "8967"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: 'Remove usage of dbt.flags.LOG_CACHE_EVENTS in dbt/adapters'
time: 2023-11-01T17:31:24.974093-04:00
custom:
Author: michelleark
Issue: "8969"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove use of dbt/core exceptions in dbt/adapter
time: 2023-11-07T13:57:28.683727-08:00
custom:
Author: colin-rogers-dbt MichelleArk
Issue: "8920"

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Remove usage of dbt.deprecations in dbt/adapters, enable core & adapter-specific
event types and protos
time: 2023-11-16T17:42:51.005023-05:00
custom:
Author: michelleark
Issue: 8927 8918

View File

@@ -0,0 +1,7 @@
kind: Under the Hood
body: Move column constraints into common/contracts, removing another dependency of
adapters on core.
time: 2023-11-20T18:32:14.859503-05:00
custom:
Author: peterallenwebb
Issue: "9024"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Move dbt.semver to dbt.common.semver and update references.
time: 2023-11-28T17:07:32.172421-08:00
custom:
Author: versusfacit
Issue: "9039"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Move lowercase utils method to common
time: 2023-11-30T13:54:32.561673-08:00
custom:
Author: colin-rogers-dbt
Issue: "9180"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usages of dbt.clients.jinja in dbt/adapters
time: 2023-12-05T09:35:44.845352+09:00
custom:
Author: michelleark
Issue: "9205"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usage of dbt.contracts in dbt/adapters
time: 2023-12-05T12:05:59.936775+09:00
custom:
Author: michelleark
Issue: "9208"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usage of dbt.contracts.graph.nodes.ResultNode in dbt/adapters
time: 2023-12-05T16:58:12.932172+09:00
custom:
Author: michelleark
Issue: "9214"

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Introduce RelationConfig Protocol, consolidate Relation.create_from
time: 2023-12-05T17:07:25.33861+09:00
custom:
Author: michelleark
Issue: "9215"

View File

@@ -10,3 +10,5 @@ ignore =
E741
E501 # long line checking is done in black
exclude = test/
per-file-ignores =
*/__init__.py: F401

.gitattributes (vendored): 2 lines changed
View File

@@ -1,4 +1,4 @@
core/dbt/include/index.html binary
core/dbt/task/docs/index.html binary
tests/functional/artifacts/data/state/*/manifest.json binary
core/dbt/docs/build/html/searchindex.js binary
core/dbt/docs/build/html/index.html binary

View File

@@ -1,7 +1,7 @@
# Configuration for pre-commit hooks (see https://pre-commit.com/).
# Eventually the hooks described here will be run as tests before merging each PR.
exclude: ^(core/dbt/docs/build/|core/dbt/events/types_pb2.py)
exclude: ^(core/dbt/docs/build/|core/dbt/common/events/types_pb2.py|core/dbt/events/core_types_pb2.py|core/dbt/adapters/events/adapter_types_pb2.py)
# Force all unspecified python hooks to run python 3.8
default_language_version:

View File

@@ -26,7 +26,7 @@ Legacy tests are found in the 'test' directory:
The "tasks" map to top-level dbt commands. So `dbt run` => task.run.RunTask, etc. Some are more like abstract base classes (GraphRunnableTask, for example) but all the concrete types outside of task should map to tasks. Currently one executes at a time. The tasks kick off their “Runners” and those do execute in parallel. The parallelism is managed via a thread pool, in GraphRunnableTask.
core/dbt/include/index.html
core/dbt/task/docs/index.html
This is the docs website code. It comes from the dbt-docs repository, and is generated when a release is packaged.
## Adapters

View File

@@ -40,7 +40,16 @@ dev: dev_req ## Installs dbt-* packages in develop mode along with development d
.PHONY: proto_types
proto_types: ## generates google protobuf python file from types.proto
protoc -I=./core/dbt/events --python_out=./core/dbt/events ./core/dbt/events/types.proto
protoc -I=./core/dbt/common/events --python_out=./core/dbt/common/events ./core/dbt/common/events/types.proto
.PHONY: core_proto_types
core_proto_types: ## generates google protobuf python file from core_types.proto
protoc -I=./core/dbt/events --python_out=./core/dbt/events ./core/dbt/events/core_types.proto
.PHONY: adapter_proto_types
adapter_proto_types: ## generates google protobuf python file from adapter_types.proto
protoc -I=./core/dbt/adapters/events --python_out=./core/dbt/adapters/events ./core/dbt/adapters/events/adapter_types.proto
.PHONY: mypy
mypy: .env ## Runs mypy against staged changes for static type checking.

View File

@@ -1,7 +1,7 @@
# these are all just exports, #noqa them so flake8 will be happy
# TODO: Should we still include this in the `adapters` namespace?
from dbt.contracts.connection import Credentials # noqa: F401
from dbt.adapters.contracts.connection import Credentials # noqa: F401
from dbt.adapters.base.meta import available # noqa: F401
from dbt.adapters.base.connections import BaseConnectionManager # noqa: F401
from dbt.adapters.base.relation import ( # noqa: F401

View File

@@ -2,7 +2,7 @@ from dataclasses import dataclass
import re
from typing import Dict, ClassVar, Any, Optional
from dbt.exceptions import DbtRuntimeError
from dbt.common.exceptions import DbtRuntimeError
@dataclass

View File

@@ -6,6 +6,7 @@ import traceback
# multiprocessing.RLock is a function returning this type
from multiprocessing.synchronize import RLock
from multiprocessing.context import SpawnContext
from threading import get_ident
from typing import (
Any,
@@ -23,8 +24,9 @@ from typing import (
import agate
import dbt.exceptions
from dbt.contracts.connection import (
import dbt.adapters.exceptions
import dbt.common.exceptions.base
from dbt.adapters.contracts.connection import (
Connection,
Identifier,
ConnectionState,
@@ -36,9 +38,9 @@ from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.base.query_headers import (
MacroQueryStringSetter,
)
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import (
from dbt.adapters.events.logging import AdapterLogger
from dbt.common.events.functions import fire_event
from dbt.adapters.events.types import (
NewConnection,
ConnectionReused,
ConnectionLeftOpenInCleanup,
@@ -48,9 +50,8 @@ from dbt.events.types import (
Rollback,
RollbackFailed,
)
from dbt.events.contextvars import get_node_info
from dbt import flags
from dbt.utils import cast_to_str
from dbt.common.events.contextvars import get_node_info
from dbt.common.utils import cast_to_str
SleepTime = Union[int, float] # As taken by time.sleep.
AdapterHandle = Any # Adapter connection handle objects can be any class.
@@ -72,10 +73,10 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
TYPE: str = NotImplemented
def __init__(self, profile: AdapterRequiredConfig) -> None:
def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext) -> None:
self.profile = profile
self.thread_connections: Dict[Hashable, Connection] = {}
self.lock: RLock = flags.MP_CONTEXT.RLock()
self.lock: RLock = mp_context.RLock()
self.query_header: Optional[MacroQueryStringSetter] = None
def set_query_header(self, manifest: Manifest) -> None:
@@ -91,13 +92,15 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
key = self.get_thread_identifier()
with self.lock:
if key not in self.thread_connections:
raise dbt.exceptions.InvalidConnectionError(key, list(self.thread_connections))
raise dbt.adapters.exceptions.InvalidConnectionError(
key, list(self.thread_connections)
)
return self.thread_connections[key]
def set_thread_connection(self, conn: Connection) -> None:
key = self.get_thread_identifier()
if key in self.thread_connections:
raise dbt.exceptions.DbtInternalError(
raise dbt.common.exceptions.DbtInternalError(
"In set_thread_connection, existing connection exists for {}"
)
self.thread_connections[key] = conn
@@ -137,13 +140,13 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
:return: A context manager that handles exceptions raised by the
underlying database.
"""
raise dbt.exceptions.NotImplementedError(
raise dbt.common.exceptions.base.NotImplementedError(
"`exception_handler` is not implemented for this adapter!"
)
def set_connection_name(self, name: Optional[str] = None) -> Connection:
"""Called by 'acquire_connection' in BaseAdapter, which is called by
'connection_named', called by 'connection_for(node)'.
'connection_named'.
Creates a connection for this thread if one doesn't already
exist, and will rename an existing connection."""
@@ -220,14 +223,14 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
:param int _attempts: Parameter used to keep track of the number of attempts in calling the
connect function across recursive calls. Passed as an argument to retry_timeout if it
is a Callable. This parameter should not be set by the initial caller.
:raises dbt.exceptions.FailedToConnectError: Upon exhausting all retry attempts without
:raises dbt.adapters.exceptions.FailedToConnectError: Upon exhausting all retry attempts without
successfully acquiring a handle.
:return: The given connection with its appropriate state and handle attributes set
depending on whether we successfully acquired a handle or not.
"""
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
if timeout < 0:
raise dbt.exceptions.FailedToConnectError(
raise dbt.adapters.exceptions.FailedToConnectError(
"retry_timeout cannot be negative or return a negative time."
)
@@ -235,7 +238,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
# This guard is not perfect others may add to the recursion limit (e.g. built-ins).
connection.handle = None
connection.state = ConnectionState.FAIL
raise dbt.exceptions.FailedToConnectError("retry_limit cannot be negative")
raise dbt.adapters.exceptions.FailedToConnectError("retry_limit cannot be negative")
try:
connection.handle = connect()
@@ -246,7 +249,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
if retry_limit <= 0:
connection.handle = None
connection.state = ConnectionState.FAIL
raise dbt.exceptions.FailedToConnectError(str(e))
raise dbt.adapters.exceptions.FailedToConnectError(str(e))
logger.debug(
f"Got a retryable error when attempting to open a {cls.TYPE} connection.\n"
@@ -268,12 +271,12 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
except Exception as e:
connection.handle = None
connection.state = ConnectionState.FAIL
raise dbt.exceptions.FailedToConnectError(str(e))
raise dbt.adapters.exceptions.FailedToConnectError(str(e))
@abc.abstractmethod
def cancel_open(self) -> Optional[List[str]]:
"""Cancel all open connections on the adapter. (passable)"""
raise dbt.exceptions.NotImplementedError(
raise dbt.common.exceptions.base.NotImplementedError(
"`cancel_open` is not implemented for this adapter!"
)
@@ -288,7 +291,9 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
This should be thread-safe, or hold the lock if necessary. The given
connection should not be in either in_use or available.
"""
raise dbt.exceptions.NotImplementedError("`open` is not implemented for this adapter!")
raise dbt.common.exceptions.base.NotImplementedError(
"`open` is not implemented for this adapter!"
)
def release(self) -> None:
with self.lock:
@@ -320,12 +325,16 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
@abc.abstractmethod
def begin(self) -> None:
"""Begin a transaction. (passable)"""
raise dbt.exceptions.NotImplementedError("`begin` is not implemented for this adapter!")
raise dbt.common.exceptions.base.NotImplementedError(
"`begin` is not implemented for this adapter!"
)
@abc.abstractmethod
def commit(self) -> None:
"""Commit a transaction. (passable)"""
raise dbt.exceptions.NotImplementedError("`commit` is not implemented for this adapter!")
raise dbt.common.exceptions.base.NotImplementedError(
"`commit` is not implemented for this adapter!"
)
@classmethod
def _rollback_handle(cls, connection: Connection) -> None:
@@ -361,7 +370,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
def _rollback(cls, connection: Connection) -> None:
"""Roll back the given connection."""
if connection.transaction_open is False:
raise dbt.exceptions.DbtInternalError(
raise dbt.common.exceptions.DbtInternalError(
f"Tried to rollback transaction on connection "
f'"{connection.name}", but it does not have one open!'
)
@@ -412,7 +421,9 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
:return: A tuple of the query status and results (empty if fetch=False).
:rtype: Tuple[AdapterResponse, agate.Table]
"""
raise dbt.exceptions.NotImplementedError("`execute` is not implemented for this adapter!")
raise dbt.common.exceptions.base.NotImplementedError(
"`execute` is not implemented for this adapter!"
)
def add_select_query(self, sql: str) -> Tuple[Connection, Any]:
"""
@@ -422,7 +433,7 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
See https://github.com/dbt-labs/dbt-core/issues/8396 for more information.
"""
raise dbt.exceptions.NotImplementedError(
raise dbt.common.exceptions.base.NotImplementedError(
"`add_select_query` is not implemented for this adapter!"
)
@@ -430,6 +441,6 @@ class BaseConnectionManager(metaclass=abc.ABCMeta):
def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
"""Get the string representation of the data type from the type_code."""
# https://peps.python.org/pep-0249/#type-objects
raise dbt.exceptions.NotImplementedError(
raise dbt.common.exceptions.base.NotImplementedError(
"`data_type_code_to_name` is not implemented for this adapter!"
)
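
To summarize the import moves above for adapter maintainers, a minimal sketch, assuming a dbt-core build from this branch (`connect` is a placeholder callable, not part of the diff):

```python
# Exception homes after the refactor, per the diff above:
#   dbt.exceptions.FailedToConnectError -> dbt.adapters.exceptions.FailedToConnectError
#   dbt.exceptions.DbtInternalError     -> dbt.common.exceptions.DbtInternalError
#   dbt.exceptions.NotImplementedError  -> dbt.common.exceptions.base.NotImplementedError
from dbt.adapters.exceptions import FailedToConnectError


def open_with_adapter_errors(connect):
    # Illustrative: connection failures now raise the adapter-scoped error.
    try:
        return connect()
    except Exception as exc:
        raise FailedToConnectError(str(exc))
```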

View File

@@ -9,7 +9,6 @@ from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
@@ -20,44 +19,52 @@ from typing import (
TypedDict,
Union,
)
from multiprocessing.context import SpawnContext
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint
from dbt.common.contracts.constraints import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
)
from dbt.adapters.contracts.macros import MacroResolver
import agate
import pytz
from dbt.exceptions import (
from dbt.adapters.exceptions import (
SnapshotTargetIncompleteError,
SnapshotTargetNotSnapshotTableError,
NullRelationDropAttemptedError,
NullRelationCacheAttemptedError,
RelationReturnedMultipleResultsError,
UnexpectedNonTimestampError,
RenameToNoneAttemptedError,
QuoteConfigTypeError,
)
from dbt.common.exceptions import (
NotImplementedError,
DbtInternalError,
DbtRuntimeError,
DbtValidationError,
UnexpectedNullError,
MacroArgTypeError,
MacroResultError,
NotImplementedError,
NullRelationCacheAttemptedError,
NullRelationDropAttemptedError,
QuoteConfigTypeError,
RelationReturnedMultipleResultsError,
RenameToNoneAttemptedError,
SnapshotTargetIncompleteError,
SnapshotTargetNotSnapshotTableError,
UnexpectedNonTimestampError,
UnexpectedNullError,
)
from dbt.adapters.protocol import AdapterConfig
from dbt.clients.agate_helper import (
from dbt.common.clients.agate_helper import (
empty_table,
get_column_value_uncased,
merge_tables,
table_from_rows,
Integer,
)
from dbt.clients.jinja import MacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.functions import fire_event, warn_or_error
from dbt.events.types import (
from dbt.common.clients.jinja import CallableMacroGenerator
from dbt.contracts.graph.manifest import Manifest
from dbt.common.events.functions import fire_event, warn_or_error
from dbt.adapters.events.types import (
CacheMiss,
ListRelations,
CodeExecution,
@@ -66,7 +73,7 @@ from dbt.events.types import (
ConstraintNotSupported,
ConstraintNotEnforced,
)
from dbt.utils import filter_null_values, executor, cast_to_str, AttrDict
from dbt.common.utils import filter_null_values, executor, cast_to_str, AttrDict
from dbt.adapters.base.connections import Connection, AdapterResponse, BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
@@ -79,7 +86,8 @@ from dbt.adapters.base.relation import (
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict
from dbt import deprecations
from dbt.adapters.events.types import CollectFreshnessReturnSignature
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
@@ -242,11 +250,24 @@ class BaseAdapter(metaclass=AdapterMeta):
# implementations to indicate adapter support for optional capabilities.
_capabilities = CapabilityDict({})
def __init__(self, config) -> None:
def __init__(self, config, mp_context: SpawnContext) -> None:
self.config = config
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._macro_manifest_lazy: Optional[MacroManifest] = None
self.cache = RelationsCache(log_cache_events=config.log_cache_events)
self.connections = self.ConnectionManager(config, mp_context)
self._macro_resolver: Optional[MacroResolver] = None
###
# Methods to set / access a macro resolver
###
def set_macro_resolver(self, macro_resolver: MacroResolver) -> None:
self._macro_resolver = macro_resolver
def get_macro_resolver(self) -> Optional[MacroResolver]:
return self._macro_resolver
def clear_macro_resolver(self) -> None:
if self._macro_resolver is not None:
self._macro_resolver = None
###
# Methods that pass through to the connection manager
@@ -276,10 +297,10 @@ class BaseAdapter(metaclass=AdapterMeta):
return conn.name
@contextmanager
def connection_named(self, name: str, node: Optional[ResultNode] = None) -> Iterator[None]:
def connection_named(self, name: str, query_header_context: Any = None) -> Iterator[None]:
try:
if self.connections.query_header is not None:
self.connections.query_header.set(name, node)
self.connections.query_header.set(name, query_header_context)
self.acquire_connection(name)
yield
finally:
@@ -287,11 +308,6 @@ class BaseAdapter(metaclass=AdapterMeta):
if self.connections.query_header is not None:
self.connections.query_header.reset()
@contextmanager
def connection_for(self, node: ResultNode) -> Iterator[None]:
with self.connection_named(node.unique_id, node):
yield
@available.parse(lambda *a, **k: ("", empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
@@ -364,39 +380,6 @@ class BaseAdapter(metaclass=AdapterMeta):
"""
return cls.ConnectionManager.TYPE
@property
def _macro_manifest(self) -> MacroManifest:
if self._macro_manifest_lazy is None:
return self.load_macro_manifest()
return self._macro_manifest_lazy
def check_macro_manifest(self) -> Optional[MacroManifest]:
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
return self._macro_manifest_lazy
def load_macro_manifest(self, base_macros_only=False) -> MacroManifest:
# base_macros_only is for the test framework
if self._macro_manifest_lazy is None:
# avoid a circular import
from dbt.parser.manifest import ManifestLoader
manifest = ManifestLoader.load_macros(
self.config,
self.connections.set_query_header,
base_macros_only=base_macros_only,
)
# TODO CT-211
self._macro_manifest_lazy = manifest # type: ignore[assignment]
# TODO CT-211
return self._macro_manifest_lazy # type: ignore[return-value]
def clear_macro_manifest(self):
if self._macro_manifest_lazy is not None:
self._macro_manifest_lazy = None
###
# Caching methods
###
def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
@@ -420,7 +403,7 @@ class BaseAdapter(metaclass=AdapterMeta):
"""
# the cache only cares about executable nodes
return {
self.Relation.create_from(self.config, node).without_identifier()
self.Relation.create_from(self.config, node).without_identifier() # type: ignore[arg-type]
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model and not node.is_external_node)
}
@@ -467,7 +450,7 @@ class BaseAdapter(metaclass=AdapterMeta):
manifest.sources.values(),
)
relations = [self.Relation.create_from(self.config, n) for n in nodes]
relations = [self.Relation.create_from(self.config, n) for n in nodes] # type: ignore[arg-type]
return relations
def _relations_cache_for_schemas(
@@ -1051,11 +1034,10 @@ class BaseAdapter(metaclass=AdapterMeta):
def execute_macro(
self,
macro_name: str,
manifest: Optional[Manifest] = None,
macro_resolver: Optional[MacroResolver] = None,
project: Optional[str] = None,
context_override: Optional[Dict[str, Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
text_only_columns: Optional[Iterable[str]] = None,
) -> AttrDict:
"""Look macro_name up in the manifest and execute its results.
@@ -1075,13 +1057,11 @@ class BaseAdapter(metaclass=AdapterMeta):
if context_override is None:
context_override = {}
if manifest is None:
# TODO CT-211
manifest = self._macro_manifest # type: ignore[assignment]
# TODO CT-211
macro = manifest.find_macro_by_name( # type: ignore[union-attr]
macro_name, self.config.project_name, project
)
resolver = macro_resolver or self._macro_resolver
if resolver is None:
raise DbtInternalError("macro resolver was None when calling execute_macro!")
macro = resolver.find_macro_by_name(macro_name, self.config.project_name, project)
if macro is None:
if project is None:
package_name = "any package"
@@ -1101,12 +1081,12 @@ class BaseAdapter(metaclass=AdapterMeta):
# TODO CT-211
macro=macro,
config=self.config,
manifest=manifest, # type: ignore[arg-type]
manifest=resolver, # type: ignore[arg-type]
package_name=project,
)
macro_context.update(context_override)
macro_function = MacroGenerator(macro, macro_context)
macro_function = CallableMacroGenerator(macro, macro_context)
with self.connections.exception_handler(f"macro {macro_name}"):
result = macro_function(**kwargs)
@@ -1137,7 +1117,7 @@ class BaseAdapter(metaclass=AdapterMeta):
kwargs=kwargs,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
macro_resolver=manifest,
)
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
@@ -1159,7 +1139,7 @@ class BaseAdapter(metaclass=AdapterMeta):
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
manifest=manifest,
macro_resolver=manifest,
)
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
@@ -1270,9 +1250,9 @@ class BaseAdapter(metaclass=AdapterMeta):
AttrDict, # current: contains AdapterResponse + agate.Table
agate.Table, # previous: just table
]
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=manifest)
if isinstance(result, agate.Table):
deprecations.warn("collect-freshness-return-signature")
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
table = result
else:
@@ -1307,7 +1287,7 @@ class BaseAdapter(metaclass=AdapterMeta):
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, manifest=manifest
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, macro_resolver=manifest
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
@@ -1361,11 +1341,6 @@ class BaseAdapter(metaclass=AdapterMeta):
"""
pass
def get_compiler(self):
from dbt.compilation import Compiler
return Compiler(self.config)
# Methods used in adapter tests
def update_column_sql(
self,
@@ -1485,7 +1460,7 @@ class BaseAdapter(metaclass=AdapterMeta):
strategy = strategy.replace("+", "_")
macro_name = f"get_incremental_{strategy}_sql"
# The model_context should have MacroGenerator callable objects for all macros
# The model_context should have callable objects for all macros
if macro_name not in model_context:
raise DbtRuntimeError(
'dbt could not find an incremental strategy macro with the name "{}" in {}'.format(
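
Taken together, the constructor and `execute_macro` changes above imply call sites like this sketch (`MyAdapter`, `runtime_config`, `manifest`, and the macro name are illustrative placeholders; the `macro_resolver=manifest` call sites above show that `Manifest` satisfies the `MacroResolver` protocol):

```python
# Sketch: wiring an adapter under the new signatures shown above.
from multiprocessing import get_context

adapter = MyAdapter(runtime_config, mp_context=get_context("spawn"))

# execute_macro no longer lazy-loads a macro manifest: pass macro_resolver=
# per call, or install one up front with set_macro_resolver.
adapter.set_macro_resolver(manifest)
result = adapter.execute_macro("current_timestamp")
```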

View File

@@ -1,9 +1,8 @@
import abc
from functools import wraps
from typing import Callable, Optional, Any, FrozenSet, Dict, Set
from dbt.deprecations import warn, renamed_method
from dbt.common.events.functions import warn_or_error
from dbt.adapters.events.types import AdapterDeprecationWarning
Decorator = Callable[[Any], Callable]
@@ -62,11 +61,12 @@ class _Available:
def wrapper(func):
func_name = func.__name__
renamed_method(func_name, supported_name)
@wraps(func)
def inner(*args, **kwargs):
warn("adapter:{}".format(func_name))
warn_or_error(
AdapterDeprecationWarning(old_name=func_name, new_name=supported_name)
)
return func(*args, **kwargs)
if parse_replacement:

View File

@@ -1,20 +1,10 @@
from typing import List, Optional, Type
from pathlib import Path
from dbt.adapters.base import Credentials
from dbt.exceptions import CompilationError
from dbt.adapters.protocol import AdapterProtocol
def project_name_from_path(include_path: str) -> str:
# avoid an import cycle
from dbt.config.project import PartialProject
partial = PartialProject.from_project_root(include_path)
if partial.project_name is None:
raise CompilationError(f"Invalid project at {include_path}: name not set!")
return partial.project_name
class AdapterPlugin:
"""Defines the basic requirements for a dbt adapter plugin.
@@ -29,12 +19,13 @@ class AdapterPlugin:
credentials: Type[Credentials],
include_path: str,
dependencies: Optional[List[str]] = None,
project_name: Optional[str] = None,
) -> None:
self.adapter: Type[AdapterProtocol] = adapter
self.credentials: Type[Credentials] = credentials
self.include_path: str = include_path
self.project_name: str = project_name_from_path(include_path)
self.project_name: str = project_name or f"dbt_{Path(include_path).name}"
self.dependencies: List[str]
if dependencies is None:
self.dependencies = []
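
`AdapterPlugin` no longer parses the project at `include_path` to discover its name; a sketch of the new construction, assuming the usual `dbt.adapters.base.plugin` module path (`MyAdapter` and `MyCredentials` are placeholders):

```python
from dbt.adapters.base.plugin import AdapterPlugin

plugin = AdapterPlugin(
    adapter=MyAdapter,
    credentials=MyCredentials,
    include_path="/path/to/dbt/include/myadapter",
    project_name="dbt_myadapter",  # optional; defaults to f"dbt_{Path(include_path).name}"
)
```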

View File

@@ -1,21 +1,20 @@
from threading import local
from typing import Optional, Callable, Dict, Any
from dbt.clients.jinja import QueryStringGenerator
from dbt.adapters.clients.jinja import QueryStringGenerator
from dbt.context.manifest import generate_query_header_context
from dbt.contracts.connection import AdapterRequiredConfig, QueryComment
from dbt.contracts.graph.nodes import ResultNode
from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import DbtRuntimeError
from dbt.common.exceptions import DbtRuntimeError
class NodeWrapper:
def __init__(self, node) -> None:
self._inner_node = node
class QueryHeaderContextWrapper:
def __init__(self, context) -> None:
self._inner_context = context
def __getattr__(self, name):
return getattr(self._inner_node, name, "")
return getattr(self._inner_context, name, "")
class _QueryComment(local):
@@ -53,7 +52,7 @@ class _QueryComment(local):
self.append = append
QueryStringFunc = Callable[[str, Optional[NodeWrapper]], str]
QueryStringFunc = Callable[[str, Optional[QueryHeaderContextWrapper]], str]
class MacroQueryStringSetter:
@@ -90,10 +89,10 @@ class MacroQueryStringSetter:
def reset(self):
self.set("master", None)
def set(self, name: str, node: Optional[ResultNode]):
wrapped: Optional[NodeWrapper] = None
if node is not None:
wrapped = NodeWrapper(node)
def set(self, name: str, query_header_context: Any):
wrapped: Optional[QueryHeaderContextWrapper] = None
if query_header_context is not None:
wrapped = QueryHeaderContextWrapper(query_header_context)
comment_str = self.generator(name, wrapped)
append = False

View File

@@ -2,8 +2,8 @@ from collections.abc import Hashable
from dataclasses import dataclass, field
from typing import Optional, TypeVar, Any, Type, Dict, Iterator, Tuple, Set, Union, FrozenSet
from dbt.contracts.graph.nodes import SourceDefinition, ManifestNode, ResultNode, ParsedNode
from dbt.contracts.relation import (
from dbt.adapters.contracts.relation import (
RelationConfig,
RelationType,
ComponentName,
HasQuoting,
@@ -11,15 +11,11 @@ from dbt.contracts.relation import (
Policy,
Path,
)
from dbt.exceptions import (
ApproximateMatchError,
DbtInternalError,
MultipleDatabasesNotAllowedError,
)
from dbt.node_types import NodeType
from dbt.utils import filter_null_values, deep_merge, classproperty
from dbt.adapters.exceptions import MultipleDatabasesNotAllowedError, ApproximateMatchError
from dbt.common.utils import filter_null_values, deep_merge
from dbt.adapters.utils import classproperty
import dbt.exceptions
import dbt.common.exceptions
Self = TypeVar("Self", bound="BaseRelation")
@@ -100,7 +96,7 @@ class BaseRelation(FakeAPIObject, Hashable):
if not search:
# nothing was passed in
raise dbt.exceptions.DbtRuntimeError(
raise dbt.common.exceptions.DbtRuntimeError(
"Tried to match relation, but no search path was passed!"
)
@@ -200,83 +196,50 @@ class BaseRelation(FakeAPIObject, Hashable):
identifier=identifier,
)
@classmethod
def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
source_quoting.pop("column", None)
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
source_quoting,
kwargs.get("quote_policy", {}),
)
return cls.create(
database=source.database,
schema=source.schema,
identifier=source.identifier,
quote_policy=quote_policy,
**kwargs,
)
@staticmethod
def add_ephemeral_prefix(name: str):
return f"__dbt__cte__{name}"
@classmethod
def create_ephemeral_from_node(
def create_ephemeral_from(
cls: Type[Self],
config: HasQuoting,
node: ManifestNode,
relation_config: RelationConfig,
) -> Self:
# Note that ephemeral models are based on the name.
identifier = cls.add_ephemeral_prefix(node.name)
identifier = cls.add_ephemeral_prefix(relation_config.name)
return cls.create(
type=cls.CTE,
identifier=identifier,
).quote(identifier=False)
@classmethod
def create_from_node(
def create_from(
cls: Type[Self],
config: HasQuoting,
node,
quote_policy: Optional[Dict[str, bool]] = None,
quoting: HasQuoting,
relation_config: RelationConfig,
**kwargs: Any,
) -> Self:
if quote_policy is None:
quote_policy = {}
quote_policy = kwargs.pop("quote_policy", {})
quote_policy = dbt.utils.merge(config.quoting, quote_policy)
config_quoting = relation_config.quoting_dict
config_quoting.pop("column", None)
# precedence: kwargs quoting > relation config quoting > base quoting > default quoting
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
quoting.quoting,
config_quoting,
quote_policy,
)
return cls.create(
database=node.database,
schema=node.schema,
identifier=node.alias,
database=relation_config.database,
schema=relation_config.schema,
identifier=relation_config.identifier,
quote_policy=quote_policy,
**kwargs,
)
@classmethod
def create_from(
cls: Type[Self],
config: HasQuoting,
node: ResultNode,
**kwargs: Any,
) -> Self:
if node.resource_type == NodeType.Source:
if not isinstance(node, SourceDefinition):
raise DbtInternalError(
"type mismatch, expected SourceDefinition but got {}".format(type(node))
)
return cls.create_from_source(node, **kwargs)
else:
# Can't use ManifestNode here because of parameterized generics
if not isinstance(node, (ParsedNode)):
raise DbtInternalError(
f"type mismatch, expected ManifestNode but got {type(node)}"
)
return cls.create_from_node(config, node, **kwargs)
@classmethod
def create(
cls: Type[Self],
@@ -386,7 +349,7 @@ class InformationSchema(BaseRelation):
def __post_init__(self):
if not isinstance(self.information_schema_view, (type(None), str)):
raise dbt.exceptions.CompilationError(
raise dbt.common.exceptions.CompilationError(
"Got an invalid name: {}".format(self.information_schema_view)
)

View File

@@ -7,17 +7,16 @@ from dbt.adapters.reference_keys import (
_make_ref_key_dict,
_ReferenceKey,
)
from dbt.exceptions import (
DependentLinkNotCachedError,
from dbt.common.exceptions.cache import (
NewNameAlreadyInCacheError,
NoneRelationFoundError,
ReferencedLinkNotCachedError,
DependentLinkNotCachedError,
TruncatedModelNameCausedCollisionError,
NoneRelationFoundError,
)
from dbt.events.functions import fire_event, fire_event_if
from dbt.events.types import CacheAction, CacheDumpGraph
from dbt.flags import get_flags
from dbt.utils import lowercase
from dbt.common.events.functions import fire_event, fire_event_if
from dbt.adapters.events.types import CacheAction, CacheDumpGraph
from dbt.common.utils.formatting import lowercase
def dot_separated(key: _ReferenceKey) -> str:
@@ -165,10 +164,11 @@ class RelationsCache:
:attr Set[str] schemas: The set of known/cached schemas, all lowercased.
"""
def __init__(self) -> None:
def __init__(self, log_cache_events: bool = False) -> None:
self.relations: Dict[_ReferenceKey, _CachedRelation] = {}
self.lock = threading.RLock()
self.schemas: Set[Tuple[Optional[str], Optional[str]]] = set()
self.log_cache_events = log_cache_events
def add_schema(
self,
@@ -318,10 +318,9 @@ class RelationsCache:
:param BaseRelation relation: The underlying relation.
"""
flags = get_flags()
cached = _CachedRelation(relation)
fire_event_if(
flags.LOG_CACHE_EVENTS,
self.log_cache_events,
lambda: CacheDumpGraph(before_after="before", action="adding", dump=self.dump_graph()),
)
fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_dict(cached)))
@@ -329,7 +328,7 @@ class RelationsCache:
with self.lock:
self._setdefault(cached)
fire_event_if(
flags.LOG_CACHE_EVENTS,
self.log_cache_events,
lambda: CacheDumpGraph(before_after="after", action="adding", dump=self.dump_graph()),
)
@@ -454,9 +453,8 @@ class RelationsCache:
ref_key_2=new_key._asdict(),
)
)
flags = get_flags()
fire_event_if(
flags.LOG_CACHE_EVENTS,
self.log_cache_events,
lambda: CacheDumpGraph(before_after="before", action="rename", dump=self.dump_graph()),
)
@@ -467,7 +465,7 @@ class RelationsCache:
self._setdefault(_CachedRelation(new))
fire_event_if(
flags.LOG_CACHE_EVENTS,
self.log_cache_events,
lambda: CacheDumpGraph(before_after="after", action="rename", dump=self.dump_graph()),
)
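
Since the cache no longer consults `dbt.flags` at event time, callers inject the flag once at construction, as the `BaseAdapter.__init__` diff above does with `config.log_cache_events`; a minimal sketch:

```python
from dbt.adapters.cache import RelationsCache

# was: fire_event_if(get_flags().LOG_CACHE_EVENTS, ...) inside each method
cache = RelationsCache(log_cache_events=True)
```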

View File

View File

@@ -0,0 +1,23 @@
from typing import Dict, Any

from dbt.common.clients.jinja import BaseMacroGenerator, get_environment


class QueryStringGenerator(BaseMacroGenerator):
    def __init__(self, template_str: str, context: Dict[str, Any]) -> None:
        super().__init__(context)
        self.template_str: str = template_str
        env = get_environment()
        self.template = env.from_string(
            self.template_str,
            globals=self.context,
        )

    def get_name(self) -> str:
        return "query_comment_macro"

    def get_template(self):
        """Don't use the template cache, we don't have a node"""
        return self.template

    def __call__(self, connection_name: str, node) -> str:
        return str(self.call_macro(connection_name, node))

View File

View File

@@ -11,20 +11,26 @@ from typing import (
List,
Callable,
)
from dbt.exceptions import DbtInternalError
from dbt.utils import translate_aliases, md5
from dbt.events.functions import fire_event
from dbt.events.types import NewConnectionOpening
from dbt.events.contextvars import get_node_info
from typing_extensions import Protocol, Annotated
from dbt.dataclass_schema import (
from mashumaro.jsonschema.annotations import Pattern
from dbt.adapters.utils import translate_aliases
from dbt.common.exceptions import DbtInternalError
from dbt.common.dataclass_schema import (
dbtClassMixin,
StrEnum,
ExtensibleDbtClassMixin,
ValidatedStringMixin,
)
from dbt.contracts.util import Replaceable
from mashumaro.jsonschema.annotations import Pattern
from dbt.common.contracts.util import Replaceable
from dbt.common.utils import md5
from dbt.common.events.functions import fire_event
from dbt.adapters.events.types import NewConnectionOpening
# TODO: this is a very bad dependency - shared global state
from dbt.common.events.contextvars import get_node_info
class Identifier(ValidatedStringMixin):
@@ -228,3 +234,4 @@ class AdapterRequiredConfig(HasCredentials, Protocol):
query_comment: QueryComment
cli_vars: Dict[str, Any]
target_path: str
log_cache_events: bool

View File

@@ -0,0 +1,11 @@
from typing import Optional

from typing_extensions import Protocol

from dbt.common.clients.jinja import MacroProtocol


class MacroResolver(Protocol):
    def find_macro_by_name(
        self, name: str, root_project_name: str, package: Optional[str]
    ) -> Optional[MacroProtocol]:
        raise NotImplementedError("find_macro_by_name not implemented")
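
Any object with this single method satisfies the protocol, so tests and embedders can supply something much smaller than a `Manifest`; a toy sketch:

```python
from typing import Dict, Optional

from dbt.common.clients.jinja import MacroProtocol


class DictMacroResolver:
    """Toy resolver; project/package precedence is ignored for brevity."""

    def __init__(self, macros: Dict[str, MacroProtocol]) -> None:
        self._macros = macros

    def find_macro_by_name(
        self, name: str, root_project_name: str, package: Optional[str]
    ) -> Optional[MacroProtocol]:
        return self._macros.get(name)
```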

View File

@@ -6,11 +6,11 @@ from typing import (
)
from typing_extensions import Protocol
from dbt.dataclass_schema import dbtClassMixin, StrEnum
from dbt.common.dataclass_schema import dbtClassMixin, StrEnum
from dbt.contracts.util import Replaceable
from dbt.exceptions import CompilationError, DataclassNotDictError
from dbt.utils import deep_merge
from dbt.common.contracts.util import Replaceable
from dbt.common.exceptions import CompilationError, DataclassNotDictError
from dbt.common.utils import deep_merge
class RelationType(StrEnum):
@@ -22,6 +22,14 @@ class RelationType(StrEnum):
Ephemeral = "ephemeral"
class RelationConfig(Protocol):
name: str
database: str
schema: str
identifier: str
quoting_dict: Dict[str, bool]
class ComponentName(StrEnum):
Database = "database"
Schema = "schema"

View File

@@ -0,0 +1,57 @@
# Events Module
The Events module is responsible for communicating internal dbt structures into a consumable interface. Because the "event" classes are based entirely on protobuf definitions, the interface is clearly defined whether or not protobufs are used to consume it. The protobuf message definitions are compiled into Python classes with the google protobuf compiler (we previously used Betterproto; see below).
# Using the Events Module
The events module provides, in `events.types`, types that represent what is happening in dbt. These types are intended to be an exhaustive list of everything happening within dbt that needs to be logged, streamed, or printed. To fire an event, use `common.events.functions::fire_event`, the module's entry point from everywhere in dbt.
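
For example, a call site might look like this sketch (the event class and its fields come from the adapter event types added in this PR):

```python
from dbt.common.events.functions import fire_event
from dbt.adapters.events.types import ConnectionReused

fire_event(ConnectionReused(conn_name="master", orig_conn_name="list_schemas"))
```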
# Logging
When events are processed via `fire_event`, nearly everything is logged. Whether or not the user has enabled the debug flag, all debug messages are still logged to the file. However, some events are particularly time consuming to construct because they return a huge amount of data. Today, the only messages in this category are cache events, and they are only logged if the `--log-cache-events` flag is on. This matters because these messages should not be created unless they are going to be logged, since constructing them causes a noticeable performance degradation. These events use the `fire_event_if` function.
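
The pattern, as used by the relation cache in this PR (`log_cache_events` and `cache` stand in for the caller's flag and cache instance):

```python
from dbt.common.events.functions import fire_event_if
from dbt.adapters.events.types import CacheDumpGraph

# The lambda defers building the expensive dump until we know it will be logged.
fire_event_if(
    log_cache_events,
    lambda: CacheDumpGraph(before_after="before", action="adding", dump=cache.dump_graph()),
)
```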
# Adding a New Event
* Add a new message in adapter_types.proto, and a second message with the same name plus a "Msg" suffix. The "Msg" message should have two fields: an "info" field of AdapterCommonEventInfo, and a "data" field whose type is the message name without "Msg"
* Run the protoc compiler to update adapter_types_pb2.py: `make adapter_proto_types`
* Add a wrapping class in core/dbt/adapters/events/types.py with a Level superclass plus `code` and `message` methods
We have switched from using betterproto to using google protobuf, because of a lack of support for Struct fields in betterproto.
The google protobuf interface is janky and very much non-Pythonic. The "generated" classes in adapter_types_pb2.py do not resemble regular Python classes. They do not have normal constructors; they can only be constructed empty, then "filled" by setting fields individually or by using a json_format method like ParseDict. We have wrapped the logging events with classes (in types.py) that allow construction via keyword arguments only, with no positional parameters.
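
A sketch of the difference, using one of the messages defined in this PR (`ParseDict` is `google.protobuf.json_format.ParseDict`):

```python
from google.protobuf.json_format import ParseDict
from dbt.adapters.events import adapter_types_pb2
from dbt.adapters.events.types import ConnectionReused

# Generated pb2 class: construct empty, then fill fields (or use ParseDict).
pb = adapter_types_pb2.ConnectionReused()
pb.conn_name = "master"
pb = ParseDict({"conn_name": "master"}, adapter_types_pb2.ConnectionReused())

# Wrapped event class in types.py: a plain keyword-only constructor.
event = ConnectionReused(conn_name="master", orig_conn_name="list_schemas")
```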
## Required for Every Event
- a `code` method whose return value is unique across events
- a log level, assigned via a Level mixin: `DebugLevel`, `InfoLevel`, `WarnLevel`, or `ErrorLevel`
- a `message` method
Example
```
class PartialParsingDeletedExposure(DebugLevel):
    def code(self):
        return "I049"

    def message(self) -> str:
        return f"Partial parsing: deleted exposure {self.unique_id}"
```
# Adapter Maintainers
To integrate existing log messages from adapters, you likely have a line of code like this in your adapter already:
```python
from dbt.logger import GLOBAL_LOGGER as logger
```
Simply change it to these two lines with your adapter's database name, and all your existing call sites will now use the new system for v1.0:
```python
from dbt.adapters.events.logging import AdapterLogger
logger = AdapterLogger("<database name>")
# e.g. AdapterLogger("Snowflake")
```
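
Call sites then work unchanged, e.g. (assuming the usual `debug`/`info`/`warning`/`error` methods; per `format_adapter_message` in this PR, positional args are applied with `str.format`):

```python
logger.debug("Acquiring new {} connection '{}'", "snowflake", "master")
```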
## Compiling adapter_types.proto
After adding a new message in `adapter_types.proto`, either:
- In the repository root directory: `make adapter_proto_types`
- In the `core/dbt/adapters/events` directory: `protoc -I=. --python_out=. adapter_types.proto`

View File

View File

@@ -0,0 +1,517 @@
syntax = "proto3";
package proto_types;
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
// Common event info
message AdapterCommonEventInfo {
string name = 1;
string code = 2;
string msg = 3;
string level = 4;
string invocation_id = 5;
int32 pid = 6;
string thread = 7;
google.protobuf.Timestamp ts = 8;
map<string, string> extra = 9;
string category = 10;
}
// AdapterNodeRelation
message AdapterNodeRelation {
string database = 10;
string schema = 11;
string alias = 12;
string relation_name = 13;
}
// NodeInfo
message AdapterNodeInfo {
string node_path = 1;
string node_name = 2;
string unique_id = 3;
string resource_type = 4;
string materialized = 5;
string node_status = 6;
string node_started_at = 7;
string node_finished_at = 8;
google.protobuf.Struct meta = 9;
AdapterNodeRelation node_relation = 10;
}
// ReferenceKey
message ReferenceKeyMsg {
string database = 1;
string schema = 2;
string identifier = 3;
}
// D - Deprecations
// D005
message AdapterDeprecationWarning {
string old_name = 1;
string new_name = 2;
}
message AdapterDeprecationWarningMsg {
AdapterCommonEventInfo info = 1;
AdapterDeprecationWarning data = 2;
}
// D012
message CollectFreshnessReturnSignature {
}
message CollectFreshnessReturnSignatureMsg {
AdapterCommonEventInfo info = 1;
CollectFreshnessReturnSignature data = 2;
}
// E - DB Adapter
// E001
message AdapterEventDebug {
AdapterNodeInfo node_info = 1;
string name = 2;
string base_msg = 3;
google.protobuf.ListValue args = 4;
}
message AdapterEventDebugMsg {
AdapterCommonEventInfo info = 1;
AdapterEventDebug data = 2;
}
// E002
message AdapterEventInfo {
AdapterNodeInfo node_info = 1;
string name = 2;
string base_msg = 3;
google.protobuf.ListValue args = 4;
}
message AdapterEventInfoMsg {
AdapterCommonEventInfo info = 1;
AdapterEventInfo data = 2;
}
// E003
message AdapterEventWarning {
AdapterNodeInfo node_info = 1;
string name = 2;
string base_msg = 3;
google.protobuf.ListValue args = 4;
}
message AdapterEventWarningMsg {
AdapterCommonEventInfo info = 1;
AdapterEventWarning data = 2;
}
// E004
message AdapterEventError {
AdapterNodeInfo node_info = 1;
string name = 2;
string base_msg = 3;
google.protobuf.ListValue args = 4;
string exc_info = 5;
}
message AdapterEventErrorMsg {
AdapterCommonEventInfo info = 1;
AdapterEventError data = 2;
}
// E005
message NewConnection {
AdapterNodeInfo node_info = 1;
string conn_type = 2;
string conn_name = 3;
}
message NewConnectionMsg {
AdapterCommonEventInfo info = 1;
NewConnection data = 2;
}
// E006
message ConnectionReused {
string conn_name = 1;
string orig_conn_name = 2;
}
message ConnectionReusedMsg {
AdapterCommonEventInfo info = 1;
ConnectionReused data = 2;
}
// E007
message ConnectionLeftOpenInCleanup {
string conn_name = 1;
}
message ConnectionLeftOpenInCleanupMsg {
AdapterCommonEventInfo info = 1;
ConnectionLeftOpenInCleanup data = 2;
}
// E008
message ConnectionClosedInCleanup {
string conn_name = 1;
}
message ConnectionClosedInCleanupMsg {
AdapterCommonEventInfo info = 1;
ConnectionClosedInCleanup data = 2;
}
// E009
message RollbackFailed {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
string exc_info = 3;
}
message RollbackFailedMsg {
AdapterCommonEventInfo info = 1;
RollbackFailed data = 2;
}
// E010
message ConnectionClosed {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
}
message ConnectionClosedMsg {
AdapterCommonEventInfo info = 1;
ConnectionClosed data = 2;
}
// E011
message ConnectionLeftOpen {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
}
message ConnectionLeftOpenMsg {
AdapterCommonEventInfo info = 1;
ConnectionLeftOpen data = 2;
}
// E012
message Rollback {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
}
message RollbackMsg {
AdapterCommonEventInfo info = 1;
Rollback data = 2;
}
// E013
message CacheMiss {
string conn_name = 1;
string database = 2;
string schema = 3;
}
message CacheMissMsg {
AdapterCommonEventInfo info = 1;
CacheMiss data = 2;
}
// E014
message ListRelations {
string database = 1;
string schema = 2;
repeated ReferenceKeyMsg relations = 3;
}
message ListRelationsMsg {
AdapterCommonEventInfo info = 1;
ListRelations data = 2;
}
// E015
message ConnectionUsed {
AdapterNodeInfo node_info = 1;
string conn_type = 2;
string conn_name = 3;
}
message ConnectionUsedMsg {
AdapterCommonEventInfo info = 1;
ConnectionUsed data = 2;
}
// E016
message SQLQuery {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
string sql = 3;
}
message SQLQueryMsg {
AdapterCommonEventInfo info = 1;
SQLQuery data = 2;
}
// E017
message SQLQueryStatus {
AdapterNodeInfo node_info = 1;
string status = 2;
float elapsed = 3;
}
message SQLQueryStatusMsg {
AdapterCommonEventInfo info = 1;
SQLQueryStatus data = 2;
}
// E018
message SQLCommit {
AdapterNodeInfo node_info = 1;
string conn_name = 2;
}
message SQLCommitMsg {
AdapterCommonEventInfo info = 1;
SQLCommit data = 2;
}
// E019
message ColTypeChange {
string orig_type = 1;
string new_type = 2;
ReferenceKeyMsg table = 3;
}
message ColTypeChangeMsg {
AdapterCommonEventInfo info = 1;
ColTypeChange data = 2;
}
// E020
message SchemaCreation {
ReferenceKeyMsg relation = 1;
}
message SchemaCreationMsg {
AdapterCommonEventInfo info = 1;
SchemaCreation data = 2;
}
// E021
message SchemaDrop {
ReferenceKeyMsg relation = 1;
}
message SchemaDropMsg {
AdapterCommonEventInfo info = 1;
SchemaDrop data = 2;
}
// E022
message CacheAction {
string action = 1;
ReferenceKeyMsg ref_key = 2;
ReferenceKeyMsg ref_key_2 = 3;
ReferenceKeyMsg ref_key_3 = 4;
repeated ReferenceKeyMsg ref_list = 5;
}
message CacheActionMsg {
AdapterCommonEventInfo info = 1;
CacheAction data = 2;
}
// Skipping E023, E024, E025, E026, E027, E028, E029, E030
// E031
message CacheDumpGraph {
map<string, string> dump = 1;
string before_after = 2;
string action = 3;
}
message CacheDumpGraphMsg {
AdapterCommonEventInfo info = 1;
CacheDumpGraph data = 2;
}
// Skipping E032, E033
// E034
message AdapterRegistered {
string adapter_name = 1;
string adapter_version = 2;
}
message AdapterRegisteredMsg {
AdapterCommonEventInfo info = 1;
AdapterRegistered data = 2;
}
// E035
message AdapterImportError {
string exc = 1;
}
message AdapterImportErrorMsg {
AdapterCommonEventInfo info = 1;
AdapterImportError data = 2;
}
// E036
message PluginLoadError {
string exc_info = 1;
}
message PluginLoadErrorMsg {
AdapterCommonEventInfo info = 1;
PluginLoadError data = 2;
}
// E037
message NewConnectionOpening {
AdapterNodeInfo node_info = 1;
string connection_state = 2;
}
message NewConnectionOpeningMsg {
AdapterCommonEventInfo info = 1;
NewConnectionOpening data = 2;
}
// E038
message CodeExecution {
string conn_name = 1;
string code_content = 2;
}
message CodeExecutionMsg {
AdapterCommonEventInfo info = 1;
CodeExecution data = 2;
}
// E039
message CodeExecutionStatus {
string status = 1;
float elapsed = 2;
}
message CodeExecutionStatusMsg {
AdapterCommonEventInfo info = 1;
CodeExecutionStatus data = 2;
}
// E040
message CatalogGenerationError {
string exc = 1;
}
message CatalogGenerationErrorMsg {
AdapterCommonEventInfo info = 1;
CatalogGenerationError data = 2;
}
// E041
message WriteCatalogFailure {
int32 num_exceptions = 1;
}
message WriteCatalogFailureMsg {
AdapterCommonEventInfo info = 1;
WriteCatalogFailure data = 2;
}
// E042
message CatalogWritten {
string path = 1;
}
message CatalogWrittenMsg {
AdapterCommonEventInfo info = 1;
CatalogWritten data = 2;
}
// E043
message CannotGenerateDocs {
}
message CannotGenerateDocsMsg {
AdapterCommonEventInfo info = 1;
CannotGenerateDocs data = 2;
}
// E044
message BuildingCatalog {
}
message BuildingCatalogMsg {
AdapterCommonEventInfo info = 1;
BuildingCatalog data = 2;
}
// E045
message DatabaseErrorRunningHook {
string hook_type = 1;
}
message DatabaseErrorRunningHookMsg {
AdapterCommonEventInfo info = 1;
DatabaseErrorRunningHook data = 2;
}
// E046
message HooksRunning {
int32 num_hooks = 1;
string hook_type = 2;
}
message HooksRunningMsg {
AdapterCommonEventInfo info = 1;
HooksRunning data = 2;
}
// E047
message FinishedRunningStats {
string stat_line = 1;
string execution = 2;
float execution_time = 3;
}
message FinishedRunningStatsMsg {
AdapterCommonEventInfo info = 1;
FinishedRunningStats data = 2;
}
// E048
message ConstraintNotEnforced {
string constraint = 1;
string adapter = 2;
}
message ConstraintNotEnforcedMsg {
AdapterCommonEventInfo info = 1;
ConstraintNotEnforced data = 2;
}
// E049
message ConstraintNotSupported {
string constraint = 1;
string adapter = 2;
}
message ConstraintNotSupportedMsg {
AdapterCommonEventInfo info = 1;
ConstraintNotSupported data = 2;
}
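
Each event above follows the same two-message pattern: a payload message holding the event's fields, plus a *Msg wrapper that pairs it with AdapterCommonEventInfo. A rough sketch of constructing one by hand, assuming the adapter_types_pb2 module that protoc generates from this file:

# Sketch only: adapter_types_pb2 is assumed to be generated from the proto above.
from dbt.adapters.events import adapter_types_pb2

data = adapter_types_pb2.ConnectionUsed(conn_type="postgres", conn_name="master")
msg = adapter_types_pb2.ConnectionUsedMsg(data=data)
msg.info.name = "ConnectionUsed"  # assumes AdapterCommonEventInfo carries a name field
print(msg)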

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,39 @@
# Alias the common Level classes to make custom, but not overly verbose, versions
# that have PROTO_TYPES_MODULE set to the adapter-specific generated types_pb2 module.
from dbt.common.events.base_types import (
    BaseEvent,
    DynamicLevel as CommonDynamicLevel,
    TestLevel as CommonTestLevel,
    DebugLevel as CommonDebugLevel,
    InfoLevel as CommonInfoLevel,
    WarnLevel as CommonWarnLevel,
    ErrorLevel as CommonErrorLevel,
)
from dbt.adapters.events import adapter_types_pb2


class AdapterBaseEvent(BaseEvent):
    PROTO_TYPES_MODULE = adapter_types_pb2


class DynamicLevel(CommonDynamicLevel, AdapterBaseEvent):
    pass


class TestLevel(CommonTestLevel, AdapterBaseEvent):
    pass


class DebugLevel(CommonDebugLevel, AdapterBaseEvent):
    pass


class InfoLevel(CommonInfoLevel, AdapterBaseEvent):
    pass


class WarnLevel(CommonWarnLevel, AdapterBaseEvent):
    pass


class ErrorLevel(CommonErrorLevel, AdapterBaseEvent):
    pass
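
With these aliases in place, an adapter-side event only needs to subclass one of the levels. A minimal sketch (HypotheticalEvent and its E999 code are illustrative, and a real event would also need a matching message in adapter_types_pb2 for serialization):

class HypotheticalEvent(DebugLevel):
    def code(self) -> str:
        return "E999"  # illustrative code, not assigned in this PR

    def message(self) -> str:
        return f"did something on connection '{self.conn_name}'"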

View File

@@ -1,17 +1,18 @@
 import traceback
 from dataclasses import dataclass

-from dbt.events.functions import fire_event, EVENT_MANAGER
-from dbt.events.contextvars import get_node_info
-from dbt.events.event_handler import set_package_logging
-from dbt.events.types import (
+from dbt.adapters.events.types import (
     AdapterEventDebug,
     AdapterEventInfo,
     AdapterEventWarning,
     AdapterEventError,
 )
+from dbt.common.events import get_event_manager
+from dbt.common.events.contextvars import get_node_info
+from dbt.common.events.event_handler import set_package_logging
+from dbt.common.events.functions import fire_event

 # N.B. No guarantees for what type param msg is.
 @dataclass
 class AdapterLogger:
     name: str
@@ -63,4 +64,4 @@ class AdapterLogger:
         """By default, dbt suppresses non-dbt package logs. This method allows
        you to set the log level for a specific package.
        """
-        set_package_logging(package_name, level, EVENT_MANAGER)
+        set_package_logging(package_name, level, get_event_manager())
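
The logger's public surface is unchanged by this move; only its event-manager plumbing differs. A usage sketch for reference (assuming the module lands at dbt.adapters.events.logging, since the file path isn't shown in this view) — note that format arguments are applied lazily by format_adapter_message, defined in the next file:

from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger("Postgres")
# base_msg and args are stored on the event; str.format runs in message()
logger.debug("Opened connection '{}' in {} seconds", "master", 0.03)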

View File

@@ -0,0 +1,417 @@
from dbt.adapters.events.base_types import WarnLevel, InfoLevel, ErrorLevel, DebugLevel
from dbt.common.ui import line_wrap_message, warning_tag
def format_adapter_message(name, base_msg, args) -> str:
# only apply formatting if there are arguments to format.
# avoids issues like "dict: {k: v}".format() which results in `KeyError 'k'`
msg = base_msg if len(args) == 0 else base_msg.format(*args)
return f"{name} adapter: {msg}"
# =======================================================
# D - Deprecations
# =======================================================
class CollectFreshnessReturnSignature(WarnLevel):
def code(self) -> str:
return "D012"
def message(self) -> str:
description = (
"The 'collect_freshness' macro signature has changed to return the full "
"query result, rather than just a table of values. See the v1.5 migration guide "
"for details on how to update your custom macro: https://docs.getdbt.com/guides/migration/versions/upgrading-to-v1.5"
)
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
class AdapterDeprecationWarning(WarnLevel):
def code(self) -> str:
return "D005"
def message(self) -> str:
description = (
f"The adapter function `adapter.{self.old_name}` is deprecated and will be removed in "
f"a future release of dbt. Please use `adapter.{self.new_name}` instead. "
f"\n\nDocumentation for {self.new_name} can be found here:"
f"\n\nhttps://docs.getdbt.com/docs/adapter"
)
return line_wrap_message(warning_tag(f"Deprecated functionality\n\n{description}"))
# =======================================================
# E - DB Adapter
# =======================================================
class AdapterEventDebug(DebugLevel):
def code(self) -> str:
return "E001"
def message(self) -> str:
return format_adapter_message(self.name, self.base_msg, self.args)
class AdapterEventInfo(InfoLevel):
def code(self) -> str:
return "E002"
def message(self) -> str:
return format_adapter_message(self.name, self.base_msg, self.args)
class AdapterEventWarning(WarnLevel):
def code(self) -> str:
return "E003"
def message(self) -> str:
return format_adapter_message(self.name, self.base_msg, self.args)
class AdapterEventError(ErrorLevel):
def code(self) -> str:
return "E004"
def message(self) -> str:
return format_adapter_message(self.name, self.base_msg, self.args)
class NewConnection(DebugLevel):
def code(self) -> str:
return "E005"
def message(self) -> str:
return f"Acquiring new {self.conn_type} connection '{self.conn_name}'"
class ConnectionReused(DebugLevel):
def code(self) -> str:
return "E006"
def message(self) -> str:
return f"Re-using an available connection from the pool (formerly {self.orig_conn_name}, now {self.conn_name})"
class ConnectionLeftOpenInCleanup(DebugLevel):
def code(self) -> str:
return "E007"
def message(self) -> str:
return f"Connection '{self.conn_name}' was left open."
class ConnectionClosedInCleanup(DebugLevel):
def code(self) -> str:
return "E008"
def message(self) -> str:
return f"Connection '{self.conn_name}' was properly closed."
class RollbackFailed(DebugLevel):
def code(self) -> str:
return "E009"
def message(self) -> str:
return f"Failed to rollback '{self.conn_name}'"
class ConnectionClosed(DebugLevel):
def code(self) -> str:
return "E010"
def message(self) -> str:
return f"On {self.conn_name}: Close"
class ConnectionLeftOpen(DebugLevel):
def code(self) -> str:
return "E011"
def message(self) -> str:
return f"On {self.conn_name}: No close available on handle"
class Rollback(DebugLevel):
def code(self) -> str:
return "E012"
def message(self) -> str:
return f"On {self.conn_name}: ROLLBACK"
class CacheMiss(DebugLevel):
def code(self) -> str:
return "E013"
def message(self) -> str:
return (
f'On "{self.conn_name}": cache miss for schema '
f'"{self.database}.{self.schema}", this is inefficient'
)
class ListRelations(DebugLevel):
def code(self) -> str:
return "E014"
def message(self) -> str:
identifiers_str = ", ".join(r.identifier for r in self.relations)
return f"While listing relations in database={self.database}, schema={self.schema}, found: {identifiers_str}"
class ConnectionUsed(DebugLevel):
def code(self) -> str:
return "E015"
def message(self) -> str:
return f'Using {self.conn_type} connection "{self.conn_name}"'
class SQLQuery(DebugLevel):
def code(self) -> str:
return "E016"
def message(self) -> str:
return f"On {self.conn_name}: {self.sql}"
class SQLQueryStatus(DebugLevel):
def code(self) -> str:
return "E017"
def message(self) -> str:
return f"SQL status: {self.status} in {self.elapsed} seconds"
class SQLCommit(DebugLevel):
def code(self) -> str:
return "E018"
def message(self) -> str:
return f"On {self.conn_name}: COMMIT"
class ColTypeChange(DebugLevel):
def code(self) -> str:
return "E019"
def message(self) -> str:
return f"Changing col type from {self.orig_type} to {self.new_type} in table {self.table}"
class SchemaCreation(DebugLevel):
def code(self) -> str:
return "E020"
def message(self) -> str:
return f'Creating schema "{self.relation}"'
class SchemaDrop(DebugLevel):
def code(self) -> str:
return "E021"
def message(self) -> str:
return f'Dropping schema "{self.relation}".'
class CacheAction(DebugLevel):
def code(self) -> str:
return "E022"
def format_ref_key(self, ref_key) -> str:
return f"(database={ref_key.database}, schema={ref_key.schema}, identifier={ref_key.identifier})"
def message(self) -> str:
ref_key = self.format_ref_key(self.ref_key)
ref_key_2 = self.format_ref_key(self.ref_key_2)
ref_key_3 = self.format_ref_key(self.ref_key_3)
ref_list = []
for rfk in self.ref_list:
ref_list.append(self.format_ref_key(rfk))
if self.action == "add_link":
return f"adding link, {ref_key} references {ref_key_2}"
elif self.action == "add_relation":
return f"adding relation: {ref_key}"
elif self.action == "drop_missing_relation":
return f"dropped a nonexistent relationship: {ref_key}"
elif self.action == "drop_cascade":
return f"drop {ref_key} is cascading to {ref_list}"
elif self.action == "drop_relation":
return f"Dropping relation: {ref_key}"
elif self.action == "update_reference":
return (
f"updated reference from {ref_key} -> {ref_key_3} to "
f"{ref_key_2} -> {ref_key_3}"
)
elif self.action == "temporary_relation":
return f"old key {ref_key} not found in self.relations, assuming temporary"
elif self.action == "rename_relation":
return f"Renaming relation {ref_key} to {ref_key_2}"
elif self.action == "uncached_relation":
return (
f"{ref_key_2} references {ref_key} "
f"but {self.ref_key.database}.{self.ref_key.schema}"
"is not in the cache, skipping assumed external relation"
)
else:
return ref_key
# Skipping E023, E024, E025, E026, E027, E028, E029, E030
class CacheDumpGraph(DebugLevel):
def code(self) -> str:
return "E031"
def message(self) -> str:
return f"dump {self.before_after} {self.action} : {self.dump}"
# Skipping E032, E033
class AdapterRegistered(InfoLevel):
def code(self) -> str:
return "E034"
def message(self) -> str:
return f"Registered adapter: {self.adapter_name}{self.adapter_version}"
class AdapterImportError(InfoLevel):
def code(self) -> str:
return "E035"
def message(self) -> str:
return f"Error importing adapter: {self.exc}"
class PluginLoadError(DebugLevel):
def code(self) -> str:
return "E036"
def message(self) -> str:
return f"{self.exc_info}"
class NewConnectionOpening(DebugLevel):
def code(self) -> str:
return "E037"
def message(self) -> str:
return f"Opening a new connection, currently in state {self.connection_state}"
class CodeExecution(DebugLevel):
def code(self) -> str:
return "E038"
def message(self) -> str:
return f"On {self.conn_name}: {self.code_content}"
class CodeExecutionStatus(DebugLevel):
def code(self) -> str:
return "E039"
def message(self) -> str:
return f"Execution status: {self.status} in {self.elapsed} seconds"
class CatalogGenerationError(WarnLevel):
def code(self) -> str:
return "E040"
def message(self) -> str:
return f"Encountered an error while generating catalog: {self.exc}"
class WriteCatalogFailure(ErrorLevel):
def code(self) -> str:
return "E041"
def message(self) -> str:
return (
f"dbt encountered {self.num_exceptions} failure{(self.num_exceptions != 1) * 's'} "
"while writing the catalog"
)
class CatalogWritten(InfoLevel):
def code(self) -> str:
return "E042"
def message(self) -> str:
return f"Catalog written to {self.path}"
class CannotGenerateDocs(InfoLevel):
def code(self) -> str:
return "E043"
def message(self) -> str:
return "compile failed, cannot generate docs"
class BuildingCatalog(InfoLevel):
def code(self) -> str:
return "E044"
def message(self) -> str:
return "Building catalog"
class DatabaseErrorRunningHook(InfoLevel):
def code(self) -> str:
return "E045"
def message(self) -> str:
return f"Database error while running {self.hook_type}"
class HooksRunning(InfoLevel):
def code(self) -> str:
return "E046"
def message(self) -> str:
plural = "hook" if self.num_hooks == 1 else "hooks"
return f"Running {self.num_hooks} {self.hook_type} {plural}"
class FinishedRunningStats(InfoLevel):
def code(self) -> str:
return "E047"
def message(self) -> str:
return f"Finished running {self.stat_line}{self.execution} ({self.execution_time:0.2f}s)."
class ConstraintNotEnforced(WarnLevel):
def code(self) -> str:
return "E048"
def message(self) -> str:
msg = (
f"The constraint type {self.constraint} is not enforced by {self.adapter}. "
"The constraint will be included in this model's DDL statement, but it will not "
"guarantee anything about the underlying data. Set 'warn_unenforced: false' on "
"this constraint to ignore this warning."
)
return line_wrap_message(warning_tag(msg))
class ConstraintNotSupported(WarnLevel):
def code(self) -> str:
return "E049"
def message(self) -> str:
msg = (
f"The constraint type {self.constraint} is not supported by {self.adapter}, and will "
"be ignored. Set 'warn_unsupported: false' on this constraint to ignore this warning."
)
return line_wrap_message(warning_tag(msg))
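
These classes are thin wrappers over the proto payloads defined earlier: fields are passed as keyword arguments, and fire_event routes the populated event through the event manager. A hedged example using two events from this file:

from dbt.common.events.functions import fire_event
from dbt.adapters.events.types import CatalogWritten, SQLQueryStatus

fire_event(CatalogWritten(path="target/catalog.json"))
fire_event(SQLQueryStatus(status="SUCCESS", elapsed=0.42))  # node_info omitted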

View File

@@ -0,0 +1,4 @@
from dbt.adapters.exceptions.compilation import * # noqa
from dbt.adapters.exceptions.alias import * # noqa
from dbt.adapters.exceptions.database import * # noqa
from dbt.adapters.exceptions.connection import * # noqa

View File

@@ -0,0 +1,24 @@
from typing import Mapping, Any
from dbt.common.exceptions import DbtValidationError
class AliasError(DbtValidationError):
pass
# core level exceptions
class DuplicateAliasError(AliasError):
def __init__(self, kwargs: Mapping[str, Any], aliases: Mapping[str, str], canonical_key: str):
self.kwargs = kwargs
self.aliases = aliases
self.canonical_key = canonical_key
super().__init__(msg=self.get_message())
def get_message(self) -> str:
# dupe found: go through the dict so we can have a nice-ish error
key_names = ", ".join(
"{}".format(k) for k in self.kwargs if self.aliases.get(k) == self.canonical_key
)
msg = f'Got duplicate keys: ({key_names}) all map to "{self.canonical_key}"'
return msg
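
For context, this fires when two spellings of the same option are passed together. A small sketch of the resulting message (the pre-hook/pre_hook aliasing is illustrative):

aliases = {"pre-hook": "pre_hook", "pre_hook": "pre_hook"}
kwargs = {"pre-hook": "x", "pre_hook": "y"}
err = DuplicateAliasError(kwargs, aliases, canonical_key="pre_hook")
print(err.get_message())  # Got duplicate keys: (pre-hook, pre_hook) all map to "pre_hook"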

View File

@@ -0,0 +1,255 @@
from typing import List, Mapping, Any
from dbt.common.exceptions import CompilationError, DbtDatabaseError
from dbt.common.ui import line_wrap_message
class MissingConfigError(CompilationError):
def __init__(self, unique_id: str, name: str):
self.unique_id = unique_id
self.name = name
msg = (
f"Model '{self.unique_id}' does not define a required config parameter '{self.name}'."
)
super().__init__(msg=msg)
class MultipleDatabasesNotAllowedError(CompilationError):
def __init__(self, databases):
self.databases = databases
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = str(self.databases)
return msg
class ApproximateMatchError(CompilationError):
def __init__(self, target, relation):
self.target = target
self.relation = relation
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
"When searching for a relation, dbt found an approximate match. "
"Instead of guessing \nwhich relation to use, dbt will move on. "
f"Please delete {self.relation}, or rename it to be less ambiguous."
f"\nSearched for: {self.target}\nFound: {self.relation}"
)
return msg
class SnapshotTargetIncompleteError(CompilationError):
def __init__(self, extra: List, missing: List):
self.extra = extra
self.missing = missing
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
'Snapshot target has ("{}") but not ("{}") - is it an '
"unmigrated previous version archive?".format(
'", "'.join(self.extra), '", "'.join(self.missing)
)
)
return msg
class DuplicateMacroInPackageError(CompilationError):
def __init__(self, macro, macro_mapping: Mapping):
self.macro = macro
self.macro_mapping = macro_mapping
super().__init__(msg=self.get_message())
def get_message(self) -> str:
other_path = self.macro_mapping[self.macro.unique_id].original_file_path
# subtract 2 for the "Compilation Error" indent
# note that the line wrap eats newlines, so if you want newlines,
# this is the result :(
msg = line_wrap_message(
f"""\
dbt found two macros named "{self.macro.name}" in the project
"{self.macro.package_name}".
To fix this error, rename or remove one of the following
macros:
- {self.macro.original_file_path}
- {other_path}
""",
subtract=2,
)
return msg
class DuplicateMaterializationNameError(CompilationError):
def __init__(self, macro, other_macro):
self.macro = macro
self.other_macro = other_macro
super().__init__(msg=self.get_message())
def get_message(self) -> str:
macro_name = self.macro.name
macro_package_name = self.macro.package_name
other_package_name = self.other_macro.macro.package_name
msg = (
f"Found two materializations with the name {macro_name} (packages "
f"{macro_package_name} and {other_package_name}). dbt cannot resolve "
"this ambiguity"
)
return msg
class ColumnTypeMissingError(CompilationError):
def __init__(self, column_names: List):
self.column_names = column_names
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
"Contracted models require data_type to be defined for each column. "
"Please ensure that the column name and data_type are defined within "
f"the YAML configuration for the {self.column_names} column(s)."
)
return msg
class MacroNotFoundError(CompilationError):
def __init__(self, node, target_macro_id: str):
self.node = node
self.target_macro_id = target_macro_id
msg = f"'{self.node.unique_id}' references macro '{self.target_macro_id}' which is not defined!"
super().__init__(msg=msg)
class MissingMaterializationError(CompilationError):
def __init__(self, materialization, adapter_type):
self.materialization = materialization
self.adapter_type = adapter_type
super().__init__(msg=self.get_message())
def get_message(self) -> str:
valid_types = "'default'"
if self.adapter_type != "default":
valid_types = f"'default' and '{self.adapter_type}'"
msg = f"No materialization '{self.materialization}' was found for adapter {self.adapter_type}! (searched types {valid_types})"
return msg
class SnapshotTargetNotSnapshotTableError(CompilationError):
def __init__(self, missing: List):
self.missing = missing
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = 'Snapshot target is not a snapshot table (missing "{}")'.format(
'", "'.join(self.missing)
)
return msg
class NullRelationDropAttemptedError(CompilationError):
def __init__(self, name: str):
self.name = name
self.msg = f"Attempted to drop a null relation for {self.name}"
super().__init__(msg=self.msg)
class NullRelationCacheAttemptedError(CompilationError):
def __init__(self, name: str):
self.name = name
self.msg = f"Attempted to cache a null relation for {self.name}"
super().__init__(msg=self.msg)
class RelationTypeNullError(CompilationError):
def __init__(self, relation):
self.relation = relation
self.msg = f"Tried to drop relation {self.relation}, but its type is null."
super().__init__(msg=self.msg)
class MaterializationNotAvailableError(CompilationError):
def __init__(self, materialization, adapter_type: str):
self.materialization = materialization
self.adapter_type = adapter_type
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = f"Materialization '{self.materialization}' is not available for {self.adapter_type}!"
return msg
class RelationReturnedMultipleResultsError(CompilationError):
def __init__(self, kwargs: Mapping[str, Any], matches: List):
self.kwargs = kwargs
self.matches = matches
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
"get_relation returned more than one relation with the given args. "
"Please specify a database or schema to narrow down the result set."
f"\n{self.kwargs}\n\n{self.matches}"
)
return msg
class UnexpectedNonTimestampError(DbtDatabaseError):
def __init__(self, field_name: str, source, dt: Any):
self.field_name = field_name
self.source = source
self.type_name = type(dt).__name__
msg = (
f"Expected a timestamp value when querying field '{self.field_name}' of table "
f"{self.source} but received value of type '{self.type_name}' instead"
)
super().__init__(msg)
class RenameToNoneAttemptedError(CompilationError):
def __init__(self, src_name: str, dst_name: str, name: str):
self.src_name = src_name
self.dst_name = dst_name
self.name = name
self.msg = f"Attempted to rename {self.src_name} to {self.dst_name} for {self.name}"
super().__init__(msg=self.msg)
class QuoteConfigTypeError(CompilationError):
def __init__(self, quote_config: Any):
self.quote_config = quote_config
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
'The seed configuration value of "quote_columns" has an '
f"invalid type {type(self.quote_config)}"
)
return msg
class RelationWrongTypeError(CompilationError):
def __init__(self, relation, expected_type, model=None):
self.relation = relation
self.expected_type = expected_type
self.model = model
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
f"Trying to create {self.expected_type} {self.relation}, "
f"but it currently exists as a {self.relation.type}. Either "
f"drop {self.relation} manually, or run dbt with "
"`--full-refresh` and dbt will drop it for you."
)
return msg

View File

@@ -0,0 +1,16 @@
from typing import List
from dbt.common.exceptions import DbtRuntimeError, DbtDatabaseError
class InvalidConnectionError(DbtRuntimeError):
def __init__(self, thread_id, known: List) -> None:
self.thread_id = thread_id
self.known = known
super().__init__(
msg=f"connection never acquired for thread {self.thread_id}, have {self.known}"
)
class FailedToConnectError(DbtDatabaseError):
pass
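
Adapters typically raise FailedToConnectError from their connection-open path so dbt can classify the failure. A minimal sketch, assuming a DB-API style driver object:

def open_connection(credentials, driver):
    # driver is a stand-in for any DB-API style client (illustrative)
    try:
        return driver.connect(host=credentials.host)
    except Exception as exc:
        raise FailedToConnectError(str(exc)) from exc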

View File

@@ -0,0 +1,51 @@
from typing import Any
from dbt.common.exceptions import NotImplementedError, CompilationError
class UnexpectedDbReferenceError(NotImplementedError):
def __init__(self, adapter, database, expected):
self.adapter = adapter
self.database = database
self.expected = expected
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = f"Cross-db references not allowed in {self.adapter} ({self.database} vs {self.expected})"
return msg
class CrossDbReferenceProhibitedError(CompilationError):
def __init__(self, adapter, exc_msg: str):
self.adapter = adapter
self.exc_msg = exc_msg
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = f"Cross-db references not allowed in adapter {self.adapter}: Got {self.exc_msg}"
return msg
class IndexConfigNotDictError(CompilationError):
def __init__(self, raw_index: Any):
self.raw_index = raw_index
super().__init__(msg=self.get_message())
def get_message(self) -> str:
msg = (
f"Invalid index config:\n"
f" Got: {self.raw_index}\n"
f' Expected a dictionary with at minimum a "columns" key'
)
return msg
class IndexConfigError(CompilationError):
def __init__(self, exc: TypeError):
self.exc = exc
super().__init__(msg=self.get_message())
def get_message(self) -> str:
validator_msg = self.validator_error_message(self.exc)
msg = f"Could not parse index config: {validator_msg}"
return msg
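
The two index errors pair up during config parsing: a non-dict raw value fails fast with IndexConfigNotDictError, and a TypeError raised during validation is rewrapped as IndexConfigError. A hedged sketch of that flow:

def parse_index_config(raw_index):
    if not isinstance(raw_index, dict):
        raise IndexConfigNotDictError(raw_index)
    try:
        return dict(raw_index)  # stand-in for real schema validation
    except TypeError as exc:
        raise IndexConfigError(exc)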

View File

@@ -7,13 +7,14 @@ from typing import Any, Dict, List, Optional, Set, Type
 from dbt.adapters.base.plugin import AdapterPlugin
 from dbt.adapters.protocol import AdapterConfig, AdapterProtocol, RelationProtocol
-from dbt.contracts.connection import AdapterRequiredConfig, Credentials
-from dbt.events.functions import fire_event
-from dbt.events.types import AdapterImportError, PluginLoadError, AdapterRegistered
-from dbt.exceptions import DbtInternalError, DbtRuntimeError
-from dbt.include.global_project import PACKAGE_PATH as GLOBAL_PROJECT_PATH
-from dbt.include.global_project import PROJECT_NAME as GLOBAL_PROJECT_NAME
-from dbt.semver import VersionSpecifier
+from dbt.adapters.contracts.connection import AdapterRequiredConfig, Credentials
+from dbt.common.events.functions import fire_event
+from dbt.adapters.events.types import AdapterImportError, PluginLoadError, AdapterRegistered
+from dbt.common.exceptions import DbtInternalError, DbtRuntimeError
+from dbt.adapters.include.global_project import PACKAGE_PATH as GLOBAL_PROJECT_PATH
+from dbt.adapters.include.global_project import PROJECT_NAME as GLOBAL_PROJECT_NAME
+from dbt.common.semver import VersionSpecifier
+from dbt.mp_context import get_mp_context

 Adapter = AdapterProtocol

@@ -102,7 +103,7 @@ class AdapterContainer:
             # this shouldn't really happen...
             return

-        adapter: Adapter = adapter_type(config) # type: ignore
+        adapter: Adapter = adapter_type(config, get_mp_context()) # type: ignore
         self.adapters[adapter_name] = adapter

     def lookup_adapter(self, adapter_name: str) -> Adapter:
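
The second hunk is the behavioral change: adapters are now constructed with a multiprocessing context in addition to the config. Concrete adapters have to accept it; a minimal sketch of the expected constructor (import paths assumed, not shown in this diff):

from multiprocessing.context import SpawnContext

from dbt.adapters.base import BaseAdapter  # assumed location

class MyAdapter(BaseAdapter):
    def __init__(self, config, mp_context: SpawnContext) -> None:
        super().__init__(config, mp_context)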

View File

@@ -0,0 +1,4 @@
import os
PACKAGE_PATH = os.path.dirname(__file__)
PROJECT_NAME = "dbt"

Some files were not shown because too many files have changed in this diff.