Compare commits


1 commit

Author: Gerda Shank
SHA1: 6f18e0d0cd
Message: Use msgpack to save Manifest instead of pickle
Date: 2021-04-26 12:38:18 -04:00
3 changed files with 122 additions and 151 deletions
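
The mechanics of the swap: mashumaro's DataClassMessagePackMixin generates to_msgpack() and from_msgpack() for any dataclass that inherits it, which is what lets the Manifest be written as a binary partial_parse.msgpack instead of a pickle. A minimal round-trip sketch of that mixin (toy class for illustration, not dbt's real Manifest):

    from dataclasses import dataclass, field
    from typing import Dict

    from mashumaro import DataClassMessagePackMixin


    @dataclass
    class TinyManifest(DataClassMessagePackMixin):
        # stand-in for the real Manifest's fields
        nodes: Dict[str, str] = field(default_factory=dict)


    m = TinyManifest(nodes={'model.proj.my_model': 'select 1'})
    packed: bytes = m.to_msgpack()               # compact binary blob
    restored = TinyManifest.from_msgpack(packed)
    assert restored == m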

View File

@@ -1,11 +1,11 @@
-import abc
 import enum
 from dataclasses import dataclass, field
 from itertools import chain, islice
+from mashumaro import DataClassMessagePackMixin
 from multiprocessing.synchronize import Lock
 from typing import (
     Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple,
-    TypeVar, Callable, Iterable, Generic, cast, AbstractSet
+    TypeVar, Callable, Iterable, Generic, cast, AbstractSet, ClassVar
 )
 from typing_extensions import Protocol
 from uuid import UUID
@@ -46,72 +46,64 @@ RefName = str
 UniqueID = str
 
 
-K_T = TypeVar('K_T')
-V_T = TypeVar('V_T')
-
-
-class PackageAwareCache(Generic[K_T, V_T]):
-    def __init__(self, manifest: 'Manifest'):
-        self.storage: Dict[K_T, Dict[PackageName, UniqueID]] = {}
-        self._manifest = manifest
-        self.populate()
-
-    @abc.abstractmethod
-    def populate(self):
-        pass
-
-    @abc.abstractmethod
-    def perform_lookup(self, unique_id: UniqueID) -> V_T:
-        pass
-
-    def find_cached_value(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[V_T]:
-        unique_id = self.find_unique_id_for_package(key, package)
-        if unique_id is not None:
-            return self.perform_lookup(unique_id)
-        return None
-
-    def find_unique_id_for_package(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[UniqueID]:
-        if key not in self.storage:
-            return None
-        pkg_dct: Mapping[PackageName, UniqueID] = self.storage[key]
-        if package is None:
-            if not pkg_dct:
-                return None
-            else:
-                return next(iter(pkg_dct.values()))
-        elif package in pkg_dct:
-            return pkg_dct[package]
-        else:
-            return None
-
-
-class DocCache(PackageAwareCache[DocName, ParsedDocumentation]):
+def find_unique_id_for_package(storage, key, package: Optional[PackageName]):
+    if key not in storage:
+        return None
+    pkg_dct: Mapping[PackageName, UniqueID] = storage[key]
+    if package is None:
+        if not pkg_dct:
+            return None
+        else:
+            return next(iter(pkg_dct.values()))
+    elif package in pkg_dct:
+        return pkg_dct[package]
+    else:
+        return None
+
+
+class DocLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_doc(self, doc: ParsedDocumentation):
         if doc.name not in self.storage:
             self.storage[doc.name] = {}
         self.storage[doc.name][doc.package_name] = doc.unique_id
 
-    def populate(self):
-        for doc in self._manifest.docs.values():
+    def populate(self, manifest):
+        for doc in manifest.docs.values():
             self.add_doc(doc)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ParsedDocumentation:
-        if unique_id not in self._manifest.docs:
+        if unique_id not in manifest.docs:
             raise dbt.exceptions.InternalException(
                 f'Doc {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.docs[unique_id]
+        return manifest.docs[unique_id]
 
 
-class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
+class SourceLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[Tuple[str, str], Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_source(self, source: ParsedSourceDefinition):
         key = (source.source_name, source.name)
         if key not in self.storage:
@@ -119,47 +111,55 @@ class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
         self.storage[key][source.package_name] = source.unique_id
 
-    def populate(self):
-        for source in self._manifest.sources.values():
+    def populate(self, manifest):
+        for source in manifest.sources.values():
             if hasattr(source, 'source_name'):
                 self.add_source(source)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest: 'Manifest'
     ) -> ParsedSourceDefinition:
-        if unique_id not in self._manifest.sources:
+        if unique_id not in manifest.sources:
             raise dbt.exceptions.InternalException(
                 f'Source {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.sources[unique_id]
+        return manifest.sources[unique_id]
 
 
-class RefableCache(PackageAwareCache[RefName, ManifestNode]):
+class RefableLookup(dbtClassMixin):
+    _lookup_types: ClassVar[set] = set(NodeType.refable())
+
     # refables are actually unique, so the Dict[PackageName, UniqueID] will
     # only ever have exactly one value, but doing 3 dict lookups instead of 1
     # is not a big deal at all and retains consistency
     def __init__(self, manifest: 'Manifest'):
-        self._cached_types = set(NodeType.refable())
-        super().__init__(manifest)
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
 
     def add_node(self, node: ManifestNode):
-        if node.resource_type in self._cached_types:
+        if node.resource_type in self._lookup_types:
             if node.name not in self.storage:
                 self.storage[node.name] = {}
             self.storage[node.name][node.package_name] = node.unique_id
 
-    def populate(self):
-        for node in self._manifest.nodes.values():
+    def populate(self, manifest):
+        for node in manifest.nodes.values():
             self.add_node(node)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ManifestNode:
-        if unique_id not in self._manifest.nodes:
+        if unique_id not in manifest.nodes:
             raise dbt.exceptions.InternalException(
                 f'Node {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.nodes[unique_id]
+        return manifest.nodes[unique_id]
 
 
 def _search_packages(
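
The hunks above flatten the abstract PackageAwareCache hierarchy into three plain lookup classes that keep no back-reference to the manifest; callers now pass the manifest in at lookup time, so the lookups stay cheap to drop and rebuild around serialization. A hedged usage sketch (assumes a parsed `manifest`; signatures and key shapes are taken from the diff):

    node = manifest.ref_lookup.find('my_model', None, manifest)         # any package
    doc = manifest.doc_lookup.find('overview', 'my_project', manifest)
    src = manifest.source_lookup.find(('raw', 'orders'), None, manifest)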
@@ -520,7 +520,7 @@ class ManifestStateCheck(dbtClassMixin):
 
 @dataclass
-class Manifest(MacroMethods):
+class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
     """The manifest for the full graph, after parsing and during compilation.
     """
     # These attributes are both positional and by keyword. If an attribute
@@ -543,10 +543,19 @@ class Manifest(MacroMethods):
     source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict)
     # following is from ParseResult
     _disabled: MutableMapping[str, List[CompileResultNode]] = field(default_factory=dict)
-    _docs_cache: Optional[DocCache] = None
-    _sources_cache: Optional[SourceCache] = None
-    _refs_cache: Optional[RefableCache] = None
-    _lock: Lock = field(default_factory=flags.MP_CONTEXT.Lock)
+    _doc_lookup: Optional[DocLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _source_lookup: Optional[SourceLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _ref_lookup: Optional[RefableLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _lock: Lock = field(
+        default_factory=flags.MP_CONTEXT.Lock,
+        metadata={'serialize': lambda x: None, 'deserialize': lambda x: flags.MP_CONTEXT.Lock}
+    )
 
     def sync_update_node(
         self, new_node: NonSourceCompiledNode
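
The lookup and lock fields have no msgpack representation, so each carries mashumaro field metadata that serializes it as None and restores a placeholder on load; the lazy properties further down (and the Lock default_factory) rebuild the real values afterwards. The same pattern in isolation, with illustrative names:

    from dataclasses import dataclass, field
    from typing import Optional

    from mashumaro import DataClassMessagePackMixin


    @dataclass
    class WithTransientField(DataClassMessagePackMixin):
        name: str = ''
        # dropped on serialize, restored as None on deserialize, mirroring
        # the _doc_lookup / _source_lookup / _ref_lookup fields above
        _cache: Optional[dict] = field(
            default=None,
            metadata={'serialize': lambda x: None, 'deserialize': lambda x: None},
        )


    w = WithTransientField(name='x', _cache={'k': 'v'})
    restored = WithTransientField.from_msgpack(w.to_msgpack())
    assert restored._cache is None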
@@ -662,17 +671,6 @@ class Manifest(MacroMethods):
             resource_fqns[resource_type_plural].add(tuple(resource.fqn))
         return resource_fqns
 
-    def add_nodes(self, new_nodes: Mapping[str, ManifestNode]):
-        """Add the given dict of new nodes to the manifest."""
-        for unique_id, node in new_nodes.items():
-            if unique_id in self.nodes:
-                raise_duplicate_resource_name(node, self.nodes[unique_id])
-            self.nodes[unique_id] = node
-            # fixup the cache if it exists.
-            if self._refs_cache is not None:
-                if node.resource_type in NodeType.refable():
-                    self._refs_cache.add_node(node)
-
     def add_patch(
         self, source_file: SourceFile, patch: ParsedNodePatch,
     ) -> None:
@@ -808,11 +806,6 @@ class Manifest(MacroMethods):
             parent_map=backward_edges,
         )
 
-    # When 'to_dict' is called on the Manifest, it substitues a
-    # WritableManifest
-    def __pre_serialize__(self):
-        return self.writable_manifest()
-
     def write(self, path):
         self.writable_manifest().write(path)
@@ -830,28 +823,22 @@ class Manifest(MacroMethods):
         )
 
     @property
-    def docs_cache(self) -> DocCache:
-        if self._docs_cache is not None:
-            return self._docs_cache
-        cache = DocCache(self)
-        self._docs_cache = cache
-        return cache
+    def doc_lookup(self) -> DocLookup:
+        if self._doc_lookup is None:
+            self._doc_lookup = DocLookup(self)
+        return self._doc_lookup
 
     @property
-    def source_cache(self) -> SourceCache:
-        if self._sources_cache is not None:
-            return self._sources_cache
-        cache = SourceCache(self)
-        self._sources_cache = cache
-        return cache
+    def source_lookup(self) -> SourceLookup:
+        if self._source_lookup is None:
+            self._source_lookup = SourceLookup(self)
+        return self._source_lookup
 
     @property
-    def refs_cache(self) -> RefableCache:
-        if self._refs_cache is not None:
-            return self._refs_cache
-        cache = RefableCache(self)
-        self._refs_cache = cache
-        return cache
+    def ref_lookup(self) -> RefableLookup:
+        if self._ref_lookup is None:
+            self._ref_lookup = RefableLookup(self)
+        return self._ref_lookup
 
     def resolve_ref(
         self,
@@ -868,7 +855,7 @@ class Manifest(MacroMethods):
             current_project, node_package, target_model_package
         )
         for pkg in candidates:
-            node = self.refs_cache.find_cached_value(target_model_name, pkg)
+            node = self.ref_lookup.find(target_model_name, pkg, self)
             if node is not None and node.config.enabled:
                 return node
@@ -897,7 +884,7 @@ class Manifest(MacroMethods):
         disabled: Optional[ParsedSourceDefinition] = None
         for pkg in candidates:
-            source = self.source_cache.find_cached_value(key, pkg)
+            source = self.source_lookup.find(key, pkg, self)
             if source is not None and source.config.enabled:
                 return source
@@ -926,7 +913,7 @@ class Manifest(MacroMethods):
         )
         for pkg in candidates:
-            result = self.docs_cache.find_cached_value(name, pkg)
+            result = self.doc_lookup.find(name, pkg, self)
             if result is not None:
                 return result
         return None
@@ -1193,9 +1180,9 @@ class Manifest(MacroMethods):
             self.patches,
             self.source_patches,
             self._disabled,
-            self._docs_cache,
-            self._sources_cache,
-            self._refs_cache,
+            self._doc_lookup,
+            self._source_lookup,
+            self._ref_lookup,
         )
         return self.__class__, args
@@ -1266,6 +1253,10 @@ def _check_duplicates(
         raise_duplicate_resource_name(value, src[value.unique_id])
 
 
+K_T = TypeVar('K_T')
+V_T = TypeVar('V_T')
+
+
 def _expect_value(
     key: K_T, src: Mapping[K_T, V_T], old_file: SourceFile, name: str
 ) -> V_T:

View File

@@ -2,14 +2,13 @@ from dataclasses import field, Field, dataclass
 from enum import Enum
 from itertools import chain
 from typing import (
-    Any, List, Optional, Dict, MutableMapping, Union, Type,
-    TypeVar, Callable,
+    Any, List, Optional, Dict, Union, Type, TypeVar
 )
 from dbt.dataclass_schema import (
     dbtClassMixin, ValidationError, register_pattern,
 )
 from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed
-from dbt.exceptions import CompilationException, InternalException
+from dbt.exceptions import InternalException
 from dbt.contracts.util import Replaceable, list_str
 from dbt import hooks
 from dbt.node_types import NodeType
@@ -182,53 +181,29 @@ T = TypeVar('T', bound='BaseConfig')
 @dataclass
 class BaseConfig(
-    AdditionalPropertiesAllowed, Replaceable, MutableMapping[str, Any]
+    AdditionalPropertiesAllowed, Replaceable
 ):
-    # Implement MutableMapping so this config will behave as some macros expect
-    # during parsing (notably, syntax like `{{ node.config['schema'] }}`)
+
+    # enable syntax like: config['key']
     def __getitem__(self, key):
-        """Handle parse-time use of `config` as a dictionary, making the extra
-        values available during parsing.
-        """
+        return self.get(key)
+
+    # like doing 'get' on a dictionary
+    def get(self, key, default=None):
         if hasattr(self, key):
             return getattr(self, key)
-        else:
+        elif key in self._extra:
             return self._extra[key]
+        else:
+            return default
 
+    # enable syntax like: config['key'] = value
     def __setitem__(self, key, value):
         if hasattr(self, key):
             setattr(self, key, value)
         else:
             self._extra[key] = value
 
-    def __delitem__(self, key):
-        if hasattr(self, key):
-            msg = (
-                'Error, tried to delete config key "{}": Cannot delete '
-                'built-in keys'
-            ).format(key)
-            raise CompilationException(msg)
-        else:
-            del self._extra[key]
-
-    def _content_iterator(self, include_condition: Callable[[Field], bool]):
-        seen = set()
-        for fld, _ in self._get_fields():
-            seen.add(fld.name)
-            if include_condition(fld):
-                yield fld.name
-
-        for key in self._extra:
-            if key not in seen:
-                seen.add(key)
-                yield key
-
-    def __iter__(self):
-        yield from self._content_iterator(include_condition=lambda f: True)
-
-    def __len__(self):
-        return len(self._get_fields()) + len(self._extra)
-
     @staticmethod
     def compare_key(
         unrendered: Dict[str, Any],
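
With the MutableMapping base gone, BaseConfig keeps only the subscript syntax that parsing actually uses, plus a dict-style get() that returns a default instead of raising. A hedged sketch of the resulting behavior (SomeConfig stands in for any concrete BaseConfig subclass; _extra is where unknown keys land, per the diff):

    cfg = SomeConfig(materialized='table')    # hypothetical subclass
    cfg['materialized']                       # 'table', via __getitem__ -> get()
    cfg.get('missing', 'fallback')            # 'fallback' instead of a KeyError
    cfg['custom_key'] = 1                     # unknown keys go to cfg._extra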

View File

@@ -1,7 +1,6 @@
 from dataclasses import dataclass
 from dataclasses import field
 import os
-import pickle
 from typing import (
     Dict, Optional, Mapping, Callable, Any, List, Type, Union
 )
@@ -59,7 +58,7 @@ from dbt.version import __version__
 from dbt.dataclass_schema import dbtClassMixin
 
 
-PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
+PARTIAL_PARSE_FILE_NAME = 'partial_parse.msgpack'
 PARSING_STATE = DbtProcessState('parsing')
 DEFAULT_PARTIAL_PARSE = False
@@ -334,11 +333,16 @@ class ManifestLoader:
         return False
 
     def write_manifest_for_partial_parse(self):
         path = os.path.join(self.root_project.target_path,
                             PARTIAL_PARSE_FILE_NAME)
-        make_directory(self.root_project.target_path)
-        with open(path, 'wb') as fp:
-            pickle.dump(self.manifest, fp)
+        try:
+            manifest_msgpack = self.manifest.to_msgpack()
+            make_directory(os.path.dirname(path))
+            with open(path, 'wb') as fp:
+                fp.write(manifest_msgpack)
+        except Exception:
+            raise
 
     def matching_parse_results(self, manifest: Manifest) -> bool:
         """Compare the global hashes of the read-in parse results' values to
@@ -410,7 +414,8 @@ class ManifestLoader:
         if os.path.exists(path):
             try:
                 with open(path, 'rb') as fp:
-                    manifest: Manifest = pickle.load(fp)
+                    manifest_mp = fp.read()
+                manifest: Manifest = Manifest.from_msgpack(manifest_mp)  # type: ignore
                 # keep this check inside the try/except in case something about
                 # the file has changed in weird ways, perhaps due to being a
                 # different version of dbt
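
Read failures stay non-fatal: the surrounding try/except treats a corrupt or version-incompatible partial_parse.msgpack as a cache miss and falls back to a full parse. A simplified sketch of that shape (not the loader's exact method body; Manifest and Optional as imported in this file):

    import os

    def read_saved_manifest(path: str) -> Optional[Manifest]:
        if not os.path.exists(path):
            return None
        try:
            with open(path, 'rb') as fp:
                return Manifest.from_msgpack(fp.read())  # type: ignore
        except Exception:
            # anything unreadable simply disables partial parsing
            return None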