mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-21 15:01:28 +00:00
Compare commits
6 Commits
enable-pos
...
qmalcolm--
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6358aff386 | ||
|
|
9731eaaeb0 | ||
|
|
43d1720986 | ||
|
|
de43f599c0 | ||
|
|
43945d68a1 | ||
|
|
c63dd9d294 |
@@ -644,96 +644,109 @@ class RunTask(CompileTask):
|
|||||||
hooks.sort(key=self._hook_keyfunc)
|
hooks.sort(key=self._hook_keyfunc)
|
||||||
return hooks
|
return hooks
|
||||||
|
|
||||||
|
def _safe_run_hook(
|
||||||
|
self,
|
||||||
|
adapter: BaseAdapter,
|
||||||
|
hook: HookNode,
|
||||||
|
hook_name: str,
|
||||||
|
timing: List[TimingInfo],
|
||||||
|
num_hooks: int,
|
||||||
|
extra_context: Dict[str, Any],
|
||||||
|
) -> Tuple[RunStatus, str, float]:
|
||||||
|
with collect_timing_info("compile", timing.append):
|
||||||
|
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)
|
||||||
|
|
||||||
|
started_at = timing[0].started_at or datetime.utcnow()
|
||||||
|
hook.update_event_status(
|
||||||
|
started_at=started_at.isoformat(), node_status=RunningStatus.Started
|
||||||
|
)
|
||||||
|
|
||||||
|
fire_event(
|
||||||
|
LogHookStartLine(
|
||||||
|
statement=hook_name,
|
||||||
|
index=hook.index,
|
||||||
|
total=num_hooks,
|
||||||
|
node_info=hook.node_info,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
with collect_timing_info("execute", timing.append):
|
||||||
|
status, message = get_execution_status(sql, adapter)
|
||||||
|
|
||||||
|
finished_at = timing[1].completed_at or datetime.utcnow()
|
||||||
|
hook.update_event_status(finished_at=finished_at.isoformat())
|
||||||
|
execution_time = (finished_at - started_at).total_seconds()
|
||||||
|
|
||||||
|
if status == RunStatus.Success:
|
||||||
|
message = f"{hook_name} passed"
|
||||||
|
else:
|
||||||
|
message = f"{hook_name} failed, error:\n {message}"
|
||||||
|
|
||||||
|
return (status, message, execution_time)
|
||||||
|
|
||||||
def safe_run_hooks(
|
def safe_run_hooks(
|
||||||
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
|
self,
|
||||||
|
adapter: BaseAdapter,
|
||||||
|
hook_type: RunHookType,
|
||||||
|
extra_context: Dict[str, Any],
|
||||||
) -> RunStatus:
|
) -> RunStatus:
|
||||||
ordered_hooks = self.get_hooks_by_type(hook_type)
|
ordered_hooks = self.get_hooks_by_type(hook_type)
|
||||||
|
|
||||||
if hook_type == RunHookType.End and ordered_hooks:
|
|
||||||
fire_event(Formatting(""))
|
|
||||||
|
|
||||||
# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
|
|
||||||
adapter.clear_transaction()
|
|
||||||
if not ordered_hooks:
|
|
||||||
return RunStatus.Success
|
|
||||||
|
|
||||||
status = RunStatus.Success
|
status = RunStatus.Success
|
||||||
failed = False
|
|
||||||
num_hooks = len(ordered_hooks)
|
|
||||||
|
|
||||||
for idx, hook in enumerate(ordered_hooks, 1):
|
if ordered_hooks:
|
||||||
with log_contextvars(node_info=hook.node_info):
|
if hook_type == RunHookType.End:
|
||||||
hook.index = idx
|
fire_event(Formatting(""))
|
||||||
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
|
|
||||||
execution_time = 0.0
|
|
||||||
timing: List[TimingInfo] = []
|
|
||||||
failures = 1
|
|
||||||
|
|
||||||
if not failed:
|
# on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created.
|
||||||
with collect_timing_info("compile", timing.append):
|
adapter.clear_transaction()
|
||||||
sql = self.get_hook_sql(
|
|
||||||
adapter, hook, hook.index, num_hooks, extra_context
|
num_hooks = len(ordered_hooks)
|
||||||
|
|
||||||
|
for idx, hook in enumerate(ordered_hooks, 1):
|
||||||
|
with log_contextvars(node_info=hook.node_info):
|
||||||
|
hook.index = idx
|
||||||
|
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
|
||||||
|
execution_time = 0.0
|
||||||
|
timing: List[TimingInfo] = []
|
||||||
|
|
||||||
|
# Only run this hook if the previous hook succeeded
|
||||||
|
# otherwise, skip it
|
||||||
|
if status == RunStatus.Success:
|
||||||
|
(status, message, execution_time) = self._safe_run_hook(
|
||||||
|
adapter, hook, hook_name, timing, num_hooks, extra_context
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
status = RunStatus.Skipped
|
||||||
|
message = f"{hook_name} skipped"
|
||||||
|
|
||||||
started_at = timing[0].started_at or datetime.utcnow()
|
hook.update_event_status(node_status=status)
|
||||||
hook.update_event_status(
|
|
||||||
started_at=started_at.isoformat(), node_status=RunningStatus.Started
|
self.node_results.append(
|
||||||
|
RunResult(
|
||||||
|
status=status,
|
||||||
|
thread_id="main",
|
||||||
|
timing=timing,
|
||||||
|
message=message,
|
||||||
|
adapter_response={},
|
||||||
|
execution_time=execution_time,
|
||||||
|
failures=0 if status == RunStatus.Success else 1,
|
||||||
|
node=hook,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
fire_event(
|
fire_event(
|
||||||
LogHookStartLine(
|
LogHookEndLine(
|
||||||
statement=hook_name,
|
statement=hook_name,
|
||||||
|
status=status,
|
||||||
index=hook.index,
|
index=hook.index,
|
||||||
total=num_hooks,
|
total=num_hooks,
|
||||||
|
execution_time=execution_time,
|
||||||
node_info=hook.node_info,
|
node_info=hook.node_info,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
with collect_timing_info("execute", timing.append):
|
if hook_type == RunHookType.Start:
|
||||||
status, message = get_execution_status(sql, adapter)
|
fire_event(Formatting(""))
|
||||||
|
|
||||||
finished_at = timing[1].completed_at or datetime.utcnow()
|
|
||||||
hook.update_event_status(finished_at=finished_at.isoformat())
|
|
||||||
execution_time = (finished_at - started_at).total_seconds()
|
|
||||||
failures = 0 if status == RunStatus.Success else 1
|
|
||||||
|
|
||||||
if status == RunStatus.Success:
|
|
||||||
message = f"{hook_name} passed"
|
|
||||||
else:
|
|
||||||
message = f"{hook_name} failed, error:\n {message}"
|
|
||||||
failed = True
|
|
||||||
else:
|
|
||||||
status = RunStatus.Skipped
|
|
||||||
message = f"{hook_name} skipped"
|
|
||||||
|
|
||||||
hook.update_event_status(node_status=status)
|
|
||||||
|
|
||||||
self.node_results.append(
|
|
||||||
RunResult(
|
|
||||||
status=status,
|
|
||||||
thread_id="main",
|
|
||||||
timing=timing,
|
|
||||||
message=message,
|
|
||||||
adapter_response={},
|
|
||||||
execution_time=execution_time,
|
|
||||||
failures=failures,
|
|
||||||
node=hook,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
fire_event(
|
|
||||||
LogHookEndLine(
|
|
||||||
statement=hook_name,
|
|
||||||
status=status,
|
|
||||||
index=hook.index,
|
|
||||||
total=num_hooks,
|
|
||||||
execution_time=execution_time,
|
|
||||||
node_info=hook.node_info,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
if hook_type == RunHookType.Start and ordered_hooks:
|
|
||||||
fire_event(Formatting(""))
|
|
||||||
|
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user