implements run artifacts sync to a bucket using filesystem (#3339)

* a tracker that sends the pipeline trace, schemas and state to a bucket is activated when a RUN_ID and a workspace context are present
* a sync step runs under the same conditions when the workspace dashboard starts
* improves deployment packager (hash computation)
ivasio committed 2025-12-04 15:48:39 +01:00 (committed by GitHub)
parent 28557bc82f, commit e8d45369f1
14 changed files with 310 additions and 24 deletions
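For reference, the feature is driven entirely by configuration. A minimal sketch of the environment variables exercised by the new workspace test below (all values are placeholders):

import os

# presence of a run id activates the runtime_artifacts tracker
os.environ["RUNTIME__RUN_ID"] = "uniq_run_id"
# the "send" section points the tracker at the artifact bucket (auto_mkdir helps with local buckets)
os.environ["SEND__ARTIFACTS__BUCKET_URL"] = "/tmp/local_bucket/workspace_id/tests/pipelines"
os.environ["ARTIFACTS__KWARGS"] = '{"auto_mkdir": true}'
# the "sync" section is resolved when the dashboard restores artifacts
os.environ["SYNC__ARTIFACTS__BUCKET_URL"] = "/tmp/local_bucket/workspace_id"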

View File

@@ -98,6 +98,14 @@ class WorkspaceRunContext(ProfilesRunContext):
# this also resolves workspace config if necessary
initialize_runtime(self.name, self.config.runtime)
# if on runtime, add additional tracker
if self.runtime_config.run_id:
from dlt._workspace.helpers.runtime import runtime_artifacts
from dlt.pipeline import trace
if runtime_artifacts not in trace.TRACKING_MODULES:
trace.TRACKING_MODULES.append(runtime_artifacts)
@property
def runtime_config(self) -> WorkspaceRuntimeConfiguration:
return self.config.runtime
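A minimal sketch of the gate introduced above, assuming it is evaluated from user code rather than during context initialization:

import dlt
from dlt._workspace._workspace_context import WorkspaceRunContext

# the extra tracker (and the dashboard sync step) only engage when the current
# run context is a workspace context that carries a runtime run id
ctx = dlt.current.run_context()
if isinstance(ctx, WorkspaceRunContext) and ctx.runtime_config.run_id:
    print("runtime artifacts tracking is active")
else:
    print("not on runtime, nothing is sent")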

View File

@@ -33,6 +33,8 @@ class WorkspaceRuntimeConfiguration(RuntimeConfiguration):
invite_code: Optional[str] = None
"""Invite code for dltHub Runtime"""
__section__: ClassVar[str] = "runtime"
@configspec
class WorkspaceConfiguration(BaseConfiguration):

View File

@@ -64,7 +64,10 @@ class PackageBuilder:
manifest_info.size = len(manifest_yaml)
tar.addfile(manifest_info, BytesIO(manifest_yaml))
return digest256_tar_stream(output_stream)
content_hash, _ = digest256_tar_stream(
output_stream, filter_file_names=lambda x: x != DEFAULT_MANIFEST_FILE_NAME
)
return content_hash
def build_package(self, file_selector: BaseFileSelector) -> Tuple[Path, str]:
"""Create deployment package file, return (path, content_hash)"""

View File

@@ -33,6 +33,12 @@ class DashboardConfiguration(BaseConfiguration):
datetime_format: str = "YYYY-MM-DD HH:mm:ss Z"
"""The format of the datetime strings"""
sync_from_runtime: bool = False
"""
Whether to sync the pipeline states and traces from the runtime backup.
Needs to be run inside a dlt workspace with runtime artifacts credentials set.
"""
# this is needed for using this as a param in the cache
def __hash__(self) -> int:
return hash(

View File

@@ -999,6 +999,14 @@ def utils_discover_pipelines(
Discovers local pipelines and returns a multiselect widget to select one of the pipelines
"""
# sync from runtime if enabled
_tmp_config = utils.resolve_dashboard_config(None)
if _tmp_config.sync_from_runtime:
from dlt._workspace.helpers.runtime.runtime_artifacts import sync_from_runtime
with mo.status.spinner(title="Syncing pipeline list from runtime"):
sync_from_runtime()
_run_context = dlt.current.run_context()
if (
isinstance(_run_context, ProfilesRunContext)
@@ -1011,7 +1019,7 @@ def utils_discover_pipelines(
dlt_all_pipelines: List[Dict[str, Any]] = []
dlt_pipelines_dir, dlt_all_pipelines = utils.get_local_pipelines(
mo_cli_arg_pipelines_dir,
addtional_pipelines=[mo_cli_arg_pipeline, mo_query_var_pipeline_name],
additional_pipelines=[mo_cli_arg_pipeline, mo_query_var_pipeline_name],
)
dlt_pipeline_select: mo.ui.multiselect = mo.ui.multiselect(
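The same sync can be triggered outside the dashboard cell. A sketch built only from the calls shown above, assuming the dashboard utils module path:

from dlt._workspace.helpers.dashboard import utils
from dlt._workspace.helpers.runtime.runtime_artifacts import sync_from_runtime

# resolve the dashboard configuration and restore artifacts only when the
# sync_from_runtime flag is enabled (a no-op outside a workspace with a run id)
config = utils.resolve_dashboard_config(None)
if config.sync_from_runtime:
    sync_from_runtime()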

View File

@@ -9,7 +9,6 @@ from typing import (
List,
Mapping,
Optional,
Set,
Tuple,
Union,
cast,
@@ -27,6 +26,7 @@ import dlt
import marimo as mo
import pyarrow
import traceback
import datetime # noqa: I251
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import known_sections
@@ -44,6 +44,7 @@ from dlt.common.storages.configuration import WithLocalFiles
from dlt.common.configuration.exceptions import ConfigFieldMissingException
from dlt.common.typing import DictStrAny, TypedDict
from dlt.common.utils import map_nested_keys_in_place
from dlt.common.pipeline import get_dlt_pipelines_dir
from dlt._workspace.helpers.dashboard import ui_elements as ui
from dlt._workspace.helpers.dashboard.config import DashboardConfiguration
@@ -116,7 +117,7 @@ def get_pipeline_last_run(pipeline_name: str, pipelines_dir: str) -> float:
def get_local_pipelines(
pipelines_dir: str = None, sort_by_trace: bool = True, addtional_pipelines: List[str] = None
pipelines_dir: str = None, sort_by_trace: bool = True, additional_pipelines: List[str] = None
) -> Tuple[str, List[Dict[str, Any]]]:
"""Get the local pipelines directory and the list of pipeline names in it.
@@ -134,8 +135,8 @@ def get_local_pipelines(
except Exception:
pipelines = []
if addtional_pipelines:
for pipeline in addtional_pipelines:
if additional_pipelines:
for pipeline in additional_pipelines:
if pipeline and pipeline not in pipelines:
pipelines.append(pipeline)
@@ -215,7 +216,11 @@ def pipeline_details(
credentials = "Could not resolve credentials."
# find the pipeline in all_pipelines and get the timestamp
pipeline_timestamp = get_pipeline_last_run(pipeline.pipeline_name, pipeline.pipelines_dir)
trace = pipeline.last_trace
last_executed = "No trace found"
if trace and hasattr(trace, "started_at"):
last_executed = _date_from_timestamp_with_ago(c, trace.started_at)
details_dict = {
"pipeline_name": pipeline.pipeline_name,
@@ -224,7 +229,7 @@ def pipeline_details(
if pipeline.destination
else "No destination set"
),
"last executed": _date_from_timestamp_with_ago(c, pipeline_timestamp),
"last executed": last_executed,
"credentials": credentials,
"dataset_name": pipeline.dataset_name,
"working_dir": pipeline.working_dir,
@@ -663,7 +668,7 @@ def build_pipeline_link_list(
) -> str:
"""Build a list of links to the pipeline."""
if not pipelines:
return "No local pipelines found."
return "No pipelines found."
count = 0
link_list: str = ""
@@ -746,12 +751,15 @@ def build_exception_section(p: dlt.Pipeline) -> List[Any]:
def _date_from_timestamp_with_ago(
config: DashboardConfiguration, timestamp: Union[int, float]
config: DashboardConfiguration, timestamp: Union[int, float, datetime.datetime]
) -> str:
"""Return a date with ago section"""
if not timestamp or timestamp == 0:
return "never"
p_ts = pendulum.from_timestamp(timestamp)
if isinstance(timestamp, datetime.datetime):
p_ts = pendulum.instance(timestamp)
else:
p_ts = pendulum.from_timestamp(timestamp)
time_formatted = p_ts.format(config.datetime_format)
ago = p_ts.diff_for_humans()
return f"{ago} ({time_formatted})"
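Since `_date_from_timestamp_with_ago` now accepts both epoch timestamps and `datetime` objects (the trace `started_at`), a quick illustration of the two normalization branches using pendulum:

import datetime
import pendulum

# both branches end up with a pendulum instance that supports format() and diff_for_humans()
from_epoch = pendulum.from_timestamp(1733324919)
from_dt = pendulum.instance(datetime.datetime(2024, 12, 4, 15, 48, tzinfo=datetime.timezone.utc))
print(from_epoch.format("YYYY-MM-DD HH:mm:ss Z"), from_epoch.diff_for_humans())
print(from_dt.format("YYYY-MM-DD HH:mm:ss Z"), from_dt.diff_for_humans())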

View File

@@ -0,0 +1,191 @@
"""Implements SupportsTracking"""
from typing import Any, ClassVar, List, Optional, Tuple, Union
import fsspec
import pickle
import os
import dlt
from dlt.common import logger
from dlt.common.configuration.exceptions import ConfigurationException
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec
from dlt.common.storages.configuration import FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import FileItemDict, fsspec_from_config, glob_files
from dlt.common.versioned_state import json_encode_state
from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline
from dlt._workspace.run_context import DEFAULT_WORKSPACE_WORKING_FOLDER
from dlt._workspace._workspace_context import WorkspaceRunContext
@configspec
class RuntimeArtifactsConfiguration(BaseConfiguration):
artifacts: FilesystemConfiguration = None
def sync_from_runtime() -> None:
"""Sync the pipeline states and traces from the runtime backup, recursively."""
from dlt._workspace.helpers.runtime.runtime_artifacts import _get_runtime_artifacts_fs
def sync_dir(fs: fsspec.AbstractFileSystem, src_root: str, dst_root: str) -> None:
"""Recursively sync src_root on fs into dst_root locally, always using fs.walk."""
os.makedirs(dst_root, exist_ok=True)
for file_dict in glob_files(fs, src_root):
file_item = FileItemDict(file_dict, fs)
relative_dir = os.path.dirname(file_dict["relative_path"])
local_dir = dst_root if relative_dir == "." else os.path.join(dst_root, relative_dir)
os.makedirs(local_dir, exist_ok=True)
local_file = os.path.join(dst_root, file_dict["relative_path"])
logger.info(f"Restoring artifact {local_file}")
with open(local_file, "wb") as lf:
lf.write(file_item.read_bytes())
ts = file_dict["modification_date"].timestamp()
os.utime(local_file, (ts, ts)) # (atime, mtime)
context = dlt.current.run_context()
if not context.runtime_config.run_id:
return
if not isinstance(context, WorkspaceRunContext):
return
fs, config = _get_runtime_artifacts_fs(section="sync")
if not fs:
return
# TODO: there's no good way to get this value on sync.
data_dir_root = os.path.join(
context.settings_dir, DEFAULT_WORKSPACE_WORKING_FOLDER
) # the local .var folder
# Just sync the whole base folder into the local pipelines dir
sync_dir(fs, config.bucket_url, data_dir_root)
def _get_runtime_artifacts_fs(
section: str,
) -> Tuple[fsspec.AbstractFileSystem, FilesystemConfiguration]:
try:
config = resolve_configuration(RuntimeArtifactsConfiguration(), sections=(section,))
except ConfigurationException:
logger.info(f"No artifact storage credentials found for {section}")
return None, None
return fsspec_from_config(config.artifacts)[0], config.artifacts
def _write_to_bucket(
fs: fsspec.AbstractFileSystem,
bucket_url: str,
pipeline_name: str,
paths: List[str],
data: Union[str, bytes],
mode: str = "w",
) -> None:
# write to bucket using the config, same object may be written to multiple paths
for path in paths:
with fs.open(f"{bucket_url}/{pipeline_name}/{path}", mode=mode) as f:
f.write(data)
def _send_trace_to_bucket(
fs: fsspec.AbstractFileSystem, bucket_url: str, trace: PipelineTrace, pipeline: SupportsPipeline
) -> None:
"""
Send the full trace pickled to the runtime bucket
"""
pickled_trace = pickle.dumps(trace)
_write_to_bucket(
fs,
bucket_url,
pipeline.pipeline_name,
[
"trace.pickle",
], # save current and by start time
pickled_trace,
mode="wb",
)
def _send_state_to_bucket(
fs: fsspec.AbstractFileSystem, bucket_url: str, pipeline: SupportsPipeline
) -> None:
encoded_state = json_encode_state(pipeline.state)
_write_to_bucket(
fs,
bucket_url,
pipeline.pipeline_name,
[
"state.json",
], # save current and by start time
encoded_state,
mode="w",
)
def _send_schemas_to_bucket(
fs: fsspec.AbstractFileSystem, bucket_url: str, pipeline: SupportsPipeline
) -> None:
schema_dir = os.path.join(pipeline.working_dir, "schemas")
for schema_file in os.listdir(schema_dir):
_write_to_bucket(
fs,
bucket_url,
pipeline.pipeline_name,
[f"schemas/{schema_file}"],
open(os.path.join(schema_dir, schema_file), "rb").read(),
mode="wb",
)
def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
pass
def on_start_trace_step(
trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline
) -> None:
pass
def on_end_trace_step(
trace: PipelineTrace,
step: PipelineStepTrace,
pipeline: SupportsPipeline,
step_info: Any,
send_state: bool,
) -> None:
pass
def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None:
# skip if runtime not running
if pipeline.run_context.runtime_config.run_id is None:
return
fs, config = _get_runtime_artifacts_fs(section="send")
if fs:
logger.info(
f"Sending run artifacts from pipeline `{pipeline.pipeline_name}` to"
f" `{config.bucket_url}`"
)
try:
_send_trace_to_bucket(fs, config.bucket_url, trace, pipeline)
_send_state_to_bucket(fs, config.bucket_url, pipeline)
_send_schemas_to_bucket(fs, config.bucket_url, pipeline)
except Exception:
logger.exception(
f"Sending run artifacts from pipeline `{pipeline.pipeline_name}` to"
f" `{config.bucket_url}`"
)
raise
else:
logger.info("Pipeline results reported to runtime")
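The bucket layout written by `_write_to_bucket` is `{bucket_url}/{pipeline_name}/…` holding `trace.pickle`, `state.json` and `schemas/*`. A sketch of reading one trace back manually, with placeholder paths mirroring the test below:

import pickle
import fsspec

# open the bucket with fsspec and unpickle the trace of a hypothetical pipeline;
# sync_from_runtime walks every file the same way and restores it into the local .var folder
fs = fsspec.filesystem("file")
with fs.open("/tmp/local_bucket/workspace_id/tests/pipelines/ducklake_pipeline/trace.pickle", "rb") as f:
    restored_trace = pickle.load(f)
print(type(restored_trace).__name__)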

View File

@@ -33,6 +33,7 @@ class RuntimeConfiguration(BaseConfiguration):
config_files_storage_path: str = "/run/config/"
"""Platform connection"""
dlthub_dsn: Optional[TSecretStrValue] = None
run_id: Optional[str] = None
http_show_error_body: bool = False
"""Include HTTP response body in raised exceptions/logs. Default is False"""
http_max_error_body_length: int = 8192

View File

@@ -1,5 +1,5 @@
import re
from typing import TYPE_CHECKING, Any, BinaryIO, Literal
from typing import TYPE_CHECKING, Any, BinaryIO, IO
import os
from pathlib import Path
import sys
@@ -131,33 +131,50 @@ def digest256_file_stream(stream: BinaryIO, chunk_size: int = 4096) -> str:
return base64.b64encode(digest).decode("ascii")
def digest256_tar_stream(stream: BinaryIO, chunk_size: int = 8192) -> str:
"""Returns a base64 encoded sha3_256 hash of tar archive contents.
def digest256_tar_stream(
stream: IO[bytes],
filter_file_names: Callable[[str], bool] = lambda x: True,
chunk_size: int = 8192,
) -> Tuple[str, List[str]]:
"""Calculates hash and collects file names from tar archive in a single pass.
Hashes only filenames and file contents, ignoring timestamps and other metadata.
Members are sorted by name before hashing, so tar member order doesn't affect
the hash.
Hashes only file names and file contents of filtered members, ignoring timestamps
and other tar metadata. Members are sorted by name before hashing for consistency.
Operates entirely in-memory to prevent leakage of sensitive data.
Note: This function operates entirely in-memory using tar.extractfile() which reads
from the archive stream. No files are written to disk, preventing leakage of sensitive
data that may be contained in the archive.
Args:
stream: Binary stream containing the tar archive
filter_file_names: Callable that returns True for members to include in hash
and file names list. Default includes all members. Use this to exclude
metadata files (e.g., manifest.yaml) from the hash calculation.
chunk_size: Size of chunks to read when hashing file contents. Default 8192.
Returns:
tuple: (content_hash, file_names)
"""
stream.seek(0)
hash_obj = hashlib.sha3_256()
file_names = []
with tarfile.open(fileobj=stream, mode="r:*") as tar:
members = sorted(tar.getmembers(), key=lambda m: m.name)
for member in members:
if not filter_file_names(member.name):
continue
hash_obj.update(member.name.encode())
if member.isfile():
file_names.append(member.name)
f = tar.extractfile(member)
if f:
while chunk := f.read(chunk_size):
hash_obj.update(chunk)
digest = hash_obj.digest()
return base64.b64encode(digest).decode("ascii")
content_hash = base64.b64encode(digest).decode("ascii")
return content_hash, file_names
def str2bool(v: str) -> bool:
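A self-contained sketch of the new hashing behavior (not the library call itself): build two in-memory tars that differ only in the manifest and confirm the hash ignores the excluded member.

import base64
import hashlib
import io
import tarfile

# mirrors the loop above: hash member names and contents, sorted by name,
# skipping members rejected by the filter so a regenerated manifest does not
# change the content hash
def tar_hash(stream, filter_file_names=lambda name: True, chunk_size=8192):
    stream.seek(0)
    hash_obj = hashlib.sha3_256()
    file_names = []
    with tarfile.open(fileobj=stream, mode="r:*") as tar:
        for member in sorted(tar.getmembers(), key=lambda m: m.name):
            if not filter_file_names(member.name):
                continue
            hash_obj.update(member.name.encode())
            if member.isfile():
                file_names.append(member.name)
                f = tar.extractfile(member)
                if f:
                    while chunk := f.read(chunk_size):
                        hash_obj.update(chunk)
    return base64.b64encode(hash_obj.digest()).decode("ascii"), file_names

def make_tar(manifest_body: bytes) -> io.BytesIO:
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in [("data.csv", b"a,b\n1,2\n"), ("manifest.yaml", manifest_body)]:
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf

not_manifest = lambda name: name != "manifest.yaml"
hash_a, _ = tar_hash(make_tar(b"built_at: today\n"), not_manifest)
hash_b, _ = tar_hash(make_tar(b"built_at: tomorrow\n"), not_manifest)
assert hash_a == hash_b  # manifest contents are excluded from the hash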

docs/uv.lock (generated)
View File

@@ -822,7 +822,7 @@ wheels = [
[[package]]
name = "dlt"
version = "1.19.0"
version = "1.19.1"
source = { editable = "../" }
dependencies = [
{ name = "click" },
@@ -983,7 +983,7 @@ requires-dist = [
{ name = "sqlalchemy", marker = "extra == 'pyiceberg'", specifier = ">=1.4" },
{ name = "sqlalchemy", marker = "extra == 'sql-database'", specifier = ">=1.4" },
{ name = "sqlalchemy", marker = "extra == 'sqlalchemy'", specifier = ">=1.4" },
{ name = "sqlglot", specifier = ">=25.4.0" },
{ name = "sqlglot", specifier = ">=25.4.0,!=28.1" },
{ name = "tantivy", marker = "extra == 'lancedb'", specifier = ">=0.22.0" },
{ name = "tenacity", specifier = ">=8.0.2" },
{ name = "tomlkit", specifier = ">=0.11.3" },

View File

@@ -753,6 +753,7 @@ def test_configuration_is_mutable_mapping(environment: Any, env_provider: Config
"request_max_retry_delay": 300,
"config_files_storage_path": "storage",
"dlthub_dsn": None,
"run_id": None,
"http_show_error_body": False,
"http_max_error_body_length": 8192,
"secret_value": None,

View File

@@ -1,11 +1,13 @@
import os
import pytest
import pickle
import tempfile
import dlt
from dlt._workspace._workspace_context import WorkspaceRunContext, switch_context
from dlt._workspace.cli.utils import check_delete_local_data, delete_local_data
from dlt._workspace.exceptions import WorkspaceRunContextNotAvailable
from dlt._workspace.helpers.runtime.runtime_artifacts import sync_from_runtime
from dlt._workspace.profile import DEFAULT_PROFILE, read_profile_pin, save_profile_pin
from dlt._workspace.run_context import (
DEFAULT_LOCAL_FOLDER,
@@ -16,7 +18,9 @@ from dlt._workspace.cli.echo import always_choose
from dlt.common.runtime.exceptions import RunContextNotAvailable
from dlt.common.runtime.run_context import DOT_DLT, RunContext, global_dir
from dlt.common.storages.file_storage import FileStorage
from tests.pipeline.utils import assert_table_counts
from tests.utils import clean_test_storage
from tests.workspace.utils import isolated_workspace
@@ -146,6 +150,43 @@ def test_workspace_pipeline() -> None:
assert os.path.isdir(os.path.join(ctx.local_dir, "prod_ducklake.files"))
def test_workspace_send_artifacts() -> None:
pytest.importorskip("duckdb", minversion="1.3.2")
# create a random temp directory for the test bucket
with tempfile.TemporaryDirectory() as temp_bucket_dir:
bucket_base = os.path.join(temp_bucket_dir, "local_bucket", "workspace_id")
send_bucket_url = os.path.join(bucket_base, "tests", "pipelines")
# mock run id to enable artifact storage
os.environ["RUNTIME__RUN_ID"] = "uniq_run_id"
# emit runtime filesystem info
os.environ["SEND__ARTIFACTS__BUCKET_URL"] = send_bucket_url
# auto create dirs
os.environ["ARTIFACTS__KWARGS"] = '{"auto_mkdir": true}'
with isolated_workspace("pipelines", profile="tests") as ctx:
# `ducklake_pipeline` configured in config.toml
pipeline = dlt.pipeline(pipeline_name="ducklake_pipeline")
pipeline.run([{"foo": 1}, {"foo": 2}], table_name="table_foo")
print(ctx.run_dir)
# delete the whole workspace
clean_test_storage()
with isolated_workspace("pipelines", profile="tests") as ctx:
# now restore pipeline from bucket
os.environ["SYNC__ARTIFACTS__BUCKET_URL"] = bucket_base
sync_from_runtime()
# now pipeline sees restored state
pipeline = dlt.pipeline(pipeline_name="ducklake_pipeline")
assert pipeline.first_run is False
assert pipeline.default_schema_name == "ducklake"
assert pipeline.default_schema.tables["table_foo"] is not None
assert pipeline.last_trace is not None
def assert_dev_config() -> None:
# check profile toml providers
assert dlt.config["config_val"] == "config.toml"