mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-20 05:41:28 +00:00
Compare commits
65 Commits
postgres-s
...
feature/ma
The 65 commits in this comparison (SHA1s only; the author and date columns were not captured in this mirror):

817a809dd3, 9fe6cc613a, feff055c2e, 856704b1cd, fdb59f81c5, c2ba88763d, c50e873535, d2142bfe1c, 5280b06af8, 7cb83029b9, 8eb95af9eb, 83041f4ff2, c51059e795, c9c1a2eb41, d57495fbbd, d1c0a4ffcb, df21475c8c, c1ae265657, 9b54d85e81, 720da381f7, 01d42ff08b, 1b4a231a71, e884abd5a1, f158a81fa4, 249f5f7fe6, cca6246cca, 59de331e63, 22a5f1229e, 96ba1b1803, 0070877124, dac6088e11, 39b8b0ad8b, 4f72934c89, 76eba9cbc2, 3c95e0ce9b, 5ae82386da, 76c0b007bd, a20a3b514b, f7aefd187b, 96d9439301, 92627745a6, ed65ce6c9c, 0d341dc3c1, 88c4a973f6, 205b57bbb6, bba3a6c278, f4aab05b25, 9b8e8cb819, 3f46e7d2f9, 269888c047, aabd97e5c5, 7747f8b4ca, e6d59e499f, 8e4b1a71e6, 9fae72bb8b, 8f3cc58b78, 2fe362e039, 619c138190, 30b73f5b74, 1efadbd83f, 6a460ef5ad, 454d85f349, 486a7bb836, 2ee2f1ea91, 6e2476e84e
.changes/unreleased/Features-20230329-120313.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
kind: Features
body: Add support for materialized views
time: 2023-03-29T12:03:13.862041-04:00
custom:
  Author: mikealfare McKnight-42
  Issue: "6911"
@@ -2,6 +2,10 @@ from collections.abc import Hashable
from dataclasses import dataclass, field
from typing import Optional, TypeVar, Any, Type, Dict, Iterator, Tuple, Set

from dbt.adapters.relation_configs import (
    RelationConfigChangeCollection,
    RelationResults,
)
from dbt.contracts.graph.nodes import SourceDefinition, ManifestNode, ResultNode, ParsedNode
from dbt.contracts.relation import (
    RelationType,

@@ -14,6 +18,7 @@ from dbt.contracts.relation import (
from dbt.exceptions import (
    ApproximateMatchError,
    DbtInternalError,
    DbtRuntimeError,
    MultipleDatabasesNotAllowedError,
)
from dbt.node_types import NodeType

@@ -286,6 +291,43 @@ class BaseRelation(FakeAPIObject, Hashable):
        )
        return cls.from_dict(kwargs)

    def get_relation_config_change_collection(
        self,
        relation_results: RelationResults,
        runtime_config,  # RuntimeConfigObject
    ) -> Optional[RelationConfigChangeCollection]:
        """
        Determine the relation config changes between an existing deployment and the new deployment.

        This could be overridden in the event that there is an adapter-specific materialization; however,
        it's recommended that the relation type-specific methods are overridden in all other scenarios.
        In the former scenario, the overridden method should still call `super()` to catch all commonly
        supported materializations.

        Args:
            relation_results: the description of the existing deployment according to the database
            runtime_config: the description of the new deployment according to the user's config

        Returns: a set of changes to be made on the relation; this would be None if there are no changes
        """
        relation_type = runtime_config.model.config.get("materialized", RelationType.default())
        if relation_type == RelationType.MaterializedView:
            return self.get_materialized_view_config_change_collection(
                relation_results, runtime_config
            )
        raise DbtRuntimeError(
            f"Config changes have not been configured for relation type {relation_type}."
        )

    def get_materialized_view_config_change_collection(
        self,
        relation_results: RelationResults,
        runtime_config,  # RuntimeConfigObject
    ) -> Optional[RelationConfigChangeCollection]:
        raise NotImplementedError(
            "Materialized view config changes have not been configured for this adapter."
        )

    def __repr__(self) -> str:
        return "<{} {}>".format(self.__class__.__name__, self.render())

@@ -328,6 +370,10 @@ class BaseRelation(FakeAPIObject, Hashable):
    def is_view(self) -> bool:
        return self.type == RelationType.View

    @property
    def is_materialized_view(self) -> bool:
        return self.type == RelationType.MaterializedView

    @classproperty
    def Table(cls) -> str:
        return str(RelationType.Table)

@@ -344,6 +390,10 @@ class BaseRelation(FakeAPIObject, Hashable):
    def External(cls) -> str:
        return str(RelationType.External)

    @classproperty
    def MaterializedView(cls) -> str:
        return str(RelationType.MaterializedView)

    @classproperty
    def get_relation_type(cls) -> Type[RelationType]:
        return RelationType
core/dbt/adapters/relation_configs/README.md (new file, 19 lines)
@@ -0,0 +1,19 @@
This package contains classes to model database objects as they
are described by the database. They exist primarily as an in-between
to parse node configuration from the user and database configuration
in database-specific terms. For example, we expose `method` for indexes
in Postgres as `type`. We want to retain that term across adapters, but
it's more useful to call it `type` in Postgres parlance. Similarly,
we combine `distkey` and `diststyle` in Redshift into a single `dist`
config. This makes sense to the end user because of the available
combinations of these two terms (if `diststyle` is `key`, then `distkey`
is used; if `diststyle` is not `key`, then `distkey` is not used;
hence if `dist` is not one of the other `diststyle` values, it's assumed to be
a `distkey` and `diststyle` is `key`). This kind of nuance can be
parsed out in these classes.

A secondary reason for this package is to place some governance on how
configuration changes are handled in `dbt`. Until recently, changes
have been handled via DROP/CREATE. However, we are going to start
applying changes to existing objects. Given this is new functionality,
it makes sense to place this in a new subpackage.
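As a concrete illustration of this user-term-to-database-term translation, here is a minimal standalone sketch (not part of the change; the class name and fields are simplified from the `PostgresIndexConfig` added later in this diff):

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass(frozen=True)
class IndexConfigSketch:
    """Illustrative only: the user-facing `type` key is stored as the database-facing `method`."""

    column_names: Set[str] = field(default_factory=set)
    unique: bool = False
    method: str = "btree"  # Postgres calls this the index *method*; dbt users configure it as `type`

    @classmethod
    def parse_user_config(cls, entry: dict) -> "IndexConfigSketch":
        # e.g. {"columns": ["id"], "type": "hash", "unique": True} becomes method="hash"
        return cls(
            column_names={c.lower() for c in entry.get("columns", [])},
            unique=entry.get("unique", False),
            method=entry.get("type", "btree"),
        )


print(IndexConfigSketch.parse_user_config({"columns": ["id"], "type": "hash"}))
```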
core/dbt/adapters/relation_configs/__init__.py (new file, 14 lines)
@@ -0,0 +1,14 @@
from dbt.adapters.relation_configs.base import (  # noqa: F401
    RelationConfigBase,
    RelationConfig,
    RelationConfigChangeAction,
    RelationConfigChange,
    RelationConfigChangeCollection,
    RelationResults,
    ValidationRule,
)
from dbt.adapters.relation_configs.materialized_view import (  # noqa: F401
    MaterializedViewConfig,
    MaterializedViewConfigChange,
    MaterializedViewConfigChangeCollection,
)
core/dbt/adapters/relation_configs/base.py (new file, 221 lines)
@@ -0,0 +1,221 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Union, Dict, Set, Tuple, Hashable, Any, Optional

import agate
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import RelationType
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError
from dbt.utils import filter_null_values


"""
This is what relation metadata from the database looks like. It's a dictionary because there will be
multiple grains of data for a single object. For example, a materialized view in Postgres has base-level
information, like its name, but it can also have multiple indexes, which need a separate query. It might look like this:

{
    "base": agate.Row({"table_name": "table_abc", "query": "select * from table_def"}),
    "indexes": agate.Table("rows": [
        agate.Row({"name": "index_a", "columns": ["column_a"], "type": "hash", "unique": False}),
        agate.Row({"name": "index_b", "columns": ["time_dim_a"], "type": "btree", "unique": False}),
    ])
}
"""
RelationResults = Dict[str, Union[agate.Row, agate.Table]]


# a more specific error is optional, but encouraged
ValidationCheck = bool
ValidationRule = Union[Tuple[ValidationCheck, DbtRuntimeError], ValidationCheck]


@dataclass(frozen=True)
class RelationConfigBase(Hashable, ABC):
    relation_type: Optional[RelationType] = None

    def __post_init__(self):
        self.run_validation_rules()

    def validation_rules(self) -> Set[ValidationRule]:
        """
        A set of validation rules to run against the object upon creation.

        A validation rule is a combination of a validation check (bool) and an optional error message.

        This defaults to no validation rules if not implemented. It's recommended to override this with values,
        but that may not always be necessary.

        Returns: a set of validation rules
        """
        return set()

    def run_validation_rules(self):
        for validation_rule in self.validation_rules():
            validation_check, error = self._parse_validation_rule(validation_rule)

            try:
                assert validation_check
            except AssertionError:
                raise error

        self.run_child_validation_rules()

    def run_child_validation_rules(self):
        for attr_value in vars(self).values():
            if isinstance(attr_value, RelationConfigBase):
                attr_value.run_validation_rules()
            if isinstance(attr_value, set):
                for member in attr_value:
                    if isinstance(member, RelationConfigBase):
                        member.run_validation_rules()

    def _parse_validation_rule(
        self, validation_rule: ValidationRule
    ) -> Tuple[ValidationCheck, DbtRuntimeError]:
        default_error = DbtRuntimeError(
            f"There was a validation error in preparing this relation: {self.relation_type}. "
            "No additional context was provided by this adapter."
        )
        if isinstance(validation_rule, tuple):
            return validation_rule
        elif isinstance(validation_rule, bool):
            return validation_rule, default_error
        else:
            raise DbtRuntimeError(f"Invalid validation rule format: {validation_rule}")

    @classmethod
    def from_dict(cls, kwargs_dict) -> "RelationConfigBase":
        """
        This assumes the subclass of `RelationConfigBase` is flat, in the sense that no attribute is
        itself another subclass of `RelationConfigBase`. If that's not the case, this should be overridden
        to manually manage that complexity. This can be automated in the future with something like
        `mashumaro` or `pydantic`.

        Args:
            kwargs_dict: the dict representation of this instance

        Returns: the `RelationConfigBase` representation associated with the provided dict
        """
        return cls(**filter_null_values(kwargs_dict))

    @abstractmethod
    def __hash__(self) -> int:
        raise self._not_implemented_error()

    @abstractmethod
    def __eq__(self, other) -> bool:
        raise self._not_implemented_error()

    @classmethod
    def _not_implemented_error(cls):
        return NotImplementedError(
            f"The relation type {cls.relation_type} has not been configured for this adapter."
        )


@dataclass(frozen=True)
class RelationConfig(RelationConfigBase, ABC):
    @classmethod
    @abstractmethod
    def parse_model_node(cls, model_node: ModelNode) -> dict:
        """
        Translates the description of this relation using jinja context variables into a dict representation
        of `RelationConfig`. This is generally used in conjunction with `RelationConfig.from_dict()`.

        Args:
            model_node: the `ModelNode` instance that's in the `RuntimeConfigObject` in the jinja context

        Returns: a raw dictionary of kwargs that can be used to create a `RelationConfig` instance
        """
        raise cls._not_implemented_error()

    @classmethod
    @abstractmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> dict:
        """
        Translates the description of this relation using data from the database into a dict representation
        of `RelationConfig`. This is generally used in conjunction with `RelationConfig.from_dict()`.

        Args:
            relation_results: a dictionary of results from a "describe" macro. See `RelationResults`

        Returns: a raw dictionary of kwargs that can be used to create a `RelationConfig` instance
        """
        raise cls._not_implemented_error()


class RelationConfigChangeAction(StrEnum):
    alter = "alter"
    create = "create"
    drop = "drop"


@dataclass(frozen=True)
class RelationConfigChange(RelationConfigBase, ABC):
    action: Optional[RelationConfigChangeAction] = None
    context: Hashable = (
        None  # this is usually a RelationConfig, e.g. IndexConfig, but shouldn't be limited
    )

    @abstractmethod
    def requires_full_refresh(self) -> bool:
        raise self._not_implemented_error()

    def __hash__(self) -> int:
        return hash((self.action, self.context))

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, RelationConfigChange):
            return all({self.action == other.action, self.context == other.context})
        return False


@dataclass(frozen=True)
class RelationConfigChangeCollection(ABC):
    """
    Relation configuration changes should be registered on this class as a group, by defining a new attribute
    of type Set[RelationConfigChange]. For example:

        class PostgresIndexConfigChange(RelationConfigChange):
            action = RelationConfigChangeAction.drop
            context = PostgresIndexConfig

            @property
            def requires_full_refresh(self) -> bool:
                return False

        class PostgresMaterializedViewAutoRefreshConfigChange(RelationConfigChange):
            # this doesn't exist in Postgres, but assume it does for this example

            action = RelationConfigChangeAction.alter
            context = PostgresMaterializedView.auto_refresh

            @property
            def requires_full_refresh(self) -> bool:
                return True

        class PostgresMaterializedViewConfigChanges(RelationConfigChanges):
            auto_refresh: Set[PostgresMaterializedViewAutoRefreshConfigChange]
            indexes: Set[PostgresIndexConfigChange]
    """

    relation_type: Optional[RelationType] = None

    def config_change_groups(self) -> Set[str]:
        config_change_groups = set()
        for attr_name, attr_value in vars(self).items():
            if isinstance(attr_value, set) and all(
                isinstance(member, RelationConfigChange) for member in attr_value
            ):
                config_change_groups.add(attr_name)
        return config_change_groups

    def requires_full_refresh(self) -> bool:
        individual_config_change_requires_full_refresh = {
            config_change.requires_full_refresh()
            for config_change_group in self.config_change_groups()
            for config_change in getattr(self, config_change_group)
        }
        return any(individual_config_change_requires_full_refresh)
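A minimal, standalone sketch of how the pattern above is meant to be used: validate on construction, then build instances from a dict of kwargs. `DummyConfig` and its fields are invented for illustration and deliberately avoid importing dbt so the snippet runs on its own:

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple, Union

# Stand-in for the ValidationRule alias above, so the sketch runs without dbt installed.
ValidationRule = Union[Tuple[bool, Exception], bool]


@dataclass(frozen=True)
class DummyConfig:
    """Hypothetical config following the RelationConfigBase pattern: rules run on creation."""

    table_name: Optional[str] = None

    def __post_init__(self):
        for rule in self.validation_rules():
            check, error = rule if isinstance(rule, tuple) else (rule, RuntimeError("invalid config"))
            if not check:
                raise error

    def validation_rules(self) -> Set[ValidationRule]:
        return {
            (
                self.table_name is None or len(self.table_name) <= 63,
                RuntimeError(f"Name is longer than 63 characters: {self.table_name}"),
            ),
        }

    @classmethod
    def from_dict(cls, kwargs_dict: dict) -> "DummyConfig":
        # mirrors RelationConfigBase.from_dict: drop None values, then build the frozen dataclass
        return cls(**{k: v for k, v in kwargs_dict.items() if v is not None})


DummyConfig.from_dict({"table_name": "events_mv"})   # passes validation
# DummyConfig.from_dict({"table_name": "x" * 64})    # would raise RuntimeError
```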
core/dbt/adapters/relation_configs/materialized_view.py (new file, 24 lines)
@@ -0,0 +1,24 @@
from abc import ABC
from dataclasses import dataclass

from dbt.adapters.relation_configs.base import (
    RelationConfig,
    RelationConfigChange,
    RelationConfigChangeCollection,
)
from dbt.contracts.relation import RelationType


@dataclass(frozen=True)
class MaterializedViewConfig(RelationConfig, ABC):
    relation_type = RelationType.MaterializedView


@dataclass(frozen=True)
class MaterializedViewConfigChange(RelationConfigChange, ABC):
    relation_type = RelationType.MaterializedView


@dataclass(frozen=True)
class MaterializedViewConfigChangeCollection(RelationConfigChangeCollection, ABC):
    relation_type = RelationType.MaterializedView
@@ -25,6 +25,7 @@ from dbt.exceptions import (
    RelationWrongTypeError,
    ContractError,
    ColumnTypeMissingError,
    FailFastError,
)


@@ -107,6 +108,10 @@ def column_type_missing(column_names) -> NoReturn:
    raise ColumnTypeMissingError(column_names)


def raise_fail_fast_error(msg, node=None) -> NoReturn:
    raise FailFastError(msg, node=node)


# Update this when a new function should be added to the
# dbt context's `exceptions` key!
CONTEXT_EXPORTS = {

@@ -131,6 +136,7 @@ CONTEXT_EXPORTS = {
        relation_wrong_type,
        raise_contract_error,
        column_type_missing,
        raise_fail_fast_error,
    ]
}

@@ -2,15 +2,17 @@ from dataclasses import field, Field, dataclass
from enum import Enum
from itertools import chain
from typing import Any, List, Optional, Dict, Union, Type, TypeVar, Callable

from dbt.dataclass_schema import (
    dbtClassMixin,
    ValidationError,
    register_pattern,
    StrEnum,
)
from dbt.contracts.graph.unparsed import AdditionalPropertiesAllowed, Docs
from dbt.contracts.graph.utils import validate_color
from dbt.exceptions import DbtInternalError, CompilationError
from dbt.contracts.util import Replaceable, list_str
from dbt.exceptions import DbtInternalError, CompilationError
from dbt import hooks
from dbt.node_types import NodeType

@@ -189,6 +191,16 @@ class Severity(str):
register_pattern(Severity, insensitive_patterns("warn", "error"))


class OnConfigurationChangeOption(StrEnum):
    Apply = "apply"
    Continue = "continue"
    Fail = "fail"

    @classmethod
    def default(cls) -> "OnConfigurationChangeOption":
        return cls.Apply


@dataclass
class ContractConfig(dbtClassMixin, Replaceable):
    enforced: bool = False

@@ -287,11 +299,17 @@ class BaseConfig(AdditionalPropertiesAllowed, Replaceable):
            return False
        return True

-    # This is used in 'add_config_call' to created the combined config_call_dict.
+    # This is used in 'add_config_call' to create the combined config_call_dict.
    # 'meta' moved here from node
    mergebehavior = {
        "append": ["pre-hook", "pre_hook", "post-hook", "post_hook", "tags"],
-        "update": ["quoting", "column_types", "meta", "docs", "contract"],
+        "update": [
+            "quoting",
+            "column_types",
+            "meta",
+            "docs",
+            "contract",
+        ],
        "dict_key_append": ["grants"],
    }

@@ -445,6 +463,9 @@ class NodeConfig(NodeAndTestConfig):
    # sometimes getting the Union order wrong, causing serialization failures.
    unique_key: Union[str, List[str], None] = None
    on_schema_change: Optional[str] = "ignore"
    on_configuration_change: OnConfigurationChangeOption = field(
        default_factory=OnConfigurationChangeOption.default
    )
    grants: Dict[str, Any] = field(
        default_factory=dict, metadata=MergeBehavior.DictKeyAppend.meta()
    )
@@ -17,9 +17,13 @@ class RelationType(StrEnum):
    Table = "table"
    View = "view"
    CTE = "cte"
-    MaterializedView = "materializedview"
+    MaterializedView = "materialized_view"
    External = "external"

    @classmethod
    def default(cls) -> "RelationType":
        return cls.View


class ComponentName(StrEnum):
    Database = "database"
@@ -21,3 +21,21 @@
    {% endif %}
  {% endfor %}
{% endmacro %}


{% macro get_drop_index_sql(relation, index_name) -%}
    {{ adapter.dispatch('get_drop_index_sql', 'dbt')(relation, index_name) }}
{%- endmacro %}

{% macro default__get_drop_index_sql(relation, index_name) -%}
    {{ exceptions.raise_compiler_error("`get_drop_index_sql` has not been implemented for this adapter.") }}
{%- endmacro %}


{% macro get_show_indexes_sql(relation) -%}
    {{ adapter.dispatch('get_show_indexes_sql', 'dbt')(relation) }}
{%- endmacro %}

{% macro default__get_show_indexes_sql(relation) -%}
    {{ exceptions.raise_compiler_error("`get_show_indexes_sql` has not been implemented for this adapter.") }}
{%- endmacro %}
@@ -0,0 +1,30 @@
{% macro get_alter_materialized_view_as_sql(
    relation,
    configuration_changes,
    sql,
    existing_relation,
    backup_relation,
    intermediate_relation
) %}
    {{- log('Applying ALTER to: ' ~ relation) -}}
    {{- adapter.dispatch('get_alter_materialized_view_as_sql', 'dbt')(
        relation,
        configuration_changes,
        sql,
        existing_relation,
        backup_relation,
        intermediate_relation
    ) -}}
{% endmacro %}


{% macro default__get_alter_materialized_view_as_sql(
    relation,
    configuration_changes,
    sql,
    existing_relation,
    backup_relation,
    intermediate_relation
) %}
    {{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}
@@ -0,0 +1,9 @@
{% macro get_create_materialized_view_as_sql(relation, sql) -%}
    {{- log('Applying CREATE to: ' ~ relation) -}}
    {{- adapter.dispatch('get_create_materialized_view_as_sql', 'dbt')(relation, sql) -}}
{%- endmacro %}


{% macro default__get_create_materialized_view_as_sql(relation, sql) -%}
    {{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}
@@ -0,0 +1,23 @@
{% macro get_materialized_view_configuration_changes(existing_relation, new_config) %}
    /* {#
        It's recommended that configuration changes be formatted as follows:
        {"<change_category>": [{"action": "<name>", "context": ...}]}

        For example:
        {
            "indexes": [
                {"action": "drop", "context": "index_abc"},
                {"action": "create", "context": {"columns": ["column_1", "column_2"], "type": "hash", "unique": True}},
            ],
        }

        Either way, `get_materialized_view_configuration_changes` needs to align with `get_alter_materialized_view_as_sql`.
    #} */
    {{- log('Determining configuration changes on: ' ~ existing_relation) -}}
    {%- do return(adapter.dispatch('get_materialized_view_configuration_changes', 'dbt')(existing_relation, new_config)) -%}
{% endmacro %}


{% macro default__get_materialized_view_configuration_changes(existing_relation, new_config) %}
    {{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}
@@ -0,0 +1,121 @@
{% materialization materialized_view, default %}
    {% set existing_relation = load_cached_relation(this) %}
    {% set target_relation = this.incorporate(type=this.MaterializedView) %}
    {% set intermediate_relation = make_intermediate_relation(target_relation) %}
    {% set backup_relation_type = target_relation.MaterializedView if existing_relation is none else existing_relation.type %}
    {% set backup_relation = make_backup_relation(target_relation, backup_relation_type) %}

    {{ materialized_view_setup(backup_relation, intermediate_relation, pre_hooks) }}

    {% set build_sql = materialized_view_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}

    {% if build_sql == '' %}
        {{ materialized_view_execute_no_op(target_relation) }}
    {% else %}
        {{ materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }}
    {% endif %}

    {{ materialized_view_teardown(backup_relation, intermediate_relation, post_hooks) }}

    {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}


{% macro materialized_view_setup(backup_relation, intermediate_relation, pre_hooks) %}

    -- backup_relation and intermediate_relation should not already exist in the database
    -- it's possible these exist because of a previous run that exited unexpectedly
    {% set preexisting_backup_relation = load_cached_relation(backup_relation) %}
    {% set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) %}

    -- drop the temp relations if they exist already in the database
    {{ drop_relation_if_exists(preexisting_backup_relation) }}
    {{ drop_relation_if_exists(preexisting_intermediate_relation) }}

    {{ run_hooks(pre_hooks, inside_transaction=False) }}

{% endmacro %}


{% macro materialized_view_teardown(backup_relation, intermediate_relation, post_hooks) %}

    -- drop the temp relations if they exist to leave the database clean for the next run
    {{ drop_relation_if_exists(backup_relation) }}
    {{ drop_relation_if_exists(intermediate_relation) }}

    {{ run_hooks(post_hooks, inside_transaction=False) }}

{% endmacro %}


{% macro materialized_view_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %}

    {% set full_refresh_mode = should_full_refresh() %}

    -- determine the scenario we're in: create, full_refresh, alter, refresh data
    {% if existing_relation is none %}
        {% set build_sql = get_create_materialized_view_as_sql(target_relation, sql) %}
    {% elif full_refresh_mode or not existing_relation.is_materialized_view %}
        {% set build_sql = get_replace_materialized_view_as_sql(target_relation, sql, existing_relation, backup_relation, intermediate_relation) %}
    {% else %}

        -- get config options
        {% set on_configuration_change = config.get('on_configuration_change') %}
        {% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %}

        {% if configuration_changes is none %}
            {% set build_sql = refresh_materialized_view(target_relation) %}

        {% elif on_configuration_change == 'apply' %}
            {% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %}
        {% elif on_configuration_change == 'continue' %}
            {% set build_sql = '' %}
            {{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }}
        {% elif on_configuration_change == 'fail' %}
            {{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }}

        {% else %}
            -- this only happens if the user provides a value other than `apply`, `continue`, `fail`
            {{ exceptions.raise_compiler_error("Unexpected configuration scenario") }}

        {% endif %}

    {% endif %}

    {% do return(build_sql) %}

{% endmacro %}


{% macro materialized_view_execute_no_op(target_relation) %}
    {% do store_raw_result(
        name="main",
        message="skip " ~ target_relation,
        code="skip",
        rows_affected="-1"
    ) %}
{% endmacro %}


{% macro materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %}

    -- `BEGIN` happens here:
    {{ run_hooks(pre_hooks, inside_transaction=True) }}

    {% set grant_config = config.get('grants') %}

    {% call statement(name="main") %}
        {{ build_sql }}
    {% endcall %}

    {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
    {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

    {% do persist_docs(target_relation, model) %}

    {{ run_hooks(post_hooks, inside_transaction=True) }}

    {{ adapter.commit() }}

{% endmacro %}
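For orientation, a model might opt into the new materialization and change-handling behavior roughly like this. This is a hedged usage sketch: the model name, the `ref`, and the index definitions are invented, and the exact option surface should be checked against the release this ships in:

```sql
-- models/customer_orders_mv.sql (hypothetical model)
{{ config(
    materialized='materialized_view',
    on_configuration_change='apply',   -- one of: apply (default), continue, fail
    indexes=[
        {'columns': ['customer_id'], 'type': 'hash'},
        {'columns': ['ordered_at'], 'type': 'btree', 'unique': False},
    ]
) }}

select customer_id, ordered_at, count(*) as order_count
from {{ ref('stg_orders') }}
group by 1, 2
```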
@@ -0,0 +1,9 @@
{% macro refresh_materialized_view(relation) %}
    {{- log('Applying REFRESH to: ' ~ relation) -}}
    {{- adapter.dispatch('refresh_materialized_view', 'dbt')(relation) -}}
{% endmacro %}


{% macro default__refresh_materialized_view(relation) %}
    {{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}
@@ -0,0 +1,9 @@
{% macro get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %}
    {{- log('Applying REPLACE to: ' ~ relation) -}}
    {{- adapter.dispatch('get_replace_materialized_view_as_sql', 'dbt')(relation, sql, existing_relation, backup_relation, intermediate_relation) -}}
{% endmacro %}


{% macro default__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %}
    {{ exceptions.raise_compiler_error("Materialized views have not been implemented for this adapter.") }}
{% endmacro %}
@@ -10,7 +10,7 @@

    {%- set grant_config = config.get('grants') -%}
    {%- set agate_table = load_agate_table() -%}
-    -- grab current tables grants config for comparision later on
+    -- grab current tables grants config for comparison later on

    {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

@@ -1,6 +1,7 @@
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Set, List, Any

from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
@@ -1,7 +1,21 @@
from dataclasses import dataclass
from typing import Optional, Set

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import (
    RelationConfigChangeAction,
    RelationResults,
)
from dbt.context.providers import RuntimeConfigObject
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.postgres.relation_configs import (
    PostgresIndexConfig,
    PostgresIndexChange,
    PostgresMaterializedViewConfig,
    PostgresMaterializedViewConfigChangeCollection,
)


@dataclass(frozen=True, eq=False, repr=False)
class PostgresRelation(BaseRelation):

@@ -20,3 +34,64 @@ class PostgresRelation(BaseRelation):

    def relation_max_name_length(self):
        return 63

    def get_materialized_view_config_change_collection(
        self, relation_results: RelationResults, runtime_config: RuntimeConfigObject
    ) -> Optional[PostgresMaterializedViewConfigChangeCollection]:
        """
        Postgres-specific implementation of `BaseRelation.get_materialized_view_config_change_collection`.

        The only tracked changes for materialized views are indexes.
        """
        config_change_collection_dict = {}

        existing_materialized_view_config = PostgresMaterializedViewConfig.parse_relation_results(
            relation_results
        )
        existing_materialized_view = PostgresMaterializedViewConfig.from_dict(
            existing_materialized_view_config
        )

        new_materialized_view_config = PostgresMaterializedViewConfig.parse_model_node(
            runtime_config.model
        )
        new_materialized_view = PostgresMaterializedViewConfig.from_dict(
            new_materialized_view_config
        )

        if index_config_changes := self._get_index_config_changes(
            existing_materialized_view.indexes, new_materialized_view.indexes
        ):
            config_change_collection_dict.update({"indexes": index_config_changes})

        if config_change_collection_dict:
            return PostgresMaterializedViewConfigChangeCollection(**config_change_collection_dict)

    def _get_index_config_changes(
        self, existing_indexes: Set[PostgresIndexConfig], new_indexes: Set[PostgresIndexConfig]
    ) -> Set[PostgresIndexChange]:
        """
        Get the index updates that will occur as a result of a new run.

        There are four scenarios:

        1. Indexes are equal -> don't return these
        2. Index is new -> create these
        3. Index is old -> drop these
        4. Indexes are not equal -> drop old, create new -> two actions

        Returns: a set of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
        """
        drop_changes = set(
            PostgresIndexChange.from_dict(
                {"action": RelationConfigChangeAction.drop, "context": index}
            )
            for index in existing_indexes.difference(new_indexes)
        )
        create_changes = set(
            PostgresIndexChange.from_dict(
                {"action": RelationConfigChangeAction.create, "context": index}
            )
            for index in new_indexes.difference(existing_indexes)
        )
        return drop_changes.union(create_changes)
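Because `PostgresIndexConfig` hashing and equality ignore the generated index name, the four scenarios above reduce to two set differences. A rough standalone illustration (plain tuples stand in for the config objects; not part of the diff):

```python
# (columns, unique, method) tuples stand in for PostgresIndexConfig instances
existing = {(("customer_id",), False, "hash"), (("ordered_at",), False, "btree")}
new = {(("customer_id",), False, "hash"), (("ordered_at",), True, "btree")}

drops = existing - new    # the old non-unique btree index -> emit a "drop" change
creates = new - existing  # the new unique btree index -> emit a "create" change
changes = drops | creates  # scenario 4 naturally yields one drop plus one create
print(changes)
```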
@@ -0,0 +1,8 @@
from dbt.adapters.postgres.relation_configs.index import (  # noqa: F401
    PostgresIndexConfig,
    PostgresIndexChange,
)
from dbt.adapters.postgres.relation_configs.materialized_view import (  # noqa: F401
    PostgresMaterializedViewConfig,
    PostgresMaterializedViewConfigChangeCollection,
)
plugins/postgres/dbt/adapters/postgres/relation_configs/index.py (new file, 175 lines)
@@ -0,0 +1,175 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Set, Dict, Union, List, Tuple, Any

from dbt.exceptions import DbtRuntimeError

from dbt.adapters.relation_configs import (
    RelationConfig,
    RelationConfigChangeAction,
    RelationConfigChange,
    RelationResults,
    ValidationRule,
)


# A `NodeConfig` instance can have multiple indexes; this is just one index,
# e.g. {"columns": ["column_a", "column_b"], "unique": True, "type": "hash"}
Columns = List[str]
ModelNodeEntry = Dict[str, Union[Columns, bool, str]]


class PostgresIndexMethod(str, Enum):
    btree = "btree"
    hash = "hash"
    gist = "gist"
    spgist = "spgist"
    gin = "gin"
    brin = "brin"

    @classmethod
    def default(cls) -> "PostgresIndexMethod":
        return cls.btree


@dataclass(frozen=True)
class PostgresIndexConfig(RelationConfig):
    """
    This config follows the specs found here:
    https://www.postgresql.org/docs/current/sql-createindex.html

    The following parameters are configurable by dbt:
    - name: the name of the index in the database, this isn't predictable since we apply a timestamp
    - unique: checks for duplicate values when the index is created and on data updates
    - method: the index method to be used
    - column_names: the columns in the index

    Applicable defaults for non-configurable parameters:
    - concurrently: `False`
    - nulls_distinct: `True`
    """

    name: Optional[str] = None
    column_names: Optional[Set[str]] = field(default_factory=set)
    unique: Optional[bool] = False
    method: Optional[PostgresIndexMethod] = PostgresIndexMethod.btree

    def validation_rules(self) -> Set[ValidationRule]:
        return {
            (
                self.column_names is not None,
                DbtRuntimeError("Indexes require at least one column, but none were provided"),
            ),
        }

    @classmethod
    def parse_model_node(cls, model_node_entry: ModelNodeEntry) -> dict:
        kwargs = {
            "unique": model_node_entry.get("unique"),
            "method": model_node_entry.get("type"),
        }

        if column_names := model_node_entry.get("columns", []):
            # TODO: include the QuotePolicy instead of defaulting to lower()
            kwargs.update({"column_names": set(column.lower() for column in column_names)})

        return kwargs

    @classmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> dict:
        index = relation_results.get("base", {})
        index_config = {
            "name": index.get("name"),
            # we shouldn't have to adjust the values from the database for the QuotePolicy
            "column_names": set(index.get("column_names", "").split(",")),
            "unique": index.get("unique"),
            "method": index.get("method"),
        }
        return index_config

    def as_user_config(self):
        """
        Returns: a dictionary that can be passed into `get_create_index_sql()`
        """
        config = {
            "columns": list(self.column_names),
            "unique": self.unique,
            "type": self.method.value,
        }
        return config

    def __hash__(self) -> int:
        return hash(
            # don't include the name for hashing since we generate a time-specific name for indexes in Postgres
            (
                frozenset(self.column_names),
                self.unique,
                self.method,
            )
        )

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, PostgresIndexConfig):
            # don't include the name for equality since we generate a time-specific name for indexes in Postgres
            return all(
                {
                    self.column_names == other.column_names,
                    self.unique == other.unique,
                    self.method == other.method,
                }
            )
        return False


@dataclass(frozen=True)
class PostgresIndexChange(RelationConfigChange):
    """
    Example of an index change:
    {
        "action": "create",
        "context": {
            "name": "",  # we don't know the name since it gets created as a hash at runtime
            "columns": ["column_1", "column_3"],
            "type": "hash",
            "unique": True
        }
    },
    {
        "action": "drop",
        "context": {
            "name": "index_abc",  # we only need this to drop, but we need the rest to compare
            "columns": ["column_1"],
            "type": "btree",
            "unique": True
        }
    }
    """

    context: PostgresIndexConfig = None

    def requires_full_refresh(self) -> bool:
        return False

    def validation_rules(self) -> Set[Union[Tuple[bool, DbtRuntimeError], bool]]:
        return {
            (
                self.action
                in {RelationConfigChangeAction.create, RelationConfigChangeAction.drop},
                DbtRuntimeError(
                    "Invalid operation, only `drop` and `create` changes are supported for indexes."
                ),
            ),
            (
                not (self.action == RelationConfigChangeAction.drop and self.context.name is None),
                DbtRuntimeError("Invalid operation, attempting to drop an index with no name."),
            ),
            (
                not (
                    self.action == RelationConfigChangeAction.create
                    and self.context.column_names == set()
                ),
                DbtRuntimeError(
                    "Invalid operation, attempting to create an index with no columns."
                ),
            ),
        }
@@ -0,0 +1,121 @@
from dataclasses import dataclass, field
from typing import Optional, Set, Any

from dbt.adapters.relation_configs import (
    MaterializedViewConfig,
    MaterializedViewConfigChangeCollection,
    RelationResults,
    ValidationRule,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.postgres.relation_configs.index import PostgresIndexConfig, PostgresIndexChange


@dataclass(frozen=True)
class PostgresMaterializedViewConfig(MaterializedViewConfig):
    """
    This config follows the specs found here:
    https://www.postgresql.org/docs/current/sql-creatematerializedview.html

    The following parameters are configurable by dbt:
    - table_name: name of the materialized view
    - query: the query that defines the view
    - indexes: the collection (set) of indexes on the materialized view

    Applicable defaults for non-configurable parameters:
    - method: `heap`
    - tablespace_name: `default_tablespace`
    - with_data: `True`
    """

    table_name: Optional[str] = None
    query: Optional[str] = None
    indexes: Set[PostgresIndexConfig] = field(default_factory=set)

    def validation_rules(self) -> Set[ValidationRule]:
        # index rules get run by default
        return {
            (
                self.table_name is None or len(self.table_name) <= 63,
                DbtRuntimeError(
                    f"The materialized view name is more than 63 characters: {self.table_name}"
                ),
            ),
        }

    @classmethod
    def from_dict(cls, kwargs_dict) -> "PostgresMaterializedViewConfig":
        config_dict = {
            "table_name": kwargs_dict.get("table_name"),
            "query": kwargs_dict.get("query"),
            "indexes": {
                PostgresIndexConfig.from_dict(index) for index in kwargs_dict.get("indexes", {})
            },
        }
        materialized_view: "PostgresMaterializedViewConfig" = super().from_dict(config_dict)  # type: ignore
        return materialized_view

    @classmethod
    def parse_model_node(cls, model_node: ModelNode) -> dict:
        """
        Postgres-specific implementation of `RelationConfig.parse_model_node()` for materialized views
        """
        kwargs = {
            "table_name": model_node.identifier,
            "query": model_node.compiled_code,
        }

        # create index objects for each index found in the config
        if indexes := model_node.config.extra.get("indexes"):
            index_configs = [PostgresIndexConfig.parse_model_node(index) for index in indexes]
            kwargs.update({"indexes": index_configs})

        return kwargs

    @classmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> dict:
        """
        Postgres-specific implementation of `RelationConfig.parse_relation_results()` for materialized views
        """
        base_config = relation_results.get("base", {})
        kwargs = {
            "table_name": base_config.get("table_name"),
            "query": base_config.get("query"),
        }

        # create index objects for each index found in the config
        if indexes := relation_results.get("indexes"):
            index_configs = [
                PostgresIndexConfig.parse_relation_results({"base": index})
                for index in indexes.rows
            ]
            kwargs.update({"indexes": index_configs})

        return kwargs

    def __hash__(self) -> int:
        return hash(
            (
                self.table_name,
                self.query,
                self.indexes,
            )
        )

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, PostgresMaterializedViewConfig):
            return all(
                {
                    self.table_name == other.table_name,
                    self.query == other.query,
                    self.indexes == other.indexes,
                }
            )
        return False


@dataclass(frozen=True)
class PostgresMaterializedViewConfigChangeCollection(MaterializedViewConfigChangeCollection):
    indexes: Optional[Set[PostgresIndexChange]] = None
@@ -95,6 +95,14 @@
        'view' as type
    from pg_views
    where schemaname ilike '{{ schema_relation.schema }}'
    union all
    select
        '{{ schema_relation.database }}' as database,
        matviewname as name,
        schemaname as schema,
        'materialized_view' as type
    from pg_matviews
    where schemaname ilike '{{ schema_relation.schema }}'
  {% endcall %}
  {{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
@@ -209,3 +217,34 @@
{% macro postgres__copy_grants() %}
    {{ return(False) }}
{% endmacro %}


{% macro postgres__get_show_indexes_sql(relation) %}
    select
        i.relname as name,
        m.amname as method,
        ix.indisunique as "unique",
        array_to_string(array_agg(a.attname), ',') as column_names
    from pg_index ix
    join pg_class i
        on i.oid = ix.indexrelid
    join pg_am m
        on m.oid = i.relam
    join pg_class t
        on t.oid = ix.indrelid
    join pg_namespace n
        on n.oid = t.relnamespace
    join pg_attribute a
        on a.attrelid = t.oid
        and a.attnum = ANY(ix.indkey)
    where t.relname = '{{ relation.identifier }}'
        and n.nspname = '{{ relation.schema }}'
        and t.relkind in ('r', 'm')
    group by 1, 2, 3
    order by 1, 2, 3
{% endmacro %}


{%- macro postgres__get_drop_index_sql(relation, index_name) -%}
    drop index if exists "{{ index_name }}"
{%- endmacro -%}
@@ -0,0 +1,90 @@
{% macro postgres__get_alter_materialized_view_as_sql(
    relation,
    configuration_changes,
    sql,
    existing_relation,
    backup_relation,
    intermediate_relation
) %}

    -- apply a full refresh immediately if needed
    {% if configuration_changes.requires_full_refresh() %}

        {{ get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) }}

    -- otherwise apply individual changes as needed
    {% else %}

        {{ postgres__update_indexes_on_materialized_view(relation, configuration_changes.indexes) }}

    {%- endif -%}

{% endmacro %}


{% macro postgres__get_create_materialized_view_as_sql(relation, sql) %}
    create materialized view if not exists {{ relation }} as {{ sql }};

    {% for _index_dict in config.get('indexes', []) -%}
        {{- get_create_index_sql(relation, _index_dict) -}}
    {%- endfor -%}

{% endmacro %}


{% macro postgres__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %}
    {{- get_create_materialized_view_as_sql(intermediate_relation, sql) -}}

    {% if existing_relation is not none %}
        alter materialized view {{ existing_relation }} rename to {{ backup_relation.include(database=False, schema=False) }};
    {% endif %}

    alter materialized view {{ intermediate_relation }} rename to {{ relation.include(database=False, schema=False) }};

{% endmacro %}


{% macro postgres__get_materialized_view_configuration_changes(existing_relation, new_config) %}
    {% set _existing_materialized_view = postgres__describe_materialized_view(existing_relation) %}
    {% set _configuration_changes = existing_relation.get_relation_config_change_collection(_existing_materialized_view, new_config) %}
    {% do return(_configuration_changes) %}
{% endmacro %}


{% macro postgres__refresh_materialized_view(relation) %}
    refresh materialized view {{ relation }};
{% endmacro %}


{%- macro postgres__update_indexes_on_materialized_view(relation, index_changes) -%}
    {{- log("Applying UPDATE INDEXES to: " ~ relation) -}}

    {%- for _index_change in index_changes -%}
        {%- set _index = _index_change.context -%}

        {%- if _index_change.action == "drop" -%}

            {{ postgres__get_drop_index_sql(relation, _index.name) }};

        {%- elif _index_change.action == "create" -%}

            {{ postgres__get_create_index_sql(relation, _index.as_user_config()) }}

        {%- else -%}

            {{- exceptions.raise_compiler_error(
                "Unsupported action supplied to postgres__update_indexes_on_materialized_view: " ~ _index_change.action)
            -}}

        {%- endif -%}

    {%- endfor -%}

{%- endmacro -%}


{% macro postgres__describe_materialized_view(relation) %}
    -- for now just get the indexes, we don't need the name or the query yet
    {% set _indexes = run_query(get_show_indexes_sql(relation)) %}
    {% do return({'indexes': _indexes}) %}
{% endmacro %}
@@ -42,7 +42,7 @@
            referenced_class.kind
        from relation
        join class as referenced_class on relation.class=referenced_class.id
-        where referenced_class.kind in ('r', 'v')
+        where referenced_class.kind in ('r', 'v', 'm')
    ),
    relationships as (
        select
@@ -71,6 +71,8 @@ setup(
    install_requires=[
        "dbt-core=={}".format(package_version),
        "{}~=2.8".format(DBT_PSYCOPG2_NAME),
        # installed via dbt-core, but referenced directly; don't pin to avoid version conflicts with dbt-core
        "agate",
    ],
    zip_safe=False,
    classifiers=[
@@ -147,6 +147,7 @@ def basic_uncompiled_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -199,6 +200,7 @@ def basic_compiled_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -81,6 +81,7 @@ def populated_node_config_dict():
        "tags": [],
        "extra": "even more",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -165,6 +166,7 @@ def base_parsed_model_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -273,6 +275,7 @@ def complex_parsed_model_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -352,6 +355,7 @@ def complex_parsed_model_object():
        "quoting": {},
        "column_types": {"a": "text"},
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "grants": {},
        "packages": [],
        "docs": {"show": True},

@@ -370,6 +374,7 @@ def complex_parsed_model_object():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -520,6 +525,7 @@ def basic_parsed_seed_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -611,6 +617,7 @@ def complex_parsed_seed_dict():
        "tags": [],
        "quote_columns": True,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -816,6 +823,7 @@ def base_parsed_hook_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -898,6 +906,7 @@ def complex_parsed_hook_dict():
        "quoting": {},
        "tags": [],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -1254,6 +1263,7 @@ def basic_timestamp_snapshot_config_dict():
        "target_database": "some_snapshot_db",
        "target_schema": "some_snapshot_schema",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -1291,6 +1301,7 @@ def complex_timestamp_snapshot_config_dict():
        "strategy": "timestamp",
        "updated_at": "last_update",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -1356,6 +1367,7 @@ def basic_check_snapshot_config_dict():
        "strategy": "check",
        "check_cols": "all",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -1393,6 +1405,7 @@ def complex_set_snapshot_config_dict():
        "strategy": "check",
        "check_cols": ["a", "b"],
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],

@@ -1509,6 +1522,7 @@ def basic_timestamp_snapshot_dict():
        "strategy": "timestamp",
        "updated_at": "last_update",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

@@ -1656,6 +1670,7 @@ def basic_check_snapshot_dict():
        "strategy": "check",
        "check_cols": "all",
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "docs": {"show": True},

69
tests/adapter/dbt/tests/adapter/materialized_view/base.py
Normal file
69
tests/adapter/dbt/tests/adapter/materialized_view/base.py
Normal file
@@ -0,0 +1,69 @@
from typing import List, Tuple, Optional
import os

import pytest

from dbt.dataclass_schema import StrEnum
from dbt.tests.util import run_dbt, get_manifest, run_dbt_and_capture


def run_model(
    model: str,
    run_args: Optional[List[str]] = None,
    full_refresh: bool = False,
    expect_pass: bool = True,
) -> Tuple[list, str]:
    args = ["--debug", "run", "--models", model]
    if full_refresh:
        args.append("--full-refresh")
    if run_args:
        args.extend(run_args)
    return run_dbt_and_capture(args, expect_pass=expect_pass)


def assert_message_in_logs(logs: str, message: str, expected_fail: bool = False):
    # if the logs are json strings, then 'jsonify' the message because of things like escape quotes
    if os.environ.get("DBT_LOG_FORMAT", "") == "json":
        message = message.replace(r'"', r"\"")

    if expected_fail:
        assert message not in logs
    else:
        assert message in logs


def get_records(project, model: str) -> List[tuple]:
    sql = f"select * from {project.database}.{project.test_schema}.{model};"
    return [tuple(row) for row in project.run_sql(sql, fetch="all")]


def get_row_count(project, model: str) -> int:
    sql = f"select count(*) from {project.database}.{project.test_schema}.{model};"
    return project.run_sql(sql, fetch="one")[0]


def insert_record(project, record: tuple, model: str, columns: List[str]):
    sql = f"""
    insert into {project.database}.{project.test_schema}.{model} ({', '.join(columns)})
    values ({','.join(str(value) for value in record)})
    ;"""
    project.run_sql(sql)


def assert_model_exists_and_is_correct_type(project, model: str, relation_type: StrEnum):
    # In general, `relation_type` will be of type `RelationType`.
    # However, in some cases (e.g. `dbt-snowflake`) adapters will have their own `RelationType`.
    manifest = get_manifest(project.project_root)
    model_metadata = manifest.nodes[f"model.test.{model}"]
    assert model_metadata.config.materialized == relation_type
    assert get_row_count(project, model) >= 0


class Base:
    @pytest.fixture(scope="function", autouse=True)
    def setup(self, project):
        run_dbt(["run"])

    @pytest.fixture(scope="class", autouse=True)
    def project(self, project):
        yield project
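The helpers and the `Base` class above are meant to be reused by adapter plugins. As a rough, hypothetical sketch (not part of this diff; the test class name, model name, and model SQL are illustrative only), an adapter's functional test could combine them like this:

    import pytest

    from dbt.contracts.relation import RelationType
    from dbt.tests.adapter.materialized_view.base import (
        Base,
        run_model,
        assert_model_exists_and_is_correct_type,
    )


    class TestMyAdapterMaterializedView(Base):
        @pytest.fixture(scope="class")
        def models(self):
            # a single illustrative model materialized as a materialized view
            return {
                "my_materialized_view.sql": "{{ config(materialized='materialized_view') }} select 1 as id"
            }

        def test_rerun_keeps_relation_type(self, project):
            # re-running the model should leave it deployed as a materialized view
            run_model("my_materialized_view")
            assert_model_exists_and_is_correct_type(
                project, "my_materialized_view", RelationType.MaterializedView
            )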
@@ -0,0 +1,97 @@
from typing import List

import pytest
import yaml

from dbt.tests.util import read_file, write_file, relation_from_name
from dbt.contracts.graph.model_config import OnConfigurationChangeOption
from dbt.contracts.results import RunStatus

from dbt.tests.adapter.materialized_view.base import (
    Base,
    assert_message_in_logs,
)


def get_project_config(project):
    file_yaml = read_file(project.project_root, "dbt_project.yml")
    return yaml.safe_load(file_yaml)


def set_project_config(project, config):
    config_yaml = yaml.safe_dump(config)
    write_file(config_yaml, project.project_root, "dbt_project.yml")


def get_model_file(project, model: str) -> str:
    return read_file(project.project_root, "models", f"{model}.sql")


def set_model_file(project, model: str, model_sql: str):
    write_file(model_sql, project.project_root, "models", f"{model}.sql")


def assert_proper_scenario(
    on_configuration_change,
    results,
    logs,
    status: RunStatus,
    messages_in_logs: List[str] = None,
    messages_not_in_logs: List[str] = None,
):
    assert len(results.results) == 1
    result = results.results[0]

    assert result.node.config.on_configuration_change == on_configuration_change
    assert result.status == status
    for message in messages_in_logs or []:
        assert_message_in_logs(logs, message)
    for message in messages_not_in_logs or []:
        assert_message_in_logs(logs, message, expected_fail=True)


class OnConfigurationChangeBase(Base):

    on_configuration_change = OnConfigurationChangeOption.Apply
    base_materialized_view = "base_materialized_view"

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"models": {"on_configuration_change": str(self.on_configuration_change)}}

    @pytest.fixture(scope="function")
    def alter_message(self, project):
        return f"Applying ALTER to: {relation_from_name(project.adapter, self.base_materialized_view)}"

    @pytest.fixture(scope="function")
    def create_message(self, project):
        return f"Applying CREATE to: {relation_from_name(project.adapter, self.base_materialized_view)}"

    @pytest.fixture(scope="function")
    def refresh_message(self, project):
        return f"Applying REFRESH to: {relation_from_name(project.adapter, self.base_materialized_view)}"

    @pytest.fixture(scope="function")
    def replace_message(self, project):
        return f"Applying REPLACE to: {relation_from_name(project.adapter, self.base_materialized_view)}"

    @pytest.fixture(scope="function")
    def configuration_change_message(self, project):
        return (
            f"Determining configuration changes on: "
            f"{relation_from_name(project.adapter, self.base_materialized_view)}"
        )

    @pytest.fixture(scope="function")
    def configuration_change_continue_message(self, project):
        return (
            f"Configuration changes were identified and `on_configuration_change` "
            f"was set to `continue` for `{relation_from_name(project.adapter, self.base_materialized_view)}`"
        )

    @pytest.fixture(scope="function")
    def configuration_change_fail_message(self, project):
        return (
            f"Configuration changes were identified and `on_configuration_change` "
            f"was set to `fail` for `{relation_from_name(project.adapter, self.base_materialized_view)}`"
        )
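For illustration, the `get_project_config` / `set_project_config` helpers above could be used to flip the project-level `on_configuration_change` setting at runtime rather than through the class-level `project_config_update` fixture. A minimal sketch (the wrapper function name is illustrative and not part of this diff):

    from dbt.contracts.graph.model_config import OnConfigurationChangeOption
    from dbt.tests.adapter.materialized_view.on_configuration_change import (
        get_project_config,
        set_project_config,
    )


    def switch_on_configuration_change(project, option: OnConfigurationChangeOption):
        # read dbt_project.yml, update the top-level models block, and write it back
        config = get_project_config(project)
        config.setdefault("models", {})["on_configuration_change"] = str(option)
        set_project_config(project, config)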
@@ -11,4 +11,4 @@
* Explore using:
  * https://github.com/pytest-docker-compose/pytest-docker-compose or
  * https://github.com/avast/pytest-docker for automatically managing a postgres instance running in a docker container
* Track test converage (https://pytest-cov.readthedocs.io/en/latest)
* Track test coverage (https://pytest-cov.readthedocs.io/en/latest)
@@ -29,6 +29,7 @@ def get_rendered_model_config(**updates):
        "persist_docs": {},
        "full_refresh": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "unique_key": None,
        "grants": {},
@@ -59,6 +60,7 @@ def get_rendered_seed_config(**updates):
        "quote_columns": True,
        "full_refresh": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -96,6 +98,7 @@ def get_rendered_snapshot_config(**updates):
        "persist_docs": {},
        "full_refresh": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "strategy": "check",
        "check_cols": "all",
        "unique_key": "id",
@@ -90,6 +90,7 @@ class TestList:
        "alias": None,
        "check_cols": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "meta": {},
        "grants": {},
        "packages": [],
@@ -127,6 +128,7 @@ class TestList:
        "persist_docs": {},
        "full_refresh": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -170,6 +172,7 @@ class TestList:
        "full_refresh": None,
        "unique_key": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -207,6 +210,7 @@ class TestList:
        "full_refresh": None,
        "unique_key": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -244,6 +248,7 @@ class TestList:
        "full_refresh": None,
        "unique_key": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -281,6 +286,7 @@ class TestList:
        "full_refresh": None,
        "unique_key": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -328,6 +334,7 @@ class TestList:
        "persist_docs": {},
        "full_refresh": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -393,6 +400,7 @@ class TestList:
        "full_refresh": None,
        "unique_key": None,
        "on_schema_change": "ignore",
        "on_configuration_change": "apply",
        "database": None,
        "schema": None,
        "alias": None,
@@ -0,0 +1,67 @@
import pytest

from dbt.tests.util import relation_from_name
from tests.adapter.dbt.tests.adapter.materialized_view.base import Base
from tests.adapter.dbt.tests.adapter.materialized_view.on_configuration_change import (
    OnConfigurationChangeBase,
    get_model_file,
    set_model_file,
)


class PostgresBasicBase(Base):
    @pytest.fixture(scope="class")
    def models(self):
        base_table = """
        {{ config(materialized='table') }}
        select 1 as base_column
        """
        base_materialized_view = """
        {{ config(materialized='materialized_view') }}
        select * from {{ ref('base_table') }}
        """
        return {"base_table.sql": base_table, "base_materialized_view.sql": base_materialized_view}


class PostgresOnConfigurationChangeBase(OnConfigurationChangeBase):
    @pytest.fixture(scope="class")
    def models(self):
        base_table = """
        {{ config(
            materialized='table',
            indexes=[{'columns': ['id', 'value']}]
        ) }}
        select
            1 as id,
            100 as value,
            42 as new_id,
            4242 as new_value
        """
        base_materialized_view = """
        {{ config(
            materialized='materialized_view',
            indexes=[{'columns': ['id', 'value']}]
        ) }}
        select * from {{ ref('base_table') }}
        """
        return {"base_table.sql": base_table, "base_materialized_view.sql": base_materialized_view}

    @pytest.fixture(scope="function")
    def configuration_changes(self, project):
        initial_model = get_model_file(project, "base_materialized_view")

        # change the index from [`id`, `value`] to [`new_id`, `new_value`]
        new_model = initial_model.replace(
            "indexes=[{'columns': ['id', 'value']}]",
            "indexes=[{'columns': ['new_id', 'new_value']}]",
        )
        set_model_file(project, "base_materialized_view", new_model)

        yield

        # set this back for the next test
        set_model_file(project, "base_materialized_view", initial_model)

    @pytest.fixture(scope="function")
    def update_index_message(self, project):
        return f"Applying UPDATE INDEXES to: {relation_from_name(project.adapter, 'base_materialized_view')}"
@@ -0,0 +1,143 @@
from dbt.contracts.graph.model_config import OnConfigurationChangeOption
from dbt.contracts.results import RunStatus
from dbt.contracts.relation import RelationType
from tests.adapter.dbt.tests.adapter.materialized_view.base import (
    run_model,
    assert_model_exists_and_is_correct_type,
    insert_record,
    get_row_count,
)
from tests.adapter.dbt.tests.adapter.materialized_view.on_configuration_change import (
    assert_proper_scenario,
)

from tests.functional.materializations.materialized_view_tests.fixtures import (
    PostgresOnConfigurationChangeBase,
    PostgresBasicBase,
)


class TestBasic(PostgresBasicBase):
    def test_relation_is_materialized_view_on_initial_creation(self, project):
        assert_model_exists_and_is_correct_type(
            project, "base_materialized_view", RelationType.MaterializedView
        )
        assert_model_exists_and_is_correct_type(project, "base_table", RelationType.Table)

    def test_relation_is_materialized_view_when_rerun(self, project):
        run_model("base_materialized_view")
        assert_model_exists_and_is_correct_type(
            project, "base_materialized_view", RelationType.MaterializedView
        )

    def test_relation_is_materialized_view_on_full_refresh(self, project):
        run_model("base_materialized_view", full_refresh=True)
        assert_model_exists_and_is_correct_type(
            project, "base_materialized_view", RelationType.MaterializedView
        )

    def test_relation_is_materialized_view_on_update(self, project):
        run_model("base_materialized_view", run_args=["--vars", "quoting: {identifier: True}"])
        assert_model_exists_and_is_correct_type(
            project, "base_materialized_view", RelationType.MaterializedView
        )

    def test_updated_base_table_data_only_shows_in_materialized_view_after_rerun(self, project):
        # poll database
        table_start = get_row_count(project, "base_table")
        view_start = get_row_count(project, "base_materialized_view")

        # insert new record in table
        new_record = (2,)
        insert_record(project, new_record, "base_table", ["base_column"])

        # poll database
        table_mid = get_row_count(project, "base_table")
        view_mid = get_row_count(project, "base_materialized_view")

        # refresh the materialized view
        run_model("base_materialized_view")

        # poll database
        table_end = get_row_count(project, "base_table")
        view_end = get_row_count(project, "base_materialized_view")

        # new records were inserted in the table but didn't show up in the view until it was refreshed
        assert table_start < table_mid == table_end
        assert view_start == view_mid < view_end


class OnConfigurationChangeCommon(PostgresOnConfigurationChangeBase):
    def test_full_refresh_takes_precedence_over_any_configuration_changes(
        self, configuration_changes, replace_message, configuration_change_message
    ):
        results, logs = run_model("base_materialized_view", full_refresh=True)
        assert_proper_scenario(
            self.on_configuration_change,
            results,
            logs,
            RunStatus.Success,
            messages_in_logs=[replace_message],
            messages_not_in_logs=[configuration_change_message],
        )

    def test_model_is_refreshed_with_no_configuration_changes(
        self, refresh_message, configuration_change_message
    ):
        results, logs = run_model("base_materialized_view")
        assert_proper_scenario(
            self.on_configuration_change,
            results,
            logs,
            RunStatus.Success,
            messages_in_logs=[refresh_message, configuration_change_message],
        )


class TestOnConfigurationChangeApply(OnConfigurationChangeCommon):
    def test_model_applies_changes_with_configuration_changes(
        self, configuration_changes, alter_message, update_index_message
    ):
        results, logs = run_model("base_materialized_view")
        assert_proper_scenario(
            self.on_configuration_change,
            results,
            logs,
            RunStatus.Success,
            messages_in_logs=[alter_message, update_index_message],
        )


class TestOnConfigurationChangeContinue(OnConfigurationChangeCommon):

    on_configuration_change = OnConfigurationChangeOption.Continue

    def test_model_is_not_refreshed_with_configuration_changes(
        self, configuration_changes, configuration_change_continue_message, refresh_message
    ):
        results, logs = run_model("base_materialized_view")
        assert_proper_scenario(
            self.on_configuration_change,
            results,
            logs,
            RunStatus.Success,
            messages_in_logs=[configuration_change_continue_message],
            messages_not_in_logs=[refresh_message],
        )


class TestOnConfigurationChangeFail(OnConfigurationChangeCommon):

    on_configuration_change = OnConfigurationChangeOption.Fail

    def test_run_fails_with_configuration_changes(
        self, configuration_changes, configuration_change_fail_message
    ):
        results, logs = run_model("base_materialized_view", expect_pass=False)
        assert_proper_scenario(
            self.on_configuration_change,
            results,
            logs,
            RunStatus.Error,
            messages_in_logs=[configuration_change_fail_message],
        )