diff --git a/.gitignore b/.gitignore index 56875ee42..1ee072ccd 100644 --- a/.gitignore +++ b/.gitignore @@ -149,4 +149,4 @@ local_cache/ # test file for examples are generated and should not be committed docs/examples/**/test*.py -compiled_requirements.txt \ No newline at end of file +compiled_requirements.txt diff --git a/dlt/_workspace/_workspace_context.py b/dlt/_workspace/_workspace_context.py index 5fb617bb2..132365409 100644 --- a/dlt/_workspace/_workspace_context.py +++ b/dlt/_workspace/_workspace_context.py @@ -98,6 +98,14 @@ class WorkspaceRunContext(ProfilesRunContext): # this also resolves workspace config if necessary initialize_runtime(self.name, self.config.runtime) + # if on runtime, add additional tracker + if self.runtime_config.run_id: + from dlt._workspace.helpers.runtime import runtime_artifacts + from dlt.pipeline import trace + + if runtime_artifacts not in trace.TRACKING_MODULES: + trace.TRACKING_MODULES.append(runtime_artifacts) + @property def runtime_config(self) -> WorkspaceRuntimeConfiguration: return self.config.runtime diff --git a/dlt/_workspace/configuration.py b/dlt/_workspace/configuration.py index 21560bcc7..6cfa9b8bf 100644 --- a/dlt/_workspace/configuration.py +++ b/dlt/_workspace/configuration.py @@ -33,6 +33,8 @@ class WorkspaceRuntimeConfiguration(RuntimeConfiguration): invite_code: Optional[str] = None """Invite code for dltHub Runtime""" + __section__: ClassVar[str] = "runtime" + @configspec class WorkspaceConfiguration(BaseConfiguration): diff --git a/dlt/_workspace/deployment/package_builder.py b/dlt/_workspace/deployment/package_builder.py index 4bd822d99..64e033eb5 100644 --- a/dlt/_workspace/deployment/package_builder.py +++ b/dlt/_workspace/deployment/package_builder.py @@ -64,7 +64,10 @@ class PackageBuilder: manifest_info.size = len(manifest_yaml) tar.addfile(manifest_info, BytesIO(manifest_yaml)) - return digest256_tar_stream(output_stream) + content_hash, _ = digest256_tar_stream( + output_stream, filter_file_names=lambda x: x != DEFAULT_MANIFEST_FILE_NAME + ) + return content_hash def build_package(self, file_selector: BaseFileSelector) -> Tuple[Path, str]: """Create deployment package file, return (path, content_hash)""" diff --git a/dlt/_workspace/helpers/dashboard/config.py b/dlt/_workspace/helpers/dashboard/config.py index 13d253296..c875b7a4c 100644 --- a/dlt/_workspace/helpers/dashboard/config.py +++ b/dlt/_workspace/helpers/dashboard/config.py @@ -33,6 +33,12 @@ class DashboardConfiguration(BaseConfiguration): datetime_format: str = "YYYY-MM-DD HH:mm:ss Z" """The format of the datetime strings""" + sync_from_runtime: bool = False + """ + Whether to sync the pipeline states and traces from the runtime backup. + Needs to be run inside a dlt workspace with runtime artifacts credentials set. 
+ """ + # this is needed for using this as a param in the cache def __hash__(self) -> int: return hash( diff --git a/dlt/_workspace/helpers/dashboard/dlt_dashboard.py b/dlt/_workspace/helpers/dashboard/dlt_dashboard.py index 8af954083..aa9bcaee3 100644 --- a/dlt/_workspace/helpers/dashboard/dlt_dashboard.py +++ b/dlt/_workspace/helpers/dashboard/dlt_dashboard.py @@ -999,6 +999,14 @@ def utils_discover_pipelines( Discovers local pipelines and returns a multiselect widget to select one of the pipelines """ + # sync from runtime if enabled + _tmp_config = utils.resolve_dashboard_config(None) + if _tmp_config.sync_from_runtime: + from dlt._workspace.helpers.runtime.runtime_artifacts import sync_from_runtime + + with mo.status.spinner(title="Syncing pipeline list from runtime"): + sync_from_runtime() + _run_context = dlt.current.run_context() if ( isinstance(_run_context, ProfilesRunContext) @@ -1011,7 +1019,7 @@ def utils_discover_pipelines( dlt_all_pipelines: List[Dict[str, Any]] = [] dlt_pipelines_dir, dlt_all_pipelines = utils.get_local_pipelines( mo_cli_arg_pipelines_dir, - addtional_pipelines=[mo_cli_arg_pipeline, mo_query_var_pipeline_name], + additional_pipelines=[mo_cli_arg_pipeline, mo_query_var_pipeline_name], ) dlt_pipeline_select: mo.ui.multiselect = mo.ui.multiselect( diff --git a/dlt/_workspace/helpers/dashboard/utils.py b/dlt/_workspace/helpers/dashboard/utils.py index 7f92de098..98b5fec40 100644 --- a/dlt/_workspace/helpers/dashboard/utils.py +++ b/dlt/_workspace/helpers/dashboard/utils.py @@ -9,7 +9,6 @@ from typing import ( List, Mapping, Optional, - Set, Tuple, Union, cast, @@ -27,6 +26,7 @@ import dlt import marimo as mo import pyarrow import traceback +import datetime # noqa: I251 from dlt.common.configuration import resolve_configuration from dlt.common.configuration.specs import known_sections @@ -44,6 +44,7 @@ from dlt.common.storages.configuration import WithLocalFiles from dlt.common.configuration.exceptions import ConfigFieldMissingException from dlt.common.typing import DictStrAny, TypedDict from dlt.common.utils import map_nested_keys_in_place +from dlt.common.pipeline import get_dlt_pipelines_dir from dlt._workspace.helpers.dashboard import ui_elements as ui from dlt._workspace.helpers.dashboard.config import DashboardConfiguration @@ -116,7 +117,7 @@ def get_pipeline_last_run(pipeline_name: str, pipelines_dir: str) -> float: def get_local_pipelines( - pipelines_dir: str = None, sort_by_trace: bool = True, addtional_pipelines: List[str] = None + pipelines_dir: str = None, sort_by_trace: bool = True, additional_pipelines: List[str] = None ) -> Tuple[str, List[Dict[str, Any]]]: """Get the local pipelines directory and the list of pipeline names in it. @@ -134,8 +135,8 @@ def get_local_pipelines( except Exception: pipelines = [] - if addtional_pipelines: - for pipeline in addtional_pipelines: + if additional_pipelines: + for pipeline in additional_pipelines: if pipeline and pipeline not in pipelines: pipelines.append(pipeline) @@ -215,7 +216,11 @@ def pipeline_details( credentials = "Could not resolve credentials." 
# find the pipeline in all_pipelines and get the timestamp - pipeline_timestamp = get_pipeline_last_run(pipeline.pipeline_name, pipeline.pipelines_dir) + trace = pipeline.last_trace + + last_executed = "No trace found" + if trace and hasattr(trace, "started_at"): + last_executed = _date_from_timestamp_with_ago(c, trace.started_at) details_dict = { "pipeline_name": pipeline.pipeline_name, @@ -224,7 +229,7 @@ def pipeline_details( if pipeline.destination else "No destination set" ), - "last executed": _date_from_timestamp_with_ago(c, pipeline_timestamp), + "last executed": last_executed, "credentials": credentials, "dataset_name": pipeline.dataset_name, "working_dir": pipeline.working_dir, @@ -663,7 +668,7 @@ def build_pipeline_link_list( ) -> str: """Build a list of links to the pipeline.""" if not pipelines: - return "No local pipelines found." + return "No pipelines found." count = 0 link_list: str = "" @@ -746,12 +751,15 @@ def build_exception_section(p: dlt.Pipeline) -> List[Any]: def _date_from_timestamp_with_ago( - config: DashboardConfiguration, timestamp: Union[int, float] + config: DashboardConfiguration, timestamp: Union[int, float, datetime.datetime] ) -> str: """Return a date with ago section""" if not timestamp or timestamp == 0: return "never" - p_ts = pendulum.from_timestamp(timestamp) + if isinstance(timestamp, datetime.datetime): + p_ts = pendulum.instance(timestamp) + else: + p_ts = pendulum.from_timestamp(timestamp) time_formatted = p_ts.format(config.datetime_format) ago = p_ts.diff_for_humans() return f"{ago} ({time_formatted})" diff --git a/dlt/_workspace/helpers/runtime/__init__.py b/dlt/_workspace/helpers/runtime/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dlt/_workspace/helpers/runtime/runtime_artifacts.py b/dlt/_workspace/helpers/runtime/runtime_artifacts.py new file mode 100644 index 000000000..dd3ba6e31 --- /dev/null +++ b/dlt/_workspace/helpers/runtime/runtime_artifacts.py @@ -0,0 +1,191 @@ +"""Implements SupportsTracking""" +from typing import Any, ClassVar, List, Optional, Tuple, Union +import fsspec +import pickle +import os + +import dlt +from dlt.common import logger +from dlt.common.configuration.exceptions import ConfigurationException +from dlt.common.configuration.resolve import resolve_configuration +from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec +from dlt.common.storages.configuration import FilesystemConfiguration +from dlt.common.storages.fsspec_filesystem import FileItemDict, fsspec_from_config, glob_files +from dlt.common.versioned_state import json_encode_state + +from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline +from dlt._workspace.run_context import DEFAULT_WORKSPACE_WORKING_FOLDER +from dlt._workspace._workspace_context import WorkspaceRunContext + + +@configspec +class RuntimeArtifactsConfiguration(BaseConfiguration): + artifacts: FilesystemConfiguration = None + + +def sync_from_runtime() -> None: + """Sync the pipeline states and traces from the runtime backup, recursively.""" + from dlt._workspace.helpers.runtime.runtime_artifacts import _get_runtime_artifacts_fs + + def sync_dir(fs: fsspec.AbstractFileSystem, src_root: str, dst_root: str) -> None: + """Recursively sync src_root on fs into dst_root locally, always using fs.walk.""" + + os.makedirs(dst_root, exist_ok=True) + + for file_dict in glob_files(fs, src_root): + file_item = FileItemDict(file_dict, fs) + + relative_dir = 
os.path.dirname(file_dict["relative_path"]) + local_dir = dst_root if relative_dir == "." else os.path.join(dst_root, relative_dir) + os.makedirs(local_dir, exist_ok=True) + + local_file = os.path.join(dst_root, file_dict["relative_path"]) + + logger.info(f"Restoring artifact {local_file}") + with open(local_file, "wb") as lf: + lf.write(file_item.read_bytes()) + + ts = file_dict["modification_date"].timestamp() + os.utime(local_file, (ts, ts)) # (atime, mtime) + + context = dlt.current.run_context() + + if not context.runtime_config.run_id: + return + + if not isinstance(context, WorkspaceRunContext): + return + + fs, config = _get_runtime_artifacts_fs(section="sync") + if not fs: + return + + # TODO: there's no good way to get this value on sync. + data_dir_root = os.path.join( + context.settings_dir, DEFAULT_WORKSPACE_WORKING_FOLDER + ) # the local .var folder + + # Just sync the whole base folder into the local pipelines dir + sync_dir(fs, config.bucket_url, data_dir_root) + + +def _get_runtime_artifacts_fs( + section: str, +) -> Tuple[fsspec.AbstractFileSystem, FilesystemConfiguration]: + try: + config = resolve_configuration(RuntimeArtifactsConfiguration(), sections=(section,)) + except ConfigurationException: + logger.info(f"No artifact storage credentials found for {section}") + return None, None + + return fsspec_from_config(config.artifacts)[0], config.artifacts + + +def _write_to_bucket( + fs: fsspec.AbstractFileSystem, + bucket_url: str, + pipeline_name: str, + paths: List[str], + data: Union[str, bytes], + mode: str = "w", +) -> None: + # write to bucket using the config, same object may be written to multiple paths + + for path in paths: + with fs.open(f"{bucket_url}/{pipeline_name}/{path}", mode=mode) as f: + f.write(data) + + +def _send_trace_to_bucket( + fs: fsspec.AbstractFileSystem, bucket_url: str, trace: PipelineTrace, pipeline: SupportsPipeline +) -> None: + """ + Send the full trace pickled to the runtime bucket + """ + pickled_trace = pickle.dumps(trace) + _write_to_bucket( + fs, + bucket_url, + pipeline.pipeline_name, + [ + "trace.pickle", + ], # save current and by start time + pickled_trace, + mode="wb", + ) + + +def _send_state_to_bucket( + fs: fsspec.AbstractFileSystem, bucket_url: str, pipeline: SupportsPipeline +) -> None: + encoded_state = json_encode_state(pipeline.state) + _write_to_bucket( + fs, + bucket_url, + pipeline.pipeline_name, + [ + "state.json", + ], # save current and by start time + encoded_state, + mode="w", + ) + + +def _send_schemas_to_bucket( + fs: fsspec.AbstractFileSystem, bucket_url: str, pipeline: SupportsPipeline +) -> None: + schema_dir = os.path.join(pipeline.working_dir, "schemas") + for schema_file in os.listdir(schema_dir): + _write_to_bucket( + fs, + bucket_url, + pipeline.pipeline_name, + [f"schemas/{schema_file}"], + open(os.path.join(schema_dir, schema_file), "rb").read(), + mode="wb", + ) + + +def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: + pass + + +def on_start_trace_step( + trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline +) -> None: + pass + + +def on_end_trace_step( + trace: PipelineTrace, + step: PipelineStepTrace, + pipeline: SupportsPipeline, + step_info: Any, + send_state: bool, +) -> None: + pass + + +def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None: + # skip if runtime not running + if pipeline.run_context.runtime_config.run_id is None: + return + + fs, config = 
_get_runtime_artifacts_fs(section="send") + if fs: + logger.info( + f"Sending run artifacts from pipeline `{pipeline.pipeline_name}` to" + f" `{config.bucket_url}`" + ) + try: + _send_trace_to_bucket(fs, config.bucket_url, trace, pipeline) + _send_state_to_bucket(fs, config.bucket_url, pipeline) + _send_schemas_to_bucket(fs, config.bucket_url, pipeline) + except Exception: + logger.exception( + f"Sending run artifacts from pipeline `{pipeline.pipeline_name}` to" + f" `{config.bucket_url}`" + ) + raise + else: + logger.info("Pipeline results reported to runtime") diff --git a/dlt/common/configuration/specs/runtime_configuration.py b/dlt/common/configuration/specs/runtime_configuration.py index 068a45aa6..e8b52abfd 100644 --- a/dlt/common/configuration/specs/runtime_configuration.py +++ b/dlt/common/configuration/specs/runtime_configuration.py @@ -33,6 +33,7 @@ class RuntimeConfiguration(BaseConfiguration): config_files_storage_path: str = "/run/config/" """Platform connection""" dlthub_dsn: Optional[TSecretStrValue] = None + run_id: Optional[str] = None http_show_error_body: bool = False """Include HTTP response body in raised exceptions/logs. Default is False""" http_max_error_body_length: int = 8192 diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 21b1cc7a7..5f0783ca2 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -1,5 +1,5 @@ import re -from typing import TYPE_CHECKING, Any, BinaryIO, Literal +from typing import TYPE_CHECKING, Any, BinaryIO, IO import os from pathlib import Path import sys @@ -131,33 +131,50 @@ def digest256_file_stream(stream: BinaryIO, chunk_size: int = 4096) -> str: return base64.b64encode(digest).decode("ascii") -def digest256_tar_stream(stream: BinaryIO, chunk_size: int = 8192) -> str: - """Returns a base64 encoded sha3_256 hash of tar archive contents. +def digest256_tar_stream( + stream: IO[bytes], + filter_file_names: Callable[[str], bool] = lambda x: True, + chunk_size: int = 8192, +) -> Tuple[str, List[str]]: + """Calculates hash and collects file names from tar archive in a single pass. - Hashes only filenames and file contents, ignoring timestamps and other metadata. - Members are sorted by name before hashing, so tar member order doesn't affect - the hash. + Hashes only file names and file contents of filtered members, ignoring timestamps + and other tar metadata. Members are sorted by name before hashing for consistency. + Operates entirely in-memory to prevent leakage of sensitive data. - Note: This function operates entirely in-memory using tar.extractfile() which reads - from the archive stream. No files are written to disk, preventing leakage of sensitive - data that may be contained in the archive. + Args: + stream: Binary stream containing the tar archive + filter_file_names: Callable that returns True for members to include in hash + and file names list. Default includes all members. Use this to exclude + metadata files (e.g., manifest.yaml) from the hash calculation. + chunk_size: Size of chunks to read when hashing file contents. Default 8192. 
+ + Returns: + tuple: (content_hash, file_names) """ stream.seek(0) hash_obj = hashlib.sha3_256() + file_names = [] with tarfile.open(fileobj=stream, mode="r:*") as tar: members = sorted(tar.getmembers(), key=lambda m: m.name) for member in members: + if not filter_file_names(member.name): + continue + hash_obj.update(member.name.encode()) if member.isfile(): + file_names.append(member.name) f = tar.extractfile(member) if f: while chunk := f.read(chunk_size): hash_obj.update(chunk) digest = hash_obj.digest() - return base64.b64encode(digest).decode("ascii") + content_hash = base64.b64encode(digest).decode("ascii") + + return content_hash, file_names def str2bool(v: str) -> bool: diff --git a/docs/uv.lock b/docs/uv.lock index a1822e37c..b67554a4a 100644 --- a/docs/uv.lock +++ b/docs/uv.lock @@ -822,7 +822,7 @@ wheels = [ [[package]] name = "dlt" -version = "1.19.0" +version = "1.19.1" source = { editable = "../" } dependencies = [ { name = "click" }, @@ -983,7 +983,7 @@ requires-dist = [ { name = "sqlalchemy", marker = "extra == 'pyiceberg'", specifier = ">=1.4" }, { name = "sqlalchemy", marker = "extra == 'sql-database'", specifier = ">=1.4" }, { name = "sqlalchemy", marker = "extra == 'sqlalchemy'", specifier = ">=1.4" }, - { name = "sqlglot", specifier = ">=25.4.0" }, + { name = "sqlglot", specifier = ">=25.4.0,!=28.1" }, { name = "tantivy", marker = "extra == 'lancedb'", specifier = ">=0.22.0" }, { name = "tenacity", specifier = ">=8.0.2" }, { name = "tomlkit", specifier = ">=0.11.3" }, diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index d1748d58a..86138c87a 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -753,6 +753,7 @@ def test_configuration_is_mutable_mapping(environment: Any, env_provider: Config "request_max_retry_delay": 300, "config_files_storage_path": "storage", "dlthub_dsn": None, + "run_id": None, "http_show_error_body": False, "http_max_error_body_length": 8192, "secret_value": None, diff --git a/tests/workspace/test_workspace_context.py b/tests/workspace/test_workspace_context.py index 029c862d1..43fd4752c 100644 --- a/tests/workspace/test_workspace_context.py +++ b/tests/workspace/test_workspace_context.py @@ -1,11 +1,13 @@ import os import pytest import pickle +import tempfile import dlt from dlt._workspace._workspace_context import WorkspaceRunContext, switch_context from dlt._workspace.cli.utils import check_delete_local_data, delete_local_data from dlt._workspace.exceptions import WorkspaceRunContextNotAvailable +from dlt._workspace.helpers.runtime.runtime_artifacts import sync_from_runtime from dlt._workspace.profile import DEFAULT_PROFILE, read_profile_pin, save_profile_pin from dlt._workspace.run_context import ( DEFAULT_LOCAL_FOLDER, @@ -16,7 +18,9 @@ from dlt._workspace.cli.echo import always_choose from dlt.common.runtime.exceptions import RunContextNotAvailable from dlt.common.runtime.run_context import DOT_DLT, RunContext, global_dir +from dlt.common.storages.file_storage import FileStorage from tests.pipeline.utils import assert_table_counts +from tests.utils import clean_test_storage from tests.workspace.utils import isolated_workspace @@ -146,6 +150,43 @@ def test_workspace_pipeline() -> None: assert os.path.isdir(os.path.join(ctx.local_dir, "prod_ducklake.files")) +def test_workspace_send_artifacts() -> None: + pytest.importorskip("duckdb", minversion="1.3.2") + + # create a random temp directory for the test bucket + 
with tempfile.TemporaryDirectory() as temp_bucket_dir: + bucket_base = os.path.join(temp_bucket_dir, "local_bucket", "workspace_id") + send_bucket_url = os.path.join(bucket_base, "tests", "pipelines") + + # mock run id to enable artifact storage + os.environ["RUNTIME__RUN_ID"] = "uniq_run_id" + # emit runtime filesystem info + os.environ["SEND__ARTIFACTS__BUCKET_URL"] = send_bucket_url + # auto create dirs + os.environ["ARTIFACTS__KWARGS"] = '{"auto_mkdir": true}' + + with isolated_workspace("pipelines", profile="tests") as ctx: + # `ducklake_pipeline` configured in config.toml + pipeline = dlt.pipeline(pipeline_name="ducklake_pipeline") + pipeline.run([{"foo": 1}, {"foo": 2}], table_name="table_foo") + + print(ctx.run_dir) + + # delete the whole workspace + clean_test_storage() + + with isolated_workspace("pipelines", profile="tests") as ctx: + # now restore pipeline from bucket + os.environ["SYNC__ARTIFACTS__BUCKET_URL"] = bucket_base + sync_from_runtime() + # now pipeline sees restored state + pipeline = dlt.pipeline(pipeline_name="ducklake_pipeline") + assert pipeline.first_run is False + assert pipeline.default_schema_name == "ducklake" + assert pipeline.default_schema.tables["table_foo"] is not None + assert pipeline.last_trace is not None + + def assert_dev_config() -> None: # check profile toml providers assert dlt.config["config_val"] == "config.toml"
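
Below is a minimal usage sketch of the new `digest256_tar_stream` signature, mirroring how `PackageBuilder` now excludes the manifest from the content hash; the in-memory tar contents here are made up purely for illustration:

```python
import io
import tarfile

from dlt.common.utils import digest256_tar_stream

# build a small in-memory tar archive (illustrative contents only)
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    for name, payload in [("pipeline.py", b"print('hi')"), ("manifest.yaml", b"version: 1")]:
        info = tarfile.TarInfo(name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))

# hash everything except the manifest (as PackageBuilder does) and collect
# the names of the hashed files in the same pass
content_hash, file_names = digest256_tar_stream(
    buf, filter_file_names=lambda name: name != "manifest.yaml"
)
print(content_hash)  # stable across manifest-only changes
print(file_names)    # ["pipeline.py"] - manifest.yaml is filtered out
```

As exercised in `test_workspace_send_artifacts`, the artifact send/sync path is driven entirely by configuration: `RUNTIME__RUN_ID` enables the tracking module, `SEND__ARTIFACTS__BUCKET_URL` points the `send` section at the backup bucket, and `SYNC__ARTIFACTS__BUCKET_URL` lets `sync_from_runtime()` (or the dashboard with `sync_from_runtime = True`) restore states, traces and schemas from it.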