mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Propagate exceptions for NodeFinished callbacks in dbtRunner (#12286)
This commit is contained in:
6
.changes/unreleased/Fixes-20251216-120727.yaml
Normal file
6
.changes/unreleased/Fixes-20251216-120727.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Fixes
|
||||
body: ':bug: :snowman:Propagate exceptions for NodeFinished callbacks in dbtRunner'
|
||||
time: 2025-12-16T12:07:27.576087-05:00
|
||||
custom:
|
||||
Author: michelleark
|
||||
Issue: "11612"
|
||||
@@ -249,34 +249,17 @@ class GraphRunnableTask(ConfiguredTask):
|
||||
thread_exception = e
|
||||
finally:
|
||||
if result is not None:
|
||||
fire_event(
|
||||
NodeFinished(
|
||||
node_info=runner.node.node_info,
|
||||
run_result=result.to_msg_dict(),
|
||||
try:
|
||||
fire_event(
|
||||
NodeFinished(
|
||||
node_info=runner.node.node_info,
|
||||
run_result=result.to_msg_dict(),
|
||||
)
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
result = self._handle_thread_exception(runner, e)
|
||||
else:
|
||||
msg = f"Exception on worker thread. {thread_exception}"
|
||||
|
||||
fire_event(
|
||||
GenericExceptionOnRun(
|
||||
unique_id=runner.node.unique_id,
|
||||
exc=str(thread_exception),
|
||||
node_info=runner.node.node_info,
|
||||
)
|
||||
)
|
||||
|
||||
result = RunResult(
|
||||
status=RunStatus.Error, # type: ignore
|
||||
timing=[],
|
||||
thread_id="",
|
||||
execution_time=0.0,
|
||||
adapter_response={},
|
||||
message=msg,
|
||||
failures=None,
|
||||
batch_results=None,
|
||||
node=runner.node,
|
||||
)
|
||||
result = self._handle_thread_exception(runner, thread_exception)
|
||||
|
||||
# `_event_status` dict is only used for logging. Make sure
|
||||
# it gets deleted when we're done with it
|
||||
@@ -365,6 +348,32 @@ class GraphRunnableTask(ConfiguredTask):
|
||||
args = [runner]
|
||||
self._submit(pool, args, callback)
|
||||
|
||||
def _handle_thread_exception(
|
||||
self,
|
||||
runner: BaseRunner,
|
||||
thread_exception: Optional[Union[KeyboardInterrupt, SystemExit, Exception]],
|
||||
) -> RunResult:
|
||||
msg = f"Exception on worker thread. {thread_exception}"
|
||||
fire_event(
|
||||
GenericExceptionOnRun(
|
||||
unique_id=runner.node.unique_id,
|
||||
exc=str(thread_exception),
|
||||
node_info=runner.node.node_info,
|
||||
)
|
||||
)
|
||||
|
||||
return RunResult(
|
||||
status=RunStatus.Error, # type: ignore
|
||||
timing=[],
|
||||
thread_id="",
|
||||
execution_time=0.0,
|
||||
adapter_response={},
|
||||
message=msg,
|
||||
failures=None,
|
||||
batch_results=None,
|
||||
node=runner.node,
|
||||
)
|
||||
|
||||
def _handle_result(self, result: RunResult) -> None:
|
||||
"""Mark the result as completed, insert the `CompileResultNode` into
|
||||
the manifest, and mark any descendants (potentially with a 'cause' if
|
||||
|
||||
@@ -55,6 +55,22 @@ class TestDbtRunner:
|
||||
dbt.invoke(["debug"])
|
||||
mock_callback.assert_called()
|
||||
|
||||
def test_callback_node_finished_exceptions_are_raised(self, project):
|
||||
from dbt_common.events.base_types import EventMsg
|
||||
|
||||
def callback_with_exception(event: EventMsg):
|
||||
if event.info.name == "NodeFinished":
|
||||
raise Exception("This should let continue the execution registering the failure")
|
||||
|
||||
dbt = dbtRunner(callbacks=[callback_with_exception])
|
||||
result = dbt.invoke(["run", "--select", "models"])
|
||||
|
||||
assert result is not None
|
||||
assert (
|
||||
result.result.results[0].message
|
||||
== "Exception on worker thread. This should let continue the execution registering the failure"
|
||||
)
|
||||
|
||||
def test_invoke_kwargs(self, project, dbt):
|
||||
res = dbt.invoke(
|
||||
["run"],
|
||||
|
||||
Reference in New Issue
Block a user