Mirror of https://github.com/dbt-labs/dbt-core
Synced 2025-12-19 18:01:26 +00:00

Compare commits: enable-pos...v1.9.0rc2 (10 commits)
Commits (SHA1):
333d600a95
61b6cb3864
6a36444dbb
65f05e0bd2
aceae51ffb
a84a787eeb
3d70e1b06f
55872497bd
d305137226
bda92e7312
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.9.0b4
+current_version = 1.9.0rc2
 parse = (?P<major>[\d]+) # major version number
     \.(?P<minor>[\d]+) # minor version number
     \.(?P<patch>[\d]+) # patch version number
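As an aside, the parse fragment shown above only captures the major, minor, and patch numbers; the prerelease suffix in `1.9.0rc2` is handled by parts of the config not shown in this hunk. A quick standalone check (illustrative sketch only, not part of this change) of how that fragment behaves against the new version string:

```python
# Illustrative sketch only: exercises the major/minor/patch fragment of the
# bumpversion parse pattern shown in the hunk above against the new version.
import re

pattern = re.compile(
    r"""(?P<major>[\d]+)    # major version number
        \.(?P<minor>[\d]+)  # minor version number
        \.(?P<patch>[\d]+)  # patch version number
    """,
    re.VERBOSE,
)

match = pattern.match("1.9.0rc2")
print(match.groupdict())  # {'major': '1', 'minor': '9', 'patch': '0'}; the rc suffix is not captured by this fragment
```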
.changes/1.9.0-rc1.md (new file, 35 lines)
@@ -0,0 +1,35 @@
+## dbt-core 1.9.0-rc1 - November 25, 2024
+
+### Features
+
+- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
+- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
+- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
+- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
+
+### Fixes
+
+- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
+- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
+- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
+- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
+- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
+- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
+- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
+- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
+- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
+- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
+- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
+- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
+
+### Under the Hood
+
+- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
+
+### Dependencies
+
+- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
+
+### Contributors
+- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
+- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
.changes/1.9.0-rc2.md (new file, 5 lines)
@@ -0,0 +1,5 @@
+## dbt-core 1.9.0-rc2 - December 02, 2024
+
+### Features
+
+- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))
.changes/1.9.0/Features-20241104-120053.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+kind: Features
+body: Add new hard_deletes="new_record" mode for snapshots.
+time: 2024-11-04T12:00:53.95191-05:00
+custom:
+  Author: peterallenwebb
+  Issue: "10235"
.changes/1.9.0/Features-20241121-125630.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+kind: Features
+body: Add `batch` context object to model jinja context
+time: 2024-11-21T12:56:30.715473-06:00
+custom:
+  Author: QMalcolm
+  Issue: "11025"
.changes/1.9.0/Fixes-20241121-181739.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Catch DbtRuntimeError for hooks
+time: 2024-11-21T18:17:39.753235Z
+custom:
+  Author: aranke
+  Issue: "11012"
CHANGELOG.md (45 changed lines)
@@ -5,6 +5,50 @@
 - "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
 - Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)
 
+## dbt-core 1.9.0-rc2 - December 02, 2024
+
+### Features
+
+- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))
+
+
+
+## dbt-core 1.9.0-rc1 - November 25, 2024
+
+### Features
+
+- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
+- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
+- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
+- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
+
+### Fixes
+
+- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
+- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
+- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
+- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
+- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
+- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
+- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
+- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
+- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
+- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
+- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
+- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
+
+### Under the Hood
+
+- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
+
+### Dependencies
+
+- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
+
+### Contributors
+- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
+- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
+
 ## dbt-core 1.9.0-b4 - November 06, 2024
 
 ### Features
@@ -25,7 +69,6 @@
 ### Contributors
 - [@DevonFulcher](https://github.com/DevonFulcher) ([#10959](https://github.com/dbt-labs/dbt-core/issues/10959), [#10960](https://github.com/dbt-labs/dbt-core/issues/10960))
 
-
 ## dbt-core 1.9.0-b3 - October 30, 2024
 
 ### Features
@@ -13,6 +13,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
     dbt_valid_from: Optional[str] = None
     dbt_scd_id: Optional[str] = None
     dbt_updated_at: Optional[str] = None
+    dbt_is_deleted: Optional[str] = None
 
 
 @dataclass

@@ -37,6 +38,7 @@ class SnapshotConfig(NodeConfig):
             "dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
             "dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
             "dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
+            "dbt_is_deleted": self.snapshot_meta_column_names.dbt_is_deleted or "dbt_is_deleted",
         }
 
     def final_validate(self):
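For context, a small standalone sketch (illustration only, not code from this diff; the override value `is_gone` is invented for the example) of the fallback pattern the new `dbt_is_deleted` entry follows: a user-configured column name wins, otherwise the default name is used.

```python
# Illustration only: the new "dbt_is_deleted" meta column resolves the same way
# as the existing snapshot meta columns -- configured override or default name.
from dataclasses import dataclass
from typing import Optional


@dataclass
class SnapshotMetaColumnNames:
    dbt_is_deleted: Optional[str] = None  # other meta columns omitted for brevity


print(SnapshotMetaColumnNames().dbt_is_deleted or "dbt_is_deleted")  # -> dbt_is_deleted
print(SnapshotMetaColumnNames(dbt_is_deleted="is_gone").dbt_is_deleted or "dbt_is_deleted")  # -> is_gone
```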
@@ -244,9 +244,10 @@ class BaseResolver(metaclass=abc.ABCMeta):
             and self.model.config.materialized == "incremental"
             and self.model.config.incremental_strategy == "microbatch"
             and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
+            and self.model.batch is not None
         ):
-            start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
-            end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
+            start = self.model.batch.event_time_start
+            end = self.model.batch.event_time_end
 
         if start is not None or end is not None:
             event_time_filter = EventTimeFilter(
@@ -93,6 +93,7 @@ from dbt_common.contracts.constraints import (
     ConstraintType,
     ModelLevelConstraint,
 )
+from dbt_common.dataclass_schema import dbtClassMixin
 from dbt_common.events.contextvars import set_log_contextvars
 from dbt_common.events.functions import warn_or_error
@@ -442,15 +443,38 @@ class HookNode(HookNodeResource, CompiledNode):
         return HookNodeResource
 
 
+@dataclass
+class BatchContext(dbtClassMixin):
+    id: str
+    event_time_start: datetime
+    event_time_end: datetime
+
+    def __post_serialize__(self, data, context):
+        # This is insane, but necessary, I apologize. Mashumaro handles the
+        # dictification of this class via a compile time generated `to_dict`
+        # method based off of the _typing_ of th class. By default `datetime`
+        # types are converted to strings. We don't want that, we want them to
+        # stay datetimes.
+        # Note: This is safe because the `BatchContext` isn't part of the artifact
+        # and thus doesn't get written out.
+        new_data = super().__post_serialize__(data, context)
+        new_data["event_time_start"] = self.event_time_start
+        new_data["event_time_end"] = self.event_time_end
+        return new_data
+
+
 @dataclass
 class ModelNode(ModelResource, CompiledNode):
-    batch_info: Optional[BatchResults] = None
+    previous_batch_results: Optional[BatchResults] = None
+    batch: Optional[BatchContext] = None
     _has_this: Optional[bool] = None
 
     def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
         dct = super().__post_serialize__(dct, context)
         if "_has_this" in dct:
             del dct["_has_this"]
+        if "previous_batch_results" in dct:
+            del dct["previous_batch_results"]
         return dct
 
     @classmethod
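To make the intent of that `__post_serialize__` override concrete, here is a standalone sketch (illustration only, not dbt's actual classes or serializer) of the behavior it aims for: after serialization, the event-time fields are put back as `datetime` objects rather than strings.

```python
# Illustration only (standalone sketch, not dbt's actual BatchContext): mirrors
# the intent of the __post_serialize__ override above.
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class BatchContextSketch:
    id: str
    event_time_start: datetime
    event_time_end: datetime

    def to_dict(self) -> dict:
        # Stand-in for the generated serializer: datetimes become ISO strings ...
        data = {
            "id": self.id,
            "event_time_start": self.event_time_start.isoformat(),
            "event_time_end": self.event_time_end.isoformat(),
        }
        # ... and the override restores the datetime objects, as in the diff.
        data["event_time_start"] = self.event_time_start
        data["event_time_end"] = self.event_time_end
        return data


batch = BatchContextSketch(
    id="20200101",
    event_time_start=datetime(2020, 1, 1, tzinfo=timezone.utc),
    event_time_end=datetime(2020, 1, 2, tzinfo=timezone.utc),
)
assert isinstance(batch.to_dict()["event_time_start"], datetime)  # not a string
```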
@@ -100,25 +100,25 @@ class MicrobatchBuilder:
 
         return batches
 
-    def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
+    def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
         """
         Create context with entries that reflect microbatch model + incremental execution state
 
         Assumes self.model has been (re)-compiled with necessary batch filters applied.
         """
-        batch_context: Dict[str, Any] = {}
+        jinja_context: Dict[str, Any] = {}
 
         # Microbatch model properties
-        batch_context["model"] = self.model.to_dict()
-        batch_context["sql"] = self.model.compiled_code
-        batch_context["compiled_code"] = self.model.compiled_code
+        jinja_context["model"] = self.model.to_dict()
+        jinja_context["sql"] = self.model.compiled_code
+        jinja_context["compiled_code"] = self.model.compiled_code
 
         # Add incremental context variables for batches running incrementally
         if incremental_batch:
-            batch_context["is_incremental"] = lambda: True
-            batch_context["should_full_refresh"] = lambda: False
+            jinja_context["is_incremental"] = lambda: True
+            jinja_context["should_full_refresh"] = lambda: False
 
-        return batch_context
+        return jinja_context
 
     @staticmethod
     def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:

@@ -193,12 +193,11 @@ class MicrobatchBuilder:
         return truncated
 
     @staticmethod
-    def format_batch_start(
-        batch_start: Optional[datetime], batch_size: BatchSize
-    ) -> Optional[str]:
-        if batch_start is None:
-            return batch_start
+    def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
+        return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "")
+
+    @staticmethod
+    def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
         return str(
             batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
         )
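A standalone sketch (not the dbt implementation; `hourly` stands in for the `BatchSize.hour` comparison) of what the new `batch_id` helper produces for a non-hourly batch, which lines up with the `batch.id: 20200101` assertions in the functional tests further down:

```python
# Standalone sketch of the batch_id logic shown above (the real code lives on
# MicrobatchBuilder): format the batch start, then strip the dashes.
from datetime import datetime


def format_batch_start(batch_start: datetime, hourly: bool) -> str:
    # Non-hourly batches render just the date portion.
    return str(batch_start.date() if not hourly else batch_start)


def batch_id(start_time: datetime, hourly: bool) -> str:
    return format_batch_start(start_time, hourly).replace("-", "")


print(batch_id(datetime(2020, 1, 1), hourly=False))  # 20200101
```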
@@ -136,7 +136,7 @@ class RetryTask(ConfiguredTask):
             # batch info if there were _no_ successful batches previously. This is
             # because passing the batch info _forces_ the microbatch process into
             # _incremental_ model, and it may be that we need to be in full refresh
-            # mode which is only handled if batch_info _isn't_ passed for a node
+            # mode which is only handled if previous_batch_results _isn't_ passed for a node
             batch_map = {
                 result.unique_id: result.batch_results
                 for result in self.previous_results.results
@@ -27,7 +27,7 @@ from dbt.clients.jinja import MacroGenerator
 from dbt.config import RuntimeConfig
 from dbt.context.providers import generate_runtime_model_context
 from dbt.contracts.graph.manifest import Manifest
-from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
+from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
 from dbt.events.types import (
     GenericExceptionOnRun,
     LogHookEndLine,
@@ -283,7 +283,6 @@ class ModelRunner(CompileRunner):
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -328,9 +327,7 @@ class ModelRunner(CompileRunner):
 
         hook_ctx = self.adapter.pre_model_hook(context_config)
 
-        return self._execute_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
 
 
 class MicrobatchModelRunner(ModelRunner):
@@ -341,6 +338,33 @@ class MicrobatchModelRunner(ModelRunner):
         self.batches: Dict[int, BatchType] = {}
         self.relation_exists: bool = False
 
+    def compile(self, manifest: Manifest):
+        if self.batch_idx is not None:
+            batch = self.batches[self.batch_idx]
+
+            # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
+            # TODO: REMOVE before 1.10 GA
+            self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
+            self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
+            # Create batch context on model node prior to re-compiling
+            self.node.batch = BatchContext(
+                id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
+                event_time_start=batch[0],
+                event_time_end=batch[1],
+            )
+            # Recompile node to re-resolve refs with event time filters rendered, update context
+            self.compiler.compile_node(
+                self.node,
+                manifest,
+                {},
+                split_suffix=MicrobatchBuilder.format_batch_start(
+                    batch[0], self.node.config.batch_size
+                ),
+            )
+
+        # Skips compilation for non-batch runs
+        return self.node
+
     def set_batch_idx(self, batch_idx: int) -> None:
         self.batch_idx = batch_idx
@@ -353,7 +377,7 @@ class MicrobatchModelRunner(ModelRunner):
     def describe_node(self) -> str:
         return f"{self.node.language} microbatch model {self.get_node_representation()}"
 
-    def describe_batch(self, batch_start: Optional[datetime]) -> str:
+    def describe_batch(self, batch_start: datetime) -> str:
         # Only visualize date if batch_start year/month/day
         formatted_batch_start = MicrobatchBuilder.format_batch_start(
             batch_start, self.node.config.batch_size
@@ -445,8 +469,8 @@ class MicrobatchModelRunner(ModelRunner):
             result.batch_results.failed = sorted(result.batch_results.failed)
 
             # # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation
-            if self.node.batch_info is not None:
-                result.batch_results.successful += self.node.batch_info.successful
+            if self.node.previous_batch_results is not None:
+                result.batch_results.successful += self.node.previous_batch_results.successful
 
     def _build_succesful_run_batch_result(
         self,
@@ -495,7 +519,6 @@ class MicrobatchModelRunner(ModelRunner):
     def _execute_microbatch_materialization(
         self,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
@@ -508,15 +531,15 @@ class MicrobatchModelRunner(ModelRunner):
         )
 
         if self.batch_idx is None:
-            # Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
+            # Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
             # IFF `dbt retry` is being run and the microbatch model had batches which
             # failed on the run of the model (which is being retried)
-            if model.batch_info is None:
+            if model.previous_batch_results is None:
                 end = microbatch_builder.build_end_time()
                 start = microbatch_builder.build_start_time(end)
                 batches = microbatch_builder.build_batches(start, end)
             else:
-                batches = model.batch_info.failed
+                batches = model.previous_batch_results.failed
                 # If there is batch info, then don't run as full_refresh and do force is_incremental
                 # not doing this risks blowing away the work that has already been done
                 if self._has_relation(model=model):
@@ -530,24 +553,11 @@ class MicrobatchModelRunner(ModelRunner):
         # call materialization_macro to get a batch-level run result
         start_time = time.perf_counter()
         try:
-            # Set start/end in context prior to re-compiling
-            model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
-            model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
-
-            # Recompile node to re-resolve refs with event time filters rendered, update context
-            self.compiler.compile_node(
-                model,
-                manifest,
-                {},
-                split_suffix=MicrobatchBuilder.format_batch_start(
-                    batch[0], model.config.batch_size
-                ),
-            )
             # Update jinja context with batch context members
-            batch_context = microbatch_builder.build_batch_context(
+            jinja_context = microbatch_builder.build_jinja_context_for_batch(
                 incremental_batch=self.relation_exists
             )
-            context.update(batch_context)
+            context.update(jinja_context)
 
             # Materialize batch and cache any materialized relations
             result = MacroGenerator(
@@ -630,36 +640,22 @@ class MicrobatchModelRunner(ModelRunner):
         else:
             return False
 
-    def _execute_microbatch_model(
-        self,
-        hook_ctx: Any,
-        context_config: Any,
-        model: ModelNode,
-        manifest: Manifest,
-        context: Dict[str, Any],
-        materialization_macro: MacroProtocol,
-    ) -> RunResult:
-        try:
-            batch_result = self._execute_microbatch_materialization(
-                model, manifest, context, materialization_macro
-            )
-        finally:
-            self.adapter.post_model_hook(context_config, hook_ctx)
-
-        return batch_result
-
     def _execute_model(
         self,
         hook_ctx: Any,
         context_config: Any,
         model: ModelNode,
-        manifest: Manifest,
         context: Dict[str, Any],
         materialization_macro: MacroProtocol,
     ) -> RunResult:
-        return self._execute_microbatch_model(
-            hook_ctx, context_config, model, manifest, context, materialization_macro
-        )
+        try:
+            batch_result = self._execute_microbatch_materialization(
+                model, context, materialization_macro
+            )
+        finally:
+            self.adapter.post_model_hook(context_config, hook_ctx)
+
+        return batch_result
 
 
 class RunTask(CompileTask):
@@ -885,7 +881,7 @@ class RunTask(CompileTask):
             if uid in self.batch_map:
                 node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
                 if isinstance(node, ModelNode):
-                    node.batch_info = self.batch_map[uid]
+                    node.previous_batch_results = self.batch_map[uid]
 
     def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
         with adapter.connection_named("master"):
@@ -922,7 +918,7 @@ class RunTask(CompileTask):
         try:
             with adapter.connection_named("master"):
                 self.safe_run_hooks(adapter, RunHookType.End, extras)
-        except (KeyboardInterrupt, SystemExit):
+        except (KeyboardInterrupt, SystemExit, DbtRuntimeError):
             run_result = self.get_result(
                 results=self.node_results,
                 elapsed_time=time.time() - self.started_at,
@@ -231,5 +231,5 @@ def _get_adapter_plugin_names() -> Iterator[str]:
         yield plugin_name
 
 
-__version__ = "1.9.0b4"
+__version__ = "1.9.0rc2"
 installed = get_installed_version()
@@ -25,7 +25,7 @@ with open(os.path.join(this_directory, "README.md")) as f:
 
 
 package_name = "dbt-core"
-package_version = "1.9.0b4"
+package_version = "1.9.0rc2"
 description = """With dbt, data analysts and engineers can build analytics \
 the way engineers build applications."""
 
@@ -51,7 +51,7 @@ setup(
         # Pin to the patch or minor version, and bump in each new minor version of dbt-core.
         "agate>=1.7.0,<1.10",
         "Jinja2>=3.1.3,<4",
-        "mashumaro[msgpack]>=3.9,<4.0",
+        "mashumaro[msgpack]>=3.9,<3.15",
         # ----
         # dbt-core uses these packages in standard ways. Pin to the major version, and check compatibility
         # with major versions in each new minor version of dbt-core.
@@ -6754,6 +6754,17 @@
                     }
                 ],
                 "default": null
             },
+            "dbt_is_deleted": {
+                "anyOf": [
+                    {
+                        "type": "string"
+                    },
+                    {
+                        "type": "null"
+                    }
+                ],
+                "default": null
+            }
         },
         "additionalProperties": false
@@ -16672,6 +16683,17 @@
                     }
                 ],
                 "default": null
             },
+            "dbt_is_deleted": {
+                "anyOf": [
+                    {
+                        "type": "string"
+                    },
+                    {
+                        "type": "null"
+                    }
+                ],
+                "default": null
+            }
         },
         "additionalProperties": false
@@ -2,7 +2,8 @@ import pytest
 
 from dbt.artifacts.schemas.results import RunStatus
 from dbt.contracts.graph.nodes import HookNode
-from dbt.tests.util import get_artifact, run_dbt_and_capture
+from dbt.tests.util import get_artifact, run_dbt, run_dbt_and_capture
+from dbt_common.exceptions import CompilationError
 
 
 class Test__StartHookFail__FlagIsNone__ModelFail:

@@ -242,3 +243,36 @@ class Test__HookContext__HookFail:
         assert "Thread ID: main" in log_output
         assert results[0].thread_id == "main"
         assert "Num Results in context: 2" in log_output  # failed hook and model
+
+
+class Test__HookCompilationError:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"my_model.sql": "select 1 as id"}
+
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "rce.sql": """
+{% macro rce(relation) %}
+{% if execute %}
+{{ exceptions.raise_compiler_error("Always raise a compiler error in execute") }}
+{% endif %}
+{% endmacro %}
+"""
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "on-run-end": ["{{ rce() }}"],
+        }
+
+    def test_results(self, project):
+        with pytest.raises(CompilationError, match="Always raise a compiler error in execute"):
+            run_dbt(["run"], expect_pass=False)
+
+        run_results = get_artifact(project.project_root, "target", "run_results.json")
+        assert [(result["unique_id"], result["status"]) for result in run_results["results"]] == [
+            ("model.test.my_model", RunStatus.Success)
+        ]
@@ -112,6 +112,7 @@ def get_rendered_snapshot_config(**updates):
         "dbt_valid_from": None,
         "dbt_updated_at": None,
         "dbt_scd_id": None,
+        "dbt_is_deleted": None,
     },
     "dbt_valid_to_current": None,
     "tags": [],
@@ -69,6 +69,7 @@ class TestList:
                     "dbt_updated_at": None,
                     "dbt_valid_from": None,
                     "dbt_valid_to": None,
+                    "dbt_is_deleted": None,
                 },
                 "unique_key": "id",
                 "strategy": "timestamp",
@@ -64,8 +64,8 @@ microbatch_yearly_model_downstream_sql = """
 select * from {{ ref('microbatch_model') }}
 """
 
-invalid_batch_context_macro_sql = """
-{% macro check_invalid_batch_context() %}
+invalid_batch_jinja_context_macro_sql = """
+{% macro check_invalid_batch_jinja_context() %}
 
 {% if model is not mapping %}
   {{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }}

@@ -83,9 +83,9 @@ invalid_batch_context_macro_sql = """
 """
 
 microbatch_model_with_context_checks_sql = """
-{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
+{{ config(pre_hook="{{ check_invalid_batch_jinja_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
 
-{{ check_invalid_batch_context() }}
+{{ check_invalid_batch_jinja_context() }}
 select * from {{ ref('input_model') }}
 """
@@ -404,7 +404,7 @@ class TestMicrobatchJinjaContext(BaseMicrobatchTest):
 
     @pytest.fixture(scope="class")
     def macros(self):
-        return {"check_batch_context.sql": invalid_batch_context_macro_sql}
+        return {"check_batch_jinja_context.sql": invalid_batch_jinja_context_macro_sql}
 
     @pytest.fixture(scope="class")
     def models(self):
@@ -498,6 +498,13 @@ microbatch_model_context_vars = """
 {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
 {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
 {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
+{% if model.batch %}
+{{ log("batch.event_time_start: "~ model.batch.event_time_start, info=True)}}
+{{ log("batch.event_time_end: "~ model.batch.event_time_end, info=True)}}
+{{ log("batch.id: "~ model.batch.id, info=True)}}
+{{ log("start timezone: "~ model.batch.event_time_start.tzinfo, info=True)}}
+{{ log("end timezone: "~ model.batch.event_time_end.tzinfo, info=True)}}
+{% endif %}
 select * from {{ ref('input_model') }}
 """
 
@@ -516,12 +523,23 @@ class TestMicrobatchJinjaContextVarsAvailable(BaseMicrobatchTest):
 
         assert "start: 2020-01-01 00:00:00+00:00" in logs
         assert "end: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-01 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.id: 20200101" in logs
+        assert "start timezone: UTC" in logs
+        assert "end timezone: UTC" in logs
 
         assert "start: 2020-01-02 00:00:00+00:00" in logs
         assert "end: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-02 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.id: 20200102" in logs
 
         assert "start: 2020-01-03 00:00:00+00:00" in logs
         assert "end: 2020-01-03 13:57:00+00:00" in logs
+        assert "batch.event_time_start: 2020-01-03 00:00:00+00:00" in logs
+        assert "batch.event_time_end: 2020-01-03 13:57:00+00:00" in logs
+        assert "batch.id: 20200103" in logs
 
 
 microbatch_model_failing_incremental_partition_sql = """
@@ -675,16 +693,6 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
             run_dbt(["run"])
 
-        # Compiled paths - compiled model without filter only
-        assert read_file(
-            project.project_root,
-            "target",
-            "compiled",
-            "test",
-            "models",
-            "microbatch_model.sql",
-        )
-
         # Compiled paths - batch compilations
         assert read_file(
             project.project_root,
@@ -96,7 +96,7 @@ REQUIRED_PARSED_NODE_KEYS = frozenset(
         "deprecation_date",
         "defer_relation",
         "time_spine",
-        "batch_info",
+        "batch",
     }
 )
 
@@ -489,11 +489,11 @@ class TestMicrobatchBuilder:
         assert len(actual_batches) == len(expected_batches)
         assert actual_batches == expected_batches
 
-    def test_build_batch_context_incremental_batch(self, microbatch_model):
+    def test_build_jinja_context_for_incremental_batch(self, microbatch_model):
         microbatch_builder = MicrobatchBuilder(
             model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
         )
-        context = microbatch_builder.build_batch_context(incremental_batch=True)
+        context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=True)
 
         assert context["model"] == microbatch_model.to_dict()
         assert context["sql"] == microbatch_model.compiled_code

@@ -502,11 +502,11 @@ class TestMicrobatchBuilder:
         assert context["is_incremental"]() is True
         assert context["should_full_refresh"]() is False
 
-    def test_build_batch_context_incremental_batch_false(self, microbatch_model):
+    def test_build_jinja_context_for_incremental_batch_false(self, microbatch_model):
         microbatch_builder = MicrobatchBuilder(
             model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
         )
-        context = microbatch_builder.build_batch_context(incremental_batch=False)
+        context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=False)
 
         assert context["model"] == microbatch_model.to_dict()
         assert context["sql"] == microbatch_model.compiled_code
@@ -605,7 +605,6 @@ class TestMicrobatchBuilder:
     @pytest.mark.parametrize(
        "batch_size,batch_start,expected_formatted_batch_start",
        [
-            (None, None, None),
             (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
             (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
             (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),