Compare commits

...

18 Commits

Author SHA1 Message Date
Mike Alfare  c00650337b  fix exception type  2023-10-19 13:10:47 -07:00
Mike Alfare  10c90b2c03  forward from_model_node to from_node for backwards compatibility  2023-10-14 17:49:17 -04:00
Mike Alfare  be6337b10d  update warning for no refresh to match existing warning in other adapters  2023-10-14 16:19:55 -04:00
Mike Alfare  60338bfef0  return a conservative default instead of raising an exception  2023-10-14 14:09:46 -04:00
Mike Alfare  1d41b50144  update types and names to be more accurate  2023-10-14 14:03:22 -04:00
Mike Alfare  1e3f4034aa  update types and names to be more accurate, move some parsing methods into the base relation config  2023-10-14 13:59:33 -04:00
Mike Alfare  34130bb784  make submodule private to push import to the subpackage level  2023-10-14 12:59:53 -04:00
Mike Alfare  a6f8e427aa  add relation config factory for encapsulating relation config methods  2023-10-14 12:57:01 -04:00
Mike Alfare  44b3c70f79  add global F401 ignore for __init__.py files  2023-10-14 12:56:53 -04:00
Mike Alfare  6e4a960770  add method for getting the relation config and condition on auto_refresh before manually refreshing in the materialization  2023-10-13 15:18:52 -04:00
Mike Alfare  aba3c0315b  add comments explaining how postgres last refreshed date works for materialized views  2023-10-12 20:47:49 -04:00
Mike Alfare  6499a6884c  revert unintended whitespace change  2023-10-12 20:44:52 -04:00
Mike Alfare  e3b986e56d  implement autorefresh test for dbt-postgres  2023-10-12 20:38:23 -04:00
Mike Alfare  99296fe424  add test framework for interaction between auto refresh and running dbt with no changes  2023-10-12 19:44:57 -04:00
Mike Alfare  f67f6bebff  add a stock exception for non-implemented test utilities  2023-10-12 19:43:50 -04:00
Mike Alfare  4cc726f6e5  add a stock exception for non-implemented test utilities  2023-10-12 19:35:40 -04:00
Mike Alfare  ea1b44de4d  add test to ensure no changes are made to the database when no changes are detected  2023-10-12 19:28:56 -04:00
Mike Alfare  7544382104  changelog  2023-10-12 12:29:34 -04:00
18 changed files with 335 additions and 63 deletions

View File

@@ -0,0 +1,6 @@
kind: Features
body: Optimize refreshing materialized views when autorefreshed
time: 2023-10-12T12:29:17.705373-04:00
custom:
Author: mikealfare
Issue: "6911"

View File

@@ -10,3 +10,5 @@ ignore =
E741
E501 # long line checking is done in black
exclude = test/
per-file-ignores =
*/__init__.py: F401

View File

@@ -22,6 +22,7 @@ from typing import (
)
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.adapters.relation_configs import RelationConfigFactory
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint
import agate
@@ -246,6 +247,18 @@ class BaseAdapter(metaclass=AdapterMeta):
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._macro_manifest_lazy: Optional[MacroManifest] = None
self.relation_config_factory = self._relation_config_factory()
def _relation_config_factory(self) -> RelationConfigFactory:
"""
This sets the default relation config factory in the init.
If you need to adjust the default settings, override this method,
returning an instance with the settings specific to your adapter.
See `dbt.adapters.relation_configs.RelationConfigFactory`
for more information regarding these settings.
"""
return RelationConfigFactory()
###
# Methods that pass through to the connection manager
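The hook above is the intended override point for adapters whose defaults differ. A minimal sketch of such an override follows; the adapter class and `MyAdapterMaterializedViewConfig` are hypothetical stand-ins, and the abstract methods a real adapter must implement are omitted.

from dbt.adapters.base import BaseAdapter
from dbt.adapters.relation_configs import MaterializedViewRelationConfig, RelationConfigFactory
from dbt.contracts.relation import RelationType


class MyAdapterMaterializedViewConfig(MaterializedViewRelationConfig):
    """Hypothetical adapter-specific materialized view config."""


class MyAdapter(BaseAdapter):
    def _relation_config_factory(self) -> RelationConfigFactory:
        # keep the stock relation types, but register this adapter's own
        # materialized view config class with the factory
        return RelationConfigFactory(
            relation_configs={
                RelationType.MaterializedView: MyAdapterMaterializedViewConfig,
            }
        )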

View File

@@ -1,12 +1,16 @@
from dbt.adapters.relation_configs.config_base import ( # noqa: F401
from dbt.adapters.relation_configs._factory import RelationConfigFactory
from dbt.adapters.relation_configs._materialized_view import (
MaterializedViewRelationConfig,
)
from dbt.adapters.relation_configs.config_base import (
RelationConfigBase,
RelationResults,
)
from dbt.adapters.relation_configs.config_change import ( # noqa: F401
from dbt.adapters.relation_configs.config_change import (
RelationConfigChangeAction,
RelationConfigChange,
)
from dbt.adapters.relation_configs.config_validation import ( # noqa: F401
from dbt.adapters.relation_configs.config_validation import (
RelationConfigValidationMixin,
RelationConfigValidationRule,
)

View File

@@ -0,0 +1,58 @@
from typing import Dict, Type
from dbt.contracts.graph.nodes import ParsedNode
from dbt.contracts.relation import RelationType
from dbt.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.relation_configs.config_base import RelationConfigBase
from dbt.adapters.relation_configs._materialized_view import MaterializedViewRelationConfig
class RelationConfigFactory:
"""
This provides a way to work with relation configs both in the adapter and in the jinja context.
This factory comes with a default set of settings which can be overridden in BaseAdapter.
Args:
relation_types: an enum that contains all possible relation types for this adapter
this is generally `RelationType`, but there are cases where an adapter may override
`RelationType` to include more options or exclude options
relation_configs: a map from a relation_type to a relation_config
this is generally only overridden if `relation_types` is also overridden
"""
def __init__(self, **kwargs):
# the `StrEnum` class will generally be `RelationType`, however this allows for extending that Enum
self.relation_types: Type[StrEnum] = kwargs.get("relation_types", RelationType)
self.relation_configs: Dict[StrEnum, Type[RelationConfigBase]] = kwargs.get(
"relation_configs",
{
RelationType.MaterializedView: MaterializedViewRelationConfig,
},
)
try:
for relation_type in self.relation_configs.keys():
self.relation_types(relation_type)
except ValueError:
raise DbtRuntimeError(
f"Received relation configs for {relation_type} " # noqa
f"but these relation types are not registered on this factory.\n"
f" registered relation types: {', '.join(self.relation_types)}\n"
)
def make_from_node(self, node: ParsedNode) -> RelationConfigBase:
relation_type = self.relation_types(node.config.materialized)
relation_config = self._relation_config(relation_type)
return relation_config.from_node(node)
def _relation_config(self, relation_type: StrEnum) -> Type[RelationConfigBase]:
if relation := self.relation_configs.get(relation_type):
return relation
raise DbtRuntimeError(
f"This factory does not have a relation config for this type.\n"
f" received: {relation_type}\n"
f" options: {', '.join(t for t in self.relation_configs.keys())}\n"
)
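As the docstring notes, both constructor arguments are generally overridden together when an adapter extends the set of relation types. A sketch under that assumption follows; `CustomRelationType` and `DynamicTableConfig` are illustrative names, not part of this change.

from dbt.adapters.relation_configs import MaterializedViewRelationConfig, RelationConfigFactory
from dbt.dataclass_schema import StrEnum


class CustomRelationType(StrEnum):
    Table = "table"
    View = "view"
    MaterializedView = "materialized_view"
    DynamicTable = "dynamic_table"  # adapter-specific addition


class DynamicTableConfig(MaterializedViewRelationConfig):
    """Stand-in config for the adapter-specific relation type."""


factory = RelationConfigFactory(
    relation_types=CustomRelationType,
    relation_configs={
        CustomRelationType.MaterializedView: MaterializedViewRelationConfig,
        CustomRelationType.DynamicTable: DynamicTableConfig,
    },
)

# the constructor validates that every registered config maps to a known relation type;
# `factory.make_from_node(node)` then dispatches on `node.config.materialized`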

View File

@@ -0,0 +1,7 @@
from dbt.adapters.relation_configs.config_base import RelationConfigBase
class MaterializedViewRelationConfig(RelationConfigBase):
@property
def auto_refresh(self) -> bool:
return False
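`auto_refresh` defaults to `False`, so the materialization falls back to a manual refresh unless an adapter opts in. One way an adapter whose platform supports auto refresh might opt in is sketched below; the field name, the `auto_refresh` model config key, and the class name are assumptions for illustration.

from dataclasses import dataclass
from typing import Any, Dict

from dbt.adapters.relation_configs import MaterializedViewRelationConfig
from dbt.contracts.graph.nodes import ParsedNode


@dataclass(frozen=True)
class MyMaterializedViewConfig(MaterializedViewRelationConfig):
    # declared as a field so the base `from_dict` can populate it,
    # shadowing the property that returns the conservative default
    auto_refresh: bool = False

    @classmethod
    def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
        # read the user's model config, e.g. `{{ config(auto_refresh=true) }}`
        return {"auto_refresh": node.config.extra.get("auto_refresh", False)}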

View File

@@ -1,7 +1,9 @@
from dataclasses import dataclass
from typing import Union, Dict
from typing import Any, Dict, Union
from typing_extensions import Self
import agate
from dbt.contracts.graph.nodes import ModelNode, ParsedNode
from dbt.utils import filter_null_values
@@ -24,7 +26,7 @@ RelationResults = Dict[str, Union[agate.Row, agate.Table]]
@dataclass(frozen=True)
class RelationConfigBase:
@classmethod
def from_dict(cls, kwargs_dict) -> "RelationConfigBase":
def from_dict(cls, kwargs_dict: Dict[str, Any]) -> Self:
"""
This assumes the subclass of `RelationConfigBase` is flat, in the sense that no attribute is
itself another subclass of `RelationConfigBase`. If that's not the case, this should be overriden
@@ -37,8 +39,53 @@ class RelationConfigBase:
"""
return cls(**filter_null_values(kwargs_dict)) # type: ignore
###
# Parser for internal nodes, from dbt
###
@classmethod
def _not_implemented_error(cls) -> NotImplementedError:
return NotImplementedError(
"This relation type has not been fully configured for this adapter."
def from_node(cls, node: ParsedNode) -> Self:
config_dict = cls.parse_node(node)
return cls.from_dict(config_dict)
@classmethod
def from_model_node(cls, model_node: ModelNode) -> Self:
# this method is being deprecated in favor of the more generic `from_node`
return cls.from_node(model_node)
@classmethod
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
# this method was originally implemented as `parse_model_node`
if hasattr(cls, "parse_model_node"):
return cls.parse_model_node(node)
return {}
###
# Parser for database results, generally used with `SQLAdapter`
###
@classmethod
def from_relation_results(cls, relation_results: RelationResults) -> Self:
config_dict = cls.parse_relation_results(relation_results)
return cls.from_dict(config_dict)
@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_relation_results` has not been implemented for this relation_type."
)
###
# Parser for api results, generally used with `BaseAdapter`
###
@classmethod
def from_api_results(cls, api_results: Any) -> Self:
config_dict = cls.parse_api_results(api_results)
return cls.from_dict(config_dict)
@classmethod
def parse_api_results(cls, api_results: Any) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_api_results` has not been implemented for this relation_type."
)
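Taken together, the new classmethods define three parsing entry points (`from_node`, `from_relation_results`, `from_api_results`) that all funnel through `from_dict`, which assumes a flat config. A short sketch of a flat subclass wiring up the first two follows; `SketchViewConfig`, the `base` results key, and the `fill_factor` option are hypothetical.

from dataclasses import dataclass
from typing import Any, Dict

import agate
from dbt.adapters.relation_configs import RelationConfigBase, RelationResults
from dbt.contracts.graph.nodes import ParsedNode


@dataclass(frozen=True)
class SketchViewConfig(RelationConfigBase):
    # flat attributes only, so the default `from_dict` works unchanged
    table_name: str = ""
    fill_factor: int = 100

    @classmethod
    def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
        # dbt-side parser: read the manifest node and the user's model config;
        # `filter_null_values` in `from_dict` drops missing entries so defaults apply
        return {
            "table_name": node.identifier,
            "fill_factor": node.config.extra.get("fill_factor"),
        }

    @classmethod
    def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
        # database-side parser: read the results of an introspective query
        base: agate.Row = relation_results["base"]
        return {
            "table_name": base.get("table_name"),
            "fill_factor": base.get("fill_factor"),
        }


# `SketchViewConfig.from_node(node)` and `SketchViewConfig.from_relation_results(results)`
# both produce a config dict and hand it to `from_dict`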

View File

@@ -15,9 +15,9 @@ class RelationConfigChangeAction(StrEnum):
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class RelationConfigChange(RelationConfigBase, ABC):
action: RelationConfigChangeAction
context: Hashable # this is usually a RelationConfig, e.g. IndexConfig, but shouldn't be limited
context: Hashable # this is usually a RelationConfigBase, e.g. IndexConfig, but shouldn't be limited
@property
@abstractmethod
def requires_full_refresh(self) -> bool:
raise self._not_implemented_error()
return True

View File

@@ -14,6 +14,7 @@ from typing import (
)
from typing_extensions import Protocol
from dbt.adapters.base import BaseAdapter
from dbt.adapters.base.column import Column
from dbt.adapters.factory import get_adapter, get_adapter_package_names, get_adapter_type_names
from dbt.clients import agate_helper
@@ -107,7 +108,7 @@ class BaseDatabaseWrapper:
via a relation proxy.
"""
def __init__(self, adapter, namespace: MacroNamespace):
def __init__(self, adapter: BaseAdapter, namespace: MacroNamespace):
self._adapter = adapter
self.Relation = RelationProxy(adapter)
self._namespace = namespace
@@ -125,6 +126,10 @@ class BaseDatabaseWrapper:
def commit(self):
return self._adapter.commit_if_has_connection()
@property
def relation_config_factory(self):
return self._adapter.relation_config_factory
def _get_adapter_macro_prefixes(self) -> List[str]:
# order matters for dispatch:
# 1. current adapter

View File

@@ -65,7 +65,15 @@
{% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %}
{% if configuration_changes is none %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{% set relation_config = adapter.relation_config_factory.make_from_node(config.model) %}
{% if relation_config.auto_refresh %}
{% set build_sql = '' %}
{{ exceptions.warn(
"No configuration changes were identified on: `" ~ target_relation ~ "`. Continuing."
) }}
{% else %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{% endif %}
{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %}

View File

@@ -631,3 +631,11 @@ def get_model_file(project, relation: BaseRelation) -> str:
def set_model_file(project, relation: BaseRelation, model_sql: str):
write_file(model_sql, project.project_root, "models", f"{relation.name}.sql")
class UtilityMethodNotImplementedError(NotImplementedError):
def __init__(self, class_name: str, method_name: str, additional_message: Optional[str] = None):
message = f"To use this test, please implement `{class_name}`.`{method_name}`."
if additional_message:
message += f" {additional_message}"
super().__init__(message)

View File

@@ -59,9 +59,7 @@ class PostgresRelation(BaseRelation):
existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = PostgresMaterializedViewConfig.from_model_node(
runtime_config.model
)
new_materialized_view = PostgresMaterializedViewConfig.from_node(runtime_config.model)
config_change_collection.indexes = self._get_index_config_changes(
existing_materialized_view.indexes, new_materialized_view.indexes

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Set, FrozenSet
from typing import Any, Dict, FrozenSet, Set
import agate
from dbt.dataclass_schema import StrEnum
@@ -60,7 +60,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
}
@classmethod
def from_dict(cls, config_dict) -> "PostgresIndexConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> "PostgresIndexConfig":
# TODO: include the QuotePolicy instead of defaulting to lower()
kwargs_dict = {
"name": config_dict.get("name"),
@@ -74,7 +74,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
return index
@classmethod
def parse_model_node(cls, model_node_entry: dict) -> dict:
def parse_node(cls, model_node_entry: Dict[str, Any]) -> Dict[str, Any]:
config_dict = {
"column_names": set(model_node_entry.get("columns", set())),
"unique": model_node_entry.get("unique"),
@@ -83,7 +83,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
return config_dict
@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]:
config_dict = {
"name": relation_results_entry.get("name"),
"column_names": set(relation_results_entry.get("column_names", "").split(",")),
@@ -93,7 +93,7 @@ class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
return config_dict
@property
def as_node_config(self) -> dict:
def as_node_config(self) -> Dict[str, Any]:
"""
Returns: a dictionary that can be passed into `get_create_index_sql()`
"""

View File

@@ -1,14 +1,14 @@
from dataclasses import dataclass, field
from typing import Set, FrozenSet, List
from typing import Any, Dict, FrozenSet, List, Set
import agate
from dbt.adapters.relation_configs import (
RelationConfigBase,
MaterializedViewRelationConfig,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.graph.nodes import ParsedNode
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.postgres.relation_configs.constants import MAX_CHARACTERS_IN_IDENTIFIER
@@ -19,14 +19,15 @@ from dbt.adapters.postgres.relation_configs.index import (
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidationMixin):
class PostgresMaterializedViewConfig(
MaterializedViewRelationConfig, RelationConfigValidationMixin
):
"""
This config follows the specs found here:
https://www.postgresql.org/docs/current/sql-creatematerializedview.html
The following parameters are configurable by dbt:
- table_name: name of the materialized view
- query: the query that defines the view
- indexes: the collection (set) of indexes on the materialized view
Applicable defaults for non-configurable parameters:
@@ -36,7 +37,6 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
"""
table_name: str = ""
query: str = ""
indexes: FrozenSet[PostgresIndexConfig] = field(default_factory=frozenset)
@property
@@ -54,10 +54,9 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
}
@classmethod
def from_dict(cls, config_dict: dict) -> "PostgresMaterializedViewConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> "PostgresMaterializedViewConfig":
kwargs_dict = {
"table_name": config_dict.get("table_name"),
"query": config_dict.get("query"),
"indexes": frozenset(
PostgresIndexConfig.from_dict(index) for index in config_dict.get("indexes", {})
),
@@ -66,31 +65,16 @@ class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidatio
return materialized_view
@classmethod
def from_model_node(cls, model_node: ModelNode) -> "PostgresMaterializedViewConfig":
materialized_view_config = cls.parse_model_node(model_node)
materialized_view = cls.from_dict(materialized_view_config)
return materialized_view
@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
indexes: List[dict] = model_node.config.extra.get("indexes", [])
def parse_node(cls, node: ParsedNode) -> Dict[str, Any]:
indexes: List[dict] = node.config.extra.get("indexes", [])
config_dict = {
"table_name": model_node.identifier,
"query": model_node.compiled_code,
"indexes": [PostgresIndexConfig.parse_model_node(index) for index in indexes],
"table_name": node.identifier,
"indexes": [PostgresIndexConfig.parse_node(index) for index in indexes],
}
return config_dict
@classmethod
def from_relation_results(
cls, relation_results: RelationResults
) -> "PostgresMaterializedViewConfig":
materialized_view_config = cls.parse_relation_results(relation_results)
materialized_view = cls.from_dict(materialized_view_config)
return materialized_view
@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
indexes: agate.Table = relation_results.get("indexes", agate.Table(rows={}))
config_dict = {
"indexes": [

View File

@@ -0,0 +1,67 @@
from datetime import datetime
from typing import Tuple
import pytest
from dbt.tests.util import UtilityMethodNotImplementedError, run_dbt
class MaterializedViewAutoRefreshNoChanges:
"""
When dbt runs on a materialized view that has no configuration changes, it can default
to manually refreshing the materialized view. To optimize cost and performance,
there is no need to run a manual refresh if one is already scheduled because
auto refresh is turned on. Therefore, we should ensure that a manual refresh
is only issued when the materialized view does not refresh automatically, and
that dbt does nothing otherwise.
To implement:
- override `seeds` and provide a seed for your materialized views
- override `models` and provide a materialized view with auto refresh turned off, called "auto_refresh_off.sql"
- override `last_refreshed` with logic that inspects the platform for the last refresh timestamp
If your platform supports auto refresh:
- in `models`, provide another materialized view with auto refresh turned on called "auto_refresh_on.sql"
If your platform does not support auto refresh:
- override `test_manual_refresh_does_not_occur_when_auto_refresh_is_on` and mark it with `@pytest.mark.skip`
"""
@pytest.fixture(scope="class", autouse=True)
def seeds(self):
yield {"my_seed.csv": ""}
@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {"auto_refresh_on.sql": "", "auto_refresh_off.sql": ""}
@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])
yield
def last_refreshed(self, project, materialized_view: str) -> datetime:
raise UtilityMethodNotImplementedError(
"MaterializedViewAutoRefreshNoChanges", "last_refreshed"
)
def run_dbt_with_no_changes_and_capture_refresh_times(
self, project, materialized_view: str
) -> Tuple[datetime, datetime]:
last_refresh = self.last_refreshed(project, materialized_view)
run_dbt(["run", "--models", materialized_view])
next_refresh = self.last_refreshed(project, materialized_view)
return last_refresh, next_refresh
def test_manual_refresh_occurs_when_auto_refresh_is_off(self, project):
last_refresh, next_refresh = self.run_dbt_with_no_changes_and_capture_refresh_times(
project, "auto_refresh_off"
)
assert next_refresh > last_refresh
def test_manual_refresh_does_not_occur_when_auto_refresh_is_on(self, project):
last_refresh, next_refresh = self.run_dbt_with_no_changes_and_capture_refresh_times(
project, "auto_refresh_on"
)
assert next_refresh == last_refresh
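The Postgres implementation later in this diff has to skip the auto-refresh case because the platform lacks the feature. For a platform that does support it, a hedged sketch of wiring up this suite might look like the following; the model bodies, the `auto_refresh` config key, and the import path (assuming the suite ships under `dbt.tests.adapter`) are assumptions, not taken from this change.

from datetime import datetime

import pytest

from dbt.tests.adapter.materialized_view.auto_refresh import (
    MaterializedViewAutoRefreshNoChanges,
)


class TestAutoRefreshNoChangesMyAdapter(MaterializedViewAutoRefreshNoChanges):
    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        yield {"my_seed.csv": "id,value\n1,100\n2,200\n3,300\n"}

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        auto_refresh_on = (
            '{{ config(materialized="materialized_view", auto_refresh=true) }}\n'
            "select * from {{ ref('my_seed') }}\n"
        )
        auto_refresh_off = auto_refresh_on.replace("auto_refresh=true", "auto_refresh=false")
        yield {
            "auto_refresh_on.sql": auto_refresh_on,
            "auto_refresh_off.sql": auto_refresh_off,
        }

    def last_refreshed(self, project, materialized_view: str) -> datetime:
        # query the platform's metadata for the last refresh timestamp,
        # analogous to the Postgres macro shown further down in this diff
        ...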

View File

@@ -6,6 +6,7 @@ from dbt.adapters.base.relation import BaseRelation
from dbt.contracts.graph.model_config import OnConfigurationChangeOption
from dbt.contracts.relation import RelationType
from dbt.tests.util import (
UtilityMethodNotImplementedError,
assert_message_in_logs,
get_model_file,
run_dbt,
@@ -45,10 +46,7 @@ class MaterializedViewChanges:
"""
Check the starting state; this should align with `files.MY_MATERIALIZED_VIEW`.
"""
raise NotImplementedError(
"To use this test, please implement `check_start_state`,"
" inherited from `MaterializedViewsChanges`."
)
raise UtilityMethodNotImplementedError("MaterializedViewsChanges", "check_start_state")
@staticmethod
def change_config_via_alter(project, materialized_view):
@@ -65,10 +63,8 @@ class MaterializedViewChanges:
"""
Verify that the changes in `change_config_via_alter` were applied.
"""
raise NotImplementedError(
"To use this test, please implement `change_config_via_alter` and"
" `check_state_alter_change_is_applied`,"
" inherited from `MaterializedViewsChanges`."
raise UtilityMethodNotImplementedError(
"MaterializedViewsChanges", "change_config_via_alter"
)
@staticmethod
@@ -87,17 +83,13 @@ class MaterializedViewChanges:
Verify that the changes in `change_config_via_replace` were applied.
This is independent of `check_state_alter_change_is_applied`.
"""
raise NotImplementedError(
"To use this test, please implement `change_config_via_replace` and"
" `check_state_replace_change_is_applied`,"
" inherited from `MaterializedViewsChanges`."
raise UtilityMethodNotImplementedError(
"MaterializedViewsChanges", "change_config_via_replace"
)
@staticmethod
def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
raise NotImplementedError(
"To use this test, please implement `query_relation_type`, inherited from `MaterializedViewsChanges`."
)
raise UtilityMethodNotImplementedError("MaterializedViewsChanges", "query_relation_type")
"""
Configure these if needed
@@ -152,6 +144,16 @@ class MaterializedViewChanges:
assert_message_in_logs(f"Applying ALTER to: {my_materialized_view}", logs, False)
assert_message_in_logs(f"Applying REPLACE to: {my_materialized_view}", logs)
def test_no_alter_and_no_replace_occurs_with_no_changes(self, project, my_materialized_view):
# no changes were made to the model
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_materialized_view.identifier]
)
# no changes were submitted to the database
assert self.query_relation_type(project, my_materialized_view) == "materialized_view"
assert_message_in_logs(f"Applying ALTER to: {my_materialized_view}", logs, False)
assert_message_in_logs(f"Applying REPLACE to: {my_materialized_view}", logs, False)
class MaterializedViewChangesApplyMixin:
@pytest.fixture(scope="class")

View File

@@ -0,0 +1,27 @@
SEED__MY_SEED = """
id,value
1,100
2,200
3,300
""".strip()
# postgres does not let you inspect the database for a last refresh date
# we need to create our own by adding a last_refreshed column to the materialized view itself
MODEL__MY_MATERIALIZED_VIEW = """
{{ config(
materialized="materialized_view"
) }}
select *, now() as last_refreshed from {{ ref('my_seed') }}
"""
# see above for why we are querying the materialized view itself instead of a metadata table
MACRO__LAST_REFRESH = """
{% macro postgres__test__last_refresh(schema, identifier) %}
{% set _sql %}
select max(last_refreshed) as last_refresh from {{ schema }}.{{ identifier }}
{% endset %}
{% do return(run_query(_sql)) %}
{% endmacro %}
"""

View File

@@ -0,0 +1,36 @@
from datetime import datetime
import pytest
from tests.adapter.dbt.tests.adapter.materialized_view.auto_refresh import (
MaterializedViewAutoRefreshNoChanges,
)
from tests.functional.materializations.materialized_view_tests import files
class TestMaterializedViewAutoRefreshNoChanges(MaterializedViewAutoRefreshNoChanges):
@pytest.fixture(scope="class", autouse=True)
def seeds(self):
yield {"my_seed.csv": files.SEED__MY_SEED}
@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {"auto_refresh_off.sql": files.MODEL__MY_MATERIALIZED_VIEW}
@pytest.fixture(scope="class", autouse=True)
def macros(self):
yield {"postgres__test__last_refresh.sql": files.MACRO__LAST_REFRESH}
def last_refreshed(self, project, materialized_view: str) -> datetime:
with project.adapter.connection_named("__test"):
kwargs = {"schema": project.test_schema, "identifier": materialized_view}
last_refresh_results = project.adapter.execute_macro(
"postgres__test__last_refresh", kwargs=kwargs
)
last_refresh = last_refresh_results[0].get("last_refresh")
return last_refresh
@pytest.mark.skip("Postgres does not support auto refresh.")
def test_manual_refresh_does_not_occur_when_auto_refresh_is_on(self, project):
pass