Microbatch strategy (#1179)

* first pass: add incremental_predicates
* safely add incremental_predicates + testing
* remove requirement for unique_id

---------

Co-authored-by: Quigley Malcolm <quigley.malcolm@dbtlabs.com>
This commit is contained in:
Michelle Ark
2024-09-18 13:29:42 -04:00
committed by GitHub
parent 49623d7309
commit 3cbe12f454
4 changed files with 63 additions and 1 deletions

View File

@@ -0,0 +1,6 @@
kind: Features
body: Microbatch incremental strategy
time: 2024-09-13T21:54:16.492885-04:00
custom:
Author: michelleark
Issue: "1182"

View File

@@ -416,7 +416,7 @@ CALL {proc_name}();
return response
def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]
def debug_query(self):
"""Override for DebugTask method"""

View File

@@ -48,3 +48,35 @@
{% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}
{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
delete from {{ target }} DBT_INTERNAL_TARGET
using {{ source }}
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}

View File

@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)
# No requirement for a unique_id for snowflake microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""
class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql
@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"