diff --git a/dlt/_workspace/helpers/dashboard/utils.py b/dlt/_workspace/helpers/dashboard/utils.py index 85b522096..956f33fb8 100644 --- a/dlt/_workspace/helpers/dashboard/utils.py +++ b/dlt/_workspace/helpers/dashboard/utils.py @@ -16,6 +16,7 @@ from typing import ( NamedTuple, get_args, ) +import datetime # noqa: 251 from typing_extensions import TypeAlias import os import platform @@ -390,6 +391,7 @@ def get_row_counts( DestinationUndefinedEntity, SqlClientNotAvailable, PipelineConfigMissing, + ConnectionError, ): # TODO: somehow propagate errors to the user here pass @@ -826,6 +828,7 @@ def _build_pipeline_execution_html( status: TPipelineRunStatus, steps_data: List[PipelineStepData], migrations_count: int = 0, + finished_at: Optional[datetime.datetime] = None, ) -> mo.Html: """ Build an HTML visualization for a pipeline execution using CSS classes @@ -834,9 +837,15 @@ def _build_pipeline_execution_html( last = len(steps_data) - 1 # Build the general info of the execution + relative_time = "" + if finished_at: + time_ago = pendulum.instance(finished_at).diff_for_humans() + relative_time = f"
Executed: {time_ago}
" + general_info = f"""
Last execution ID: {transaction_id[:8]}
Total time: {_format_duration(total_ms)}
+ {relative_time} """ # Build the pipeline execution timeline bar and labels @@ -895,16 +904,16 @@ def _get_steps_data_and_status( ) -> Tuple[List[PipelineStepData], TPipelineRunStatus]: """Gets trace steps data and the status of the corresponding pipeline execution""" steps_data: List[PipelineStepData] = [] + any_step_failed: bool = False for step in trace_steps: - if step.step not in get_args(TVisualPipelineStep): - continue + if step.step_exception is not None: + any_step_failed = True - if not step.finished_at: + if step.step not in get_args(TVisualPipelineStep) or not step.finished_at: continue duration_ms = (step.finished_at - step.started_at).total_seconds() * 1000 - steps_data.append( PipelineStepData( step=cast(TVisualPipelineStep, step.step), @@ -912,8 +921,7 @@ def _get_steps_data_and_status( failed=step.step_exception is not None, ) ) - is_failed = any(s.failed for s in steps_data) - status: TPipelineRunStatus = "failed" if is_failed else "succeeded" + status: TPipelineRunStatus = "failed" if any_step_failed else "succeeded" return steps_data, status @@ -941,6 +949,7 @@ def build_pipeline_execution_visualization(trace: PipelineTrace) -> Optional[mo. status, steps_data, migrations_count, + trace.finished_at, ) diff --git a/tests/workspace/helpers/dashboard/conftest.py b/tests/workspace/helpers/dashboard/conftest.py index 17f2c65dc..ff29c18e1 100644 --- a/tests/workspace/helpers/dashboard/conftest.py +++ b/tests/workspace/helpers/dashboard/conftest.py @@ -9,6 +9,7 @@ from tests.workspace.helpers.dashboard.example_pipelines import ( create_never_ran_pipeline, create_load_exception_pipeline, create_no_destination_pipeline, + create_sync_exception_pipeline, ) @@ -67,3 +68,9 @@ def never_ran_pipline(): def load_exception_pipeline(): with tempfile.TemporaryDirectory() as temp_dir: yield create_load_exception_pipeline(temp_dir) + + +@pytest.fixture(scope="session") +def sync_exception_pipeline(): + with tempfile.TemporaryDirectory() as temp_dir: + yield create_sync_exception_pipeline(temp_dir) diff --git a/tests/workspace/helpers/dashboard/example_pipelines.py b/tests/workspace/helpers/dashboard/example_pipelines.py index b1abe3ba7..f11645bd4 100644 --- a/tests/workspace/helpers/dashboard/example_pipelines.py +++ b/tests/workspace/helpers/dashboard/example_pipelines.py @@ -5,6 +5,7 @@ # from typing import Any +from unittest.mock import patch import duckdb import dlt @@ -26,6 +27,7 @@ NORMALIZE_EXCEPTION_PIPELINE = "normalize_exception_pipeline" NEVER_RAN_PIPELINE = "never_ran_pipline" LOAD_EXCEPTION_PIPELINE = "load_exception_pipeline" NO_DESTINATION_PIPELINE = "no_destination_pipeline" +SYNC_EXCEPTION_PIPELINE = "sync_exception_pipeline" ALL_PIPELINES = [ SUCCESS_PIPELINE_DUCKDB, @@ -35,12 +37,14 @@ ALL_PIPELINES = [ LOAD_EXCEPTION_PIPELINE, NO_DESTINATION_PIPELINE, SUCCESS_PIPELINE_FILESYSTEM, + SYNC_EXCEPTION_PIPELINE, ] PIPELINES_WITH_EXCEPTIONS = [ EXTRACT_EXCEPTION_PIPELINE, NORMALIZE_EXCEPTION_PIPELINE, LOAD_EXCEPTION_PIPELINE, + SYNC_EXCEPTION_PIPELINE, ] PIPELINES_WITH_LOAD = [SUCCESS_PIPELINE_DUCKDB, SUCCESS_PIPELINE_FILESYSTEM] @@ -251,6 +255,29 @@ def create_no_destination_pipeline(pipelines_dir: str = None): return pipeline +def create_sync_exception_pipeline(pipelines_dir: str = None): + """Create a test pipeline that raises an exception in the sync step""" + pipeline = dlt.pipeline( + pipeline_name=SYNC_EXCEPTION_PIPELINE, + pipelines_dir=pipelines_dir, + destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")), + ) + + @dlt.resource + def dummy_data(): + yield [{"id": 1, "value": "test"}] + + with patch.object(pipeline, "_restore_state_from_destination") as mock_restore: + mock_restore.side_effect = ConnectionError("Cannot connect to destination for sync") + + with pytest.raises(Exception) as excinfo: + pipeline.run(dummy_data()) + + assert "failed at `step=sync`" in str(excinfo) + + return pipeline + + # NOTE: this script can be run to create the test pipelines globally for manual testing of the dashboard app and cli if __name__ == "__main__": create_success_pipeline_duckdb() @@ -260,3 +287,4 @@ if __name__ == "__main__": create_never_ran_pipeline() create_load_exception_pipeline() create_no_destination_pipeline() + create_sync_exception_pipeline() diff --git a/tests/workspace/helpers/dashboard/test_utils.py b/tests/workspace/helpers/dashboard/test_utils.py index 06487d8aa..b5ea5d3ae 100644 --- a/tests/workspace/helpers/dashboard/test_utils.py +++ b/tests/workspace/helpers/dashboard/test_utils.py @@ -62,6 +62,7 @@ from tests.workspace.helpers.dashboard.example_pipelines import ( NEVER_RAN_PIPELINE, LOAD_EXCEPTION_PIPELINE, NO_DESTINATION_PIPELINE, + SYNC_EXCEPTION_PIPELINE, create_success_pipeline_duckdb, create_fruitshop_duckdb_with_shared_dataset, create_humans_arrow_duckdb_with_shared_dataset, @@ -550,6 +551,8 @@ def test_trace(pipeline: dlt.Pipeline): assert len(result) == 2 assert result[0]["step"] == "extract" assert result[1]["step"] == "normalize" + elif pipeline.pipeline_name == SYNC_EXCEPTION_PIPELINE: + assert len(result) == 0 else: assert len(result) == 3 assert result[0]["step"] == "extract" @@ -786,6 +789,7 @@ def test_sanitize_trace_for_display(pipeline: dlt.Pipeline): (SUCCESS_PIPELINE_FILESYSTEM, {"extract", "normalize", "load"}, "succeeded"), (EXTRACT_EXCEPTION_PIPELINE, {"extract"}, "failed"), (LOAD_EXCEPTION_PIPELINE, {"extract", "normalize", "load"}, "failed"), + (SYNC_EXCEPTION_PIPELINE, set(), "failed"), ], indirect=["pipeline"], ) @@ -803,9 +807,9 @@ def test_get_steps_data_and_status( assert all(step.duration_ms > 0 for step in steps_data) if expected_status == "succeeded": - assert all(step.failed is False for step in steps_data) + assert all(step.step_exception is None for step in trace.steps) else: - assert any(step.failed is True for step in steps_data) + assert any(step.step_exception is not None for step in trace.steps) assert set([step.step for step in steps_data]) == expected_steps