Compare commits

...

2 Commits

Author SHA1 Message Date
Dave Connors
a7b50a0762 additional entity work 2022-10-03 14:21:41 -05:00
Dave Connors
cdb823b0b9 start to build node types 2022-10-03 11:01:08 -05:00
8 changed files with 181 additions and 0 deletions

View File

@@ -56,6 +56,7 @@ def print_compile_stats(stats):
NodeType.Source: "source", NodeType.Source: "source",
NodeType.Exposure: "exposure", NodeType.Exposure: "exposure",
NodeType.Metric: "metric", NodeType.Metric: "metric",
NodeType.Entity: "entity",
} }
results = {k: 0 for k in names.keys()} results = {k: 0 for k in names.keys()}
@@ -431,6 +432,8 @@ class Compiler:
self.link_node(linker, exposure, manifest) self.link_node(linker, exposure, manifest)
for metric in manifest.metrics.values(): for metric in manifest.metrics.values():
self.link_node(linker, metric, manifest) self.link_node(linker, metric, manifest)
for entity in manifest.entities.values():
self.link_node(linker, entity, manifest)
cycle = linker.find_cycles() cycle = linker.find_cycles()

View File

@@ -229,6 +229,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list) sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list) exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list) metrics: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses # node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list) ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id. # any macro patches in this file by macro unique_id.

View File

@@ -7,6 +7,7 @@ from dbt.contracts.graph.parsed import (
ParsedModelNode, ParsedModelNode,
ParsedExposure, ParsedExposure,
ParsedMetric, ParsedMetric,
ParsedEntity,
ParsedResource, ParsedResource,
ParsedRPCNode, ParsedRPCNode,
ParsedSqlNode, ParsedSqlNode,
@@ -232,4 +233,5 @@ GraphMemberNode = Union[
CompileResultNode, CompileResultNode,
ParsedExposure, ParsedExposure,
ParsedMetric, ParsedMetric,
ParsedEntity,
] ]

View File

@@ -36,6 +36,7 @@ from dbt.contracts.graph.parsed import (
ParsedGenericTestNode, ParsedGenericTestNode,
ParsedExposure, ParsedExposure,
ParsedMetric, ParsedMetric,
ParsedEntity,
HasUniqueID, HasUniqueID,
UnpatchedSourceDefinition, UnpatchedSourceDefinition,
ManifestNodes, ManifestNodes,
@@ -217,6 +218,39 @@ class MetricLookup(dbtClassMixin):
return manifest.metrics[unique_id] return manifest.metrics[unique_id]
class EntityLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"):
self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
self.populate(manifest)
def get_unique_id(self, search_name, package: Optional[PackageName]):
return find_unique_id_for_package(self.storage, search_name, package)
def find(self, search_name, package: Optional[PackageName], manifest: "Manifest"):
unique_id = self.get_unique_id(search_name, package)
if unique_id is not None:
return self.perform_lookup(unique_id, manifest)
return None
def add_entity(self, entity: ParsedEntity):
if entity.search_name not in self.storage:
self.storage[entity.search_name] = {}
self.storage[entity.search_name][entity.package_name] = entity.unique_id
def populate(self, manifest):
for entity in manifest.entities.values():
if hasattr(entity, "name"):
self.add_entity(entity)
def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedEntity:
if unique_id not in manifest.entities:
raise dbt.exceptions.InternalException(
f"Metric {unique_id} found in cache but not found in manifest"
)
return manifest.entities[unique_id]
# This handles both models/seeds/snapshots and sources/metrics/exposures # This handles both models/seeds/snapshots and sources/metrics/exposures
class DisabledLookup(dbtClassMixin): class DisabledLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"): def __init__(self, manifest: "Manifest"):
@@ -466,6 +500,7 @@ class Disabled(Generic[D]):
MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]] MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]]
MaybeEntityNode = Optional[Union[ParsedEntity, Disabled[ParsedEntity]]]
MaybeDocumentation = Optional[ParsedDocumentation] MaybeDocumentation = Optional[ParsedDocumentation]
@@ -611,6 +646,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict) docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict)
exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict) exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict)
metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict) metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict)
entities: MutableMapping[str, ParsedEntity] = field(default_factory=dict)
selectors: MutableMapping[str, Any] = field(default_factory=dict) selectors: MutableMapping[str, Any] = field(default_factory=dict)
files: MutableMapping[str, AnySourceFile] = field(default_factory=dict) files: MutableMapping[str, AnySourceFile] = field(default_factory=dict)
metadata: ManifestMetadata = field(default_factory=ManifestMetadata) metadata: ManifestMetadata = field(default_factory=ManifestMetadata)
@@ -632,6 +668,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_metric_lookup: Optional[MetricLookup] = field( _metric_lookup: Optional[MetricLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
) )
_entity_lookup: Optional[EntityLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field( _disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
) )
@@ -682,6 +721,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
def update_metric(self, new_metric: ParsedMetric): def update_metric(self, new_metric: ParsedMetric):
_update_into(self.metrics, new_metric) _update_into(self.metrics, new_metric)
def update_entity(self, new_entity: ParsedEntity):
_update_into(self.entities, new_entity)
def update_node(self, new_node: ManifestNode): def update_node(self, new_node: ManifestNode):
_update_into(self.nodes, new_node) _update_into(self.nodes, new_node)
@@ -699,6 +741,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
"metrics": {k: v.to_dict(omit_none=False) for k, v in self.metrics.items()}, "metrics": {k: v.to_dict(omit_none=False) for k, v in self.metrics.items()},
"nodes": {k: v.to_dict(omit_none=False) for k, v in self.nodes.items()}, "nodes": {k: v.to_dict(omit_none=False) for k, v in self.nodes.items()},
"sources": {k: v.to_dict(omit_none=False) for k, v in self.sources.items()}, "sources": {k: v.to_dict(omit_none=False) for k, v in self.sources.items()},
"entities": {k: v.to_dict(omit_none=False) for k, v in self.entities.items()},
} }
def build_disabled_by_file_id(self): def build_disabled_by_file_id(self):
@@ -759,6 +802,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.nodes.values(), self.nodes.values(),
self.sources.values(), self.sources.values(),
self.metrics.values(), self.metrics.values(),
self.entities.values(),
) )
for resource in all_resources: for resource in all_resources:
resource_type_plural = resource.resource_type.pluralize() resource_type_plural = resource.resource_type.pluralize()
@@ -787,6 +831,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs={k: _deepcopy(v) for k, v in self.docs.items()}, docs={k: _deepcopy(v) for k, v in self.docs.items()},
exposures={k: _deepcopy(v) for k, v in self.exposures.items()}, exposures={k: _deepcopy(v) for k, v in self.exposures.items()},
metrics={k: _deepcopy(v) for k, v in self.metrics.items()}, metrics={k: _deepcopy(v) for k, v in self.metrics.items()},
entities={k: _deepcopy(v) for k, v in self.entities.items()},
selectors={k: _deepcopy(v) for k, v in self.selectors.items()}, selectors={k: _deepcopy(v) for k, v in self.selectors.items()},
metadata=self.metadata, metadata=self.metadata,
disabled={k: _deepcopy(v) for k, v in self.disabled.items()}, disabled={k: _deepcopy(v) for k, v in self.disabled.items()},
@@ -801,6 +846,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.sources.values(), self.sources.values(),
self.exposures.values(), self.exposures.values(),
self.metrics.values(), self.metrics.values(),
self.entities.values(),
) )
) )
forward_edges, backward_edges = build_node_edges(edge_members) forward_edges, backward_edges = build_node_edges(edge_members)
@@ -826,6 +872,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs=self.docs, docs=self.docs,
exposures=self.exposures, exposures=self.exposures,
metrics=self.metrics, metrics=self.metrics,
entities=self.entities,
selectors=self.selectors, selectors=self.selectors,
metadata=self.metadata, metadata=self.metadata,
disabled=self.disabled, disabled=self.disabled,
@@ -847,6 +894,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return self.exposures[unique_id] return self.exposures[unique_id]
elif unique_id in self.metrics: elif unique_id in self.metrics:
return self.metrics[unique_id] return self.metrics[unique_id]
elif unique_id in self.entities:
return self.entities[unique_id]
else: else:
# something terrible has happened # something terrible has happened
raise dbt.exceptions.InternalException( raise dbt.exceptions.InternalException(
@@ -883,6 +932,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._metric_lookup = MetricLookup(self) self._metric_lookup = MetricLookup(self)
return self._metric_lookup return self._metric_lookup
@property
def entity_lookup(self) -> EntityLookup:
if self._entity_lookup is None:
self._entity_lookup = EntityLookup(self)
return self._entity_lookup
def rebuild_ref_lookup(self): def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self) self._ref_lookup = RefableLookup(self)
@@ -983,6 +1038,31 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return Disabled(disabled[0]) return Disabled(disabled[0])
return None return None
def resolve_entity(
self,
target_entity_name: str,
target_entity_package: Optional[str],
current_project: str,
node_package: str,
) -> MaybeEntityNode:
entity: Optional[ParsedEntity] = None
disabled: Optional[List[ParsedEntity]] = None
candidates = _search_packages(current_project, node_package, target_entity_package)
for pkg in candidates:
entity = self.entity_lookup.find(target_entity_name, pkg, self)
if entity is not None and entity.config.enabled:
return entity
# it's possible that the node is disabled
if disabled is None:
disabled = self.disabled_lookup.find(f"{target_entity_name}", pkg)
if disabled:
return Disabled(disabled[0])
return None
# Called by DocsRuntimeContext.doc # Called by DocsRuntimeContext.doc
def resolve_doc( def resolve_doc(
self, self,
@@ -1108,6 +1188,11 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.metrics[metric.unique_id] = metric self.metrics[metric.unique_id] = metric
source_file.metrics.append(metric.unique_id) source_file.metrics.append(metric.unique_id)
def add_entity(self, source_file: SchemaSourceFile, entity: ParsedEntity):
_check_duplicates(entity, self.entities)
self.entities[entity.unique_id] = entity
source_file.entities.append(entity.unique_id)
def add_disabled_nofile(self, node: GraphMemberNode): def add_disabled_nofile(self, node: GraphMemberNode):
# There can be multiple disabled nodes for the same unique_id # There can be multiple disabled nodes for the same unique_id
if node.unique_id in self.disabled: if node.unique_id in self.disabled:
@@ -1123,6 +1208,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
source_file.add_test(node.unique_id, test_from) source_file.add_test(node.unique_id, test_from)
if isinstance(node, ParsedMetric): if isinstance(node, ParsedMetric):
source_file.metrics.append(node.unique_id) source_file.metrics.append(node.unique_id)
if isinstance(node, ParsedEntity):
source_file.entities.append(node.unique_id)
if isinstance(node, ParsedExposure): if isinstance(node, ParsedExposure):
source_file.exposures.append(node.unique_id) source_file.exposures.append(node.unique_id)
else: else:
@@ -1150,6 +1237,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.docs, self.docs,
self.exposures, self.exposures,
self.metrics, self.metrics,
self.entities,
self.selectors, self.selectors,
self.files, self.files,
self.metadata, self.metadata,
@@ -1203,6 +1291,9 @@ class WritableManifest(ArtifactMixin):
metrics: Mapping[UniqueID, ParsedMetric] = field( metrics: Mapping[UniqueID, ParsedMetric] = field(
metadata=dict(description=("The metrics defined in the dbt project and its dependencies")) metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
) )
entities: Mapping[UniqueID, ParsedEntity] = field(
metadata=dict(description=("The entities defined in the dbt project and its dependencies"))
)
selectors: Mapping[UniqueID, Any] = field( selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=("The selectors defined in selectors.yml")) metadata=dict(description=("The selectors defined in selectors.yml"))
) )

View File

@@ -368,6 +368,11 @@ class MetricConfig(BaseConfig):
enabled: bool = True enabled: bool = True
@dataclass
class EntityConfig(BaseConfig):
enabled: bool = True
@dataclass @dataclass
class ExposureConfig(BaseConfig): class ExposureConfig(BaseConfig):
enabled: bool = True enabled: bool = True
@@ -624,6 +629,7 @@ class SnapshotConfig(EmptySnapshotConfig):
RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = { RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
NodeType.Metric: MetricConfig, NodeType.Metric: MetricConfig,
NodeType.Entity: EntityConfig,
NodeType.Exposure: ExposureConfig, NodeType.Exposure: ExposureConfig,
NodeType.Source: SourceConfig, NodeType.Source: SourceConfig,
NodeType.Seed: SeedConfig, NodeType.Seed: SeedConfig,

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType, MaturityType,
MetricFilter, MetricFilter,
MetricTime, MetricTime,
EntityRelationship,
) )
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error from dbt.exceptions import warn_or_error
@@ -54,6 +55,7 @@ from .model_config import (
ExposureConfig, ExposureConfig,
EmptySnapshotConfig, EmptySnapshotConfig,
SnapshotConfig, SnapshotConfig,
EntityConfig,
) )
@@ -909,6 +911,51 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
) )
@dataclass
class ParsedEntity(UnparsedBaseNode, HasUniqueID, HasFqn):
name: str
root_model: str
relationships: List[EntityRelationship] = field(default_factory=list)
resource_type: NodeType = NodeType.Entity
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
config: EntityConfig = field(default_factory=EntityConfig)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
sources: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
refs: List[List[str]] = field(default_factory=list)
created_at: float = field(default_factory=lambda: time.time())
@property
def depends_on_nodes(self):
return self.depends_on.nodes
@property
def search_name(self):
return self.name
def same_root_model(self, old: "ParsedEntity") -> bool:
return self.root_model == old.root_model
def same_relationships(self, old: "ParsedEntity") -> bool:
return self.relationships == old.relationships
def same_config(self, old: "ParsedEntity") -> bool:
return self.config.same_contents(
self.unrendered_config,
old.unrendered_config,
)
def same_contents(self, old: Optional["ParsedEntity"]) -> bool:
# existing when it didn't before is a change!
# metadata/tags changes are not "changes"
if old is None:
return True
return self.same_root_model(old) and self.same_relationships(old) and True
ManifestNodes = Union[ ManifestNodes = Union[
ParsedAnalysisNode, ParsedAnalysisNode,
ParsedSingularTestNode, ParsedSingularTestNode,
@@ -928,5 +975,6 @@ ParsedResource = Union[
ParsedNode, ParsedNode,
ParsedExposure, ParsedExposure,
ParsedMetric, ParsedMetric,
ParsedEntity,
ParsedSourceDefinition, ParsedSourceDefinition,
] ]

View File

@@ -525,3 +525,29 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
if data.get("model") is not None and data.get("calculation_method") == "derived": if data.get("model") is not None and data.get("calculation_method") == "derived":
raise ValidationError("Derived metrics cannot have a 'model' property") raise ValidationError("Derived metrics cannot have a 'model' property")
@dataclass
class EntityRelationship(dbtClassMixin, Mergeable):
to_model: str
from_model: str
join_key: str
join_type: str
relationship_type: str
fields: Optional[List[str]] = None
@dataclass
class UnparsedEntity(dbtClassMixin, Replaceable):
name: str
root_model: str
description: str = ""
relationships: List[EntityRelationship] = field(default_factory=list)
@classmethod
def validate(cls, data):
super(UnparsedEntity, cls).validate(data)
if "name" in data:
# name can only contain alphanumeric chars and underscores
if not (re.match(r"[\w-]+$", data["name"])):
deprecations.warn("entity-name", entity=data["name"])

View File

@@ -18,6 +18,7 @@ class NodeType(StrEnum):
Macro = "macro" Macro = "macro"
Exposure = "exposure" Exposure = "exposure"
Metric = "metric" Metric = "metric"
Entity = "entity"
@classmethod @classmethod
def executable(cls) -> List["NodeType"]: def executable(cls) -> List["NodeType"]:
@@ -52,11 +53,14 @@ class NodeType(StrEnum):
cls.Analysis, cls.Analysis,
cls.Exposure, cls.Exposure,
cls.Metric, cls.Metric,
cls.Entity,
] ]
def pluralize(self) -> str: def pluralize(self) -> str:
if self is self.Analysis: if self is self.Analysis:
return "analyses" return "analyses"
elif self is self.Entity:
return "entities"
return f"{self}s" return f"{self}s"