mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
* remove dbt.contracts.connection imports from adapter module
* Move events to common (#8676)
* More Type Annotations (#8536)
* Allow adapters to include python package logging in dbt logs (#8643)
* move types_pb2.py from events to common/events
* Add utils module (#8910): split out utils into core/common/adapters
* remove usage of dbt.config.PartialProject from dbt/adapters (#8909)
* move agate_helper into common (#8911)
* remove dbt.flags.MP_CONTEXT usage in dbt/adapters (#8931)
* remove dbt.flags.LOG_CACHE_EVENTS usage in dbt/adapters (#8933)
* Refactor Base Exceptions (#8989)
* Remove usage of dbt.deprecations in dbt/adapters, enable core & adapter-specific (#9051)
* Decouple adapter constraints from core (#9054)
* move include/global_project to adapters (#8930)
* remove adapter.get_compiler (#9134)
* Move adapter logger to adapters (#9165)
* Move the semver package to common and alter references (#9166)
* Refactor EventManager setup and interaction (#9180)
* first pass: adapter migration script (#9160)
* Decouple macro generator from adapters (#9149)
* Remove usage of dbt.contracts.relation in dbt/adapters (#9207)
* Remove ResultNode usage from connections (#9211)
* Add RelationConfig Protocol for use in Relation.create_from (#9210)
* Merge main into feature/decouple-adapters-from-core (#9240, #9305), picking up: Restore warning on unpinned git packages (#9157); Support --empty flag for schema-only dry runs (#8971); Fix production of valid jsonschema artifacts for manifest, catalog, sources, and run-results (#9155), which drops `all_refs=True`, explicitly uses the DRAFT_2020_12 dialect, declares `$schema` in produced artifacts, and bumps manifest to v12 and run-results to v6; Update parser to support conversion metrics (#9173); Remove `--dry-run` flag from `dbt deps` (#9169); adding clean_up methods to basic and unique_id tests (#9195)
* Move BaseConfig to Common (#9224)
* Remove manifest from catalog and connection method signatures (#9242)
* Add MacroResolverProtocol, remove lazy loading of manifest in adapter.execute_macro (#9243)
* pass context to MacroQueryStringSetter (#9248)
* add macro_context_generator on adapter (#9251)
* Pass mp_context to adapter factory (#9275)
* Fix include for decoupling (#9286, #9288): fix include path and add index.html to MANIFEST.in
* move system client to common (#9294)
* Update materialized views to use RelationConfigs and remove refs to dbt.utils (#9291)
* Add config field to RelationConfig (#9300)
* resolve merge conflicts on unparsed.py (#9309, #9311)

Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
Co-authored-by: Peter Webb <peter.webb@dbtlabs.com>
Co-authored-by: Colin <colin.rogers@dbtlabs.com>
Co-authored-by: Michelle Ark <michelle.ark@dbtlabs.com>
Co-authored-by: Mila Page <67295367+VersusFacit@users.noreply.github.com>
Co-authored-by: Mila Page <versusfacit@users.noreply.github.com>
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
Co-authored-by: Quigley Malcolm <QMalcolm@users.noreply.github.com>
Co-authored-by: William Deng <33618746+WilliamDee@users.noreply.github.com>
Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com>
Co-authored-by: Chenyu Li <chenyu.li@dbtlabs.com>
519 lines
20 KiB
Python
import threading
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from dbt.adapters.reference_keys import (
    _make_ref_key,
    _make_ref_key_dict,
    _ReferenceKey,
)
from dbt.common.exceptions.cache import (
    NewNameAlreadyInCacheError,
    ReferencedLinkNotCachedError,
    DependentLinkNotCachedError,
    TruncatedModelNameCausedCollisionError,
    NoneRelationFoundError,
)
from dbt.common.events.functions import fire_event, fire_event_if
from dbt.adapters.events.types import CacheAction, CacheDumpGraph
from dbt.common.utils.formatting import lowercase


def dot_separated(key: _ReferenceKey) -> str:
    """Return the key in dot-separated string form.

    :param _ReferenceKey key: The key to stringify.
    """
    return ".".join(map(str, key))


class _CachedRelation:
    """Nothing about _CachedRelation is guaranteed to be thread-safe!

    :attr str schema: The schema of this relation.
    :attr str identifier: The identifier of this relation.
    :attr Dict[_ReferenceKey, _CachedRelation] referenced_by: The relations
        that refer to this relation.
    :attr BaseRelation inner: The underlying dbt relation.
    """

    def __init__(self, inner) -> None:
        self.referenced_by: Dict[_ReferenceKey, _CachedRelation] = {}
        self.inner = inner

    def __str__(self) -> str:
        return ("_CachedRelation(database={}, schema={}, identifier={}, inner={})").format(
            self.database, self.schema, self.identifier, self.inner
        )

    @property
    def database(self) -> Optional[str]:
        return lowercase(self.inner.database)

    @property
    def schema(self) -> Optional[str]:
        return lowercase(self.inner.schema)

    @property
    def identifier(self) -> Optional[str]:
        return lowercase(self.inner.identifier)

    def __copy__(self):
        new = self.__class__(self.inner)
        new.__dict__.update(self.__dict__)
        return new

    def __deepcopy__(self, memo):
        new = self.__class__(self.inner.incorporate())
        new.__dict__.update(self.__dict__)
        new.referenced_by = deepcopy(self.referenced_by, memo)
        # Return the copy explicitly; without this, deepcopy() would yield None.
        return new

    def is_referenced_by(self, key):
        return key in self.referenced_by

    def key(self):
        """Get the _ReferenceKey that represents this relation

        :return _ReferenceKey: A key for this relation.
        """
        return _make_ref_key(self)

    def add_reference(self, referrer: "_CachedRelation"):
        """Add a reference from referrer to self, indicating that if this node
        were drop...cascaded, the referrer would be dropped as well.

        :param _CachedRelation referrer: The node that refers to this node.
        """
        self.referenced_by[referrer.key()] = referrer

    def collect_consequences(self):
        """Recursively collect a set of _ReferenceKeys that would
        consequentially get dropped if this were dropped via
        "drop ... cascade".

        :return Set[_ReferenceKey]: All the relations that would be dropped
        """
        consequences = {self.key()}
        for relation in self.referenced_by.values():
            consequences.update(relation.collect_consequences())
        return consequences

    def release_references(self, keys):
        """Non-recursively indicate that the given _ReferenceKeys no longer
        exist. Unknown keys are ignored.

        :param Iterable[_ReferenceKey] keys: The keys to drop.
        """
        keys = set(self.referenced_by) & set(keys)
        for key in keys:
            self.referenced_by.pop(key)

    def rename(self, new_relation):
        """Rename this cached relation to new_relation.
        Note that this will change the output of key(), all refs must be
        updated!

        :param _CachedRelation new_relation: The new name to apply to the
            relation
        """
        # Relations store this stuff inside their `path` dict. But they
        # also store a table_name, and usually use it in their .render(),
        # so we need to update that as well. It doesn't appear that
        # table_name is ever anything but the identifier (via .create())
        self.inner = self.inner.incorporate(
            path={
                "database": new_relation.inner.database,
                "schema": new_relation.inner.schema,
                "identifier": new_relation.inner.identifier,
            },
        )

    def rename_key(self, old_key, new_key):
        """Rename a reference that may or may not exist. Only handles the
        reference itself, so this is the other half of what `rename` does.

        If old_key is not in referenced_by, this is a no-op.

        :param _ReferenceKey old_key: The old key to be renamed.
        :param _ReferenceKey new_key: The new key to rename to.
        :raises InternalError: If the new key already exists.
        """
        if new_key in self.referenced_by:
            raise NewNameAlreadyInCacheError(old_key, new_key)

        if old_key not in self.referenced_by:
            return
        value = self.referenced_by.pop(old_key)
        self.referenced_by[new_key] = value

    def dump_graph_entry(self):
        """Return the dot-separated keys of every relation that references
        this one.

        :return List[str]: The dot-separated form of all referent keys.
        """
        return [dot_separated(r) for r in self.referenced_by]
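

# A minimal sketch of the reference graph, assuming two hypothetical cached
# relations where a view selects from a table:
#
#     table = _CachedRelation(table_relation)
#     view = _CachedRelation(view_relation)
#     table.add_reference(view)        # dropping the table cascades to the view
#     table.collect_consequences()     # == {table.key(), view.key()}
#     table.dump_graph_entry()         # == [dot_separated(view.key())]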


class RelationsCache:
    """A cache of the relations known to dbt. Keeps track of relationships
    declared between tables and handles renames/drops as a real database would.

    :attr Dict[_ReferenceKey, _CachedRelation] relations: The known relations.
    :attr threading.RLock lock: The lock around relations, held during updates.
        The adapters also hold this lock while filling the cache.
    :attr Set[str] schemas: The set of known/cached schemas, all lowercased.
    """

    def __init__(self, log_cache_events: bool = False) -> None:
        self.relations: Dict[_ReferenceKey, _CachedRelation] = {}
        self.lock = threading.RLock()
        self.schemas: Set[Tuple[Optional[str], Optional[str]]] = set()
        self.log_cache_events = log_cache_events

    def add_schema(
        self,
        database: Optional[str],
        schema: Optional[str],
    ) -> None:
        """Add a schema to the set of known schemas (case-insensitive)

        :param database: The database name to add.
        :param schema: The schema name to add.
        """
        self.schemas.add((lowercase(database), lowercase(schema)))

    def drop_schema(
        self,
        database: Optional[str],
        schema: Optional[str],
    ) -> None:
        """Drop the given schema and remove it from the set of known schemas.

        Then remove all its contents (and their dependents, etc) as well.
        """
        key = (lowercase(database), lowercase(schema))
        if key not in self.schemas:
            return

        # avoid iterating over self.relations while removing things by
        # collecting the list first.

        with self.lock:
            to_remove = self._list_relations_in_schema(database, schema)
            self._remove_all(to_remove)
            # handle a drop_schema race by using discard() over remove()
            self.schemas.discard(key)

    def update_schemas(self, schemas: Iterable[Tuple[Optional[str], str]]):
        """Add multiple schemas to the set of known schemas (case-insensitive)

        :param schemas: An iterable of the schema names to add.
        """
        self.schemas.update((lowercase(d), s.lower()) for (d, s) in schemas)

    def __contains__(self, schema_id: Tuple[Optional[str], str]):
        """A schema is 'in' the relations cache if it is in the set of cached
        schemas.

        :param schema_id: The db name and schema name to look up.
        """
        db, schema = schema_id
        return (lowercase(db), schema.lower()) in self.schemas

    def dump_graph(self):
        """Dump a key-only representation of the schema to a dictionary. Every
        known relation is a key with a value of a list of keys it is referenced
        by.
        """
        # we have to hold the lock for the entire dump, if other threads modify
        # self.relations or any cache entry's referenced_by during iteration
        # it's a runtime error!
        with self.lock:
            return {dot_separated(k): str(v.dump_graph_entry()) for k, v in self.relations.items()}
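
    # A minimal sketch of the dump format, assuming a hypothetical view
    # "dev"."analytics"."foo" cached as depending on the table
    # "dev"."analytics"."bar"; dump_graph() would return roughly:
    #
    #     {
    #         "dev.analytics.bar": "['dev.analytics.foo']",
    #         "dev.analytics.foo": "[]",
    #     }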

    def _setdefault(self, relation: _CachedRelation):
        """Add a relation to the cache, or return it if it already exists.

        :param _CachedRelation relation: The relation to set or get.
        :return _CachedRelation: The relation stored under the given relation's
            key
        """
        self.add_schema(relation.database, relation.schema)
        key = relation.key()
        return self.relations.setdefault(key, relation)

    def _add_link(self, referenced_key, dependent_key):
        """Add a link between two relations to the database. Both the old and
        new entries must already exist in the database.

        :param _ReferenceKey referenced_key: The key identifying the referenced
            model (the one that if dropped will drop the dependent model).
        :param _ReferenceKey dependent_key: The key identifying the dependent
            model.
        :raises InternalError: If either entry does not exist.
        """
        referenced = self.relations.get(referenced_key)
        if referenced is None:
            return
        if referenced is None:
            raise ReferencedLinkNotCachedError(referenced_key)

        dependent = self.relations.get(dependent_key)
        if dependent is None:
            raise DependentLinkNotCachedError(dependent_key)

        assert dependent is not None  # we just raised!

        referenced.add_reference(dependent)

    # This is called in plugins/postgres/dbt/adapters/postgres/impl.py
    def add_link(self, referenced, dependent):
        """Add a link between two relations to the database. If either relation
        does not exist, it will be added as an "external" relation.

        The dependent model refers _to_ the referenced model. So, given
        arguments of (jake_test, bar, jake_test, foo):
        both values are in the schema jake_test and foo is a view that refers
        to bar, so "drop bar cascade" will drop foo and all of foo's
        dependents.

        :param BaseRelation referenced: The referenced model.
        :param BaseRelation dependent: The dependent model.
        :raises InternalError: If either entry does not exist.
        """
        ref_key = _make_ref_key(referenced)
        dep_key = _make_ref_key(dependent)
        if (ref_key.database, ref_key.schema) not in self:
            # if we have not cached the referenced schema at all, we must be
            # referring to a table outside our control. There's no need to make
            # a link - we will never drop the referenced relation during a run.
            fire_event(
                CacheAction(
                    ref_key=ref_key._asdict(),
                    ref_key_2=dep_key._asdict(),
                )
            )
            return
        if ref_key not in self.relations:
            # Insert a dummy "external" relation.
            referenced = referenced.replace(type=referenced.External)
            self.add(referenced)
        if dep_key not in self.relations:
            # Insert a dummy "external" relation.
            dependent = dependent.replace(type=referenced.External)
            self.add(dependent)
        fire_event(
            CacheAction(
                action="add_link",
                ref_key=dep_key._asdict(),
                ref_key_2=ref_key._asdict(),
            )
        )
        with self.lock:
            self._add_link(ref_key, dep_key)
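
    # A minimal usage sketch, assuming hypothetical BaseRelation objects
    # `table_rel` and `view_rel` in an already-cached schema; registering the
    # link is what lets drop() cascade the way the database would:
    #
    #     cache.add(table_rel)
    #     cache.add(view_rel)
    #     cache.add_link(referenced=table_rel, dependent=view_rel)
    #     cache.drop(table_rel)   # evicts view_rel from the cache as well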

    def add(self, relation):
        """Add the given relation to the cache under its schema and
        identifier.

        :param BaseRelation relation: The underlying relation.
        """
        cached = _CachedRelation(relation)
        fire_event_if(
            self.log_cache_events,
            lambda: CacheDumpGraph(before_after="before", action="adding", dump=self.dump_graph()),
        )
        fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_dict(cached)))

        with self.lock:
            self._setdefault(cached)
        fire_event_if(
            self.log_cache_events,
            lambda: CacheDumpGraph(before_after="after", action="adding", dump=self.dump_graph()),
        )

    def _remove_refs(self, keys):
        """Remove all references to all entries in keys. This does not
        cascade!

        :param Iterable[_ReferenceKey] keys: The keys to remove.
        """
        # remove direct refs
        for key in keys:
            del self.relations[key]
        # then remove all entries from each child
        for cached in self.relations.values():
            cached.release_references(keys)

    def drop(self, relation):
        """Drop the named relation and cascade it appropriately to all
        dependent relations.

        Because dbt proactively does many `drop relation if exists ... cascade`
        that are noops, nonexistent relation drops cause a debug log and no
        other actions.

        :param BaseRelation relation: The relation to drop.
        """
        dropped_key = _make_ref_key(relation)
        dropped_key_msg = _make_ref_key_dict(relation)
        fire_event(CacheAction(action="drop_relation", ref_key=dropped_key_msg))
        with self.lock:
            if dropped_key not in self.relations:
                fire_event(CacheAction(action="drop_missing_relation", ref_key=dropped_key_msg))
                return
            consequences = self.relations[dropped_key].collect_consequences()
            # convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
            consequence_msgs = [key._asdict() for key in consequences]
            fire_event(
                CacheAction(
                    action="drop_cascade", ref_key=dropped_key_msg, ref_list=consequence_msgs
                )
            )
            self._remove_refs(consequences)

    def _rename_relation(self, old_key, new_relation):
        """Rename the relation stored under old_key to new_relation, updating
        references.
        Return whether or not there was a key to rename.

        :param _ReferenceKey old_key: The existing key, to rename from.
        :param _CachedRelation new_relation: The new relation, to rename to.
        """
        # On the database level, a rename updates all values that were
        # previously referenced by old_name to be referenced by new_name.
        # basically, the name changes but some underlying ID moves. Kind of
        # like an object reference!
        relation = self.relations.pop(old_key)
        new_key = new_relation.key()

        # relation has to rename its innards, so it needs the _CachedRelation.
        relation.rename(new_relation)
        # update all the relations that refer to it
        for cached in self.relations.values():
            if cached.is_referenced_by(old_key):
                fire_event(
                    CacheAction(
                        action="update_reference",
                        ref_key=_make_ref_key_dict(old_key),
                        ref_key_2=_make_ref_key_dict(new_key),
                        ref_key_3=_make_ref_key_dict(cached.key()),
                    )
                )

            cached.rename_key(old_key, new_key)

        self.relations[new_key] = relation
        # also fixup the schemas!
        self.add_schema(new_key.database, new_key.schema)

        return True

    def _check_rename_constraints(self, old_key, new_key):
        """Check the rename constraints, and return whether or not the rename
        can proceed.

        If the new key is already present, that is an error.
        If the old key is absent, we debug log and return False, assuming it's
        a temp table being renamed.

        :param _ReferenceKey old_key: The existing key, to rename from.
        :param _ReferenceKey new_key: The new key, to rename to.
        :return bool: If the old relation exists for renaming.
        :raises InternalError: If the new key is already present.
        """
        if new_key in self.relations:
            # Tell user when collision caused by model names truncated during
            # materialization.
            raise TruncatedModelNameCausedCollisionError(new_key, self.relations)

        if old_key not in self.relations:
            fire_event(CacheAction(action="temporary_relation", ref_key=old_key._asdict()))
            return False
        return True

    def rename(self, old, new):
        """Rename the old schema/identifier to the new schema/identifier and
        update references.

        If the new schema/identifier is already present, that is an error.
        If the schema/identifier key is absent, we only debug log and return,
        assuming it's a temp table being renamed.

        :param BaseRelation old: The existing relation name information.
        :param BaseRelation new: The new relation name information.
        :raises InternalError: If the new key is already present.
        """
        old_key = _make_ref_key(old)
        new_key = _make_ref_key(new)
        fire_event(
            CacheAction(
                action="rename_relation",
                ref_key=old_key._asdict(),
                ref_key_2=new_key._asdict(),
            )
        )
        fire_event_if(
            self.log_cache_events,
            lambda: CacheDumpGraph(before_after="before", action="rename", dump=self.dump_graph()),
        )

        with self.lock:
            if self._check_rename_constraints(old_key, new_key):
                self._rename_relation(old_key, _CachedRelation(new))
            else:
                self._setdefault(_CachedRelation(new))

        fire_event_if(
            self.log_cache_events,
            lambda: CacheDumpGraph(before_after="after", action="rename", dump=self.dump_graph()),
        )
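
    # A minimal sketch of the two rename paths, assuming hypothetical relations
    # from a table materialization that builds a temp relation and swaps it in:
    #
    #     cache.rename(model_rel, backup_rel)  # old key cached: the key and all
    #                                          # references move to the new key
    #     cache.rename(tmp_rel, model_rel)     # old key absent: logged as a
    #                                          # temporary relation, the new one
    #                                          # is simply added to the cache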

    def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
        """Case-insensitively list all relations matching the given database
        and schema.

        :param str database: The case-insensitive database name to list from.
        :param str schema: The case-insensitive schema name to list from.
        :return List[BaseRelation]: The list of relations with the given
            schema
        """
        database = lowercase(database)
        schema = lowercase(schema)
        with self.lock:
            results = [
                r.inner
                for r in self.relations.values()
                if (lowercase(r.schema) == schema and lowercase(r.database) == database)
            ]

        if None in results:
            raise NoneRelationFoundError()
        return results

    def clear(self):
        """Clear the cache"""
        with self.lock:
            self.relations.clear()
            self.schemas.clear()

    def _list_relations_in_schema(
        self, database: Optional[str], schema: Optional[str]
    ) -> List[_CachedRelation]:
        """Get the relations in a schema. Callers should hold the lock."""
        key = (lowercase(database), lowercase(schema))

        to_remove: List[_CachedRelation] = []
        for cachekey, relation in self.relations.items():
            if (cachekey.database, cachekey.schema) == key:
                to_remove.append(relation)
        return to_remove

    def _remove_all(self, to_remove: List[_CachedRelation]):
        """Remove all the listed relations. Ignore relations that have been
        cascaded out.
        """
        for relation in to_remove:
            # it may have been cascaded out already
            drop_key = _make_ref_key(relation)
            if drop_key in self.relations:
                self.drop(drop_key)
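

# A minimal end-to-end sketch of RelationsCache, assuming hypothetical
# BaseRelation objects `table_rel` ("dev"."analytics"."bar") and `view_rel`
# ("dev"."analytics"."foo") of the kind an adapter builds while introspecting
# a warehouse:
#
#     cache = RelationsCache()
#     cache.add(table_rel)
#     cache.add(view_rel)
#     cache.add_link(referenced=table_rel, dependent=view_rel)
#     cache.get_relations("dev", "analytics")   # both relations are returned
#     cache.drop(table_rel)                     # cascades to the dependent view
#     cache.get_relations("dev", "analytics")   # -> []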