Add support for Iceberg table format in Dynamic Tables (#1183)

* add support for iceberg dynamic tables
* remove is_dynamic-related guards as that is ga now
* simplify dynamic table testing
* add iceberg dynamic tables to existing dynamic table tests
* add standard incremental tables into the relation swap scenarios
* account for the fact that snowflake does not support renaming iceberg relations
* account for all scenarios when swapping relation types, including those which currently require a full refresh
* make it clearer which scenarios are included in each run and why by pulling the criteria into one function

---------

Co-authored-by: Mila Page <67295367+VersusFacit@users.noreply.github.com>
This commit is contained in:
Mike Alfare
2024-09-27 15:52:04 -04:00
committed by GitHub
parent 423111f5c7
commit 0521395a30
15 changed files with 593 additions and 66 deletions

View File

@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table format in Dynamic Tables
time: 2024-09-17T10:05:05.609859-04:00
custom:
Author: mikealfare
Issue: "1183"

View File

@@ -17,6 +17,7 @@ from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error
from dbt.adapters.snowflake.relation_configs import (
SnowflakeCatalogConfigChange,
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
@@ -114,6 +115,12 @@ class SnowflakeRelation(BaseRelation):
context=new_dynamic_table.refresh_mode,
)
if new_dynamic_table.catalog != existing_dynamic_table.catalog:
config_change_collection.catalog = SnowflakeCatalogConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.catalog,
)
if config_change_collection.has_changes:
return config_change_collection
return None
@@ -132,6 +139,14 @@ class SnowflakeRelation(BaseRelation):
return self.replace_path(**path_part_map)
@property
def can_be_renamed(self) -> bool:
"""
Standard tables and dynamic tables can be renamed, but Snowflake does not support renaming iceberg relations.
The iceberg standard does support renaming, so this may change in the future.
"""
return self.type in self.renameable_relations and not self.is_iceberg_format
def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This macro renders the appropriate DDL prefix during the create_table_as

View File

@@ -1,3 +1,7 @@
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
@@ -5,9 +9,9 @@ from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat

View File

@@ -0,0 +1,123 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TYPE_CHECKING, Set
if TYPE_CHECKING:
import agate
from dbt.adapters.relation_configs import (
RelationConfigChange,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtConfigError
from typing_extensions import Self
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin):
"""
This config follow the specs found here:
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table
The following parameters are configurable by dbt:
- table_format: format for interfacing with the table, e.g. default, iceberg
- external_volume: name of the external volume in Snowflake
- base_location: the directory within the external volume that contains the data
*Note*: This directory cant be changed after you create a table.
The following parameters are not currently configurable by dbt:
- name: snowflake
"""
table_format: Optional[TableFormat] = TableFormat.default()
name: Optional[str] = "SNOWFLAKE"
external_volume: Optional[str] = None
base_location: Optional[str] = None
@property
def validation_rules(self) -> Set[RelationConfigValidationRule]:
return {
RelationConfigValidationRule(
(self.table_format == "default")
or (self.table_format == "iceberg" and self.base_location is not None),
DbtConfigError("Please provide a `base_location` when using iceberg"),
),
RelationConfigValidationRule(
(self.table_format == "default")
or (self.table_format == "iceberg" and self.name == "SNOWFLAKE"),
DbtConfigError(
"Only Snowflake catalogs are currently supported when using iceberg"
),
),
}
@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"name": config_dict.get("name"),
"external_volume": config_dict.get("external_volume"),
"base_location": config_dict.get("base_location"),
}
if table_format := config_dict.get("table_format"):
kwargs_dict["table_format"] = TableFormat(table_format)
return super().from_dict(kwargs_dict)
@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
if relation_config.config.extra.get("table_format") is None:
return {}
config_dict = {
"table_format": relation_config.config.extra.get("table_format"),
"name": "SNOWFLAKE", # this is not currently configurable
}
if external_volume := relation_config.config.extra.get("external_volume"):
config_dict["external_volume"] = external_volume
if base_location := relation_config.config.extra.get("base_location_subpath"):
config_dict["base_location"] = base_location
return config_dict
@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
try:
catalog_results: "agate.Table" = relation_results["catalog"]
except KeyError:
# this happens when `enable_iceberg_materializations` is turned off
return {}
if len(catalog_results) == 0:
# this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
return {}
# for now, if we get catalog results, it's because this is an iceberg table
# this is because we only run `show iceberg tables` to get catalog metadata
# this will need to be updated once this is in `show objects`
catalog: "agate.Row" = catalog_results.rows[0]
config_dict = {
"table_format": "iceberg",
"name": catalog.get("catalog_name"),
"external_volume": catalog.get("external_volume_name"),
"base_location": catalog.get("base_location"),
}
return config_dict
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfigChange(RelationConfigChange):
context: Optional[SnowflakeCatalogConfig] = None
@property
def requires_full_refresh(self) -> bool:
return True

View File

@@ -8,6 +8,11 @@ from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard lib
from typing_extensions import Self
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)
if TYPE_CHECKING:
import agate
@@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()
@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
@@ -69,12 +75,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}
dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
return dynamic_table
return super().from_dict(kwargs_dict)
@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
@@ -85,18 +91,19 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
}
if refresh_mode := relation_config.config.extra.get("refresh_mode"):
config_dict.update(refresh_mode=refresh_mode.upper())
config_dict["refresh_mode"] = refresh_mode.upper()
if initialize := relation_config.config.extra.get("initialize"):
config_dict.update(initialize=initialize.upper())
config_dict["initialize"] = initialize.upper()
return config_dict
@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0]
config_dict = {
@@ -106,6 +113,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}
@@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None
catalog: Optional[SnowflakeCatalogConfigChange] = None
@property
def requires_full_refresh(self) -> bool:
@@ -157,9 +166,10 @@ class SnowflakeDynamicTableConfigChangeset:
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
self.catalog.requires_full_refresh if self.catalog else False,
]
)
@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog])

View File

@@ -1,4 +1,5 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self
class TableFormat(StrEnum):
@@ -10,5 +11,9 @@ class TableFormat(StrEnum):
DEFAULT = "default"
ICEBERG = "iceberg"
@classmethod
def default(cls) -> Self:
return cls("default")
def __str__(self):
return self.value

View File

@@ -1,16 +1,83 @@
{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic table
--
-- Args:
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Globals:
-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig
-- Returns:
-- A valid DDL statement which will result in a new dynamic table.
-#}
{%- set dynamic_table = relation.from_config(config.model) -%}
{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
{%- endif -%}
{%- endmacro %}
{% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a standard dynamic table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic standard table.
-#}
create dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)
{%- endmacro %}
{% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic iceberg table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = {{ dynamic_table.catalog.base_location }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)

View File

@@ -1,4 +1,14 @@
{% macro snowflake__describe_dynamic_table(relation) %}
{#-
-- Get all relevant metadata about a dynamic table
--
-- Args:
-- - relation: SnowflakeRelation - the relation to describe
-- Returns:
-- A dictionary with one or two entries depending on whether iceberg is enabled:
-- - dynamic_table: the metadata associated with a standard dynamic table
-- - catalog: the metadata associated with the iceberg catalog
-#}
{%- set _dynamic_table_sql -%}
show dynamic tables
like '{{ relation.identifier }}'
@@ -14,7 +24,32 @@
"refresh_mode"
from table(result_scan(last_query_id()))
{%- endset %}
{% set _dynamic_table = run_query(_dynamic_table_sql) %}
{% set results = {'dynamic_table': run_query(_dynamic_table_sql)} %}
{% do return({'dynamic_table': _dynamic_table}) %}
{% if adapter.behavior.enable_iceberg_materializations.no_warn %}
{% set _ = results.update({'catalog': run_query(_get_describe_iceberg_catalog_sql(relation))}) %}
{% endif %}
{% do return(results) %}
{% endmacro %}
{% macro _get_describe_iceberg_catalog_sql(relation) %}
{#-
-- Produce DQL that returns all relevant metadata about an iceberg catalog
--
-- Args:
-- - relation: SnowflakeRelation - the relation to describe
-- Returns:
-- A valid DQL statement that will return metadata associated with an iceberg catalog
-#}
show iceberg tables
like '{{ relation.identifier }}'
in schema {{ relation.database }}.{{ relation.schema }}
;
select
"catalog_name",
"external_volume_name",
"base_location"
from table(result_scan(last_query_id()))
{% endmacro %}

View File

@@ -1,16 +1,82 @@
{% macro snowflake__get_replace_dynamic_table_sql(relation, sql) -%}
{#-
-- Produce DDL that replaces a dynamic table with a new dynamic table
--
-- Args:
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Globals:
-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig
-- Returns:
-- A valid DDL statement which will result in a new dynamic table.
-#}
{%- set dynamic_table = relation.from_config(config.model) -%}
{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
{%- endif -%}
{%- endmacro %}
{% macro _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that replaces a standard dynamic table with a new standard dynamic table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic standard table.
-#}
create or replace dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)
{%- endmacro %}
{% macro _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that replaces a dynamic iceberg table with a new dynamic iceberg table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}
create or replace dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = {{ dynamic_table.catalog.base_location }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)

View File

@@ -0,0 +1,14 @@
{% macro optional(name, value, quote_char = '') %}
{#-
-- Insert optional DDL parameters only when their value is provided; makes DDL statements more readable
--
-- Args:
-- - name: the name of the DDL option
-- - value: the value of the DDL option, may be None
-- - quote_char: the quote character to use (e.g. string), leave blank if unnecessary (e.g. integer or bool)
-- Returns:
-- If the value is not None (e.g. provided by the user), return the option setting DDL
-- If the value is None, return an empty string
-#}
{% if value is not none %}{{ name }} = {{ quote_char }}{{ value }}{{ quote_char }}{% endif %}
{% endmacro %}

View File

@@ -10,7 +10,7 @@ DYNAMIC_TABLE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
target_lag='2 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
@@ -28,11 +28,25 @@ select * from {{ ref('my_seed') }}
"""
DYNAMIC_ICEBERG_TABLE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='INCREMENTAL',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""
DYNAMIC_TABLE_ALTER = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='5 minutes',
target_lag='5 minutes',
refresh_mode='INCREMENTAL',
) }}
select * from {{ ref('my_seed') }}
@@ -43,8 +57,36 @@ DYNAMIC_TABLE_REPLACE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
target_lag='2 minutes',
refresh_mode='FULL',
) }}
select * from {{ ref('my_seed') }}
"""
DYNAMIC_ICEBERG_TABLE_ALTER = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='5 minutes',
refresh_mode='INCREMENTAL',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""
DYNAMIC_ICEBERG_TABLE_REPLACE = """
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='2 minutes',
refresh_mode='FULL',
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_subpath="subpath",
) }}
select * from {{ ref('my_seed') }}
"""

View File

@@ -7,6 +7,7 @@ from tests.functional.utils import query_relation_type
class TestBasic:
iceberg: bool = False
@pytest.fixture(scope="class", autouse=True)
def seeds(self):
@@ -14,10 +15,17 @@ class TestBasic:
@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {
my_models = {
"my_dynamic_table.sql": models.DYNAMIC_TABLE,
"my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM,
}
if self.iceberg:
my_models.update(
{
"my_dynamic_iceberg_table.sql": models.DYNAMIC_ICEBERG_TABLE,
}
)
yield my_models
@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
@@ -28,3 +36,13 @@ class TestBasic:
run_dbt(["run", "--full-refresh"])
assert query_relation_type(project, "my_dynamic_table") == "dynamic_table"
assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table"
if self.iceberg:
assert query_relation_type(project, "my_dynamic_iceberg_table") == "dynamic_table"
class TestBasicIcebergOn(TestBasic):
iceberg = True
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}

View File

@@ -7,6 +7,7 @@ from tests.functional.utils import describe_dynamic_table, update_model
class Changes:
iceberg: bool = False
@pytest.fixture(scope="class", autouse=True)
def seeds(self):
@@ -14,10 +15,18 @@ class Changes:
@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {
my_models = {
"dynamic_table_alter.sql": models.DYNAMIC_TABLE,
"dynamic_table_replace.sql": models.DYNAMIC_TABLE,
}
if self.iceberg:
my_models.update(
{
"dynamic_table_iceberg_alter.sql": models.DYNAMIC_ICEBERG_TABLE,
"dynamic_table_iceberg_replace.sql": models.DYNAMIC_ICEBERG_TABLE,
}
)
yield my_models
@pytest.fixture(scope="function", autouse=True)
def setup_class(self, project):
@@ -33,14 +42,23 @@ class Changes:
update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER)
update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE)
if self.iceberg:
update_model(
project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE_ALTER
)
update_model(
project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE_REPLACE
)
yield
update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE)
update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE)
if self.iceberg:
update_model(project, "dynamic_table_iceberg_alter", models.DYNAMIC_ICEBERG_TABLE)
update_model(project, "dynamic_table_iceberg_replace", models.DYNAMIC_ICEBERG_TABLE)
@staticmethod
def assert_changes_are_applied(project):
def assert_changes_are_applied(self, project):
altered = describe_dynamic_table(project, "dynamic_table_alter")
assert altered.snowflake_warehouse == "DBT_TESTING"
assert altered.target_lag == "5 minutes" # this updated
@@ -51,8 +69,18 @@ class Changes:
assert replaced.target_lag == "2 minutes"
assert replaced.refresh_mode == "FULL" # this updated
@staticmethod
def assert_changes_are_not_applied(project):
if self.iceberg:
altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "5 minutes" # this updated
assert altered_iceberg.refresh_mode == "INCREMENTAL"
replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert replaced_iceberg.refresh_mode == "FULL" # this updated
def assert_changes_are_not_applied(self, project):
altered = describe_dynamic_table(project, "dynamic_table_alter")
assert altered.snowflake_warehouse == "DBT_TESTING"
assert altered.target_lag == "2 minutes" # this would have updated, but didn't
@@ -63,6 +91,19 @@ class Changes:
assert replaced.target_lag == "2 minutes"
assert replaced.refresh_mode == "INCREMENTAL" # this would have updated, but didn't
if self.iceberg:
altered_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_alter")
assert altered_iceberg.snowflake_warehouse == "DBT_TESTING"
assert altered_iceberg.target_lag == "2 minutes" # this would have updated, but didn't
assert altered_iceberg.refresh_mode == "INCREMENTAL"
replaced_iceberg = describe_dynamic_table(project, "dynamic_table_iceberg_replace")
assert replaced_iceberg.snowflake_warehouse == "DBT_TESTING"
assert replaced_iceberg.target_lag == "2 minutes"
assert (
replaced_iceberg.refresh_mode == "INCREMENTAL"
) # this would have updated, but didn't
def test_full_refresh_is_always_successful(self, project):
# this always passes and always changes the configuration, regardless of on_configuration_change
# and regardless of whether the changes require a replace versus an alter
@@ -81,6 +122,17 @@ class TestChangesApply(Changes):
self.assert_changes_are_applied(project)
class TestChangesApplyIcebergOn(TestChangesApply):
iceberg = True
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "apply"},
"flags": {"enable_iceberg_materializations": True},
}
class TestChangesContinue(Changes):
@pytest.fixture(scope="class")
def project_config_update(self):
@@ -92,6 +144,17 @@ class TestChangesContinue(Changes):
self.assert_changes_are_not_applied(project)
class TestChangesContinueIcebergOn(TestChangesContinue):
iceberg = True
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "continue"},
"flags": {"enable_iceberg_materializations": True},
}
class TestChangesFail(Changes):
@pytest.fixture(scope="class")
def project_config_update(self):
@@ -101,3 +164,14 @@ class TestChangesFail(Changes):
# this fails and does not change the configuration
run_dbt(["run"], expect_pass=False)
self.assert_changes_are_not_applied(project)
class TestChangesFailIcebergOn(TestChangesFail):
iceberg = True
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {"on_configuration_change": "fail"},
"flags": {"enable_iceberg_materializations": True},
}

View File

@@ -55,7 +55,7 @@ ICEBERG_TABLE = """
select * from {{ ref('my_seed') }}
"""
ICEBERG_INCREMENTAL_TABLE = """
INCREMENTAL_ICEBERG_TABLE = """
{{ config(
materialized='incremental',
table_format='iceberg',
@@ -65,3 +65,13 @@ ICEBERG_INCREMENTAL_TABLE = """
) }}
select * from {{ ref('my_seed') }}
"""
INCREMENTAL_TABLE = """
{{ config(
materialized='incremental',
incremental_strategy='append',
unique_key="id",
) }}
select * from {{ ref('my_seed') }}
"""

View File

@@ -13,16 +13,25 @@ from tests.functional.utils import describe_dynamic_table, query_relation_type,
class Model:
model: str
relation_type: str
table_format: Optional[str] = None
incremental: Optional[bool] = None
table_format: Optional[str] = "default"
is_incremental: Optional[bool] = False
@property
def name(self) -> str:
name = f"{self.relation_type}"
if self.table_format:
name += f"_{self.table_format}"
if self.is_incremental:
name = f"{self.relation_type}_{self.table_format}_incremental"
else:
name = f"{self.relation_type}_{self.table_format}"
return name
@property
def is_iceberg(self) -> bool:
return self.table_format == "iceberg"
@property
def is_standard_table(self) -> bool:
return self.relation_type == "table" and not self.is_incremental
@dataclass
class Scenario:
@@ -37,26 +46,47 @@ class Scenario:
def error_message(self) -> str:
return f"Failed when migrating from: {self.initial.name} to: {self.final.name}"
@property
def uses_iceberg(self) -> bool:
return any([self.initial.is_iceberg, self.final.is_iceberg])
relations = [
Model(models.VIEW, "view"),
Model(models.TABLE, "table", "default"),
# to be activated upon merge of dynamic table support PR
# Model(models.DYNAMIC_TABLE, "dynamic_table", "default"),
# Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"),
Model(models.INCREMENTAL_TABLE, "table", "default", is_incremental=True),
Model(models.DYNAMIC_TABLE, "dynamic_table", "default"),
Model(models.ICEBERG_TABLE, "table", "iceberg"),
Model(models.ICEBERG_INCREMENTAL_TABLE, "table", "iceberg", incremental=True),
Model(models.INCREMENTAL_ICEBERG_TABLE, "table", "iceberg", is_incremental=True),
Model(models.DYNAMIC_ICEBERG_TABLE, "dynamic_table", "iceberg"),
]
scenarios = [Scenario(*scenario) for scenario in product(relations, relations)]
def requires_full_refresh(scenario) -> bool:
return any(
[
# we can only swap incremental to table and back if both are iceberg
scenario.initial.is_incremental
and scenario.final.is_standard_table
and scenario.initial.table_format != scenario.final.table_format,
scenario.initial.is_standard_table
and scenario.final.is_incremental
and scenario.initial.table_format != scenario.final.table_format,
# we can't swap from an incremental to a dynamic table because the materialization does not handle this case
scenario.initial.relation_type == "dynamic_table" and scenario.final.is_incremental,
]
)
class TestRelationTypeChange:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": False}}
@staticmethod
def include(scenario) -> bool:
return (
scenario.initial.table_format != "iceberg" and scenario.final.table_format != "iceberg"
)
return not scenario.uses_iceberg and not requires_full_refresh(scenario)
@pytest.fixture(scope="class", autouse=True)
def seeds(self):
@@ -77,7 +107,11 @@ class TestRelationTypeChange:
for scenario in scenarios:
if self.include(scenario):
update_model(project, scenario.name, scenario.final.model)
run_dbt(["run"])
# allow for dbt to fail so that we can see which scenarios pass and which scenarios fail
try:
run_dbt(["run"], expect_pass=False)
except Exception:
pass
@pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios])
def test_replace(self, project, scenario):
@@ -91,9 +125,17 @@ class TestRelationTypeChange:
pytest.skip()
"""
Upon adding the logic needed for seamless transitions to and from incremental models without data loss, we can coalesce these test cases.
"""
class TestRelationTypeChangeFullRefreshRequired(TestRelationTypeChange):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {"enable_iceberg_materializations": False},
"models": {"full_refresh": True},
}
@staticmethod
def include(scenario) -> bool:
return not scenario.uses_iceberg and requires_full_refresh(scenario)
class TestRelationTypeChangeIcebergOn(TestRelationTypeChange):
@@ -103,21 +145,17 @@ class TestRelationTypeChangeIcebergOn(TestRelationTypeChange):
@staticmethod
def include(scenario) -> bool:
return any(
(
# scenario 1: Everything that doesn't include incremental relations on Iceberg
(
(
scenario.initial.table_format == "iceberg"
or scenario.final.table_format == "iceberg"
)
and not scenario.initial.incremental
and not scenario.final.incremental
),
# scenario 2: Iceberg Incremental swaps allowed
(
scenario.initial.table_format == "iceberg"
and scenario.final.table_format == "iceberg"
),
)
)
return scenario.uses_iceberg and not requires_full_refresh(scenario)
class TestRelationTypeChangeIcebergOnFullRefreshRequired(TestRelationTypeChange):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {"enable_iceberg_materializations": True},
"models": {"full_refresh": True},
}
@staticmethod
def include(scenario) -> bool:
return scenario.uses_iceberg and requires_full_refresh(scenario)