mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-19 13:01:27 +00:00
Compare commits
8 Commits
enable-pos
...
feature/ex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42518e71d6 | ||
|
|
243947dd64 | ||
|
|
b18139497f | ||
|
|
ad5570cd6c | ||
|
|
ef9f4d6cf9 | ||
|
|
70f7dc24ce | ||
|
|
206b220e3d | ||
|
|
5ed168c47b |
@@ -413,12 +413,15 @@ class Compiler:
|
||||
linker.add_node(node.unique_id)
|
||||
|
||||
for dependency in node.depends_on_nodes:
    # A dependency id may live in any of these manifest collections; they are
    # checked in the same order as before (nodes, sources, metrics, exposures)
    # and the first collection containing the id provides the graph edge.
    # NOTE: the leftover `pdb.set_trace()` debugging breakpoint was removed; it
    # halted compilation on every single dependency.
    for collection in (
        manifest.nodes,
        manifest.sources,
        manifest.metrics,
        manifest.exposures,
    ):
        if dependency in collection:
            linker.dependency(node.unique_id, collection[dependency].unique_id)
            break
    else:
        # No collection knows this id.
        dependency_not_found(node, dependency)
|
||||
|
||||
|
||||
@@ -146,6 +146,38 @@ class SourceLookup(dbtClassMixin):
|
||||
return manifest.sources[unique_id]
|
||||
|
||||
|
||||
class ExposureLookup(dbtClassMixin):
    """Cache mapping exposure search names to unique ids, per package.

    ``storage`` is a two-level dict: ``search_name -> package_name ->
    unique_id``. It is filled from ``manifest.exposures`` on construction.
    """

    def __init__(self, manifest: "Manifest"):
        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
        self.populate(manifest)

    def get_unique_id(self, search_name, package: Optional[PackageName]):
        """Return whatever ``find_unique_id_for_package`` yields for this name/package."""
        return find_unique_id_for_package(self.storage, search_name, package)

    def find(self, search_name, package: Optional[PackageName], manifest: "Manifest"):
        """Return the exposure for ``search_name``/``package``, or None if no id is cached."""
        found_id = self.get_unique_id(search_name, package)
        if found_id is None:
            return None
        return self.perform_lookup(found_id, manifest)

    def add_exposure(self, exposure: ParsedExposure):
        """Record ``exposure`` under its search name and package name."""
        # Create the per-name bucket on first sight of this search name.
        by_package = self.storage.setdefault(exposure.search_name, {})
        by_package[exposure.package_name] = exposure.unique_id

    def populate(self, manifest):
        """Index every exposure currently present in ``manifest.exposures``."""
        for entry in manifest.exposures.values():
            self.add_exposure(entry)

    def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedExposure:
        """Fetch ``unique_id`` from ``manifest.exposures``.

        Raises ``dbt.exceptions.InternalException`` when the id is cached here
        but absent from the manifest.
        """
        if unique_id in manifest.exposures:
            return manifest.exposures[unique_id]
        raise dbt.exceptions.InternalException(
            f"Exposure {unique_id} found in cache but not found in manifest"
        )
|
||||
|
||||
|
||||
class RefableLookup(dbtClassMixin):
|
||||
# model, seed, snapshot
|
||||
_lookup_types: ClassVar[set] = set(NodeType.refable())
|
||||
@@ -466,6 +498,7 @@ class Disabled(Generic[D]):
|
||||
|
||||
|
||||
MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]]
|
||||
MaybeExposure = Optional[Union[ParsedExposure, Disabled[ParsedExposure]]]
|
||||
|
||||
|
||||
MaybeDocumentation = Optional[ParsedDocumentation]
|
||||
@@ -629,6 +662,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
|
||||
_ref_lookup: Optional[RefableLookup] = field(
|
||||
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
|
||||
)
|
||||
_exposure_lookup: Optional[ExposureLookup] = field(
|
||||
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
|
||||
)
|
||||
_metric_lookup: Optional[MetricLookup] = field(
|
||||
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
|
||||
)
|
||||
@@ -885,6 +921,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
|
||||
self._metric_lookup = MetricLookup(self)
|
||||
return self._metric_lookup
|
||||
|
||||
@property
def exposure_lookup(self) -> ExposureLookup:
    """Return the cached ``ExposureLookup``, building it on first access."""
    lookup = self._exposure_lookup
    if lookup is None:
        # Built lazily from this manifest and stored for later calls.
        lookup = ExposureLookup(self)
        self._exposure_lookup = lookup
    return lookup
|
||||
|
||||
def rebuild_ref_lookup(self):
|
||||
self._ref_lookup = RefableLookup(self)
|
||||
|
||||
@@ -931,6 +973,36 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
|
||||
return Disabled(disabled[0])
|
||||
return None
|
||||
|
||||
def resolve_exposure(
    self,
    target_exposure_name: str,
    target_exposure_package: Optional[str],
    current_project: str,
    node_package: str,
) -> MaybeExposure:
    """Resolve an exposure by name, searching the candidate packages in order.

    Returns the first enabled exposure found. If none is found but the
    disabled lookup has an entry for the name, returns that entry wrapped in
    ``Disabled``. Otherwise returns None.
    """
    exposure: Optional[GraphMemberNode] = None
    disabled: Optional[List[GraphMemberNode]] = None

    candidates = _search_packages(current_project, node_package, target_exposure_package)
    for pkg in candidates:
        exposure = self.exposure_lookup.find(target_exposure_name, pkg, self)

        if exposure is not None and exposure.config.enabled:
            return exposure

        # NOTE: a leftover `pdb.set_trace()` breakpoint used to sit here and
        # blocked every lookup that did not resolve immediately.

        # it's possible that the node is disabled; keep the first non-None
        # result from the disabled lookup
        if disabled is None:
            disabled = self.disabled_lookup.find(target_exposure_name, pkg)

    if disabled:
        return Disabled(disabled[0])
    return None
|
||||
|
||||
# Called by dbt.parser.manifest._resolve_sources_for_exposure
|
||||
# and dbt.parser.manifest._process_source_for_node
|
||||
def resolve_source(
|
||||
@@ -1163,6 +1235,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
|
||||
self._doc_lookup,
|
||||
self._source_lookup,
|
||||
self._ref_lookup,
|
||||
self._exposure_lookup,
|
||||
self._metric_lookup,
|
||||
self._disabled_lookup,
|
||||
self._analysis_lookup,
|
||||
|
||||
@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
|
||||
MaturityType,
|
||||
MetricFilter,
|
||||
MetricTime,
|
||||
EntityRelationship,
|
||||
)
|
||||
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
|
||||
from dbt.exceptions import warn_or_error
|
||||
@@ -64,6 +65,9 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable
|
||||
meta: Dict[str, Any] = field(default_factory=dict)
|
||||
data_type: Optional[str] = None
|
||||
quote: Optional[bool] = None
|
||||
is_dimension: Optional[bool] = False
|
||||
is_primary_key: Optional[bool] = False
|
||||
datatype: Optional[str] = None
|
||||
tags: List[str] = field(default_factory=list)
|
||||
_extra: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@@ -94,6 +98,7 @@ class MacroDependsOn(dbtClassMixin, Replaceable):
|
||||
@dataclass
|
||||
class DependsOn(MacroDependsOn):
|
||||
nodes: List[str] = field(default_factory=list)
|
||||
exposures: List[str] = field(default_factory=list)
|
||||
|
||||
def add_node(self, value: str):
|
||||
if value not in self.nodes:
|
||||
@@ -160,6 +165,8 @@ class ParsedNodeMixins(dbtClassMixin):
|
||||
self.created_at = time.time()
|
||||
self.description = patch.description
|
||||
self.columns = patch.columns
|
||||
self.is_public = patch.is_public
|
||||
# self.relationships = patch.relationships
|
||||
|
||||
def get_materialization(self):
|
||||
return self.config.materialized
|
||||
@@ -210,6 +217,8 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
|
||||
compiled_path: Optional[str] = None
|
||||
build_path: Optional[str] = None
|
||||
deferred: bool = False
|
||||
is_public: Optional[bool] = False
|
||||
# relationships: List[EntityRelationship] = field(default_factory=list)
|
||||
unrendered_config: Dict[str, Any] = field(default_factory=dict)
|
||||
created_at: float = field(default_factory=lambda: time.time())
|
||||
config_call_dict: Dict[str, Any] = field(default_factory=dict)
|
||||
@@ -497,6 +506,8 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
|
||||
@dataclass
|
||||
class ParsedNodePatch(ParsedPatch):
|
||||
columns: Dict[str, ColumnInfo]
|
||||
is_public: Optional[bool]
|
||||
# relationships: List[EntityRelationship]
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -756,6 +767,7 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
|
||||
refs: List[List[str]] = field(default_factory=list)
|
||||
sources: List[List[str]] = field(default_factory=list)
|
||||
created_at: float = field(default_factory=lambda: time.time())
|
||||
relationships: List[EntityRelationship] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def depends_on_nodes(self):
|
||||
@@ -786,6 +798,9 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
|
||||
def same_url(self, old: "ParsedExposure") -> bool:
|
||||
return self.url == old.url
|
||||
|
||||
def same_relationships(self, old: "ParsedExposure") -> bool:
|
||||
return self.relationships == old.relationships
|
||||
|
||||
def same_config(self, old: "ParsedExposure") -> bool:
|
||||
return self.config.same_contents(
|
||||
self.unrendered_config,
|
||||
@@ -807,6 +822,7 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
|
||||
and self.same_description(old)
|
||||
and self.same_label(old)
|
||||
and self.same_depends_on(old)
|
||||
and self.same_relationships(old)
|
||||
and self.same_config(old)
|
||||
and True
|
||||
)
|
||||
@@ -831,6 +847,7 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
|
||||
dimensions: List[str]
|
||||
window: Optional[MetricTime] = None
|
||||
model: Optional[str] = None
|
||||
exposure: Optional[str] = None
|
||||
model_unique_id: Optional[str] = None
|
||||
resource_type: NodeType = NodeType.Metric
|
||||
meta: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@@ -88,11 +88,27 @@ class Docs(dbtClassMixin, Replaceable):
|
||||
node_color: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class EntityRelationshipType(StrEnum):
|
||||
many_to_one = "many_to_one"
|
||||
one_to_many = "one_to_many"
|
||||
one_to_one = "one_to_one"
|
||||
|
||||
|
||||
@dataclass
class EntityRelationship(dbtClassMixin, Replaceable):
    """One relationship entry, as listed in an exposure's ``relationships``.

    All four fields are required. Presumably ``from_model`` and ``to_model``
    are model names and ``join_key`` is the column joining them -- confirm
    against the YAML schema.
    """

    from_model: str
    to_model: str
    join_key: str
    # Cardinality of the relationship; one of the EntityRelationshipType values.
    relationship_type: EntityRelationshipType
|
||||
|
||||
|
||||
@dataclass
|
||||
class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
|
||||
name: str
|
||||
description: str = ""
|
||||
meta: Dict[str, Any] = field(default_factory=dict)
|
||||
is_public: Optional[bool] = False
|
||||
data_type: Optional[str] = None
|
||||
docs: Docs = field(default_factory=Docs)
|
||||
_extra: Dict[str, Any] = field(default_factory=dict)
|
||||
@@ -114,6 +130,8 @@ class HasTests(HasDocs):
|
||||
class UnparsedColumn(HasTests):
|
||||
quote: Optional[bool] = None
|
||||
tags: List[str] = field(default_factory=list)
|
||||
is_dimension: Optional[bool] = False
|
||||
is_primary_key: Optional[bool] = False
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -417,6 +435,7 @@ class ExposureType(StrEnum):
|
||||
Analysis = "analysis"
|
||||
ML = "ml"
|
||||
Application = "application"
|
||||
Entity = "entity"
|
||||
|
||||
|
||||
class MaturityType(StrEnum):
|
||||
@@ -444,6 +463,7 @@ class UnparsedExposure(dbtClassMixin, Replaceable):
|
||||
url: Optional[str] = None
|
||||
depends_on: List[str] = field(default_factory=list)
|
||||
config: Dict[str, Any] = field(default_factory=dict)
|
||||
relationships: List[EntityRelationship] = field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data):
|
||||
@@ -453,6 +473,12 @@ class UnparsedExposure(dbtClassMixin, Replaceable):
|
||||
if not (re.match(r"[\w-]+$", data["name"])):
|
||||
deprecations.warn("exposure-name", exposure=data["name"])
|
||||
|
||||
if "relationships" in data:
|
||||
if data["type"] != "entity":
|
||||
raise ParsingException(
|
||||
f"The exposure '{data['name']}' is invalid. It cannot specify relationships if 'type' is not an entity exposure"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricFilter(dbtClassMixin, Replaceable):
|
||||
@@ -493,6 +519,7 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
|
||||
dimensions: List[str] = field(default_factory=list)
|
||||
window: Optional[MetricTime] = None
|
||||
model: Optional[str] = None
|
||||
exposure: Optional[str] = None
|
||||
filters: List[MetricFilter] = field(default_factory=list)
|
||||
meta: Dict[str, Any] = field(default_factory=dict)
|
||||
tags: List[str] = field(default_factory=list)
|
||||
|
||||
@@ -39,6 +39,7 @@ class NodeType(StrEnum):
|
||||
cls.Model,
|
||||
cls.Seed,
|
||||
cls.Snapshot,
|
||||
cls.Exposure,
|
||||
]
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -53,7 +53,7 @@ from dbt.context.providers import ParseProvider
|
||||
from dbt.contracts.files import FileHash, ParseFileType, SchemaSourceFile
|
||||
from dbt.parser.read_files import read_files, load_source_file
|
||||
from dbt.parser.partial import PartialParsing, special_override_macros
|
||||
from dbt.contracts.graph.compiled import ManifestNode
|
||||
from dbt.contracts.graph.compiled import ManifestNode, GraphMemberNode
|
||||
from dbt.contracts.graph.manifest import (
|
||||
Manifest,
|
||||
Disabled,
|
||||
@@ -857,6 +857,12 @@ class ManifestLoader:
|
||||
if metric.created_at < self.started_at:
|
||||
continue
|
||||
_process_metrics_for_node(self.manifest, current_project, metric)
|
||||
for metric in self.manifest.metrics.values():
|
||||
# TODO: Can we do this if the metric is derived & depends on
|
||||
# some other metric for its definition? Maybe....
|
||||
if metric.created_at < self.started_at:
|
||||
continue
|
||||
_process_exposures_for_metric(self.manifest, current_project, metric)
|
||||
|
||||
# nodes: node and column descriptions
|
||||
# sources: source and table descriptions, column descriptions
|
||||
@@ -1208,9 +1214,70 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
|
||||
target_model_id = target_model.unique_id
|
||||
|
||||
metric.depends_on.nodes.append(target_model_id)
|
||||
|
||||
# this should not go here
|
||||
# this checks the node columns, and adds any dimension
|
||||
# declared in that model yml to the metric dimension list
|
||||
# primary_dimensions = [
|
||||
# col.name for col in target_model.columns.values() if col.is_dimension
|
||||
# ]
|
||||
# for dim in primary_dimensions:
|
||||
# if dim not in metric.dimensions:
|
||||
# metric.dimensions.append(dim)
|
||||
# secondary_dimensions = []
|
||||
# for relationship in target_model.relationships:
|
||||
# to_model_name = relationship.to
|
||||
# to_model = manifest.resolve_ref(
|
||||
# to_model_name,
|
||||
# target_model_package,
|
||||
# current_project,
|
||||
# metric.package_name,
|
||||
# )
|
||||
# new_dims = [col.name for col in to_model.columns.values() if col.is_dimension]
|
||||
# secondary_dimensions.extend(new_dims)
|
||||
|
||||
# for dim in secondary_dimensions:
|
||||
# if dim not in metric.dimensions:
|
||||
# metric.dimensions.append(dim)
|
||||
|
||||
# joined_models = target_model.relationships
|
||||
|
||||
manifest.update_metric(metric)
|
||||
|
||||
|
||||
def _process_exposures_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
    """Given a manifest and a metric in that manifest, process its exposure relationships.

    Resolves ``metric.exposure`` through ``manifest.resolve_exposure`` and, on
    success, appends the exposure's unique id to ``metric.depends_on.exposures``
    and calls ``manifest.update_metric``. Metrics that declare no exposure are
    left untouched. If the exposure is missing or disabled, the metric is
    disabled and the failure is reported.
    """
    target_exposure_name = metric.exposure
    if target_exposure_name is None:
        # `exposure` is optional on a metric. Without this guard every metric
        # lacking an exposure would fail resolution below and be disabled.
        return

    # The metric only carries an exposure name, so no package is specified.
    target_exposure_package: Optional[str] = None

    target_exposure: Optional[Union[Disabled, GraphMemberNode]] = manifest.resolve_exposure(
        target_exposure_name,
        target_exposure_package,
        current_project,
        metric.package_name,
    )

    if target_exposure is None or isinstance(target_exposure, Disabled):
        # This may raise. Even if it doesn't, we don't want to add
        # this metric to the graph b/c there is no destination exposure
        metric.config.enabled = False
        # NOTE(review): `invalid_exposure_fail_unless_test` is not defined in the
        # visible part of this change -- confirm it exists before merging.
        invalid_exposure_fail_unless_test(metric, target_exposure_name, target_exposure_package)
        # Stop here: falling through would read `.unique_id` from None or from
        # a Disabled wrapper.
        return

    metric.depends_on.exposures.append(target_exposure.unique_id)

    manifest.update_metric(metric)
|
||||
|
||||
|
||||
def _process_metrics_for_node(
|
||||
manifest: Manifest, current_project: str, node: Union[ManifestNode, ParsedMetric]
|
||||
):
|
||||
|
||||
@@ -140,8 +140,12 @@ class ParserRef:
|
||||
quote: Optional[bool]
|
||||
if isinstance(column, UnparsedColumn):
|
||||
quote = column.quote
|
||||
is_dimension = column.is_dimension
|
||||
is_primary_key = column.is_primary_key
|
||||
else:
|
||||
quote = None
|
||||
is_dimension = False
|
||||
is_primary_key = False
|
||||
self.column_info[column.name] = ColumnInfo(
|
||||
name=column.name,
|
||||
description=description,
|
||||
@@ -150,6 +154,8 @@ class ParserRef:
|
||||
tags=tags,
|
||||
quote=quote,
|
||||
_extra=column.extra,
|
||||
is_dimension=is_dimension,
|
||||
is_primary_key=is_primary_key,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@@ -866,6 +872,8 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
|
||||
meta=block.target.meta,
|
||||
docs=block.target.docs,
|
||||
config=block.target.config,
|
||||
is_public=block.target.is_public,
|
||||
# relationships=block.target.relationships,
|
||||
)
|
||||
assert isinstance(self.yaml.file, SchemaSourceFile)
|
||||
source_file: SchemaSourceFile = self.yaml.file
|
||||
@@ -1039,6 +1047,7 @@ class ExposureParser(YamlReader):
|
||||
maturity=unparsed.maturity,
|
||||
config=config,
|
||||
unrendered_config=unrendered_config,
|
||||
relationships=unparsed.relationships,
|
||||
)
|
||||
ctx = generate_parse_exposure(
|
||||
parsed,
|
||||
@@ -1145,6 +1154,7 @@ class MetricParser(YamlReader):
|
||||
tags=unparsed.tags,
|
||||
config=config,
|
||||
unrendered_config=unrendered_config,
|
||||
exposure=unparsed.exposure,
|
||||
)
|
||||
|
||||
ctx = generate_parse_metrics(
|
||||
@@ -1164,6 +1174,14 @@ class MetricParser(YamlReader):
|
||||
node=parsed,
|
||||
)
|
||||
|
||||
for node in parsed.depends_on.nodes:
    # Use .get(): an id with no entry in manifest.nodes is skipped instead of
    # raising KeyError.
    upstream = self.manifest.nodes.get(node)
    if upstream is None:
        continue
    # NOTE(review): assumes `columns` is a name -> ColumnInfo dict on the node
    # object, as in the patch classes in this change -- confirm. The previous
    # code subscripted the node and iterated the dict's string keys.
    for col in upstream.columns.values():
        # Add each column flagged as a dimension once, never duplicating an
        # already-declared dimension.
        if col.is_dimension and col.name not in parsed.dimensions:
            parsed.dimensions.append(col.name)
|
||||
|
||||
# if the metric is disabled we do not want it included in the manifest, only in the disabled dict
|
||||
if parsed.config.enabled:
|
||||
self.manifest.add_metric(self.yaml.file, parsed)
|
||||
|
||||
Reference in New Issue
Block a user