mirror of https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00

Compare commits

1 Commits

jerco/pyth...mashumaro_

| Author | SHA1 | Date |
|---|---|---|
|  | 6f18e0d0cd |  |
```diff
@@ -1,11 +1,11 @@
 import abc
 import enum
 from dataclasses import dataclass, field
 from itertools import chain, islice
+from mashumaro import DataClassMessagePackMixin
 from multiprocessing.synchronize import Lock
 from typing import (
     Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple,
-    TypeVar, Callable, Iterable, Generic, cast, AbstractSet
+    TypeVar, Callable, Iterable, Generic, cast, AbstractSet, ClassVar
 )
 from typing_extensions import Protocol
 from uuid import UUID
@@ -46,72 +46,64 @@ RefName = str
 UniqueID = str
 
 
-K_T = TypeVar('K_T')
-V_T = TypeVar('V_T')
-
-
-class PackageAwareCache(Generic[K_T, V_T]):
-    def __init__(self, manifest: 'Manifest'):
-        self.storage: Dict[K_T, Dict[PackageName, UniqueID]] = {}
-        self._manifest = manifest
-        self.populate()
-
-    @abc.abstractmethod
-    def populate(self):
-        pass
-
-    @abc.abstractmethod
-    def perform_lookup(self, unique_id: UniqueID) -> V_T:
-        pass
-
-    def find_cached_value(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[V_T]:
-        unique_id = self.find_unique_id_for_package(key, package)
-        if unique_id is not None:
-            return self.perform_lookup(unique_id)
-
-    def find_unique_id_for_package(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[UniqueID]:
-        if key not in self.storage:
-            return None
-
-        pkg_dct: Mapping[PackageName, UniqueID] = self.storage[key]
-
-        if package is None:
-            if not pkg_dct:
-                return None
-            else:
-                return next(iter(pkg_dct.values()))
-        elif package in pkg_dct:
-            return pkg_dct[package]
-        else:
-            return None
+def find_unique_id_for_package(storage, key, package: Optional[PackageName]):
+    if key not in storage:
+        return None
+
+    pkg_dct: Mapping[PackageName, UniqueID] = storage[key]
+
+    if package is None:
+        if not pkg_dct:
+            return None
+        else:
+            return next(iter(pkg_dct.values()))
+    elif package in pkg_dct:
+        return pkg_dct[package]
+    else:
+        return None
 
 
-class DocCache(PackageAwareCache[DocName, ParsedDocumentation]):
+class DocLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_doc(self, doc: ParsedDocumentation):
         if doc.name not in self.storage:
             self.storage[doc.name] = {}
         self.storage[doc.name][doc.package_name] = doc.unique_id
 
-    def populate(self):
-        for doc in self._manifest.docs.values():
+    def populate(self, manifest):
+        for doc in manifest.docs.values():
             self.add_doc(doc)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ParsedDocumentation:
-        if unique_id not in self._manifest.docs:
+        if unique_id not in manifest.docs:
             raise dbt.exceptions.InternalException(
                 f'Doc {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.docs[unique_id]
+        return manifest.docs[unique_id]
 
 
-class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
+class SourceLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[Tuple[str, str], Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_source(self, source: ParsedSourceDefinition):
         key = (source.source_name, source.name)
         if key not in self.storage:
@@ -119,47 +111,55 @@ class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
         self.storage[key][source.package_name] = source.unique_id
 
-    def populate(self):
-        for source in self._manifest.sources.values():
+    def populate(self, manifest):
+        for source in manifest.sources.values():
             if hasattr(source, 'source_name'):
                 self.add_source(source)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest: 'Manifest'
     ) -> ParsedSourceDefinition:
-        if unique_id not in self._manifest.sources:
+        if unique_id not in manifest.sources:
             raise dbt.exceptions.InternalException(
                 f'Source {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.sources[unique_id]
+        return manifest.sources[unique_id]
 
 
-class RefableCache(PackageAwareCache[RefName, ManifestNode]):
+class RefableLookup(dbtClassMixin):
+    _lookup_types: ClassVar[set] = set(NodeType.refable())
+
     # refables are actually unique, so the Dict[PackageName, UniqueID] will
     # only ever have exactly one value, but doing 3 dict lookups instead of 1
     # is not a big deal at all and retains consistency
     def __init__(self, manifest: 'Manifest'):
-        self._cached_types = set(NodeType.refable())
-        super().__init__(manifest)
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
 
     def add_node(self, node: ManifestNode):
-        if node.resource_type in self._cached_types:
+        if node.resource_type in self._lookup_types:
            if node.name not in self.storage:
                self.storage[node.name] = {}
            self.storage[node.name][node.package_name] = node.unique_id
 
-    def populate(self):
-        for node in self._manifest.nodes.values():
+    def populate(self, manifest):
+        for node in manifest.nodes.values():
             self.add_node(node)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ManifestNode:
-        if unique_id not in self._manifest.nodes:
+        if unique_id not in manifest.nodes:
             raise dbt.exceptions.InternalException(
                 f'Node {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.nodes[unique_id]
+        return manifest.nodes[unique_id]
 
 
 def _search_packages(
```
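The shared resolution logic now lives in a module-level helper instead of a generic base class. A minimal standalone sketch of that helper's behavior (the storage layout and package-fallback rules come from the diff; the sample data is hypothetical):

```python
from typing import Dict, Mapping, Optional

PackageName = str
UniqueID = str


def find_unique_id_for_package(storage, key, package: Optional[PackageName]):
    # Same rules as the helper in the diff: a two-level dict keyed by
    # name, then by package; package=None falls back to any match.
    if key not in storage:
        return None
    pkg_dct: Mapping[PackageName, UniqueID] = storage[key]
    if package is None:
        return next(iter(pkg_dct.values())) if pkg_dct else None
    return pkg_dct.get(package)


# Hypothetical sample data, just to exercise the resolution order.
storage: Dict[str, Dict[PackageName, UniqueID]] = {
    'my_doc': {'my_project': 'docs.my_project.my_doc'},
}
assert find_unique_id_for_package(storage, 'my_doc', 'my_project') == 'docs.my_project.my_doc'
assert find_unique_id_for_package(storage, 'my_doc', None) == 'docs.my_project.my_doc'
assert find_unique_id_for_package(storage, 'missing', None) is None
```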
```diff
@@ -520,7 +520,7 @@ class ManifestStateCheck(dbtClassMixin):
 
 
 @dataclass
-class Manifest(MacroMethods):
+class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
     """The manifest for the full graph, after parsing and during compilation.
     """
     # These attributes are both positional and by keyword. If an attribute
@@ -543,10 +543,19 @@ class Manifest(MacroMethods):
     source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict)
     # following is from ParseResult
     _disabled: MutableMapping[str, List[CompileResultNode]] = field(default_factory=dict)
-    _docs_cache: Optional[DocCache] = None
-    _sources_cache: Optional[SourceCache] = None
-    _refs_cache: Optional[RefableCache] = None
-    _lock: Lock = field(default_factory=flags.MP_CONTEXT.Lock)
+    _doc_lookup: Optional[DocLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _source_lookup: Optional[SourceLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _ref_lookup: Optional[RefableLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _lock: Lock = field(
+        default_factory=flags.MP_CONTEXT.Lock,
+        metadata={'serialize': lambda x: None, 'deserialize': lambda x: flags.MP_CONTEXT.Lock}
+    )
 
     def sync_update_node(
         self, new_node: NonSourceCompiledNode
```
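The lookup and lock fields are excluded from serialization via mashumaro's per-field `serialize`/`deserialize` metadata hooks, so derived state is dropped on write and rebuilt lazily after a read. A minimal sketch of the same pattern, assuming mashumaro with msgpack support is installed (in mashumaro 3.x the mixin lives at `mashumaro.mixins.msgpack`; the diff's older `from mashumaro import ...` path also worked at the time):

```python
from dataclasses import dataclass, field
from typing import Optional

# mashumaro 3.x import path; requires `pip install "mashumaro[msgpack]"`
from mashumaro.mixins.msgpack import DataClassMessagePackMixin


@dataclass
class Record(DataClassMessagePackMixin):
    name: str
    # Derived state we never want on disk: written as None, and read
    # back as None, so the owner rebuilds it lazily on first use.
    _cache: Optional[dict] = field(
        default=None,
        metadata={'serialize': lambda x: None, 'deserialize': lambda x: None},
    )


r = Record(name='model_a', _cache={'hot': True})
restored = Record.from_msgpack(r.to_msgpack())
assert restored.name == 'model_a'
assert restored._cache is None  # dropped in transit, rebuilt lazily
```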
```diff
@@ -662,17 +671,6 @@ class Manifest(MacroMethods):
         resource_fqns[resource_type_plural].add(tuple(resource.fqn))
         return resource_fqns
 
-    def add_nodes(self, new_nodes: Mapping[str, ManifestNode]):
-        """Add the given dict of new nodes to the manifest."""
-        for unique_id, node in new_nodes.items():
-            if unique_id in self.nodes:
-                raise_duplicate_resource_name(node, self.nodes[unique_id])
-            self.nodes[unique_id] = node
-            # fixup the cache if it exists.
-            if self._refs_cache is not None:
-                if node.resource_type in NodeType.refable():
-                    self._refs_cache.add_node(node)
-
     def add_patch(
         self, source_file: SourceFile, patch: ParsedNodePatch,
     ) -> None:
@@ -808,11 +806,6 @@ class Manifest(MacroMethods):
             parent_map=backward_edges,
         )
 
+    # When 'to_dict' is called on the Manifest, it substitues a
+    # WritableManifest
+    def __pre_serialize__(self):
+        return self.writable_manifest()
+
     def write(self, path):
         self.writable_manifest().write(path)
```
```diff
@@ -830,28 +823,22 @@ class Manifest(MacroMethods):
         )
 
     @property
-    def docs_cache(self) -> DocCache:
-        if self._docs_cache is not None:
-            return self._docs_cache
-        cache = DocCache(self)
-        self._docs_cache = cache
-        return cache
+    def doc_lookup(self) -> DocLookup:
+        if self._doc_lookup is None:
+            self._doc_lookup = DocLookup(self)
+        return self._doc_lookup
 
     @property
-    def source_cache(self) -> SourceCache:
-        if self._sources_cache is not None:
-            return self._sources_cache
-        cache = SourceCache(self)
-        self._sources_cache = cache
-        return cache
+    def source_lookup(self) -> SourceLookup:
+        if self._source_lookup is None:
+            self._source_lookup = SourceLookup(self)
+        return self._source_lookup
 
     @property
-    def refs_cache(self) -> RefableCache:
-        if self._refs_cache is not None:
-            return self._refs_cache
-        cache = RefableCache(self)
-        self._refs_cache = cache
-        return cache
+    def ref_lookup(self) -> RefableLookup:
+        if self._ref_lookup is None:
+            self._ref_lookup = RefableLookup(self)
+        return self._ref_lookup
 
     def resolve_ref(
         self,
```
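All three properties share one lazy-initialization idiom: if the backing field is None (a fresh instance, or one just deserialized, since the field round-trips as None), build the lookup from the manifest and cache it. A standalone sketch of the idiom with hypothetical names (`Registry`, `ItemLookup`):

```python
from typing import Dict, Optional


class ItemLookup:
    # Hypothetical stand-in for DocLookup/SourceLookup/RefableLookup.
    def __init__(self, registry: 'Registry'):
        # Index by name -> unique id, built once from the registry.
        self.storage: Dict[str, str] = {
            name: uid for uid, name in registry.items.items()
        }

    def find(self, key: str, registry: 'Registry') -> Optional[str]:
        uid = self.storage.get(key)
        return registry.items.get(uid) if uid is not None else None


class Registry:
    def __init__(self):
        self.items: Dict[str, str] = {'model.proj.a': 'a'}
        self._item_lookup: Optional[ItemLookup] = None

    @property
    def item_lookup(self) -> ItemLookup:
        # Lazy init: the lookup is derived state, rebuilt on demand
        # (e.g. after deserialization, where it comes back as None).
        if self._item_lookup is None:
            self._item_lookup = ItemLookup(self)
        return self._item_lookup


reg = Registry()
assert reg.item_lookup.find('a', reg) == 'a'
assert reg.item_lookup is reg.item_lookup  # cached after first access
```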
```diff
@@ -868,7 +855,7 @@ class Manifest(MacroMethods):
             current_project, node_package, target_model_package
         )
         for pkg in candidates:
-            node = self.refs_cache.find_cached_value(target_model_name, pkg)
+            node = self.ref_lookup.find(target_model_name, pkg, self)
 
             if node is not None and node.config.enabled:
                 return node
@@ -897,7 +884,7 @@ class Manifest(MacroMethods):
         disabled: Optional[ParsedSourceDefinition] = None
 
         for pkg in candidates:
-            source = self.source_cache.find_cached_value(key, pkg)
+            source = self.source_lookup.find(key, pkg, self)
             if source is not None and source.config.enabled:
                 return source
@@ -926,7 +913,7 @@ class Manifest(MacroMethods):
         )
 
         for pkg in candidates:
-            result = self.docs_cache.find_cached_value(name, pkg)
+            result = self.doc_lookup.find(name, pkg, self)
             if result is not None:
                 return result
         return None
@@ -1193,9 +1180,9 @@ class Manifest(MacroMethods):
             self.patches,
             self.source_patches,
             self._disabled,
-            self._docs_cache,
-            self._sources_cache,
-            self._refs_cache,
+            self._doc_lookup,
+            self._source_lookup,
+            self._ref_lookup,
         )
         return self.__class__, args
```
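The `args` tuple feeds Python's `__reduce__` pickling protocol, which is still used when the Manifest crosses process boundaries: pickle records the class plus positional args and rebuilds the object by calling the class. A toy illustration of that protocol (names are hypothetical):

```python
import pickle


class Point:
    def __init__(self, x, y, _cache=None):
        self.x = x
        self.y = y
        self._cache = _cache  # derived state can simply ride along

    def __reduce__(self):
        # pickle stores (callable, args) and rebuilds via Point(*args),
        # mirroring the Manifest's `return self.__class__, args`.
        args = (self.x, self.y, self._cache)
        return self.__class__, args


p = pickle.loads(pickle.dumps(Point(1, 2)))
assert (p.x, p.y) == (1, 2)
```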
```diff
@@ -1266,6 +1253,10 @@ def _check_duplicates(
         raise_duplicate_resource_name(value, src[value.unique_id])
 
 
+K_T = TypeVar('K_T')
+V_T = TypeVar('V_T')
+
+
 def _expect_value(
     key: K_T, src: Mapping[K_T, V_T], old_file: SourceFile, name: str
 ) -> V_T:
```
```diff
@@ -2,14 +2,13 @@ from dataclasses import field, Field, dataclass
 from enum import Enum
 from itertools import chain
 from typing import (
-    Any, List, Optional, Dict, MutableMapping, Union, Type,
-    TypeVar, Callable,
+    Any, List, Optional, Dict, Union, Type, TypeVar
 )
 from dbt.dataclass_schema import (
     dbtClassMixin, ValidationError, register_pattern,
 )
 from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed
-from dbt.exceptions import CompilationException, InternalException
+from dbt.exceptions import InternalException
 from dbt.contracts.util import Replaceable, list_str
 from dbt import hooks
 from dbt.node_types import NodeType
@@ -182,53 +181,29 @@ T = TypeVar('T', bound='BaseConfig')
 
 @dataclass
 class BaseConfig(
-    AdditionalPropertiesAllowed, Replaceable, MutableMapping[str, Any]
+    AdditionalPropertiesAllowed, Replaceable
 ):
-    # Implement MutableMapping so this config will behave as some macros expect
-    # during parsing (notably, syntax like `{{ node.config['schema'] }}`)
 
+    # enable syntax like: config['key']
     def __getitem__(self, key):
-        """Handle parse-time use of `config` as a dictionary, making the extra
-        values available during parsing.
-        """
         return self.get(key)
 
+    # like doing 'get' on a dictionary
     def get(self, key, default=None):
         if hasattr(self, key):
             return getattr(self, key)
-        else:
+        elif key in self._extra:
             return self._extra[key]
+        else:
+            return default
 
+    # enable syntax like: config['key'] = value
     def __setitem__(self, key, value):
         if hasattr(self, key):
             setattr(self, key, value)
         else:
             self._extra[key] = value
 
-    def __delitem__(self, key):
-        if hasattr(self, key):
-            msg = (
-                'Error, tried to delete config key "{}": Cannot delete '
-                'built-in keys'
-            ).format(key)
-            raise CompilationException(msg)
-        else:
-            del self._extra[key]
-
-    def _content_iterator(self, include_condition: Callable[[Field], bool]):
-        seen = set()
-        for fld, _ in self._get_fields():
-            seen.add(fld.name)
-            if include_condition(fld):
-                yield fld.name
-
-        for key in self._extra:
-            if key not in seen:
-                seen.add(key)
-                yield key
-
-    def __iter__(self):
-        yield from self._content_iterator(include_condition=lambda f: True)
-
-    def __len__(self):
-        return len(self._get_fields()) + len(self._extra)
-
     @staticmethod
     def compare_key(
         unrendered: Dict[str, Any],
```
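The slimmed-down `BaseConfig` keeps only the dict-style access that macros use at parse time (`config['key']`, `config.get(...)`, item assignment) and drops the full `MutableMapping` contract (`__delitem__`, `__iter__`, `__len__`). A self-contained sketch of the retained behavior, with a plain `_extra` dict standing in for what `AdditionalPropertiesAllowed` provides in dbt:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class MiniConfig:
    # Declared fields behave like attributes; anything else lands in
    # _extra, standing in for AdditionalPropertiesAllowed's storage.
    enabled: bool = True
    _extra: Dict[str, Any] = field(default_factory=dict)

    def __getitem__(self, key):
        return self.get(key)

    def get(self, key, default=None):
        if hasattr(self, key):
            return getattr(self, key)
        elif key in self._extra:
            return self._extra[key]
        else:
            return default

    def __setitem__(self, key, value):
        if hasattr(self, key):
            setattr(self, key, value)
        else:
            self._extra[key] = value


cfg = MiniConfig()
cfg['materialized'] = 'view'   # unknown key -> stored in _extra
assert cfg['enabled'] is True  # declared field -> attribute access
assert cfg.get('missing', 'x') == 'x'
```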
```diff
@@ -1,7 +1,6 @@
 from dataclasses import dataclass
 from dataclasses import field
 import os
-import pickle
 from typing import (
     Dict, Optional, Mapping, Callable, Any, List, Type, Union
 )
@@ -59,7 +58,7 @@ from dbt.version import __version__
 
 from dbt.dataclass_schema import dbtClassMixin
 
-PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
+PARTIAL_PARSE_FILE_NAME = 'partial_parse.msgpack'
 PARSING_STATE = DbtProcessState('parsing')
 DEFAULT_PARTIAL_PARSE = False
```
```diff
@@ -334,11 +333,16 @@ class ManifestLoader:
         return False
 
     def write_manifest_for_partial_parse(self):
         path = os.path.join(self.root_project.target_path,
                             PARTIAL_PARSE_FILE_NAME)
-        make_directory(self.root_project.target_path)
-        with open(path, 'wb') as fp:
-            pickle.dump(self.manifest, fp)
+        try:
+            manifest_msgpack = self.manifest.to_msgpack()
+            make_directory(os.path.dirname(path))
+            with open(path, 'wb') as fp:
+                fp.write(manifest_msgpack)
+        except Exception:
+            raise
 
     def matching_parse_results(self, manifest: Manifest) -> bool:
         """Compare the global hashes of the read-in parse results' values to
```
```diff
@@ -410,7 +414,8 @@ class ManifestLoader:
         if os.path.exists(path):
             try:
                 with open(path, 'rb') as fp:
-                    manifest: Manifest = pickle.load(fp)
+                    manifest_mp = fp.read()
+                manifest: Manifest = Manifest.from_msgpack(manifest_mp)  # type: ignore
                 # keep this check inside the try/except in case something about
                 # the file has changed in weird ways, perhaps due to being a
                 # different version of dbt
```
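The read side mirrors the write side: `to_msgpack()` produces bytes to write out, and `Manifest.from_msgpack()` rebuilds the object from bytes read back. A minimal round-trip sketch under the same assumptions as above (mashumaro installed, 3.x import path), using a much-reduced hypothetical stand-in for the real Manifest, which additionally swaps in a `WritableManifest` via `__pre_serialize__`:

```python
import os
import tempfile
from dataclasses import dataclass, field
from typing import Dict

# mashumaro 3.x path; older releases exported this from `mashumaro` directly
from mashumaro.mixins.msgpack import DataClassMessagePackMixin


@dataclass
class TinyManifest(DataClassMessagePackMixin):
    # Hypothetical, much-reduced stand-in for dbt's Manifest
    nodes: Dict[str, str] = field(default_factory=dict)


def write_partial_parse(m: TinyManifest, path: str) -> None:
    # Mirrors write_manifest_for_partial_parse: serialize, then write bytes
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wb') as fp:
        fp.write(m.to_msgpack())


def read_partial_parse(path: str) -> TinyManifest:
    # Mirrors the read side: read bytes, then from_msgpack
    with open(path, 'rb') as fp:
        return TinyManifest.from_msgpack(fp.read())


with tempfile.TemporaryDirectory() as target:
    path = os.path.join(target, 'partial_parse.msgpack')
    original = TinyManifest(nodes={'model.proj.a': 'select 1'})
    write_partial_parse(original, path)
    assert read_partial_parse(path) == original
```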