Files
dlt/tests/workspace/helpers/dashboard/conftest.py
anuunchin 6f925caa89 Fix/3464 sync error results in success label (#3492)
* Last executed info

* Any step failure results in failure badge

* Test adjustments
2025-12-17 17:12:32 +01:00

77 lines
2.2 KiB
Python

import pytest
import tempfile
from tests.workspace.helpers.dashboard.example_pipelines import (
create_success_pipeline_duckdb,
create_success_pipeline_filesystem,
create_extract_exception_pipeline,
create_normalize_exception_pipeline,
create_never_ran_pipeline,
create_load_exception_pipeline,
create_no_destination_pipeline,
create_sync_exception_pipeline,
)
# resolver to resolve strings to pipelines
@pytest.fixture
def pipeline(request):
# request.param is one of the strings from parametrize
return request.getfixturevalue(request.param)
@pytest.fixture(scope="session")
def no_destination_pipeline():
with tempfile.TemporaryDirectory() as temp_dir:
yield create_no_destination_pipeline(temp_dir)
@pytest.fixture(scope="session")
def success_pipeline_duckdb():
with tempfile.TemporaryDirectory() as temp_dir:
import duckdb
db_conn = duckdb.connect()
try:
yield create_success_pipeline_duckdb(temp_dir, db_conn=db_conn)
finally:
db_conn.close()
@pytest.fixture(scope="session")
def success_pipeline_filesystem():
with tempfile.TemporaryDirectory() as temp_dir:
with tempfile.TemporaryDirectory() as storage:
yield create_success_pipeline_filesystem(temp_dir, storage)
@pytest.fixture(scope="session")
def extract_exception_pipeline():
with tempfile.TemporaryDirectory() as temp_dir:
yield create_extract_exception_pipeline(temp_dir)
@pytest.fixture(scope="session")
def normalize_exception_pipeline():
"""Fixture that creates a normalize exception pipeline"""
with tempfile.TemporaryDirectory() as temp_dir:
yield create_normalize_exception_pipeline(temp_dir)
@pytest.fixture(scope="session")
def never_ran_pipline():
with tempfile.TemporaryDirectory() as temp_dir:
yield create_never_ran_pipeline(temp_dir)
@pytest.fixture(scope="session")
def load_exception_pipeline():
with tempfile.TemporaryDirectory() as temp_dir:
yield create_load_exception_pipeline(temp_dir)
@pytest.fixture(scope="session")
def sync_exception_pipeline():
with tempfile.TemporaryDirectory() as temp_dir:
yield create_sync_exception_pipeline(temp_dir)