Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
686252950b Experiment with caching columns, inspired by dbt-labs/dbt-spark#342 2022-05-15 13:23:40 +02:00
5 changed files with 61 additions and 7 deletions

View File

@@ -2,11 +2,13 @@ from dataclasses import dataclass
import re
from typing import Dict, ClassVar, Any, Optional
from dbt.dataclass_schema import dbtClassMixin
from dbt.exceptions import RuntimeException
@dataclass
class Column:
class Column(dbtClassMixin):
TYPE_LABELS: ClassVar[Dict[str, str]] = {
"STRING": "TEXT",
"TIMESTAMP": "TIMESTAMP",

View File

@@ -1,6 +1,6 @@
from collections.abc import Hashable
from dataclasses import dataclass
from typing import Optional, TypeVar, Any, Type, Dict, Union, Iterator, Tuple, Set
from dataclasses import dataclass, field
from typing import Optional, TypeVar, Any, Type, Dict, Union, Iterator, Tuple, Set, List
from dbt.contracts.graph.compiled import CompiledNode
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedNode
@@ -12,6 +12,7 @@ from dbt.contracts.relation import (
Policy,
Path,
)
from dbt.adapters.base.column import Column
from dbt.exceptions import InternalException
from dbt.node_types import NodeType
from dbt.utils import filter_null_values, deep_merge, classproperty
@@ -30,6 +31,10 @@ class BaseRelation(FakeAPIObject, Hashable):
include_policy: Policy = Policy()
quote_policy: Policy = Policy()
dbt_created: bool = False
# this should be List[Columns], but that raises a validation error:
# Field "columns" of type List[Column] in PostgresRelation has invalid value [<Column id (integer)>, <Column color (text)>]
columns: List[Any] = field(default_factory=lambda: [])
def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
if self.dbt_created and self.quote_policy.get_part(field) is False:

View File

@@ -19,6 +19,8 @@ from dbt.events.types import (
TemporaryRelation,
UncachedRelation,
UpdateReference,
UpdateRelation,
UpdateMissingRelation,
)
from dbt.utils import lowercase
from dbt.helper_types import Lazy
@@ -249,6 +251,25 @@ class RelationsCache:
self.add_schema(relation.database, relation.schema)
key = relation.key()
return self.relations.setdefault(key, relation)
def _update(self, relation: _CachedRelation):
key = relation.key()
if key not in self.relations:
fire_event(UpdateMissingRelation(relation=key))
return
self.relations[key].inner = relation.inner
def update_relation(self, relation):
"""Update the relation inner to the cache
: param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(UpdateRelation(relation=_make_key(cached)))
with self.lock:
self._update(cached)
def _add_link(self, referenced_key, dependent_key):
"""Add a link between two relations to the database. Both the old and

View File

@@ -12,6 +12,7 @@ from dbt.events.types import ColTypeChange, SchemaCreation, SchemaDrop
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.column import Column
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
GET_COLUMNS_IN_RELATION_MACRO_NAME = "get_columns_in_relation"
@@ -148,10 +149,17 @@ class SQLAdapter(BaseAdapter):
kwargs = {"from_relation": from_relation, "to_relation": to_relation}
self.execute_macro(RENAME_RELATION_MACRO_NAME, kwargs=kwargs)
def get_columns_in_relation(self, relation):
return self.execute_macro(
GET_COLUMNS_IN_RELATION_MACRO_NAME, kwargs={"relation": relation}
)
def get_columns_in_relation(self, relation: BaseRelation) -> List[Column]:
cached_relation = self.get_relation(relation.database, relation.schema, relation.identifier)
if cached_relation and cached_relation.columns:
print("Cache hit for columns")
return cached_relation.columns
else:
print("Cache miss for columns")
columns = self.execute_macro(GET_COLUMNS_IN_RELATION_MACRO_NAME, kwargs={"relation": relation})
updated_relation = cached_relation.incorporate(columns=columns)
self.cache.update_relation(updated_relation)
return columns
def create_schema(self, relation: BaseRelation) -> None:
relation = relation.without_identifier()

View File

@@ -644,6 +644,24 @@ class UncachedRelation(DebugLevel, Cache):
)
@dataclass
class UpdateRelation(DebugLevel, Cache):
relation: _ReferenceKey
code: str = "E038"
def message(self) -> str:
return f"Updating relation: {str(self.relation)}"
@dataclass
class UpdateMissingRelation(DebugLevel, Cache):
relation: _ReferenceKey
code: str = "E039"
def message(self) -> str:
return f"updated a nonexistent relationship: {str(self.relation)}"
@dataclass
class AddLink(DebugLevel, Cache):
dep_key: _ReferenceKey