Compare commits

...

1 Commits

Author SHA1 Message Date
Jeremy Cohen
193cd7f146 Spike: resolve semantic_model -> saved_query edges 2024-04-22 23:14:42 +02:00

View File

@@ -191,6 +191,45 @@ class Linker:
if cycle:
raise RuntimeError("Found a cycle: {}".format(cycle))
def add_semantic_model_saved_query_edges(self, manifest: Manifest) -> None:
"""This method adds edges from semantic_model -> saved_query if the
saved_query depends on one of the dimensions of the semantic_model for
a group-by or a filter.
This would be trickier to run at parse time, because DSI needs the full
SemanticManifest in order to do entity resolution / join traversal. But we could
think about that as an "end of parse time" resolution step, instead of while
constructing and linking the DAG.
Paul's WIP implementation: https://github.com/dbt-labs/dbt-semantic-interfaces/pull/278"""
# TODO: review these imports and move them up
# TODO: test this in a big project with 1k semantic models + saved queries
from dbt_semantic_interfaces.references import SavedQueryReference
from dbt_semantic_interfaces.experimental.saved_query_dependency_resolver import (
SavedQueryDependencyResolver,
)
from dbt.contracts.graph.semantic_manifest import SemanticManifest
semantic_manifest = SemanticManifest(manifest)._get_pydantic_semantic_manifest()
self.sq_resolver = SavedQueryDependencyResolver(semantic_manifest)
for node_id in self.graph:
if node_id in manifest.saved_queries:
# Look up this saved query
sq_name = manifest.saved_queries[node_id].name
sq_ref = SavedQueryReference(sq_name)
# Ask Paul to find its dependencies
sq_deps_set = self.sq_resolver.resolve_dependencies(sq_ref)
# For each semantic model that Paul told us this saved query depends on, add an edge in the DAG
for upstream_semantic_model in sq_deps_set.semantic_model_references:
semantic_model_id = next(
semantic_model.unique_id
for semantic_model in manifest.semantic_models.values()
if semantic_model.name == upstream_semantic_model.semantic_model_name
)
self.graph.add_edge(semantic_model_id, node_id)
def add_test_edges(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
@@ -447,6 +486,8 @@ class Compiler:
self.initialize()
linker = Linker()
linker.link_graph(manifest)
if manifest.saved_queries:
linker.add_semantic_model_saved_query_edges(manifest)
# Create a file containing basic information about graph structure,
# supporting diagnostics and performance analysis.