Compare commits

...

3 Commits

Author SHA1 Message Date
Michelle Ark
03229d65d8 changelog entry 2023-12-05 23:58:35 +09:00
Michelle Ark
48a0f8025d Merge branch 'relation-create-from-refactor' into relation-cache-no-manifest 2023-12-05 23:57:05 +09:00
Michelle Ark
8c96285650 remove manifest from adapter.set_relations_cache signature 2023-12-05 16:21:56 +09:00
3 changed files with 29 additions and 12 deletions

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: remove manifest from adapter.set_relations_cache signature
time: 2023-12-05T23:58:30.920144+09:00
custom:
Author: michelleark
Issue: "9217"

View File

@@ -76,6 +76,7 @@ from dbt.adapters.events.types import (
)
from dbt.common.utils import filter_null_values, executor, cast_to_str, AttrDict
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.base.connections import Connection, AdapterResponse, BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
@@ -423,15 +424,13 @@ class BaseAdapter(metaclass=AdapterMeta):
else:
return True
def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]:
def _get_cache_schemas(self, relation_configs: Iterable[RelationConfig]) -> Set[BaseRelation]:
"""Get the set of schema relations that the cache logic needs to
populate. This means only executable nodes are included.
populate.
"""
# the cache only cares about executable nodes
return {
self.Relation.create_from(self.config, node).without_identifier() # type: ignore[arg-type]
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model and not node.is_external_node)
self.Relation.create_from(quoting=self.config, config=relation_config)
for relation_config in relation_configs
}
def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
@@ -480,13 +479,15 @@ class BaseAdapter(metaclass=AdapterMeta):
return relations
def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None
self,
relation_configs: Iterable[RelationConfig],
cache_schemas: Optional[Set[BaseRelation]] = None,
) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
if not cache_schemas:
cache_schemas = self._get_cache_schemas(manifest)
cache_schemas = self._get_cache_schemas(relation_configs)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
@@ -515,7 +516,7 @@ class BaseAdapter(metaclass=AdapterMeta):
def set_relations_cache(
self,
manifest: Manifest,
relation_configs: Iterable[RelationConfig],
clear: bool = False,
required_schemas: Optional[Set[BaseRelation]] = None,
) -> None:
@@ -525,7 +526,7 @@ class BaseAdapter(metaclass=AdapterMeta):
with self.cache.lock:
if clear:
self.cache.clear()
self._relations_cache_for_schemas(manifest, required_schemas)
self._relations_cache_for_schemas(relation_configs, required_schemas)
@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:

View File

@@ -406,11 +406,21 @@ class GraphRunnableTask(ConfiguredTask):
if not self.args.populate_cache:
return
if self.manifest is None:
raise DbtInternalError("manifest was None in populate_adapter_cache")
start_populate_cache = time.perf_counter()
# the cache only cares about executable nodes
cachable_nodes = [
node
for node in self.manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model and not node.is_external_node)
]
if get_flags().CACHE_SELECTED_ONLY is True:
adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
adapter.set_relations_cache(cachable_nodes, required_schemas=required_schemas)
else:
adapter.set_relations_cache(self.manifest)
adapter.set_relations_cache(cachable_nodes)
cache_populate_time = time.perf_counter() - start_populate_cache
if dbt.tracking.active_user is not None:
dbt.tracking.track_runnable_timing(