Compare commits

...

10 Commits

Author SHA1 Message Date
Github Build Bot
333d600a95 Bumping version to 1.9.0rc2 and generate changelog 2024-12-02 14:25:20 +00:00
github-actions[bot]
61b6cb3864 [Tidy first] move microbatch compilation to .compile method (#11063) (#11065) 2024-11-27 23:58:39 -05:00
github-actions[bot]
6a36444dbb Add batch context object to microbatch jinja context (#11031) (#11064)
* Add `batch_id` to jinja context of microbatch batches

* Add changie doc

* Update `format_batch_start` to assume `batch_start` is always provided

* Add "runtime only" property `batch_context` to `ModelNode`

By "runtime only" we mean that the property doesn't exist on the artifact
schema and thus won't be written out to the manifest artifact.

* Begin populating `batch_context` during materialization execution for microbatch batches

* Fix circular import

* Fixup MicrobatchBuilder.batch_id property method

* Ensure MicrobatchModelRunner doesn't double compile batches

We were compiling the node for each batch _twice_. Aside from making microbatch
models more expensive than they needed to be, the double compilation itself
wasn't causing any issue. However, the first compilation happened _before_ we
had added the batch context information to the model node for the batch, which
caused models that access the `batch_context` information on the model to blow
up. As such, we now skip the first compilation, similar to how SavedQuery nodes
skip compilation.

* Add `__post_serialize__` method to `BatchContext` to ensure correct dict shape

This is weird, but necessary, I apologize. Mashumaro handles the
dictification of this class via a compile-time generated `to_dict`
method based off the _typing_ of the class. By default, `datetime`
types are converted to strings. We don't want that; we want them to
stay datetimes.

* Update tests to check for `batch_context`

* Update `resolve_event_time_filter` to use new `batch_context`

* Stop testing for batchless compiled code for microbatch models

In 45daec72f4 we stopped an extra compilation
that was happening per batch before the batch_context was loaded. Stopping
this extra compilation means that compiled SQL for the microbatch model without
the event time filter / batch context is no longer produced. We have discussed
this and _believe_ it is okay given that this is a new node type that has not
hit GA yet.

* Rename `ModelNode.batch_context` to `ModelNode.batch`

* Rename `build_batch_context` to `build_jinja_context_for_batch`

The name `build_batch_context` was confusing because:
1) We have a `BatchContext` object, which the method was not building
2) The method builds the jinja context for the batch
As such, it felt appropriate to rename the method to communicate more
accurately what it does.

* Rename test macro `invalid_batch_context_macro_sql` to `invalid_batch_jinja_context_macro_sql`

This rename makes it clearer that the jinja context for a batch is being
checked, as `batch_context` has a slightly different connotation.

* Update changie doc

(cherry picked from commit c3d87b89fb)

Co-authored-by: Quigley Malcolm <QMalcolm@users.noreply.github.com>
2024-11-27 17:03:10 -06:00
github-actions[bot]
65f05e0bd2 Rename internal batch_info variable to previous_batch_results (#11056) (#11062)
* Rename `batch_info` to `previous_batch_results`

* Exclude `previous_batch_results` from serialization of model node to avoid jinja context bloat

* Drop `previous_batch_results` key from `test_manifest.py` unit tests

In 4050e377ec we began excluding
`previous_batch_results` from the serialized representation of the
ModelNode. As such, we no longer need to check for it in `test_manifest.py`.

(cherry picked from commit 0f084e16ca)

Co-authored-by: Quigley Malcolm <QMalcolm@users.noreply.github.com>
2024-11-27 16:41:50 -06:00
FishtownBuildBot
aceae51ffb [Automated] Merged prep-release/1.9.0rc1_12015575655 into target 1.9.latest during release process 2024-11-25 13:06:05 -05:00
Github Build Bot
a84a787eeb Bumping version to 1.9.0rc1 and generate changelog 2024-11-25 17:37:13 +00:00
github-actions[bot]
3d70e1b06f Fix #11012: Catch DbtRuntimeError for hooks (#11023) (#11045)
(cherry picked from commit f582ac2488)

Co-authored-by: Kshitij Aranke <kshitij.aranke@dbtlabs.com>
Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
2024-11-25 12:15:46 -05:00
Peter Webb
55872497bd Add New Config Properties and Schema for Snapshot Hard Deletes (#10972) (#11037)
* Add changelog entry.

* Update schemas and test fixtures for new snapshot meta-column

* Add back comment.

Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
2024-11-25 11:53:45 -05:00
github-actions[bot]
d305137226 Pin mashumaro to <3.15 (#11046) (#11047)
(cherry picked from commit 407f6caa1c)

Co-authored-by: Gerda Shank <gerda@dbtlabs.com>
2024-11-25 11:28:36 -05:00
Emily Rockman
bda92e7312 add 1.8 changelogs (#11028) 2024-11-21 15:59:16 -06:00
38 changed files with 283 additions and 95 deletions


@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.9.0b4
current_version = 1.9.0rc2
parse = (?P<major>[\d]+) # major version number
\.(?P<minor>[\d]+) # minor version number
\.(?P<patch>[\d]+) # patch version number

.changes/1.9.0-rc1.md Normal file

@@ -0,0 +1,35 @@
## dbt-core 1.9.0-rc1 - November 25, 2024
### Features
- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
### Fixes
- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
### Under the Hood
- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
### Dependencies
- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
### Contributors
- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))

.changes/1.9.0-rc2.md Normal file

@@ -0,0 +1,5 @@
## dbt-core 1.9.0-rc2 - December 02, 2024
### Features
- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))


@@ -0,0 +1,6 @@
kind: Features
body: Add new hard_deletes="new_record" mode for snapshots.
time: 2024-11-04T12:00:53.95191-05:00
custom:
Author: peterallenwebb
Issue: "10235"


@@ -0,0 +1,6 @@
kind: Features
body: Add `batch` context object to model jinja context
time: 2024-11-21T12:56:30.715473-06:00
custom:
Author: QMalcolm
Issue: "11025"


@@ -0,0 +1,6 @@
kind: Fixes
body: Catch DbtRuntimeError for hooks
time: 2024-11-21T18:17:39.753235Z
custom:
Author: aranke
Issue: "11012"


@@ -5,6 +5,50 @@
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)
## dbt-core 1.9.0-rc2 - December 02, 2024
### Features
- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))
## dbt-core 1.9.0-rc1 - November 25, 2024
### Features
- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
### Fixes
- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
### Under the Hood
- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
### Dependencies
- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
### Contributors
- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
## dbt-core 1.9.0-b4 - November 06, 2024
### Features
@@ -25,7 +69,6 @@
### Contributors
- [@DevonFulcher](https://github.com/DevonFulcher) ([#10959](https://github.com/dbt-labs/dbt-core/issues/10959), [#10960](https://github.com/dbt-labs/dbt-core/issues/10960))
## dbt-core 1.9.0-b3 - October 30, 2024
### Features


@@ -13,6 +13,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
dbt_valid_from: Optional[str] = None
dbt_scd_id: Optional[str] = None
dbt_updated_at: Optional[str] = None
dbt_is_deleted: Optional[str] = None
@dataclass
@@ -37,6 +38,7 @@ class SnapshotConfig(NodeConfig):
"dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
"dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
"dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
"dbt_is_deleted": self.snapshot_meta_column_names.dbt_is_deleted or "dbt_is_deleted",
}
def final_validate(self):
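To make the override-or-default resolution above concrete, here is a minimal, self-contained sketch; `MetaColumnNames` and `resolved()` are hypothetical condensations of `SnapshotMetaColumnNames` and the `SnapshotConfig` mapping, not dbt's actual API:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class MetaColumnNames:
    # Hypothetical, condensed stand-in for SnapshotMetaColumnNames
    dbt_valid_to: Optional[str] = None
    dbt_is_deleted: Optional[str] = None

    def resolved(self) -> Dict[str, str]:
        # Each snapshot meta-column falls back to its default name
        # unless the user supplies an override.
        return {
            "dbt_valid_to": self.dbt_valid_to or "dbt_valid_to",
            "dbt_is_deleted": self.dbt_is_deleted or "dbt_is_deleted",
        }


print(MetaColumnNames(dbt_is_deleted="is_gone").resolved())
# {'dbt_valid_to': 'dbt_valid_to', 'dbt_is_deleted': 'is_gone'}
```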


@@ -244,9 +244,10 @@ class BaseResolver(metaclass=abc.ABCMeta):
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
and self.model.batch is not None
):
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
start = self.model.batch.event_time_start
end = self.model.batch.event_time_end
if start is not None or end is not None:
event_time_filter = EventTimeFilter(
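A minimal sketch of the resolver's new shape; `Batch` and `EventTimeFilter` here are simplified stand-ins with assumed field names, not dbt's exact types:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class EventTimeFilter:  # simplified stand-in for dbt's EventTimeFilter
    field_name: str
    start: Optional[datetime] = None
    end: Optional[datetime] = None


@dataclass
class Batch:  # mirrors BatchContext's event-time bounds
    event_time_start: Optional[datetime]
    event_time_end: Optional[datetime]


def resolve_filter(batch: Optional[Batch], event_time_field: str) -> Optional[EventTimeFilter]:
    # The bounds now come straight from model.batch rather than from the
    # internal __dbt_internal_microbatch_event_time_* config keys.
    if batch is None:
        return None
    start, end = batch.event_time_start, batch.event_time_end
    if start is not None or end is not None:
        return EventTimeFilter(field_name=event_time_field, start=start, end=end)
    return None
```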


@@ -93,6 +93,7 @@ from dbt_common.contracts.constraints import (
ConstraintType,
ModelLevelConstraint,
)
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.contextvars import set_log_contextvars
from dbt_common.events.functions import warn_or_error
@@ -442,15 +443,38 @@ class HookNode(HookNodeResource, CompiledNode):
return HookNodeResource
@dataclass
class BatchContext(dbtClassMixin):
id: str
event_time_start: datetime
event_time_end: datetime
def __post_serialize__(self, data, context):
# This is insane, but necessary, I apologize. Mashumaro handles the
# dictification of this class via a compile-time generated `to_dict`
# method based off the _typing_ of the class. By default `datetime`
# types are converted to strings. We don't want that; we want them to
# stay datetimes.
# Note: This is safe because `BatchContext` isn't part of the artifact
# and thus doesn't get written out.
new_data = super().__post_serialize__(data, context)
new_data["event_time_start"] = self.event_time_start
new_data["event_time_end"] = self.event_time_end
return new_data
@dataclass
class ModelNode(ModelResource, CompiledNode):
batch_info: Optional[BatchResults] = None
previous_batch_results: Optional[BatchResults] = None
batch: Optional[BatchContext] = None
_has_this: Optional[bool] = None
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_has_this" in dct:
del dct["_has_this"]
if "previous_batch_results" in dct:
del dct["previous_batch_results"]
return dct
@classmethod
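The two `__post_serialize__` overrides above do different jobs: `BatchContext` restores real datetimes, while `ModelNode` drops `_has_this` and `previous_batch_results` so they never reach the serialized dict. A runnable sketch of the `BatchContext` behavior using plain mashumaro (dbt's `dbtClassMixin` adds a `context` argument, omitted here):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict

from mashumaro import DataClassDictMixin


@dataclass
class BatchContextSketch(DataClassDictMixin):
    id: str
    event_time_start: datetime
    event_time_end: datetime

    def __post_serialize__(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
        # Mashumaro's generated to_dict() stringifies datetimes; put the
        # original datetime objects back so consumers get real datetimes.
        d["event_time_start"] = self.event_time_start
        d["event_time_end"] = self.event_time_end
        return d


bc = BatchContextSketch("20200101", datetime(2020, 1, 1), datetime(2020, 1, 2))
assert isinstance(bc.to_dict()["event_time_start"], datetime)
```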


@@ -100,25 +100,25 @@ class MicrobatchBuilder:
return batches
def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
Assumes self.model has been (re)-compiled with necessary batch filters applied.
"""
batch_context: Dict[str, Any] = {}
jinja_context: Dict[str, Any] = {}
# Microbatch model properties
batch_context["model"] = self.model.to_dict()
batch_context["sql"] = self.model.compiled_code
batch_context["compiled_code"] = self.model.compiled_code
jinja_context["model"] = self.model.to_dict()
jinja_context["sql"] = self.model.compiled_code
jinja_context["compiled_code"] = self.model.compiled_code
# Add incremental context variables for batches running incrementally
if incremental_batch:
batch_context["is_incremental"] = lambda: True
batch_context["should_full_refresh"] = lambda: False
jinja_context["is_incremental"] = lambda: True
jinja_context["should_full_refresh"] = lambda: False
return batch_context
return jinja_context
@staticmethod
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
@@ -193,12 +193,11 @@ class MicrobatchBuilder:
return truncated
@staticmethod
def format_batch_start(
batch_start: Optional[datetime], batch_size: BatchSize
) -> Optional[str]:
if batch_start is None:
return batch_start
def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "")
@staticmethod
def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
return str(
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
)
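As a worked example, here is a hedged standalone re-implementation of the two helpers above (with a stand-in `BatchSize` enum, and the now-redundant truthiness check on `batch_start` dropped):

```python
from datetime import datetime
from enum import Enum


class BatchSize(Enum):  # stand-in for dbt's BatchSize
    hour = "hour"
    day = "day"


def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
    # Hourly batches keep the full timestamp; coarser sizes keep the date only
    return str(batch_start.date() if batch_size != BatchSize.hour else batch_start)


def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
    # Strip dashes from the formatted start, e.g. 2020-01-01 -> 20200101
    return format_batch_start(start_time, batch_size).replace("-", "")


print(batch_id(datetime(2020, 1, 1), BatchSize.day))     # 20200101
print(batch_id(datetime(2020, 1, 1, 5), BatchSize.hour)) # 20200101 05:00:00
```

This is the `batch.id` value the jinja-context tests below assert against (e.g. `20200101`).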


@@ -136,7 +136,7 @@ class RetryTask(ConfiguredTask):
# batch info if there were _no_ successful batches previously. This is
# because passing the batch info _forces_ the microbatch process into
# _incremental_ mode, and it may be that we need to be in full refresh
# mode which is only handled if batch_info _isn't_ passed for a node
# mode which is only handled if previous_batch_results _isn't_ passed for a node
batch_map = {
result.unique_id: result.batch_results
for result in self.previous_results.results


@@ -27,7 +27,7 @@ from dbt.clients.jinja import MacroGenerator
from dbt.config import RuntimeConfig
from dbt.context.providers import generate_runtime_model_context
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
from dbt.events.types import (
GenericExceptionOnRun,
LogHookEndLine,
@@ -283,7 +283,6 @@ class ModelRunner(CompileRunner):
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
@@ -328,9 +327,7 @@ class ModelRunner(CompileRunner):
hook_ctx = self.adapter.pre_model_hook(context_config)
return self._execute_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
class MicrobatchModelRunner(ModelRunner):
@@ -341,6 +338,33 @@ class MicrobatchModelRunner(ModelRunner):
self.batches: Dict[int, BatchType] = {}
self.relation_exists: bool = False
def compile(self, manifest: Manifest):
if self.batch_idx is not None:
batch = self.batches[self.batch_idx]
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
self.node.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
self.node,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], self.node.config.batch_size
),
)
# Skips compilation for non-batch runs
return self.node
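Design note: returning `self.node` untouched when `batch_idx` is `None` is what skips the top-level compilation described in the commit message above; each batch then sets `batch_idx` and compiles with its batch context already attached, so every compiled artifact carries the event time filter.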
def set_batch_idx(self, batch_idx: int) -> None:
self.batch_idx = batch_idx
@@ -353,7 +377,7 @@ class MicrobatchModelRunner(ModelRunner):
def describe_node(self) -> str:
return f"{self.node.language} microbatch model {self.get_node_representation()}"
def describe_batch(self, batch_start: Optional[datetime]) -> str:
def describe_batch(self, batch_start: datetime) -> str:
# Only show the date (year/month/day) portion of batch_start
formatted_batch_start = MicrobatchBuilder.format_batch_start(
batch_start, self.node.config.batch_size
@@ -445,8 +469,8 @@ class MicrobatchModelRunner(ModelRunner):
result.batch_results.failed = sorted(result.batch_results.failed)
# If retrying, propagate previously successful batches into the final result, even though they were not run in this invocation
if self.node.batch_info is not None:
result.batch_results.successful += self.node.batch_info.successful
if self.node.previous_batch_results is not None:
result.batch_results.successful += self.node.previous_batch_results.successful
def _build_succesful_run_batch_result(
self,
@@ -495,7 +519,6 @@ class MicrobatchModelRunner(ModelRunner):
def _execute_microbatch_materialization(
self,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
@@ -508,15 +531,15 @@ class MicrobatchModelRunner(ModelRunner):
)
if self.batch_idx is None:
# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
if model.previous_batch_results is None:
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batch_info.failed
batches = model.previous_batch_results.failed
# If there is batch info, then don't run as full_refresh and force is_incremental;
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
@@ -530,24 +553,11 @@ class MicrobatchModelRunner(ModelRunner):
# call materialization_macro to get a batch-level run result
start_time = time.perf_counter()
try:
# Set start/end in context prior to re-compiling
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
# Update jinja context with batch context members
batch_context = microbatch_builder.build_batch_context(
jinja_context = microbatch_builder.build_jinja_context_for_batch(
incremental_batch=self.relation_exists
)
context.update(batch_context)
context.update(jinja_context)
# Materialize batch and cache any materialized relations
result = MacroGenerator(
@@ -630,36 +640,22 @@ class MicrobatchModelRunner(ModelRunner):
else:
return False
def _execute_microbatch_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
batch_result = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)
return batch_result
def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
try:
batch_result = self._execute_microbatch_materialization(
model, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)
return batch_result
class RunTask(CompileTask):
@@ -885,7 +881,7 @@ class RunTask(CompileTask):
if uid in self.batch_map:
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
if isinstance(node, ModelNode):
node.batch_info = self.batch_map[uid]
node.previous_batch_results = self.batch_map[uid]
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
@@ -922,7 +918,7 @@ class RunTask(CompileTask):
try:
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)
except (KeyboardInterrupt, SystemExit):
except (KeyboardInterrupt, SystemExit, DbtRuntimeError):
run_result = self.get_result(
results=self.node_results,
elapsed_time=time.time() - self.started_at,


@@ -231,5 +231,5 @@ def _get_adapter_plugin_names() -> Iterator[str]:
yield plugin_name
__version__ = "1.9.0b4"
__version__ = "1.9.0rc2"
installed = get_installed_version()


@@ -25,7 +25,7 @@ with open(os.path.join(this_directory, "README.md")) as f:
package_name = "dbt-core"
package_version = "1.9.0b4"
package_version = "1.9.0rc2"
description = """With dbt, data analysts and engineers can build analytics \
the way engineers build applications."""
@@ -51,7 +51,7 @@ setup(
# Pin to the patch or minor version, and bump in each new minor version of dbt-core.
"agate>=1.7.0,<1.10",
"Jinja2>=3.1.3,<4",
"mashumaro[msgpack]>=3.9,<4.0",
"mashumaro[msgpack]>=3.9,<3.15",
# ----
# dbt-core uses these packages in standard ways. Pin to the major version, and check compatibility
# with major versions in each new minor version of dbt-core.


@@ -6754,6 +6754,17 @@
}
],
"default": null
},
"dbt_is_deleted": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false
@@ -16672,6 +16683,17 @@
}
],
"default": null
},
"dbt_is_deleted": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false


@@ -2,7 +2,8 @@ import pytest
from dbt.artifacts.schemas.results import RunStatus
from dbt.contracts.graph.nodes import HookNode
from dbt.tests.util import get_artifact, run_dbt_and_capture
from dbt.tests.util import get_artifact, run_dbt, run_dbt_and_capture
from dbt_common.exceptions import CompilationError
class Test__StartHookFail__FlagIsNone__ModelFail:
@@ -242,3 +243,36 @@ class Test__HookContext__HookFail:
assert "Thread ID: main" in log_output
assert results[0].thread_id == "main"
assert "Num Results in context: 2" in log_output # failed hook and model
class Test__HookCompilationError:
@pytest.fixture(scope="class")
def models(self):
return {"my_model.sql": "select 1 as id"}
@pytest.fixture(scope="class")
def macros(self):
return {
"rce.sql": """
{% macro rce(relation) %}
{% if execute %}
{{ exceptions.raise_compiler_error("Always raise a compiler error in execute") }}
{% endif %}
{% endmacro %}
"""
}
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"on-run-end": ["{{ rce() }}"],
}
def test_results(self, project):
with pytest.raises(CompilationError, match="Always raise a compiler error in execute"):
run_dbt(["run"], expect_pass=False)
run_results = get_artifact(project.project_root, "target", "run_results.json")
assert [(result["unique_id"], result["status"]) for result in run_results["results"]] == [
("model.test.my_model", RunStatus.Success)
]


@@ -112,6 +112,7 @@ def get_rendered_snapshot_config(**updates):
"dbt_valid_from": None,
"dbt_updated_at": None,
"dbt_scd_id": None,
"dbt_is_deleted": None,
},
"dbt_valid_to_current": None,
"tags": [],


@@ -69,6 +69,7 @@ class TestList:
"dbt_updated_at": None,
"dbt_valid_from": None,
"dbt_valid_to": None,
"dbt_is_deleted": None,
},
"unique_key": "id",
"strategy": "timestamp",


@@ -64,8 +64,8 @@ microbatch_yearly_model_downstream_sql = """
select * from {{ ref('microbatch_model') }}
"""
invalid_batch_context_macro_sql = """
{% macro check_invalid_batch_context() %}
invalid_batch_jinja_context_macro_sql = """
{% macro check_invalid_batch_jinja_context() %}
{% if model is not mapping %}
{{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }}
@@ -83,9 +83,9 @@ invalid_batch_context_macro_sql = """
"""
microbatch_model_with_context_checks_sql = """
{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ config(pre_hook="{{ check_invalid_batch_jinja_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ check_invalid_batch_context() }}
{{ check_invalid_batch_jinja_context() }}
select * from {{ ref('input_model') }}
"""
@@ -404,7 +404,7 @@ class TestMicrobatchJinjaContext(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def macros(self):
return {"check_batch_context.sql": invalid_batch_context_macro_sql}
return {"check_batch_jinja_context.sql": invalid_batch_jinja_context_macro_sql}
@pytest.fixture(scope="class")
def models(self):
@@ -498,6 +498,13 @@ microbatch_model_context_vars = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
{% if model.batch %}
{{ log("batch.event_time_start: "~ model.batch.event_time_start, info=True)}}
{{ log("batch.event_time_end: "~ model.batch.event_time_end, info=True)}}
{{ log("batch.id: "~ model.batch.id, info=True)}}
{{ log("start timezone: "~ model.batch.event_time_start.tzinfo, info=True)}}
{{ log("end timezone: "~ model.batch.event_time_end.tzinfo, info=True)}}
{% endif %}
select * from {{ ref('input_model') }}
"""
@@ -516,12 +523,23 @@ class TestMicrobatchJinjaContextVarsAvailable(BaseMicrobatchTest):
assert "start: 2020-01-01 00:00:00+00:00" in logs
assert "end: 2020-01-02 00:00:00+00:00" in logs
assert "batch.event_time_start: 2020-01-01 00:00:00+00:00" in logs
assert "batch.event_time_end: 2020-01-02 00:00:00+00:00" in logs
assert "batch.id: 20200101" in logs
assert "start timezone: UTC" in logs
assert "end timezone: UTC" in logs
assert "start: 2020-01-02 00:00:00+00:00" in logs
assert "end: 2020-01-03 00:00:00+00:00" in logs
assert "batch.event_time_start: 2020-01-02 00:00:00+00:00" in logs
assert "batch.event_time_end: 2020-01-03 00:00:00+00:00" in logs
assert "batch.id: 20200102" in logs
assert "start: 2020-01-03 00:00:00+00:00" in logs
assert "end: 2020-01-03 13:57:00+00:00" in logs
assert "batch.event_time_start: 2020-01-03 00:00:00+00:00" in logs
assert "batch.event_time_end: 2020-01-03 13:57:00+00:00" in logs
assert "batch.id: 20200103" in logs
microbatch_model_failing_incremental_partition_sql = """
@@ -675,16 +693,6 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
# Compiled paths - compiled model without filter only
assert read_file(
project.project_root,
"target",
"compiled",
"test",
"models",
"microbatch_model.sql",
)
# Compiled paths - batch compilations
assert read_file(
project.project_root,


@@ -96,7 +96,7 @@ REQUIRED_PARSED_NODE_KEYS = frozenset(
"deprecation_date",
"defer_relation",
"time_spine",
"batch_info",
"batch",
}
)


@@ -489,11 +489,11 @@ class TestMicrobatchBuilder:
assert len(actual_batches) == len(expected_batches)
assert actual_batches == expected_batches
def test_build_batch_context_incremental_batch(self, microbatch_model):
def test_build_jinja_context_for_incremental_batch(self, microbatch_model):
microbatch_builder = MicrobatchBuilder(
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
)
context = microbatch_builder.build_batch_context(incremental_batch=True)
context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=True)
assert context["model"] == microbatch_model.to_dict()
assert context["sql"] == microbatch_model.compiled_code
@@ -502,11 +502,11 @@ class TestMicrobatchBuilder:
assert context["is_incremental"]() is True
assert context["should_full_refresh"]() is False
def test_build_batch_context_incremental_batch_false(self, microbatch_model):
def test_build_jinja_context_for_incremental_batch_false(self, microbatch_model):
microbatch_builder = MicrobatchBuilder(
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
)
context = microbatch_builder.build_batch_context(incremental_batch=False)
context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=False)
assert context["model"] == microbatch_model.to_dict()
assert context["sql"] == microbatch_model.compiled_code
@@ -605,7 +605,6 @@ class TestMicrobatchBuilder:
@pytest.mark.parametrize(
"batch_size,batch_start,expected_formatted_batch_start",
[
(None, None, None),
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),