dlt/tests/conftest.py
rudolfix 6f015553eb feat/explains partition and split loading (#2737)
* extracts a method to count rows in items in data writers

* drains mssql cursor from recordsets, disables multi-statement execution due to driver problems

* allows enabling and disabling root key propagation via a source setting, uses the normalizer config prop, adds tests and docs

* allows using parent_key if nesting level < 2

* documents standard http session settings, sets shorter timeouts in telemetry

* adds a way to count rows in add_limit, fixes edge cases

* propagates error when generating sql jobs

* skips two-step table create in pyiceberg if there are no partitions

* adds docs and examples for backfilling

* excludes md from lfs

* fixes incorrect exit condition in python object incremental open start range

* simplifies and documents pipeline drop

* updates tables in schema in nesting order

* makes encoding NotRequired in FileItem

* makes the filesystem source follow row_order

* explains partition and split loading, sql_database tests and examples

* fixes add_limit max_items and legacy root key with tests

* fixes docs link

* fixes and tests schema.drop_tables

* improves docs, fixes links and tests

* tests, docs and regression fixes

* also counts empty pages in add_limit

* fixes scd2 tests

* fixes wrong root_key usage in scd2 sqlalchemy

* fixes tests

* makes Incremental return None on fully filtered batches

* review fixes

* fixes tx scope in backfill db test
2025-09-20 11:53:26 +02:00


import os
import dataclasses
import logging
from typing import Dict, List, Any

# patch which providers to enable
from dlt.common.configuration.providers import (
    ConfigProvider,
    EnvironProvider,
    SecretsTomlProvider,
    ConfigTomlProvider,
    GoogleSecretsProvider,
)
from dlt.common.configuration.specs.config_providers_context import (
    ConfigProvidersConfiguration,
)
from dlt.common.runtime.run_context import RunContext


def initial_providers(self) -> List[ConfigProvider]:
    # do not read the global config
    return [
        EnvironProvider(),
        SecretsTomlProvider(settings_dir="tests/.dlt"),
        ConfigTomlProvider(settings_dir="tests/.dlt"),
    ]


RunContext.initial_providers = initial_providers  # type: ignore[method-assign]
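# with the patched providers, tests resolve secrets and config from tests/.dlt and environment
# variables only, not from the user's global ~/.dlt settings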
# also disable extras
ConfigProvidersConfiguration.enable_airflow_secrets = False
ConfigProvidersConfiguration.enable_google_secrets = False

CACHED_GOOGLE_SECRETS: Dict[str, Any] = {}


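# caching provider: reuses CACHED_GOOGLE_SECRETS so each vault key is fetched at most once per test session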
class CachedGoogleSecretsProvider(GoogleSecretsProvider):
    def _look_vault(self, full_key, hint):
        if full_key not in CACHED_GOOGLE_SECRETS:
            CACHED_GOOGLE_SECRETS[full_key] = super()._look_vault(full_key, hint)
        return CACHED_GOOGLE_SECRETS[full_key]

    def _list_vault(self):
        key_ = "__list_vault"
        if key_ not in CACHED_GOOGLE_SECRETS:
            CACHED_GOOGLE_SECRETS[key_] = super()._list_vault()
        return CACHED_GOOGLE_SECRETS[key_]


from dlt.common.configuration.providers import google_secrets

google_secrets.GoogleSecretsProvider = CachedGoogleSecretsProvider  # type: ignore[misc]
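# from here on, code resolving google_secrets.GoogleSecretsProvider gets the caching subclass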


def pytest_configure(config):
    # patch the configurations to use test storage by default: we modify the type (class) fields and the
    # dataclass implementation will use those patched values when creating instances (the values present
    # in the declaration are not frozen, which allows patching). this is needed by common storage tests
    from dlt.common.configuration.specs import runtime_configuration
    from dlt.common.storages import configuration as storage_configuration

    test_storage_root = "_storage"
    runtime_configuration.RuntimeConfiguration.config_files_storage_path = os.path.join(
        test_storage_root, "config/"
    )
    # always use the CI track endpoint when running tests
    runtime_configuration.RuntimeConfiguration.dlthub_telemetry_endpoint = (
        "https://telemetry-tracker.services4758.workers.dev"
    )

    delattr(runtime_configuration.RuntimeConfiguration, "__init__")
    runtime_configuration.RuntimeConfiguration = dataclasses.dataclass(  # type: ignore[misc]
        runtime_configuration.RuntimeConfiguration, init=True, repr=False
    )  # type: ignore
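
    # the same trick (drop __init__, then re-apply dataclass so it is regenerated with the patched
    # class-level defaults) is repeated for each storage configuration below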
    storage_configuration.LoadStorageConfiguration.load_volume_path = os.path.join(
        test_storage_root, "load"
    )
    delattr(storage_configuration.LoadStorageConfiguration, "__init__")
    storage_configuration.LoadStorageConfiguration = dataclasses.dataclass(  # type: ignore[misc,call-overload]
        storage_configuration.LoadStorageConfiguration, init=True, repr=False
    )

    storage_configuration.NormalizeStorageConfiguration.normalize_volume_path = os.path.join(
        test_storage_root, "normalize"
    )
    # delete __init__, otherwise it will not be recreated by dataclass
    delattr(storage_configuration.NormalizeStorageConfiguration, "__init__")
    storage_configuration.NormalizeStorageConfiguration = dataclasses.dataclass(  # type: ignore[misc,call-overload]
        storage_configuration.NormalizeStorageConfiguration, init=True, repr=False
    )

    storage_configuration.SchemaStorageConfiguration.schema_volume_path = os.path.join(
        test_storage_root, "schemas"
    )
    delattr(storage_configuration.SchemaStorageConfiguration, "__init__")
    storage_configuration.SchemaStorageConfiguration = dataclasses.dataclass(  # type: ignore[misc,call-overload]
        storage_configuration.SchemaStorageConfiguration, init=True, repr=False
    )

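    # sanity check: both the patched class default and a freshly constructed instance point at the test storage path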
    assert runtime_configuration.RuntimeConfiguration.config_files_storage_path == os.path.join(
        test_storage_root, "config/"
    )
    assert runtime_configuration.RuntimeConfiguration().config_files_storage_path == os.path.join(
        test_storage_root, "config/"
    )

    # patch pipeline instance id up to millisecond
    from dlt.common import pendulum
    from dlt.pipeline.pipeline import Pipeline

    def _create_pipeline_instance_id(self) -> str:
        return pendulum.now().format("_YYYYMMDDhhmmssSSSS")

    Pipeline._create_pipeline_instance_id = _create_pipeline_instance_id  # type: ignore[method-assign]
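    # sub-second resolution keeps instance ids unique when tests create several pipelines within the same second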

    # disable sqlfluff logging
    for log in ["sqlfluff.parser", "sqlfluff.linter", "sqlfluff.templater", "sqlfluff.lexer"]:
        logging.getLogger(log).setLevel("ERROR")

    # disable snowflake logging
    for log in ["snowflake.connector.cursor", "snowflake.connector.connection"]:
        logging.getLogger(log).setLevel("ERROR")

    # disable azure logging
    for log in ["azure.core.pipeline.policies.http_logging_policy"]:
        logging.getLogger(log).setLevel("ERROR")

    # disable databricks logging
    for log in ["databricks.sql.client", "databricks.sql.auth"]:
        logging.getLogger(log).setLevel("WARNING")

    # disable httpx request logging (too verbose when testing qdrant)
    logging.getLogger("httpx").setLevel("WARNING")

    # disable googleapiclient logging
    logging.getLogger("googleapiclient.discovery_cache").setLevel("WARNING")

    # disable pyiceberg logging
    logging.getLogger("pyiceberg").setLevel("WARNING")

    # reset and init airflow db
    import warnings

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        try:
            from airflow.utils import db
            import contextlib
            import io

            for log in [
                "airflow.models.crypto",
                "airflow.models.variable",
                "airflow",
                "alembic",
                "alembic.runtime.migration",
            ]:
                logging.getLogger(log).setLevel("ERROR")

            with (
                contextlib.redirect_stdout(io.StringIO()),
                contextlib.redirect_stderr(io.StringIO()),
            ):
                db.resetdb()
        except Exception:
            pass
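
    # uncomment to dump the stack traces of all threads at interpreter exit (useful when a test run hangs)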
    # import faulthandler, atexit, sys
    # faulthandler.enable()  # makes sure the module is initialised
    # atexit.register(lambda: faulthandler.dump_traceback(file=sys.stderr, all_threads=True))