mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-19 23:01:27 +00:00
Compare commits
8 Commits
enable-pos
...
generate_l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
760211b47e | ||
|
|
ac5524b856 | ||
|
|
7419a8adcf | ||
|
|
4fa970d0d3 | ||
|
|
1fb00d6748 | ||
|
|
a6703120b3 | ||
|
|
805f9b8c67 | ||
|
|
857e343059 |
6
.changes/unreleased/Features-20241210-130734.yaml
Normal file
6
.changes/unreleased/Features-20241210-130734.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Features
|
||||
body: Generate latest model version view
|
||||
time: 2024-12-10T13:07:34.723167-05:00
|
||||
custom:
|
||||
Author: gshank
|
||||
Issue: "7442"
|
||||
@@ -21,6 +21,7 @@ class ModelConfig(NodeConfig):
|
||||
default=AccessType.Protected,
|
||||
metadata=MergeBehavior.Clobber.meta(),
|
||||
)
|
||||
generate_latest: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -142,7 +142,6 @@ class ConfiguredTask(BaseTask):
|
||||
def from_args(cls, args: Flags, *pargs, **kwargs):
|
||||
move_to_nearest_project_dir(args.project_dir)
|
||||
try:
|
||||
# This is usually RuntimeConfig
|
||||
config = RuntimeConfig.from_args(args)
|
||||
except dbt.exceptions.DbtProjectError as exc:
|
||||
fire_event(LogDbtProjectError(exc=str(exc)))
|
||||
@@ -168,8 +167,10 @@ class ExecutionContext:
|
||||
|
||||
|
||||
class BaseRunner(metaclass=ABCMeta):
|
||||
def __init__(self, config, adapter, node, node_index: int, num_nodes: int) -> None:
|
||||
self.config = config
|
||||
def __init__(
|
||||
self, config: RuntimeConfig, adapter, node, node_index: int, num_nodes: int
|
||||
) -> None:
|
||||
self.config: RuntimeConfig = config
|
||||
self.compiler = Compiler(config)
|
||||
self.adapter = adapter
|
||||
self.node = node
|
||||
|
||||
@@ -284,10 +284,11 @@ class ModelRunner(CompileRunner):
|
||||
def _execute_model(
|
||||
self,
|
||||
hook_ctx: Any,
|
||||
context_config: Any,
|
||||
context_config: Dict[str, Any],
|
||||
model: ModelNode,
|
||||
context: Dict[str, Any],
|
||||
materialization_macro: MacroProtocol,
|
||||
manifest: Manifest,
|
||||
) -> RunResult:
|
||||
try:
|
||||
result = MacroGenerator(
|
||||
@@ -296,11 +297,29 @@ class ModelRunner(CompileRunner):
|
||||
finally:
|
||||
self.adapter.post_model_hook(context_config, hook_ctx)
|
||||
|
||||
if model.config.generate_latest and model.latest_version == model.version:
|
||||
self.generate_latest(model, manifest)
|
||||
|
||||
for relation in self._materialization_relations(result, model):
|
||||
self.adapter.cache_added(relation.incorporate(dbt_created=True))
|
||||
|
||||
return self._build_run_model_result(model, context)
|
||||
|
||||
def generate_latest(self, model: ModelNode, manifest):
|
||||
model_context = generate_runtime_model_context(model, self.config, manifest)
|
||||
|
||||
macro = manifest.find_macro_by_name(
|
||||
"create_latest_version_view", self.config.project_name, model.package_name
|
||||
)
|
||||
|
||||
if macro:
|
||||
self.adapter.execute_macro(
|
||||
macro_name="create_latest_version_view",
|
||||
macro_resolver=manifest,
|
||||
context_override=model_context,
|
||||
kwargs={},
|
||||
)
|
||||
|
||||
def execute(self, model, manifest):
|
||||
context = generate_runtime_model_context(model, self.config, manifest)
|
||||
|
||||
@@ -330,7 +349,9 @@ class ModelRunner(CompileRunner):
|
||||
|
||||
hook_ctx = self.adapter.pre_model_hook(context_config)
|
||||
|
||||
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
|
||||
return self._execute_model(
|
||||
hook_ctx, context_config, model, context, materialization_macro, manifest
|
||||
)
|
||||
|
||||
|
||||
class MicrobatchModelRunner(ModelRunner):
|
||||
@@ -687,10 +708,11 @@ class MicrobatchModelRunner(ModelRunner):
|
||||
def _execute_model(
|
||||
self,
|
||||
hook_ctx: Any,
|
||||
context_config: Any,
|
||||
context_config: Dict[str, Any],
|
||||
model: ModelNode,
|
||||
context: Dict[str, Any],
|
||||
materialization_macro: MacroProtocol,
|
||||
manifest: Manifest,
|
||||
) -> RunResult:
|
||||
try:
|
||||
batch_result = self._execute_microbatch_materialization(
|
||||
|
||||
@@ -80,7 +80,7 @@ class GraphRunnableTask(ConfiguredTask):
|
||||
|
||||
def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None:
|
||||
super().__init__(args, config, manifest)
|
||||
self.config = config
|
||||
self.config: RuntimeConfig = config
|
||||
self._flattened_nodes: Optional[List[ResultNode]] = None
|
||||
self._raise_next_tick: Optional[DbtRuntimeError] = None
|
||||
self._skipped_children: Dict[str, Optional[RunResult]] = {}
|
||||
|
||||
2
core/dbt/tests/fixtures/project.py
vendored
2
core/dbt/tests/fixtures/project.py
vendored
@@ -461,6 +461,7 @@ class TestProjInfo:
|
||||
return get_adapter_by_type(self.adapter_type)
|
||||
|
||||
# Run sql from a path
|
||||
# fetch = "one" or "all"
|
||||
def run_sql_file(self, sql_path, fetch=None):
|
||||
with open(sql_path, "r") as f:
|
||||
statements = f.read().split(";")
|
||||
@@ -468,6 +469,7 @@ class TestProjInfo:
|
||||
self.run_sql(statement, fetch)
|
||||
|
||||
# Run sql from a string, using adapter saved at test startup
|
||||
# fetch = "one" or "all"
|
||||
def run_sql(self, sql, fetch=None):
|
||||
return run_sql_with_adapter(self.adapter, sql, fetch=fetch)
|
||||
|
||||
|
||||
@@ -301,6 +301,7 @@ class TestProcessingException(Exception):
|
||||
# adapter.config.credentials
|
||||
# adapter.quote
|
||||
# adapter.run_sql_for_tests
|
||||
# fetch = "one", or "all"
|
||||
def run_sql_with_adapter(adapter, sql, fetch=None):
|
||||
if sql.strip() == "":
|
||||
return
|
||||
|
||||
@@ -3817,6 +3817,10 @@
|
||||
"public"
|
||||
],
|
||||
"default": "protected"
|
||||
},
|
||||
"generate_latest": {
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
}
|
||||
},
|
||||
"additionalProperties": true
|
||||
@@ -13786,6 +13790,10 @@
|
||||
"public"
|
||||
],
|
||||
"default": "protected"
|
||||
},
|
||||
"generate_latest": {
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
}
|
||||
},
|
||||
"additionalProperties": true
|
||||
|
||||
@@ -1,7 +1,14 @@
|
||||
import pytest
|
||||
|
||||
from dbt.exceptions import ParsingError
|
||||
from dbt.tests.util import get_manifest, rm_file, run_dbt, write_file
|
||||
from dbt.tests.util import (
|
||||
check_table_does_exist,
|
||||
get_manifest,
|
||||
get_relation_columns,
|
||||
rm_file,
|
||||
run_dbt,
|
||||
write_file,
|
||||
)
|
||||
|
||||
schema_yml = """
|
||||
models:
|
||||
@@ -38,6 +45,7 @@ models:
|
||||
materialized: table
|
||||
contract:
|
||||
enforced: true
|
||||
generate_latest: true
|
||||
constraints:
|
||||
- type: primary_key
|
||||
columns: [id, user_name]
|
||||
@@ -115,6 +123,27 @@ models:
|
||||
|
||||
"""
|
||||
|
||||
create_latest_version_view_sql = """
|
||||
{% macro create_latest_version_view() %}
|
||||
|
||||
-- this hook will run only if the model is versioned, and only if it's the latest version
|
||||
-- otherwise, it's a no-op
|
||||
{% if model.get('version') and model.get('version') == model.get('latest_version') %}
|
||||
|
||||
{% set target_relation = this.incorporate(path={"identifier": model['name'] ~ '_latest'}) %}
|
||||
|
||||
{% set view_sql = get_replace_view_sql(target_relation, "select * from " ~ this) %}
|
||||
{% call statement(name="main") %}
|
||||
{{ view_sql }}
|
||||
{% endcall %}
|
||||
|
||||
{{ adapter.commit() }}
|
||||
|
||||
{% endif %}
|
||||
|
||||
{% endmacro %}
|
||||
"""
|
||||
|
||||
|
||||
class TestVersionedModelConstraints:
|
||||
@pytest.fixture(scope="class")
|
||||
@@ -124,6 +153,12 @@ class TestVersionedModelConstraints:
|
||||
"schema.yml": schema_yml,
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def macros(self):
|
||||
return {
|
||||
"create_latest_version_view.sql": create_latest_version_view_sql,
|
||||
}
|
||||
|
||||
def test_versioned_model_constraints(self, project):
|
||||
results = run_dbt(["run"])
|
||||
assert len(results) == 1
|
||||
@@ -142,6 +177,11 @@ class TestVersionedModelConstraints:
|
||||
model_node = manifest.nodes["model.test.foo.v1"]
|
||||
assert model_node.contract.enforced is True
|
||||
assert len(model_node.constraints) == 1
|
||||
assert model_node.config.generate_latest is True
|
||||
check_table_does_exist(project.adapter, "foo_v1")
|
||||
columns = get_relation_columns(project.adapter, "foo_latest")
|
||||
assert columns == [("id", "integer", None), ("user_name", "text", None)]
|
||||
check_table_does_exist(project.adapter, "foo_latest")
|
||||
|
||||
|
||||
# test primary key defined across model and column level constraints, expect error
|
||||
|
||||
Reference in New Issue
Block a user