Feature: Introduce support for HTTP-based resources in the fs source (#3029)

* Feature, Add support for HTTP-based paths

* Feature, Add support for HTTP resources

* Feature, Enforce coercion to pendulum types. Add support of RFC 1123 format

* Feature, Add cloudfront base_url to the configurations

* Feature, Add a test for HTTP-based resources

* Feature, Add a test case for RFC 1123 datetime format

* Feature, Remove test cases related to datetime parsing in RFC and timestamp formats

* Revert "Feature, Enforce coercion to pendulum types. Add support of RFC 1123 format"

This reverts commit 142624b24a.

* Feature, Restore the structure of the URL for the CDN

* Feature, Replace the custom datetime parser function with a single-dispatched one

* Feature, Add a stub package for singledispatch

* Feature, Refactor pendulum datetime processing functions

* Feature, Fix the linting errors in time-related tests

* Feature, Fix the declaration

* Feature, Revert the changes related to datetime parsing

* Feature, Add HTTP scheme for testing. Add pendulum parser to support RFC 1123 format

* Feature, Update the configuration for the HTTP bucket

* Feature, Add an HTTP server. Update the test for the HTTP fs

* Feature, Upgrade fsspec

* Feature, Fix codestyle

* Feature, Fix the protocol validation for fsspec args

* Feature, Fix the typing annotations

* Add an example for the HTTP filesystem

* Feature, Add schema to the urlparse call

* Feature, Fix the codestyle for HTTP entries in MTIME_DISPATCH

* Feature, Expand the list of supported locations in the docs

* uses a more random port and closes httpd to release it properly; drops the auto fixture as it would be attached to all tests

* moves httpd tests to common tests

* adds an http extra to support fsspec (usage sketch below)
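
Taken together, these changes let the filesystem source read straight from HTTP(S) locations. A minimal usage sketch, assuming the new `http` extra is installed (e.g. `pip install "dlt[http]"`) and using a placeholder base URL:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

# placeholder public base URL; any HTTP(S) location with matching files works
files = filesystem(bucket_url="https://example.com/data", file_glob="*.csv")

pipeline = dlt.pipeline("http_fs_example", destination="duckdb")
load_info = pipeline.run((files | read_csv()).with_name("example_table"))
print(load_info)
```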

---------

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
Max Yakovenko, 2025-10-23 18:08:15 +03:00 (committed by GitHub)
parent f14eca1cfb
commit 98c81466ea
13 changed files with 140 additions and 19 deletions


@@ -131,7 +131,7 @@ jobs:
if: matrix.python-version != '3.14.0-beta.4'
- name: Install pipeline and sources dependencies
run: uv sync ${{ matrix.uv_sync_args }} --extra duckdb --extra cli --extra parquet --extra deltalake --extra sql_database --group sentry-sdk --group pipeline --group sources --group ibis
run: uv sync ${{ matrix.uv_sync_args }} --extra http --extra duckdb --extra cli --extra parquet --extra deltalake --extra sql_database --group sentry-sdk --group pipeline --group sources --group ibis
if: matrix.python-version != '3.14.0-beta.4'
- name: Run extract and pipeline tests


@@ -137,7 +137,17 @@ def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
return uri
MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url, "sftp": _make_sftp_url}
def _make_http_url(schema: str, fs_path: str, bucket_url: str) -> str:
parsed_http_url = urlparse(fs_path, schema)
return urlunparse(parsed_http_url)
MAKE_URI_DISPATCH = {
"az": _make_az_url,
"file": _make_file_url,
"sftp": _make_sftp_url,
"https": _make_http_url,
}
MAKE_URI_DISPATCH["adl"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfs"] = MAKE_URI_DISPATCH["az"]


@@ -64,6 +64,12 @@ MTIME_DISPATCH = {
"adl": lambda f: ensure_pendulum_datetime_utc(f["LastModified"]),
"az": lambda f: ensure_pendulum_datetime_utc(f["last_modified"]),
"gcs": lambda f: ensure_pendulum_datetime_utc(f["updated"]),
"https": lambda f: cast(
pendulum.DateTime, pendulum.parse(f["Last-Modified"], exact=True, strict=False)
),
"http": lambda f: cast(
pendulum.DateTime, pendulum.parse(f["Last-Modified"], exact=True, strict=False)
),
"file": lambda f: ensure_pendulum_datetime_utc(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime_utc(f["created"]),
"gdrive": lambda f: ensure_pendulum_datetime_utc(f["modifiedTime"]),
@@ -157,7 +163,7 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
fs_kwargs.update(DEFAULT_KWARGS.get(protocol, {}))
if protocol == "sftp":
if protocol in ("https", "http", "sftp"):
fs_kwargs.clear()
if config.kwargs is not None:
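
The `https`/`http` entries added to `MTIME_DISPATCH` above parse the RFC 1123 date that HTTP servers return in the `Last-Modified` header. A minimal sketch of that parsing path, with an example header value:

```py
import pendulum

# an RFC 1123 date as found in an HTTP Last-Modified header (example value)
last_modified = "Wed, 21 Oct 2015 07:28:00 GMT"

# strict=False lets pendulum fall back to dateutil, which understands RFC 1123;
# exact=True returns the exact parsed type instead of always coercing
mtime = pendulum.parse(last_modified, exact=True, strict=False)
print(mtime)  # 2015-10-21T07:28:00+00:00
```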


@@ -23,7 +23,28 @@ For the most common cases we provide `readers` source that does the above in a s
## Quick example
In two steps:
Let's see how to load a parquet file from a public website. The following example downloads a single file of yellow taxi trip records from the [NYC Taxi & Limousine Commission](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) website and loads it into DuckDB.
```py
import datetime as dt
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
filesystem_resource = filesystem(
bucket_url="https://d37ci6vzurychx.cloudfront.net/trip-data",
file_glob=f"yellow_tripdata_{(dt.datetime.now() - dt.timedelta(days=90)).strftime('%Y-%m')}.parquet",
)
filesystem_pipe = filesystem_resource | read_parquet()
# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("yellow_tripdata"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```
This section illustrates how to perform an efficient incremental load of Parquet files from a remote source, specifically an S3 bucket.
```py
import dlt


@@ -11,6 +11,7 @@ The filesystem source allows seamless loading of files from the following locati
* Azure Blob Storage
* remote filesystem (via SFTP)
* local filesystem
* Public CDN
The filesystem source natively supports [CSV](../../file-formats/csv.md), [Parquet](../../file-formats/parquet.md), and [JSONL](../../file-formats/jsonl.md) files and allows customization for loading any type of structured file.


@@ -52,7 +52,7 @@ dependencies = [
"orjson>=3.11.0 ; python_version > '3.13'",
"tenacity>=8.0.2",
"jsonpath-ng>=1.5.3",
"fsspec>=2022.4.0",
"fsspec>=2025.9.0",
"packaging>=21.1",
"pluggy>=1.3.0",
"win-precise-time>=1.4.2 ; os_name == 'nt' and python_version < '3.13'",
@@ -108,6 +108,9 @@ az = [
sftp = [
"paramiko>=3.3.0"
]
http = [
"aiohttp>3.9.0"
]
snowflake = ["snowflake-connector-python>=3.5.0"]
motherduck = [
"duckdb>=0.9",


@@ -5,6 +5,7 @@ dlthub_telemetry=false
[tests]
bucket_url_gs="gs://ci-test-bucket"
bucket_url_s3="s3://dlt-ci-test-bucket"
bucket_url_http="http://localhost:8189"
# uses local_dir which is _storage/data
bucket_url_file="data"
bucket_url_az="az://dlt-ci-test-bucket"


@@ -9,7 +9,11 @@ from tests.load.utils import (
drop_pipeline,
empty_schema,
)
from tests.utils import preserve_environ, auto_test_run_context, autouse_test_storage
from tests.utils import (
preserve_environ,
auto_test_run_context,
autouse_test_storage,
)
@pytest.fixture(scope="function", params=DEFAULT_BUCKETS)


@@ -21,7 +21,7 @@ from tests.pipeline.utils import (
load_table_counts,
assert_query_column,
)
from tests.utils import TEST_STORAGE_ROOT
from tests.utils import TEST_STORAGE_ROOT, public_http_server
from tests.load.sources.filesystem.cases import GLOB_RESULTS, TESTS_BUCKET_URLS


@@ -85,6 +85,7 @@ GDRIVE_BUCKET = dlt.config.get("tests.bucket_url_gdrive", str)
FILE_BUCKET = dlt.config.get("tests.bucket_url_file", str)
R2_BUCKET = dlt.config.get("tests.bucket_url_r2", str)
SFTP_BUCKET = dlt.config.get("tests.bucket_url_sftp", str)
HTTP_BUCKET = dlt.config.get("tests.bucket_url_http", str)
MEMORY_BUCKET = dlt.config.get("tests.memory", str)
ALL_FILESYSTEM_DRIVERS = dlt.config.get("ALL_FILESYSTEM_DRIVERS", list) or [
@@ -95,6 +96,7 @@ ALL_FILESYSTEM_DRIVERS = dlt.config.get("ALL_FILESYSTEM_DRIVERS", list) or [
"gdrive",
"file",
"memory",
"https",
"r2",
"sftp",
]


@@ -0,0 +1,28 @@
import pytest
import dlt
from dlt.sources.filesystem import filesystem
from tests.load.utils import HTTP_BUCKET
from tests.pipeline.utils import assert_load_info
from tests.utils import public_http_server
@pytest.mark.parametrize(
"bucket_url",
[
HTTP_BUCKET,
],
)
def test_http_filesystem(public_http_server, bucket_url: str):
public_resource = filesystem(bucket_url=bucket_url, file_glob="parquet/mlb_players.parquet")
pipeline = dlt.pipeline("test_http_load", dev_mode=True, destination="duckdb")
# just execute iterator
load_info = pipeline.run(
[
public_resource.with_name("http_parquet_example"),
]
)
assert_load_info(load_info)
assert pipeline.last_trace.last_normalize_info.row_counts["http_parquet_example"] == 1


@@ -1,9 +1,14 @@
import contextlib
from http import HTTPStatus
import http.server
import multiprocessing
import os
import platform
import threading
import sys
from functools import partial
from os import environ
from pathlib import Path
from typing import Any, Iterable, Iterator, Literal, Optional, Union, get_args, List
from unittest.mock import patch
@@ -35,7 +40,7 @@ from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableFormat
from dlt.common.storages import FileStorage
from dlt.common.storages.versioned_storage import VersionedStorage
from dlt.common.typing import StrAny, TDataItem
from dlt.common.typing import StrAny, TDataItem, PathLike
from dlt.common.utils import set_working_dir
TEST_STORAGE_ROOT = "_storage"
@@ -139,6 +144,19 @@ def TEST_DICT_CONFIG_PROVIDER():
return provider
class PublicCDNHandler(http.server.SimpleHTTPRequestHandler):
@classmethod
def factory(cls, *args, directory: Path) -> "PublicCDNHandler":
return cls(*args, directory=directory)
def __init__(self, *args, directory: Optional[Path] = None):
super().__init__(*args, directory=str(directory) if directory else None)
def list_directory(self, path: Union[str, PathLike]) -> None:
self.send_error(HTTPStatus.FORBIDDEN, "Directory listing is forbidden")
return None
class MockHttpResponse(Response):
def __init__(self, status_code: int) -> None:
self.status_code = status_code
@@ -204,6 +222,29 @@ def auto_module_test_run_context(auto_module_test_storage) -> Iterator[None]:
yield from create_test_run_context()
@pytest.fixture
def public_http_server():
"""
A simple HTTP server serving files from the current directory.
Used to simulate public CDN. It allows only file access, directory listing is forbidden.
"""
httpd = http.server.ThreadingHTTPServer(
("localhost", 8189),
partial(
PublicCDNHandler.factory, directory=Path.cwd().joinpath("tests/common/storages/samples")
),
)
server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
server_thread.start()
try:
yield httpd
finally:
# always close
httpd.shutdown()
server_thread.join()
httpd.server_close()
def create_test_run_context() -> Iterator[None]:
# this plugs active context
ctx = PluggableRunContext()
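
The commit message mentions using a "more random port", while the fixture shown here still binds 8189 (matching `bucket_url_http` in the test config). A minimal sketch of the random-port approach, not necessarily the final code: bind port 0 so the OS assigns a free port, then read it back.

```py
import http.server
import threading

# bind port 0 so the OS assigns a free port, then read it back
httpd = http.server.ThreadingHTTPServer(
    ("localhost", 0), http.server.SimpleHTTPRequestHandler
)
host, port = httpd.server_address
bucket_url = f"http://{host}:{port}"

thread = threading.Thread(target=httpd.serve_forever, daemon=True)
thread.start()
# ... run tests against bucket_url ...
httpd.shutdown()
thread.join()
httpd.server_close()
```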

uv.lock (generated)

@@ -2140,6 +2140,9 @@ gcp = [
gs = [
{ name = "gcsfs" },
]
http = [
{ name = "aiohttp" },
]
lancedb = [
{ name = "lancedb", marker = "python_full_version < '3.13'" },
{ name = "pyarrow" },
@@ -2336,6 +2339,7 @@ requires-dist = [
{ name = "adlfs", marker = "extra == 'az'", specifier = ">=2024.7.0" },
{ name = "adlfs", marker = "extra == 'clickhouse'", specifier = ">=2024.7.0" },
{ name = "adlfs", marker = "extra == 'synapse'", specifier = ">=2024.7.0" },
{ name = "aiohttp", marker = "extra == 'http'", specifier = ">3.9.0" },
{ name = "alembic", marker = "extra == 'sqlalchemy'", specifier = ">1.10.0" },
{ name = "botocore", marker = "extra == 'athena'", specifier = ">=1.28" },
{ name = "botocore", marker = "extra == 'filesystem'", specifier = ">=1.28" },
@@ -2354,7 +2358,7 @@ requires-dist = [
{ name = "duckdb", marker = "extra == 'ducklake'", specifier = ">=1.2.0" },
{ name = "duckdb", marker = "extra == 'motherduck'", specifier = ">=0.9" },
{ name = "duckdb", marker = "extra == 'workspace'", specifier = ">=0.9" },
{ name = "fsspec", specifier = ">=2022.4.0" },
{ name = "fsspec", specifier = ">=2025.9.0" },
{ name = "gcsfs", marker = "extra == 'bigquery'", specifier = ">=2022.4.0" },
{ name = "gcsfs", marker = "extra == 'clickhouse'", specifier = ">=2022.4.0" },
{ name = "gcsfs", marker = "extra == 'gcp'", specifier = ">=2022.4.0" },
@@ -2430,7 +2434,7 @@ requires-dist = [
{ name = "weaviate-client", marker = "extra == 'weaviate'", specifier = ">=3.22" },
{ name = "win-precise-time", marker = "python_full_version < '3.13' and os_name == 'nt'", specifier = ">=1.4.2" },
]
provides-extras = ["gcp", "bigquery", "postgres", "redshift", "parquet", "duckdb", "ducklake", "filesystem", "s3", "gs", "az", "sftp", "snowflake", "motherduck", "cli", "athena", "weaviate", "mssql", "synapse", "qdrant", "databricks", "clickhouse", "dremio", "lancedb", "deltalake", "sql-database", "sqlalchemy", "pyiceberg", "postgis", "workspace", "dbml"]
provides-extras = ["gcp", "bigquery", "postgres", "redshift", "parquet", "duckdb", "ducklake", "filesystem", "s3", "gs", "az", "sftp", "http", "snowflake", "motherduck", "cli", "athena", "weaviate", "mssql", "synapse", "qdrant", "databricks", "clickhouse", "dremio", "lancedb", "deltalake", "sql-database", "sqlalchemy", "pyiceberg", "postgis", "workspace", "dbml"]
[package.metadata.requires-dev]
adbc = [{ name = "adbc-driver-postgresql", specifier = ">=1.6.0" }]
@@ -3210,16 +3214,16 @@ wheels = [
[[package]]
name = "fsspec"
version = "2025.5.1"
version = "2025.9.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/00/f7/27f15d41f0ed38e8fcc488584b57e902b331da7f7c6dcda53721b15838fc/fsspec-2025.5.1.tar.gz", hash = "sha256:2e55e47a540b91843b755e83ded97c6e897fa0942b11490113f09e9c443c2475", size = 303033, upload-time = "2025-05-24T12:03:23.792Z" }
sdist = { url = "https://files.pythonhosted.org/packages/de/e0/bab50af11c2d75c9c4a2a26a5254573c0bd97cea152254401510950486fa/fsspec-2025.9.0.tar.gz", hash = "sha256:19fd429483d25d28b65ec68f9f4adc16c17ea2c7c7bf54ec61360d478fb19c19", size = 304847, upload-time = "2025-09-02T19:10:49.215Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/61/78c7b3851add1481b048b5fdc29067397a1784e2910592bc81bb3f608635/fsspec-2025.5.1-py3-none-any.whl", hash = "sha256:24d3a2e663d5fc735ab256263c4075f374a174c3410c0b25e5bd1970bceaa462", size = 199052, upload-time = "2025-05-24T12:03:21.66Z" },
{ url = "https://files.pythonhosted.org/packages/47/71/70db47e4f6ce3e5c37a607355f80da8860a33226be640226ac52cb05ef2e/fsspec-2025.9.0-py3-none-any.whl", hash = "sha256:530dc2a2af60a414a832059574df4a6e10cce927f6f4a78209390fe38955cfb7", size = 199289, upload-time = "2025-09-02T19:10:47.708Z" },
]
[[package]]
name = "gcsfs"
version = "2025.5.1"
version = "2025.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
@@ -3230,9 +3234,9 @@ dependencies = [
{ name = "google-cloud-storage" },
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d8/4a/47ad326cc74ccfd97e125c0087a36d516ed74c61f53e458067737378d0f2/gcsfs-2025.5.1.tar.gz", hash = "sha256:ba945530cf4857cd9d599ccb3ae729c65c39088880b11c4df1fecac30df5f3e3", size = 82173, upload-time = "2025-05-24T12:12:58.519Z" }
sdist = { url = "https://files.pythonhosted.org/packages/54/55/cd737f96929f9cf52666bd49e9b4d1aac697655b3ab17c49ab4fb587bb12/gcsfs-2025.9.0.tar.gz", hash = "sha256:36b8c379d9789d5332a45a3aa2840ec518ff73c6d21c1e962f53318d1cd65db9", size = 82843, upload-time = "2025-09-02T19:23:19.22Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ff/eb/9182e875592c48d282c5eab602000f0618817b9011b2b2925165e4b4b7f3/gcsfs-2025.5.1-py2.py3-none-any.whl", hash = "sha256:48712471ff71ac83d3e2152ba4dc232874698466e344d5e700feba06b0a0de7b", size = 36581, upload-time = "2025-05-24T12:12:57.011Z" },
{ url = "https://files.pythonhosted.org/packages/4c/41/f793f3bae39f9fbaabf935d0afcf3aa99e417c92096f3e232ac085aaddbf/gcsfs-2025.9.0-py2.py3-none-any.whl", hash = "sha256:38208bc79af60c693e44ff2f0bd6fd3ca664fea1940fe6770ac1c6003aa0f559", size = 36893, upload-time = "2025-09-02T19:23:18.002Z" },
]
[[package]]
@@ -8450,16 +8454,16 @@ wheels = [
[[package]]
name = "s3fs"
version = "2025.5.1"
version = "2025.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiobotocore" },
{ name = "aiohttp" },
{ name = "fsspec" },
]
sdist = { url = "https://files.pythonhosted.org/packages/7f/6f/d0ee452580d7d0643a1a776b95dfef2144023f3fc077038e07d651995d34/s3fs-2025.5.1.tar.gz", hash = "sha256:84beffa231b8ed94f8d667e93387b38351e1c4447aedea5c2c19dd88b7fcb658", size = 77276, upload-time = "2025-05-24T12:14:11.442Z" }
sdist = { url = "https://files.pythonhosted.org/packages/ee/f3/8e6371436666aedfd16e63ff68a51b8a8fcf5f33a0eee33c35e0b2476b27/s3fs-2025.9.0.tar.gz", hash = "sha256:6d44257ef19ea64968d0720744c4af7a063a05f5c1be0e17ce943bef7302bc30", size = 77823, upload-time = "2025-09-02T19:18:21.781Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d5/c0/f5cc95ec88694429fcb841a37456be0a27463bc39d43edbd36e3164120ed/s3fs-2025.5.1-py3-none-any.whl", hash = "sha256:7475e7c40a3a112f17144907ffae50782ab6c03487fe0b45a9c3942bb7a5c606", size = 30476, upload-time = "2025-05-24T12:14:10.056Z" },
{ url = "https://files.pythonhosted.org/packages/37/b3/ca7d58ca25b1bb6df57e6cbd0ca8d6437a4b9ce1cd35adc8a6b2949c113b/s3fs-2025.9.0-py3-none-any.whl", hash = "sha256:c33c93d48f66ed440dbaf6600be149cdf8beae4b6f8f0201a209c5801aeb7e30", size = 30319, upload-time = "2025-09-02T19:18:20.563Z" },
]
[[package]]