Feature: Custom Iceberg base_location_root (#1289)

* update relation & add tests

* switch if/else pattern

* add change log

* add test for dynamic table with path and subpath

* modify base_location config

* update to base_location_root, include schema and table name in path

* resolve linting error (f-string nested double quotes)

* resolve linting error (move to new line)

* add unit tests & make final edits to functional tests

* lint unit test

* update changie

* update dt test cases for iceberg to be dynamic

---------

Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com>
This commit is contained in:
Leah Procopi
2025-01-24 11:29:16 -08:00
committed by GitHub
parent b687ac477a
commit 5d935eedba
6 changed files with 163 additions and 3 deletions

View File

@@ -0,0 +1,6 @@
kind: Features
body: Added support for custom iceberg base_location_root
time: 2025-01-13T13:34:14.326047-08:00
custom:
Author: LProcopi15
Issue: "1284"

View File

@@ -54,6 +54,7 @@ class SnowflakeConfig(AdapterConfig):
# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_root: Optional[str] = None
base_location_subpath: Optional[str] = None

View File

@@ -204,7 +204,10 @@ class SnowflakeRelation(BaseRelation):
return ""
def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"
# If the base_location_root config is supplied, overwrite the default value ("_dbt/")
base_location: str = (
f"{config.get('base_location_root', '_dbt')}/{self.schema}/{self.name}"
)
if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

View File

@@ -23,6 +23,37 @@ _MODEL_BASIC_ICEBERG_MODEL = """
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
)
}}
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
base_location_subpath="subpath",
)
}}
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_DYNAMIC_TABLE_MODEL = """
{{ config(
materialized='dynamic_table',
@@ -36,6 +67,38 @@ _MODEL_BASIC_DYNAMIC_TABLE_MODEL = """
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH = """
{{
config(
transient = "transient",
materialized = "dynamic_table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
)
}}
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH = """
{{
config(
transient = "true",
materialized = "dynamic_table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
base_location_subpath='subpath',
)
}}
select * from {{ ref('first_table') }}
"""
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH = """
{{ config(
materialized='dynamic_table',

View File

@@ -7,7 +7,11 @@ from dbt.tests.util import run_dbt, rm_file, write_file
from tests.functional.iceberg.models import (
_MODEL_BASIC_TABLE_MODEL,
_MODEL_BASIC_ICEBERG_MODEL,
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH,
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
_MODEL_BUILT_ON_ICEBERG_TABLE,
_MODEL_TABLE_BEFORE_SWAP,
@@ -26,14 +30,18 @@ class TestIcebergTableBuilds:
return {
"first_table.sql": _MODEL_BASIC_TABLE_MODEL,
"iceberg_table.sql": _MODEL_BASIC_ICEBERG_MODEL,
"iceberg_tableb.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH,
"iceberg_tablec.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH,
"table_built_on_iceberg_table.sql": _MODEL_BUILT_ON_ICEBERG_TABLE,
"dynamic_table.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL,
"dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
"dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH,
"dynamic_tablec.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH,
"dynamic_tabled.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
}
def test_iceberg_tables_build_and_can_be_referred(self, project):
run_results = run_dbt()
assert len(run_results) == 5
assert len(run_results) == 9
class TestIcebergTableTypeBuildsOnExistingTable:

View File

@@ -0,0 +1,79 @@
import pytest
from dbt.adapters.snowflake.relation import SnowflakeRelation
@pytest.fixture
def iceberg_config() -> dict:
"""Fixture providing standard Iceberg configuration."""
return {
"schema": "my_schema",
"identifier": "my_table",
"external_volume": "s3_iceberg_snow",
"base_location_root": "root_path",
"base_location_subpath": "subpath",
}
def get_actual_base_location(config: dict[str, str]) -> str:
"""Get the actual base location from the configuration by parsing the DDL predicates."""
relation = SnowflakeRelation.create(
schema=config["schema"],
identifier=config["identifier"],
)
actual_ddl_predicates = relation.get_iceberg_ddl_options(config).strip()
actual_base_location = actual_ddl_predicates.split("base_location = ")[1]
return actual_base_location
def test_iceberg_path_and_subpath(iceberg_config: dict[str, str]):
"""Test when base_location_root and base_location_subpath are provided"""
expected_base_location = (
f"'{iceberg_config['base_location_root']}/"
f"{iceberg_config['schema']}/"
f"{iceberg_config['identifier']}/"
f"{iceberg_config['base_location_subpath']}'"
).strip()
assert get_actual_base_location(iceberg_config) == expected_base_location
def test_iceberg_only_subpath(iceberg_config: dict[str, str]):
"""Test when only base_location_subpath is provided"""
del iceberg_config["base_location_root"]
expected_base_location = (
f"'_dbt/"
f"{iceberg_config['schema']}/"
f"{iceberg_config['identifier']}/"
f"{iceberg_config['base_location_subpath']}'"
).strip()
assert get_actual_base_location(iceberg_config) == expected_base_location
def test_iceberg_only_path(iceberg_config: dict[str, str]):
"""Test when only base_location_root is provided"""
del iceberg_config["base_location_subpath"]
expected_base_location = (
f"'{iceberg_config['base_location_root']}/"
f"{iceberg_config['schema']}/"
f"{iceberg_config['identifier']}'"
).strip()
assert get_actual_base_location(iceberg_config) == expected_base_location
def test_iceberg_no_path(iceberg_config: dict[str, str]):
"""Test when no base_location_root or is base_location_subpath provided"""
del iceberg_config["base_location_root"]
del iceberg_config["base_location_subpath"]
expected_base_location = (
f"'_dbt/" f"{iceberg_config['schema']}/" f"{iceberg_config['identifier']}'"
).strip()
assert get_actual_base_location(iceberg_config) == expected_base_location