mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-20 15:01:28 +00:00
Compare commits
6 Commits
enable-pos
...
qmalcolm--
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6358aff386 | ||
|
|
9731eaaeb0 | ||
|
|
43d1720986 | ||
|
|
de43f599c0 | ||
|
|
43945d68a1 | ||
|
|
c63dd9d294 |
@@ -644,96 +644,109 @@ class RunTask(CompileTask):
|
||||
hooks.sort(key=self._hook_keyfunc)
|
||||
return hooks
|
||||
|
||||
def _safe_run_hook(
|
||||
self,
|
||||
adapter: BaseAdapter,
|
||||
hook: HookNode,
|
||||
hook_name: str,
|
||||
timing: List[TimingInfo],
|
||||
num_hooks: int,
|
||||
extra_context: Dict[str, Any],
|
||||
) -> Tuple[RunStatus, str, float]:
|
||||
with collect_timing_info("compile", timing.append):
|
||||
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)
|
||||
|
||||
started_at = timing[0].started_at or datetime.utcnow()
|
||||
hook.update_event_status(
|
||||
started_at=started_at.isoformat(), node_status=RunningStatus.Started
|
||||
)
|
||||
|
||||
fire_event(
|
||||
LogHookStartLine(
|
||||
statement=hook_name,
|
||||
index=hook.index,
|
||||
total=num_hooks,
|
||||
node_info=hook.node_info,
|
||||
)
|
||||
)
|
||||
|
||||
with collect_timing_info("execute", timing.append):
|
||||
status, message = get_execution_status(sql, adapter)
|
||||
|
||||
finished_at = timing[1].completed_at or datetime.utcnow()
|
||||
hook.update_event_status(finished_at=finished_at.isoformat())
|
||||
execution_time = (finished_at - started_at).total_seconds()
|
||||
|
||||
if status == RunStatus.Success:
|
||||
message = f"{hook_name} passed"
|
||||
else:
|
||||
message = f"{hook_name} failed, error:\n {message}"
|
||||
|
||||
return (status, message, execution_time)
|
||||
|
||||
def safe_run_hooks(
|
||||
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
|
||||
self,
|
||||
adapter: BaseAdapter,
|
||||
hook_type: RunHookType,
|
||||
extra_context: Dict[str, Any],
|
||||
) -> RunStatus:
|
||||
ordered_hooks = self.get_hooks_by_type(hook_type)
|
||||
|
||||
if hook_type == RunHookType.End and ordered_hooks:
|
||||
fire_event(Formatting(""))
|
||||
|
||||
# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
|
||||
adapter.clear_transaction()
|
||||
if not ordered_hooks:
|
||||
return RunStatus.Success
|
||||
|
||||
status = RunStatus.Success
|
||||
failed = False
|
||||
num_hooks = len(ordered_hooks)
|
||||
|
||||
for idx, hook in enumerate(ordered_hooks, 1):
|
||||
with log_contextvars(node_info=hook.node_info):
|
||||
hook.index = idx
|
||||
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
|
||||
execution_time = 0.0
|
||||
timing: List[TimingInfo] = []
|
||||
failures = 1
|
||||
if ordered_hooks:
|
||||
if hook_type == RunHookType.End:
|
||||
fire_event(Formatting(""))
|
||||
|
||||
if not failed:
|
||||
with collect_timing_info("compile", timing.append):
|
||||
sql = self.get_hook_sql(
|
||||
adapter, hook, hook.index, num_hooks, extra_context
|
||||
# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
|
||||
adapter.clear_transaction()
|
||||
|
||||
num_hooks = len(ordered_hooks)
|
||||
|
||||
for idx, hook in enumerate(ordered_hooks, 1):
|
||||
with log_contextvars(node_info=hook.node_info):
|
||||
hook.index = idx
|
||||
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
|
||||
execution_time = 0.0
|
||||
timing: List[TimingInfo] = []
|
||||
|
||||
# Only run this hook if the previous hook succeeded
|
||||
# otherwise, skip it
|
||||
if status == RunStatus.Success:
|
||||
(status, message, execution_time) = self._safe_run_hook(
|
||||
adapter, hook, hook_name, timing, num_hooks, extra_context
|
||||
)
|
||||
else:
|
||||
status = RunStatus.Skipped
|
||||
message = f"{hook_name} skipped"
|
||||
|
||||
started_at = timing[0].started_at or datetime.utcnow()
|
||||
hook.update_event_status(
|
||||
started_at=started_at.isoformat(), node_status=RunningStatus.Started
|
||||
hook.update_event_status(node_status=status)
|
||||
|
||||
self.node_results.append(
|
||||
RunResult(
|
||||
status=status,
|
||||
thread_id="main",
|
||||
timing=timing,
|
||||
message=message,
|
||||
adapter_response={},
|
||||
execution_time=execution_time,
|
||||
failures=0 if status == RunStatus.Success else 1,
|
||||
node=hook,
|
||||
)
|
||||
)
|
||||
|
||||
fire_event(
|
||||
LogHookStartLine(
|
||||
LogHookEndLine(
|
||||
statement=hook_name,
|
||||
status=status,
|
||||
index=hook.index,
|
||||
total=num_hooks,
|
||||
execution_time=execution_time,
|
||||
node_info=hook.node_info,
|
||||
)
|
||||
)
|
||||
|
||||
with collect_timing_info("execute", timing.append):
|
||||
status, message = get_execution_status(sql, adapter)
|
||||
|
||||
finished_at = timing[1].completed_at or datetime.utcnow()
|
||||
hook.update_event_status(finished_at=finished_at.isoformat())
|
||||
execution_time = (finished_at - started_at).total_seconds()
|
||||
failures = 0 if status == RunStatus.Success else 1
|
||||
|
||||
if status == RunStatus.Success:
|
||||
message = f"{hook_name} passed"
|
||||
else:
|
||||
message = f"{hook_name} failed, error:\n {message}"
|
||||
failed = True
|
||||
else:
|
||||
status = RunStatus.Skipped
|
||||
message = f"{hook_name} skipped"
|
||||
|
||||
hook.update_event_status(node_status=status)
|
||||
|
||||
self.node_results.append(
|
||||
RunResult(
|
||||
status=status,
|
||||
thread_id="main",
|
||||
timing=timing,
|
||||
message=message,
|
||||
adapter_response={},
|
||||
execution_time=execution_time,
|
||||
failures=failures,
|
||||
node=hook,
|
||||
)
|
||||
)
|
||||
|
||||
fire_event(
|
||||
LogHookEndLine(
|
||||
statement=hook_name,
|
||||
status=status,
|
||||
index=hook.index,
|
||||
total=num_hooks,
|
||||
execution_time=execution_time,
|
||||
node_info=hook.node_info,
|
||||
)
|
||||
)
|
||||
|
||||
if hook_type == RunHookType.Start and ordered_hooks:
|
||||
fire_event(Formatting(""))
|
||||
if hook_type == RunHookType.Start:
|
||||
fire_event(Formatting(""))
|
||||
|
||||
return status
|
||||
|
||||
|
||||
Reference in New Issue
Block a user