move microbatch compilation to .compile method

This commit is contained in:
Michelle Ark
2024-11-27 15:26:24 -05:00
parent ebe46f1bd2
commit b3e270351b

View File

@@ -342,10 +342,30 @@ class MicrobatchModelRunner(ModelRunner):
self.relation_exists: bool = False
def compile(self, manifest: Manifest):
# The default compile function is _always_ called. However, we do our
# compilation _later_ in `_execute_microbatch_materialization`. This
# meant the node was being compiled _twice_ for each batch. To get around
# this, we've overriden the default compile method to do nothing
if self.batch_idx is not None:
batch = self.batches[self.batch_idx]
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
self.node.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
self.node,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], self.node.config.batch_size
),
)
# Skips compilation for non-batch runs
return self.node
def set_batch_idx(self, batch_idx: int) -> None:
@@ -537,25 +557,6 @@ class MicrobatchModelRunner(ModelRunner):
# call materialization_macro to get a batch-level run result
start_time = time.perf_counter()
try:
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
model.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
# Update jinja context with batch context members
jinja_context = microbatch_builder.build_jinja_context_for_batch(
incremental_batch=self.relation_exists