forked from repo-mirrors/dbt-core
Compare commits
4 Commits
artifact-m...main
| Author | SHA1 | Date |
|---|---|---|
|  | 1b7d9b5704 |  |
|  | c3d87b89fb |  |
|  | 0f084e16ca |  |
|  | 3464be7f70 |  |
.changes/unreleased/Features-20241121-125630.yaml (Normal file, +6)
@@ -0,0 +1,6 @@
+kind: Features
+body: Add `batch` context object to model jinja context
+time: 2024-11-21T12:56:30.715473-06:00
+custom:
+  Author: QMalcolm
+  Issue: "11025"
.changes/unreleased/Fixes-20240822-122132.yaml (Normal file, +6)
@@ -0,0 +1,6 @@
+kind: Fixes
+body: dbt retry does not respect --threads
+time: 2024-08-22T12:21:32.358066+05:30
+custom:
+  Author: donjin-master
+  Issue: "10584"
@@ -244,9 +244,10 @@ class BaseResolver(metaclass=abc.ABCMeta):
             and self.model.config.materialized == "incremental"
             and self.model.config.incremental_strategy == "microbatch"
             and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
+            and self.model.batch is not None
         ):
-            start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
-            end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
+            start = self.model.batch.event_time_start
+            end = self.model.batch.event_time_end

             if start is not None or end is not None:
                 event_time_filter = EventTimeFilter(
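The resolver change above swaps the two hidden `__dbt_internal_*` config keys for the typed `model.batch` object. A minimal standalone sketch of the idea (hypothetical helper, not dbt's API):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple


@dataclass
class BatchContext:
    id: str
    event_time_start: datetime
    event_time_end: datetime


def event_time_window(batch: Optional[BatchContext]) -> Optional[Tuple[datetime, datetime]]:
    # Mirrors the new guard: no batch attached means no event-time filter.
    if batch is None:
        return None
    return batch.event_time_start, batch.event_time_end


batch = BatchContext("20200101", datetime(2020, 1, 1), datetime(2020, 1, 2))
assert event_time_window(batch) == (datetime(2020, 1, 1), datetime(2020, 1, 2))
assert event_time_window(None) is None
```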
@@ -93,6 +93,7 @@ from dbt_common.contracts.constraints import (
     ConstraintType,
     ModelLevelConstraint,
 )
+from dbt_common.dataclass_schema import dbtClassMixin
 from dbt_common.events.contextvars import set_log_contextvars
 from dbt_common.events.functions import warn_or_error
@@ -442,15 +443,38 @@ class HookNode(HookNodeResource, CompiledNode):
         return HookNodeResource


+@dataclass
+class BatchContext(dbtClassMixin):
+    id: str
+    event_time_start: datetime
+    event_time_end: datetime
+
+    def __post_serialize__(self, data, context):
+        # This is insane, but necessary, I apologize. Mashumaro handles the
+        # dictification of this class via a compile time generated `to_dict`
+        # method based off of the _typing_ of the class. By default `datetime`
+        # types are converted to strings. We don't want that, we want them to
+        # stay datetimes.
+        # Note: This is safe because the `BatchContext` isn't part of the artifact
+        # and thus doesn't get written out.
+        new_data = super().__post_serialize__(data, context)
+        new_data["event_time_start"] = self.event_time_start
+        new_data["event_time_end"] = self.event_time_end
+        return new_data
+
+
 @dataclass
 class ModelNode(ModelResource, CompiledNode):
-    batch_info: Optional[BatchResults] = None
+    previous_batch_results: Optional[BatchResults] = None
+    batch: Optional[BatchContext] = None
     _has_this: Optional[bool] = None

     def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
         dct = super().__post_serialize__(dct, context)
         if "_has_this" in dct:
             del dct["_has_this"]
+        if "previous_batch_results" in dct:
+            del dct["previous_batch_results"]
         return dct

     @classmethod
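The `__post_serialize__` override exists because mashumaro's generated `to_dict` stringifies `datetime` fields by default. A minimal standalone sketch of that behavior and the fix, assuming mashumaro's documented serialization-hook API (`Plain` and `KeepsDatetime` are illustrative names):

```python
from dataclasses import dataclass
from datetime import datetime

from mashumaro import DataClassDictMixin


@dataclass
class Plain(DataClassDictMixin):
    start: datetime


@dataclass
class KeepsDatetime(DataClassDictMixin):
    start: datetime

    def __post_serialize__(self, d):
        # Restore the live datetime after mashumaro has stringified it.
        d["start"] = self.start
        return d


ts = datetime(2020, 1, 1)
assert Plain(start=ts).to_dict() == {"start": "2020-01-01T00:00:00"}  # isoformat string
assert KeepsDatetime(start=ts).to_dict() == {"start": ts}  # datetime preserved
```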
@@ -100,25 +100,25 @@ class MicrobatchBuilder:

         return batches

-    def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
+    def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
         """
         Create context with entries that reflect microbatch model + incremental execution state

         Assumes self.model has been (re)-compiled with necessary batch filters applied.
         """
-        batch_context: Dict[str, Any] = {}
+        jinja_context: Dict[str, Any] = {}

         # Microbatch model properties
-        batch_context["model"] = self.model.to_dict()
-        batch_context["sql"] = self.model.compiled_code
-        batch_context["compiled_code"] = self.model.compiled_code
+        jinja_context["model"] = self.model.to_dict()
+        jinja_context["sql"] = self.model.compiled_code
+        jinja_context["compiled_code"] = self.model.compiled_code

         # Add incremental context variables for batches running incrementally
         if incremental_batch:
-            batch_context["is_incremental"] = lambda: True
-            batch_context["should_full_refresh"] = lambda: False
+            jinja_context["is_incremental"] = lambda: True
+            jinja_context["should_full_refresh"] = lambda: False

-        return batch_context
+        return jinja_context

     @staticmethod
     def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
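For orientation, a condensed sketch of how the renamed helper's output is consumed: the runner merges the per-batch entries into the base rendering context before invoking the materialization macro (stand-in types, not dbt internals):

```python
from typing import Any, Dict


def build_jinja_context_for_batch(
    model: Dict[str, Any], compiled_code: str, incremental_batch: bool
) -> Dict[str, Any]:
    # Per-batch entries: the recompiled model dict and its compiled SQL.
    jinja_context: Dict[str, Any] = {
        "model": model,
        "sql": compiled_code,
        "compiled_code": compiled_code,
    }
    if incremental_batch:
        # Batches after the first run incrementally against the existing relation.
        jinja_context["is_incremental"] = lambda: True
        jinja_context["should_full_refresh"] = lambda: False
    return jinja_context


context: Dict[str, Any] = {"this": "<relation>"}  # base runtime context (stand-in)
context.update(build_jinja_context_for_batch({"name": "m"}, "select 1", True))
assert context["is_incremental"]() and not context["should_full_refresh"]()
```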
@@ -193,12 +193,11 @@ class MicrobatchBuilder:
         return truncated

     @staticmethod
-    def format_batch_start(
-        batch_start: Optional[datetime], batch_size: BatchSize
-    ) -> Optional[str]:
-        if batch_start is None:
-            return batch_start
+    def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
+        return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "")
+
+    @staticmethod
+    def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
         return str(
             batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
         )
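A worked example of the `batch_id` derivation, mirrored outside dbt (the local `BatchSize` enum and functions are stand-ins that condense the logic above):

```python
from datetime import datetime
from enum import Enum


class BatchSize(Enum):
    hour = "hour"
    day = "day"
    month = "month"
    year = "year"


def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
    # Hourly batches keep the full timestamp; coarser sizes keep only the date.
    return str(batch_start.date() if batch_size != BatchSize.hour else batch_start)


def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
    # Batch ids are the formatted start with the dashes stripped.
    return format_batch_start(start_time, batch_size).replace("-", "")


assert format_batch_start(datetime(2020, 1, 3), BatchSize.day) == "2020-01-03"
assert batch_id(datetime(2020, 1, 3), BatchSize.day) == "20200103"
```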
@@ -42,7 +42,7 @@ IGNORE_PARENT_FLAGS = {
     "warn_error",
 }

-ALLOW_CLI_OVERRIDE_FLAGS = {"vars"}
+ALLOW_CLI_OVERRIDE_FLAGS = {"vars", "threads"}

 TASK_DICT = {
     "build": BuildTask,
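Adding `threads` to `ALLOW_CLI_OVERRIDE_FLAGS` is what lets `dbt retry --threads N` win over the saved invocation args. A simplified sketch of the merge semantics (hypothetical function, not dbt's implementation):

```python
ALLOW_CLI_OVERRIDE_FLAGS = {"vars", "threads"}


def merge_retry_args(previous_args: dict, cli_args: dict) -> dict:
    # dbt retry replays the previous invocation's args, except for flags
    # explicitly allowed to be overridden from the new command line.
    merged = dict(previous_args)
    for flag in ALLOW_CLI_OVERRIDE_FLAGS:
        if flag in cli_args:
            merged[flag] = cli_args[flag]
    return merged


# `dbt retry --threads 2` after a 3-thread run now executes with 2 threads.
assert merge_retry_args({"threads": 3, "vars": {}}, {"threads": 2})["threads"] == 2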
@@ -136,7 +136,7 @@ class RetryTask(ConfiguredTask):
         # batch info if there were _no_ successful batches previously. This is
         # because passing the batch info _forces_ the microbatch process into
         # _incremental_ mode, and it may be that we need to be in full refresh
-        # mode which is only handled if batch_info _isn't_ passed for a node
+        # mode which is only handled if previous_batch_results _isn't_ passed for a node
         batch_map = {
             result.unique_id: result.batch_results
             for result in self.previous_results.results
@@ -27,7 +27,7 @@ from dbt.clients.jinja import MacroGenerator
 from dbt.config import RuntimeConfig
 from dbt.context.providers import generate_runtime_model_context
 from dbt.contracts.graph.manifest import Manifest
-from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
+from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
 from dbt.events.types import (
     GenericExceptionOnRun,
     LogHookEndLine,
@@ -283,7 +283,6 @@ class ModelRunner(CompileRunner):
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -328,9 +327,7 @@ class ModelRunner(CompileRunner):

         hook_ctx = self.adapter.pre_model_hook(context_config)

-        return self._execute_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)


 class MicrobatchModelRunner(ModelRunner):
@@ -341,6 +338,33 @@ class MicrobatchModelRunner(ModelRunner):
         self.batches: Dict[int, BatchType] = {}
         self.relation_exists: bool = False

+    def compile(self, manifest: Manifest):
+        if self.batch_idx is not None:
+            batch = self.batches[self.batch_idx]
+
+            # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
+            # TODO: REMOVE before 1.10 GA
+            self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
+            self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
+            # Create batch context on model node prior to re-compiling
+            self.node.batch = BatchContext(
+                id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
+                event_time_start=batch[0],
+                event_time_end=batch[1],
+            )
+            # Recompile node to re-resolve refs with event time filters rendered, update context
+            self.compiler.compile_node(
+                self.node,
+                manifest,
+                {},
+                split_suffix=MicrobatchBuilder.format_batch_start(
+                    batch[0], self.node.config.batch_size
+                ),
+            )
+
+        # Skips compilation for non-batch runs
+        return self.node
+
     def set_batch_idx(self, batch_idx: int) -> None:
         self.batch_idx = batch_idx
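The new `compile()` override only does work once a batch index has been set. A schematic sketch of the per-batch flow it relies on, with a simplified stand-in runner (not dbt's actual classes):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Tuple

BatchType = Tuple[datetime, datetime]


@dataclass
class BatchContext:
    id: str
    event_time_start: datetime
    event_time_end: datetime


class Runner:
    """Hypothetical stand-in for MicrobatchModelRunner's batch bookkeeping."""

    def __init__(self, batches: Dict[int, BatchType]) -> None:
        self.batches = batches
        self.batch_idx: Optional[int] = None
        self.batch: Optional[BatchContext] = None

    def set_batch_idx(self, batch_idx: int) -> None:
        self.batch_idx = batch_idx

    def compile(self) -> None:
        if self.batch_idx is None:
            return  # non-batch runs skip per-batch compilation
        start, end = self.batches[self.batch_idx]
        self.batch = BatchContext(
            id=start.strftime("%Y%m%d"),  # simplified id derivation
            event_time_start=start,
            event_time_end=end,
        )
        # ...recompile refs here so event-time filters are rendered...


runner = Runner({0: (datetime(2020, 1, 1), datetime(2020, 1, 2))})
runner.set_batch_idx(0)
runner.compile()
assert runner.batch is not None and runner.batch.id == "20200101"
```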
@@ -353,7 +377,7 @@ class MicrobatchModelRunner(ModelRunner):
     def describe_node(self) -> str:
         return f"{self.node.language} microbatch model {self.get_node_representation()}"

-    def describe_batch(self, batch_start: Optional[datetime]) -> str:
+    def describe_batch(self, batch_start: datetime) -> str:
         # Only visualize date if batch_start year/month/day
         formatted_batch_start = MicrobatchBuilder.format_batch_start(
             batch_start, self.node.config.batch_size
@@ -445,8 +469,8 @@ class MicrobatchModelRunner(ModelRunner):
             result.batch_results.failed = sorted(result.batch_results.failed)

         # If retrying, propagate previously successful batches into final result, even though they were not run in this invocation
-        if self.node.batch_info is not None:
-            result.batch_results.successful += self.node.batch_info.successful
+        if self.node.previous_batch_results is not None:
+            result.batch_results.successful += self.node.previous_batch_results.successful

     def _build_succesful_run_batch_result(
         self,
@@ -495,7 +519,6 @@ class MicrobatchModelRunner(ModelRunner):
     def _execute_microbatch_materialization(
         self,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -508,15 +531,15 @@ class MicrobatchModelRunner(ModelRunner):
         )

         if self.batch_idx is None:
-            # Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
+            # Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
             # IFF `dbt retry` is being run and the microbatch model had batches which
             # failed on the run of the model (which is being retried)
-            if model.batch_info is None:
+            if model.previous_batch_results is None:
                 end = microbatch_builder.build_end_time()
                 start = microbatch_builder.build_start_time(end)
                 batches = microbatch_builder.build_batches(start, end)
             else:
-                batches = model.batch_info.failed
+                batches = model.previous_batch_results.failed
                 # If there is batch info, then don't run as full_refresh and force is_incremental
                 # not doing this risks blowing away the work that has already been done
                 if self._has_relation(model=model):
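A compact sketch of the branch above: a fresh run builds the full window of batches, while a retry reruns only the batches recorded in `previous_batch_results.failed` (hypothetical `select_batches`, daily batches assumed):

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

BatchType = Tuple[datetime, datetime]


def select_batches(
    window: Tuple[datetime, datetime],
    previously_failed: Optional[List[BatchType]],
) -> List[BatchType]:
    if previously_failed is not None:
        return previously_failed  # retry: rerun only the failed batches
    start, end = window
    batches, cursor = [], start
    while cursor < end:  # fresh run: one batch per day across the window
        batches.append((cursor, min(cursor + timedelta(days=1), end)))
        cursor += timedelta(days=1)
    return batches


window = (datetime(2020, 1, 1), datetime(2020, 1, 3))
assert len(select_batches(window, None)) == 2
assert select_batches(window, [window]) == [window]
```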
@@ -530,24 +553,11 @@ class MicrobatchModelRunner(ModelRunner):
             # call materialization_macro to get a batch-level run result
             start_time = time.perf_counter()
             try:
-                # Set start/end in context prior to re-compiling
-                model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
-                model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
-
-                # Recompile node to re-resolve refs with event time filters rendered, update context
-                self.compiler.compile_node(
-                    model,
-                    manifest,
-                    {},
-                    split_suffix=MicrobatchBuilder.format_batch_start(
-                        batch[0], model.config.batch_size
-                    ),
-                )
                 # Update jinja context with batch context members
-                batch_context = microbatch_builder.build_batch_context(
+                jinja_context = microbatch_builder.build_jinja_context_for_batch(
                     incremental_batch=self.relation_exists
                 )
-                context.update(batch_context)
+                context.update(jinja_context)

                 # Materialize batch and cache any materialized relations
                 result = MacroGenerator(
@@ -630,36 +640,22 @@ class MicrobatchModelRunner(ModelRunner):
         else:
             return False

-    def _execute_microbatch_model(
-        self,
-        hook_ctx: Any,
-        context_config: Any,
-        model: ModelNode,
-        manifest: Manifest,
-        context: Dict[str, Any],
-        materialization_macro: MacroProtocol,
-    ) -> RunResult:
-        try:
-            batch_result = self._execute_microbatch_materialization(
-                model, manifest, context, materialization_macro
-            )
-        finally:
-            self.adapter.post_model_hook(context_config, hook_ctx)
-
-        return batch_result
-
     def _execute_model(
         self,
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
-        return self._execute_microbatch_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        try:
+            batch_result = self._execute_microbatch_materialization(
+                model, context, materialization_macro
+            )
+        finally:
+            self.adapter.post_model_hook(context_config, hook_ctx)
+
+        return batch_result


 class RunTask(CompileTask):
@@ -885,7 +881,7 @@ class RunTask(CompileTask):
             if uid in self.batch_map:
                 node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
                 if isinstance(node, ModelNode):
-                    node.batch_info = self.batch_map[uid]
+                    node.previous_batch_results = self.batch_map[uid]

     def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
         with adapter.connection_named("master"):
@@ -64,8 +64,8 @@ microbatch_yearly_model_downstream_sql = """
 select * from {{ ref('microbatch_model') }}
 """

-invalid_batch_context_macro_sql = """
-{% macro check_invalid_batch_context() %}
+invalid_batch_jinja_context_macro_sql = """
+{% macro check_invalid_batch_jinja_context() %}

 {% if model is not mapping %}
     {{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }}
@@ -83,9 +83,9 @@ invalid_batch_context_macro_sql = """
 """

 microbatch_model_with_context_checks_sql = """
-{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
+{{ config(pre_hook="{{ check_invalid_batch_jinja_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}

-{{ check_invalid_batch_context() }}
+{{ check_invalid_batch_jinja_context() }}
 select * from {{ ref('input_model') }}
 """
@@ -404,7 +404,7 @@ class TestMicrobatchJinjaContext(BaseMicrobatchTest):

     @pytest.fixture(scope="class")
     def macros(self):
-        return {"check_batch_context.sql": invalid_batch_context_macro_sql}
+        return {"check_batch_jinja_context.sql": invalid_batch_jinja_context_macro_sql}

     @pytest.fixture(scope="class")
     def models(self):
@@ -498,6 +498,13 @@ microbatch_model_context_vars = """
 {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
 {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
 {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
+{% if model.batch %}
+{{ log("batch.event_time_start: "~ model.batch.event_time_start, info=True)}}
+{{ log("batch.event_time_end: "~ model.batch.event_time_end, info=True)}}
+{{ log("batch.id: "~ model.batch.id, info=True)}}
+{{ log("start timezone: "~ model.batch.event_time_start.tzinfo, info=True)}}
+{{ log("end timezone: "~ model.batch.event_time_end.tzinfo, info=True)}}
+{% endif %}
 select * from {{ ref('input_model') }}
 """
@@ -516,12 +523,23 @@ class TestMicrobatchJinjaContextVarsAvailable(BaseMicrobatchTest):

         assert "start: 2020-01-01 00:00:00+00:00" in logs
         assert "end: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-01 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.id: 20200101" in logs
+        assert "start timezone: UTC" in logs
+        assert "end timezone: UTC" in logs

         assert "start: 2020-01-02 00:00:00+00:00" in logs
         assert "end: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.id: 20200102" in logs

         assert "start: 2020-01-03 00:00:00+00:00" in logs
         assert "end: 2020-01-03 13:57:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-03 13:57:00+00:00" in logs
+        assert "batch.id: 20200103" in logs


 microbatch_model_failing_incremental_partition_sql = """
@@ -675,16 +693,6 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])

-        # Compiled paths - compiled model without filter only
-        assert read_file(
-            project.project_root,
-            "target",
-            "compiled",
-            "test",
-            "models",
-            "microbatch_model.sql",
-        )
-
         # Compiled paths - batch compilations
         assert read_file(
             project.project_root,
@@ -1,5 +1,6 @@
 models__sample_model = """select 1 as id, baz as foo"""
 models__second_model = """select 1 as id, 2 as bar"""
+models__thread_model = """select idx as id"""

 models__union_model = """
 select foo + bar as sum3 from {{ ref('sample_model') }}
@@ -58,3 +59,13 @@ models:
       data_tests:
         - not_null
 """
+
+schema_test_thread_yml = """
+models:
+  - name: thread_model
+    columns:
+      - name: id
+        data_tests:
+          - not_null
+
+"""
tests/functional/retry/test_retry_threads.py (Normal file, +56)
@@ -0,0 +1,56 @@
+import pytest
+
+from dbt.contracts.results import RunStatus, TestStatus
+from dbt.tests.util import run_dbt, write_file
+from tests.functional.retry.fixtures import models__thread_model, schema_test_thread_yml
+
+
+class TestCustomThreadRetry:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "thread_model.sql": models__thread_model,
+            "schema.yml": schema_test_thread_yml,
+        }
+
+    def test_thread_target(self, project):
+        # Initial build passing --threads 3; the model fails
+        results = run_dbt(
+            ["build", "--select", "thread_model", "--threads", "3"], expect_pass=False
+        )
+        expected_statuses = {
+            "thread_model": RunStatus.Error,
+            "not_null_thread_model_id": TestStatus.Skipped,
+        }
+        assert {n.node.name: n.status for n in results.results} == expected_statuses
+
+        # Retry with an explicit --threads 2 override
+        results = run_dbt(["retry", "--threads", "2"], expect_pass=False)
+        expected_statuses = {
+            "thread_model": RunStatus.Error,
+            "not_null_thread_model_id": TestStatus.Skipped,
+        }
+        assert {n.node.name: n.status for n in results.results} == expected_statuses
+        assert results.args["threads"] == 2
+
+        # Retry again without passing threads; the previous value is reused
+        results = run_dbt(["retry"], expect_pass=False)
+        expected_statuses = {
+            "thread_model": RunStatus.Error,
+            "not_null_thread_model_id": TestStatus.Skipped,
+        }
+        assert {n.node.name: n.status for n in results.results} == expected_statuses
+        assert results.args["threads"] == 2
+
+        # Retry after fixing the model, running with --threads 1
+        fixed_sql = "select 1 as id"
+        write_file(fixed_sql, "models", "thread_model.sql")
+
+        results = run_dbt(["retry", "--threads", "1"])
+        expected_statuses = {
+            "thread_model": RunStatus.Success,
+            "not_null_thread_model_id": TestStatus.Pass,
+        }
+
+        assert {n.node.name: n.status for n in results.results} == expected_statuses
+        assert results.args["threads"] == 1
@@ -96,7 +96,7 @@ REQUIRED_PARSED_NODE_KEYS = frozenset(
         "deprecation_date",
         "defer_relation",
         "time_spine",
-        "batch_info",
+        "batch",
     }
 )
@@ -489,11 +489,11 @@ class TestMicrobatchBuilder:
         assert len(actual_batches) == len(expected_batches)
         assert actual_batches == expected_batches

-    def test_build_batch_context_incremental_batch(self, microbatch_model):
+    def test_build_jinja_context_for_incremental_batch(self, microbatch_model):
         microbatch_builder = MicrobatchBuilder(
             model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
         )
-        context = microbatch_builder.build_batch_context(incremental_batch=True)
+        context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=True)

         assert context["model"] == microbatch_model.to_dict()
         assert context["sql"] == microbatch_model.compiled_code
@@ -502,11 +502,11 @@ class TestMicrobatchBuilder:
         assert context["is_incremental"]() is True
         assert context["should_full_refresh"]() is False

-    def test_build_batch_context_incremental_batch_false(self, microbatch_model):
+    def test_build_jinja_context_for_incremental_batch_false(self, microbatch_model):
         microbatch_builder = MicrobatchBuilder(
             model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
         )
-        context = microbatch_builder.build_batch_context(incremental_batch=False)
+        context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=False)

         assert context["model"] == microbatch_model.to_dict()
         assert context["sql"] == microbatch_model.compiled_code
@@ -605,7 +605,6 @@ class TestMicrobatchBuilder:
     @pytest.mark.parametrize(
         "batch_size,batch_start,expected_formatted_batch_start",
         [
-            (None, None, None),
             (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
             (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
             (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),