Files
dlt/tests/workspace/helpers/dashboard/conftest.py
rudolfix 06bc05848b (chore) adds hub extra (#3428)
* adds hub extra

* makes hub module more user friendly when hub not installed

* test and lint fixes

* adds plugin version check util function

* adds dlt-runtime to hub extra, minimal import tests

* bumps to dlthub 0.20.0 alpha

* lists pipelines with cli using the same functions as dashboard, dlt pipeline will list pipelines by default

* adds configured profiles method on context so only profiles with configs or pipelines are listed

* adds list of locations that contained actual configs to provider interface

* improves workspace and profile commands

* test fixes

* fixes tests
2025-12-05 16:15:19 +01:00

70 lines
2.0 KiB
Python

import pytest
import tempfile
from tests.workspace.helpers.dashboard.example_pipelines import (
create_success_pipeline_duckdb,
create_success_pipeline_filesystem,
create_extract_exception_pipeline,
create_normalize_exception_pipeline,
create_never_ran_pipeline,
create_load_exception_pipeline,
create_no_destination_pipeline,
)
# resolver to resolve strings to pipelines
@pytest.fixture
def pipeline(request):
    """Resolve a parametrized fixture name into the value of that fixture.

    ``request.param`` holds one of the strings given to ``parametrize``; it is
    treated as the name of another fixture, whose value is looked up and returned.
    """
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
@pytest.fixture(scope="session")
def no_destination_pipeline():
    """Yield the result of ``create_no_destination_pipeline`` for a scratch directory.

    The temporary directory is created once per test session and removed when
    the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        created = create_no_destination_pipeline(work_dir)
        yield created
@pytest.fixture(scope="session")
def success_pipeline_duckdb():
    """Yield the result of ``create_success_pipeline_duckdb`` with its own duckdb connection.

    A temporary directory and a connection opened via ``duckdb.connect()`` (no
    arguments) are handed to the factory. On teardown the connection is closed
    and the directory removed.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        # imported lazily, so duckdb is only needed when this fixture is set up
        import duckdb

        connection = duckdb.connect()
        try:
            created = create_success_pipeline_duckdb(work_dir, db_conn=connection)
            yield created
        finally:
            # close the connection on teardown, also if creating the pipeline raised
            connection.close()
@pytest.fixture(scope="session")
def success_pipeline_filesystem():
    """Yield the result of ``create_success_pipeline_filesystem`` for two scratch directories.

    The first temporary directory is passed as the factory's first argument and
    the second as its storage argument (presumably the filesystem destination
    location; confirm against the factory). Both are removed on teardown.
    """
    # one ``with`` holding both managers: entered left to right, exited in reverse
    with tempfile.TemporaryDirectory() as work_dir, tempfile.TemporaryDirectory() as storage_dir:
        created = create_success_pipeline_filesystem(work_dir, storage_dir)
        yield created
@pytest.fixture(scope="session")
def extract_exception_pipeline():
    """Yield the result of ``create_extract_exception_pipeline`` for a scratch directory.

    The temporary directory lives for the whole test session and is removed
    when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        created = create_extract_exception_pipeline(work_dir)
        yield created
@pytest.fixture(scope="session")
def normalize_exception_pipeline():
    """Yield the result of ``create_normalize_exception_pipeline`` for a scratch directory.

    The temporary directory lives for the whole test session and is removed
    when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        created = create_normalize_exception_pipeline(work_dir)
        yield created
# NOTE: the fixture name is spelled "pipline"; pytest resolves fixtures by name,
# so tests have to request it with exactly this spelling.
@pytest.fixture(scope="session")
def never_ran_pipline():
    """Yield the result of ``create_never_ran_pipeline`` for a scratch directory.

    The temporary directory lives for the whole test session and is removed
    when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        created = create_never_ran_pipeline(work_dir)
        yield created
@pytest.fixture(scope="session")
def load_exception_pipeline():
    """Yield the result of ``create_load_exception_pipeline`` for a scratch directory.

    The temporary directory lives for the whole test session and is removed
    when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        created = create_load_exception_pipeline(work_dir)
        yield created