forked from repo-mirrors/dbt-core
[Tidy first] move microbatch compilation to .compile method (#11063)
@@ -283,7 +283,6 @@ class ModelRunner(CompileRunner):
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -328,9 +327,7 @@ class ModelRunner(CompileRunner):
 
         hook_ctx = self.adapter.pre_model_hook(context_config)
 
-        return self._execute_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
 
 
 class MicrobatchModelRunner(ModelRunner):
@@ -342,10 +339,30 @@ class MicrobatchModelRunner(ModelRunner):
         self.relation_exists: bool = False
 
     def compile(self, manifest: Manifest):
-        # The default compile function is _always_ called. However, we do our
-        # compilation _later_ in `_execute_microbatch_materialization`. This
-        # meant the node was being compiled _twice_ for each batch. To get around
-        # this, we've overriden the default compile method to do nothing
+        if self.batch_idx is not None:
+            batch = self.batches[self.batch_idx]
+
+            # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
+            # TODO: REMOVE before 1.10 GA
+            self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
+            self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
+            # Create batch context on model node prior to re-compiling
+            self.node.batch = BatchContext(
+                id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
+                event_time_start=batch[0],
+                event_time_end=batch[1],
+            )
+            # Recompile node to re-resolve refs with event time filters rendered, update context
+            self.compiler.compile_node(
+                self.node,
+                manifest,
+                {},
+                split_suffix=MicrobatchBuilder.format_batch_start(
+                    batch[0], self.node.config.batch_size
+                ),
+            )
+
+        # Skips compilation for non-batch runs
        return self.node
 
     def set_batch_idx(self, batch_idx: int) -> None:
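
For context, the hunk above moves the per-batch setup into the compile() hook that the framework already calls once per node, so the batch no longer gets compiled a second time inside the execute path. The following is a minimal, self-contained sketch of that pattern only; it is not dbt-core code, and the names (FrameworkRunner, BatchRunner, run_node) are illustrative assumptions.

# Illustrative toy, NOT dbt-core code: FrameworkRunner, BatchRunner, and run_node
# are assumptions used only to show the "do the work in compile()" refactor.
from typing import List, Optional, Tuple


class FrameworkRunner:
    """Base runner: the framework always calls compile() before execute()."""

    def __init__(self) -> None:
        self.compile_count = 0

    def compile(self) -> None:
        self.compile_count += 1

    def execute(self) -> str:
        return "ran"


class BatchRunner(FrameworkRunner):
    """Per-batch runner: compile() performs the batch-specific compilation,
    so execute() no longer has to re-compile (avoiding a double compile)."""

    def __init__(self, batches: List[Tuple[int, int]]) -> None:
        super().__init__()
        self.batches = batches
        self.batch_idx: Optional[int] = None

    def set_batch_idx(self, batch_idx: int) -> None:
        self.batch_idx = batch_idx

    def compile(self) -> None:
        # Only compile when a concrete batch is selected; a run with no batch
        # selected skips compilation entirely.
        if self.batch_idx is not None:
            start, end = self.batches[self.batch_idx]
            self.window = (start, end)  # stand-in for BatchContext / event-time filters
            super().compile()

    def execute(self) -> str:
        # No re-compilation here: compile() already rendered the batch window.
        return f"ran batch {self.batch_idx} over {self.window}"


def run_node(runner: FrameworkRunner) -> str:
    # Stand-in for the framework driver: compile() is _always_ called first.
    runner.compile()
    return runner.execute()


if __name__ == "__main__":
    runner = BatchRunner(batches=[(0, 10), (10, 20)])
    for idx in range(len(runner.batches)):
        runner.set_batch_idx(idx)
        print(run_node(runner))
    # Each batch is compiled exactly once.
    assert runner.compile_count == len(runner.batches)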
@@ -502,7 +519,6 @@ class MicrobatchModelRunner(ModelRunner):
     def _execute_microbatch_materialization(
         self,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -537,25 +553,6 @@ class MicrobatchModelRunner(ModelRunner):
         # call materialization_macro to get a batch-level run result
         start_time = time.perf_counter()
         try:
-            # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
-            # TODO: REMOVE before 1.10 GA
-            model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
-            model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
-            # Create batch context on model node prior to re-compiling
-            model.batch = BatchContext(
-                id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
-                event_time_start=batch[0],
-                event_time_end=batch[1],
-            )
-            # Recompile node to re-resolve refs with event time filters rendered, update context
-            self.compiler.compile_node(
-                model,
-                manifest,
-                {},
-                split_suffix=MicrobatchBuilder.format_batch_start(
-                    batch[0], model.config.batch_size
-                ),
-            )
             # Update jinja context with batch context members
             jinja_context = microbatch_builder.build_jinja_context_for_batch(
                 incremental_batch=self.relation_exists
@@ -643,36 +640,22 @@ class MicrobatchModelRunner(ModelRunner):
         else:
             return False
 
-    def _execute_microbatch_model(
-        self,
-        hook_ctx: Any,
-        context_config: Any,
-        model: ModelNode,
-        manifest: Manifest,
-        context: Dict[str, Any],
-        materialization_macro: MacroProtocol,
-    ) -> RunResult:
-        try:
-            batch_result = self._execute_microbatch_materialization(
-                model, manifest, context, materialization_macro
-            )
-        finally:
-            self.adapter.post_model_hook(context_config, hook_ctx)
-
-        return batch_result
-
     def _execute_model(
         self,
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
-        return self._execute_microbatch_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        try:
+            batch_result = self._execute_microbatch_materialization(
+                model, context, materialization_macro
+            )
+        finally:
+            self.adapter.post_model_hook(context_config, hook_ctx)
+
+        return batch_result
 
 
 class RunTask(CompileTask):
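
The last hunk inlines the old _execute_microbatch_model wrapper into _execute_model while keeping the adapter's pre/post model hooks paired via try/finally, so the post hook still fires if the batch materialization raises. Below is a minimal sketch of that pairing only; Adapter, execute_with_hooks, and materialize are stand-ins assumed for illustration, not dbt-core code.

# Illustrative sketch only, NOT dbt-core code: Adapter, execute_with_hooks, and
# materialize are stand-ins demonstrating the try/finally hook pairing.
from typing import Any, Callable


class Adapter:
    def pre_model_hook(self, context_config: Any) -> dict:
        return {"started": True}

    def post_model_hook(self, context_config: Any, hook_ctx: dict) -> None:
        # Cleanup that must happen whether or not execution succeeded.
        print(f"post_model_hook called with {hook_ctx}")


def execute_with_hooks(adapter: Adapter, context_config: Any, materialize: Callable[[], Any]) -> Any:
    hook_ctx = adapter.pre_model_hook(context_config)
    try:
        # Stand-in for the batch materialization; may raise.
        result = materialize()
    finally:
        # Runs on both success and failure, keeping the hooks paired.
        adapter.post_model_hook(context_config, hook_ctx)
    return result


if __name__ == "__main__":
    adapter = Adapter()
    print(execute_with_hooks(adapter, {}, lambda: "batch result"))
    try:
        execute_with_hooks(adapter, {}, lambda: 1 / 0)  # post hook still fires
    except ZeroDivisionError:
        pass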