mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-22 17:01:27 +00:00
Compare commits
18 Commits
enable-pos
...
feature/ma
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c00650337b | ||
|
|
10c90b2c03 | ||
|
|
be6337b10d | ||
|
|
60338bfef0 | ||
|
|
1d41b50144 | ||
|
|
1e3f4034aa | ||
|
|
34130bb784 | ||
|
|
a6f8e427aa | ||
|
|
44b3c70f79 | ||
|
|
6e4a960770 | ||
|
|
aba3c0315b | ||
|
|
6499a6884c | ||
|
|
e3b986e56d | ||
|
|
99296fe424 | ||
|
|
f67f6bebff | ||
|
|
4cc726f6e5 | ||
|
|
ea1b44de4d | ||
|
|
7544382104 |
6
.changes/unreleased/Features-20231012-122917.yaml
Normal file
6
.changes/unreleased/Features-20231012-122917.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Features
|
||||
body: Optimize refreshing materialized views when autorefreshed
|
||||
time: 2023-10-12T12:29:17.705373-04:00
|
||||
custom:
|
||||
Author: mikealfare
|
||||
Issue: "6911"
|
||||
2
.flake8
2
.flake8
@@ -10,3 +10,5 @@ ignore =
|
||||
E741
|
||||
E501 # long line checking is done in black
|
||||
exclude = test/
|
||||
per-file-ignores =
|
||||
*/__init__.py: F401
|
||||
|
||||
@@ -22,6 +22,7 @@ from typing import (
|
||||
)
|
||||
|
||||
from dbt.adapters.capability import Capability, CapabilityDict
|
||||
from dbt.adapters.relation_configs import RelationConfigFactory
|
||||
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint
|
||||
|
||||
import agate
|
||||
@@ -246,6 +247,18 @@ class BaseAdapter(metaclass=AdapterMeta):
|
||||
self.cache = RelationsCache()
|
||||
self.connections = self.ConnectionManager(config)
|
||||
self._macro_manifest_lazy: Optional[MacroManifest] = None
|
||||
self.relation_config_factory = self._relation_config_factory()
|
||||
|
||||
def _relation_config_factory(self) -> RelationConfigFactory:
|
||||
"""
|
||||
This sets the default relation config factory in the init.
|
||||
If you need to adjust the default settings, override this
|
||||
returning an instance with the settings specific to your adapter.
|
||||
|
||||
See `dbt.adapters.relation_configs.factory.RelationConfigFactory`
|
||||
for more information regarding these settings.
|
||||
"""
|
||||
return RelationConfigFactory()
|
||||
|
||||
###
|
||||
# Methods that pass through to the connection manager
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
from dbt.adapters.relation_configs.config_base import ( # noqa: F401
|
||||
from dbt.adapters.relation_configs._factory import RelationConfigFactory
|
||||
from dbt.adapters.relation_configs._materialized_view import (
|
||||
MaterializedViewRelationConfig,
|
||||
)
|
||||
from dbt.adapters.relation_configs.config_base import (
|
||||
RelationConfigBase,
|
||||
RelationResults,
|
||||
)
|
||||
from dbt.adapters.relation_configs.config_change import ( # noqa: F401
|
||||
from dbt.adapters.relation_configs.config_change import (
|
||||
RelationConfigChangeAction,
|
||||
RelationConfigChange,
|
||||
)
|
||||
from dbt.adapters.relation_configs.config_validation import ( # noqa: F401
|
||||
from dbt.adapters.relation_configs.config_validation import (
|
||||
RelationConfigValidationMixin,
|
||||
RelationConfigValidationRule,
|
||||
)
|
||||
|
||||
58
core/dbt/adapters/relation_configs/_factory.py
Normal file
58
core/dbt/adapters/relation_configs/_factory.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from typing import Dict, Type
|
||||
|
||||
from dbt.contracts.graph.nodes import ParsedNode
|
||||
from dbt.contracts.relation import RelationType
|
||||
from dbt.dataclass_schema import StrEnum
|
||||
from dbt.exceptions import DbtRuntimeError
|
||||
|
||||
from dbt.adapters.relation_configs.config_base import RelationConfigBase
|
||||
from dbt.adapters.relation_configs._materialized_view import MaterializedViewRelationConfig
|
||||
|
||||
|
||||
class RelationConfigFactory:
|
||||
"""
|
||||
This provides a way to work with relation configs both in the adapter and in the jinja context.
|
||||
|
||||
This factory comes with a default set of settings which can be overridden in BaseAdapter.
|
||||
|
||||
Args:
|
||||
relation_types: an enum that contains all possible relation types for this adapter
|
||||
this is generally `RelationType`, but there are cases where an adapter may override
|
||||
`RelationType` to include more options or exclude options
|
||||
relation_configs: a map from a relation_type to a relation_config
|
||||
this is generally only overridden if `relation_types` is also overridden
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# the `StrEnum` class will generally be `RelationType`, however this allows for extending that Enum
|
||||
self.relation_types: Type[StrEnum] = kwargs.get("relation_types", RelationType)
|
||||
self.relation_configs: Dict[StrEnum, Type[RelationConfigBase]] = kwargs.get(
|
||||
"relation_configs",
|
||||
{
|
||||
RelationType.MaterializedView: MaterializedViewRelationConfig,
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
for relation_type in self.relation_configs.keys():
|
||||
self.relation_types(relation_type)
|
||||
except ValueError:
|
||||
raise DbtRuntimeError(
|
||||
f"Received relation configs for {relation_type} " # noqa
|
||||
f"but these relation types are not registered on this factory.\n"
|
||||
f" registered relation types: {', '.join(self.relation_types)}\n"
|
||||
)
|
||||
|
||||
def make_from_node(self, node: ParsedNode) -> RelationConfigBase:
|
||||
relation_type = self.relation_types(node.config.materialized)
|
||||
relation_config = self._relation_config(relation_type)
|
||||
return relation_config.from_node(node)
|
||||
|
||||
def _relation_config(self, relation_type: StrEnum) -> Type[RelationConfigBase]:
|
||||
if relation := self.relation_configs.get(relation_type):
|
||||
return relation
|
||||
raise DbtRuntimeError(
|
||||
f"This factory does not have a relation config for this type.\n"
|
||||
f" received: {relation_type}\n"
|
||||
f" options: {', '.join(t for t in self.relation_configs.keys())}\n"
|
||||
)
|
||||
7
core/dbt/adapters/relation_configs/_materialized_view.py
Normal file
7
core/dbt/adapters/relation_configs/_materialized_view.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from dbt.adapters.relation_configs.config_base import RelationConfigBase
|
||||
|
||||
|
||||
class MaterializedViewRelationConfig(RelationConfigBase):
|
||||
@property
|
||||
def auto_refresh(self) -> bool:
|
||||
return False
|
||||
@@ -1,7 +1,9 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Union, Dict
|
||||
from typing import Any, Dict, Union
|
||||
from typing_extensions import Self
|
||||
|
||||
import agate
|
||||
from dbt.contracts.graph.nodes import ModelNode, ParsedNode
|
||||
from dbt.utils import filter_null_values
|
||||
|
||||
|
||||
@@ -24,7 +26,7 @@ RelationResults = Dict[str, Union[agate.Row, agate.Table]]
|
||||
@dataclass(frozen=True)
|
||||
class RelationConfigBase:
|
||||
@classmethod
|
||||
def from_dict(cls, kwargs_dict) -> "RelationConfigBase":
|
||||
def from_dict(cls, kwargs_dict: Dict[str, Any]) -> Self:
|
||||
"""
|
||||
This assumes the subclass of `RelationConfigBase` is flat, in the sense that no attribute is
|
||||
itself another subclass of `RelationConfigBase`. If that's not the case, this should be overriden
|
||||
@@ -37,8 +39,53 @@ class RelationConfigBase:
|
||||
"""
|
||||
return cls(**filter_null_values(kwargs_dict)) # type: ignore
|
||||
|
||||
###
|
||||
# Parser for internal nodes, from dbt
|
||||
###
|
||||
|
||||
@classmethod
|
||||
def _not_implemented_error(cls) -> NotImplementedError:
|
||||
return NotImplementedError(
|
||||
"This relation type has not been fully configured for this adapter."
|
||||
def from_node(cls, node: ParsedNode) -> Self:
|
||||
config_dict = cls.parse_node(node)
|
||||
return cls.from_dict(config_dict)
|
||||
|
||||
@classmethod
|
||||
def from_model_node(cls, model_node: ModelNode) -> Self:
|
||||
# this method is being deprecated in favor of the more generic `from_node`
|
||||
return cls.from_node(model_node)
|
||||
|
||||
@classmethod
|
||||
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
|
||||
# this method was originally implemented as `parse_model_node`
|
||||
if hasattr(cls, "parse_model_node"):
|
||||
return cls.parse_model_node(node)
|
||||
return {}
|
||||
|
||||
###
|
||||
# Parser for database results, generally used with `SQLAdapter`
|
||||
###
|
||||
|
||||
@classmethod
|
||||
def from_relation_results(cls, relation_results: RelationResults) -> Self:
|
||||
config_dict = cls.parse_relation_results(relation_results)
|
||||
return cls.from_dict(config_dict)
|
||||
|
||||
@classmethod
|
||||
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
|
||||
raise NotImplementedError(
|
||||
"`parse_relation_results` has not been implemented for this relation_type."
|
||||
)
|
||||
|
||||
###
|
||||
# Parser for api results, generally used with `BaseAdapter`
|
||||
###
|
||||
|
||||
@classmethod
|
||||
def from_api_results(cls, api_results: Any) -> Self:
|
||||
config_dict = cls.parse_api_results(api_results)
|
||||
return cls.from_dict(config_dict)
|
||||
|
||||
@classmethod
|
||||
def parse_api_results(cls, api_results: Any) -> Dict[str, Any]:
|
||||
raise NotImplementedError(
|
||||
"`parse_api_results` has not been implemented for this relation_type."
|
||||
)
|
||||
|
||||
@@ -15,9 +15,9 @@ class RelationConfigChangeAction(StrEnum):
|
||||
@dataclass(frozen=True, eq=True, unsafe_hash=True)
|
||||
class RelationConfigChange(RelationConfigBase, ABC):
|
||||
action: RelationConfigChangeAction
|
||||
context: Hashable # this is usually a RelationConfig, e.g. IndexConfig, but shouldn't be limited
|
||||
context: Hashable # this is usually a RelationConfigBase, e.g. IndexConfig, but shouldn't be limited
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def requires_full_refresh(self) -> bool:
|
||||
raise self._not_implemented_error()
|
||||
return True
|
||||
|
||||
@@ -14,6 +14,7 @@ from typing import (
|
||||
)
|
||||
from typing_extensions import Protocol
|
||||
|
||||
from dbt.adapters.base import BaseAdapter
|
||||
from dbt.adapters.base.column import Column
|
||||
from dbt.adapters.factory import get_adapter, get_adapter_package_names, get_adapter_type_names
|
||||
from dbt.clients import agate_helper
|
||||
@@ -107,7 +108,7 @@ class BaseDatabaseWrapper:
|
||||
via a relation proxy.
|
||||
"""
|
||||
|
||||
def __init__(self, adapter, namespace: MacroNamespace):
|
||||
def __init__(self, adapter: BaseAdapter, namespace: MacroNamespace):
|
||||
self._adapter = adapter
|
||||
self.Relation = RelationProxy(adapter)
|
||||
self._namespace = namespace
|
||||
@@ -125,6 +126,10 @@ class BaseDatabaseWrapper:
|
||||
def commit(self):
|
||||
return self._adapter.commit_if_has_connection()
|
||||
|
||||
@property
|
||||
def relation_config_factory(self):
|
||||
return self._adapter.relation_config_factory
|
||||
|
||||
def _get_adapter_macro_prefixes(self) -> List[str]:
|
||||
# order matters for dispatch:
|
||||
# 1. current adapter
|
||||
|
||||
@@ -65,7 +65,15 @@
|
||||
{% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %}
|
||||
|
||||
{% if configuration_changes is none %}
|
||||
{% set build_sql = refresh_materialized_view(target_relation) %}
|
||||
{% set relation_config = adapter.relation_config_factory.make_from_node(config.model) %}
|
||||
{% if relation_config.auto_refresh %}
|
||||
{% set build_sql = '' %}
|
||||
{{ exceptions.warn(
|
||||
"No configuration changes were identified on: `" ~ target_relation ~ "`. Continuing."
|
||||
) }}
|
||||
{% else %}
|
||||
{% set build_sql = refresh_materialized_view(target_relation) %}
|
||||
{% endif %}
|
||||
|
||||
{% elif on_configuration_change == 'apply' %}
|
||||
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %}
|
||||
|
||||
@@ -631,3 +631,11 @@ def get_model_file(project, relation: BaseRelation) -> str:
|
||||
|
||||
def set_model_file(project, relation: BaseRelation, model_sql: str):
|
||||
write_file(model_sql, project.project_root, "models", f"{relation.name}.sql")
|
||||
|
||||
|
||||
class UtilityMethodNotImplementedError(NotImplementedError):
|
||||
def __int__(self, class_name: str, method_name: str, additional_message: Optional[str] = None):
|
||||
message = f"To use this test, please implement `{class_name}`.`{method_name}`."
|
||||
if additional_message:
|
||||
message += f" {additional_message}"
|
||||
super().__init__(message)
|
||||
|
||||
@@ -59,9 +59,7 @@ class PostgresRelation(BaseRelation):
|
||||
existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
|
||||
relation_results
|
||||
)
|
||||
new_materialized_view = PostgresMaterializedViewConfig.from_model_node(
|
||||
runtime_config.model
|
||||
)
|
||||
new_materialized_view = PostgresMaterializedViewConfig.from_node(runtime_config.model)
|
||||
|
||||
config_change_collection.indexes = self._get_index_config_changes(
|
||||
existing_materialized_view.indexes, new_materialized_view.indexes
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Set, FrozenSet
|
||||
from typing import Any, Dict, FrozenSet, Set
|
||||
|
||||
import agate
|
||||
from dbt.dataclass_schema import StrEnum
|
||||
@@ -60,7 +60,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, config_dict) -> "PostgresIndexConfig":
|
||||
def from_dict(cls, config_dict: Dict[str, Any]) -> "PostgresIndexConfig":
|
||||
# TODO: include the QuotePolicy instead of defaulting to lower()
|
||||
kwargs_dict = {
|
||||
"name": config_dict.get("name"),
|
||||
@@ -74,7 +74,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
|
||||
return index
|
||||
|
||||
@classmethod
|
||||
def parse_model_node(cls, model_node_entry: dict) -> dict:
|
||||
def parse_node(cls, model_node_entry: Dict[str, Any]) -> Dict[str, Any]:
|
||||
config_dict = {
|
||||
"column_names": set(model_node_entry.get("columns", set())),
|
||||
"unique": model_node_entry.get("unique"),
|
||||
@@ -83,7 +83,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
|
||||
return config_dict
|
||||
|
||||
@classmethod
|
||||
def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
|
||||
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]:
|
||||
config_dict = {
|
||||
"name": relation_results_entry.get("name"),
|
||||
"column_names": set(relation_results_entry.get("column_names", "").split(",")),
|
||||
@@ -93,7 +93,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
|
||||
return config_dict
|
||||
|
||||
@property
|
||||
def as_node_config(self) -> dict:
|
||||
def as_node_config(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Returns: a dictionary that can be passed into `get_create_index_sql()`
|
||||
"""
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Set, FrozenSet, List
|
||||
from typing import Any, Dict, FrozenSet, List, Set
|
||||
|
||||
import agate
|
||||
from dbt.adapters.relation_configs import (
|
||||
RelationConfigBase,
|
||||
MaterializedViewRelationConfig,
|
||||
RelationResults,
|
||||
RelationConfigValidationMixin,
|
||||
RelationConfigValidationRule,
|
||||
)
|
||||
from dbt.contracts.graph.nodes import ModelNode
|
||||
from dbt.contracts.graph.nodes import ParsedNode
|
||||
from dbt.exceptions import DbtRuntimeError
|
||||
|
||||
from dbt.adapters.postgres.relation_configs.constants import MAX_CHARACTERS_IN_IDENTIFIER
|
||||
@@ -19,14 +19,15 @@ from dbt.adapters.postgres.relation_configs.index import (
|
||||
|
||||
|
||||
@dataclass(frozen=True, eq=True, unsafe_hash=True)
|
||||
class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidationMixin):
|
||||
class PostgresMaterializedViewConfig(
|
||||
MaterializedViewRelationConfig, RelationConfigValidationMixin
|
||||
):
|
||||
"""
|
||||
This config follows the specs found here:
|
||||
https://www.postgresql.org/docs/current/sql-creatematerializedview.html
|
||||
|
||||
The following parameters are configurable by dbt:
|
||||
- table_name: name of the materialized view
|
||||
- query: the query that defines the view
|
||||
- indexes: the collection (set) of indexes on the materialized view
|
||||
|
||||
Applicable defaults for non-configurable parameters:
|
||||
@@ -36,7 +37,6 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
|
||||
"""
|
||||
|
||||
table_name: str = ""
|
||||
query: str = ""
|
||||
indexes: FrozenSet[PostgresIndexConfig] = field(default_factory=frozenset)
|
||||
|
||||
@property
|
||||
@@ -54,10 +54,9 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, config_dict: dict) -> "PostgresMaterializedViewConfig":
|
||||
def from_dict(cls, config_dict: Dict[str, Any]) -> "PostgresMaterializedViewConfig":
|
||||
kwargs_dict = {
|
||||
"table_name": config_dict.get("table_name"),
|
||||
"query": config_dict.get("query"),
|
||||
"indexes": frozenset(
|
||||
PostgresIndexConfig.from_dict(index) for index in config_dict.get("indexes", {})
|
||||
),
|
||||
@@ -66,31 +65,16 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
|
||||
return materialized_view
|
||||
|
||||
@classmethod
|
||||
def from_model_node(cls, model_node: ModelNode) -> "PostgresMaterializedViewConfig":
|
||||
materialized_view_config = cls.parse_model_node(model_node)
|
||||
materialized_view = cls.from_dict(materialized_view_config)
|
||||
return materialized_view
|
||||
|
||||
@classmethod
|
||||
def parse_model_node(cls, model_node: ModelNode) -> dict:
|
||||
indexes: List[dict] = model_node.config.extra.get("indexes", [])
|
||||
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
|
||||
indexes: List[dict] = node.config.extra.get("indexes", [])
|
||||
config_dict = {
|
||||
"table_name": model_node.identifier,
|
||||
"query": model_node.compiled_code,
|
||||
"indexes": [PostgresIndexConfig.parse_model_node(index) for index in indexes],
|
||||
"table_name": node.identifier,
|
||||
"indexes": [PostgresIndexConfig.parse_node(index) for index in indexes],
|
||||
}
|
||||
return config_dict
|
||||
|
||||
@classmethod
|
||||
def from_relation_results(
|
||||
cls, relation_results: RelationResults
|
||||
) -> "PostgresMaterializedViewConfig":
|
||||
materialized_view_config = cls.parse_relation_results(relation_results)
|
||||
materialized_view = cls.from_dict(materialized_view_config)
|
||||
return materialized_view
|
||||
|
||||
@classmethod
|
||||
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
|
||||
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
|
||||
indexes: agate.Table = relation_results.get("indexes", agate.Table(rows={}))
|
||||
config_dict = {
|
||||
"indexes": [
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
from datetime import datetime
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from dbt.tests.util import UtilityMethodNotImplementedError, run_dbt
|
||||
|
||||
|
||||
class MaterializedViewAutoRefreshNoChanges:
|
||||
"""
|
||||
When dbt runs on a materialized view that has no configuration changes, it can default
|
||||
to manually refresh the materialized view. In order to optimize cost and performance,
|
||||
there is no need to run a manual refresh if one is already scheduled due to
|
||||
auto refresh being turned on. Therefore, we should ensure that a manual refresh
|
||||
is only issued if the materialized view does not refresh automatically, and dbt
|
||||
otherwise does nothing.
|
||||
|
||||
To implement:
|
||||
- override `seeds` and provide a seed for your materialized views
|
||||
- override `models` and provide a materialized view auto refresh turned off called "auto_refresh_off.sql"
|
||||
- override `last_refreshed` with logic that inspects the platform for the last refresh timestamp
|
||||
|
||||
If your platform supports auto refresh:
|
||||
- in `models`, provide another materialized view with auto refresh turned on called "auto_refresh_on.sql"
|
||||
|
||||
If your platform does not support auto refresh:
|
||||
- override `test_manual_refresh_does_not_occur_when_auto_refresh_is_on` and mark it with `@pytest.mark.skip`
|
||||
"""
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def seeds(self):
|
||||
yield {"my_seed.csv": ""}
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def models(self):
|
||||
yield {"auto_refresh_on.sql": "", "auto_refresh_off.sql": ""}
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def setup(self, project):
|
||||
run_dbt(["seed"])
|
||||
run_dbt(["run"])
|
||||
yield
|
||||
|
||||
def last_refreshed(self, project, materialized_view: str) -> datetime:
|
||||
raise UtilityMethodNotImplementedError(
|
||||
"MaterializedViewAutoRefreshNoChanges", "last_refreshed"
|
||||
)
|
||||
|
||||
def run_dbt_with_no_changes_and_capture_refresh_times(
|
||||
self, project, materialized_view: str
|
||||
) -> Tuple[datetime, datetime]:
|
||||
last_refresh = self.last_refreshed(project, materialized_view)
|
||||
run_dbt(["run", "--models", materialized_view])
|
||||
next_refresh = self.last_refreshed(project, materialized_view)
|
||||
return last_refresh, next_refresh
|
||||
|
||||
def test_manual_refresh_occurs_when_auto_refresh_is_off(self, project):
|
||||
last_refresh, next_refresh = self.run_dbt_with_no_changes_and_capture_refresh_times(
|
||||
project, "auto_refresh_off"
|
||||
)
|
||||
assert next_refresh > last_refresh
|
||||
|
||||
def test_manual_refresh_does_not_occur_when_auto_refresh_is_on(self, project):
|
||||
last_refresh, next_refresh = self.run_dbt_with_no_changes_and_capture_refresh_times(
|
||||
project, "auto_refresh_on"
|
||||
)
|
||||
assert next_refresh == last_refresh
|
||||
@@ -6,6 +6,7 @@ from dbt.adapters.base.relation import BaseRelation
|
||||
from dbt.contracts.graph.model_config import OnConfigurationChangeOption
|
||||
from dbt.contracts.relation import RelationType
|
||||
from dbt.tests.util import (
|
||||
UtilityMethodNotImplementedError,
|
||||
assert_message_in_logs,
|
||||
get_model_file,
|
||||
run_dbt,
|
||||
@@ -45,10 +46,7 @@ class MaterializedViewChanges:
|
||||
"""
|
||||
Check the starting state; this should align with `files.MY_MATERIALIZED_VIEW`.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"To use this test, please implement `check_start_state`,"
|
||||
" inherited from `MaterializedViewsChanges`."
|
||||
)
|
||||
raise UtilityMethodNotImplementedError("MaterializedViewsChanges", "check_start_state")
|
||||
|
||||
@staticmethod
|
||||
def change_config_via_alter(project, materialized_view):
|
||||
@@ -65,10 +63,8 @@ class MaterializedViewChanges:
|
||||
"""
|
||||
Verify that the changes in `change_config_via_alter` were applied.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"To use this test, please implement `change_config_via_alter` and"
|
||||
" `check_state_alter_change_is_applied`,"
|
||||
" inherited from `MaterializedViewsChanges`."
|
||||
raise UtilityMethodNotImplementedError(
|
||||
"MaterializedViewsChanges", "change_config_via_alter"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -87,17 +83,13 @@ class MaterializedViewChanges:
|
||||
Verify that the changes in `change_config_via_replace` were applied.
|
||||
This is independent of `check_state_alter_change_is_applied`.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"To use this test, please implement `change_config_via_replace` and"
|
||||
" `check_state_replace_change_is_applied`,"
|
||||
" inherited from `MaterializedViewsChanges`."
|
||||
raise UtilityMethodNotImplementedError(
|
||||
"MaterializedViewsChanges", "change_config_via_replace"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
|
||||
raise NotImplementedError(
|
||||
"To use this test, please implement `query_relation_type`, inherited from `MaterializedViewsChanges`."
|
||||
)
|
||||
raise UtilityMethodNotImplementedError("MaterializedViewsChanges", "query_relation_type")
|
||||
|
||||
"""
|
||||
Configure these if needed
|
||||
@@ -152,6 +144,16 @@ class MaterializedViewChanges:
|
||||
assert_message_in_logs(f"Applying ALTER to: {my_materialized_view}", logs, False)
|
||||
assert_message_in_logs(f"Applying REPLACE to: {my_materialized_view}", logs)
|
||||
|
||||
def test_no_alter_and_no_replace_occurs_with_no_changes(self, project, my_materialized_view):
|
||||
# no changes were made to the model
|
||||
_, logs = run_dbt_and_capture(
|
||||
["--debug", "run", "--models", my_materialized_view.identifier]
|
||||
)
|
||||
# no changes were submitted to the database
|
||||
assert self.query_relation_type(project, my_materialized_view) == "materialized_view"
|
||||
assert_message_in_logs(f"Applying ALTER to: {my_materialized_view}", logs, False)
|
||||
assert_message_in_logs(f"Applying REPLACE to: {my_materialized_view}", logs, False)
|
||||
|
||||
|
||||
class MaterializedViewChangesApplyMixin:
|
||||
@pytest.fixture(scope="class")
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
SEED__MY_SEED = """
|
||||
id,value
|
||||
1,100
|
||||
2,200
|
||||
3,300
|
||||
""".strip()
|
||||
|
||||
|
||||
# postgres does not let you inspect the database for a last refresh date
|
||||
# we need to create our own by adding a last_modified date to materialized view itself
|
||||
MODEL__MY_MATERIALIZED_VIEW = """
|
||||
{{ config(
|
||||
materialized="materialized_view"
|
||||
) }}
|
||||
select *, now() as last_refreshed from {{ ref('my_seed') }}
|
||||
"""
|
||||
|
||||
|
||||
# see above for why we are just querying the table instead of a metadata table
|
||||
MACRO__LAST_REFRESH = """
|
||||
{% macro postgres__test__last_refresh(schema, identifier) %}
|
||||
{% set _sql %}
|
||||
select max(last_refreshed) as last_refresh from {{ schema }}.{{ identifier }}
|
||||
{% endset %}
|
||||
{% do return(run_query(_sql)) %}
|
||||
{% endmacro %}
|
||||
"""
|
||||
@@ -0,0 +1,36 @@
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.adapter.dbt.tests.adapter.materialized_view.auto_refresh import (
|
||||
MaterializedViewAutoRefreshNoChanges,
|
||||
)
|
||||
|
||||
from tests.functional.materializations.materialized_view_tests import files
|
||||
|
||||
|
||||
class TestMaterializedViewAutoRefreshNoChanges(MaterializedViewAutoRefreshNoChanges):
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def seeds(self):
|
||||
yield {"my_seed.csv": files.SEED__MY_SEED}
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def models(self):
|
||||
yield {"auto_refresh_off.sql": files.MODEL__MY_MATERIALIZED_VIEW}
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def macros(self):
|
||||
yield {"postgres__test__last_refresh.sql": files.MACRO__LAST_REFRESH}
|
||||
|
||||
def last_refreshed(self, project, materialized_view: str) -> datetime:
|
||||
with project.adapter.connection_named("__test"):
|
||||
kwargs = {"schema": project.test_schema, "identifier": materialized_view}
|
||||
last_refresh_results = project.adapter.execute_macro(
|
||||
"postgres__test__last_refresh", kwargs=kwargs
|
||||
)
|
||||
last_refresh = last_refresh_results[0].get("last_refresh")
|
||||
return last_refresh
|
||||
|
||||
@pytest.mark.skip("Postgres does not support auto refresh.")
|
||||
def test_manual_refresh_does_not_occur_when_auto_refresh_is_on(self, project):
|
||||
pass
|
||||
Reference in New Issue
Block a user