forked from repo-mirrors/dbt-core
Compare commits
17 Commits
main
...
qmalcolm--
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f5003724e8 | ||
|
|
3b2481b2f3 | ||
|
|
008e90a5eb | ||
|
|
c3512ed240 | ||
|
|
d274046c07 | ||
|
|
3bd2972b5c | ||
|
|
486447e57e | ||
|
|
436e1e86b6 | ||
|
|
8cc71243d4 | ||
|
|
b909090932 | ||
|
|
15ef78b4eb | ||
|
|
f4dd99b347 | ||
|
|
7a80aa3a9f | ||
|
|
2092a70e83 | ||
|
|
01219eb037 | ||
|
|
d678d73b76 | ||
|
|
a0c02919c8 |
6
.changes/unreleased/Under the Hood-20240826-141843.yaml
Normal file
6
.changes/unreleased/Under the Hood-20240826-141843.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Under the Hood
|
||||
body: Add type hinting to BaseRunner class to increase stability guarantees
|
||||
time: 2024-08-26T14:18:43.834018-05:00
|
||||
custom:
|
||||
Author: QMalcolm
|
||||
Issue: "10606"
|
||||
@@ -81,9 +81,12 @@ class FreshnessStatus(StrEnum):
|
||||
RuntimeErr = NodeStatus.RuntimeErr
|
||||
|
||||
|
||||
ResultStatus = Union[RunStatus, TestStatus, FreshnessStatus]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseResult(dbtClassMixin):
|
||||
status: Union[RunStatus, TestStatus, FreshnessStatus]
|
||||
status: ResultStatus
|
||||
timing: List[TimingInfo]
|
||||
thread_id: str
|
||||
execution_time: float
|
||||
|
||||
@@ -6,14 +6,18 @@ from abc import ABCMeta, abstractmethod
|
||||
from contextlib import nullcontext
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
|
||||
from agate import Table
|
||||
|
||||
import dbt.exceptions
|
||||
import dbt_common.exceptions.base
|
||||
from dbt import tracking
|
||||
from dbt.adapters.base.impl import BaseAdapter
|
||||
from dbt.artifacts.resources.types import NodeType
|
||||
from dbt.artifacts.schemas.results import (
|
||||
NodeStatus,
|
||||
ResultStatus,
|
||||
RunningStatus,
|
||||
RunStatus,
|
||||
TimingInfo,
|
||||
@@ -26,6 +30,8 @@ from dbt.config import RuntimeConfig
|
||||
from dbt.config.profile import read_profile
|
||||
from dbt.constants import DBT_PROJECT_FILE_NAME
|
||||
from dbt.contracts.graph.manifest import Manifest
|
||||
from dbt.contracts.graph.nodes import ResultNode
|
||||
from dbt.contracts.results import BaseResult
|
||||
from dbt.events.types import (
|
||||
CatchableExceptionOnRun,
|
||||
GenericExceptionOnRun,
|
||||
@@ -159,24 +165,31 @@ class ExecutionContext:
|
||||
timing information and the newest (compiled vs executed) form of the node.
|
||||
"""
|
||||
|
||||
def __init__(self, node) -> None:
|
||||
def __init__(self, node: ResultNode) -> None:
|
||||
self.timing: List[TimingInfo] = []
|
||||
self.node = node
|
||||
self.node: ResultNode = node
|
||||
|
||||
|
||||
class BaseRunner(metaclass=ABCMeta):
|
||||
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
|
||||
self.config = config
|
||||
self.compiler = Compiler(config)
|
||||
self.adapter = adapter
|
||||
self.node = node
|
||||
self.node_index = node_index
|
||||
self.num_nodes = num_nodes
|
||||
def __init__(
|
||||
self,
|
||||
config: RuntimeConfig,
|
||||
adapter: BaseAdapter,
|
||||
node: ResultNode,
|
||||
node_index: int,
|
||||
num_nodes: int,
|
||||
) -> None:
|
||||
self.config: RuntimeConfig = config
|
||||
self.compiler: Compiler = Compiler(config)
|
||||
self.adapter: BaseAdapter = adapter
|
||||
self.node: ResultNode = node
|
||||
self.node_index: int = node_index
|
||||
self.num_nodes: int = num_nodes
|
||||
|
||||
self.skip = False
|
||||
self.skip: bool = False
|
||||
self.skip_cause: Optional[RunResult] = None
|
||||
|
||||
self.run_ephemeral_models = False
|
||||
self.run_ephemeral_models: bool = False
|
||||
|
||||
@abstractmethod
|
||||
def compile(self, manifest: Manifest) -> Any:
|
||||
@@ -185,7 +198,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
def _node_build_path(self) -> Optional[str]:
|
||||
return self.node.build_path if hasattr(self.node, "build_path") else None
|
||||
|
||||
def get_result_status(self, result) -> Dict[str, str]:
|
||||
def get_result_status(self, result: BaseResult) -> Dict[str, str]:
|
||||
if result.status == NodeStatus.Error:
|
||||
return {"node_status": "error", "node_error": str(result.message)}
|
||||
elif result.status == NodeStatus.Skipped:
|
||||
@@ -197,7 +210,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
else:
|
||||
return {"node_status": "passed"}
|
||||
|
||||
def run_with_hooks(self, manifest):
|
||||
def run_with_hooks(self, manifest: Manifest):
|
||||
if self.skip:
|
||||
return self.on_skip()
|
||||
|
||||
@@ -217,15 +230,15 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
|
||||
def _build_run_result(
|
||||
self,
|
||||
node,
|
||||
start_time,
|
||||
status,
|
||||
timing_info,
|
||||
message,
|
||||
agate_table=None,
|
||||
adapter_response=None,
|
||||
failures=None,
|
||||
):
|
||||
node: ResultNode,
|
||||
start_time: float,
|
||||
status: ResultStatus,
|
||||
timing_info: List[TimingInfo],
|
||||
message: Optional[str] = None,
|
||||
agate_table: Optional[Table] = None,
|
||||
adapter_response: Optional[Dict[str, Any]] = None,
|
||||
failures: Optional[int] = None,
|
||||
) -> RunResult:
|
||||
execution_time = time.time() - start_time
|
||||
thread_id = threading.current_thread().name
|
||||
if adapter_response is None:
|
||||
@@ -242,7 +255,13 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
failures=failures,
|
||||
)
|
||||
|
||||
def error_result(self, node, message, start_time, timing_info):
|
||||
def error_result(
|
||||
self,
|
||||
node: ResultNode,
|
||||
message: str,
|
||||
start_time: float,
|
||||
timing_info: List[TimingInfo],
|
||||
) -> RunResult:
|
||||
return self._build_run_result(
|
||||
node=node,
|
||||
start_time=start_time,
|
||||
@@ -251,7 +270,12 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
message=message,
|
||||
)
|
||||
|
||||
def ephemeral_result(self, node, start_time, timing_info):
|
||||
def ephemeral_result(
|
||||
self,
|
||||
node: ResultNode,
|
||||
start_time: float,
|
||||
timing_info: List[TimingInfo],
|
||||
) -> RunResult:
|
||||
return self._build_run_result(
|
||||
node=node,
|
||||
start_time=start_time,
|
||||
@@ -260,7 +284,12 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
message=None,
|
||||
)
|
||||
|
||||
def from_run_result(self, result, start_time, timing_info):
|
||||
def from_run_result(
|
||||
self,
|
||||
result: RunResult,
|
||||
start_time: float,
|
||||
timing_info: List[TimingInfo],
|
||||
) -> RunResult:
|
||||
return self._build_run_result(
|
||||
node=result.node,
|
||||
start_time=start_time,
|
||||
@@ -272,7 +301,11 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
failures=result.failures,
|
||||
)
|
||||
|
||||
def compile_and_execute(self, manifest, ctx):
|
||||
def compile_and_execute(
|
||||
self,
|
||||
manifest: Manifest,
|
||||
ctx: ExecutionContext,
|
||||
) -> Optional[RunResult]:
|
||||
result = None
|
||||
with (
|
||||
self.adapter.connection_named(self.node.unique_id, self.node)
|
||||
@@ -305,7 +338,11 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
|
||||
return result
|
||||
|
||||
def _handle_catchable_exception(self, e, ctx):
|
||||
def _handle_catchable_exception(
|
||||
self,
|
||||
e: Union[CompilationError, DbtRuntimeError],
|
||||
ctx: ExecutionContext,
|
||||
) -> str:
|
||||
if e.node is None:
|
||||
e.add_node(ctx.node)
|
||||
|
||||
@@ -316,7 +353,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
)
|
||||
return str(e)
|
||||
|
||||
def _handle_internal_exception(self, e, ctx):
|
||||
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
|
||||
fire_event(
|
||||
InternalErrorOnRun(
|
||||
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
|
||||
@@ -324,7 +361,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
)
|
||||
return str(e)
|
||||
|
||||
def _handle_generic_exception(self, e, ctx):
|
||||
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
|
||||
fire_event(
|
||||
GenericExceptionOnRun(
|
||||
build_path=self._node_build_path(),
|
||||
@@ -337,7 +374,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
|
||||
return str(e)
|
||||
|
||||
def handle_exception(self, e, ctx):
|
||||
def handle_exception(self, e, ctx) -> str:
|
||||
catchable_errors = (CompilationError, DbtRuntimeError)
|
||||
if isinstance(e, catchable_errors):
|
||||
error = self._handle_catchable_exception(e, ctx)
|
||||
@@ -347,7 +384,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
error = self._handle_generic_exception(e, ctx)
|
||||
return error
|
||||
|
||||
def safe_run(self, manifest):
|
||||
def safe_run(self, manifest: Manifest) -> RunResult:
|
||||
started = time.time()
|
||||
ctx = ExecutionContext(self.node)
|
||||
error = None
|
||||
@@ -378,7 +415,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
result = self.ephemeral_result(ctx.node, started, ctx.timing)
|
||||
return result
|
||||
|
||||
def _safe_release_connection(self):
|
||||
def _safe_release_connection(self) -> Optional[str]:
|
||||
"""Try to release a connection. If an exception is hit, log and return
|
||||
the error string.
|
||||
"""
|
||||
@@ -394,24 +431,24 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
|
||||
return None
|
||||
|
||||
def before_execute(self):
|
||||
raise NotImplementedError()
|
||||
def before_execute(self) -> None:
|
||||
raise NotImplementedError("The `before_execute` function hasn't been implemented")
|
||||
|
||||
def execute(self, compiled_node, manifest):
|
||||
raise NotImplementedError()
|
||||
def execute(self, compiled_node: ResultNode, manifest: Manifest) -> RunResult:
    """Execute the given node and return its result.

    This base implementation always raises; it is meant to be replaced by
    concrete runner classes. `run` delegates directly to this method.

    Args:
        compiled_node: The node to execute.
        manifest: The manifest the node belongs to.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # The message must be positional: built-in exception constructors accept
    # no keyword arguments, so `NotImplementedError(msg=...)` would raise a
    # TypeError instead of the intended NotImplementedError.
    raise NotImplementedError("The `execute` function hasn't been implemented")
|
||||
|
||||
def run(self, compiled_node, manifest):
|
||||
def run(self, compiled_node: ResultNode, manifest: Manifest) -> RunResult:
|
||||
return self.execute(compiled_node, manifest)
|
||||
|
||||
def after_execute(self, result):
|
||||
raise NotImplementedError()
|
||||
def after_execute(self, result: RunResult) -> None:
|
||||
raise NotImplementedError("The `after_execute` function hasn't been implemented")
|
||||
|
||||
def _skip_caused_by_ephemeral_failure(self):
|
||||
def _skip_caused_by_ephemeral_failure(self) -> bool:
|
||||
if self.skip_cause is None or self.skip_cause.node is None:
|
||||
return False
|
||||
return self.skip_cause.node.is_ephemeral_model
|
||||
|
||||
def on_skip(self):
|
||||
def on_skip(self) -> RunResult:
|
||||
schema_name = getattr(self.node, "schema", "")
|
||||
node_name = self.node.name
|
||||
|
||||
@@ -427,7 +464,9 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
relation=node_name,
|
||||
index=self.node_index,
|
||||
total=self.num_nodes,
|
||||
status=self.skip_cause.status,
|
||||
status=(
|
||||
self.skip_cause.status if self.skip_cause is not None else "unknown"
|
||||
),
|
||||
)
|
||||
)
|
||||
# skip_cause here should be the run_result from the ephemeral model
|
||||
@@ -461,7 +500,7 @@ class BaseRunner(metaclass=ABCMeta):
|
||||
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
|
||||
return node_result
|
||||
|
||||
def do_skip(self, cause=None):
|
||||
def do_skip(self, cause: Optional[RunResult] = None) -> None:
|
||||
self.skip = True
|
||||
self.skip_cause = cause
|
||||
|
||||
|
||||
Reference in New Issue
Block a user