mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-23 02:11:28 +00:00
Compare commits
10 Commits
enable-pos
...
v1.9.0rc2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
333d600a95 | ||
|
|
61b6cb3864 | ||
|
|
6a36444dbb | ||
|
|
65f05e0bd2 | ||
|
|
aceae51ffb | ||
|
|
a84a787eeb | ||
|
|
3d70e1b06f | ||
|
|
55872497bd | ||
|
|
d305137226 | ||
|
|
bda92e7312 |
@@ -1,5 +1,5 @@
|
|||||||
[bumpversion]
|
[bumpversion]
|
||||||
current_version = 1.9.0b4
|
current_version = 1.9.0rc2
|
||||||
parse = (?P<major>[\d]+) # major version number
|
parse = (?P<major>[\d]+) # major version number
|
||||||
\.(?P<minor>[\d]+) # minor version number
|
\.(?P<minor>[\d]+) # minor version number
|
||||||
\.(?P<patch>[\d]+) # patch version number
|
\.(?P<patch>[\d]+) # patch version number
|
||||||
|
|||||||
35
.changes/1.9.0-rc1.md
Normal file
35
.changes/1.9.0-rc1.md
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
## dbt-core 1.9.0-rc1 - November 25, 2024
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
|
||||||
|
- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
|
||||||
|
- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
|
||||||
|
- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
|
||||||
|
- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
|
||||||
|
- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
|
||||||
|
- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
|
||||||
|
- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
|
||||||
|
- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
|
||||||
|
- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
|
||||||
|
- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
|
||||||
|
- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
|
||||||
|
- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
|
||||||
|
- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
|
||||||
|
- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
|
||||||
|
|
||||||
|
### Under the Hood
|
||||||
|
|
||||||
|
- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
|
||||||
|
|
||||||
|
### Dependencies
|
||||||
|
|
||||||
|
- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
|
||||||
|
|
||||||
|
### Contributors
|
||||||
|
- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
|
||||||
|
- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
|
||||||
5
.changes/1.9.0-rc2.md
Normal file
5
.changes/1.9.0-rc2.md
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
## dbt-core 1.9.0-rc2 - December 02, 2024
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))
|
||||||
6
.changes/1.9.0/Features-20241104-120053.yaml
Normal file
6
.changes/1.9.0/Features-20241104-120053.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
kind: Features
|
||||||
|
body: Add new hard_deletes="new_record" mode for snapshots.
|
||||||
|
time: 2024-11-04T12:00:53.95191-05:00
|
||||||
|
custom:
|
||||||
|
Author: peterallenwebb
|
||||||
|
Issue: "10235"
|
||||||
6
.changes/1.9.0/Features-20241121-125630.yaml
Normal file
6
.changes/1.9.0/Features-20241121-125630.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
kind: Features
|
||||||
|
body: Add `batch` context object to model jinja context
|
||||||
|
time: 2024-11-21T12:56:30.715473-06:00
|
||||||
|
custom:
|
||||||
|
Author: QMalcolm
|
||||||
|
Issue: "11025"
|
||||||
6
.changes/1.9.0/Fixes-20241121-181739.yaml
Normal file
6
.changes/1.9.0/Fixes-20241121-181739.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
kind: Fixes
|
||||||
|
body: Catch DbtRuntimeError for hooks
|
||||||
|
time: 2024-11-21T18:17:39.753235Z
|
||||||
|
custom:
|
||||||
|
Author: aranke
|
||||||
|
Issue: "11012"
|
||||||
45
CHANGELOG.md
45
CHANGELOG.md
@@ -5,6 +5,50 @@
|
|||||||
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
|
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
|
||||||
- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)
|
- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)
|
||||||
|
|
||||||
|
## dbt-core 1.9.0-rc2 - December 02, 2024
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Add `batch` context object to model jinja context ([#11025](https://github.com/dbt-labs/dbt-core/issues/11025))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## dbt-core 1.9.0-rc1 - November 25, 2024
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Parseable JSON and text output in quiet mode for `dbt show` and `dbt compile` ([#9840](https://github.com/dbt-labs/dbt-core/issues/9840))
|
||||||
|
- Change gating of microbatch feature to be behind project flag / behavior flag ([#10798](https://github.com/dbt-labs/dbt-core/issues/10798))
|
||||||
|
- Add new hard_deletes="new_record" mode for snapshots. ([#10235](https://github.com/dbt-labs/dbt-core/issues/10235))
|
||||||
|
- Allow microbatch batches to run in parallel ([#10853](https://github.com/dbt-labs/dbt-core/issues/10853), [#10855](https://github.com/dbt-labs/dbt-core/issues/10855))
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
- override materialization python models ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520))
|
||||||
|
- Support disabling unit tests via config. ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
|
||||||
|
- unit tests with versioned refs ([#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
|
||||||
|
- Fix 'no attribute .config' error when ref-ing a microbatch model from non-Model context ([#10928](https://github.com/dbt-labs/dbt-core/issues/10928))
|
||||||
|
- Ensure inferred primary_key is a List[str] with no null values ([#10983](https://github.com/dbt-labs/dbt-core/issues/10983))
|
||||||
|
- Correct when custom microbatch macro deprecation warning is fired ([#10994](https://github.com/dbt-labs/dbt-core/issues/10994))
|
||||||
|
- Validate manifest has group_map during group_lookup init ([#10988](https://github.com/dbt-labs/dbt-core/issues/10988))
|
||||||
|
- Fix plural of 'partial success' in log message ([#10999](https://github.com/dbt-labs/dbt-core/issues/10999))
|
||||||
|
- Emit batch-level exception with node_info on microbatch batch run failure ([#10840](https://github.com/dbt-labs/dbt-core/issues/10840))
|
||||||
|
- Fix restrict-access to not apply within a package ([#10134](https://github.com/dbt-labs/dbt-core/issues/10134))
|
||||||
|
- Make microbatch models skippable ([#11021](https://github.com/dbt-labs/dbt-core/issues/11021))
|
||||||
|
- Catch DbtRuntimeError for hooks ([#11012](https://github.com/dbt-labs/dbt-core/issues/11012))
|
||||||
|
|
||||||
|
### Under the Hood
|
||||||
|
|
||||||
|
- Upgrade protobuf ([#10658](https://github.com/dbt-labs/dbt-core/issues/10658))
|
||||||
|
|
||||||
|
### Dependencies
|
||||||
|
|
||||||
|
- Bump minimum dbt-adapters version to 1.9.0 ([#10996](https://github.com/dbt-labs/dbt-core/issues/10996))
|
||||||
|
|
||||||
|
### Contributors
|
||||||
|
- [@devmessias](https://github.com/devmessias) ([#8520](https://github.com/dbt-labs/dbt-core/issues/8520), [#10880](https://github.com/dbt-labs/dbt-core/issues/10880), [#10528](https://github.com/dbt-labs/dbt-core/issues/10528), [#10623](https://github.com/dbt-labs/dbt-core/issues/10623))
|
||||||
|
- [@tsturge](https://github.com/tsturge) ([#9109](https://github.com/dbt-labs/dbt-core/issues/9109), [#10540](https://github.com/dbt-labs/dbt-core/issues/10540))
|
||||||
|
|
||||||
## dbt-core 1.9.0-b4 - November 06, 2024
|
## dbt-core 1.9.0-b4 - November 06, 2024
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
@@ -25,7 +69,6 @@
|
|||||||
### Contributors
|
### Contributors
|
||||||
- [@DevonFulcher](https://github.com/DevonFulcher) ([#10959](https://github.com/dbt-labs/dbt-core/issues/10959), [#10960](https://github.com/dbt-labs/dbt-core/issues/10960))
|
- [@DevonFulcher](https://github.com/DevonFulcher) ([#10959](https://github.com/dbt-labs/dbt-core/issues/10959), [#10960](https://github.com/dbt-labs/dbt-core/issues/10960))
|
||||||
|
|
||||||
|
|
||||||
## dbt-core 1.9.0-b3 - October 30, 2024
|
## dbt-core 1.9.0-b3 - October 30, 2024
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
|
|||||||
dbt_valid_from: Optional[str] = None
|
dbt_valid_from: Optional[str] = None
|
||||||
dbt_scd_id: Optional[str] = None
|
dbt_scd_id: Optional[str] = None
|
||||||
dbt_updated_at: Optional[str] = None
|
dbt_updated_at: Optional[str] = None
|
||||||
|
dbt_is_deleted: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -37,6 +38,7 @@ class SnapshotConfig(NodeConfig):
|
|||||||
"dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
|
"dbt_valid_to": self.snapshot_meta_column_names.dbt_valid_to or "dbt_valid_to",
|
||||||
"dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
|
"dbt_scd_id": self.snapshot_meta_column_names.dbt_scd_id or "dbt_scd_id",
|
||||||
"dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
|
"dbt_updated_at": self.snapshot_meta_column_names.dbt_updated_at or "dbt_updated_at",
|
||||||
|
"dbt_is_deleted": self.snapshot_meta_column_names.dbt_is_deleted or "dbt_is_deleted",
|
||||||
}
|
}
|
||||||
|
|
||||||
def final_validate(self):
|
def final_validate(self):
|
||||||
|
|||||||
@@ -244,9 +244,10 @@ class BaseResolver(metaclass=abc.ABCMeta):
|
|||||||
and self.model.config.materialized == "incremental"
|
and self.model.config.materialized == "incremental"
|
||||||
and self.model.config.incremental_strategy == "microbatch"
|
and self.model.config.incremental_strategy == "microbatch"
|
||||||
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
|
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
|
||||||
|
and self.model.batch is not None
|
||||||
):
|
):
|
||||||
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
|
start = self.model.batch.event_time_start
|
||||||
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
|
end = self.model.batch.event_time_end
|
||||||
|
|
||||||
if start is not None or end is not None:
|
if start is not None or end is not None:
|
||||||
event_time_filter = EventTimeFilter(
|
event_time_filter = EventTimeFilter(
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ from dbt_common.contracts.constraints import (
|
|||||||
ConstraintType,
|
ConstraintType,
|
||||||
ModelLevelConstraint,
|
ModelLevelConstraint,
|
||||||
)
|
)
|
||||||
|
from dbt_common.dataclass_schema import dbtClassMixin
|
||||||
from dbt_common.events.contextvars import set_log_contextvars
|
from dbt_common.events.contextvars import set_log_contextvars
|
||||||
from dbt_common.events.functions import warn_or_error
|
from dbt_common.events.functions import warn_or_error
|
||||||
|
|
||||||
@@ -442,15 +443,38 @@ class HookNode(HookNodeResource, CompiledNode):
|
|||||||
return HookNodeResource
|
return HookNodeResource
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BatchContext(dbtClassMixin):
|
||||||
|
id: str
|
||||||
|
event_time_start: datetime
|
||||||
|
event_time_end: datetime
|
||||||
|
|
||||||
|
def __post_serialize__(self, data, context):
|
||||||
|
# This is insane, but necessary, I apologize. Mashumaro handles the
|
||||||
|
# dictification of this class via a compile time generated `to_dict`
|
||||||
|
# method based off of the _typing_ of th class. By default `datetime`
|
||||||
|
# types are converted to strings. We don't want that, we want them to
|
||||||
|
# stay datetimes.
|
||||||
|
# Note: This is safe because the `BatchContext` isn't part of the artifact
|
||||||
|
# and thus doesn't get written out.
|
||||||
|
new_data = super().__post_serialize__(data, context)
|
||||||
|
new_data["event_time_start"] = self.event_time_start
|
||||||
|
new_data["event_time_end"] = self.event_time_end
|
||||||
|
return new_data
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ModelNode(ModelResource, CompiledNode):
|
class ModelNode(ModelResource, CompiledNode):
|
||||||
batch_info: Optional[BatchResults] = None
|
previous_batch_results: Optional[BatchResults] = None
|
||||||
|
batch: Optional[BatchContext] = None
|
||||||
_has_this: Optional[bool] = None
|
_has_this: Optional[bool] = None
|
||||||
|
|
||||||
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
|
def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
|
||||||
dct = super().__post_serialize__(dct, context)
|
dct = super().__post_serialize__(dct, context)
|
||||||
if "_has_this" in dct:
|
if "_has_this" in dct:
|
||||||
del dct["_has_this"]
|
del dct["_has_this"]
|
||||||
|
if "previous_batch_results" in dct:
|
||||||
|
del dct["previous_batch_results"]
|
||||||
return dct
|
return dct
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -100,25 +100,25 @@ class MicrobatchBuilder:
|
|||||||
|
|
||||||
return batches
|
return batches
|
||||||
|
|
||||||
def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
|
def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Create context with entries that reflect microbatch model + incremental execution state
|
Create context with entries that reflect microbatch model + incremental execution state
|
||||||
|
|
||||||
Assumes self.model has been (re)-compiled with necessary batch filters applied.
|
Assumes self.model has been (re)-compiled with necessary batch filters applied.
|
||||||
"""
|
"""
|
||||||
batch_context: Dict[str, Any] = {}
|
jinja_context: Dict[str, Any] = {}
|
||||||
|
|
||||||
# Microbatch model properties
|
# Microbatch model properties
|
||||||
batch_context["model"] = self.model.to_dict()
|
jinja_context["model"] = self.model.to_dict()
|
||||||
batch_context["sql"] = self.model.compiled_code
|
jinja_context["sql"] = self.model.compiled_code
|
||||||
batch_context["compiled_code"] = self.model.compiled_code
|
jinja_context["compiled_code"] = self.model.compiled_code
|
||||||
|
|
||||||
# Add incremental context variables for batches running incrementally
|
# Add incremental context variables for batches running incrementally
|
||||||
if incremental_batch:
|
if incremental_batch:
|
||||||
batch_context["is_incremental"] = lambda: True
|
jinja_context["is_incremental"] = lambda: True
|
||||||
batch_context["should_full_refresh"] = lambda: False
|
jinja_context["should_full_refresh"] = lambda: False
|
||||||
|
|
||||||
return batch_context
|
return jinja_context
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
|
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
|
||||||
@@ -193,12 +193,11 @@ class MicrobatchBuilder:
|
|||||||
return truncated
|
return truncated
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format_batch_start(
|
def batch_id(start_time: datetime, batch_size: BatchSize) -> str:
|
||||||
batch_start: Optional[datetime], batch_size: BatchSize
|
return MicrobatchBuilder.format_batch_start(start_time, batch_size).replace("-", "")
|
||||||
) -> Optional[str]:
|
|
||||||
if batch_start is None:
|
|
||||||
return batch_start
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def format_batch_start(batch_start: datetime, batch_size: BatchSize) -> str:
|
||||||
return str(
|
return str(
|
||||||
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
|
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -136,7 +136,7 @@ class RetryTask(ConfiguredTask):
|
|||||||
# batch info if there were _no_ successful batches previously. This is
|
# batch info if there were _no_ successful batches previously. This is
|
||||||
# because passing the batch info _forces_ the microbatch process into
|
# because passing the batch info _forces_ the microbatch process into
|
||||||
# _incremental_ model, and it may be that we need to be in full refresh
|
# _incremental_ model, and it may be that we need to be in full refresh
|
||||||
# mode which is only handled if batch_info _isn't_ passed for a node
|
# mode which is only handled if previous_batch_results _isn't_ passed for a node
|
||||||
batch_map = {
|
batch_map = {
|
||||||
result.unique_id: result.batch_results
|
result.unique_id: result.batch_results
|
||||||
for result in self.previous_results.results
|
for result in self.previous_results.results
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ from dbt.clients.jinja import MacroGenerator
|
|||||||
from dbt.config import RuntimeConfig
|
from dbt.config import RuntimeConfig
|
||||||
from dbt.context.providers import generate_runtime_model_context
|
from dbt.context.providers import generate_runtime_model_context
|
||||||
from dbt.contracts.graph.manifest import Manifest
|
from dbt.contracts.graph.manifest import Manifest
|
||||||
from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
|
from dbt.contracts.graph.nodes import BatchContext, HookNode, ModelNode, ResultNode
|
||||||
from dbt.events.types import (
|
from dbt.events.types import (
|
||||||
GenericExceptionOnRun,
|
GenericExceptionOnRun,
|
||||||
LogHookEndLine,
|
LogHookEndLine,
|
||||||
@@ -283,7 +283,6 @@ class ModelRunner(CompileRunner):
|
|||||||
hook_ctx: Any,
|
hook_ctx: Any,
|
||||||
context_config: Any,
|
context_config: Any,
|
||||||
model: ModelNode,
|
model: ModelNode,
|
||||||
manifest: Manifest,
|
|
||||||
context: Dict[str, Any],
|
context: Dict[str, Any],
|
||||||
materialization_macro: MacroProtocol,
|
materialization_macro: MacroProtocol,
|
||||||
) -> RunResult:
|
) -> RunResult:
|
||||||
@@ -328,9 +327,7 @@ class ModelRunner(CompileRunner):
|
|||||||
|
|
||||||
hook_ctx = self.adapter.pre_model_hook(context_config)
|
hook_ctx = self.adapter.pre_model_hook(context_config)
|
||||||
|
|
||||||
return self._execute_model(
|
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
|
||||||
hook_ctx, context_config, model, manifest, context, materialization_macro
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class MicrobatchModelRunner(ModelRunner):
|
class MicrobatchModelRunner(ModelRunner):
|
||||||
@@ -341,6 +338,33 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
self.batches: Dict[int, BatchType] = {}
|
self.batches: Dict[int, BatchType] = {}
|
||||||
self.relation_exists: bool = False
|
self.relation_exists: bool = False
|
||||||
|
|
||||||
|
def compile(self, manifest: Manifest):
|
||||||
|
if self.batch_idx is not None:
|
||||||
|
batch = self.batches[self.batch_idx]
|
||||||
|
|
||||||
|
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
|
||||||
|
# TODO: REMOVE before 1.10 GA
|
||||||
|
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
|
||||||
|
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
|
||||||
|
# Create batch context on model node prior to re-compiling
|
||||||
|
self.node.batch = BatchContext(
|
||||||
|
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
|
||||||
|
event_time_start=batch[0],
|
||||||
|
event_time_end=batch[1],
|
||||||
|
)
|
||||||
|
# Recompile node to re-resolve refs with event time filters rendered, update context
|
||||||
|
self.compiler.compile_node(
|
||||||
|
self.node,
|
||||||
|
manifest,
|
||||||
|
{},
|
||||||
|
split_suffix=MicrobatchBuilder.format_batch_start(
|
||||||
|
batch[0], self.node.config.batch_size
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Skips compilation for non-batch runs
|
||||||
|
return self.node
|
||||||
|
|
||||||
def set_batch_idx(self, batch_idx: int) -> None:
|
def set_batch_idx(self, batch_idx: int) -> None:
|
||||||
self.batch_idx = batch_idx
|
self.batch_idx = batch_idx
|
||||||
|
|
||||||
@@ -353,7 +377,7 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
def describe_node(self) -> str:
|
def describe_node(self) -> str:
|
||||||
return f"{self.node.language} microbatch model {self.get_node_representation()}"
|
return f"{self.node.language} microbatch model {self.get_node_representation()}"
|
||||||
|
|
||||||
def describe_batch(self, batch_start: Optional[datetime]) -> str:
|
def describe_batch(self, batch_start: datetime) -> str:
|
||||||
# Only visualize date if batch_start year/month/day
|
# Only visualize date if batch_start year/month/day
|
||||||
formatted_batch_start = MicrobatchBuilder.format_batch_start(
|
formatted_batch_start = MicrobatchBuilder.format_batch_start(
|
||||||
batch_start, self.node.config.batch_size
|
batch_start, self.node.config.batch_size
|
||||||
@@ -445,8 +469,8 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
result.batch_results.failed = sorted(result.batch_results.failed)
|
result.batch_results.failed = sorted(result.batch_results.failed)
|
||||||
|
|
||||||
# # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation
|
# # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation
|
||||||
if self.node.batch_info is not None:
|
if self.node.previous_batch_results is not None:
|
||||||
result.batch_results.successful += self.node.batch_info.successful
|
result.batch_results.successful += self.node.previous_batch_results.successful
|
||||||
|
|
||||||
def _build_succesful_run_batch_result(
|
def _build_succesful_run_batch_result(
|
||||||
self,
|
self,
|
||||||
@@ -495,7 +519,6 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
def _execute_microbatch_materialization(
|
def _execute_microbatch_materialization(
|
||||||
self,
|
self,
|
||||||
model: ModelNode,
|
model: ModelNode,
|
||||||
manifest: Manifest,
|
|
||||||
context: Dict[str, Any],
|
context: Dict[str, Any],
|
||||||
materialization_macro: MacroProtocol,
|
materialization_macro: MacroProtocol,
|
||||||
) -> RunResult:
|
) -> RunResult:
|
||||||
@@ -508,15 +531,15 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if self.batch_idx is None:
|
if self.batch_idx is None:
|
||||||
# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
|
# Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
|
||||||
# IFF `dbt retry` is being run and the microbatch model had batches which
|
# IFF `dbt retry` is being run and the microbatch model had batches which
|
||||||
# failed on the run of the model (which is being retried)
|
# failed on the run of the model (which is being retried)
|
||||||
if model.batch_info is None:
|
if model.previous_batch_results is None:
|
||||||
end = microbatch_builder.build_end_time()
|
end = microbatch_builder.build_end_time()
|
||||||
start = microbatch_builder.build_start_time(end)
|
start = microbatch_builder.build_start_time(end)
|
||||||
batches = microbatch_builder.build_batches(start, end)
|
batches = microbatch_builder.build_batches(start, end)
|
||||||
else:
|
else:
|
||||||
batches = model.batch_info.failed
|
batches = model.previous_batch_results.failed
|
||||||
# If there is batch info, then don't run as full_refresh and do force is_incremental
|
# If there is batch info, then don't run as full_refresh and do force is_incremental
|
||||||
# not doing this risks blowing away the work that has already been done
|
# not doing this risks blowing away the work that has already been done
|
||||||
if self._has_relation(model=model):
|
if self._has_relation(model=model):
|
||||||
@@ -530,24 +553,11 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
# call materialization_macro to get a batch-level run result
|
# call materialization_macro to get a batch-level run result
|
||||||
start_time = time.perf_counter()
|
start_time = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
# Set start/end in context prior to re-compiling
|
|
||||||
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
|
|
||||||
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
|
|
||||||
|
|
||||||
# Recompile node to re-resolve refs with event time filters rendered, update context
|
|
||||||
self.compiler.compile_node(
|
|
||||||
model,
|
|
||||||
manifest,
|
|
||||||
{},
|
|
||||||
split_suffix=MicrobatchBuilder.format_batch_start(
|
|
||||||
batch[0], model.config.batch_size
|
|
||||||
),
|
|
||||||
)
|
|
||||||
# Update jinja context with batch context members
|
# Update jinja context with batch context members
|
||||||
batch_context = microbatch_builder.build_batch_context(
|
jinja_context = microbatch_builder.build_jinja_context_for_batch(
|
||||||
incremental_batch=self.relation_exists
|
incremental_batch=self.relation_exists
|
||||||
)
|
)
|
||||||
context.update(batch_context)
|
context.update(jinja_context)
|
||||||
|
|
||||||
# Materialize batch and cache any materialized relations
|
# Materialize batch and cache any materialized relations
|
||||||
result = MacroGenerator(
|
result = MacroGenerator(
|
||||||
@@ -630,36 +640,22 @@ class MicrobatchModelRunner(ModelRunner):
|
|||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _execute_microbatch_model(
|
|
||||||
self,
|
|
||||||
hook_ctx: Any,
|
|
||||||
context_config: Any,
|
|
||||||
model: ModelNode,
|
|
||||||
manifest: Manifest,
|
|
||||||
context: Dict[str, Any],
|
|
||||||
materialization_macro: MacroProtocol,
|
|
||||||
) -> RunResult:
|
|
||||||
try:
|
|
||||||
batch_result = self._execute_microbatch_materialization(
|
|
||||||
model, manifest, context, materialization_macro
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
self.adapter.post_model_hook(context_config, hook_ctx)
|
|
||||||
|
|
||||||
return batch_result
|
|
||||||
|
|
||||||
def _execute_model(
|
def _execute_model(
|
||||||
self,
|
self,
|
||||||
hook_ctx: Any,
|
hook_ctx: Any,
|
||||||
context_config: Any,
|
context_config: Any,
|
||||||
model: ModelNode,
|
model: ModelNode,
|
||||||
manifest: Manifest,
|
|
||||||
context: Dict[str, Any],
|
context: Dict[str, Any],
|
||||||
materialization_macro: MacroProtocol,
|
materialization_macro: MacroProtocol,
|
||||||
) -> RunResult:
|
) -> RunResult:
|
||||||
return self._execute_microbatch_model(
|
try:
|
||||||
hook_ctx, context_config, model, manifest, context, materialization_macro
|
batch_result = self._execute_microbatch_materialization(
|
||||||
|
model, context, materialization_macro
|
||||||
)
|
)
|
||||||
|
finally:
|
||||||
|
self.adapter.post_model_hook(context_config, hook_ctx)
|
||||||
|
|
||||||
|
return batch_result
|
||||||
|
|
||||||
|
|
||||||
class RunTask(CompileTask):
|
class RunTask(CompileTask):
|
||||||
@@ -885,7 +881,7 @@ class RunTask(CompileTask):
|
|||||||
if uid in self.batch_map:
|
if uid in self.batch_map:
|
||||||
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
|
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
|
||||||
if isinstance(node, ModelNode):
|
if isinstance(node, ModelNode):
|
||||||
node.batch_info = self.batch_map[uid]
|
node.previous_batch_results = self.batch_map[uid]
|
||||||
|
|
||||||
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
|
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
|
||||||
with adapter.connection_named("master"):
|
with adapter.connection_named("master"):
|
||||||
@@ -922,7 +918,7 @@ class RunTask(CompileTask):
|
|||||||
try:
|
try:
|
||||||
with adapter.connection_named("master"):
|
with adapter.connection_named("master"):
|
||||||
self.safe_run_hooks(adapter, RunHookType.End, extras)
|
self.safe_run_hooks(adapter, RunHookType.End, extras)
|
||||||
except (KeyboardInterrupt, SystemExit):
|
except (KeyboardInterrupt, SystemExit, DbtRuntimeError):
|
||||||
run_result = self.get_result(
|
run_result = self.get_result(
|
||||||
results=self.node_results,
|
results=self.node_results,
|
||||||
elapsed_time=time.time() - self.started_at,
|
elapsed_time=time.time() - self.started_at,
|
||||||
|
|||||||
@@ -231,5 +231,5 @@ def _get_adapter_plugin_names() -> Iterator[str]:
|
|||||||
yield plugin_name
|
yield plugin_name
|
||||||
|
|
||||||
|
|
||||||
__version__ = "1.9.0b4"
|
__version__ = "1.9.0rc2"
|
||||||
installed = get_installed_version()
|
installed = get_installed_version()
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ with open(os.path.join(this_directory, "README.md")) as f:
|
|||||||
|
|
||||||
|
|
||||||
package_name = "dbt-core"
|
package_name = "dbt-core"
|
||||||
package_version = "1.9.0b4"
|
package_version = "1.9.0rc2"
|
||||||
description = """With dbt, data analysts and engineers can build analytics \
|
description = """With dbt, data analysts and engineers can build analytics \
|
||||||
the way engineers build applications."""
|
the way engineers build applications."""
|
||||||
|
|
||||||
@@ -51,7 +51,7 @@ setup(
|
|||||||
# Pin to the patch or minor version, and bump in each new minor version of dbt-core.
|
# Pin to the patch or minor version, and bump in each new minor version of dbt-core.
|
||||||
"agate>=1.7.0,<1.10",
|
"agate>=1.7.0,<1.10",
|
||||||
"Jinja2>=3.1.3,<4",
|
"Jinja2>=3.1.3,<4",
|
||||||
"mashumaro[msgpack]>=3.9,<4.0",
|
"mashumaro[msgpack]>=3.9,<3.15",
|
||||||
# ----
|
# ----
|
||||||
# dbt-core uses these packages in standard ways. Pin to the major version, and check compatibility
|
# dbt-core uses these packages in standard ways. Pin to the major version, and check compatibility
|
||||||
# with major versions in each new minor version of dbt-core.
|
# with major versions in each new minor version of dbt-core.
|
||||||
|
|||||||
@@ -6754,6 +6754,17 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"default": null
|
"default": null
|
||||||
|
},
|
||||||
|
"dbt_is_deleted": {
|
||||||
|
"anyOf": [
|
||||||
|
{
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "null"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"default": null
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": false
|
"additionalProperties": false
|
||||||
@@ -16672,6 +16683,17 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"default": null
|
"default": null
|
||||||
|
},
|
||||||
|
"dbt_is_deleted": {
|
||||||
|
"anyOf": [
|
||||||
|
{
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "null"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"default": null
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": false
|
"additionalProperties": false
|
||||||
|
|||||||
@@ -2,7 +2,8 @@ import pytest
|
|||||||
|
|
||||||
from dbt.artifacts.schemas.results import RunStatus
|
from dbt.artifacts.schemas.results import RunStatus
|
||||||
from dbt.contracts.graph.nodes import HookNode
|
from dbt.contracts.graph.nodes import HookNode
|
||||||
from dbt.tests.util import get_artifact, run_dbt_and_capture
|
from dbt.tests.util import get_artifact, run_dbt, run_dbt_and_capture
|
||||||
|
from dbt_common.exceptions import CompilationError
|
||||||
|
|
||||||
|
|
||||||
class Test__StartHookFail__FlagIsNone__ModelFail:
|
class Test__StartHookFail__FlagIsNone__ModelFail:
|
||||||
@@ -242,3 +243,36 @@ class Test__HookContext__HookFail:
|
|||||||
assert "Thread ID: main" in log_output
|
assert "Thread ID: main" in log_output
|
||||||
assert results[0].thread_id == "main"
|
assert results[0].thread_id == "main"
|
||||||
assert "Num Results in context: 2" in log_output # failed hook and model
|
assert "Num Results in context: 2" in log_output # failed hook and model
|
||||||
|
|
||||||
|
|
||||||
|
class Test__HookCompilationError:
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def models(self):
|
||||||
|
return {"my_model.sql": "select 1 as id"}
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def macros(self):
|
||||||
|
return {
|
||||||
|
"rce.sql": """
|
||||||
|
{% macro rce(relation) %}
|
||||||
|
{% if execute %}
|
||||||
|
{{ exceptions.raise_compiler_error("Always raise a compiler error in execute") }}
|
||||||
|
{% endif %}
|
||||||
|
{% endmacro %}
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self):
|
||||||
|
return {
|
||||||
|
"on-run-end": ["{{ rce() }}"],
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_results(self, project):
|
||||||
|
with pytest.raises(CompilationError, match="Always raise a compiler error in execute"):
|
||||||
|
run_dbt(["run"], expect_pass=False)
|
||||||
|
|
||||||
|
run_results = get_artifact(project.project_root, "target", "run_results.json")
|
||||||
|
assert [(result["unique_id"], result["status"]) for result in run_results["results"]] == [
|
||||||
|
("model.test.my_model", RunStatus.Success)
|
||||||
|
]
|
||||||
|
|||||||
@@ -112,6 +112,7 @@ def get_rendered_snapshot_config(**updates):
|
|||||||
"dbt_valid_from": None,
|
"dbt_valid_from": None,
|
||||||
"dbt_updated_at": None,
|
"dbt_updated_at": None,
|
||||||
"dbt_scd_id": None,
|
"dbt_scd_id": None,
|
||||||
|
"dbt_is_deleted": None,
|
||||||
},
|
},
|
||||||
"dbt_valid_to_current": None,
|
"dbt_valid_to_current": None,
|
||||||
"tags": [],
|
"tags": [],
|
||||||
|
|||||||
@@ -69,6 +69,7 @@ class TestList:
|
|||||||
"dbt_updated_at": None,
|
"dbt_updated_at": None,
|
||||||
"dbt_valid_from": None,
|
"dbt_valid_from": None,
|
||||||
"dbt_valid_to": None,
|
"dbt_valid_to": None,
|
||||||
|
"dbt_is_deleted": None,
|
||||||
},
|
},
|
||||||
"unique_key": "id",
|
"unique_key": "id",
|
||||||
"strategy": "timestamp",
|
"strategy": "timestamp",
|
||||||
|
|||||||
@@ -64,8 +64,8 @@ microbatch_yearly_model_downstream_sql = """
|
|||||||
select * from {{ ref('microbatch_model') }}
|
select * from {{ ref('microbatch_model') }}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
invalid_batch_context_macro_sql = """
|
invalid_batch_jinja_context_macro_sql = """
|
||||||
{% macro check_invalid_batch_context() %}
|
{% macro check_invalid_batch_jinja_context() %}
|
||||||
|
|
||||||
{% if model is not mapping %}
|
{% if model is not mapping %}
|
||||||
{{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }}
|
{{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }}
|
||||||
@@ -83,9 +83,9 @@ invalid_batch_context_macro_sql = """
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
microbatch_model_with_context_checks_sql = """
|
microbatch_model_with_context_checks_sql = """
|
||||||
{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
|
{{ config(pre_hook="{{ check_invalid_batch_jinja_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
|
||||||
|
|
||||||
{{ check_invalid_batch_context() }}
|
{{ check_invalid_batch_jinja_context() }}
|
||||||
select * from {{ ref('input_model') }}
|
select * from {{ ref('input_model') }}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -404,7 +404,7 @@ class TestMicrobatchJinjaContext(BaseMicrobatchTest):
|
|||||||
|
|
||||||
@pytest.fixture(scope="class")
|
@pytest.fixture(scope="class")
|
||||||
def macros(self):
|
def macros(self):
|
||||||
return {"check_batch_context.sql": invalid_batch_context_macro_sql}
|
return {"check_batch_jinja_context.sql": invalid_batch_jinja_context_macro_sql}
|
||||||
|
|
||||||
@pytest.fixture(scope="class")
|
@pytest.fixture(scope="class")
|
||||||
def models(self):
|
def models(self):
|
||||||
@@ -498,6 +498,13 @@ microbatch_model_context_vars = """
|
|||||||
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
|
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
|
||||||
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
|
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
|
||||||
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
|
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
|
||||||
|
{% if model.batch %}
|
||||||
|
{{ log("batch.event_time_start: "~ model.batch.event_time_start, info=True)}}
|
||||||
|
{{ log("batch.event_time_end: "~ model.batch.event_time_end, info=True)}}
|
||||||
|
{{ log("batch.id: "~ model.batch.id, info=True)}}
|
||||||
|
{{ log("start timezone: "~ model.batch.event_time_start.tzinfo, info=True)}}
|
||||||
|
{{ log("end timezone: "~ model.batch.event_time_end.tzinfo, info=True)}}
|
||||||
|
{% endif %}
|
||||||
select * from {{ ref('input_model') }}
|
select * from {{ ref('input_model') }}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -516,12 +523,23 @@ class TestMicrobatchJinjaContextVarsAvailable(BaseMicrobatchTest):
|
|||||||
|
|
||||||
assert "start: 2020-01-01 00:00:00+00:00" in logs
|
assert "start: 2020-01-01 00:00:00+00:00" in logs
|
||||||
assert "end: 2020-01-02 00:00:00+00:00" in logs
|
assert "end: 2020-01-02 00:00:00+00:00" in logs
|
||||||
|
assert "batch.event_time_start: 2020-01-01 00:00:00+00:00" in logs
|
||||||
|
assert "batch.event_time_end: 2020-01-02 00:00:00+00:00" in logs
|
||||||
|
assert "batch.id: 20200101" in logs
|
||||||
|
assert "start timezone: UTC" in logs
|
||||||
|
assert "end timezone: UTC" in logs
|
||||||
|
|
||||||
assert "start: 2020-01-02 00:00:00+00:00" in logs
|
assert "start: 2020-01-02 00:00:00+00:00" in logs
|
||||||
assert "end: 2020-01-03 00:00:00+00:00" in logs
|
assert "end: 2020-01-03 00:00:00+00:00" in logs
|
||||||
|
assert "batch.event_time_start: 2020-01-02 00:00:00+00:00" in logs
|
||||||
|
assert "batch.event_time_end: 2020-01-03 00:00:00+00:00" in logs
|
||||||
|
assert "batch.id: 20200102" in logs
|
||||||
|
|
||||||
assert "start: 2020-01-03 00:00:00+00:00" in logs
|
assert "start: 2020-01-03 00:00:00+00:00" in logs
|
||||||
assert "end: 2020-01-03 13:57:00+00:00" in logs
|
assert "end: 2020-01-03 13:57:00+00:00" in logs
|
||||||
|
assert "batch.event_time_start: 2020-01-03 00:00:00+00:00" in logs
|
||||||
|
assert "batch.event_time_end: 2020-01-03 13:57:00+00:00" in logs
|
||||||
|
assert "batch.id: 20200103" in logs
|
||||||
|
|
||||||
|
|
||||||
microbatch_model_failing_incremental_partition_sql = """
|
microbatch_model_failing_incremental_partition_sql = """
|
||||||
@@ -675,16 +693,6 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
|
|||||||
with patch_microbatch_end_time("2020-01-03 13:57:00"):
|
with patch_microbatch_end_time("2020-01-03 13:57:00"):
|
||||||
run_dbt(["run"])
|
run_dbt(["run"])
|
||||||
|
|
||||||
# Compiled paths - compiled model without filter only
|
|
||||||
assert read_file(
|
|
||||||
project.project_root,
|
|
||||||
"target",
|
|
||||||
"compiled",
|
|
||||||
"test",
|
|
||||||
"models",
|
|
||||||
"microbatch_model.sql",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Compiled paths - batch compilations
|
# Compiled paths - batch compilations
|
||||||
assert read_file(
|
assert read_file(
|
||||||
project.project_root,
|
project.project_root,
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ REQUIRED_PARSED_NODE_KEYS = frozenset(
|
|||||||
"deprecation_date",
|
"deprecation_date",
|
||||||
"defer_relation",
|
"defer_relation",
|
||||||
"time_spine",
|
"time_spine",
|
||||||
"batch_info",
|
"batch",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -489,11 +489,11 @@ class TestMicrobatchBuilder:
|
|||||||
assert len(actual_batches) == len(expected_batches)
|
assert len(actual_batches) == len(expected_batches)
|
||||||
assert actual_batches == expected_batches
|
assert actual_batches == expected_batches
|
||||||
|
|
||||||
def test_build_batch_context_incremental_batch(self, microbatch_model):
|
def test_build_jinja_context_for_incremental_batch(self, microbatch_model):
|
||||||
microbatch_builder = MicrobatchBuilder(
|
microbatch_builder = MicrobatchBuilder(
|
||||||
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
|
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
|
||||||
)
|
)
|
||||||
context = microbatch_builder.build_batch_context(incremental_batch=True)
|
context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=True)
|
||||||
|
|
||||||
assert context["model"] == microbatch_model.to_dict()
|
assert context["model"] == microbatch_model.to_dict()
|
||||||
assert context["sql"] == microbatch_model.compiled_code
|
assert context["sql"] == microbatch_model.compiled_code
|
||||||
@@ -502,11 +502,11 @@ class TestMicrobatchBuilder:
|
|||||||
assert context["is_incremental"]() is True
|
assert context["is_incremental"]() is True
|
||||||
assert context["should_full_refresh"]() is False
|
assert context["should_full_refresh"]() is False
|
||||||
|
|
||||||
def test_build_batch_context_incremental_batch_false(self, microbatch_model):
|
def test_build_jinja_context_for_incremental_batch_false(self, microbatch_model):
|
||||||
microbatch_builder = MicrobatchBuilder(
|
microbatch_builder = MicrobatchBuilder(
|
||||||
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
|
model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None
|
||||||
)
|
)
|
||||||
context = microbatch_builder.build_batch_context(incremental_batch=False)
|
context = microbatch_builder.build_jinja_context_for_batch(incremental_batch=False)
|
||||||
|
|
||||||
assert context["model"] == microbatch_model.to_dict()
|
assert context["model"] == microbatch_model.to_dict()
|
||||||
assert context["sql"] == microbatch_model.compiled_code
|
assert context["sql"] == microbatch_model.compiled_code
|
||||||
@@ -605,7 +605,6 @@ class TestMicrobatchBuilder:
|
|||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"batch_size,batch_start,expected_formatted_batch_start",
|
"batch_size,batch_start,expected_formatted_batch_start",
|
||||||
[
|
[
|
||||||
(None, None, None),
|
|
||||||
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
|
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
|
||||||
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
|
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
|
||||||
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
|
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
|
||||||
|
|||||||
Reference in New Issue
Block a user