Compare commits

1 commit

Author       SHA1        Message                                          Date
Gerda Shank  6f18e0d0cd  Use msgpack to save Manifest instead of pickle  2021-04-26 12:38:18 -04:00

3 changed files with 122 additions and 151 deletions
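The substance of the change: Manifest now mixes in mashumaro's DataClassMessagePackMixin (imported in the first file below), and the partial-parse state is written as MessagePack bytes instead of a pickle. A minimal sketch of what that mixin provides; ToyManifest is a made-up stand-in for the real Manifest, not dbt code:

# Sketch only: ToyManifest is hypothetical, but to_msgpack/from_msgpack
# are the real methods DataClassMessagePackMixin adds to a dataclass.
from dataclasses import dataclass, field
from typing import Dict

from mashumaro import DataClassMessagePackMixin


@dataclass
class ToyManifest(DataClassMessagePackMixin):
    nodes: Dict[str, str] = field(default_factory=dict)


m = ToyManifest(nodes={'model.my_project.my_model': 'select 1'})
packed = m.to_msgpack()                      # bytes, suitable for open(path, 'wb')
restored = ToyManifest.from_msgpack(packed)
assert restored == m

Unlike unpickling, decoding msgpack does not execute arbitrary code, and the payload does not break when class internals move between dbt versions in ways pickle cannot tolerate.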

View File

@@ -1,11 +1,11 @@
-import abc
 import enum
 from dataclasses import dataclass, field
 from itertools import chain, islice
+from mashumaro import DataClassMessagePackMixin
 from multiprocessing.synchronize import Lock
 from typing import (
     Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple,
-    TypeVar, Callable, Iterable, Generic, cast, AbstractSet
+    TypeVar, Callable, Iterable, Generic, cast, AbstractSet, ClassVar
 )
 from typing_extensions import Protocol
 from uuid import UUID
@@ -46,72 +46,64 @@ RefName = str
 UniqueID = str
 
 
-K_T = TypeVar('K_T')
-V_T = TypeVar('V_T')
-
-
-class PackageAwareCache(Generic[K_T, V_T]):
-    def __init__(self, manifest: 'Manifest'):
-        self.storage: Dict[K_T, Dict[PackageName, UniqueID]] = {}
-        self._manifest = manifest
-        self.populate()
-
-    @abc.abstractmethod
-    def populate(self):
-        pass
-
-    @abc.abstractmethod
-    def perform_lookup(self, unique_id: UniqueID) -> V_T:
-        pass
-
-    def find_cached_value(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[V_T]:
-        unique_id = self.find_unique_id_for_package(key, package)
-        if unique_id is not None:
-            return self.perform_lookup(unique_id)
+def find_unique_id_for_package(storage, key, package: Optional[PackageName]):
+    if key not in storage:
         return None
 
-    def find_unique_id_for_package(
-        self, key: K_T, package: Optional[PackageName]
-    ) -> Optional[UniqueID]:
-        if key not in self.storage:
-            return None
-
-        pkg_dct: Mapping[PackageName, UniqueID] = self.storage[key]
-
-        if package is None:
-            if not pkg_dct:
-                return None
-            else:
-                return next(iter(pkg_dct.values()))
-        elif package in pkg_dct:
-            return pkg_dct[package]
-        else:
-            return None
+    pkg_dct: Mapping[PackageName, UniqueID] = storage[key]
+
+    if package is None:
+        if not pkg_dct:
+            return None
+        else:
+            return next(iter(pkg_dct.values()))
+    elif package in pkg_dct:
+        return pkg_dct[package]
+    else:
+        return None
 
 
-class DocCache(PackageAwareCache[DocName, ParsedDocumentation]):
+class DocLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_doc(self, doc: ParsedDocumentation):
         if doc.name not in self.storage:
             self.storage[doc.name] = {}
         self.storage[doc.name][doc.package_name] = doc.unique_id
 
-    def populate(self):
-        for doc in self._manifest.docs.values():
+    def populate(self, manifest):
+        for doc in manifest.docs.values():
             self.add_doc(doc)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ParsedDocumentation:
-        if unique_id not in self._manifest.docs:
+        if unique_id not in manifest.docs:
             raise dbt.exceptions.InternalException(
                 f'Doc {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.docs[unique_id]
+        return manifest.docs[unique_id]
 
 
-class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
+class SourceLookup(dbtClassMixin):
+    def __init__(self, manifest: 'Manifest'):
+        self.storage: Dict[Tuple[str, str], Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
+
     def add_source(self, source: ParsedSourceDefinition):
         key = (source.source_name, source.name)
         if key not in self.storage:
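Note: every Lookup class above shares one storage shape, a two-level dict of name to {package_name: unique_id}, which the new module-level find_unique_id_for_package walks. A toy illustration of that layout (names and ids made up):

# Toy data for the storage layout the Lookup classes build.
storage = {
    'my_model': {
        'my_project': 'model.my_project.my_model',
        'shared_pkg': 'model.shared_pkg.my_model',
    },
}

# package given: exact hit or nothing; no package: first value wins
assert storage['my_model']['shared_pkg'] == 'model.shared_pkg.my_model'
assert next(iter(storage['my_model'].values())) == 'model.my_project.my_model'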
@@ -119,47 +111,55 @@ class SourceCache(PackageAwareCache[SourceKey, ParsedSourceDefinition]):
         self.storage[key][source.package_name] = source.unique_id
 
-    def populate(self):
-        for source in self._manifest.sources.values():
+    def populate(self, manifest):
+        for source in manifest.sources.values():
             if hasattr(source, 'source_name'):
                 self.add_source(source)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest: 'Manifest'
     ) -> ParsedSourceDefinition:
-        if unique_id not in self._manifest.sources:
+        if unique_id not in manifest.sources:
             raise dbt.exceptions.InternalException(
                 f'Source {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.sources[unique_id]
+        return manifest.sources[unique_id]
 
 
-class RefableCache(PackageAwareCache[RefName, ManifestNode]):
+class RefableLookup(dbtClassMixin):
+    _lookup_types: ClassVar[set] = set(NodeType.refable())
+
     # refables are actually unique, so the Dict[PackageName, UniqueID] will
     # only ever have exactly one value, but doing 3 dict lookups instead of 1
     # is not a big deal at all and retains consistency
     def __init__(self, manifest: 'Manifest'):
-        self._cached_types = set(NodeType.refable())
-        super().__init__(manifest)
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def find(self, key, package: Optional[PackageName], manifest: 'Manifest'):
+        unique_id = find_unique_id_for_package(self.storage, key, package)
+        if unique_id is not None:
+            return self.perform_lookup(unique_id, manifest)
+        return None
 
     def add_node(self, node: ManifestNode):
-        if node.resource_type in self._cached_types:
+        if node.resource_type in self._lookup_types:
             if node.name not in self.storage:
                 self.storage[node.name] = {}
             self.storage[node.name][node.package_name] = node.unique_id
 
-    def populate(self):
-        for node in self._manifest.nodes.values():
+    def populate(self, manifest):
+        for node in manifest.nodes.values():
             self.add_node(node)
 
     def perform_lookup(
-        self, unique_id: UniqueID
+        self, unique_id: UniqueID, manifest
     ) -> ManifestNode:
-        if unique_id not in self._manifest.nodes:
+        if unique_id not in manifest.nodes:
             raise dbt.exceptions.InternalException(
                 f'Node {unique_id} found in cache but not found in manifest'
             )
-        return self._manifest.nodes[unique_id]
+        return manifest.nodes[unique_id]
 
 
 def _search_packages(
@@ -520,7 +520,7 @@ class ManifestStateCheck(dbtClassMixin):
 
 
 @dataclass
-class Manifest(MacroMethods):
+class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
     """The manifest for the full graph, after parsing and during compilation.
     """
     # These attributes are both positional and by keyword. If an attribute
@@ -543,10 +543,19 @@
     source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict)
     # following is from ParseResult
     _disabled: MutableMapping[str, List[CompileResultNode]] = field(default_factory=dict)
-    _docs_cache: Optional[DocCache] = None
-    _sources_cache: Optional[SourceCache] = None
-    _refs_cache: Optional[RefableCache] = None
-    _lock: Lock = field(default_factory=flags.MP_CONTEXT.Lock)
+    _doc_lookup: Optional[DocLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _source_lookup: Optional[SourceLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _ref_lookup: Optional[RefableLookup] = field(
+        default=None, metadata={'serialize': lambda x: None, 'deserialize': lambda x: None}
+    )
+    _lock: Lock = field(
+        default_factory=flags.MP_CONTEXT.Lock,
+        metadata={'serialize': lambda x: None, 'deserialize': lambda x: flags.MP_CONTEXT.Lock}
+    )
 
     def sync_update_node(
         self, new_node: NonSourceCompiledNode
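Note: these field(metadata=...) entries are mashumaro's per-field serialization hooks. They keep the derived lookup tables and the multiprocessing lock out of the msgpack payload: each is written as None and reconstructed after loading (the lock eagerly, the lookups lazily via the properties further down). A sketch of the same pattern in isolation; Holder and Cache are made-up names:

# Sketch of the per-field override used above; Cache stands in for a
# DocLookup or a Lock, i.e. something that cannot or should not be packed.
from dataclasses import dataclass, field
from typing import Optional

from mashumaro import DataClassMessagePackMixin


class Cache:
    pass


@dataclass
class Holder(DataClassMessagePackMixin):
    name: str
    _cache: Optional[Cache] = field(
        default=None,
        # write None into the payload; come back as None after loading
        metadata={'serialize': lambda x: None, 'deserialize': lambda x: None},
    )


h = Holder(name='example', _cache=Cache())
restored = Holder.from_msgpack(h.to_msgpack())
assert restored._cache is None  # rebuilt on demand, not round-tripped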
@@ -662,17 +671,6 @@ class Manifest(MacroMethods):
             resource_fqns[resource_type_plural].add(tuple(resource.fqn))
         return resource_fqns
 
-    def add_nodes(self, new_nodes: Mapping[str, ManifestNode]):
-        """Add the given dict of new nodes to the manifest."""
-        for unique_id, node in new_nodes.items():
-            if unique_id in self.nodes:
-                raise_duplicate_resource_name(node, self.nodes[unique_id])
-            self.nodes[unique_id] = node
-            # fixup the cache if it exists.
-            if self._refs_cache is not None:
-                if node.resource_type in NodeType.refable():
-                    self._refs_cache.add_node(node)
-
     def add_patch(
         self, source_file: SourceFile, patch: ParsedNodePatch,
     ) -> None:
@@ -808,11 +806,6 @@ class Manifest(MacroMethods):
             parent_map=backward_edges,
         )
 
-    # When 'to_dict' is called on the Manifest, it substitues a
-    # WritableManifest
-    def __pre_serialize__(self):
-        return self.writable_manifest()
-
     def write(self, path):
         self.writable_manifest().write(path)
 
@@ -830,28 +823,22 @@
         )
 
     @property
-    def docs_cache(self) -> DocCache:
-        if self._docs_cache is not None:
-            return self._docs_cache
-        cache = DocCache(self)
-        self._docs_cache = cache
-        return cache
+    def doc_lookup(self) -> DocLookup:
+        if self._doc_lookup is None:
+            self._doc_lookup = DocLookup(self)
+        return self._doc_lookup
 
     @property
-    def source_cache(self) -> SourceCache:
-        if self._sources_cache is not None:
-            return self._sources_cache
-        cache = SourceCache(self)
-        self._sources_cache = cache
-        return cache
+    def source_lookup(self) -> SourceLookup:
+        if self._source_lookup is None:
+            self._source_lookup = SourceLookup(self)
+        return self._source_lookup
 
     @property
-    def refs_cache(self) -> RefableCache:
-        if self._refs_cache is not None:
-            return self._refs_cache
-        cache = RefableCache(self)
-        self._refs_cache = cache
-        return cache
+    def ref_lookup(self) -> RefableLookup:
+        if self._ref_lookup is None:
+            self._ref_lookup = RefableLookup(self)
+        return self._ref_lookup
 
     def resolve_ref(
         self,
@@ -868,7 +855,7 @@
             current_project, node_package, target_model_package
         )
 
         for pkg in candidates:
-            node = self.refs_cache.find_cached_value(target_model_name, pkg)
+            node = self.ref_lookup.find(target_model_name, pkg, self)
             if node is not None and node.config.enabled:
                 return node
@@ -897,7 +884,7 @@
         disabled: Optional[ParsedSourceDefinition] = None
 
         for pkg in candidates:
-            source = self.source_cache.find_cached_value(key, pkg)
+            source = self.source_lookup.find(key, pkg, self)
             if source is not None and source.config.enabled:
                 return source
@@ -926,7 +913,7 @@
         )
 
         for pkg in candidates:
-            result = self.docs_cache.find_cached_value(name, pkg)
+            result = self.doc_lookup.find(name, pkg, self)
             if result is not None:
                 return result
         return None
@@ -1193,9 +1180,9 @@
             self.patches,
             self.source_patches,
             self._disabled,
-            self._docs_cache,
-            self._sources_cache,
-            self._refs_cache,
+            self._doc_lookup,
+            self._source_lookup,
+            self._ref_lookup,
         )
         return self.__class__, args
@@ -1266,6 +1253,10 @@ def _check_duplicates(
         raise_duplicate_resource_name(value, src[value.unique_id])
 
 
+K_T = TypeVar('K_T')
+V_T = TypeVar('V_T')
+
+
 def _expect_value(
     key: K_T, src: Mapping[K_T, V_T], old_file: SourceFile, name: str
 ) -> V_T:

View File

@@ -2,14 +2,13 @@ from dataclasses import field, Field, dataclass
 from enum import Enum
 from itertools import chain
 from typing import (
-    Any, List, Optional, Dict, MutableMapping, Union, Type,
-    TypeVar, Callable,
+    Any, List, Optional, Dict, Union, Type, TypeVar
 )
 from dbt.dataclass_schema import (
     dbtClassMixin, ValidationError, register_pattern,
 )
 from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed
-from dbt.exceptions import CompilationException, InternalException
+from dbt.exceptions import InternalException
 from dbt.contracts.util import Replaceable, list_str
 from dbt import hooks
 from dbt.node_types import NodeType
@@ -182,53 +181,29 @@ T = TypeVar('T', bound='BaseConfig')
 @dataclass
 class BaseConfig(
-    AdditionalPropertiesAllowed, Replaceable, MutableMapping[str, Any]
+    AdditionalPropertiesAllowed, Replaceable
 ):
 
-    # Implement MutableMapping so this config will behave as some macros expect
-    # during parsing (notably, syntax like `{{ node.config['schema'] }}`)
+    # enable syntax like: config['key']
     def __getitem__(self, key):
-        """Handle parse-time use of `config` as a dictionary, making the extra
-        values available during parsing.
-        """
+        return self.get(key)
+
+    # like doing 'get' on a dictionary
+    def get(self, key, default=None):
         if hasattr(self, key):
             return getattr(self, key)
-        else:
+        elif key in self._extra:
             return self._extra[key]
+        else:
+            return default
 
+    # enable syntax like: config['key'] = value
     def __setitem__(self, key, value):
         if hasattr(self, key):
             setattr(self, key, value)
         else:
             self._extra[key] = value
 
-    def __delitem__(self, key):
-        if hasattr(self, key):
-            msg = (
-                'Error, tried to delete config key "{}": Cannot delete '
-                'built-in keys'
-            ).format(key)
-            raise CompilationException(msg)
-        else:
-            del self._extra[key]
-
-    def _content_iterator(self, include_condition: Callable[[Field], bool]):
-        seen = set()
-        for fld, _ in self._get_fields():
-            seen.add(fld.name)
-            if include_condition(fld):
-                yield fld.name
-        for key in self._extra:
-            if key not in seen:
-                seen.add(key)
-                yield key
-
-    def __iter__(self):
-        yield from self._content_iterator(include_condition=lambda f: True)
-
-    def __len__(self):
-        return len(self._get_fields()) + len(self._extra)
-
     @staticmethod
     def compare_key(
         unrendered: Dict[str, Any],
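Note: BaseConfig stops being a MutableMapping here (so __delitem__, __iter__, __len__, and the CompilationException import all go away), but it keeps dict-style reads and writes for the macro-facing syntax. A toy stand-in showing the surviving surface; FakeConfig is not the real class, which mixes in dbt's schema machinery:

# Toy model of the access pattern kept after this change.
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeConfig:
    materialized: str = 'view'
    _extra: Dict[str, Any] = field(default_factory=dict)

    def __getitem__(self, key):          # config['key']
        return self.get(key)

    def get(self, key, default=None):    # config.get('key', default)
        if hasattr(self, key):
            return getattr(self, key)
        elif key in self._extra:
            return self._extra[key]
        return default

    def __setitem__(self, key, value):   # config['key'] = value
        if hasattr(self, key):
            setattr(self, key, value)
        else:
            self._extra[key] = value


c = FakeConfig()
c['custom_flag'] = True                  # unknown keys land in _extra
assert c['materialized'] == 'view' and c.get('missing', 42) == 42

Deleting keys and iterating over a config are no longer supported, which keeps the class a plain dataclass as far as the new serialization machinery is concerned.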

View File

@@ -1,7 +1,6 @@
 from dataclasses import dataclass
 from dataclasses import field
 import os
-import pickle
 from typing import (
     Dict, Optional, Mapping, Callable, Any, List, Type, Union
 )
@@ -59,7 +58,7 @@ from dbt.version import __version__
 from dbt.dataclass_schema import dbtClassMixin
 
-PARTIAL_PARSE_FILE_NAME = 'partial_parse.pickle'
+PARTIAL_PARSE_FILE_NAME = 'partial_parse.msgpack'
 PARSING_STATE = DbtProcessState('parsing')
 DEFAULT_PARTIAL_PARSE = False
@@ -334,11 +333,16 @@ class ManifestLoader:
         return False
 
     def write_manifest_for_partial_parse(self):
         path = os.path.join(self.root_project.target_path,
                             PARTIAL_PARSE_FILE_NAME)
-        make_directory(self.root_project.target_path)
-        with open(path, 'wb') as fp:
-            pickle.dump(self.manifest, fp)
+        try:
+            manifest_msgpack = self.manifest.to_msgpack()
+            make_directory(os.path.dirname(path))
+            with open(path, 'wb') as fp:
+                fp.write(manifest_msgpack)
+        except Exception:
+            raise
 
     def matching_parse_results(self, manifest: Manifest) -> bool:
         """Compare the global hashes of the read-in parse results' values to
@@ -410,7 +414,8 @@
         if os.path.exists(path):
             try:
                 with open(path, 'rb') as fp:
-                    manifest: Manifest = pickle.load(fp)
+                    manifest_mp = fp.read()
+                    manifest: Manifest = Manifest.from_msgpack(manifest_mp)  # type: ignore
                 # keep this check inside the try/except in case something about
                 # the file has changed in weird ways, perhaps due to being a
                 # different version of dbt
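Taken together, the read side now slurps the file and hands the raw bytes to Manifest.from_msgpack, inside the same try/except so that a stale or corrupt file (say, one written by a different dbt version) degrades to a full re-parse instead of crashing. A condensed sketch of that flow; the function name and surrounding structure are assumed, not quoted from dbt:

# Sketch only: defensive load of the saved partial-parse manifest.
import os

from dbt.contracts.graph.manifest import Manifest

PARTIAL_PARSE_FILE_NAME = 'partial_parse.msgpack'


def try_load_saved_manifest(target_path: str):
    path = os.path.join(target_path, PARTIAL_PARSE_FILE_NAME)
    if not os.path.exists(path):
        return None
    try:
        with open(path, 'rb') as fp:
            return Manifest.from_msgpack(fp.read())
    except Exception:
        # a file from another dbt version, or a corrupt one, just means
        # we fall back to parsing the project from scratch
        return None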