ignores native config values if config spec does not implement them (#3233)

* does not fail config resolution if a native value is provided to a config that does not implement native values

* updates databricks docs

* allows replacing hint regexes on schema

* removes partition hint on eth merge test on databricks

* adds pokemon table count consts

* reorgs databricks dlt fix

* fixes lancedb custom destination example

* reduces the number of sql_database examples run on CI

* fixes merge

* marks and skips rfam tests
rudolfix
2025-10-22 22:48:13 +02:00
committed by GitHub
parent 7848c91f41
commit 0dcdcf0e33
16 changed files with 103 additions and 67 deletions

View File

@@ -63,11 +63,11 @@ jobs:
- os: windows-latest
python-version: "3.11"
shell: cmd
pytest_args: '-m "not forked"'
pytest_args: '-m "not forked and not rfam"'
- os: windows-latest
python-version: "3.13"
shell: cmd
pytest_args: '-m "not forked"'
pytest_args: '-m "not forked and not rfam"'
defaults:
run:

View File

@@ -9,7 +9,6 @@ from dlt.sources.credentials import ConnectionStringCredentials
from dlt.sources.sql_database import sql_database, sql_table, Table
from sqlalchemy.sql.sqltypes import TypeEngine
import sqlalchemy as sa
@@ -105,46 +104,13 @@ def load_standalone_table_resource() -> None:
defer_table_reflect=True,
)
# Run the resources together
info = pipeline.extract([family, genome], write_disposition="merge")
# Run the resources together (just take one page of results to make it faster)
info = pipeline.extract([family.add_limit(1), genome.add_limit(1)], write_disposition="merge")
print(info)
# Show inferred columns
print(pipeline.default_schema.to_pretty_yaml())
def select_columns() -> None:
"""Uses table adapter callback to modify list of columns to be selected"""
pipeline = dlt.pipeline(
pipeline_name="rfam_database",
destination="duckdb",
dataset_name="rfam_data_cols",
dev_mode=True,
)
def table_adapter(table: Table) -> Table:
print(table.name)
if table.name == "family":
# this is SqlAlchemy table. _columns are writable
# let's drop updated column
table._columns.remove(table.columns["updated"]) # type: ignore
return table
family = sql_table(
credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
table="family",
chunk_size=10,
reflection_level="full_with_precision",
table_adapter_callback=table_adapter,
)
# also we do not want the whole table, so we add limit to get just one chunk (10 records)
pipeline.run(family.add_limit(1))
# only 10 rows
print(pipeline.last_trace.last_normalize_info)
# no "updated" column in "family" table
print(pipeline.default_schema.to_pretty_yaml())
def select_with_end_value_and_row_order() -> None:
"""Gets data from a table withing a specified range and sorts rows descending"""
pipeline = dlt.pipeline(
@@ -347,9 +313,6 @@ if __name__ == "__main__":
# Load selected tables with different settings
# load_select_tables_from_database()
# load a table and select columns
# select_columns()
# load_entire_database()
# select_with_end_value_and_row_order()

View File

@@ -158,8 +158,10 @@ def _maybe_parse_native_value(
.as_dict_nondefault()
.items()
}
except (ValueError, NotImplementedError) as v_err:
except ValueError as v_err:
raise InvalidNativeValue(type(config), type(native_value), embedded_sections, v_err)
except NotImplementedError:
pass
return native_value # type: ignore[no-any-return]
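As a hedged illustration of the new behavior (the `NoNativeConfiguration` spec and the `dlt_test` section below are made up for this sketch, not part of the commit): a spec that does not override `parse_native_representation`, so the `BaseConfiguration` default raises `NotImplementedError`, no longer fails with `InvalidNativeValue` when a provider returns a value under its section key; resolution simply falls back to the individual fields.

```py
import os
from typing import ClassVar

from dlt.common.configuration import configspec
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs import BaseConfiguration


@configspec
class NoNativeConfiguration(BaseConfiguration):
    """Hypothetical spec that does not implement native values."""

    __section__: ClassVar[str] = "dlt_test"
    password: str = None

    # parse_native_representation is deliberately not overridden, so the
    # BaseConfiguration default raises NotImplementedError


# the value under the section key used to raise InvalidNativeValue;
# now it is ignored and the fields resolve individually
os.environ["DLT_TEST"] = "not really parseable()"
os.environ["DLT_TEST__PASSWORD"] = "PASS"

config = resolve_configuration(NoNativeConfiguration())
assert config.password == "PASS"
```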

View File

@@ -385,7 +385,7 @@ def validate_and_filter_items(
deleted.add(err_idx)
else:
raise NotImplementedError(
f"`{column_mode=:}` not implemented for Pydantic validation"
f"`{data_mode=:}` not implemented for Pydantic validation"
)
# validate again with error items removed

View File

@@ -418,13 +418,14 @@ class Schema:
def merge_hints(
self,
new_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]],
replace: bool = False,
normalize_identifiers: bool = True,
) -> None:
"""Merges existing default hints with `new_hints`. Normalizes names in column regexes if possible. Compiles setting at the end
"""Merges or replace existing default hints with `new_hints`. Normalizes names in column regexes if possible. Compiles setting at the end
NOTE: you can manipulate default hints collection directly via `Schema.settings` as long as you call Schema._compile_settings() at the end.
"""
self._merge_hints(new_hints, normalize_identifiers)
self._merge_hints(new_hints, replace=replace, normalize_identifiers=normalize_identifiers)
self._compile_settings()
def update_preferred_types(
@@ -813,6 +814,7 @@ class Schema:
def _merge_hints(
self,
new_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]],
replace: bool = False,
normalize_identifiers: bool = True,
) -> None:
"""Used by `merge_hints method, does not compile settings at the end"""
@@ -829,7 +831,7 @@ class Schema:
default_hints = self._settings.setdefault("default_hints", {})
# add `new_hints` to existing hints
for h, l in new_hints.items():
if h in default_hints:
if h in default_hints and not replace:
extend_list_deduplicated(default_hints[h], l, utils.canonical_simple_regex)
else:
# set new hint type
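As a usage sketch of the new `replace` flag (the schema name and regex are illustrative): with the default `replace=False`, regexes for a hint that already exists are merged and deduplicated as before; with `replace=True` the regex list is overwritten, so an empty list effectively removes the default hint, which is how the Databricks merge test later in this commit drops the `partition` hint.

```py
import dlt
from dlt.common.schema.typing import TSimpleRegex

schema = dlt.Schema("ethereum")

# default behavior: merge the regex into any existing "partition" hint
schema.merge_hints({"partition": [TSimpleRegex("re:^block_number$")]})

# new behavior: overwrite the regex list for the hint;
# an empty list drops the default "partition" hint entirely
schema.merge_hints({"partition": []}, replace=True)
```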

View File

@@ -16,6 +16,7 @@ You can get a Spotify client ID and secret from https://developer.spotify.com/.
We'll learn how to:
- Use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- Delegate the embeddings to LanceDB using OpenAI Embeddings
- Use Pydantic for unified dlt and lancedb schema validation
"""
__source_name__ = "spotify"
@@ -59,10 +60,11 @@ os.environ["OPENAI_API_KEY"] = openai_api_key
class EpisodeSchema(LanceModel):
"""Used for dlt and lance schema validation"""
id: str # noqa: A003
name: str
description: str = func.SourceField()
vector: Vector(func.ndims()) = func.VectorField() # type: ignore[valid-type]
release_date: datetime.date
audio_preview_url: str
duration_ms: int
@@ -71,6 +73,12 @@ class EpisodeSchema(LanceModel):
# there is more data but we are not using it ...
class EpisodeSchemaVector(EpisodeSchema):
"""Adds lance vector field"""
vector: Vector(func.ndims()) = func.VectorField() # type: ignore[valid-type]
@dataclass(frozen=True)
class Shows:
monday_morning_data_chat: str = "3Km3lBNzJpc1nOTJUtbtMh"
@@ -120,11 +128,20 @@ def spotify_shows(
yield dlt.resource(
client.paginate(url, params={"limit": 50}),
name=show_name,
write_disposition="merge",
primary_key="id",
parallelized=True,
max_table_nesting=0,
)
# reuse lance model to filter out all non-matching items and extra columns from spotify api
# 1. unknown columns are removed ("columns": "discard_value")
# 2. non-validating items (i.e. without id or url) are removed ("data_type": "discard_row")
# 3. for some reason None values are returned as well 🤯, add_filter takes care of that
columns=EpisodeSchema,
schema_contract={
"tables": "evolve",
"columns": "discard_value",
"data_type": "discard_row",
},
).add_filter(lambda i: i is not None)
@dlt.destination(batch_size=250, name="lancedb")
@@ -135,13 +152,7 @@ def lancedb_destination(items: TDataItems, table: TTableSchema) -> None:
try:
tbl = db.open_table(table["name"])
except ValueError:
tbl = db.create_table(table["name"], schema=EpisodeSchema)
# remove all fields that are not in the schema
for item in items:
keys_to_remove = [key for key in item.keys() if key not in EpisodeSchema.model_fields]
for key in keys_to_remove:
del item[key]
tbl = db.create_table(table["name"], schema=EpisodeSchemaVector)
tbl.add(items)

View File

@@ -11,9 +11,11 @@ EXAMPLES_DIR = "../examples"
# settings
SKIP_FOLDERS = ["archive", ".", "_", "local_cache"]
SKIP_EXAMPLES: List[str] = []
# @pytest.mark.rfam
SKIP_EXAMPLES: List[str] = ["backfill_in_chunks", "connector_x_arrow"]
SKIP_FORK_EXAMPLES: List[str] = ["custom_destination_lancedb"]
# the entry point for the script
MAIN_CLAUSE = 'if __name__ == "__main__":'

View File

@@ -730,7 +730,36 @@ databricks_adapter(
## Troubleshooting
Use the following steps to avoid conflicts with Databricks' built-in Delta Live Tables (DLT) module and enable dltHub integration.
### 1. Add an `init` script
### Enable dlt on serverless (16.x)
Delta Live Tables (DLT) are not available on serverless, but the import machinery that patches DLT is still there in the form of import hooks. You
can temporarily disable this machinery to import `dlt` and use it afterwards. In a notebook cell (assuming that `dlt` is already installed):
```sh
%restart_python
```
```py
import sys
# dlt patching hook is the first one on the list
metas = list(sys.meta_path)
sys.meta_path = metas[1:]
# remove RUNTIME - uncomment on dlt before 1.18.0
# import os
# del os.environ["RUNTIME"]
import dlt
sys.meta_path = metas # restore post import hooks
# use dlt
info = dlt.run([1, 2, 3], destination=dlt.destinations.filesystem("_data"), table_name="digits")
print(info)
```
### Enable dlt on a cluster
#### 1. Add an `init` script
To ensure compatibility with dltHub's dlt package in Databricks, add an `init` script that runs at cluster startup. This script installs the dlt package from dltHub, renames Databricks' built-in DLT module to avoid naming conflicts, and updates internal references to allow continued use under the alias `dlt_dbricks`.
1. In your Databricks workspace directory, create a new file named `init.sh` and add the following content:
@@ -767,7 +796,7 @@ The following locations have been confirmed for the two latest LTS runtime versi
- 15.4 LTS: /databricks/python_shell/lib/dbruntime/DeltaLiveTablesHook.py
:::
### 2. Remove preloaded databricks modules in the notebook
#### 2. Remove preloaded databricks modules in the notebook
After the cluster starts, Databricks may partially import its built-in Delta Live Tables (DLT) modules, which can interfere with the dlt package from dltHub.
To ensure a clean environment, add the following code at the top of your notebook:

View File

@@ -1,4 +1,4 @@
import os
import pytest
from tests.pipeline.utils import assert_load_info
@@ -91,6 +91,7 @@ def api_snippet() -> None:
assert_load_info(load_info)
@pytest.mark.rfam
def db_snippet() -> None:
# @@@DLT_SNIPPET_START db
import dlt

View File

@@ -1,3 +1,4 @@
import pytest
from tests.pipeline.utils import assert_load_info
@@ -50,6 +51,7 @@ def csv_snippet() -> None:
assert_load_info(load_info)
@pytest.mark.rfam
def db_snippet() -> None:
# @@@DLT_SNIPPET_START db
import dlt

View File

@@ -401,7 +401,7 @@ multi_line_output = 3
[tool.pytest.ini_options]
pythonpath = ["dlt", "docs/website/docs"]
norecursedirs = [".direnv", ".eggs", "build", "dist"]
addopts = "--showlocals --durations 10"
addopts = "--showlocals --durations 10 -m 'not rfam'"
xfail_strict = true
log_cli_level = "INFO"
console_output_style = "count"
@@ -410,5 +410,6 @@ python_functions = ["*_test", "test_*", "*_snippet"]
filterwarnings = ["ignore::DeprecationWarning"]
markers = [
"essential: marks all essential tests",
"no_load: marks tests that do not load anything"
"no_load: marks tests that do not load anything",
"rfam: marks tests that use rfam db"
]

View File

@@ -54,6 +54,12 @@ class EmbeddedWithIgnoredEmbeddedConfiguration(BaseConfiguration):
ignored_embedded: EmbeddedIgnoredWithSectionedConfiguration = None
@configspec
class SectionedNativeValueConfiguration(SectionedConfiguration):
def parse_native_representation(self, native_value: Any) -> None:
raise ValueError(native_value)
def test_sectioned_configuration(environment: Any, env_provider: ConfigProvider) -> None:
with pytest.raises(ConfigFieldMissingException) as exc_val:
resolve.resolve_configuration(SectionedConfiguration())
@@ -83,6 +89,18 @@ def test_sectioned_configuration(environment: Any, env_provider: ConfigProvider)
assert C.password == "PASS"
def test_sectioned_configuration_ignore_native_values(environment) -> None:
environment["DLT_TEST"] = "invalid()"
environment["DLT_TEST__PASSWORD"] = "PASS"
resolve.resolve_configuration(SectionedConfiguration())
# same for runtime
from dlt.common.configuration.specs import RuntimeConfiguration
environment["RUNTIME"] = "16.0LTS"
resolve.resolve_configuration(RuntimeConfiguration())
def test_explicit_sections(mock_provider: MockProvider) -> None:
mock_provider.value = "value"
# mock providers separates sections with | and key with -
@@ -265,9 +283,9 @@ def test_section_with_pipeline_name(mock_provider: MockProvider) -> None:
# "PIPE", "DLT_TEST"
mock_provider.return_value_on = ()
mock_provider.reset_stats()
# () will return "value" which cannot be parsed by the SectionedConfiguration
# () will return "value" which cannot be parsed by the SectionedNativeValueConfiguration
with pytest.raises(InvalidNativeValue):
resolve.resolve_configuration(SectionedConfiguration())
resolve.resolve_configuration(SectionedNativeValueConfiguration())
mock_provider.return_value_on = ("DLT_TEST",)
mock_provider.reset_stats()
resolve.resolve_configuration(SectionedConfiguration())

View File

@@ -19,7 +19,7 @@ from dlt.common.schema.exceptions import (
from dlt.common.schema.typing import TLoaderMergeStrategy, TTableFormat
from dlt.common.typing import StrAny
from dlt.common.utils import digest128
from dlt.common.destination import AnyDestination, DestinationCapabilitiesContext
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import DestinationCapabilitiesException
from dlt.common.libs.pyarrow import row_tuples_to_arrow
@@ -71,6 +71,10 @@ def test_merge_on_keys_in_schema_nested_hints(
with open("tests/common/cases/schemas/eth/ethereum_schema_v11.yml", "r", encoding="utf-8") as f:
schema = dlt.Schema.from_dict(yaml.safe_load(f))
if destination_config.destination_type == "databricks":
# remove `partition` hint because it conflicts with `cluster` on databricks
schema.merge_hints({"partition": []}, replace=True)
# make block uncles unseen to trigger filtering loader in loader for nested tables
if has_table_seen_data(schema.tables["blocks__uncles"]):
del schema.tables["blocks__uncles"]["x-normalizer"]

View File

@@ -191,13 +191,13 @@ def test_load_sql_table_incremental(
assert_row_counts(pipeline, postgres_db, tables)
@pytest.mark.skip(reason="Skipping this test temporarily")
@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("backend", ["sqlalchemy", "pandas", "pyarrow", "connectorx"])
@pytest.mark.rfam
def test_load_mysql_data_load(
destination_config: DestinationTestConfiguration, backend: TableBackend
) -> None:

View File

@@ -9,13 +9,13 @@ import pytest
"load_select_tables_from_database",
# "load_entire_database",
"load_standalone_table_resource",
"select_columns",
"specify_columns_to_load",
"test_pandas_backend_verbatim_decimals",
"select_with_end_value_and_row_order",
"my_sql_via_pyarrow",
),
)
@pytest.mark.rfam
def test_all_examples(example_name: str) -> None:
from dlt._workspace._templates._core_source_templates import sql_database_pipeline

View File

@@ -14,6 +14,7 @@ import importlib
("fruitshop_pipeline", ("load_shop",)),
],
)
@pytest.mark.rfam
def test_debug_pipeline(template_name: str, examples: str) -> None:
demo_module = importlib.import_module(
f"dlt._workspace._templates._single_file_templates.{template_name}"