mirror of https://github.com/dlt-hub/dlt.git (synced 2025-12-17 19:31:30 +00:00)
pyarrow: respect resource hints before extract (#3436)
* merge resource hints before extract for all backends
* check load package directly
* better type check
* log if unsupported hints
* better log message
* do not use ensure_table_schema_columns
* test for desired behavior
* refactor
* clarified test assertions
* lint
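In user terms, static column hints applied to a `sql_table` resource now take effect before extraction, so the pyarrow backend already produces data in the overridden types. Below is a minimal sketch derived from the test added in this commit; the credentials, pipeline name, and column name are illustrative placeholders, and the import path assumes the built-in `dlt.sources.sql_database` source:

```python
import dlt
from dlt.sources.sql_database import sql_table

# Reflect a single table with the pyarrow backend (names/credentials are illustrative).
table = sql_table(
    credentials="postgresql://loader:loader@localhost:5432/dlt_data",
    table="has_precision",
    backend="pyarrow",
    reflection_level="full_with_precision",
)

# Static column hints (a plain dict) are merged with the reflected schema before
# extraction, so the arrow data is emitted with the overridden type.
table.apply_hints(
    write_disposition="replace",
    columns={"numeric_col": {"data_type": "double"}},
)

pipeline = dlt.pipeline(pipeline_name="sql_hints_demo", destination="duckdb")
pipeline.run(table)
```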
@@ -24,6 +24,7 @@ from dlt.common.configuration.specs import (
from dlt.common.exceptions import DltException, MissingDependencyException
from dlt.common.schema import TTableSchemaColumns
from dlt.common.schema.typing import TWriteDispositionDict
from dlt.common.schema.utils import merge_columns
from dlt.common.typing import TColumnNames, TDataItem, TSortOrder
from dlt.common.jsonpath import extract_simple_field_name
from dlt.common.utils import is_typeerror_due_to_wrong_call
@@ -304,6 +305,21 @@ def table_rows(
    query_adapter_callback: Optional[TQueryAdapter],
    resolve_foreign_keys: bool,
) -> Iterator[TDataItem]:
    resource = None
    limit = None
    try:
        resource = dlt.current.resource()
        limit = resource.limit
        resource_columns_hints_require_data = callable(resource.columns)
        if resource_columns_hints_require_data and backend == "pyarrow":
            table_name = table.name if isinstance(table, Table) else table
            logger.warning(
                f"Dynamic column hints for '{table_name}' cannot be applied with pyarrow "
                "backend. Use static hints (dict/list) to override reflected types."
            )
    except DltException:
        # in old versions of dlt, resource is not available, so we need to reflect the table again
        pass
    if isinstance(table, str):  # Reflection is deferred
        table = Table(
            table,
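The warning above only concerns dynamic hints: a callable `columns` hint needs a data item to resolve, so it cannot be merged into the arrow schema before extraction. A short illustration of the two cases, assuming a resource created with `sql_table(..., backend="pyarrow")` as in the test further below; the table and column names are illustrative:

```python
from dlt.sources.sql_database import sql_table

# Assumes the same Postgres test table as in the commit's test; names are illustrative.
table = sql_table(table="has_precision", backend="pyarrow")

# Static hints (plain dict): merged with the reflected schema before extract.
table.apply_hints(columns={"numeric_col": {"data_type": "double"}})

# Dynamic hints (callable): need a data item to resolve, so with the pyarrow backend
# they cannot be applied before extraction and only produce the warning above.
table.apply_hints(columns=lambda item: {"numeric_col": {"data_type": "double"}})
```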
@@ -324,39 +340,49 @@ def table_rows(
        )

        # set the primary_key in the incremental
        # TODO: check for primary key in resource._hints
        if incremental and incremental.primary_key is None:
            primary_key = hints["primary_key"]
            if primary_key is not None:
                incremental.primary_key = primary_key

        # yield empty record to set hints
        # Merge resource hints with reflection hints before yielding
        if resource and hints.get("columns") and not callable(resource.columns):
            hints["columns"] = merge_columns(hints["columns"], resource.columns)

        # yield empty record to set hints and create schema
        # Note: Empty list [] will be written as typed-jsonl (object format), but actual
        # data rows will be written in their native format (e.g., parquet for arrow backend).
        yield dlt.mark.with_hints(
            [],
            dlt.mark.make_hints(
                **hints,
            ),
        )
        # Set columns_hints for TableLoader
        columns_hints = hints["columns"]
    else:
        # table was already reflected
        hints = table_to_resource_hints(
            table,
            reflection_level,
            type_adapter_callback,
            backend == "sqlalchemy",  # skip nested types
            resolve_foreign_keys=resolve_foreign_keys,
        )
        # table was already reflected -> try to use resource hints
        if not resource or callable(resource.columns):
            hints = table_to_resource_hints(
                table,
                reflection_level,
                type_adapter_callback,
                backend == "sqlalchemy",  # skip nested types
                resolve_foreign_keys=resolve_foreign_keys,
            )
            columns_hints = hints["columns"]

    limit = None
    try:
        limit = dlt.current.resource().limit
    except DltException:
        pass
        else:
            # take column hints from resource (which includes user applied hints)
            # Handle callable columns hint (can't resolve without data item)
            columns_hints = resource.columns

    loader = TableLoader(
        engine,
        backend,
        table,
        hints["columns"],
        columns_hints,
        incremental=incremental,
        chunk_size=chunk_size,
        limit=limit,

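The behavioral core of the hunk above is the `merge_columns` call: reflected column hints are kept, and static resource hints override them per column (the test below expects the user-supplied `double` to win over the reflected `decimal`). Here is a simplified, hypothetical sketch of that per-column merge in plain Python; it is not dlt's actual `merge_columns` implementation:

```python
# Hypothetical illustration of merging reflected column hints with user hints.
reflected = {
    "numeric_col": {"name": "numeric_col", "data_type": "decimal", "precision": 10, "scale": 2},
    "name_col": {"name": "name_col", "data_type": "text"},
}
user_hints = {"numeric_col": {"name": "numeric_col", "data_type": "double"}}

# Per column: start from the reflected entry, let the user hint override its keys.
merged = {
    col: {**reflected.get(col, {}), **user_hints.get(col, {})}
    for col in {**reflected, **user_hints}
}

assert merged["numeric_col"]["data_type"] == "double"  # user hint wins
assert merged["name_col"]["data_type"] == "text"       # untouched columns keep reflected types
```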
@@ -6,6 +6,9 @@ from importlib.metadata import version
import pytest
from pytest_mock import MockerFixture

import pyarrow as pa
import pyarrow.parquet as pq

import dlt
from dlt.common import logger
from dlt.common import json
@@ -84,6 +87,68 @@ def convert_time_to_us(table):
    return new_table


@pytest.mark.parametrize(
    "defer_table_reflect",
    [False, True],
    ids=lambda x: "defer_table_reflect" + ("=true" if x else "=false"),
)
def test_pyarrow_applies_hints_before_extract(
    postgres_db: PostgresSourceDB,
    defer_table_reflect: bool,
) -> None:
    """Test that user-provided hints (via apply_hints) are merged with reflection hints
    for all backends (unless hints are dynamic)
    """

    table = sql_table(
        credentials=postgres_db.credentials,
        schema=postgres_db.schema,
        table="has_precision",
        backend="pyarrow",
        reflection_level="full_with_precision",
        defer_table_reflect=defer_table_reflect,
    )

    # Apply hints to override numeric_col to double (even though DB has decimal type)
    table.apply_hints(
        write_disposition="replace",
        file_format="parquet",
        columns={
            "numeric_col": {
                "data_type": "double",
            },
        },
    )

    pipeline = make_pipeline("duckdb")
    # Use count_rows so the empty Arrow Table (0 rows) from deferred reflection doesn't count toward the limit
    load_info = pipeline.run(table.add_limit(1, count_rows=True))

    # verify
    # 1: pipeline schema
    numeric_col_schema_in_pipeline = pipeline.default_schema.get_table("has_precision")["columns"][
        "numeric_col"
    ]
    assert numeric_col_schema_in_pipeline["data_type"] == "double"

    # 2: loader file format (file ends with .parquet)
    load_package = pipeline.get_load_package_info(load_info.loads_ids[0])
    completed_jobs = load_package.jobs["completed_jobs"]
    has_precision_jobs = [
        job
        for job in completed_jobs
        if job.job_file_info.table_name == "has_precision" and job.file_path.endswith(".parquet")
    ]
    assert len(has_precision_jobs) == 1

    # 3: column schema in parquet file should also be double (float64)
    parquet_path = has_precision_jobs[0].file_path
    parquet_schema = pq.read_schema(parquet_path)
    numeric_col_schema_in_parquet = parquet_schema.field("numeric_col").type

    assert pa.types.is_float64(numeric_col_schema_in_parquet)


def test_sqlalchemy_no_quoted_name(postgres_db: PostgresSourceDB, mocker: MockerFixture) -> None:
    """
    Ensures that table names internally passed as `quoted_name` to `sql_table` are not persisted