Compare commits

...

17 Commits

Author SHA1 Message Date
Quigley Malcolm
f5003724e8 Update .changes/unreleased/Under the Hood-20240826-141843.yaml
Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
2024-08-28 10:10:39 -05:00
Quigley Malcolm
3b2481b2f3 Add changie doc for BaseRunner type hinting improvements 2024-08-26 14:34:29 -05:00
Quigley Malcolm
008e90a5eb Add type hinting to on_skip and fix unhandled potential runtime error
In adding type hinting to `on_skip`, and `do_skip`, a potential runtime
error was uncovered wherein we were accessing `self.skip_cause.status` in
`on_skip` when the `skip_cause` can be `None`. On that edge case, a
runtime error would be raised given how we were attempting to access
the `status` property. We now handle this edge case, and we only discovered
it because of the additional typing.
2024-08-26 14:34:29 -05:00
Quigley Malcolm
c3512ed240 Add type hinting to BaseRunner._skip_caused_by_ephemeral_failure 2024-08-26 14:34:29 -05:00
Quigley Malcolm
d274046c07 Add type hinting to before_execute and after_execute of BaseRunner
These two methods are essentially just additional logging based for the
runner. They should never return anything. Typing them as such helps
guarantee that so people don't end up doing some weird things.
2024-08-26 14:34:29 -05:00
Quigley Malcolm
3bd2972b5c Add type hinting to safe_run and _safe_release_connection of BaseRunner 2024-08-26 14:34:29 -05:00
Quigley Malcolm
486447e57e Add type hinting to exception handling methods of BaseRunner
Specifically methods:
- `_handle_catchable_exception`
- `_handle_internal_exception`
- `_handle_generic_exception`
- `handle_exception`

Additionally we are defining "catchable errors" twice now. Once as a union of
types in `_handle_catchable_exception` and as a tuple of classes in `handle_exception`.
We should look at refactoring this into a single definition sometime, as a change to
one should 1-1 correspond to a change in the other.
2024-08-26 14:34:26 -05:00
Quigley Malcolm
436e1e86b6 Add type hinting to execute and run of BaseRunner
Because `execute` didn't have it's return type specified, it appeared to
`run` that there was no expected return value. In turn `compile_and_run`
was saying that the `result` it would return would _always_ be `None`. However
that is most assuredly _not_ the case, and typing `execute` solves this.
2024-08-26 14:33:53 -05:00
Quigley Malcolm
8cc71243d4 Add typing to node property of ExecutionContext
We did this because in `BaseRunner.compile_and_execute` we weren't
getting type completion for accessing `ctx.node.node_info`. Said another
way, we didn't have the type context in `compile_and_execute` to know
if a `node_info` property existed on the `node` of the `ctx`. By improving
type hinting for `ExecutionContext` we were able to resolve this.
2024-08-26 14:33:53 -05:00
Quigley Malcolm
b909090932 Add type hinting to BaseRunner.compile_and_execute 2024-08-26 14:33:53 -05:00
Quigley Malcolm
15ef78b4eb Add ResultStatus type union definition and use to fix status typing in BaseRunner 2024-08-26 14:33:53 -05:00
Quigley Malcolm
f4dd99b347 Fix typing of node in BaseRunner init and other methods 2024-08-26 14:33:53 -05:00
Quigley Malcolm
7a80aa3a9f Add type hinting to error_result, ephemeral_result, and from_run_result of BaseRunner
There are two type errors in this commit. The type hinting of `RunResult.node` and the `node`
argument of `_build_run_result` are _different_! I tried to fix this in a previous commit
by going from `GraphMemberNode` too `ManifestNode` as `GraphMemberNode` was too broad. It seems
`ManifestNode` is too narrow unfortunately. Checking the typing of `RunResult.node`, which I
should have done sooner, it uses `ResultNode`, which makes sense.

The second type error is that the type hinting of `RunResult.status` and the `status` arguement
of `_build_run_result` are _different_. The correct typing we should be that of `RunResult.status`.
Both of these typing issues will be resolved in the next commit.
2024-08-26 14:33:53 -05:00
Quigley Malcolm
2092a70e83 Add type hinting for BaseRunner._build_run_result 2024-08-26 14:33:50 -05:00
Quigley Malcolm
01219eb037 Add typing for BaseRunner.run_with_hooks
As part of this I had to switch the typing of node from `GraphMemberNode`
to `ManifestNode`. We needed to do this be because `GraphMemberNode` was
_too_ wide. Specifically, `GraphMemberNode` includes the `Metric` node
which doesn't inherit from `NodeInfoMixin` and thus does not have a
`update_event_status` method. Unfortunately moving to `ManifestNode`
_excludes_ some nodes from the allowed typing that we'll probably later
on find we need to include. Maybe that means some nodes should be moved
to the `ManifestNode` definition. We might find that it would be more
appropriate however to add an `ExecutableNode` or `RunnableNode` protocol
definition, and then use that for typing.
2024-08-23 16:25:20 -05:00
Quigley Malcolm
d678d73b76 Add typing to BuildResult.get_result_status 2024-08-23 11:06:09 -05:00
Quigley Malcolm
a0c02919c8 Add typing to BaseRunner.__init__
The `BaseRunner` class is a widely inherited class, and it's untyped.
Sometimes the wrong thing gets passed into the classes that inherit
`BaseRunner`. The "subclasses" of `BaseRunner` generally further
restrict some of the available typing, (f.x. the type of node that
can be passed in). In order to properly type those subclasses, we
first should type `BaseRunner`. This is the start of that.
2024-08-23 10:58:21 -05:00
3 changed files with 93 additions and 45 deletions

View File

@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add type hinting to BaseRunner class to increase stability guarantees
time: 2024-08-26T14:18:43.834018-05:00
custom:
Author: QMalcolm
Issue: "10606"

View File

@@ -81,9 +81,12 @@ class FreshnessStatus(StrEnum):
RuntimeErr = NodeStatus.RuntimeErr
ResultStatus = Union[RunStatus, TestStatus, FreshnessStatus]
@dataclass
class BaseResult(dbtClassMixin):
status: Union[RunStatus, TestStatus, FreshnessStatus]
status: ResultStatus
timing: List[TimingInfo]
thread_id: str
execution_time: float

View File

@@ -6,14 +6,18 @@ from abc import ABCMeta, abstractmethod
from contextlib import nullcontext
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Union
from agate import Table
import dbt.exceptions
import dbt_common.exceptions.base
from dbt import tracking
from dbt.adapters.base.impl import BaseAdapter
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.results import (
NodeStatus,
ResultStatus,
RunningStatus,
RunStatus,
TimingInfo,
@@ -26,6 +30,8 @@ from dbt.config import RuntimeConfig
from dbt.config.profile import read_profile
from dbt.constants import DBT_PROJECT_FILE_NAME
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.results import BaseResult
from dbt.events.types import (
CatchableExceptionOnRun,
GenericExceptionOnRun,
@@ -159,24 +165,31 @@ class ExecutionContext:
timing information and the newest (compiled vs executed) form of the node.
"""
def __init__(self, node) -> None:
def __init__(self, node: ResultNode) -> None:
self.timing: List[TimingInfo] = []
self.node = node
self.node: ResultNode = node
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
self.config = config
self.compiler = Compiler(config)
self.adapter = adapter
self.node = node
self.node_index = node_index
self.num_nodes = num_nodes
def __init__(
self,
config: RuntimeConfig,
adapter: BaseAdapter,
node: ResultNode,
node_index: int,
num_nodes: int,
) -> None:
self.config: RuntimeConfig = config
self.compiler: Compiler = Compiler(config)
self.adapter: BaseAdapter = adapter
self.node: ResultNode = node
self.node_index: int = node_index
self.num_nodes: int = num_nodes
self.skip = False
self.skip: bool = False
self.skip_cause: Optional[RunResult] = None
self.run_ephemeral_models = False
self.run_ephemeral_models: bool = False
@abstractmethod
def compile(self, manifest: Manifest) -> Any:
@@ -185,7 +198,7 @@ class BaseRunner(metaclass=ABCMeta):
def _node_build_path(self) -> Optional[str]:
return self.node.build_path if hasattr(self.node, "build_path") else None
def get_result_status(self, result) -> Dict[str, str]:
def get_result_status(self, result: BaseResult) -> Dict[str, str]:
if result.status == NodeStatus.Error:
return {"node_status": "error", "node_error": str(result.message)}
elif result.status == NodeStatus.Skipped:
@@ -197,7 +210,7 @@ class BaseRunner(metaclass=ABCMeta):
else:
return {"node_status": "passed"}
def run_with_hooks(self, manifest):
def run_with_hooks(self, manifest: Manifest):
if self.skip:
return self.on_skip()
@@ -217,15 +230,15 @@ class BaseRunner(metaclass=ABCMeta):
def _build_run_result(
self,
node,
start_time,
status,
timing_info,
message,
agate_table=None,
adapter_response=None,
failures=None,
):
node: ResultNode,
start_time: float,
status: ResultStatus,
timing_info: List[TimingInfo],
message: Optional[str] = None,
agate_table: Optional[Table] = None,
adapter_response: Optional[Dict[str, Any]] = None,
failures: Optional[int] = None,
) -> RunResult:
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
if adapter_response is None:
@@ -242,7 +255,13 @@ class BaseRunner(metaclass=ABCMeta):
failures=failures,
)
def error_result(self, node, message, start_time, timing_info):
def error_result(
self,
node: ResultNode,
message: str,
start_time: float,
timing_info: List[TimingInfo],
) -> RunResult:
return self._build_run_result(
node=node,
start_time=start_time,
@@ -251,7 +270,12 @@ class BaseRunner(metaclass=ABCMeta):
message=message,
)
def ephemeral_result(self, node, start_time, timing_info):
def ephemeral_result(
self,
node: ResultNode,
start_time: float,
timing_info: List[TimingInfo],
) -> RunResult:
return self._build_run_result(
node=node,
start_time=start_time,
@@ -260,7 +284,12 @@ class BaseRunner(metaclass=ABCMeta):
message=None,
)
def from_run_result(self, result, start_time, timing_info):
def from_run_result(
self,
result: RunResult,
start_time: float,
timing_info: List[TimingInfo],
) -> RunResult:
return self._build_run_result(
node=result.node,
start_time=start_time,
@@ -272,7 +301,11 @@ class BaseRunner(metaclass=ABCMeta):
failures=result.failures,
)
def compile_and_execute(self, manifest, ctx):
def compile_and_execute(
self,
manifest: Manifest,
ctx: ExecutionContext,
) -> Optional[RunResult]:
result = None
with (
self.adapter.connection_named(self.node.unique_id, self.node)
@@ -305,7 +338,11 @@ class BaseRunner(metaclass=ABCMeta):
return result
def _handle_catchable_exception(self, e, ctx):
def _handle_catchable_exception(
self,
e: Union[CompilationError, DbtRuntimeError],
ctx: ExecutionContext,
) -> str:
if e.node is None:
e.add_node(ctx.node)
@@ -316,7 +353,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_internal_exception(self, e, ctx):
def _handle_internal_exception(self, e: DbtInternalError, ctx: ExecutionContext) -> str:
fire_event(
InternalErrorOnRun(
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
@@ -324,7 +361,7 @@ class BaseRunner(metaclass=ABCMeta):
)
return str(e)
def _handle_generic_exception(self, e, ctx):
def _handle_generic_exception(self, e: Exception, ctx: ExecutionContext) -> str:
fire_event(
GenericExceptionOnRun(
build_path=self._node_build_path(),
@@ -337,7 +374,7 @@ class BaseRunner(metaclass=ABCMeta):
return str(e)
def handle_exception(self, e, ctx):
def handle_exception(self, e, ctx) -> str:
catchable_errors = (CompilationError, DbtRuntimeError)
if isinstance(e, catchable_errors):
error = self._handle_catchable_exception(e, ctx)
@@ -347,7 +384,7 @@ class BaseRunner(metaclass=ABCMeta):
error = self._handle_generic_exception(e, ctx)
return error
def safe_run(self, manifest):
def safe_run(self, manifest: Manifest) -> RunResult:
started = time.time()
ctx = ExecutionContext(self.node)
error = None
@@ -378,7 +415,7 @@ class BaseRunner(metaclass=ABCMeta):
result = self.ephemeral_result(ctx.node, started, ctx.timing)
return result
def _safe_release_connection(self):
def _safe_release_connection(self) -> Optional[str]:
"""Try to release a connection. If an exception is hit, log and return
the error string.
"""
@@ -394,24 +431,24 @@ class BaseRunner(metaclass=ABCMeta):
return None
def before_execute(self):
raise NotImplementedError()
def before_execute(self) -> None:
raise NotImplementedError("The `before_execute` function hasn't been implemented")
def execute(self, compiled_node, manifest):
raise NotImplementedError()
def execute(self, compiled_node: ResultNode, manifest: Manifest) -> RunResult:
raise NotImplementedError(msg="The `execute` function hasn't been implemented")
def run(self, compiled_node, manifest):
def run(self, compiled_node: ResultNode, manifest: Manifest) -> RunResult:
return self.execute(compiled_node, manifest)
def after_execute(self, result):
raise NotImplementedError()
def after_execute(self, result: RunResult) -> None:
raise NotImplementedError("The `after_execute` function hasn't been implemented")
def _skip_caused_by_ephemeral_failure(self):
def _skip_caused_by_ephemeral_failure(self) -> bool:
if self.skip_cause is None or self.skip_cause.node is None:
return False
return self.skip_cause.node.is_ephemeral_model
def on_skip(self):
def on_skip(self) -> RunResult:
schema_name = getattr(self.node, "schema", "")
node_name = self.node.name
@@ -427,7 +464,9 @@ class BaseRunner(metaclass=ABCMeta):
relation=node_name,
index=self.node_index,
total=self.num_nodes,
status=self.skip_cause.status,
status=(
self.skip_cause.status if self.skip_cause is not None else "unknown"
),
)
)
# skip_cause here should be the run_result from the ephemeral model
@@ -461,7 +500,7 @@ class BaseRunner(metaclass=ABCMeta):
node_result = RunResult.from_node(self.node, RunStatus.Skipped, error_message)
return node_result
def do_skip(self, cause=None):
def do_skip(self, cause: Optional[RunResult] = None) -> None:
self.skip = True
self.skip_cause = cause