Feat/iceberg advanced partitioning (#3053)

* feat: implement advanced Iceberg partitioning with explicit ordering

- Add support for advanced partition transforms (year, month, day, hour, bucket, truncate)
- Implement explicit partition ordering via index property
- Add custom partition naming support
- Implement priority system: advanced partitioning overrides legacy partition: True
- Add comprehensive validation for partition specifications
- Add graceful error handling for PyIceberg limitations
- Add performance optimization with early exit for non-partitioned schemas
- Update schema typing to support dict/list partition syntax
- Add pyiceberg-core>=0.6.0 dependency for advanced transforms
- Add comprehensive test suite with 22+ test cases covering all scenarios

Backward compatible: existing partition: True syntax continues to work
Resolves partition ordering limitations in Iceberg table format
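A minimal usage sketch of the adapter introduced in this PR (mirrors the basic example added to the docs; resource and column names are illustrative):

```py
from datetime import date

import dlt
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition

@dlt.resource(table_format="iceberg")
def events():
    yield [{"id": 1, "category": "A", "created_at": date(2025, 1, 1)}]

# identity partition on category plus a month transform on created_at
iceberg_adapter(
    events,
    partition=["category", iceberg_partition.month("created_at")],
)

pipeline = dlt.pipeline("iceberg_example", destination="filesystem")
pipeline.run(events)
```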

* Port iceberg_partition and build_iceberg_partition_spec to dlt core

* update type hint in IcebergLoadFilesystemJob

* Add tests for Iceberg advanced partitioning; remove unused partition extraction code

* Add docs for iceberg_adapter

---------

Co-authored-by: Anton Burnashev <anton.burnashev@gmail.com>
Rakesh V. authored on 2025-12-12 15:27:56 +05:30, committed by GitHub
parent 6658d5468d
commit 34669f1ac7
10 changed files with 1291 additions and 27 deletions


@@ -1,5 +1,6 @@
import os
from typing import Dict, Any, List, Optional
from typing import Dict, Any, List, Optional, Union
import warnings
from fsspec import AbstractFileSystem
from packaging.version import Version
@@ -27,6 +28,10 @@ try:
from pyiceberg.table import Table as IcebergTable
from pyiceberg.catalog import Catalog as IcebergCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec as IcebergPartitionSpec,
)
import pyarrow as pa
except ModuleNotFoundError:
raise MissingDependencyException(
@@ -174,14 +179,17 @@ def create_table(
catalog: IcebergCatalog,
table_id: str,
table_location: str,
schema: pa.Schema,
schema: Union[pa.Schema, "pyiceberg.schema.Schema"],
partition_columns: Optional[List[str]] = None,
partition_spec: Optional[IcebergPartitionSpec] = UNPARTITIONED_PARTITION_SPEC,
) -> None:
schema = ensure_iceberg_compatible_arrow_schema(schema)
if isinstance(schema, pa.Schema):
schema = ensure_iceberg_compatible_arrow_schema(schema)
if partition_columns:
# If the table is partitioned, create it in two steps:
# (1) start a create-table transaction, and (2) add the partition spec before committing
warnings.warn(
"partition_columns is deprecated. Use partition_spec instead.", DeprecationWarning
)
with catalog.create_table_transaction(
table_id,
schema=schema,
@@ -192,7 +200,12 @@ def create_table(
for col in partition_columns:
update_spec.add_identity(col)
else:
catalog.create_table(table_id, schema=schema, location=table_location)
catalog.create_table(
identifier=table_id,
schema=schema,
location=table_location,
partition_spec=partition_spec,
)
def get_iceberg_tables(


@@ -9,6 +9,7 @@ from dlt.destinations.impl.clickhouse.clickhouse_adapter import clickhouse_adapt
from dlt.destinations.impl.athena.athena_adapter import athena_adapter, athena_partition
from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter
from dlt.destinations.impl.databricks.databricks_adapter import databricks_adapter
from dlt.destinations.impl.filesystem.iceberg_adapter import iceberg_adapter, iceberg_partition
__all__ = [
"weaviate_adapter",
@@ -22,4 +23,6 @@ __all__ = [
"athena_partition",
"postgres_adapter",
"databricks_adapter",
"iceberg_adapter",
"iceberg_partition",
]


@@ -15,6 +15,7 @@ from typing import (
cast,
Any,
Dict,
TYPE_CHECKING,
)
from fsspec import AbstractFileSystem
@@ -66,7 +67,7 @@ from dlt.common.destination.exceptions import (
OpenTableCatalogNotSupported,
OpenTableFormatNotSupported,
)
from dlt.common.schema.exceptions import SchemaCorruptedException
from dlt.destinations.job_impl import (
ReferenceFollowupJobRequest,
FinalizedLoadJob,
@@ -80,6 +81,9 @@ from dlt.destinations.utils import (
verify_schema_replace_disposition,
)
if TYPE_CHECKING:
from dlt.destinations.impl.filesystem.iceberg_adapter import PartitionSpec
CURRENT_VERSION: int = 2
SUPPORTED_VERSIONS: set[int] = {1, CURRENT_VERSION}
@@ -226,7 +230,14 @@ class DeltaLoadFilesystemJob(TableFormatLoadFilesystemJob):
class IcebergLoadFilesystemJob(TableFormatLoadFilesystemJob):
def run(self) -> None:
from dlt.common.libs.pyiceberg import write_iceberg_table, merge_iceberg_table, create_table
from dlt.common.libs.pyiceberg import (
write_iceberg_table,
merge_iceberg_table,
create_table,
)
from dlt.destinations.impl.filesystem.iceberg_partition_spec import (
build_iceberg_partition_spec,
)
try:
table = self._job_client.load_open_table(
@@ -237,13 +248,27 @@ class IcebergLoadFilesystemJob(TableFormatLoadFilesystemJob):
except DestinationUndefinedEntity:
location = self._job_client.get_open_table_location("iceberg", self.load_table_name)
table_id = f"{self._job_client.dataset_name}.{self.load_table_name}"
create_table(
self._job_client.get_open_table_catalog("iceberg"),
table_id,
table_location=location,
schema=self.arrow_dataset.schema,
partition_columns=self._partition_columns,
)
spec_list = self._get_partition_spec_list()
if spec_list:
partition_spec, iceberg_schema = build_iceberg_partition_spec(
self.arrow_dataset.schema, spec_list
)
create_table(
self._job_client.get_open_table_catalog("iceberg"),
table_id,
table_location=location,
schema=iceberg_schema,
partition_spec=partition_spec,
)
else:
create_table(
self._job_client.get_open_table_catalog("iceberg"),
table_id,
table_location=location,
schema=self.arrow_dataset.schema,
)
# run again with created table
self.run()
return
@@ -262,6 +287,30 @@ class IcebergLoadFilesystemJob(TableFormatLoadFilesystemJob):
write_disposition=self._load_table["write_disposition"],
)
def _get_partition_spec_list(self) -> List["PartitionSpec"]:
"""Resolve partition specs. Combines legacy partition columns (identity transform)
with partition hints. Validates that identity partitions are not duplicated.
"""
from dlt.destinations.impl.filesystem.iceberg_adapter import (
parse_partition_hints,
create_identity_specs,
)
legacy_columns = self._partition_columns
hint_specs = parse_partition_hints(self._load_table)
for spec in hint_specs:
if spec.transform == "identity" and spec.source_column in legacy_columns:
raise SchemaCorruptedException(
self._schema.name,
f"Column '{spec.source_column}' is defined both as a partition column "
"and in partition hints.",
)
identity_specs = create_identity_specs(legacy_columns)
return identity_specs + hint_specs
class FilesystemLoadJobWithFollowup(HasFollowupJobs, FilesystemLoadJob):
def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRequest]:


@@ -0,0 +1,251 @@
from dataclasses import dataclass
from typing import Any, List, Dict, Union, Sequence, Optional, cast
from dlt.common.destination.typing import PreparedTableSchema
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
PARTITION_HINT = "x-iceberg-partition"
@dataclass(frozen=True)
class PartitionSpec:
source_column: str
transform: str = "identity"
param_value: Optional[int] = None
partition_field: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
d: Dict[str, Any] = {
"transform": self.transform,
"source_column": self.source_column,
}
if self.partition_field:
d["partition_field"] = self.partition_field
if self.param_value is not None:
d["param_value"] = self.param_value
return d
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "PartitionSpec":
return cls(
source_column=d["source_column"],
transform=d["transform"],
param_value=d.get("param_value"),
partition_field=d.get("partition_field"),
)
class iceberg_partition:
"""Helper class with factory methods for creating partition specs."""
@staticmethod
def identity(column_name: str) -> PartitionSpec:
"""Create an identity partition on a column.
Args:
column_name: The name of the column to partition on
Returns:
A PartitionSpec for identity partitioning
"""
return PartitionSpec(column_name, "identity")
@staticmethod
def year(column_name: str, partition_field_name: Optional[str] = None) -> PartitionSpec:
"""Create a year partition on a timestamp/date column.
Args:
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for year partitioning
"""
return PartitionSpec(column_name, "year", partition_field=partition_field_name)
@staticmethod
def month(column_name: str, partition_field_name: Optional[str] = None) -> PartitionSpec:
"""Create a month partition on a timestamp/date column.
Args:
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for month partitioning
"""
return PartitionSpec(column_name, "month", partition_field=partition_field_name)
@staticmethod
def day(column_name: str, partition_field_name: Optional[str] = None) -> PartitionSpec:
"""Create a day partition on a timestamp/date column.
Args:
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for day partitioning
"""
return PartitionSpec(column_name, "day", partition_field=partition_field_name)
@staticmethod
def hour(column_name: str, partition_field_name: Optional[str] = None) -> PartitionSpec:
"""Create an hour partition on a timestamp column.
Args:
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for hour partitioning
"""
return PartitionSpec(column_name, "hour", partition_field=partition_field_name)
@staticmethod
def bucket(
num_buckets: int, column_name: str, partition_field_name: Optional[str] = None
) -> PartitionSpec:
"""Create a bucket partition on a column.
Args:
num_buckets: The number of buckets to create
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for bucket partitioning
"""
return PartitionSpec(
source_column=column_name,
transform="bucket",
param_value=num_buckets,
partition_field=partition_field_name,
)
@staticmethod
def truncate(
width: int, column_name: str, partition_field_name: Optional[str] = None
) -> PartitionSpec:
"""Create a truncate partition on a string column.
Args:
width: The width to truncate to
column_name: The name of the column to partition on
partition_field_name: Optional custom name for the partition field
Returns:
A PartitionSpec for truncate partitioning
"""
return PartitionSpec(
source_column=column_name,
transform="truncate",
param_value=width,
partition_field=partition_field_name,
)
def iceberg_adapter(
data: Any,
partition: Optional[Union[str, PartitionSpec, Sequence[Union[str, PartitionSpec]]]] = None,
) -> DltResource:
"""Prepares data or a DltResource for loading into Apache Iceberg table.
Takes raw data or an existing DltResource and configures it for Iceberg,
primarily by defining partitioning strategies via the DltResource's hints.
Args:
data: The data to be transformed. This can be raw data (e.g., list of dicts)
or an instance of `DltResource`. If raw data is provided, it will be
encapsulated into a `DltResource` instance.
partition: Defines how the Iceberg table should be partitioned.
Must be provided. It accepts:
- A single column name (string): Defaults to an identity transform.
- A `PartitionSpec` object: Allows for detailed partition configuration,
including transformation types (year, month, day, hour, bucket, truncate).
Use the `iceberg_partition` helper class to create these specs.
- A sequence of the above: To define multiple partition columns.
Returns:
A `DltResource` instance configured with Iceberg-specific partitioning hints,
ready for loading.
Raises:
ValueError: If `partition` is not specified or if an invalid
partition transform is requested within a `PartitionSpec`.
Examples:
>>> data = [{"id": 1, "event_time": "2023-03-15T10:00:00Z", "category": "A"}]
>>> resource = iceberg_adapter(
... data,
... partition=[
... "category", # Identity partition on category
... iceberg_partition.year("event_time"),
... ]
... )
>>> # The resource's hints now contain the Iceberg partition specs:
>>> # resource.compute_table_schema().get('x-iceberg-partition')
>>> # [
>>> # {'transform': 'identity', 'source_column': 'category'},
>>> # {'transform': 'year', 'source_column': 'event_time'},
>>> # ]
>>> #
>>> # Or in case of using an existing DltResource
>>> @dlt.resource
... def my_data():
... yield [{"value": "abc"}]
>>> iceberg_adapter(my_data, partition="value")
"""
resource = get_resource_for_adapter(data)
additional_table_hints: Dict[str, Any] = {}
if partition:
if isinstance(partition, (str, PartitionSpec)):
partition = [partition]
specs: List[PartitionSpec] = []
for item in partition:
if isinstance(item, PartitionSpec):
specs.append(item)
else:
# Item is the column name, use identity transform
specs.append(iceberg_partition.identity(item))
additional_table_hints[PARTITION_HINT] = [spec.to_dict() for spec in specs]
if additional_table_hints:
resource.apply_hints(additional_table_hints=additional_table_hints)
else:
raise ValueError("A value for `partition` must be specified.")
return resource
def parse_partition_hints(table_schema: PreparedTableSchema) -> List[PartitionSpec]:
"""Parse PARTITION_HINT from table schema into PartitionSpec list.
Args:
table_schema: dlt table schema containing partition hints
Returns:
List of PartitionSpec objects from hints, empty list if no hints found
"""
partition_hints = cast(List[Dict[str, Any]], table_schema.get(PARTITION_HINT, []))
specs = []
for spec_data in partition_hints:
spec = PartitionSpec.from_dict(spec_data)
specs.append(spec)
return specs
def create_identity_specs(column_names: List[str]) -> List[PartitionSpec]:
"""Create identity partition specs from column names.
Args:
column_names: List of column names to partition by identity
Returns:
List of PartitionSpec objects with identity transform
"""
return [iceberg_partition.identity(column_name) for column_name in column_names]


@@ -0,0 +1,128 @@
from typing import Any, Callable, Dict, Optional, Sequence
from dlt import version
from dlt.common.exceptions import MissingDependencyException
try:
from pyiceberg.transforms import (
Transform,
IdentityTransform,
YearTransform,
MonthTransform,
DayTransform,
HourTransform,
BucketTransform,
TruncateTransform,
S,
)
from pyiceberg.partitioning import (
PartitionSpec as IcebergPartitionSpec,
PartitionField,
PARTITION_FIELD_ID_START,
)
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.table.name_mapping import NameMapping, MappedField
except ImportError:
raise MissingDependencyException(
"dlt iceberg partition spec",
[f"{version.DLT_PKG_NAME}[pyiceberg]"],
"Install `pyiceberg` for dlt iceberg partition spec utilities to work",
)
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.destinations.impl.filesystem.iceberg_adapter import PartitionSpec
_TRANSFORM_LOOKUP: Dict[str, Callable[[Optional[int]], Transform[S, Any]]] = {
"identity": lambda _: IdentityTransform(),
"year": lambda _: YearTransform(),
"month": lambda _: MonthTransform(),
"day": lambda _: DayTransform(),
"hour": lambda _: HourTransform(),
"bucket": lambda n: BucketTransform(n),
"truncate": lambda w: TruncateTransform(w),
}
def get_partition_transform(spec: PartitionSpec) -> Transform[S, Any]:
"""Get the PyIceberg Transform object for a partition spec.
Args:
spec: The PartitionSpec to get the transform for
Returns:
A PyIceberg Transform object
Raises:
ValueError: If the transform is not recognized
"""
try:
factory = _TRANSFORM_LOOKUP[spec.transform]
except KeyError as exc:
raise ValueError(f"Unknown partition transformation type: {spec.transform}") from exc
return factory(spec.param_value)
def build_iceberg_partition_spec(
arrow_schema: pa.Schema,
spec_list: Sequence[PartitionSpec],
) -> tuple[IcebergPartitionSpec, IcebergSchema]:
"""
Turn a dlt PartitionSpec list into a PyIceberg PartitionSpec.
Returns the PartitionSpec and the IcebergSchema derived from the Arrow schema.
"""
name_mapping = NameMapping(
[
MappedField(field_id=i + 1, names=[name]) # type: ignore[call-arg]
for i, name in enumerate(arrow_schema.names)
]
)
iceberg_schema: IcebergSchema = pyarrow_to_schema(arrow_schema, name_mapping)
fields: list[PartitionField] = []
for pos, spec in enumerate(spec_list):
iceberg_field = iceberg_schema.find_field(spec.source_column)
fields.append(
PartitionField(
field_id=PARTITION_FIELD_ID_START + pos,
source_id=iceberg_field.field_id,
transform=get_partition_transform(spec),
name=_default_field_name(spec),
)
)
return IcebergPartitionSpec(*fields), iceberg_schema
def _default_field_name(spec: PartitionSpec) -> str:
"""
Return the partition field name for a spec: use the user-supplied `partition_field` when present,
otherwise replicate Iceberg's automatic partition-field naming by delegating to the private
_PartitionNameGenerator.
"""
from pyiceberg.partitioning import _PartitionNameGenerator
name_generator = _PartitionNameGenerator()
if spec.partition_field: # user-supplied `partition_field`
return spec.partition_field
# name generator requires field_id and source_id, but does not use them
dummy_field_id = 0
dummy_source_id = 0
if spec.transform == "bucket":
# bucket / truncate need the numeric parameter
return name_generator.bucket(
dummy_field_id, spec.source_column, dummy_source_id, spec.param_value
)
if spec.transform == "truncate":
return name_generator.truncate(
dummy_field_id, spec.source_column, dummy_source_id, spec.param_value
)
# identity, year, month, day, hour all have the same signature
method = getattr(name_generator, spec.transform)
return method(dummy_field_id, spec.source_column, dummy_source_id) # type: ignore[no-any-return]


@@ -71,17 +71,12 @@ pipeline.run(my_resource, table_format="iceberg")
dlt always uses Parquet as `loader_file_format` when using the `iceberg` table format. Any setting of `loader_file_format` is disregarded.
:::
## Table format partitioning
Iceberg tables can be partitioned by specifying one or more `partition` column hints. This example partitions an Iceberg table by the `foo` column:
## Partitioning
```py
@dlt.resource(
table_format="iceberg",
columns={"foo": {"partition": True}}
)
def my_iceberg_resource():
...
```
Apache Iceberg supports [table partitioning](https://iceberg.apache.org/docs/latest/partitioning/) to optimize query performance. There are two ways to configure partitioning:
1. Using the [`iceberg_adapter`](#using-the-iceberg_adapter) function - for advanced partitioning with transformations (year, month, day, hour, bucket, truncate)
2. Using the column-level [`partition`](#using-column-level-partition-property) property - for simple identity partitioning
:::note
Iceberg uses [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/).
@@ -91,6 +86,126 @@ Iceberg uses [hidden partioning](https://iceberg.apache.org/docs/latest/partitio
Partition evolution (changing partition columns after a table has been created) is not supported.
:::
### Using the `iceberg_adapter`
The `iceberg_adapter` function allows you to configure partitioning with various transformation functions.
#### Basic example
```py
from datetime import date
import dlt
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
data_items = [
{"id": 1, "category": "A", "created_at": date(2025, 1, 1)},
{"id": 2, "category": "A", "created_at": date(2025, 1, 15)},
{"id": 3, "category": "B", "created_at": date(2025, 2, 1)},
]
@dlt.resource(table_format="iceberg")
def events():
yield data_items
# Partition by category and month of created_at
iceberg_adapter(
events,
partition=[
"category", # identity partition (shorthand)
iceberg_partition.month("created_at"),
],
)
pipeline = dlt.pipeline("iceberg_example", destination="filesystem")
pipeline.run(events)
```
To use advanced partitioning, import both the adapter and the `iceberg_partition` helper:
```py
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
```
#### Partition transformations
Iceberg supports several transformation functions for partitioning. Use the `iceberg_partition` helper to create partition specifications:
* `iceberg_partition.identity(column_name)`: Partition by exact column values (equivalent to passing the column name as a plain string to `iceberg_adapter`)
* `iceberg_partition.year(column_name)`: Partition by the year of a date or timestamp column
* `iceberg_partition.month(column_name)`: Partition by the month of a date or timestamp column
* `iceberg_partition.day(column_name)`: Partition by the day of a date or timestamp column
* `iceberg_partition.hour(column_name)`: Partition by the hour of a timestamp column
* `iceberg_partition.bucket(n, column_name)`: Partition by hashing column values into `n` buckets
* `iceberg_partition.truncate(length, column_name)`: Partition by column values truncated to `length` characters
#### Bucket partitioning
Distribute data across a fixed number of buckets using a hash function:
```py
iceberg_adapter(
resource,
partition=[iceberg_partition.bucket(16, "user_id")],
)
```
#### Truncate partitioning
Partition string values by a fixed prefix length:
```py
iceberg_adapter(
resource,
partition=[iceberg_partition.truncate(3, "category")], # "ELECTRONICS" → "ELE"
)
```
#### Custom partition field names
Specify custom names for partition fields:
```py
iceberg_adapter(
resource,
partition=[
iceberg_partition.year("activity_time", "activity_year"),
iceberg_partition.bucket(8, "user_id", "user_bucket"),
],
)
```
### Using column-level `partition` property
For simple identity partitioning, you can use the `partition` column hint directly in the resource definition:
```py
@dlt.resource(
table_format="iceberg",
columns={"region": {"partition": True}}
)
def my_iceberg_resource():
yield [
{"id": 1, "region": "US", "amount": 100},
{"id": 2, "region": "EU", "amount": 200},
]
```
You can also partition on multiple columns:
```py
@dlt.resource(
table_format="iceberg",
columns={
"region": {"partition": True},
"category": {"partition": True},
}
)
def multi_partition_data():
...
```
## Table access helper functions
You can use the `get_iceberg_tables` helper function to access native table objects. These are `pyiceberg` [Table](https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table) objects.
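For example, a short sketch (assuming `pipeline` already loaded an `events` table with the adapter, as above) of inspecting the partition spec and reading the data back:

```py
from dlt.common.libs.pyiceberg import get_iceberg_tables

# returns a mapping of table name -> pyiceberg Table for the given pipeline
events_table = get_iceberg_tables(pipeline, "events")["events"]

# inspect the partition spec fields and read the table back into Arrow
print([(field.name, str(field.transform)) for field in events_table.spec().fields])
arrow_table = events_table.scan().to_arrow()
print(arrow_table.num_rows)
```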
@@ -120,10 +235,10 @@ The [S3-compatible](./filesystem.md#using-s3-compatible-storage) interface for G
The `az` [scheme](./filesystem.md#supported-schemes) is not supported when using the `iceberg` table format. Please use the `abfss` scheme. This is because `pyiceberg`, which dlt uses under the hood, currently does not support `az`.
## Table format `merge` support
The [`upsert`](../../general-usage/merge-loading.md#upsert-strategy) merge strategy is supported for `iceberg`. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also does not contain duplicates on those keys.
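For illustration, a hedged sketch of an upsert merge into a partitioned Iceberg table, adapted from this PR's tests (resource and key names are illustrative):

```py
import dlt
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition

@dlt.resource(
    table_format="iceberg",
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="user_id",
)
def users():
    yield [
        {"user_id": 1, "name": "Alice", "region": "US"},
        {"user_id": 2, "name": "Bob", "region": "EU"},
    ]

# adapter partitioning works together with merge loads
iceberg_adapter(users, partition=[iceberg_partition.bucket(8, "user_id"), "region"])

pipeline = dlt.pipeline("iceberg_merge", destination="filesystem")
pipeline.run(users)  # subsequent runs upsert rows matched on user_id
```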
:::warning
Until _pyiceberg_ > 0.9.1 is released, upsert is executed in chunks of **1000** rows.
:::
:::warning


@@ -173,6 +173,7 @@ sqlalchemy = [
]
pyiceberg = [
"pyiceberg>=0.9.1",
"pyiceberg-core>=0.6.0",
"pyarrow>=16.0.0",
"sqlalchemy>=1.4",
]


@@ -0,0 +1,398 @@
import pytest
from typing import Any, Dict
import dlt
import pyarrow as pa
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
from dlt.destinations.impl.filesystem.iceberg_adapter import (
PARTITION_HINT,
PartitionSpec,
parse_partition_hints,
create_identity_specs,
)
from dlt.destinations.impl.filesystem.iceberg_partition_spec import (
build_iceberg_partition_spec,
get_partition_transform,
_default_field_name,
)
# mark all tests as essential, do not remove
pytestmark = pytest.mark.essential
@pytest.mark.parametrize(
"partition_input,expected_column",
[
(["region"], "region"),
("category", "category"),
],
ids=["list_input", "string_input"],
)
def test_iceberg_adapter_identity_partition(partition_input, expected_column) -> None:
@dlt.resource
def my_data():
yield [{"id": 1, "region": "US", "category": "A"}]
resource = iceberg_adapter(my_data, partition=partition_input)
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [{"transform": "identity", "source_column": expected_column}]
@pytest.mark.parametrize(
"transform_type",
["year", "month", "day", "hour"],
)
def test_iceberg_adapter_temporal_partition(transform_type) -> None:
@dlt.resource
def events():
yield [{"timestamp": "2024-03-15T10:30:45Z"}]
factory = getattr(iceberg_partition, transform_type)
resource = iceberg_adapter(events, partition=[factory("timestamp")])
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [{"transform": transform_type, "source_column": "timestamp"}]
@pytest.mark.parametrize(
"transform_type,param_value,column_name",
[
("bucket", 16, "user_id"),
("truncate", 4, "name"),
],
ids=["bucket", "truncate"],
)
def test_iceberg_adapter_parameterized_partition(transform_type, param_value, column_name) -> None:
@dlt.resource
def data():
yield [{"user_id": "123", "name": "example_string"}]
factory = getattr(iceberg_partition, transform_type)
resource = iceberg_adapter(data, partition=[factory(param_value, column_name)])
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [
{"transform": transform_type, "source_column": column_name, "param_value": param_value}
]
def test_iceberg_adapter_multiple_partitions() -> None:
@dlt.resource
def sales_data():
yield [
{
"id": 1,
"timestamp": "2024-01-15T10:30:00Z",
"region": "US",
"amount": 1250.00,
}
]
resource = iceberg_adapter(
sales_data,
partition=[iceberg_partition.month("timestamp"), "region"],
)
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [
{"transform": "month", "source_column": "timestamp"},
{"transform": "identity", "source_column": "region"},
]
def test_iceberg_adapter_custom_partition_field_name() -> None:
@dlt.resource
def activity():
yield [{"user_id": 123, "activity_time": "2024-01-01T00:00:00Z"}]
resource = iceberg_adapter(
activity,
partition=[
iceberg_partition.year("activity_time", "activity_year"),
iceberg_partition.bucket(8, "user_id", "user_bucket"),
],
)
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [
{"transform": "year", "source_column": "activity_time", "partition_field": "activity_year"},
{
"transform": "bucket",
"source_column": "user_id",
"param_value": 8,
"partition_field": "user_bucket",
},
]
def test_iceberg_adapter_no_partition_raises() -> None:
@dlt.resource
def my_data():
yield [{"id": 1}]
with pytest.raises(ValueError, match="A value for `partition` must be specified"):
iceberg_adapter(my_data, partition=None)
def test_iceberg_adapter_with_raw_data() -> None:
"""Test iceberg_adapter with raw data (not a DltResource)."""
data = [{"id": 1, "category": "A"}, {"id": 2, "category": "B"}]
resource = iceberg_adapter(data, partition=["category"])
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [{"transform": "identity", "source_column": "category"}]
def test_iceberg_adapter_complex_partitioning() -> None:
@dlt.resource
def orders():
yield [
{
"order_id": "ord_001",
"status": "completed",
"order_date": "2024-01-15",
"customer_id": "cust_123",
}
]
resource = iceberg_adapter(
orders,
partition=[
iceberg_partition.year("order_date", "yearly_partition"),
"status",
iceberg_partition.bucket(32, "customer_id", "customer_bucket"),
],
)
table_schema = resource.compute_table_schema()
partition_hints = table_schema.get(PARTITION_HINT, [])
assert partition_hints == [
{"transform": "year", "source_column": "order_date", "partition_field": "yearly_partition"},
{"transform": "identity", "source_column": "status"},
{
"transform": "bucket",
"source_column": "customer_id",
"param_value": 32,
"partition_field": "customer_bucket",
},
]
@pytest.mark.parametrize(
"table_schema,expected_specs",
[
(
{"name": "test_table", "columns": {"id": {"name": "id", "data_type": "bigint"}}},
[],
),
(
{
"name": "test_table",
PARTITION_HINT: [
{"transform": "year", "source_column": "created_at"},
{"transform": "identity", "source_column": "region"},
],
},
[("year", "created_at"), ("identity", "region")],
),
],
ids=["empty", "with_specs"],
)
def test_parse_partition_hints(table_schema, expected_specs) -> None:
specs = parse_partition_hints(table_schema)
assert len(specs) == len(expected_specs)
for spec, (expected_transform, expected_column) in zip(specs, expected_specs):
assert spec.transform == expected_transform
assert spec.source_column == expected_column
def test_create_identity_specs() -> None:
column_names = ["region", "category", "status"]
specs = create_identity_specs(column_names)
assert len(specs) == 3
for i, col_name in enumerate(column_names):
assert specs[i].source_column == col_name
assert specs[i].transform == "identity"
def test_build_iceberg_partition_spec() -> None:
from pyiceberg.types import LongType, TimestampType, StringType
from pyiceberg.transforms import YearTransform, IdentityTransform
arrow_schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("created_at", pa.timestamp("us")),
pa.field("region", pa.string()),
]
)
partition_specs = [
iceberg_partition.year("created_at"),
iceberg_partition.identity("region"),
]
iceberg_partition_spec, iceberg_schema = build_iceberg_partition_spec(
arrow_schema, partition_specs
)
assert iceberg_partition_spec is not None
assert len(iceberg_partition_spec.fields) == 2
year_field = iceberg_partition_spec.fields[0]
assert year_field.name == "created_at_year"
assert isinstance(year_field.transform, YearTransform)
assert year_field.source_id == iceberg_schema.find_field("created_at").field_id
identity_field = iceberg_partition_spec.fields[1]
assert identity_field.name == "region"
assert isinstance(identity_field.transform, IdentityTransform)
assert identity_field.source_id == iceberg_schema.find_field("region").field_id
id_field = iceberg_schema.find_field("id")
assert id_field.name == "id"
assert isinstance(id_field.field_type, LongType)
created_at_field = iceberg_schema.find_field("created_at")
assert created_at_field.name == "created_at"
assert isinstance(created_at_field.field_type, TimestampType)
region_field = iceberg_schema.find_field("region")
assert region_field.name == "region"
assert isinstance(region_field.field_type, StringType)
@pytest.mark.parametrize(
"source_column,transform,partition_field,param_value",
[
("created_at", "year", "created_year", None),
("created_at", "month", "month_partition", None),
("user_id", "bucket", "user_bucket", 32),
],
ids=["year", "month", "bucket"],
)
def test_partition_spec_roundtrip(source_column, transform, partition_field, param_value) -> None:
spec = PartitionSpec(
source_column=source_column,
transform=transform,
partition_field=partition_field,
param_value=param_value,
)
assert spec.source_column == source_column
assert spec.transform == transform
assert spec.partition_field == partition_field
assert spec.param_value == param_value
spec_dict = spec.to_dict()
assert spec_dict["source_column"] == source_column
assert spec_dict["transform"] == transform
assert spec_dict["partition_field"] == partition_field
restored_spec = PartitionSpec.from_dict(spec_dict)
assert restored_spec.source_column == source_column
assert restored_spec.transform == transform
assert restored_spec.partition_field == partition_field
assert restored_spec.param_value == param_value
def test_partition_spec_get_transform() -> None:
from pyiceberg.transforms import IdentityTransform, Transform
spec = PartitionSpec(source_column="region", transform="identity")
transform: Transform = get_partition_transform(spec) # type: ignore[type-arg]
assert isinstance(transform, IdentityTransform)
def test_partition_spec_invalid_transform() -> None:
spec = PartitionSpec(source_column="col", transform="invalid_transform")
with pytest.raises(ValueError, match="Unknown partition transformation type"):
get_partition_transform(spec)
@pytest.mark.parametrize(
"transform,source_column,param_value,expected_name",
[
("identity", "region", None, "region"),
("year", "created_at", None, "created_at_year"),
("month", "event_time", None, "event_time_month"),
("day", "timestamp", None, "timestamp_day"),
("hour", "log_time", None, "log_time_hour"),
("bucket", "user_id", 16, "user_id_bucket_16"),
("truncate", "name", 4, "name_trunc_4"),
],
ids=["identity", "year", "month", "day", "hour", "bucket", "truncate"],
)
def test_default_field_name_generation(
transform: str, source_column: str, param_value: int, expected_name: str
) -> None:
spec = PartitionSpec(
source_column=source_column,
transform=transform,
param_value=param_value,
)
generated_name = _default_field_name(spec)
assert generated_name == expected_name
def test_default_field_name_with_custom_partition_field() -> None:
spec = PartitionSpec(
source_column="created_at",
transform="year",
partition_field="custom_year_field",
)
generated_name = _default_field_name(spec)
assert generated_name == "custom_year_field"
def test_duplicate_partition_column_in_legacy_and_hints() -> None:
from dlt.common.schema.exceptions import SchemaCorruptedException
from dlt.destinations.impl.filesystem.filesystem import IcebergLoadFilesystemJob
from unittest.mock import MagicMock
mock_load_table = {
"name": "test_table",
"columns": {
# Partition defined in columns
"region": {"name": "region", "data_type": "text", "partition": True},
"id": {"name": "id", "data_type": "bigint"},
},
PARTITION_HINT: [
{"transform": "identity", "source_column": "region"},
],
"write_disposition": "append",
}
mock_job = MagicMock(spec=IcebergLoadFilesystemJob)
mock_job._load_table = mock_load_table
mock_job._partition_columns = ["region"]
mock_job._schema = MagicMock()
mock_job._schema.name = "test_schema"
with pytest.raises(SchemaCorruptedException) as exc_info:
IcebergLoadFilesystemJob._get_partition_spec_list(mock_job)
assert "region" in str(exc_info.value)
assert "defined both as a partition column" in str(exc_info.value)


@@ -1051,3 +1051,294 @@ def test_parquet_to_delta_upgrade(destination_config: DestinationTestConfigurati
# optimize all delta tables to make sure storage is there
for table in delta_tables.values():
table.vacuum()
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
table_format_local_configs=True,
with_table_format="iceberg",
),
ids=lambda x: x.name,
)
def test_iceberg_adapter_partitioning(
destination_config: DestinationTestConfiguration,
) -> None:
from dlt.common.libs.pyiceberg import get_iceberg_tables
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
from pyiceberg.transforms import (
YearTransform,
BucketTransform,
TruncateTransform,
IdentityTransform,
)
pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)
# Test identity partitioning
@dlt.resource(table_format="iceberg")
def identity_partitioned():
yield [
{"id": 1, "category": "electronics"},
{"id": 2, "category": "clothing"},
]
resource = iceberg_adapter(
identity_partitioned, partition=iceberg_partition.identity("category")
)
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "identity_partitioned")["identity_partitioned"]
assert it.metadata.specs_struct().fields[0].name == "category"
assert isinstance(it.spec().fields[0].transform, IdentityTransform)
assert load_table_counts(pipeline, "identity_partitioned")["identity_partitioned"] == 2
# Test year partitioning
@dlt.resource(table_format="iceberg")
def year_partitioned():
yield [
{"id": 1, "timestamp": pendulum.datetime(2023, 6, 15, 10, 30)},
{"id": 2, "timestamp": pendulum.datetime(2024, 1, 20, 14, 45)},
]
resource = iceberg_adapter(year_partitioned, partition=iceberg_partition.year("timestamp"))
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "year_partitioned")["year_partitioned"]
partition_spec = it.metadata.specs_struct()
assert len(partition_spec.fields) == 1
assert partition_spec.fields[0].name == "timestamp_year"
assert isinstance(it.spec().fields[0].transform, YearTransform)
assert load_table_counts(pipeline, "year_partitioned")["year_partitioned"] == 2
# Test bucket partitioning
@dlt.resource(table_format="iceberg")
def bucket_partitioned():
yield [
{"user_id": 1001, "name": "Alice"},
{"user_id": 2002, "name": "Bob"},
{"user_id": 3003, "name": "Charlie"},
]
resource = iceberg_adapter(
bucket_partitioned, partition=iceberg_partition.bucket(16, "user_id")
)
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "bucket_partitioned")["bucket_partitioned"]
assert it.metadata.specs_struct().fields[0].name == "user_id_bucket_16"
assert isinstance(it.spec().fields[0].transform, BucketTransform)
assert load_table_counts(pipeline, "bucket_partitioned")["bucket_partitioned"] == 3
# Test truncate partitioning
@dlt.resource(table_format="iceberg")
def truncate_partitioned():
yield [
{"category": "electronics", "value": 100},
{"category": "clothing", "value": 200},
{"category": "electronics_accessories", "value": 50},
]
resource = iceberg_adapter(
truncate_partitioned, partition=iceberg_partition.truncate(4, "category")
)
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "truncate_partitioned")["truncate_partitioned"]
assert it.metadata.specs_struct().fields[0].name == "category_trunc_4"
assert isinstance(it.spec().fields[0].transform, TruncateTransform)
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
table_format_local_configs=True,
with_table_format="iceberg",
),
ids=lambda x: x.name,
)
def test_iceberg_adapter_data_verification(
destination_config: DestinationTestConfiguration,
) -> None:
from dlt.common.libs.pyiceberg import get_iceberg_tables
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
from pyiceberg.expressions import EqualTo
pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)
@dlt.resource(table_format="iceberg")
def sales_data():
yield [
{"id": 1, "region": "US", "amount": 100, "sale_date": pendulum.datetime(2024, 1, 15)},
{"id": 2, "region": "EU", "amount": 200, "sale_date": pendulum.datetime(2024, 2, 20)},
{"id": 3, "region": "US", "amount": 150, "sale_date": pendulum.datetime(2024, 3, 10)},
{"id": 4, "region": "APAC", "amount": 300, "sale_date": pendulum.datetime(2024, 1, 25)},
{"id": 5, "region": "EU", "amount": 250, "sale_date": pendulum.datetime(2024, 2, 28)},
]
resource = iceberg_adapter(
sales_data,
partition=[
iceberg_partition.month("sale_date", "sale_month"),
"region",
],
)
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "sales_data")["sales_data"]
partition_spec = it.metadata.specs_struct()
assert len(partition_spec.fields) == 2
assert partition_spec.fields[0].name == "sale_month"
assert partition_spec.fields[1].name == "region"
# Check if all data is present
all_data = it.scan().to_arrow()
assert all_data.num_rows == 5
# Check if partition pruning works by scanning with filter
us_data = it.scan(row_filter=EqualTo("region", "US")).to_arrow()
assert us_data.num_rows == 2
us_amounts = sorted([row["amount"] for row in us_data.to_pylist()])
assert us_amounts == [100, 150]
eu_data = it.scan(row_filter=EqualTo("region", "EU")).to_arrow()
assert eu_data.num_rows == 2
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
table_format_local_configs=True,
with_table_format="iceberg",
supports_merge=True,
),
ids=lambda x: x.name,
)
def test_iceberg_adapter_merge_write_disposition(
destination_config: DestinationTestConfiguration,
) -> None:
from dlt.common.libs.pyiceberg import get_iceberg_tables
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)
@dlt.resource(
table_format="iceberg",
write_disposition="merge",
primary_key="user_id",
)
def users():
yield [
{"user_id": 1, "name": "Alice", "region": "US", "score": 100},
{"user_id": 2, "name": "Bob", "region": "EU", "score": 200},
{"user_id": 3, "name": "Charlie", "region": "US", "score": 150},
]
resource = iceberg_adapter(
users,
partition=[iceberg_partition.bucket(8, "user_id"), "region"],
)
# Initial load
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "users")["users"]
assert it.scan().to_arrow().num_rows == 3
partition_spec = it.metadata.specs_struct()
assert len(partition_spec.fields) == 2
assert partition_spec.fields[0].name == "user_id_bucket_8"
assert partition_spec.fields[1].name == "region"
@dlt.resource(
table_format="iceberg",
write_disposition="merge",
primary_key="user_id",
)
def users_update():
yield [
{"user_id": 1, "name": "Alice Updated", "region": "US", "score": 110}, # Update
{"user_id": 4, "name": "Diana", "region": "APAC", "score": 250}, # New
]
resource_update = iceberg_adapter(
users_update.with_name("users"),
partition=[iceberg_partition.bucket(8, "user_id"), "region"],
)
info = pipeline.run(resource_update)
assert_load_info(info)
# Verify final state
it = get_iceberg_tables(pipeline, "users")["users"]
data = it.scan().to_arrow()
assert data.num_rows == 4
data_list = data.to_pylist()
alice = next(row for row in data_list if row["user_id"] == 1)
assert alice["name"] == "Alice Updated"
assert alice["score"] == 110
diana = next(row for row in data_list if row["user_id"] == 4)
assert diana["name"] == "Diana"
assert diana["region"] == "APAC"
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
table_format_local_configs=True,
with_table_format="iceberg",
),
ids=lambda x: x.name,
)
def test_iceberg_adapter_and_partition_column_coexistence(
destination_config: DestinationTestConfiguration,
) -> None:
"""Tests that legacy partition columns (columns={"col": {"partition": True}})
can coexist with advanced partition hints from iceberg_adapter.
"""
from dlt.common.libs.pyiceberg import get_iceberg_tables
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
from pyiceberg.transforms import IdentityTransform, YearTransform
pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)
@dlt.resource(
table_format="iceberg",
columns={"region": {"partition": True}}, # Old way of partitioning
)
def events():
yield [
{"id": 1, "region": "US", "timestamp": pendulum.datetime(2024, 3, 15)},
{"id": 2, "region": "EU", "timestamp": pendulum.datetime(2024, 6, 20)},
{"id": 3, "region": "US", "timestamp": pendulum.datetime(2023, 12, 1)},
]
# Add year partition via adapter
resource = iceberg_adapter(
events,
partition=iceberg_partition.year("timestamp"),
)
info = pipeline.run(resource)
assert_load_info(info)
it = get_iceberg_tables(pipeline, "events")["events"]
partition_spec = it.metadata.specs_struct()
assert len(partition_spec.fields) == 2
assert partition_spec.fields[0].name == "region"
assert isinstance(it.spec().fields[0].transform, IdentityTransform)
assert partition_spec.fields[1].name == "timestamp_year"
assert isinstance(it.spec().fields[1].transform, YearTransform)
assert load_table_counts(pipeline, "events")["events"] == 3

uv.lock (generated)

@@ -2189,6 +2189,7 @@ postgres = [
pyiceberg = [
{ name = "pyarrow" },
{ name = "pyiceberg" },
{ name = "pyiceberg-core" },
{ name = "sqlalchemy" },
]
qdrant = [
@@ -2409,6 +2410,7 @@ requires-dist = [
{ name = "pyathena", marker = "extra == 'athena'", specifier = ">=2.9.6" },
{ name = "pydbml", marker = "extra == 'dbml'" },
{ name = "pyiceberg", marker = "extra == 'pyiceberg'", specifier = ">=0.9.1" },
{ name = "pyiceberg-core", marker = "extra == 'pyiceberg'", specifier = ">=0.6.0" },
{ name = "pyodbc", marker = "extra == 'mssql'", specifier = ">=4.0.39" },
{ name = "pyodbc", marker = "extra == 'synapse'", specifier = ">=4.0.39" },
{ name = "pytz", specifier = ">=2022.6" },
@@ -7253,6 +7255,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1c/56/f0c7014cd16bd9990bdfa9b901fe6ecc344d6b9775676dc35c48b3a6aca1/pyiceberg-0.9.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:023c3fcee36a441b7e20418b6e9cdc6f904141bfda09f8580dfe022d7faa7a53", size = 626539, upload-time = "2025-04-30T14:59:33.17Z" },
]
[[package]]
name = "pyiceberg-core"
version = "0.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c8/85/d3ec2e229d4e1bad3b9c4092889cae102eaf0d4ed62ce0e2de2b6e32cc4d/pyiceberg_core-0.7.0.tar.gz", hash = "sha256:8166883ace30a388d2f659634bec87731cad7bc52341c997fcdd4e13780e4345", size = 504657, upload-time = "2025-10-10T16:30:16.202Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/70/35/2942dcaf19ca1becc7f9a9001d0cf98168634732a33efbb06a6f382c36b1/pyiceberg_core-0.7.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:99e23463c30c4180329719fe1f120e779b20616a36bbdd42042b70063a13bd39", size = 56895735, upload-time = "2025-10-10T16:30:00.949Z" },
{ url = "https://files.pythonhosted.org/packages/93/54/dbf169474b8c336316657041fd6e2791b534f8ab3cffe50509533993de03/pyiceberg_core-0.7.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:94fe0281f09c84cdd4e56d217865ef1d81e0cb0b708ba6eb2f4c5ae8bf86f0ba", size = 30991747, upload-time = "2025-10-10T16:30:04.515Z" },
{ url = "https://files.pythonhosted.org/packages/ed/34/545cf261c343a8d04f75e25aa259f0cf5020b913ede0cb3bdf17c2c7690a/pyiceberg_core-0.7.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:74543885cc97e8d976707f97dd01e8f03dce8a5d3a01e41ada0aaefa13c742f9", size = 31572481, upload-time = "2025-10-10T16:30:07.319Z" },
{ url = "https://files.pythonhosted.org/packages/63/6b/c8ae2eb1fd60798b819720915e50ea29befafb8816141182df7fa5e788d6/pyiceberg_core-0.7.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2303276b0d8b57b8ce4a3ef9e5050ffdefd9561684728cd0a9a72a9ad0a6a74d", size = 30588256, upload-time = "2025-10-10T16:30:11.047Z" },
{ url = "https://files.pythonhosted.org/packages/4a/8f/e63e03afb2e655d6f984a4e57ea53aa860a545298eeb5bc887a5d5db9c3b/pyiceberg_core-0.7.0-cp39-abi3-win_amd64.whl", hash = "sha256:64dd3c3c7af6d39097e1d070161e98118e5dd423392992a9074e1ab813fd467b", size = 26606962, upload-time = "2025-10-10T16:30:13.96Z" },
]
[[package]]
name = "pyjwt"
version = "2.10.1"