Compare commits

...

8 Commits

Author SHA1 Message Date
Gerda Shank
760211b47e Merge branch 'main' into generate_latest_versioned_model 2025-02-04 09:33:26 -05:00
Gerda Shank
ac5524b856 Fix test, add adapter.commit 2024-12-11 16:26:45 -05:00
Gerda Shank
7419a8adcf Working, but not detecting existence of view 2024-12-11 12:00:05 -05:00
Gerda Shank
4fa970d0d3 Add macro to test case 2024-12-11 11:01:53 -05:00
Gerda Shank
1fb00d6748 Stub in generate_latest method on ModelRunner 2024-12-10 17:27:41 -05:00
Gerda Shank
a6703120b3 Changie 2024-12-10 13:07:46 -05:00
Gerda Shank
805f9b8c67 Update v12.json schema 2024-12-10 13:05:59 -05:00
Gerda Shank
857e343059 add "generate_latest" config 2024-12-10 13:04:42 -05:00
9 changed files with 89 additions and 8 deletions

View File

@@ -0,0 +1,6 @@
kind: Features
body: Generate latest model version view
time: 2024-12-10T13:07:34.723167-05:00
custom:
Author: gshank
Issue: "7442"

View File

@@ -21,6 +21,7 @@ class ModelConfig(NodeConfig):
default=AccessType.Protected,
metadata=MergeBehavior.Clobber.meta(),
)
generate_latest: bool = False
@dataclass

View File

@@ -142,7 +142,6 @@ class ConfiguredTask(BaseTask):
def from_args(cls, args: Flags, *pargs, **kwargs):
move_to_nearest_project_dir(args.project_dir)
try:
# This is usually RuntimeConfig
config = RuntimeConfig.from_args(args)
except dbt.exceptions.DbtProjectError as exc:
fire_event(LogDbtProjectError(exc=str(exc)))
@@ -168,8 +167,10 @@ class ExecutionContext:
class BaseRunner(metaclass=ABCMeta):
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
self.config = config
def __init__(
self, config: RuntimeConfig, adapter, node, node_index: int, num_nodes: int
) -> None:
self.config: RuntimeConfig = config
self.compiler = Compiler(config)
self.adapter = adapter
self.node = node

View File

@@ -284,10 +284,11 @@ class ModelRunner(CompileRunner):
def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
context_config: Dict[str, Any],
model: ModelNode,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
manifest: Manifest,
) -> RunResult:
try:
result = MacroGenerator(
@@ -296,11 +297,29 @@ class ModelRunner(CompileRunner):
finally:
self.adapter.post_model_hook(context_config, hook_ctx)
if model.config.generate_latest and model.latest_version == model.version:
self.generate_latest(model, manifest)
for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))
return self._build_run_model_result(model, context)
def generate_latest(self, model: ModelNode, manifest):
model_context = generate_runtime_model_context(model, self.config, manifest)
macro = manifest.find_macro_by_name(
"create_latest_version_view", self.config.project_name, model.package_name
)
if macro:
self.adapter.execute_macro(
macro_name="create_latest_version_view",
macro_resolver=manifest,
context_override=model_context,
kwargs={},
)
def execute(self, model, manifest):
context = generate_runtime_model_context(model, self.config, manifest)
@@ -330,7 +349,9 @@ class ModelRunner(CompileRunner):
hook_ctx = self.adapter.pre_model_hook(context_config)
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
return self._execute_model(
hook_ctx, context_config, model, context, materialization_macro, manifest
)
class MicrobatchModelRunner(ModelRunner):
@@ -687,10 +708,11 @@ class MicrobatchModelRunner(ModelRunner):
def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
context_config: Dict[str, Any],
model: ModelNode,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
manifest: Manifest,
) -> RunResult:
try:
batch_result = self._execute_microbatch_materialization(

View File

@@ -80,7 +80,7 @@ class GraphRunnableTask(ConfiguredTask):
def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None:
super().__init__(args, config, manifest)
self.config = config
self.config: RuntimeConfig = config
self._flattened_nodes: Optional[List[ResultNode]] = None
self._raise_next_tick: Optional[DbtRuntimeError] = None
self._skipped_children: Dict[str, Optional[RunResult]] = {}

View File

@@ -461,6 +461,7 @@ class TestProjInfo:
return get_adapter_by_type(self.adapter_type)
# Run sql from a path
# fetch = "one" or "all"
def run_sql_file(self, sql_path, fetch=None):
with open(sql_path, "r") as f:
statements = f.read().split(";")
@@ -468,6 +469,7 @@ class TestProjInfo:
self.run_sql(statement, fetch)
# Run sql from a string, using adapter saved at test startup
# fetch = "one" or "all"
def run_sql(self, sql, fetch=None):
return run_sql_with_adapter(self.adapter, sql, fetch=fetch)

View File

@@ -301,6 +301,7 @@ class TestProcessingException(Exception):
# adapter.config.credentials
# adapter.quote
# adapter.run_sql_for_tests
# fetch = "one", or "all"
def run_sql_with_adapter(adapter, sql, fetch=None):
if sql.strip() == "":
return

View File

@@ -3817,6 +3817,10 @@
"public"
],
"default": "protected"
},
"generate_latest": {
"type": "boolean",
"default": false
}
},
"additionalProperties": true
@@ -13786,6 +13790,10 @@
"public"
],
"default": "protected"
},
"generate_latest": {
"type": "boolean",
"default": false
}
},
"additionalProperties": true

View File

@@ -1,7 +1,14 @@
import pytest
from dbt.exceptions import ParsingError
from dbt.tests.util import get_manifest, rm_file, run_dbt, write_file
from dbt.tests.util import (
check_table_does_exist,
get_manifest,
get_relation_columns,
rm_file,
run_dbt,
write_file,
)
schema_yml = """
models:
@@ -38,6 +45,7 @@ models:
materialized: table
contract:
enforced: true
generate_latest: true
constraints:
- type: primary_key
columns: [id, user_name]
@@ -115,6 +123,27 @@ models:
"""
create_latest_version_view_sql = """
{% macro create_latest_version_view() %}
-- this hook will run only if the model is versioned, and only if it's the latest version
-- otherwise, it's a no-op
{% if model.get('version') and model.get('version') == model.get('latest_version') %}
{% set target_relation = this.incorporate(path={"identifier": model['name'] ~ '_latest'}) %}
{% set view_sql = get_replace_view_sql(target_relation, "select * from " ~ this) %}
{% call statement(name="main") %}
{{ view_sql }}
{% endcall %}
{{ adapter.commit() }}
{% endif %}
{% endmacro %}
"""
class TestVersionedModelConstraints:
@pytest.fixture(scope="class")
@@ -124,6 +153,12 @@ class TestVersionedModelConstraints:
"schema.yml": schema_yml,
}
@pytest.fixture(scope="class")
def macros(self):
return {
"create_latest_version_view.sql": create_latest_version_view_sql,
}
def test_versioned_model_constraints(self, project):
results = run_dbt(["run"])
assert len(results) == 1
@@ -142,6 +177,11 @@ class TestVersionedModelConstraints:
model_node = manifest.nodes["model.test.foo.v1"]
assert model_node.contract.enforced is True
assert len(model_node.constraints) == 1
assert model_node.config.generate_latest is True
check_table_does_exist(project.adapter, "foo_v1")
columns = get_relation_columns(project.adapter, "foo_latest")
assert columns == [("id", "integer", None), ("user_name", "text", None)]
check_table_does_exist(project.adapter, "foo_latest")
# test primary key defined across model and column level constraints, expect error