Feat/adds workspace configuration (#3221)

* moves runtime configuration from pipeline context to run context, with a corresponding action to initialize the local runtime

* improves telemetry instrumentation decorator + tests + disables telemetry in dlt tests by default

* resolves DashboardConfiguration so it is placed within workspace or pipeline configuration

* adds and resolves WorkspaceConfiguration and corresponding WorkspaceRuntimeConfiguration

* reorganizes cli commands and wrappers, adds missing telemetry tracking

* uses working and local dir overrides without adding profile names

* uses python syntax highlighting to display stacktraces in marimo

* restores runtime_config on pipeline pointing to new PipelineRuntimeConfiguration

* renames the default working dir from _data to .var and updates .gitignore

* adds workspace show command

* adds reload method on run context, fixes requests helper test

* slight cli improvements

* new workspace writeup without sidebar

* docstring for cli plugins
rudolfix
2025-10-20 23:18:54 +02:00
committed by GitHub
parent 3298b4059f
commit 563c764f29
67 changed files with 1520 additions and 805 deletions

View File

@@ -1,3 +1,4 @@
import os
from typing import Any, Dict, Optional
from dlt._workspace.exceptions import WorkspaceRunContextNotAvailable
@@ -21,9 +22,8 @@ def plug_workspace_context_impl(
) -> Optional[RunContextBase]:
# TODO: if recursive search was requested
# if runtime_kwargs.get("_look_recursive")
run_dir = run_dir or "."
run_dir = os.path.abspath(run_dir or ".")
if is_workspace_dir(run_dir):
# TODO: use RunContext to read WorkspaceConfiguration
profile: str = None
if runtime_kwargs:
profile = runtime_kwargs.get("profile")

View File

@@ -3,6 +3,10 @@ secrets.toml
*.secrets.toml
# ignore pinned profile name
.dlt/profile-name
# ignore default working dir
.dlt/.var
# ignore default local dir (loaded data)
_local
# ignore basic python artifacts
.env
**/__pycache__/

View File

@@ -6,10 +6,13 @@ from dlt.common import known_env
from dlt.common.configuration.container import Container
from dlt.common.configuration.providers import EnvironProvider
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.specs import known_sections
from dlt.common.configuration.specs.pluggable_run_context import (
ProfilesRunContext,
PluggableRunContext,
)
from dlt.common.configuration.specs.runtime_configuration import RuntimeConfiguration
from dlt.common.runtime.init import initialize_runtime
from dlt.common.runtime.run_context import (
DOT_DLT,
switch_context as _switch_context,
@@ -18,6 +21,7 @@ from dlt.common.runtime.run_context import (
)
from dlt.common.typing import copy_sig_ret
from dlt._workspace.configuration import WorkspaceConfiguration, WorkspaceRuntimeConfiguration
from dlt._workspace.exceptions import WorkspaceRunContextNotAvailable
from dlt._workspace.profile import BUILT_IN_PROFILES, DEFAULT_PROFILE, read_profile_pin
from dlt._workspace.providers import ProfileConfigTomlProvider, ProfileSecretsTomlProvider
@@ -36,12 +40,10 @@ class WorkspaceRunContext(ProfilesRunContext):
self._init_run_dir = run_dir
self._name = name
self._profile = profile
self._data_dir = default_working_dir(
self.settings_dir, name, profile, DEFAULT_WORKSPACE_WORKING_FOLDER
)
# TODO: if local_dir == run_dir and profile "dev" profile prefixing for local_dir for OSS compat
self._local_dir = default_working_dir(self.run_dir, name, profile, DEFAULT_LOCAL_FOLDER)
self._data_dir: str = None
self._local_dir: str = None
self._global_dir = global_dir()
self._config: WorkspaceConfiguration = None
@property
def name(self) -> str:
@@ -64,6 +66,7 @@ class WorkspaceRunContext(ProfilesRunContext):
@property
def local_dir(self) -> str:
assert self._local_dir, "local_dir used before workspace configuration got resolved"
return os.environ.get(known_env.DLT_LOCAL_DIR, self._local_dir)
@property
@@ -76,6 +79,7 @@ class WorkspaceRunContext(ProfilesRunContext):
@property
def data_dir(self) -> str:
assert self._data_dir, "data_dir used before workspace configuration got resolved"
return os.environ.get(known_env.DLT_DATA_DIR, self._data_dir)
def initial_providers(self) -> List[ConfigProvider]:
@@ -86,6 +90,49 @@ class WorkspaceRunContext(ProfilesRunContext):
]
return providers
def initialize_runtime(self, runtime_config: RuntimeConfiguration = None) -> None:
if runtime_config is not None:
assert isinstance(runtime_config, WorkspaceRuntimeConfiguration)
self.config.runtime = runtime_config
# this also resolves workspace config if necessary
initialize_runtime(self.name, self.config.runtime)
@property
def runtime_config(self) -> WorkspaceRuntimeConfiguration:
return self._config.runtime
@property
def config(self) -> WorkspaceConfiguration:
def _to_run_dir(dir_: Optional[str]) -> Optional[str]:
if not dir_:
return None
return os.path.join(self.run_dir, dir_)
if self._config is None:
from dlt.common.configuration.resolve import resolve_configuration
self._config = resolve_configuration(
WorkspaceConfiguration(), sections=(known_sections.WORKSPACE,)
)
# overwrite name
if self._config.settings.name:
self._name = self._config.settings.name
self._data_dir = _to_run_dir(self._config.settings.working_dir) or default_working_dir(
self.settings_dir,
self.name,
self.profile,
DEFAULT_WORKSPACE_WORKING_FOLDER,
)
self._local_dir = _to_run_dir(self._config.settings.local_dir) or default_working_dir(
self.run_dir,
self.name,
self.profile,
DEFAULT_LOCAL_FOLDER,
)
return self._config
@property
def module(self) -> Optional[ModuleType]:
try:

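For illustration, a minimal sketch of the lazy resolution above, assuming the code runs inside an active workspace (accessing `config` first avoids the `data_dir`/`local_dir` assertions):

import dlt

ctx = dlt.current.run_context()  # WorkspaceRunContext when run inside a workspace
config = ctx.config              # first access resolves WorkspaceConfiguration
print(ctx.data_dir)              # settings.working_dir or the default .var folder under settings_dir
print(ctx.local_dir)             # settings.local_dir or the default _local folder under run_dir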
View File

@@ -3,11 +3,12 @@ import shutil
from pathlib import Path
from typing import List, Tuple, get_args, Literal, Union, cast
from dlt._workspace.cli import echo as fmt
from dlt.common.libs import git
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.runtime import run_context
from dlt._workspace.cli import echo as fmt, utils
TSupportedIde = Literal[
"amp",
"codex",
@@ -137,5 +138,10 @@ def vibe_source_setup(
fmt.echo(fmt.bold(file))
@utils.track_command("ai", False, operation="setup")
def ai_setup_command_wrapper(ide: TSupportedIde, branch: Union[str, None], repo: str) -> None:
ai_setup_command(ide, location=repo, branch=branch)
# TODO create a command to create a copy-pasteable MCP server config
def mcp_command() -> None: ...

View File

@@ -1,176 +0,0 @@
from typing import Any, Optional, Union
import yaml
import os
import click
from dlt.version import __version__
from dlt.common.json import json
from dlt.common.schema import Schema
from dlt.common.typing import DictStrAny
from dlt.pipeline.exceptions import CannotRestorePipelineException
import dlt._workspace.cli.echo as fmt
from dlt._workspace.cli import utils
from dlt._workspace.cli.exceptions import CliCommandException, PipelineWasNotRun
from dlt._workspace.cli._init_command import (
init_command,
list_sources_command,
list_destinations_command,
)
from dlt._workspace.cli._pipeline_command import pipeline_command
from dlt._workspace.cli._telemetry_command import (
change_telemetry_status_command,
telemetry_status_command,
DLT_TELEMETRY_DOCS_URL,
)
from dlt._workspace.cli._ai_command import ai_setup_command, TSupportedIde
try:
from dlt._workspace.cli import _deploy_command
except ModuleNotFoundError:
pass
@utils.track_command("init", False, "source_name", "destination_type")
def init_command_wrapper(
source_name: str,
destination_type: str,
repo_location: str,
branch: str,
eject_source: bool = False,
) -> None:
init_command(
source_name,
destination_type,
repo_location,
branch,
eject_source,
)
@utils.track_command("list_sources", False)
def list_sources_command_wrapper(repo_location: str, branch: str) -> None:
list_sources_command(repo_location, branch)
@utils.track_command("list_destinations", False)
def list_destinations_command_wrapper() -> None:
list_destinations_command()
@utils.track_command("pipeline", True, "operation")
def pipeline_command_wrapper(
operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, **command_kwargs: Any
) -> None:
try:
pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, **command_kwargs)
except CannotRestorePipelineException as ex:
click.secho(str(ex), err=True, fg="red")
click.secho(
"Try command %s to restore the pipeline state from destination"
% fmt.bold(f"dlt pipeline {pipeline_name} sync")
)
raise CliCommandException(error_code=-2)
@utils.track_command("deploy", False, "deployment_method")
def deploy_command_wrapper(
pipeline_script_path: str,
deployment_method: str,
repo_location: str,
branch: Optional[str] = None,
**kwargs: Any,
) -> None:
try:
utils.ensure_git_command("deploy")
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
raise CliCommandException(error_code=-2)
from git import InvalidGitRepositoryError, NoSuchPathError
try:
_deploy_command.deploy_command(
pipeline_script_path=pipeline_script_path,
deployment_method=deployment_method,
repo_location=repo_location,
branch=branch,
**kwargs,
)
except (CannotRestorePipelineException, PipelineWasNotRun) as ex:
fmt.note(
"You must run the pipeline locally successfully at least once in order to deploy it."
)
raise CliCommandException(error_code=-3, raiseable_exception=ex)
except InvalidGitRepositoryError:
click.secho(
"No git repository found for pipeline script %s." % fmt.bold(pipeline_script_path),
err=True,
fg="red",
)
fmt.note("If you do not have a repository yet, you can do either of:")
fmt.note(
"- Run the following command to initialize new repository: %s" % fmt.bold("git init")
)
fmt.note(
"- Add your local code to Github as described here: %s"
% fmt.bold(
"https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github"
)
)
fmt.note(
"Please refer to %s for further assistance"
% fmt.bold(_deploy_command.DLT_DEPLOY_DOCS_URL)
)
raise CliCommandException(error_code=-4)
except NoSuchPathError as path_ex:
click.secho("The pipeline script does not exist\n%s" % str(path_ex), err=True, fg="red")
raise CliCommandException(error_code=-5)
@utils.track_command("schema", False, "operation")
def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool) -> None:
with open(file_path, "rb") as f:
if os.path.splitext(file_path)[1][1:] == "json":
schema_dict: DictStrAny = json.load(f)
else:
schema_dict = yaml.safe_load(f)
s = Schema.from_dict(schema_dict)
if format_ == "json":
schema_str = s.to_pretty_json(remove_defaults=remove_defaults)
elif format_ == "yaml":
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)
elif format_ == "dbml":
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)
fmt.echo(schema_str)
@utils.track_command("dashboard", True)
def dashboard_command_wrapper(pipelines_dir: Optional[str], edit: bool) -> None:
from dlt._workspace.helpers.dashboard.runner import run_dashboard
run_dashboard(pipelines_dir=pipelines_dir, edit=edit)
@utils.track_command("telemetry", False)
def telemetry_status_command_wrapper() -> None:
telemetry_status_command()
@utils.track_command("telemetry_switch", False, "enabled")
def telemetry_change_status_command_wrapper(enabled: bool) -> None:
try:
change_telemetry_status_command(enabled)
except Exception as ex:
raise CliCommandException(docs_url=DLT_TELEMETRY_DOCS_URL, raiseable_exception=ex)
@utils.track_command("ai_setup", False)
def ai_setup_command_wrapper(ide: TSupportedIde, branch: Union[str, None], repo: str) -> None:
ai_setup_command(ide, location=repo, branch=branch)

View File

@@ -21,6 +21,8 @@ from dlt._workspace.cli._deploy_command_helpers import (
wrap_template_str,
get_schedule_description,
)
from dlt._workspace.cli.exceptions import CliCommandException, PipelineWasNotRun
from dlt.pipeline.exceptions import CannotRestorePipelineException
REQUIREMENTS_GITHUB_ACTION = "requirements_github_action.txt"
DLT_AIRFLOW_GCP_DOCS_URL = (
@@ -426,3 +428,54 @@ class AirflowDeployment(BaseDeployment):
fmt.echo("c. Push changes to github. Use your Git UI or the following command")
fmt.echo(fmt.bold("git push origin"))
fmt.echo("6. You should see your pipeline in Airflow.")
@utils.track_command("deploy", False, "deployment_method")
def deploy_command_wrapper(
pipeline_script_path: str,
deployment_method: str,
repo_location: str,
branch: Optional[str] = None,
**kwargs: Any,
) -> None:
try:
utils.ensure_git_command("deploy")
except Exception as ex:
fmt.secho(str(ex), err=True, fg="red")
raise CliCommandException(error_code=-2)
from git import InvalidGitRepositoryError, NoSuchPathError
try:
deploy_command(
pipeline_script_path=pipeline_script_path,
deployment_method=deployment_method,
repo_location=repo_location,
branch=branch,
**kwargs,
)
except (CannotRestorePipelineException, PipelineWasNotRun) as ex:
fmt.note(
"You must run the pipeline locally successfully at least once in order to deploy it."
)
raise CliCommandException(error_code=-3, raiseable_exception=ex)
except InvalidGitRepositoryError:
fmt.secho(
"No git repository found for pipeline script %s." % fmt.bold(pipeline_script_path),
err=True,
fg="red",
)
fmt.note("If you do not have a repository yet, you can do either of:")
fmt.note(
"- Run the following command to initialize new repository: %s" % fmt.bold("git init")
)
fmt.note(
"- Add your local code to Github as described here: %s"
% fmt.bold(
"https://docs.github.com/en/get-started/importing-your-projects-to-github/importing-source-code-to-github/adding-locally-hosted-code-to-github"
)
)
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
raise CliCommandException(error_code=-4)
except NoSuchPathError as path_ex:
fmt.secho("The pipeline script does not exist\n%s" % str(path_ex), err=True, fg="red")
raise CliCommandException(error_code=-5)

View File

@@ -29,7 +29,6 @@ from dlt.common.configuration.providers import (
StringTomlProvider,
)
from dlt.common.libs.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.runtime_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
@@ -38,6 +37,7 @@ from dlt.common.utils import set_working_dir
from dlt.pipeline.pipeline import Pipeline
from dlt.pipeline.trace import PipelineTrace
from dlt.pipeline.configuration import get_default_pipeline_name
from dlt.reflection import names as n
from dlt.reflection.script_visitor import PipelineScriptVisitor

View File

@@ -6,14 +6,12 @@ from rich.markdown import Markdown
from dlt.version import __version__
from dlt.common.runners import Venv
from dlt._workspace.cli import SupportsCliCommand
import dlt._workspace.cli.echo as fmt
from dlt._workspace.cli import SupportsCliCommand, echo as fmt, _debug
from dlt._workspace.cli.exceptions import CliCommandException
from dlt._workspace.cli._command_wrappers import (
from dlt._workspace.cli._telemetry_command import (
telemetry_change_status_command_wrapper,
)
from dlt._workspace.cli import _debug
from dlt._workspace.cli.echo import maybe_no_stdin
from dlt._workspace.cli.utils import display_run_context_info

View File

@@ -801,3 +801,30 @@ def _list_verified_sources(
def _list_core_destinations() -> list[str]:
return dlt.destinations.__all__
@utils.track_command("init", False, "source_name", "destination_type")
def init_command_wrapper(
source_name: str,
destination_type: str,
repo_location: str,
branch: str,
eject_source: bool = False,
) -> None:
init_command(
source_name,
destination_type,
repo_location,
branch,
eject_source,
)
@utils.track_command("list_sources", False)
def list_sources_command_wrapper(repo_location: str, branch: str) -> None:
list_sources_command(repo_location, branch)
@utils.track_command("list_destinations", False)
def list_destinations_command_wrapper() -> None:
list_destinations_command()

View File

@@ -2,14 +2,12 @@ import os
import yaml
from typing import Any, Sequence, Tuple
import dlt
from dlt._workspace.cli.exceptions import CliCommandInnerException
from dlt.common.json import json
from dlt.common.pipeline import get_dlt_pipelines_dir, TSourceState
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.runtime import run_context
from dlt.common.schema.utils import (
group_tables_by_resource,
has_table_seen_data,
@@ -21,8 +19,8 @@ from dlt.common.storages import FileStorage, PackageStorage
from dlt.extract.state import resource_state
from dlt.pipeline.helpers import pipeline_drop
from dlt.pipeline.exceptions import CannotRestorePipelineException
from dlt._workspace.cli import echo as fmt
from dlt._workspace.cli import echo as fmt, utils
from dlt._workspace.cli.exceptions import CliCommandException, CliCommandInnerException
DLT_PIPELINE_COMMAND_DOCS_URL = (
@@ -321,6 +319,14 @@ def pipeline_command(
p = p.drop()
fmt.echo("Restoring from destination")
p.sync_destination()
if p.first_run:
# remote state was not found
p._wipe_working_folder()
fmt.error(
f"Pipeline {pipeline_name} was not found in dataset {dataset_name} in"
f" {destination}"
)
return
if operation == "load-package":
load_id = command_kwargs.get("load_id")
@@ -449,3 +455,18 @@ def pipeline_command(
fmt.warning(warning)
if fmt.confirm("Do you want to apply these changes?", default=False):
drop()
@utils.track_command("pipeline", True, "operation")
def pipeline_command_wrapper(
operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, **command_kwargs: Any
) -> None:
try:
pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, **command_kwargs)
except CannotRestorePipelineException as ex:
fmt.secho(str(ex), err=True, fg="red")
fmt.secho(
"Try command %s to restore the pipeline state from destination"
% fmt.bold(f"dlt pipeline {pipeline_name} sync")
)
raise CliCommandException(error_code=-2)

View File

@@ -1,3 +1,12 @@
"""Module registering command line plugins
To add a new plugin here, do the following:
1. create a new command class in like ie. `dlt._workspace.cli.commands `ProfileCommand(SupportsCliCommand):`
2. provide the implementation of command functions like ie. in `dlt._workspace.cli._profile_command`
3. remember to wrap command in telemetry ie. @utils.track_command("profile", track_before=False, operation="info")
4. register the plugin here.
this module is inspected by pluggy on dlt startup
"""
from typing import Type
from dlt.common.configuration import plugins
@@ -77,7 +86,7 @@ def plug_cli_ai() -> Type[plugins.SupportsCliCommand]:
@plugins.hookimpl(specname="plug_cli")
def plug_cli_profile() -> Type[plugins.SupportsCliCommand]:
if is_workspace_active():
from dlt._workspace.cli._profile_command import ProfileCommand
from dlt._workspace.cli.commands import ProfileCommand
return ProfileCommand
else:
@@ -87,7 +96,7 @@ def plug_cli_profile() -> Type[plugins.SupportsCliCommand]:
@plugins.hookimpl(specname="plug_cli")
def plug_cli_workspace() -> Type[plugins.SupportsCliCommand]:
if is_workspace_active():
from dlt._workspace.cli._workspace_command import WorkspaceCommand
from dlt._workspace.cli.commands import WorkspaceCommand
return WorkspaceCommand
else:
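Following the docstring above, a hypothetical end-to-end plugin could look like this (the `hello` command and all its names are illustrative):

import argparse
from typing import Type

from dlt.common.configuration import plugins
from dlt._workspace.cli import SupportsCliCommand, utils


@utils.track_command("hello", track_before=False)
def hello_command_wrapper() -> None:
    print("hello")


class HelloCommand(SupportsCliCommand):
    command = "hello"
    help_string = "Says hello"

    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
        self.parser = parser

    def execute(self, args: argparse.Namespace) -> None:
        hello_command_wrapper()


@plugins.hookimpl(specname="plug_cli")
def plug_cli_hello() -> Type[SupportsCliCommand]:
    return HelloCommand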

View File

@@ -1,5 +1,4 @@
import os
import argparse
from dlt._workspace._workspace_context import WorkspaceRunContext, active
from dlt._workspace.profile import (
@@ -8,66 +7,17 @@ from dlt._workspace.profile import (
read_profile_pin,
save_profile_pin,
)
from dlt._workspace.cli import SupportsCliCommand, echo as fmt
class ProfileCommand(SupportsCliCommand):
command = "profile"
help_string = "Manage Workspace built-in profiles"
description = """
Commands to list and pin profiles
Run without arguments to list all profiles, the default profile and the
pinned profile in current project.
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
parser.add_argument("profile_name", help="Name of the profile", nargs="?")
subparsers = parser.add_subparsers(
title="Available subcommands", dest="profile_command", required=False
)
subparsers.add_parser(
"info",
help="Show information about the current profile.",
description="Show information about the current profile.",
)
subparsers.add_parser(
"list",
help="Show list of built-in profiles.",
description="Show list of built-in profiles.",
)
subparsers.add_parser(
"pin",
help="Pin a profile to the Workspace.",
description="""
Pin a profile to the Workspace, this will be the new default profile while it is pinned.
""",
)
def execute(self, args: argparse.Namespace) -> None:
workspace_context = active()
if args.profile_command == "info" or not args.profile_command:
print_profile_info(workspace_context)
elif args.profile_command == "list":
list_profiles(workspace_context)
elif args.profile_command == "pin":
pin_profile(workspace_context, args.profile_name)
else:
self.parser.print_usage()
from dlt._workspace.cli import SupportsCliCommand, echo as fmt, utils
@utils.track_command("profile", track_before=False, operation="info")
def print_profile_info(workspace_run_context: WorkspaceRunContext) -> None:
fmt.echo("Current profile: %s" % fmt.bold(workspace_run_context.profile))
if pinned_profile := read_profile_pin(workspace_run_context):
fmt.echo("Pinned profile: %s" % fmt.bold(pinned_profile))
@utils.track_command("profile", track_before=False, operation="list")
def list_profiles(workspace_run_context: WorkspaceRunContext) -> None:
fmt.echo("Available profiles:")
for profile in workspace_run_context.available_profiles():
@@ -75,6 +25,7 @@ def list_profiles(workspace_run_context: WorkspaceRunContext) -> None:
fmt.echo("* %s - %s" % (fmt.bold(profile), desc))
@utils.track_command("profile", track_before=False, operation="pin")
def pin_profile(workspace_run_context: WorkspaceRunContext, profile_name: str) -> None:
if not profile_name:
pinned_profile = read_profile_pin(workspace_run_context)

View File

@@ -3,13 +3,14 @@ import os
from dlt.common.configuration.container import Container
from dlt.common.configuration.providers.toml import ConfigTomlProvider
from dlt.common.configuration.specs import RuntimeConfiguration
from dlt._workspace.cli import echo as fmt
from dlt._workspace.cli.utils import get_telemetry_status
from dlt._workspace.cli.config_toml_writer import WritableConfigValue, write_values
from dlt.common.configuration.specs import PluggableRunContext
from dlt.common.runtime.anon_tracker import get_anonymous_id
from dlt._workspace.cli import echo as fmt, utils
from dlt._workspace.cli.exceptions import CliCommandException
from dlt._workspace.cli.utils import get_telemetry_status
from dlt._workspace.cli.config_toml_writer import WritableConfigValue, write_values
DLT_TELEMETRY_DOCS_URL = "https://dlthub.com/docs/reference/telemetry"
@@ -49,3 +50,16 @@ def change_telemetry_status_command(enabled: bool) -> None:
fmt.echo("Telemetry switched %s" % fmt.bold("OFF"))
# reload config providers
Container()[PluggableRunContext].reload_providers()
@utils.track_command("telemetry", False)
def telemetry_status_command_wrapper() -> None:
telemetry_status_command()
@utils.track_command("telemetry_switch", False, "enabled")
def telemetry_change_status_command_wrapper(enabled: bool) -> None:
try:
change_telemetry_status_command(enabled)
except Exception as ex:
raise CliCommandException(docs_url=DLT_TELEMETRY_DOCS_URL, raiseable_exception=ex)

View File

@@ -6,72 +6,15 @@ from dlt.common.configuration.specs.pluggable_run_context import (
RunContextBase,
)
from dlt._workspace.cli import SupportsCliCommand, echo as fmt, utils
from dlt._workspace._workspace_context import WorkspaceRunContext, active
from dlt._workspace.cli.utils import add_mcp_arg_parser, delete_local_data
from dlt._workspace.cli import echo as fmt, utils
from dlt._workspace._workspace_context import WorkspaceRunContext
from dlt._workspace.cli.utils import check_delete_local_data, delete_local_data
from dlt._workspace.profile import read_profile_pin
class WorkspaceCommand(SupportsCliCommand):
command = "workspace"
help_string = "Manage current Workspace"
description = """
Commands to get info, cleanup local files and launch Workspace MCP
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
subparsers = parser.add_subparsers(
title="Available subcommands", dest="workspace_command", required=False
)
# clean command
clean_local_parser = subparsers.add_parser(
"clean",
help=(
"Cleans local data for the selected profile. Locally loaded data will be deleted. "
"Pipelines working directories are also deleted by default. Data in remote "
"destinations is not affected."
),
)
clean_local_parser.add_argument(
"--skip-data-dir",
action="store_true",
default=False,
help="Do not delete pipelines working dir.",
)
subparsers.add_parser(
"info",
help="Displays workspace info.",
)
DEFAULT_DLT_MCP_PORT = 43654
add_mcp_arg_parser(
subparsers,
"This MCP allows to attach to any pipeline that was previously ran in this workspace"
" and then facilitates schema and data exploration in the pipeline's dataset.",
"Launch dlt MCP server in current Python environment and Workspace in SSE transport"
" mode",
DEFAULT_DLT_MCP_PORT,
)
def execute(self, args: argparse.Namespace) -> None:
workspace_context = active()
if args.workspace_command == "info" or not args.workspace_command:
print_workspace_info(workspace_context)
elif args.workspace_command == "clean":
clean_workspace(workspace_context, args)
elif args.workspace_command == "mcp":
start_mcp(workspace_context, port=args.port, stdio=args.stdio)
else:
self.parser.print_usage()
@utils.track_command("workspace", track_before=False, operation="info")
def print_workspace_info(run_context: WorkspaceRunContext) -> None:
# fmt.echo("Workspace %s:" % fmt.bold(run_context.name))
fmt.echo("Workspace %s:" % fmt.bold(run_context.name))
fmt.echo("Workspace dir: %s" % fmt.bold(run_context.run_dir))
fmt.echo("Settings dir: %s" % fmt.bold(run_context.settings_dir))
# profile info
@@ -80,7 +23,7 @@ def print_workspace_info(run_context: WorkspaceRunContext) -> None:
fmt.echo(" Pipelines and other working data: %s" % fmt.bold(run_context.data_dir))
fmt.echo(" Locally loaded data: %s" % fmt.bold(run_context.local_dir))
if run_context.profile == read_profile_pin(run_context):
fmt.echo(" Profile in %s" % fmt.bold("pinned"))
fmt.echo(" Profile is %s" % fmt.bold("pinned"))
# provider info
providers_context = Container()[PluggableRunContext].providers
fmt.echo()
@@ -93,11 +36,15 @@ def print_workspace_info(run_context: WorkspaceRunContext) -> None:
fmt.echo(" provider is empty")
@utils.track_command("workspace", track_before=False, operation="clean")
def clean_workspace(run_context: RunContextBase, args: argparse.Namespace) -> None:
delete_local_data(run_context, args.skip_data_dir)
fmt.echo("Local pipelines data will be removed. Remote destinations are not affected.")
deleted_dirs = check_delete_local_data(run_context, args.skip_data_dir)
if deleted_dirs:
delete_local_data(run_context, deleted_dirs)
@utils.track_command("mcp_run", track_before=True)
@utils.track_command("workspace", track_before=True, operation="mcp")
def start_mcp(run_context: WorkspaceRunContext, port: int, stdio: bool) -> None:
from dlt._workspace.mcp import WorkspaceMCP
@@ -107,3 +54,10 @@ def start_mcp(run_context: WorkspaceRunContext, port: int, stdio: bool) -> None:
fmt.echo("Starting dlt MCP server", err=True)
mcp_server = WorkspaceMCP(f"dlt: {run_context.name}@{run_context.profile}", port=port)
mcp_server.run(transport)
@utils.track_command("dashboard", True)
def show_workspace(run_context: WorkspaceRunContext, edit: bool) -> None:
from dlt._workspace.helpers.dashboard.runner import run_dashboard
run_dashboard(edit=edit)

View File

@@ -1,35 +1,29 @@
from typing import Type
import argparse
import os
from typing import Optional
from dlt.common.configuration import plugins
import yaml
import dlt._workspace.cli.echo as fmt
from dlt.common import json
from dlt.common.schema.schema import Schema
from dlt.common.storages.configuration import SCHEMA_FILES_EXTENSIONS
from dlt.common.typing import DictStrAny
from dlt._workspace.cli import echo as fmt, utils
from dlt._workspace.cli import SupportsCliCommand, DEFAULT_VERIFIED_SOURCES_REPO
from dlt._workspace.cli.exceptions import CliCommandException
from dlt._workspace.cli._command_wrappers import (
init_command_wrapper,
list_sources_command_wrapper,
list_destinations_command_wrapper,
pipeline_command_wrapper,
schema_command_wrapper,
telemetry_status_command_wrapper,
deploy_command_wrapper,
ai_setup_command_wrapper,
dashboard_command_wrapper,
)
from dlt._workspace.cli.utils import add_mcp_arg_parser
from dlt._workspace.cli._ai_command import SUPPORTED_IDES
from dlt._workspace.cli._docs_command import render_argparse_markdown
from dlt._workspace.cli._pipeline_command import DLT_PIPELINE_COMMAND_DOCS_URL
from dlt._workspace.cli._init_command import DLT_INIT_DOCS_URL
from dlt._workspace.cli._telemetry_command import DLT_TELEMETRY_DOCS_URL
from dlt._workspace.cli.utils import add_mcp_arg_parser
from dlt._workspace.cli._deploy_command import (
DeploymentMethods,
COMMAND_DEPLOY_REPO_LOCATION,
SecretFormats,
DLT_DEPLOY_DOCS_URL,
)
from dlt.common.storages.configuration import SCHEMA_FILES_EXTENSIONS
try:
import pipdeptree
@@ -118,6 +112,12 @@ version if run again with an existing `source` name. You will be warned if files
)
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace.cli._init_command import (
list_destinations_command_wrapper,
list_sources_command_wrapper,
init_command_wrapper,
)
if args.list_sources:
list_sources_command_wrapper(args.location, args.branch)
elif args.list_destinations:
@@ -428,6 +428,8 @@ list of all tables and columns created at the destination during the loading of
)
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace.cli._pipeline_command import pipeline_command_wrapper
if args.list_pipelines:
pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
else:
@@ -470,6 +472,27 @@ The `dlt schema` command will load, validate and print out a dlt schema: `dlt sc
)
def execute(self, args: argparse.Namespace) -> None:
@utils.track_command("schema", False, "format_")
def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool) -> None:
with open(file_path, "rb") as f:
if os.path.splitext(file_path)[1][1:] == "json":
schema_dict: DictStrAny = json.load(f)
else:
schema_dict = yaml.safe_load(f)
s = Schema.from_dict(schema_dict)
if format_ == "json":
schema_str = s.to_pretty_json(remove_defaults=remove_defaults)
elif format_ == "yaml":
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)
elif format_ == "dbml":
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)
fmt.echo(schema_str)
schema_command_wrapper(args.file, args.format, args.remove_defaults)
@@ -499,6 +522,12 @@ The `dlt dashboard` command starts the dlt pipeline dashboard. You can use the d
)
def execute(self, args: argparse.Namespace) -> None:
@utils.track_command("dashboard", True)
def dashboard_command_wrapper(pipelines_dir: Optional[str], edit: bool) -> None:
from dlt._workspace.helpers.dashboard.runner import run_dashboard
run_dashboard(pipelines_dir=pipelines_dir, edit=edit)
dashboard_command_wrapper(pipelines_dir=args.pipelines_dir, edit=args.edit)
@@ -514,6 +543,8 @@ The `dlt telemetry` command shows the current status of dlt telemetry. Learn mor
self.parser = parser
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace.cli._telemetry_command import telemetry_status_command_wrapper
telemetry_status_command_wrapper()
@@ -635,6 +666,8 @@ the `dlt` Airflow wrapper (https://github.com/dlt-hub/dlt/blob/devel/dlt/helpers
self.parser.print_help()
raise CliCommandException()
else:
from dlt._workspace.cli._deploy_command import deploy_command_wrapper
deploy_command_wrapper(
pipeline_script_path=deploy_args.pop("pipeline_script_path"),
deployment_method=deploy_args.pop("deployment_method"),
@@ -667,6 +700,7 @@ If you are reading this on the docs website, you are looking at the rendered ver
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace.cli._dlt import _create_parser
from dlt._workspace.cli._docs_command import render_argparse_markdown
parser, _ = _create_parser()
@@ -725,48 +759,144 @@ Files are fetched from https://github.com/dlt-hub/verified-sources by default.
# ai_mcp_cmd = ai_subparsers.add_parser("mcp", help="Launch the dlt MCP server")
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace.cli._ai_command import ai_setup_command_wrapper
ai_setup_command_wrapper(ide=args.ide, branch=args.branch, repo=args.location)
#
# Register all commands
#
@plugins.hookimpl(specname="plug_cli")
def plug_cli_init() -> Type[SupportsCliCommand]:
return InitCommand
class WorkspaceCommand(SupportsCliCommand):
command = "workspace"
help_string = "Manage current Workspace"
description = """
Commands to get info, clean up local files and launch the Workspace MCP
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
subparsers = parser.add_subparsers(
title="Available subcommands", dest="workspace_command", required=False
)
# clean command
clean_local_parser = subparsers.add_parser(
"clean",
help=(
"Cleans local data for the selected profile. Locally loaded data will be deleted. "
"Pipelines working directories are also deleted by default. Data in remote "
"destinations is not affected."
),
)
clean_local_parser.add_argument(
"--skip-data-dir",
action="store_true",
default=False,
help="Do not delete pipelines working dir.",
)
subparsers.add_parser(
"info",
help="Displays workspace info.",
)
DEFAULT_DLT_MCP_PORT = 43654
add_mcp_arg_parser(
subparsers,
"This MCP allows to attach to any pipeline that was previously ran in this workspace"
" and then facilitates schema and data exploration in the pipeline's dataset.",
"Launch dlt MCP server in current Python environment and Workspace in SSE transport"
" mode by default.",
DEFAULT_DLT_MCP_PORT,
)
show_parser = subparsers.add_parser(
"show",
help="Shows Workspace Dashboard for the pipelines and data in this workspace.",
)
show_parser.add_argument(
"--edit",
action="store_true",
help="Eject Dashboard and start editable version",
default=None,
)
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace._workspace_context import active
from dlt._workspace.cli._workspace_command import (
print_workspace_info,
clean_workspace,
show_workspace,
start_mcp,
)
workspace_context = active()
if args.workspace_command == "info" or not args.workspace_command:
print_workspace_info(workspace_context)
elif args.workspace_command == "clean":
clean_workspace(workspace_context, args)
elif args.workspace_command == "show":
show_workspace(workspace_context, args.edit)
elif args.workspace_command == "mcp":
start_mcp(workspace_context, port=args.port, stdio=args.stdio)
else:
self.parser.print_usage()
@plugins.hookimpl(specname="plug_cli")
def plug_cli_pipeline() -> Type[SupportsCliCommand]:
return PipelineCommand
class ProfileCommand(SupportsCliCommand):
command = "profile"
help_string = "Manage Workspace built-in profiles"
description = """
Commands to list and pin profiles
Run without arguments to list all profiles, the default profile and the
pinned profile in the current project.
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
self.parser = parser
@plugins.hookimpl(specname="plug_cli")
def plug_cli_schema() -> Type[SupportsCliCommand]:
return SchemaCommand
parser.add_argument("profile_name", help="Name of the profile", nargs="?")
subparsers = parser.add_subparsers(
title="Available subcommands", dest="profile_command", required=False
)
# TODO: define actual command and re-enable
@plugins.hookimpl(specname="plug_cli")
def plug_cli_dashboard() -> Type[SupportsCliCommand]:
return DashboardCommand
subparsers.add_parser(
"info",
help="Show information about the current profile.",
description="Show information about the current profile.",
)
subparsers.add_parser(
"list",
help="Show list of built-in profiles.",
description="Show list of built-in profiles.",
)
@plugins.hookimpl(specname="plug_cli")
def plug_cli_telemetry() -> Type[SupportsCliCommand]:
return TelemetryCommand
subparsers.add_parser(
"pin",
help="Pin a profile to the Workspace.",
description="""
Pin a profile to the Workspace; it will be the new default profile while it is pinned.
""",
)
def execute(self, args: argparse.Namespace) -> None:
from dlt._workspace._workspace_context import active
from dlt._workspace.cli._profile_command import (
print_profile_info,
list_profiles,
pin_profile,
)
@plugins.hookimpl(specname="plug_cli")
def plug_cli_deploy() -> Type[SupportsCliCommand]:
return DeployCommand
workspace_context = active()
@plugins.hookimpl(specname="plug_cli")
def plug_cli_docs() -> Type[SupportsCliCommand]:
return CliDocsCommand
@plugins.hookimpl(specname="plug_cli")
def plug_cli_ai() -> Type[SupportsCliCommand]:
return AiCommand
if args.profile_command == "info" or not args.profile_command:
print_profile_info(workspace_context)
elif args.profile_command == "list":
list_profiles(workspace_context)
elif args.profile_command == "pin":
pin_profile(workspace_context, args.profile_name)
else:
self.parser.print_usage()

View File

@@ -1,7 +1,7 @@
import ast
import os
import shutil
from typing import Any, Callable
from typing import Any, Callable, List
import dlt
from dlt.common.typing import TFun
@@ -65,22 +65,41 @@ def add_mcp_arg_parser(
def _may_safe_delete_local(run_context: RunContextBase, deleted_dir_type: str) -> bool:
deleted_dir = getattr(run_context, deleted_dir_type)
for ctx_attr, label in (
("run_dir", "run dir (workspace root)"),
("settings_dir", "settings dir"),
deleted_abs = os.path.abspath(deleted_dir)
run_dir_abs = os.path.abspath(run_context.run_dir)
settings_dir_abs = os.path.abspath(run_context.settings_dir)
# never allow deleting run_dir or settings_dir themselves
for ctx_abs, label in (
(run_dir_abs, "run dir (workspace root)"),
(settings_dir_abs, "settings dir"),
):
if os.path.abspath(deleted_dir) == os.path.abspath(getattr(run_context, ctx_attr)):
if deleted_abs == ctx_abs:
fmt.error(
f"{deleted_dir_type} `deleted_dir` is the same as {label} and cannot be deleted"
f"{deleted_dir_type} `{deleted_dir}` is the same as {label} and cannot be deleted"
)
return False
# ensure deleted directory is inside run_dir
try:
common = os.path.commonpath([deleted_abs, run_dir_abs])
except ValueError:
# occurs when paths are on different drives on windows
common = ""
if common != run_dir_abs:
fmt.error(
f"{deleted_dir_type} `{deleted_dir}` is not within run dir (workspace root) and cannot"
" be deleted"
)
return False
return True
def _wipe_dir(
run_context: RunContextBase, dir_attr: str, echo_template: str, recreate_dirs: bool = True
) -> None:
"""echo, safely wipe and optionally recreate a directory from run context.
"""Echo, safely wipe and optionally recreate a directory from run context.
Args:
run_context: Current run context.
@@ -110,32 +129,68 @@ def _wipe_dir(
os.makedirs(dir_path, exist_ok=True)
def delete_local_data(
run_context: RunContextBase, skip_data_dir: bool, recreate_dirs: bool = True
) -> None:
if isinstance(run_context, ProfilesRunContext):
if run_context.profile not in LOCAL_PROFILES:
fmt.warning("You will clean local data for a profile")
else:
def check_delete_local_data(run_context: RunContextBase, skip_data_dir: bool) -> List[str]:
"""Display paths to be deleted and ask for confirmation.
Args:
run_context: current run context.
skip_data_dir: when True, do not include the data_dir in the deletion set.
Returns:
A list of run_context attribute names that should be deleted. Empty list if user cancels.
Raises:
CliCommandException: if context is invalid or deletion is not safe.
"""
# ensure profiles context
if not isinstance(run_context, ProfilesRunContext):
fmt.error("Cannot delete local data for a context without profiles")
raise CliCommandException()
# delete all files in locally loaded data (if present)
_wipe_dir(
run_context,
"local_dir",
"Will delete locally loaded data in %s",
recreate_dirs,
)
# delete pipeline working folders & other entities data unless explicitly skipped
attrs: list[str] = ["local_dir"]
if not skip_data_dir:
_wipe_dir(
run_context,
"data_dir",
"Will delete pipeline working folders & other entities data %s",
recreate_dirs,
)
attrs.append("data_dir")
# ensure we never attempt to operate on run_dir or settings_dir
for attr in attrs:
if not _may_safe_delete_local(run_context, attr):
raise CliCommandException()
# display relative paths to run_dir
fmt.echo("The following dirs will be deleted:")
for attr in attrs:
dir_path = getattr(run_context, attr)
display_dir = os.path.relpath(dir_path, run_context.run_dir)
if attr == "local_dir":
template = "- %s (locally loaded data)"
elif attr == "data_dir":
template = "- %s (pipeline working folders)"
else:
raise ValueError(attr)
fmt.echo(template % fmt.style(display_dir, fg="yellow", reset=True))
# ask for confirmation
if not fmt.confirm("Do you want to proceed?", default=False):
return []
return attrs
def delete_local_data(
run_context: RunContextBase, dir_attrs: List[str], recreate_dirs: bool = True
) -> None:
"""Delete local data directories after explicit confirmation.
Args:
run_context: current run context.
dir_attrs: A list of run_context attribute names that should be deleted.
recreate_dirs: when True, recreate directories after deletion.
"""
# delete selected directories
for attr in dir_attrs:
_wipe_dir(run_context, attr, "Deleting %s", recreate_dirs)
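A minimal sketch of the confirm-then-delete flow wired into `dlt workspace clean` (assuming an active workspace context):

import dlt
from dlt._workspace.cli.utils import check_delete_local_data, delete_local_data

ctx = dlt.current.run_context()
dir_attrs = check_delete_local_data(ctx, skip_data_dir=False)  # echoes the dirs and prompts; [] on cancel
if dir_attrs:
    delete_local_data(ctx, dir_attrs)  # wipes and, by default, recreates the selected dirs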
def parse_init_script(
@@ -170,8 +225,31 @@ def ensure_git_command(command: str) -> None:
) from imp_ex
def track_command(command: str, track_before: bool, *args: str) -> Callable[[TFun], TFun]:
return with_telemetry("command", command, track_before, *args)
def track_command(
command: str, track_before: bool, *args: str, **kwargs: str
) -> Callable[[TFun], TFun]:
"""Return a telemetry decorator for CLI commands.
Wraps a function with anonymous telemetry tracking using ``with_telemetry``. Depending on
``track_before``, emits an event either before execution or after execution with success
information.
Success semantics:
- if the wrapped function returns an int, 0 is treated as success; other values as failure.
- for non-int returns, success is True unless an exception is raised.
Args:
command: event/command name to report.
track_before: if True, emit a single event before calling the function. if False,
emit a single event after the call, including success state.
*args: names of parameters from the decorated function whose values should be included
in the event properties.
**kwargs: additional key-value pairs to include in the event properties.
Returns:
a decorator that applies telemetry tracking to the decorated function.
"""
return with_telemetry("command", command, track_before, *args, **kwargs)
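A minimal usage sketch of the decorator (the `clean` command and the `component` property are illustrative):

from dlt._workspace.cli import utils

@utils.track_command("clean", False, "operation", component="cli")
def clean_command_wrapper(operation: str) -> int:
    # track_before=False: a single event is sent after the call; `operation` is read from
    # the call arguments and `component` is attached as a constant property.
    # returning 0 reports success, any other int reports failure
    return 0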
def get_telemetry_status() -> bool:

View File

@@ -0,0 +1,38 @@
from typing import ClassVar, Optional, Sequence
from dlt.common.configuration.specs import known_sections
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec
from dlt.common.configuration.specs.runtime_configuration import RuntimeConfiguration
@configspec
class WorkspaceSettings(BaseConfiguration):
name: Optional[str] = None
# TODO: implement default profile switch. it requires reading the configuration, discovering
# the profile and then recreating the workspace context. since this functionality is not
# immediately needed it will be skipped for now
# default_profile: Optional[str] = None
working_dir: Optional[str] = None
"""Pipeline working dirs, other writable folders, local destination files (by default). Relative to workspace root"""
local_dir: Optional[str] = None
"""Destination local files, by default it is within data_dir/local. Relative to workspace root"""
@configspec
class WorkspaceRuntimeConfiguration(RuntimeConfiguration):
"""Extends runtime configuration with dlthub runtime"""
# TODO: connect workspace to runtime here
# TODO: optionally define scripts and other runtime settings
@configspec
class WorkspaceConfiguration(BaseConfiguration):
settings: WorkspaceSettings = None
runtime: WorkspaceRuntimeConfiguration = None
# NOTE: is resolved separately but in the same layout
# dashboard: DashboardConfiguration
# TODO: launch workspace mcp using mcp configuration
# mcp_config: McpConfiguration
__recommended_sections__: ClassVar[Sequence[str]] = (known_sections.WORKSPACE,)
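A minimal sketch of resolving this spec under its recommended section, mirroring `WorkspaceRunContext.config` above:

from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs import known_sections
from dlt._workspace.configuration import WorkspaceConfiguration

# picks up ie. [workspace.settings] working_dir / local_dir from the toml providers
config = resolve_configuration(WorkspaceConfiguration(), sections=(known_sections.WORKSPACE,))
print(config.settings.working_dir, config.settings.local_dir)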

View File

@@ -1,9 +1,9 @@
import dataclasses
from typing import List
from typing import ClassVar, List
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration.specs import BaseConfiguration, known_sections
@configspec
@@ -41,3 +41,5 @@ class DashboardConfiguration(BaseConfiguration):
+ ", ".join(self.column_other_hints)
+ self.datetime_format
)
__section__: ClassVar[str] = known_sections.DASHBOARD

View File

@@ -26,13 +26,15 @@ def build_error_callout(message: str, code: str = None, traceback_string: str =
stack_items = [mo.md(message)]
if code:
stack_items.append(mo.ui.code_editor(code, language="sh"))
stack_items.append(
mo.ui.code_editor(code, language="python", disabled=True, show_copy_button=True)
)
if traceback_string:
stack_items.append(
mo.accordion(
{
"Show stacktrace for more information or debugging": mo.ui.code_editor(
traceback_string, language="sh"
traceback_string, language="python", disabled=True, show_copy_button=True
)
}
)

View File

@@ -2,7 +2,7 @@ import shutil
import functools
from itertools import chain
from pathlib import Path
from typing import Any, Dict, Iterable, List, Mapping, Tuple, Union, cast
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast
import os
import platform
import subprocess
@@ -24,6 +24,8 @@ from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.storages import FileStorage
from dlt.common.destination.client import DestinationClientConfiguration
from dlt.common.destination.exceptions import SqlClientNotAvailable
from dlt.common.storages.configuration import WithLocalFiles
from dlt.common.configuration.exceptions import ConfigFieldMissingException
from dlt.common.typing import DictStrAny
from dlt.common.utils import map_nested_keys_in_place
@@ -34,9 +36,7 @@ from dlt.destinations.exceptions import DatabaseUndefinedRelation, DestinationUn
from dlt.pipeline.exceptions import PipelineConfigMissing
from dlt.pipeline.exceptions import CannotRestorePipelineException
from dlt.pipeline.trace import PipelineTrace
from dlt.common.destination.exceptions import SqlClientNotAvailable
from dlt.common.storages.configuration import WithLocalFiles
PICKLE_TRACE_FILE = "trace.pickle"
@@ -60,11 +60,31 @@ def _exception_to_string(exception: Exception) -> str:
return str(exception)
def resolve_dashboard_config(p: dlt.Pipeline) -> DashboardConfiguration:
def get_dashboard_config_sections(p: Optional[dlt.Pipeline]) -> Tuple[str, ...]:
"""Find dashboard config section layout for a particular pipeline or for active
run context type.
"""
sections: Tuple[str, ...] = ()
if p is None:
# use workspace section layout
context = dlt.current.run_context()
if context.config is None or not context.config.__class__.__recommended_sections__:
pass
else:
sections = tuple(context.config.__class__.__recommended_sections__) + sections
else:
# pipeline section layout
sections = (known_sections.PIPELINES, p.pipeline_name) + sections
return sections
def resolve_dashboard_config(p: Optional[dlt.Pipeline]) -> DashboardConfiguration:
"""Resolve the dashboard configuration"""
return resolve_configuration(
DashboardConfiguration(),
sections=(known_sections.DASHBOARD, p.pipeline_name if p else None),
sections=get_dashboard_config_sections(p),
)
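A minimal sketch of the resulting layouts (the `DashboardConfiguration` import path is assumed):

from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs import known_sections
from dlt._workspace.helpers.dashboard.config import DashboardConfiguration  # assumed path

# without a pipeline the workspace __recommended_sections__ apply, ie. ("workspace",);
# __section__ == "dashboard" is always appended to the search path, so keys resolve
# under workspace.dashboard.* (or pipelines.<name>.dashboard.* for a pipeline)
config = resolve_configuration(DashboardConfiguration(), sections=(known_sections.WORKSPACE,))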
@@ -692,7 +712,14 @@ def build_exception_section(p: dlt.Pipeline) -> List[Any]:
_result.append(
mo.accordion(
{"Show full stacktrace": mo.ui.code_editor("".join(_exception_traces), language="sh")},
{
"Show full stacktrace": mo.ui.code_editor(
"".join(_exception_traces),
language="python",
disabled=True,
show_copy_button=True,
)
},
lazy=True,
)
)

View File

@@ -7,7 +7,7 @@ from dlt.common.configuration.specs.pluggable_run_context import (
from dlt.common.runtime.exceptions import RunContextNotAvailable
from dlt.common.runtime.run_context import is_folder_writable, switch_context
DEFAULT_WORKSPACE_WORKING_FOLDER = "_data"
DEFAULT_WORKSPACE_WORKING_FOLDER = ".var"
DEFAULT_LOCAL_FOLDER = "_local"

View File

@@ -1,4 +1,3 @@
import copy
import contextlib
import dataclasses
import warnings
@@ -8,6 +7,7 @@ from typing import (
Callable,
List,
Optional,
Sequence,
Union,
Any,
Dict,
@@ -307,6 +307,8 @@ class BaseConfiguration(MutableMapping[str, Any]):
"""Fields set to non-defaults during resolve, including explicit values"""
__section__: ClassVar[str] = None
"""Obligatory section used by config providers when searching for keys, always present in the search path"""
__recommended_sections__: ClassVar[Sequence[str]] = None
"""Recommended sections layout"""
__config_gen_annotations__: ClassVar[List[str]] = []
"""Additional annotations for config generator, currently holds a list of fields of interest that have defaults"""
__dataclass_fields__: ClassVar[Dict[str, TDtcField]]
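A hypothetical spec using the new class var:

from typing import ClassVar, Sequence

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration

@configspec
class MySpec(BaseConfiguration):
    value: str = None
    # unlike __section__, the recommended layout is not obligatory; callers may use it
    # to build the sections tuple passed to resolve_configuration
    __recommended_sections__: ClassVar[Sequence[str]] = ("my_section",)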

View File

@@ -41,3 +41,6 @@ DBT_CLOUD = "dbt_cloud"
DASHBOARD = "dashboard"
"""dashboard configuration (DashboardConfiguration)"""
WORKSPACE = "workspace"
"""Workspace configuration"""

View File

@@ -4,7 +4,10 @@ from typing import Any, ClassVar, Dict, List, Optional, Union
from abc import ABC, abstractmethod
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.specs.base_configuration import (
BaseConfiguration,
ContainerInjectableContext,
)
from dlt.common.configuration.specs.runtime_configuration import RuntimeConfiguration
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContainer
from dlt.common.typing import Self
@@ -71,6 +74,23 @@ class RunContextBase(ABC):
def initial_providers(self) -> List[ConfigProvider]:
"""Returns initial providers for this context"""
@abstractmethod
def initialize_runtime(self, runtime_config: RuntimeConfiguration = None) -> None:
"""Initializes runtime (ie. log, telemetry) using RuntimeConfiguration"""
pass
@property
@abstractmethod
def runtime_config(self) -> RuntimeConfiguration:
"""Runtime configuration used for initialize_runtime"""
pass
@property
@abstractmethod
def config(self) -> BaseConfiguration:
"""Returns (optionally resolves) run context configuration"""
pass
@abstractmethod
def get_data_entity(self, entity: str) -> str:
"""Gets path in data_dir where `entity` (ie. `pipelines`, `repos`) are stored"""
@@ -91,6 +111,14 @@ class RunContextBase(ABC):
def plug(self) -> None:
"""Called when context is added to container"""
def reload(self) -> "RunContextBase":
"""This will reload current context by triggering run context plugin via Container"""
from dlt.common.configuration.container import Container
plug_ctx = Container()[PluggableRunContext]
plug_ctx.reload(self.run_dir, runtime_kwargs=self.runtime_kwargs)
return plug_ctx.context
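For illustration, a minimal usage sketch of the new method:

import dlt

ctx = dlt.current.run_context()
ctx = ctx.reload()  # re-plugs the same run_dir via the Container, preserving runtime_kwargs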
@staticmethod
def import_run_dir_module(run_dir: str) -> ModuleType:
"""Returns a top Python module of the workspace (if importable)"""
@@ -139,13 +167,10 @@ class PluggableRunContext(ContainerInjectableContext):
context: RunContextBase = None
providers: ConfigProvidersContainer
runtime_config: RuntimeConfiguration
_context_stack: List[Any] = []
def __init__(
self, init_context: RunContextBase = None, runtime_config: RuntimeConfiguration = None
) -> None:
def __init__(self, init_context: RunContextBase = None) -> None:
super().__init__()
if init_context:
@@ -154,7 +179,6 @@ class PluggableRunContext(ContainerInjectableContext):
# autodetect run dir
self._plug(run_dir=None)
self.providers = ConfigProvidersContainer(self.context.initial_providers())
self.runtime_config = runtime_config
def reload(
self,
@@ -170,7 +194,6 @@ class PluggableRunContext(ContainerInjectableContext):
elif self.context.runtime_kwargs:
runtime_kwargs = {**self.context.runtime_kwargs, **runtime_kwargs}
self.runtime_config = None
self.before_remove()
if isinstance(run_dir_or_context, str):
self._plug(run_dir_or_context, runtime_kwargs=runtime_kwargs)
@@ -189,8 +212,7 @@ class PluggableRunContext(ContainerInjectableContext):
super().after_add()
# initialize runtime if context comes back into container
if self.runtime_config:
self.initialize_runtime(self.runtime_config)
self.initialize_runtime()
def before_remove(self) -> None:
super().before_remove()
@@ -199,26 +221,20 @@ class PluggableRunContext(ContainerInjectableContext):
self.context.unplug()
def add_extras(self) -> None:
from dlt.common.configuration.resolve import resolve_configuration
# add extra providers
self.providers.add_extras()
# resolve runtime configuration
if not self.runtime_config:
self.initialize_runtime(resolve_configuration(RuntimeConfiguration()))
self.initialize_runtime()
# plug context
self.context.plug()
def initialize_runtime(self, runtime_config: RuntimeConfiguration) -> None:
self.runtime_config = runtime_config
# do not activate logger if not in the container
def initialize_runtime(self) -> None:
"""Calls initialize_runtime on context only if active in container. We do not want
to initialize runtime if instance is not active
"""
if not self.in_container:
return
from dlt.common.runtime.init import initialize_runtime
initialize_runtime(self.context, self.runtime_config)
self.context.initialize_runtime()
def _plug(self, run_dir: Optional[str], runtime_kwargs: Dict[str, Any] = None) -> None:
from dlt.common.configuration import plugins
@@ -230,17 +246,16 @@ class PluggableRunContext(ContainerInjectableContext):
def push_context(self) -> str:
"""Pushes current context on stack and returns assert cookie"""
cookie = uniq_id()
self._context_stack.append((cookie, self.context, self.providers, self.runtime_config))
self._context_stack.append((cookie, self.context, self.providers))
return cookie
def pop_context(self, cookie: str) -> None:
"""Pops context from stack and re-initializes it if in container"""
_c, context, providers, runtime_config = self._context_stack.pop()
_c, context, providers = self._context_stack.pop()
if cookie != _c:
raise ValueError(
f"Run context stack mangled. Got cookie `{_c}` but expected `{cookie}`"
)
self.runtime_config = runtime_config
self.reload(context)
def drop_context(self, cookie: str) -> None:

View File

@@ -1,20 +1,18 @@
import binascii
from os.path import isfile, join, abspath
from pathlib import Path
from os.path import isfile, join
from typing import Any, ClassVar, Optional, IO
import warnings
from dlt.common.typing import TSecretStrValue
from dlt.common.utils import encoding_for_mode, main_module_file_path, reveal_pseudo_secret
from dlt.common.utils import encoding_for_mode, reveal_pseudo_secret
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec
from dlt.common.configuration.exceptions import ConfigFileNotFoundException
from dlt.common.warnings import Dlt100DeprecationWarning
from dlt.common.runtime.exec_info import platform_supports_threading
@configspec
class RuntimeConfiguration(BaseConfiguration):
pipeline_name: Optional[str] = None
"""Pipeline name used as component in logging, must be explicitly set"""
sentry_dsn: Optional[str] = None # keep None to disable Sentry
slack_incoming_hook: Optional[TSecretStrValue] = None
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
@@ -43,16 +41,6 @@ class RuntimeConfiguration(BaseConfiguration):
__section__: ClassVar[str] = "runtime"
def on_resolved(self) -> None:
# generate pipeline name from the entry point script name
if not self.pipeline_name:
self.pipeline_name = get_default_pipeline_name(main_module_file_path())
else:
warnings.warn(
"pipeline_name in RuntimeConfiguration is deprecated. Use `pipeline_name` in"
" PipelineConfiguration config",
Dlt100DeprecationWarning,
stacklevel=1,
)
if self.slack_incoming_hook:
# it may be obfuscated base64 value
# TODO: that needs to be removed ASAP
@@ -81,11 +69,5 @@ class RuntimeConfiguration(BaseConfiguration):
return join(self.config_files_storage_path, name)
def get_default_pipeline_name(entry_point_file: str) -> str:
if entry_point_file:
entry_point_file = Path(entry_point_file).stem
return "dlt_" + (entry_point_file or "pipeline")
# backward compatibility
RunConfiguration = RuntimeConfiguration

View File

@@ -3,6 +3,7 @@ import dataclasses
from types import TracebackType
from typing import (
ClassVar,
Optional,
NamedTuple,
Literal,
@@ -22,7 +23,11 @@ import datetime # noqa: 251
from dlt.common import logger, pendulum
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.configuration import configspec, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.specs import (
BaseConfiguration,
CredentialsConfiguration,
known_sections,
)
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.destination.utils import (
resolve_replace_strategy,
@@ -151,6 +156,8 @@ class DestinationClientConfiguration(BaseConfiguration):
destination_name: Optional[str] = None # name of the destination
environment: Optional[str] = None
__recommended_sections__: ClassVar[Sequence[str]] = (known_sections.DESTINATION, "")
def fingerprint(self) -> str:
"""Returns a destination fingerprint which is a hash of selected configuration fields. ie. host in case of connection string"""
return ""

View File

@@ -503,8 +503,6 @@ class SupportsPipeline(Protocol):
"""The destination reference which is ModuleType. `destination.__name__` returns the name string"""
dataset_name: str
"""Name of the dataset to which pipeline will be loaded to"""
runtime_config: RuntimeConfiguration
"""A configuration of runtime options like logging level and format and various tracing options"""
run_context: RunContextBase
"""A run context associated with the pipeline when instance was created"""
working_dir: str

View File

@@ -150,7 +150,7 @@ def create_pool(config: PoolRunnerConfiguration) -> Executor:
executor = ProcessPoolExecutor(
max_workers=config.workers,
initializer=init.restore_run_context,
initargs=(ctx.context, ctx.runtime_config),
initargs=(ctx.context,),
mp_context=multiprocessing.get_context(method=start_method),
)
else:

View File

@@ -1,3 +0,0 @@
from .init import apply_runtime_config, init_telemetry
__all__ = ["apply_runtime_config", "init_telemetry"]

View File

@@ -109,7 +109,7 @@ def always_track() -> Iterator[None]:
from dlt.common.configuration.container import Container
from dlt.common.configuration.specs.pluggable_run_context import PluggableRunContext
init_anon_tracker(Container()[PluggableRunContext].runtime_config)
init_anon_tracker(Container()[PluggableRunContext].context.runtime_config)
try:
yield
finally:

View File

@@ -4,6 +4,7 @@ import contextlib
import sys
import multiprocessing
import platform
from typing import Optional
from dlt.common.runtime.typing import TExecutionContext, TVersion, TExecInfoNames
from dlt.common.typing import StrStr, StrAny, List
@@ -155,9 +156,11 @@ def is_marimo() -> bool:
return False
def dlt_version_info(pipeline_name: str) -> StrStr:
def dlt_version_info(pipeline_name: Optional[str]) -> StrStr:
"""Gets dlt version info including commit and image version available in docker"""
version_info = {"dlt_version": __version__, "pipeline_name": pipeline_name}
version_info = {"dlt_version": __version__}
if pipeline_name:
version_info["pipeline_name"] = pipeline_name
# extract envs with build info
version_info.update(filter_env_vars(["COMMIT_SHA", "IMAGE_VERSION"]))

View File

@@ -4,20 +4,19 @@ from dlt.common.configuration.specs.pluggable_run_context import (
RunContextBase,
)
# telemetry should be initialized only once
_INITIALIZED = False
def initialize_runtime(run_context: RunContextBase, runtime_config: RuntimeConfiguration) -> None:
from dlt.sources.helpers import requests
def initialize_runtime(logger_name: str, runtime_config: RuntimeConfiguration) -> None:
from dlt.common import logger
from dlt.common.runtime.exec_info import dlt_version_info
from dlt.common.runtime.telemetry import start_telemetry
from dlt.sources.helpers import requests
version = dlt_version_info(runtime_config.pipeline_name)
# initialize or re-initialize logging with new settings
logger.LOGGER = logger._create_logger(
run_context.name,
logger_name,
runtime_config.log_level,
runtime_config.log_format,
runtime_config.pipeline_name,
@@ -26,32 +25,17 @@ def initialize_runtime(run_context: RunContextBase, runtime_config: RuntimeConfi
# Init or update default requests client config
requests.init(runtime_config)
# initialize telemetry
start_telemetry(runtime_config)
def restore_run_context(run_context: RunContextBase, runtime_config: RuntimeConfiguration) -> None:
def restore_run_context(run_context: RunContextBase) -> None:
"""Restores `run_context` by placing it into container and if `runtime_config` is present, initializes runtime
Intended to be called by workers in a process pool.
"""
from dlt.common.configuration.container import Container
Container()[PluggableRunContext] = PluggableRunContext(run_context, runtime_config)
apply_runtime_config(runtime_config)
init_telemetry(runtime_config)
# make sure runtime configuration is attached
assert run_context.runtime_config is not None
def init_telemetry(runtime_config: RuntimeConfiguration) -> None:
"""Starts telemetry only once"""
from dlt.common.runtime.telemetry import start_telemetry
global _INITIALIZED
# initialize only once
if not _INITIALIZED:
start_telemetry(runtime_config)
_INITIALIZED = True
def apply_runtime_config(runtime_config: RuntimeConfiguration) -> None:
"""Updates run context with newest runtime_config"""
from dlt.common.configuration.container import Container
Container()[PluggableRunContext].initialize_runtime(runtime_config)
Container()[PluggableRunContext] = PluggableRunContext(run_context)

View File

@@ -14,10 +14,14 @@ from dlt.common.configuration.providers import (
ConfigTomlProvider,
)
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.configuration.specs.pluggable_run_context import (
RunContextBase,
PluggableRunContext,
)
from dlt.common.configuration.specs.runtime_configuration import RuntimeConfiguration
from dlt.common.runtime.init import initialize_runtime
# dlt settings folder
DOT_DLT = os.environ.get(known_env.DLT_CONFIG_FOLDER, ".dlt")
@@ -28,6 +32,7 @@ class RunContext(RunContextBase):
def __init__(self, run_dir: Optional[str]):
self._init_run_dir = run_dir or "."
self._runtime_config: RuntimeConfiguration = None
@property
def global_dir(self) -> str:
@@ -66,6 +71,22 @@ class RunContext(RunContextBase):
]
return providers
def initialize_runtime(self, runtime_config: RuntimeConfiguration = None) -> None:
if runtime_config is None:
self._runtime_config = resolve_configuration(RuntimeConfiguration())
else:
self._runtime_config = runtime_config
initialize_runtime(self.name, self._runtime_config)
@property
def runtime_config(self) -> RuntimeConfiguration:
return self._runtime_config
@property
def config(self) -> BaseConfiguration:
return None
@property
def module(self) -> Optional[ModuleType]:
try:

View File

@@ -52,8 +52,7 @@ def init_sentry(config: RuntimeConfiguration) -> None:
def disable_sentry() -> None:
# init without parameters disables sentry
sentry_sdk.init()
sentry_sdk.Hub.current.bind_client(None)
def before_send(event: DictStrAny, _unused_hint: Optional[StrAny] = None) -> Optional[DictStrAny]:

View File

@@ -20,8 +20,6 @@ _TELEMETRY_STARTED = False
def start_telemetry(config: RuntimeConfiguration) -> None:
# enable telemetry only once
global _TELEMETRY_STARTED
if is_telemetry_started():
return
@@ -41,6 +39,7 @@ def start_telemetry(config: RuntimeConfiguration) -> None:
init_platform_tracker()
global _TELEMETRY_STARTED
_TELEMETRY_STARTED = True
@@ -71,17 +70,50 @@ def is_telemetry_started() -> bool:
def with_telemetry(
category: TEventCategory, command: str, track_before: bool, *args: str
category: TEventCategory, command: str, track_before: bool, *args: str, **kwargs: Any
) -> Callable[[TFun], TFun]:
"""Adds telemetry to f: TFun and add optional f *args values to `properties` of telemetry event"""
"""Decorator factory that attaches telemetry to a callable.
The returned decorator wraps a function so that an anonymous telemetry event is
emitted either before execution (if ``track_before`` is True) or after execution.
When tracked after execution, the event includes whether the call succeeded.
Telemetry is initialized lazily on first use if it is not already running.
Args:
category: telemetry event category used by the anon tracker.
command: event/command name to report.
track_before: if True, emit a single event before calling the function.
if False, emit a single event after the call, including success state.
*args: names of parameters from the decorated function whose values should
be included in the event properties. names must match the function
signature and can refer to positional or keyword parameters.
**kwargs: additional key-value pairs to include in the event properties.
Returns:
a decorator that takes a function and returns a wrapped function with
telemetry tracking applied. the wrapped function preserves the original
signature and return value.
Notes:
- success is determined as follows when tracking after execution:
- if the wrapped function returns an int, 0 is treated as success; any
other value is treated as failure.
- for non-int returns, success is True unless an exception is raised.
- on exception, a failure event is emitted and the exception is re-raised.
- event properties always include elapsed execution time in seconds under
the 'elapsed' key and the success flag under 'success'.
"""
def decorator(f: TFun) -> TFun:
sig: inspect.Signature = inspect.signature(f)
def _wrap(*f_args: Any, **f_kwargs: Any) -> Any:
# look for additional arguments
# look for additional arguments in call arguments
bound_args = sig.bind(*f_args, **f_kwargs)
props = {p: bound_args.arguments[p] for p in args if p in bound_args.arguments}
# append additional props from kwargs
props.update(kwargs)
start_ts = time.time()
def _track(success: bool) -> None:
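A minimal usage sketch of the decorator documented above (the command name, parameter, and `mode` property are illustrative; the call shape matches the tests added later in this diff):

```py
from dlt.common.runtime.telemetry import with_telemetry

# "pipeline_name" is captured from the call arguments; mode="cli" is a constant
# property added to every event
@with_telemetry("command", "sync", False, "pipeline_name", mode="cli")
def sync_command(pipeline_name: str) -> int:
    # tracked after execution: returning 0 reports success, non-zero reports failure
    return 0
```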

View File

@@ -16,7 +16,8 @@ from dlt.common.configuration.container import Container
from dlt.common.configuration.inject import get_orig_args, last_config
from dlt.common.destination import TLoaderFileFormat, Destination, TDestinationReferenceArg
from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir, TRefreshMode
from dlt.common.runtime import apply_runtime_config, init_telemetry
# from dlt.common.runtime import apply_runtime_config, init_telemetry
from dlt.pipeline.exceptions import CannotRestorePipelineException
from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
@@ -141,11 +142,6 @@ def pipeline(
else:
pass
# modifies run_context and must go first
runtime_config = injection_kwargs["runtime"]
apply_runtime_config(runtime_config)
init_telemetry(runtime_config)
# if working_dir not provided use temp folder
if not pipelines_dir:
pipelines_dir = get_dlt_pipelines_dir()
@@ -174,7 +170,6 @@ def pipeline(
progress,
False,
last_config(**injection_kwargs),
runtime_config,
refresh=refresh,
)
# set it as current pipeline
@@ -204,10 +199,6 @@ def attach(
"""
ensure_correct_pipeline_kwargs(attach, **injection_kwargs)
runtime_config = injection_kwargs["runtime"]
apply_runtime_config(runtime_config)
init_telemetry(runtime_config)
# if working_dir not provided use temp folder
if not pipelines_dir:
pipelines_dir = get_dlt_pipelines_dir()
@@ -235,7 +226,6 @@ def attach(
progress,
True,
last_config(**injection_kwargs),
runtime_config,
)
# set it as current pipeline
p.activate()

View File

@@ -1,15 +1,28 @@
from typing import Any, Optional
from pathlib import Path
from typing import Any, ClassVar, Optional, Sequence
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import RuntimeConfiguration, BaseConfiguration
from dlt.common.configuration.specs import BaseConfiguration, known_sections
from dlt.common.configuration.specs.pluggable_run_context import PluggableRunContext
from dlt.common.typing import AnyFun, TSecretStrValue
from dlt.common.utils import digest256
from dlt.common.utils import digest256, main_module_file_path
from dlt.common.destination import TDestinationReferenceArg
from dlt.common.pipeline import TRefreshMode
from dlt.common.configuration.exceptions import ConfigurationValueError
@configspec
class PipelineRuntimeConfiguration(BaseConfiguration):
slack_incoming_hook: Optional[TSecretStrValue] = None
# mcp_config: McpConfiguration
pluggable_run_context: PluggableRunContext = None
"""Pluggable run context with current run context"""
# dashboard configuration is resolved separately but following the same section layout
# dashboard: DashboardConfiguration
__section__: ClassVar[str] = "runtime"
@configspec
class PipelineConfiguration(BaseConfiguration):
pipeline_name: Optional[str] = None
@@ -39,17 +52,14 @@ class PipelineConfiguration(BaseConfiguration):
dev_mode: bool = False
"""When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset."""
progress: Optional[str] = None
runtime: RuntimeConfiguration = None
runtime: PipelineRuntimeConfiguration = None
refresh: Optional[TRefreshMode] = None
"""Refresh mode for the pipeline to fully or partially reset a source during run. See docstring of `dlt.pipeline` for more details."""
pluggable_run_context: PluggableRunContext = None
"""Pluggable run context with current run context"""
def on_resolved(self) -> None:
# generate pipeline name from the entry point script name
if not self.pipeline_name:
self.pipeline_name = self.runtime.pipeline_name
else:
self.runtime.pipeline_name = self.pipeline_name
self.pipeline_name = get_default_pipeline_name(main_module_file_path())
if not self.pipeline_salt:
self.pipeline_salt = digest256(self.pipeline_name)
if self.dataset_name_layout and "%s" not in self.dataset_name_layout:
@@ -58,8 +68,20 @@ class PipelineConfiguration(BaseConfiguration):
" example: 'prefix_%s'"
)
# recommended layout is pipelines.<name>
__recommended_sections__: ClassVar[Sequence[str]] = (known_sections.PIPELINES, "")
def ensure_correct_pipeline_kwargs(f: AnyFun, **kwargs: Any) -> None:
for arg_name in kwargs:
if not hasattr(PipelineConfiguration, arg_name) and not arg_name.startswith("_dlt"):
raise TypeError(f"`{f.__name__}` got an unexpected keyword argument `{arg_name}`")
def get_default_pipeline_name(entry_point_file: str) -> str:
"""Generates default pipeline name based on an entry point of the current Python script
prefixed with "dlt_"
"""
if entry_point_file:
entry_point_file = Path(entry_point_file).stem
return "dlt_" + (entry_point_file or "pipeline")

View File

@@ -36,7 +36,7 @@ from dlt.common.destination.exceptions import (
DestinationNoStagingMode,
DestinationUndefinedEntity,
)
from dlt.common.runtime import signals, apply_runtime_config
from dlt.common.runtime import signals
from dlt.common.schema.typing import (
TSchemaTables,
TTableFormat,
@@ -112,7 +112,7 @@ from dlt.destinations.dataset import get_destination_clients
from dlt.load.configuration import LoaderConfiguration
from dlt.load import Load
from dlt.pipeline.configuration import PipelineConfiguration
from dlt.pipeline.configuration import PipelineConfiguration, PipelineRuntimeConfiguration
from dlt.pipeline.progress import _Collector, _NULL_COLLECTOR
from dlt.pipeline.exceptions import (
CannotRestorePipelineException,
@@ -321,7 +321,6 @@ class Pipeline(SupportsPipeline):
progress: _Collector,
must_attach_to_local_pipeline: bool,
config: PipelineConfiguration,
runtime: RuntimeConfiguration,
refresh: Optional[TRefreshMode] = None,
) -> None:
"""Initializes the Pipeline class which implements `dlt` pipeline. Please use `pipeline` function in `dlt` module to create a new Pipeline instance."""
@@ -334,8 +333,7 @@ class Pipeline(SupportsPipeline):
self.pipeline_salt = pipeline_salt
self.config = config
self.runtime_config = runtime
self.run_context = config.pluggable_run_context.context
self.run_context = config.runtime.pluggable_run_context.context
self.dev_mode = dev_mode
self.collector = progress or _NULL_COLLECTOR
self._destination = None
@@ -394,7 +392,6 @@ class Pipeline(SupportsPipeline):
self.collector,
False,
self.config,
self.runtime_config,
)
if pipeline_name is not None and pipeline_name != self.pipeline_name:
self = self.__class__(
@@ -410,7 +407,6 @@ class Pipeline(SupportsPipeline):
deepcopy(self.collector),
False,
self.config,
self.runtime_config,
)
# activate (possibly new) self
self.activate()
@@ -945,6 +941,10 @@ class Pipeline(SupportsPipeline):
return self._last_trace
return load_trace(self.working_dir)
@property
def runtime_config(self) -> PipelineRuntimeConfiguration:
return self.config.runtime
def __repr__(self) -> str:
kwargs = {
"pipeline_name": self.pipeline_name,
@@ -1398,8 +1398,7 @@ class Pipeline(SupportsPipeline):
def _set_context(self, is_active: bool) -> None:
if not self.is_active and is_active:
# initialize runtime if not active previously
apply_runtime_config(self.runtime_config)
pass
self.is_active = is_active
if is_active:

View File

@@ -53,13 +53,13 @@ def _send_trace_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) ->
Send the full trace after a run operation to the platform
TODO: Migrate this to open telemetry in the next iteration
"""
if not pipeline.runtime_config.dlthub_dsn:
if not pipeline.run_context.runtime_config.dlthub_dsn:
return
def _future_send() -> None:
try:
trace_dump = json.dumps(trace.asdict())
url = pipeline.runtime_config.dlthub_dsn + TRACE_URL_SUFFIX
url = pipeline.run_context.runtime_config.dlthub_dsn + TRACE_URL_SUFFIX
response = requests.put(url, data=trace_dump)
if response.status_code != 200:
logger.debug(
@@ -76,7 +76,7 @@ def _send_trace_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) ->
def _sync_schemas_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
if not pipeline.runtime_config.dlthub_dsn:
if not pipeline.run_context.runtime_config.dlthub_dsn:
return
# sync only if load step was processed
@@ -104,7 +104,7 @@ def _sync_schemas_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline)
def _future_send() -> None:
try:
url = pipeline.runtime_config.dlthub_dsn + STATE_URL_SUFFIX
url = pipeline.run_context.runtime_config.dlthub_dsn + STATE_URL_SUFFIX
response = requests.put(url, data=json.dumps(payload))
if response.status_code != 200:
logger.debug(

View File

@@ -60,7 +60,7 @@ def slack_notify_load_success(incoming_hook: str, load_info: LoadInfo, trace: Pi
def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
if pipeline.runtime_config.sentry_dsn:
if pipeline.run_context.runtime_config.sentry_dsn:
# print(f"START SENTRY TX: {trace.transaction_id} SCOPE: {Hub.current.scope}"
transaction = Scope.get_current_scope().start_transaction(name=step, op=step)
if isinstance(transaction, Transaction):
@@ -71,7 +71,7 @@ def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: Supports
def on_start_trace_step(
trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline
) -> None:
if pipeline.runtime_config.sentry_dsn:
if pipeline.run_context.runtime_config.sentry_dsn:
# print(f"START SENTRY SPAN {trace.transaction_id}:{trace_step.span_id} SCOPE: {Hub.current.scope}")
span = Scope.get_current_scope().start_span(description=step, op=step)
_add_sentry_tags(span, pipeline)
@@ -114,8 +114,7 @@ def on_end_trace_step(
step_info: Any,
send_state: bool,
) -> None:
if pipeline.runtime_config.sentry_dsn:
# print(f"---END SENTRY SPAN {trace.transaction_id}:{step.span_id}: {step} SCOPE: {Hub.current.scope}")
if pipeline.run_context.runtime_config.sentry_dsn:
Scope.get_current_scope().span.__exit__(None, None, None)
# disable automatic slack messaging until we can configure messages themselves
# if step.step == "load":
@@ -137,6 +136,6 @@ def on_end_trace_step(
def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None:
if pipeline.runtime_config.sentry_dsn:
if pipeline.run_context.runtime_config.sentry_dsn:
# print(f"---END SENTRY TX: {trace.transaction_id} SCOPE: {Hub.current.scope}")
Scope.get_current_scope().transaction.__exit__(None, None, None)

View File

@@ -0,0 +1,3 @@
[runtime]
dlthub_telemetry=false
dlthub_telemetry_endpoint="https://telemetry-tracker.services4758.workers.dev"

View File

@@ -1,3 +1,7 @@
[runtime]
dlthub_telemetry=false
dlthub_telemetry_endpoint="https://telemetry-tracker.services4758.workers.dev"
[destination.weaviate]
vectorizer="text2vec-contextionary"
module_config={text2vec-contextionary = { vectorizeClassName = false, vectorizePropertyName = true}}

View File

@@ -44,7 +44,7 @@ class MyLogCollector(PlusLogCollector):
# os.environ["RUNTIME__SLACK_INCOMING_HOOK"] = "https://hooks.slack.com/services/..."
msg = f"schema {schema_name} change in pipeline {pipeline.pipeline_name}**:\n{changes}"
send_slack_message(pipeline.runtime_config.slack_incoming_hook, msg)
send_slack_message(pipeline.runtime_config.slack_incoming_hook, msg) # type: ignore
except Exception as e:
# fail without interrupting the pipeline
print(f"Error trying to send slack message: {e}")

View File

@@ -0,0 +1,191 @@
---
title: Workspace info
---
# Workspace
1. Install dlt with workspace support:
```sh
[uv] pip install "dlt[workspace]"
```
2. Do a regular dlt init, for example:
```sh
dlt init dlthub:fal duckdb
```
3. At this moment the "new workspace" is hidden behind a feature flag
```sh
dlt --help
```
just returns the regular set of commands
4. Enable the new workspace by creating a `.dlt/.workspace` file
```sh
touch .dlt/.workspace
```
5. Now a new set of commands is available, try
```sh
dlt workspace info
```
to get basic information about the workspace. Now you can see the basic file layout:
```sh
.
├── .dlt/
│   ├── config.toml
│   ├── secrets.toml
│   ├── .workspace      # feature flag
│   └── .var/dev/       # working dir for pipelines on the `dev` (default) profile
├── _local/dev/         # locally loaded data: ducklake, duckdb databases etc. will go there
├── .gitignore
├── requirements.txt
```
Now let's run a simple pipeline:
```py
import dlt
pipeline = dlt.pipeline(
pipeline_name="foo",
destination="duckdb",
dataset_name="lake_schema",
dev_mode=True,
)
info = pipeline.run(
[{"foo": 1}, {"foo": 2}],
table_name="table_foo",
)
print(info)
print(pipeline.dataset().table_foo["foo"].df())
```
From the output we see that the data got loaded into the `_local/dev/foo.duckdb` database, and `dlt pipeline foo info`
tells us that the pipeline's working dir is in `.dlt/.var/dev/pipelines`. Further, `dlt pipeline -l` shows just one pipeline, belonging to the current workspace.
**The new Workspace fully isolates pipelines across different workspaces at the configuration and working-directory level**.
6. Now we can access the data:
* `dlt workspace show` will launch Workspace Dashboard
* `dlt workspace mcp` will launch Workspace MCP (Thierry's OSS MCP) in sse mode.
* `dlt pipeline foo mcp` will launch pipeline MCP (old Marcin's MCP) in sse mode.
* `dlt pipeline foo show` will launch Workspace Dashboard and open pipeline `foo`
## Profiles
The new workspace adds the concept of profiles, which are used to:
1. secure access to data in different environments (i.e. dev, tests, prod, access)
2. isolate pipelines from different workspaces and across profiles: pipelines may share code, but they have
separate working directories and store locally loaded data in separate locations.
After initialization, the default **dev** profile is activated and, from an OSS user's POV, everything works as it used to.
Profiles are to a large degree compatible with `project` profiles:
1. profile pinning works the same (see the sketch after this list)
2. configuring secrets and config toml for profiles works the same
3. `dlt profile` works more or less the same
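A quick sketch of those shared mechanics (the pin file lives under `.dlt/`; exact paths here are assumptions):
```sh
# pin a profile for this workspace (writes a pin file under .dlt/)
dlt profile prod pin
# print the currently active profile
dlt profile
# per-profile settings sit next to the base files, e.g.
#   .dlt/secrets.toml       - all profiles
#   .dlt/prod.secrets.toml  - `prod` only
```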
The new Workspace ships with an opinionated set of profiles:
```sh
dlt profile list
Available profiles:
* dev - dev profile, workspace default
* prod - production profile, assumed by pipelines deployed in Runtime
* tests - profile assumed when running tests
* access - production profile, assumed by interactive notebooks in Runtime, typically with limited access rights
```
Right now we plan to automatically assign profiles to Runtime jobs, i.e. batch jobs work on the `prod` profile by default and interactive (notebook) jobs on `access` (a read-only profile). But we'll see.
Now let's use profiles to switch to production:
1. Add a new named destination
First, let's use another feature of the new workspace: **named destinations**. With them we can easily switch and test pipelines without changing code. Our new destination is named **warehouse**. Let's configure a duckdb warehouse in `secrets.toml` (or `dev.secrets.toml` to fully split profiles; an env-var alternative is sketched at the end of this step).
```toml
[destination.warehouse]
destination_type="duckdb"
```
and change the pipeline code (`destination="warehouse"`):
```py
pipeline = dlt.pipeline(
pipeline_name="foo",
destination="warehouse",
dataset_name="lake_schema",
dev_mode=True,
)
```
run the script again: your data got loaded to `_local/dev/warehouse.duckdb` now!
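As an aside: named destinations are regular dlt configuration, so the same `warehouse` setup could presumably also come from an environment variable instead of toml (dlt's usual `SECTION__KEY` env layout; treat this exact variable name as an assumption):
```sh
export DESTINATION__WAREHOUSE__DESTINATION_TYPE="duckdb"
```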
2. Add Motherduck secrets to the `prod` profile.
Now create a `prod.secrets.toml` file:
```toml
[destination.warehouse]
destination_type="motherduck"
credentials="md:///dlt_data?motherduck_token=...."
```
and pin the **prod** profile to start testing in production 🤯
```sh
dlt profile prod pin
dlt profile
dlt workspace
```
Now you see that your new toml file will be read when the pipeline runs.
Before we run the pipeline script, let's test the connection to the destination:
```sh
dlt --debug pipeline foo sync --destination warehouse --dataset-name lake_schema
```
(Not ideal - we'll have a much better dry run soon.) If your credentials are invalid or there's any other problem, you'll get a detailed stack trace with an exception.
If the connection is successful but there's no dataset on the Motherduck side, you should get:
```sh
ERROR: Pipeline foo was not found in dataset lake_schema in warehouse
```
Now you can run the pipeline script and watch your data land in Motherduck. When you run the Workspace Dashboard, you'll see it connecting to the remote dataset.
## Manage and configure workspace
You can clean up the workspace by removing all local files. This is intended for the `dev` profile, to easily start over:
```sh
dlt workspace clean
```
The workspace can be configured. For example, you can change the workspace name in **config.toml**:
```toml
[workspace.settings]
name="name_override"
```
You can also override the local and working directories (not recommended). For example, to make the **dev** profile behave exactly like OSS, put this in **dev.config.toml**:
```toml
[workspace.settings]
local_dir="."
working_dir="~/.dlt/"
```
Now `dlt pipeline -l` shows all OSS pipelines, but `workspace clean` will refuse to work.
You can also configure the dashboard and mcp (coming soon) at the workspace and pipeline level:
```toml
[workspace.dashboard]
set="set"
[pipelines.foo.dashboard]
set="set"
```
Workspace also has a runtime configuration that derives from OSS and will soon gain dlthub Runtime settings:
```toml
[workspace.runtime]
log_level="DEBUG"
```
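Since runtime settings are regular workspace configuration, they can presumably also be scoped to a profile, e.g. only for **prod** in **prod.config.toml**:
```toml
# read only when the `prod` profile is active (assumption, mirroring the
# profile-scoped secrets/config files shown above)
[workspace.runtime]
log_level="WARNING"
```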

View File

@@ -1,5 +1,6 @@
[runtime]
# sentry_dsn="https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752"
dlthub_telemetry=false
[tests]
bucket_url_gs="gs://ci-test-bucket"

View File

@@ -717,12 +717,10 @@ def test_provider_values_over_embedded_default(environment: Any) -> None:
assert not c.instrumented.is_partial()
def test_run_configuration_gen_name(environment: Any) -> None:
C = resolve.resolve_configuration(RuntimeConfiguration())
assert C.pipeline_name.startswith("dlt_")
def test_runtime_configuration_telemetry_disabled_on_non_threading_platform(monkeypatch) -> None:
def test_runtime_configuration_telemetry_disabled_on_non_threading_platform(
monkeypatch, toml_providers: ConfigProvidersContainer
) -> None:
# inject toml_providers - the default test context disables telemetry
c = resolve.resolve_configuration(RuntimeConfiguration())
assert c.dlthub_telemetry

View File

@@ -34,7 +34,7 @@ from dlt.common.configuration.specs import (
from dlt.common.runners.configuration import PoolRunnerConfiguration
from dlt.common.typing import TSecretValue
from tests.utils import preserve_environ, auto_unload_modules
from tests.utils import preserve_environ
from tests.common.configuration.utils import (
ConnectionStringCompatCredentials,
SecretCredentials,

View File

@@ -9,9 +9,9 @@ from dlt.common.configuration import resolve_configuration, configspec
from dlt.common.configuration.specs import RuntimeConfiguration
from dlt.common.exceptions import DltException, SignalReceivedException
from dlt.common.runners import pool_runner as runner
from dlt.common.runtime import apply_runtime_config
from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType
from dlt.common.runtime.init import initialize_runtime
from tests.common.runners.utils import (
_TestRunnableWorkerMethod,
_TestRunnableWorker,
@@ -140,7 +140,7 @@ def test_initialize_runtime() -> None:
logger._delete_current_logger()
logger.LOGGER = None
apply_runtime_config(config)
initialize_runtime("dlt", config)
assert logger.LOGGER is not None
logger.warning("hello")
@@ -151,8 +151,8 @@ def test_pool_runner_process_methods_forced(method) -> None:
multiprocessing.set_start_method(method, force=True)
r = _TestRunnableWorker(4)
# make sure signals and logging is initialized
C = resolve_configuration(RuntimeConfiguration())
apply_runtime_config(C)
config = resolve_configuration(RuntimeConfiguration())
initialize_runtime("dlt", config)
runs_count = runner.run_pool(configure(ProcessPoolConfiguration), r)
assert runs_count == 1
@@ -163,8 +163,8 @@ def test_pool_runner_process_methods_forced(method) -> None:
def test_pool_runner_process_methods_configured(method) -> None:
r = _TestRunnableWorker(4)
# make sure signals and logging is initialized
C = resolve_configuration(RuntimeConfiguration())
apply_runtime_config(C)
config = resolve_configuration(RuntimeConfiguration())
initialize_runtime("dlt", config)
runs_count = runner.run_pool(ProcessPoolConfiguration(start_method=method), r)
assert runs_count == 1

View File

@@ -33,11 +33,10 @@ def test_version_extract(environment: DictStrStr) -> None:
assert version == {"dlt_version": lib_version, "pipeline_name": "logger"}
# mock image info available in container
mock_image_env(environment)
version = exec_info.dlt_version_info("logger")
version = exec_info.dlt_version_info(None)
assert version == {
"dlt_version": lib_version,
"commit_sha": "192891",
"pipeline_name": "logger",
"image_version": "scale/v:112",
}

View File

@@ -6,7 +6,7 @@ import pickle
from dlt.common import logger
from dlt.common.configuration.container import Container
from dlt.common.configuration.specs import RuntimeConfiguration, PluggableRunContext
from dlt.common.runtime.init import _INITIALIZED, apply_runtime_config, restore_run_context
from dlt.common.runtime.init import restore_run_context
from dlt.common.runtime.run_context import (
DOT_DLT,
RunContext,
@@ -18,7 +18,7 @@ from dlt.common.storages.configuration import _make_file_url
from dlt.common.utils import set_working_dir
import tests
from tests.utils import MockableRunContext, TEST_STORAGE_ROOT
from tests.utils import MockableRunContext, TEST_STORAGE_ROOT, disable_temporary_telemetry
@pytest.fixture(autouse=True)
@@ -41,18 +41,23 @@ def test_run_context() -> None:
assert run_context.run_dir == run_context.local_dir
assert run_context.uri == _make_file_url(None, run_context.run_dir, None)
assert run_context.uri.startswith("file://")
assert run_context.config is None
# check config providers
assert len(run_context.initial_providers()) == 3
# apply runtime config
assert ctx.runtime_config is None
assert ctx.context.runtime_config is None
ctx.add_extras()
assert ctx.runtime_config is not None
# still not applied - must be in container
assert ctx.context.runtime_config is None
with Container().injectable_context(ctx):
ctx.initialize_runtime()
assert ctx.context.runtime_config is not None
runtime_config = RuntimeConfiguration()
ctx.initialize_runtime(runtime_config)
assert ctx.runtime_config is runtime_config
ctx.context.initialize_runtime(runtime_config)
assert ctx.context.runtime_config is runtime_config
# entities
assert "data_entity" in run_context.get_data_entity("data_entity")
@@ -61,7 +66,9 @@ def test_run_context() -> None:
assert run_context.get_run_entity("run_entity") == run_context.run_dir
# check if can be pickled
pickle.dumps(run_context)
pickled_ = pickle.dumps(run_context)
run_context_unpickled = pickle.loads(pickled_)
assert dict(run_context.runtime_config) == dict(run_context_unpickled.runtime_config)
# check plugin modules
# NOTE: first `dlt` - is the root module of current context, second is always present
@@ -77,51 +84,29 @@ def test_context_without_module() -> None:
def test_context_init_without_runtime() -> None:
runtime_config = RuntimeConfiguration()
ctx = PluggableRunContext()
with Container().injectable_context(ctx):
# logger is immediately initialized
assert logger.LOGGER is not None
# runtime is also initialized but logger was not created
assert ctx.runtime_config is not None
# this will call init_runtime on injected context internally
apply_runtime_config(runtime_config)
assert logger.LOGGER is not None
assert ctx.runtime_config is runtime_config
assert ctx.context.runtime_config is not None
def test_context_init_with_runtime() -> None:
runtime_config = RuntimeConfiguration()
ctx = PluggableRunContext(runtime_config=runtime_config)
assert ctx.runtime_config is runtime_config
# logger not initialized until placed in the container
assert logger.LOGGER is None
with Container().injectable_context(ctx):
assert ctx.runtime_config is runtime_config
assert logger.LOGGER is not None
def test_run_context_handover() -> None:
def test_run_context_handover(disable_temporary_telemetry) -> None:
# test handover of run context to process pool worker
runtime_config = RuntimeConfiguration()
ctx = PluggableRunContext()
container = Container()
old_ctx = container[PluggableRunContext]
runtime_config = old_ctx.context.runtime_config
try:
ctx.context._runtime_config = runtime_config # type: ignore
mock = MockableRunContext.from_context(ctx.context)
mock._name = "handover-dlt"
# this will insert pickled/unpickled objects into the container simulating cross process
# call in process pool
mock = pickle.loads(pickle.dumps(mock))
# also adds to context, should initialize runtime
global _INITIALIZED
try:
telemetry_init = _INITIALIZED
# do not initialize telemetry here
_INITIALIZED = True
# this will insert pickled/unpickled objects into the container
mock = pickle.loads(pickle.dumps(mock))
runtime_config = pickle.loads(pickle.dumps(runtime_config))
restore_run_context(mock, runtime_config)
finally:
_INITIALIZED = telemetry_init
restore_run_context(mock)
# logger initialized and named
assert logger.LOGGER.name == "handover-dlt"
@@ -132,7 +117,7 @@ def test_run_context_handover() -> None:
run_ctx = dlt.current.run_context()
assert run_ctx is mock
ctx = Container()[PluggableRunContext]
assert ctx.runtime_config is runtime_config
assert ctx.context.runtime_config is mock._runtime_config
finally:
container[PluggableRunContext] = old_ctx

View File

@@ -23,13 +23,13 @@ from tests.common.runtime.utils import mock_image_env, mock_github_env, mock_pod
from tests.common.configuration.utils import environment
from tests.utils import (
preserve_environ,
auto_unload_modules,
SentryLoggerConfiguration,
disable_temporary_telemetry,
init_test_logging,
start_test_telemetry,
deactivate_pipeline,
)
from dlt.common.runtime.telemetry import with_telemetry
@configspec
@@ -37,9 +37,7 @@ class SentryLoggerCriticalConfiguration(SentryLoggerConfiguration):
log_level: str = "CRITICAL"
def test_sentry_init(
environment: DictStrStr, disable_temporary_telemetry: RuntimeConfiguration
) -> None:
def test_sentry_init(environment: DictStrStr, disable_temporary_telemetry) -> None:
with patch("dlt.common.runtime.sentry.before_send", _mock_before_send):
mock_image_env(environment)
mock_pod_env(environment)
@@ -134,9 +132,7 @@ def test_telemetry_endpoint_exceptions(
)
def test_track_anon_event(
mocker: MockerFixture, disable_temporary_telemetry: RuntimeConfiguration
) -> None:
def test_track_anon_event(mocker: MockerFixture, disable_temporary_telemetry) -> None:
from dlt.common.runtime import anon_tracker
mock_github_env(os.environ)
@@ -189,6 +185,8 @@ def test_track_anon_event(
def test_forced_anon_tracker() -> None:
from dlt.common.runtime import anon_tracker
if anon_tracker._ANON_TRACKER_ENDPOINT is not None:
disable_anon_tracker()
assert anon_tracker._ANON_TRACKER_ENDPOINT is None
with anon_tracker.always_track():
@@ -220,7 +218,7 @@ def test_execution_context_with_plugin() -> None:
[True, False],
)
def test_on_first_dataset_access(
schema: Union[Schema, str, None], success: bool, monkeypatch
schema: Union[Schema, str, None], success: bool, monkeypatch, disable_temporary_telemetry
) -> None:
pipeline = dlt.pipeline("test_on_first_dataset_access", destination="duckdb")
@@ -266,6 +264,140 @@ def test_on_first_dataset_access(
assert event["properties"]["requested_schema_name_hash"] == requested_schema_name_hash
@pytest.mark.parametrize(
"case_name, behavior, expected_success",
[
("non-int return -> success", "return_non_int", True),
("int 0 return -> success", "return_zero", True),
("int non-zero return -> failure", "return_one", False),
("exception -> failure", "raise", False),
],
ids=[
"return-non-int-success",
"return-int-zero-success",
"return-int-nonzero-failure",
"raise-exception-failure",
],
)
def test_with_telemetry_track_after_various_outcomes(
case_name: str,
behavior: str,
expected_success: bool,
mocker: MockerFixture,
disable_temporary_telemetry,
) -> None:
# init test telemetry and capture outgoing events
mock_github_env(os.environ)
mock_pod_env(os.environ)
SENT_ITEMS.clear()
config = SentryLoggerConfiguration()
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
mocker.patch(
"dlt.common.runtime.anon_tracker.requests.post",
return_value=Mock(status_code=204),
)
# decorate a function so that 'x' and 'y' are included as props and an extra const prop
@with_telemetry("command", "cmd", False, "x", "y", extra_const="value", case=case_name)
def _fn(x: Any, y: Any) -> Any:
# implement different behaviors
if behavior == "return_non_int":
return "ok"
if behavior == "return_zero":
return 0
if behavior == "return_one":
return 1
if behavior == "raise":
raise ValueError("boom")
return None
# call the function and handle potential exception so we can flush telemetry
if behavior == "raise":
with pytest.raises(ValueError):
_fn("X", 7)
else:
_fn("X", 7)
# flush and collect the event
disable_anon_tracker()
# exactly one event was recorded
assert len(SENT_ITEMS) == 1
event = SENT_ITEMS[0]
# event name and basic structure
assert event["event"] == "command_cmd"
props = event["properties"]
assert props["event_category"] == "command"
assert props["event_name"] == "cmd"
# verify automatic props
assert isinstance(props["elapsed"], (int, float))
assert props["elapsed"] >= 0
assert props["success"] is expected_success
# verify captured arg values and extra kwargs
assert props["x"] == "X"
assert props["y"] == 7
assert props["extra_const"] == "value"
assert props["case"] == case_name
@pytest.mark.parametrize(
"case_name, behavior",
[
("before: return value", "return"),
("before: raise exception", "raise"),
],
ids=["track-before-return", "track-before-raise"],
)
def test_with_telemetry_track_before_emits_once_with_success(
case_name: str, behavior: str, mocker: MockerFixture, disable_temporary_telemetry
) -> None:
# init test telemetry and capture outgoing events
mock_github_env(os.environ)
mock_pod_env(os.environ)
SENT_ITEMS.clear()
config = SentryLoggerConfiguration()
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
mocker.patch(
"dlt.common.runtime.anon_tracker.requests.post",
return_value=Mock(status_code=204),
)
# when tracked before, success should always be True regardless of function outcome
@with_telemetry("command", "before_cmd", True, "x", ignored_param="ignored")
def _fn(x: Any) -> Any:
if behavior == "return":
return 1 # would be failure in after-mode but should be success here
raise RuntimeError("fail early")
if behavior == "raise":
with pytest.raises(RuntimeError):
_fn(123)
else:
_fn(123)
disable_anon_tracker()
# a single event is recorded with success True
assert len(SENT_ITEMS) == 1
event = SENT_ITEMS[0]
assert event["event"] == "command_before_cmd"
props = event["properties"]
assert props["event_category"] == "command"
assert props["event_name"] == "before_cmd"
assert props["success"] is True
# verify included props
assert props["x"] == 123
assert props["ignored_param"] == "ignored"
assert isinstance(props["elapsed"], (int, float)) and props["elapsed"] >= 0
def test_cleanup(environment: DictStrStr) -> None:
# this must happen after all forked tests (problems with tests teardowns in other tests)
pass

View File

@@ -3,7 +3,6 @@ from copy import copy
from typing import Any, Dict, Type
from unittest.mock import Mock, patch
import pytest
import pyarrow.csv as acsv
import pyarrow.parquet as pq
from dlt.common import json
@@ -17,6 +16,7 @@ from dlt.common.data_writers.writers import (
)
from dlt.common.libs.pyarrow import remove_columns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.utils import custom_environ
from tests.common.data_writers.utils import get_writer
from tests.cases import (
@@ -24,7 +24,7 @@ from tests.cases import (
arrow_table_all_data_types,
table_update_and_row,
)
from tests.utils import TestDataItemFormat, custom_environ
from tests.utils import TestDataItemFormat
def test_csv_arrow_writer_all_data_fields() -> None:

View File

@@ -14,6 +14,8 @@ from tenacity import retry_if_exception, Retrying, stop_after_attempt
from unittest.mock import patch
import pytest
from dlt.common.configuration import resolve
from dlt.common.configuration.specs.pluggable_run_context import PluggableRunContext
from dlt.common.known_env import DLT_LOCAL_DIR
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
@@ -57,6 +59,7 @@ from dlt.extract import DltResource, DltSource
from dlt.extract.extractors import MaterializedEmptyList
from dlt.load.exceptions import LoadClientJobFailed
from dlt.normalize.exceptions import NormalizeJobFailed
from dlt.pipeline.configuration import PipelineConfiguration
from dlt.pipeline.exceptions import (
InvalidPipelineName,
PipelineNeverRan,
@@ -95,7 +98,6 @@ def test_default_pipeline() -> None:
possible_names = ["dlt_pytest", "dlt_pipeline"]
assert p.pipeline_name in possible_names
assert p.pipelines_dir == os.path.abspath(os.path.join(TEST_STORAGE_ROOT, ".dlt", "pipelines"))
assert p.runtime_config.pipeline_name == p.pipeline_name
# default dataset name is not created until a destination that requires it is set
assert p.dataset_name is None
assert p.destination is None
@@ -115,6 +117,22 @@ def test_default_pipeline() -> None:
assert p.default_schema_name in ["dlt_pytest", "dlt"]
def test_pipeline_runtime_configuration() -> None:
c = resolve.resolve_configuration(PipelineConfiguration())
assert c.pipeline_name.startswith("dlt_")
assert c.runtime.slack_incoming_hook is None
# check pipeline runtime config
os.environ["RUNTIME__SLACK_INCOMING_HOOK"] = "https://hooks.slack.com/services/..."
c = resolve.resolve_configuration(PipelineConfiguration())
assert c.runtime.slack_incoming_hook == os.environ["RUNTIME__SLACK_INCOMING_HOOK"]
os.environ["TEST_P__RUNTIME__SLACK_INCOMING_HOOK"] = "#test-p-slack"
p = dlt.pipeline("test_p")
assert p.config.runtime.slack_incoming_hook == "#test-p-slack"
assert p.config.runtime is p.runtime_config
def test_default_pipeline_dataset_layout(environment) -> None:
# Set dataset_name_layout to "bobby_%s"
dataset_name_layout = "bobby_%s"
@@ -130,7 +148,6 @@ def test_default_pipeline_dataset_layout(environment) -> None:
]
assert p.pipeline_name in possible_names
assert p.pipelines_dir == os.path.abspath(os.path.join(TEST_STORAGE_ROOT, ".dlt", "pipelines"))
assert p.runtime_config.pipeline_name == p.pipeline_name
# dataset that will be used to load data is the pipeline name
assert p.dataset_name in possible_dataset_names
assert p.default_schema_name is None
@@ -352,8 +369,6 @@ def test_pipeline_with_non_alpha_name() -> None:
assert p.pipeline_name == name
# default dataset is set (we used filesystem destination that requires dataset)
assert p.dataset_name == f"{name}_dataset"
# also pipeline name in runtime must be correct
assert p.runtime_config.pipeline_name == p.pipeline_name
# this will create default schema
p.extract(["a", "b", "c"], table_name="data")
@@ -993,6 +1008,7 @@ def test_sentry_tracing() -> None:
import sentry_sdk
os.environ["RUNTIME__SENTRY_DSN"] = TEST_SENTRY_DSN
Container()[PluggableRunContext].reload()
pipeline_name = "pipe_" + uniq_id()
p = dlt.pipeline(pipeline_name=pipeline_name, destination=DUMMY_COMPLETE)

View File

@@ -649,7 +649,7 @@ def test_slack_hook(environment: DictStrStr) -> None:
with requests_mock.mock() as m:
m.post(hook_url, json={})
load_info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy")
assert slack_notify_load_success(load_info.pipeline.runtime_config.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == 200 # type: ignore[attr-defined]
assert slack_notify_load_success(load_info.pipeline.config.runtime.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == 200 # type: ignore[attr-defined]
assert m.called
message = m.last_request.json()
assert "rudolfix" in message["text"]
@@ -661,7 +661,7 @@ def test_broken_slack_hook(environment: DictStrStr) -> None:
environment["RUNTIME__SLACK_INCOMING_HOOK"] = "http://localhost:22"
load_info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy")
# connection error
assert slack_notify_load_success(load_info.pipeline.runtime_config.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == -1 # type: ignore[attr-defined]
assert slack_notify_load_success(load_info.pipeline.config.runtime.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == -1 # type: ignore[attr-defined]
# pipeline = dlt.pipeline()
# assert pipeline.last_trace is not None
# assert pipeline._trace is None

View File

@@ -212,7 +212,7 @@ def test_init_default_client() -> None:
os.environ.update({key: str(value) for key, value in cfg.items()})
dlt.pipeline(pipeline_name="dummy_pipeline")
dlt.current.run_context().reload()
session = default_client.session
assert session.timeout == cfg["RUNTIME__REQUEST_TIMEOUT"]

View File

@@ -28,15 +28,15 @@ from dlt.common.configuration.specs.config_providers_context import ConfigProvid
from dlt.common.configuration.specs.pluggable_run_context import (
RunContextBase,
)
from dlt.common.pipeline import LoadInfo, PipelineContext, SupportsPipeline
from dlt.common.pipeline import PipelineContext, SupportsPipeline
from dlt.common.runtime.run_context import DOT_DLT, RunContext
from dlt.common.runtime.telemetry import start_telemetry, stop_telemetry
from dlt.common.runtime.telemetry import stop_telemetry
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableFormat
from dlt.common.storages import FileStorage
from dlt.common.storages.versioned_storage import VersionedStorage
from dlt.common.typing import DictStrAny, StrAny, TDataItem
from dlt.common.utils import custom_environ, set_working_dir, uniq_id
from dlt.common.typing import StrAny, TDataItem
from dlt.common.utils import set_working_dir
TEST_STORAGE_ROOT = "_storage"
@@ -205,19 +205,11 @@ def auto_module_test_run_context(auto_module_test_storage) -> Iterator[None]:
def create_test_run_context() -> Iterator[None]:
# this plugs active context
ctx = PluggableRunContext()
mock = MockableRunContext.from_context(ctx.context)
mock._local_dir = os.path.abspath(TEST_STORAGE_ROOT)
mock._global_dir = mock._data_dir = os.path.join(mock._local_dir, DOT_DLT)
# ctx.context = mock
# also emit corresponding env variables so pipelines in env work like that
# with custom_environ(
# {
# known_env.DLT_LOCAL_DIR: mock.local_dir,
# known_env.DLT_DATA_DIR: mock.data_dir,
# }
# ):
ctx_plug = Container()[PluggableRunContext]
cookie = ctx_plug.push_context()
try:
@@ -306,6 +298,7 @@ class MockableRunContext(RunContext):
cls_._settings_dir = ctx.settings_dir
cls_._data_dir = ctx.data_dir
cls_._local_dir = ctx.local_dir
cls_._runtime_config = ctx.runtime_config
return cls_
@@ -438,9 +431,8 @@ def arrow_item_from_table(
def init_test_logging(c: RuntimeConfiguration = None) -> None:
if not c:
c = resolve_configuration(RuntimeConfiguration())
Container()[PluggableRunContext].initialize_runtime(c)
ctx = Container()[PluggableRunContext].context
ctx.initialize_runtime(c)
@configspec
@@ -455,7 +447,9 @@ def start_test_telemetry(c: RuntimeConfiguration = None):
stop_telemetry()
if not c:
c = resolve_configuration(RuntimeConfiguration())
start_telemetry(c)
c.dlthub_telemetry = True
ctx = Container()[PluggableRunContext].context
ctx.initialize_runtime(c)
@pytest.fixture
@@ -476,6 +470,9 @@ def disable_temporary_telemetry() -> Iterator[None]:
# force stop telemetry
telemetry._TELEMETRY_STARTED = True
stop_telemetry()
from dlt.common.runtime import anon_tracker
assert anon_tracker._ANON_TRACKER_ENDPOINT is None
def clean_test_storage(

View File

@@ -0,0 +1,6 @@
[workspace.settings]
name="name_override"
working_dir="_data"
[workspace.runtime]
pipeline_name="component"

View File

@@ -0,0 +1,2 @@
[workspace.settings]
local_dir="." # relative to run dir

View File

@@ -49,7 +49,7 @@ def test_invoke_list_pipelines(script_runner: ScriptRunner) -> None:
assert result.returncode == 0
assert "No pipelines found in" in result.stdout
# this is current workspace data dir
expected_path = os.path.join("_storage", "empty", ".dlt", "_data", "dev")
expected_path = os.path.join("_storage", "empty", ".dlt", ".var", "dev")
assert expected_path in result.stdout
result = script_runner.run(["dlt", "pipeline", "--list-pipelines"])

View File

@@ -1,23 +1,16 @@
import pytest
import io
import os
import contextlib
from typing import Any
from unittest.mock import patch
from dlt.common.configuration.container import Container
from dlt.common.configuration.providers import ConfigTomlProvider, CONFIG_TOML
from dlt.common.configuration.specs import PluggableRunContext
from dlt.common.typing import DictStrAny
from dlt._workspace.cli.utils import track_command
from dlt._workspace.cli._telemetry_command import (
telemetry_status_command,
change_telemetry_status_command,
)
from tests.utils import start_test_telemetry
def test_main_telemetry_command() -> None:
# run dir is patched to TEST_STORAGE/default (workspace root)
@@ -77,120 +70,3 @@ def test_main_telemetry_command() -> None:
project_toml = ConfigTomlProvider(run_context.settings_dir)
# local project toml was modified
assert project_toml._config_doc["runtime"]["dlthub_telemetry"] is False
def test_command_instrumentation() -> None:
@track_command("instrument_ok", False, "in_ok_param", "in_ok_param_2")
def instrument_ok(in_ok_param: str, in_ok_param_2: int) -> int:
return 0
@track_command("instrument_err_status", False, "in_err_status", "no_se")
def instrument_err_status(in_err_status: int) -> int:
return 1
@track_command("instrument_raises", False, "in_raises")
def instrument_raises(in_raises: bool) -> int:
raise Exception("failed")
@track_command("instrument_raises", True, "in_raises_2")
def instrument_raises_2(in_raises_2: bool) -> int:
raise Exception("failed")
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry()
SENT_ITEMS.clear()
instrument_ok("ok_param", 7)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_instrument_ok"
assert msg["properties"]["in_ok_param"] == "ok_param"
assert msg["properties"]["in_ok_param_2"] == 7
assert msg["properties"]["success"] is True
assert isinstance(msg["properties"]["elapsed"], float)
SENT_ITEMS.clear()
instrument_err_status(88)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_instrument_err_status"
assert msg["properties"]["in_err_status"] == 88
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
with pytest.raises(Exception):
instrument_raises(True)
msg = SENT_ITEMS[0]
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
with pytest.raises(Exception):
instrument_raises_2(True)
msg = SENT_ITEMS[0]
# this one is tracked BEFORE command is executed so success
assert msg["properties"]["success"] is True
def test_instrumentation_wrappers() -> None:
from dlt._workspace.cli import (
DEFAULT_VERIFIED_SOURCES_REPO,
)
from dlt._workspace.cli._deploy_command import (
DeploymentMethods,
COMMAND_DEPLOY_REPO_LOCATION,
)
from dlt._workspace.cli._command_wrappers import (
init_command_wrapper,
deploy_command_wrapper,
list_sources_command_wrapper,
)
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry()
SENT_ITEMS.clear()
with io.StringIO() as buf, contextlib.redirect_stderr(buf):
try:
init_command_wrapper("instrumented_source", "<UNK>", None, None)
except Exception:
pass
# output = buf.getvalue()
# assert "is not one of the standard dlt destinations" in output
msg = SENT_ITEMS[0]
assert msg["event"] == "command_init"
assert msg["properties"]["source_name"] == "instrumented_source"
assert msg["properties"]["destination_type"] == "<UNK>"
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
list_sources_command_wrapper(DEFAULT_VERIFIED_SOURCES_REPO, None)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_list_sources"
# SENT_ITEMS.clear()
# pipeline_command_wrapper("list", "-", None, 1)
# msg = SENT_ITEMS[0]
# assert msg["event"] == "command_pipeline"
# assert msg["properties"]["operation"] == "list"
SENT_ITEMS.clear()
try:
deploy_command_wrapper(
"list.py",
DeploymentMethods.github_actions.value,
COMMAND_DEPLOY_REPO_LOCATION,
schedule="* * * * *",
)
except Exception:
pass
msg = SENT_ITEMS[0]
assert msg["event"] == "command_deploy"
assert msg["properties"]["deployment_method"] == DeploymentMethods.github_actions.value
assert msg["properties"]["success"] is False
SENT_ITEMS = []
def _mock_before_send(event: DictStrAny, _unused_hint: Any = None) -> DictStrAny:
SENT_ITEMS.append(event)
# do not send this
return None

View File

@@ -4,21 +4,9 @@ import pytest
from dlt._workspace._workspace_context import WorkspaceRunContext
from tests.utils import (
auto_unload_modules,
)
from tests.workspace.utils import isolated_workspace
@pytest.fixture(autouse=True)
def auto_unload_core_sources(auto_unload_modules) -> None:
"""Unload core sources so all init tests will pass"""
sys.modules.pop("dlt.sources.rest_api", None)
sys.modules.pop("dlt.sources.sql_database", None)
sys.modules.pop("dlt.sources.filesystem", None)
@pytest.fixture(autouse=True)
def auto_isolated_workspace(
autouse_test_storage, preserve_run_context

View File

@@ -1,24 +1,25 @@
import contextlib
import io
import os
import shutil
from typing import Literal
from typing import Any
import pytest
from _pytest.capture import CaptureFixture
from _pytest.monkeypatch import MonkeyPatch
from pytest_mock import MockerFixture
from unittest.mock import patch, Mock
from dlt._workspace.cli.utils import delete_local_data
from dlt._workspace.cli.utils import delete_local_data, check_delete_local_data, track_command
from dlt._workspace.cli.exceptions import CliCommandException
from dlt._workspace.configuration import WorkspaceRuntimeConfiguration
from dlt.common.runtime.run_context import RunContext
from dlt.common.runtime.anon_tracker import disable_anon_tracker
from dlt.common.configuration.specs.pluggable_run_context import RunContextBase
# import the same pokemon fixture as in test_mcp_tools
from tests.workspace.utils import fruitshop_pipeline_context as fruitshop_pipeline_context
def _remove_dir(path: str) -> None:
"""remove a directory tree if it exists."""
if os.path.isdir(path):
shutil.rmtree(path)
from dlt._workspace.cli import echo
from tests.common.runtime.utils import mock_github_env, mock_pod_env
from tests.utils import SentryLoggerConfiguration, start_test_telemetry, disable_temporary_telemetry
@pytest.mark.parametrize(
@@ -49,8 +50,11 @@ def test_delete_local_data_recreate_behavior(
"""
ctx = fruitshop_pipeline_context
# call function under test
delete_local_data(ctx, skip_data_dir=skip_data_dir, recreate_dirs=recreate_dirs)
# list dirs to delete and auto-confirm
with echo.always_choose(always_choose_default=False, always_choose_value=True):
attrs = check_delete_local_data(ctx, skip_data_dir=skip_data_dir)
# perform deletion (which will only recreate when requested)
delete_local_data(ctx, attrs, recreate_dirs=recreate_dirs)
# local_dir is always processed
assert os.path.isdir(ctx.local_dir) is recreate_dirs
@@ -59,20 +63,22 @@ def test_delete_local_data_recreate_behavior(
expected_data_exists = skip_data_dir or recreate_dirs
assert os.path.isdir(ctx.data_dir) is expected_data_exists
# capture and check user-facing messages (ignore styled path)
# capture and check user-facing messages from check_delete_local_data
out = capsys.readouterr().out
assert "Will delete locally loaded data in " in out
assert "The following dirs will be deleted:" in out
assert "(locally loaded data)" in out
if skip_data_dir:
assert "Will delete pipeline working folders & other entities data " not in out
assert "(pipeline working folders)" not in out
else:
assert "Will delete pipeline working folders & other entities data " in out
assert "(pipeline working folders)" in out
def test_delete_local_data_with_plain_run_context_raises(capsys: CaptureFixture[str]) -> None:
"""ensure CliCommandException is raised when context lacks profiles."""
plain_ctx = RunContext(run_dir=".")
with pytest.raises(CliCommandException):
delete_local_data(plain_ctx, skip_data_dir=False, recreate_dirs=True)
# should fail before any confirmation prompt
check_delete_local_data(plain_ctx, skip_data_dir=False)
out = capsys.readouterr().out
assert "ERROR: Cannot delete local data for a context without profiles" in out
@@ -87,16 +93,7 @@ def _assert_protected_deletion(
*,
skip_data: bool,
) -> None:
"""helper to assert that attempting to delete a protected dir raises and logs an error.
Args:
ctx: Workspace run context fixture instance.
capsys: pytest capsys to capture output.
monkeypatch: pytest monkeypatch fixture to patch properties.
dir_attr: Attribute to delete ("local_dir" or "data_dir").
equals_attr: Attribute to match against ("run_dir" or "settings_dir").
skip_data: Whether to skip data_dir deletion in delete_local_data.
"""
"""helper to assert that attempting to delete a protected dir raises and logs an error."""
# compute target path to match the protected attribute
target_path = getattr(ctx, equals_attr)
@@ -105,11 +102,11 @@ def _assert_protected_deletion(
# exercise and assert
with pytest.raises(CliCommandException):
delete_local_data(ctx, skip_data_dir=skip_data, recreate_dirs=True)
check_delete_local_data(ctx, skip_data_dir=skip_data)
out = capsys.readouterr().out
label = "run dir (workspace root)" if equals_attr == "run_dir" else "settings dir"
assert f"ERROR: {dir_attr} `deleted_dir` is the same as {label} and cannot be deleted" in out
assert f"ERROR: {dir_attr} `{target_path}` is the same as {label} and cannot be deleted" in out
@pytest.mark.parametrize(
@@ -142,3 +139,232 @@ def test_delete_local_data_protects_run_and_settings_dirs(
_assert_protected_deletion(
fruitshop_pipeline_context, capsys, monkeypatch, dir_attr, equals_attr, skip_data=skip_data
)
def test_delete_local_data_rejects_dirs_outside_run_dir(
fruitshop_pipeline_context: RunContextBase,
capsys: CaptureFixture[str],
monkeypatch: MonkeyPatch,
tmp_path: Any,
) -> None:
"""ensure we refuse to operate on dirs that are not under the workspace run_dir."""
ctx = fruitshop_pipeline_context
# point local_dir to a path outside of run_dir
outside_dir = tmp_path / "outside_local"
outside_dir.mkdir(parents=True, exist_ok=True)
# patch attribute to simulate unsafe location
monkeypatch.setattr(
type(ctx), "local_dir", property(lambda self: str(outside_dir)), raising=True
)
with pytest.raises(CliCommandException):
check_delete_local_data(ctx, skip_data_dir=True)
out = capsys.readouterr().out
assert (
f"ERROR: local_dir `{ctx.local_dir}` is not within run dir (workspace root) and cannot be"
" deleted"
in out
)
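# The containment rule asserted here could be checked with os.path.commonpath;
# a sketch, not necessarily how dlt._workspace.cli.utils implements it:
import os

def _is_within(path: str, root: str) -> bool:
    path, root = os.path.abspath(path), os.path.abspath(root)
    try:
        return os.path.commonpath([path, root]) == root
    except ValueError:
        # e.g. different drives on Windows: cannot be contained
        return False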
def test_track_command_track_after_passes_params(
mocker: MockerFixture, disable_temporary_telemetry
) -> None:
"""verify track_command wraps with telemetry and forwards arg names and extra kwargs."""
# init test telemetry and capture outgoing events
mock_github_env(os.environ)
mock_pod_env(os.environ)
SENT_ITEMS.clear()
config = WorkspaceRuntimeConfiguration(dlthub_telemetry=True)
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
mocker.patch(
"dlt.common.runtime.anon_tracker.requests.post",
return_value=Mock(status_code=204),
)
@track_command("my_cmd", False, "x", "y", extra_const="value")
def _fn(x: Any, y: Any, z: Any = None) -> Any:
return "ok"
_fn("X", 7, z="ignored")
disable_anon_tracker()
assert len(SENT_ITEMS) == 1
event = SENT_ITEMS[0]
# event basics
assert event["event"] == "command_my_cmd"
props = event["properties"]
assert props["event_category"] == "command"
assert props["event_name"] == "my_cmd"
# captured args and extra kwargs
assert props["x"] == "X"
assert props["y"] == 7
assert props["extra_const"] == "value"
# automatic props
assert isinstance(props["elapsed"], (int, float)) and props["elapsed"] >= 0
assert props["success"] is True
def test_track_command_track_before_passes_params(
mocker: MockerFixture, disable_temporary_telemetry
) -> None:
"""when tracking before, event is emitted once with success True and includes provided params."""
mock_github_env(os.environ)
mock_pod_env(os.environ)
SENT_ITEMS.clear()
config = WorkspaceRuntimeConfiguration(dlthub_telemetry=True)
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
mocker.patch(
"dlt.common.runtime.anon_tracker.requests.post",
return_value=Mock(status_code=204),
)
@track_command("before_cmd", True, "p", ignored="const")
def _fn(p: Any) -> Any:
# raising should not affect success flag in before mode
raise RuntimeError("fail")
with pytest.raises(RuntimeError):
_fn(123)
disable_anon_tracker()
assert len(SENT_ITEMS) == 1
event = SENT_ITEMS[0]
assert event["event"] == "command_before_cmd"
props = event["properties"]
assert props["event_category"] == "command"
assert props["event_name"] == "before_cmd"
assert props["p"] == 123
assert props["ignored"] == "const"
assert isinstance(props["elapsed"], (int, float)) and props["elapsed"] >= 0
assert props["success"] is True
def test_command_instrumentation() -> None:
@track_command("instrument_ok", False, "in_ok_param", "in_ok_param_2")
def instrument_ok(in_ok_param: str, in_ok_param_2: int) -> int:
return 0
@track_command("instrument_err_status", False, "in_err_status", "no_se")
def instrument_err_status(in_err_status: int) -> int:
return 1
@track_command("instrument_raises", False, "in_raises")
def instrument_raises(in_raises: bool) -> int:
raise Exception("failed")
@track_command("instrument_raises", True, "in_raises_2")
def instrument_raises_2(in_raises_2: bool) -> int:
raise Exception("failed")
config = WorkspaceRuntimeConfiguration(dlthub_telemetry=True)
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
SENT_ITEMS.clear()
instrument_ok("ok_param", 7)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_instrument_ok"
assert msg["properties"]["in_ok_param"] == "ok_param"
assert msg["properties"]["in_ok_param_2"] == 7
assert msg["properties"]["success"] is True
assert isinstance(msg["properties"]["elapsed"], float)
SENT_ITEMS.clear()
instrument_err_status(88)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_instrument_err_status"
assert msg["properties"]["in_err_status"] == 88
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
with pytest.raises(Exception):
instrument_raises(True)
msg = SENT_ITEMS[0]
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
with pytest.raises(Exception):
instrument_raises_2(True)
msg = SENT_ITEMS[0]
# this one is tracked BEFORE the command is executed, so success is True
assert msg["properties"]["success"] is True
def test_instrumentation_wrappers() -> None:
from dlt._workspace.cli import (
DEFAULT_VERIFIED_SOURCES_REPO,
)
from dlt._workspace.cli._deploy_command import (
DeploymentMethods,
COMMAND_DEPLOY_REPO_LOCATION,
)
from dlt._workspace.cli._init_command import (
init_command_wrapper,
list_sources_command_wrapper,
)
from dlt._workspace.cli._deploy_command import (
deploy_command_wrapper,
)
config = WorkspaceRuntimeConfiguration(dlthub_telemetry=True)
with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
start_test_telemetry(config)
SENT_ITEMS.clear()
with io.StringIO() as buf, contextlib.redirect_stderr(buf):
try:
init_command_wrapper("instrumented_source", "<UNK>", None, None)
except Exception:
pass
# output = buf.getvalue()
# assert "is not one of the standard dlt destinations" in output
msg = SENT_ITEMS[0]
assert msg["event"] == "command_init"
assert msg["properties"]["source_name"] == "instrumented_source"
assert msg["properties"]["destination_type"] == "<UNK>"
assert msg["properties"]["success"] is False
SENT_ITEMS.clear()
list_sources_command_wrapper(DEFAULT_VERIFIED_SOURCES_REPO, None)
msg = SENT_ITEMS[0]
assert msg["event"] == "command_list_sources"
SENT_ITEMS.clear()
try:
deploy_command_wrapper(
"list.py",
DeploymentMethods.github_actions.value,
COMMAND_DEPLOY_REPO_LOCATION,
schedule="* * * * *",
)
except Exception:
pass
msg = SENT_ITEMS[0]
assert msg["event"] == "command_deploy"
assert msg["properties"]["deployment_method"] == DeploymentMethods.github_actions.value
assert msg["properties"]["success"] is False
# telemetry helpers local to this module (avoid depending on other test modules)
SENT_ITEMS: list[dict[str, Any]] = []
def _mock_before_send(event: dict[str, Any], _unused_hint: Any = None) -> dict[str, Any]:
# capture event for assertions
SENT_ITEMS.append(event)
return event

View File

@@ -13,7 +13,7 @@ from dlt.common.runners import Venv
from dlt.common.typing import StrAny
from dlt.pipeline.exceptions import CannotRestorePipelineException
from dlt._workspace.cli import _deploy_command, echo, _command_wrappers
from dlt._workspace.cli import _deploy_command, echo
from dlt._workspace.cli.exceptions import CliCommandInnerException, PipelineWasNotRun
from dlt._workspace.cli._deploy_command_helpers import get_schedule_description
from dlt._workspace.cli.exceptions import CliCommandException
@@ -48,7 +48,7 @@ def test_deploy_command_no_repo(deployment_method: str, deployment_args: StrAny)
# test wrapper
with pytest.raises(CliCommandException) as ex:
_command_wrappers.deploy_command_wrapper(
_deploy_command.deploy_command_wrapper(
"debug_pipeline.py",
deployment_method,
_deploy_command.COMMAND_DEPLOY_REPO_LOCATION,
@@ -77,7 +77,7 @@ def test_deploy_command(deployment_method: str, deployment_args: StrAny) -> None
)
assert "Your current repository has no origin set" in py_ex.value.args[0]
with pytest.raises(CliCommandInnerException):
_command_wrappers.deploy_command_wrapper(
_deploy_command.deploy_command_wrapper(
"debug_pipeline.py",
deployment_method,
_deploy_command.COMMAND_DEPLOY_REPO_LOCATION,
@@ -94,7 +94,7 @@ def test_deploy_command(deployment_method: str, deployment_args: StrAny) -> None
**deployment_args,
)
with pytest.raises(CliCommandException) as ex:
_command_wrappers.deploy_command_wrapper(
_deploy_command.deploy_command_wrapper(
"debug_pipeline.py",
deployment_method,
_deploy_command.COMMAND_DEPLOY_REPO_LOCATION,
@@ -120,7 +120,7 @@ def test_deploy_command(deployment_method: str, deployment_args: StrAny) -> None
assert "The last pipeline run ended with error" in py_ex2.value.args[0]
with pytest.raises(CliCommandException) as ex:
_command_wrappers.deploy_command_wrapper(
_deploy_command.deploy_command_wrapper(
"debug_pipeline.py",
deployment_method,
_deploy_command.COMMAND_DEPLOY_REPO_LOCATION,
@@ -186,7 +186,7 @@ def test_deploy_command(deployment_method: str, deployment_args: StrAny) -> None
)
with echo.always_choose(False, always_choose_value=True):
with pytest.raises(CliCommandException) as ex:
_command_wrappers.deploy_command_wrapper(
_deploy_command.deploy_command_wrapper(
"no_pipeline.py",
deployment_method,
_deploy_command.COMMAND_DEPLOY_REPO_LOCATION,

View File

@@ -1,3 +1,4 @@
import sys
import io
from copy import deepcopy
import hashlib
@@ -85,6 +86,18 @@ TEMPLATES = [
# a few verified sources we know to exist
SOME_KNOWN_VERIFIED_SOURCES = ["chess", "google_sheets", "pipedrive"]
from tests.utils import (
auto_unload_modules,
)
@pytest.fixture(autouse=True)
def auto_unload_core_sources(auto_unload_modules) -> None:
"""Unload core sources so all init tests will pass"""
sys.modules.pop("dlt.sources.rest_api", None)
sys.modules.pop("dlt.sources.sql_database", None)
sys.modules.pop("dlt.sources.filesystem", None)
def get_source_candidates(repo_dir: str, source_type: TSourceType = "verified") -> List[str]:
# vibe sources are in the root folder, so no module name

View File

@@ -12,6 +12,7 @@ from dlt.common import pendulum
from dlt._workspace.helpers.dashboard.config import DashboardConfiguration
from dlt._workspace.helpers.dashboard.utils import (
PICKLE_TRACE_FILE,
get_dashboard_config_sections,
get_query_result_cached,
resolve_dashboard_config,
get_local_pipelines,
@@ -56,6 +57,7 @@ from tests.workspace.helpers.dashboard.example_pipelines import (
PIPELINES_WITH_EXCEPTIONS,
PIPELINES_WITH_LOAD,
)
from tests.workspace.utils import isolated_workspace
@pytest.fixture
@@ -110,10 +112,23 @@ def test_get_local_data_path(pipeline: dlt.Pipeline):
assert get_local_data_path(pipeline)
def test_resolve_dashboard_config(success_pipeline_duckdb):
def test_get_dashboard_config_sections(success_pipeline_duckdb) -> None:
# NOTE: "dashboard" obligatory section comes from configuration __section__
assert get_dashboard_config_sections(success_pipeline_duckdb) == (
"pipelines",
"success_pipeline_duckdb",
)
assert get_dashboard_config_sections(None) == ()
# create workspace context
with isolated_workspace("configured_workspace"):
assert get_dashboard_config_sections(None) == ("workspace",)
def test_resolve_dashboard_config(success_pipeline_duckdb) -> None:
"""Test resolving dashboard config with a real pipeline"""
os.environ["DASHBOARD__SUCCESS_PIPELINE_DUCKDB__DATETIME_FORMAT"] = "some format"
os.environ["PIPELINES__SUCCESS_PIPELINE_DUCKDB__DASHBOARD__DATETIME_FORMAT"] = "some format"
os.environ["DASHBOARD__DATETIME_FORMAT"] = "other format"
config = resolve_dashboard_config(success_pipeline_duckdb)
@@ -126,6 +141,12 @@ def test_resolve_dashboard_config(success_pipeline_duckdb):
config = resolve_dashboard_config(other_pipeline)
assert config.datetime_format == "other format"
# create workspace context
with isolated_workspace("configured_workspace"):
os.environ["WORKSPACE__DASHBOARD__DATETIME_FORMAT"] = "workspace format"
config = resolve_dashboard_config(None)
assert config.datetime_format == "workspace format"
@pytest.mark.parametrize("pipeline", ALL_PIPELINES, indirect=True)
def test_get_pipelines(pipeline: dlt.Pipeline):

View File

@@ -4,7 +4,7 @@ import pickle
import dlt
from dlt._workspace._workspace_context import WorkspaceRunContext, switch_context
from dlt._workspace.cli.utils import delete_local_data
from dlt._workspace.cli.utils import check_delete_local_data, delete_local_data
from dlt._workspace.exceptions import WorkspaceRunContextNotAvailable
from dlt._workspace.profile import DEFAULT_PROFILE, read_profile_pin, save_profile_pin
from dlt._workspace.run_context import (
@@ -12,7 +12,7 @@ from dlt._workspace.run_context import (
DEFAULT_WORKSPACE_WORKING_FOLDER,
switch_profile,
)
from dlt._workspace.cli.echo import maybe_no_stdin
from dlt._workspace.cli.echo import always_choose
from dlt.common.runtime.exceptions import RunContextNotAvailable
from dlt.common.runtime.run_context import DOT_DLT, RunContext, global_dir
@@ -73,6 +73,22 @@ def test_profile_switch_no_workspace():
switch_profile("dev")
def test_workspace_configuration():
with isolated_workspace("configured_workspace", profile="tests") as ctx:
# should be used as component for logging
assert ctx.runtime_config.pipeline_name == "component"
assert ctx.name == "name_override"
# check dirs for tests profile
assert ctx.data_dir == os.path.join(ctx.run_dir, "_data")
assert ctx.local_dir.endswith(os.path.join("_local", "tests"))
ctx = ctx.switch_profile("dev")
assert ctx.name == "name_override"
assert ctx.data_dir == os.path.join(ctx.run_dir, "_data")
# this is the OSS compat mode where local dir is the same as run dir
assert ctx.local_dir == os.path.join(ctx.run_dir, ".")
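# Directory layout implied by the assertions above (a sketch; "name_override"
# and the "tests"/"dev" profiles come from the configured_workspace fixture):
#
#   <run_dir>/_data           data_dir, overridden in workspace config
#   <run_dir>/_local/tests    local_dir under the "tests" profile
#   <run_dir>/.               local_dir under "dev" (OSS compat: same as run_dir)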
def test_pinned_profile() -> None:
with isolated_workspace("default") as ctx:
save_profile_pin(ctx, "prod")
@@ -110,8 +126,8 @@ def test_workspace_pipeline() -> None:
assert os.path.isdir(os.path.join(ctx.get_data_entity("pipelines"), "ducklake_pipeline"))
# test wipe function
with maybe_no_stdin():
delete_local_data(ctx, skip_data_dir=False)
with always_choose(always_choose_default=False, always_choose_value=True):
delete_local_data(ctx, check_delete_local_data(ctx, skip_data_dir=False))
# must recreate pipeline
pipeline = pipeline.drop()
load_info = pipeline.run([{"foo": 1}, {"foo": 2}], table_name="table_foo")
@@ -149,7 +165,7 @@ def assert_workspace_context(context: WorkspaceRunContext, name_prefix: str, pro
expected_settings = os.path.join(context.run_dir, DOT_DLT)
assert context.settings_dir == expected_settings
# path / _data / profile
# path / .var / profile
expected_data_dir = os.path.join(
context.settings_dir, DEFAULT_WORKSPACE_WORKING_FOLDER, profile
)
@@ -171,4 +187,7 @@ def assert_workspace_context(context: WorkspaceRunContext, name_prefix: str, pro
assert context.get_setting("config.toml") == os.path.join(expected_settings, "config.toml")
# check if can be pickled
pickle.dumps(context)
pickled_ = pickle.dumps(context)
run_context_unpickled = pickle.loads(pickled_)
assert dict(context.runtime_config) == dict(run_context_unpickled.runtime_config)
assert dict(context.config) == dict(run_context_unpickled.config)