Compare commits

...

8 Commits

Author SHA1 Message Date
Dave Connors
42518e71d6 try to update linker 2022-10-10 13:56:00 -05:00
Dave Connors
243947dd64 get metrics to rely on exposure 2022-10-10 13:44:19 -05:00
Dave Connors
b18139497f dirty dimension inheritance from joined models 2022-10-06 14:52:02 -05:00
Dave Connors
ad5570cd6c allow all dimensions to be passed to metric object 2022-10-06 14:12:43 -05:00
Dave Connors
ef9f4d6cf9 futile attempt to check for node dimensions 2022-10-06 10:05:54 -05:00
Dave Connors
70f7dc24ce add datatype column attribute 2022-10-06 09:21:40 -05:00
Dave Connors
206b220e3d simple relationships for models 2022-10-06 09:11:18 -05:00
Dave Connors
5ed168c47b add is entity as top level model attribute, is_entity_dimension and is_primary_key as column attributes 2022-10-06 08:47:36 -05:00
7 changed files with 207 additions and 1 deletions

View File

@@ -413,12 +413,15 @@ class Compiler:
linker.add_node(node.unique_id)
for dependency in node.depends_on_nodes:
import pdb; pdb.set_trace()
if dependency in manifest.nodes:
linker.dependency(node.unique_id, (manifest.nodes[dependency].unique_id))
elif dependency in manifest.sources:
linker.dependency(node.unique_id, (manifest.sources[dependency].unique_id))
elif dependency in manifest.metrics:
linker.dependency(node.unique_id, (manifest.metrics[dependency].unique_id))
elif dependency in manifest.exposures:
linker.dependency(node.unique_id, (manifest.exposures[dependency].unique_id))
else:
dependency_not_found(node, dependency)

View File

@@ -146,6 +146,38 @@ class SourceLookup(dbtClassMixin):
return manifest.sources[unique_id]
class ExposureLookup(dbtClassMixin):
    """Name-based index over the exposures held by a manifest.

    ``storage`` maps an exposure's ``search_name`` to a
    ``{package_name: unique_id}`` dict and is filled from
    ``manifest.exposures`` when the lookup is constructed.
    """

    def __init__(self, manifest: "Manifest"):
        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
        self.populate(manifest)

    def get_unique_id(self, search_name, package: Optional[PackageName]):
        """Return whatever ``find_unique_id_for_package`` finds in ``storage``."""
        return find_unique_id_for_package(self.storage, search_name, package)

    def find(self, search_name, package: Optional[PackageName], manifest: "Manifest"):
        """Return the exposure for ``search_name``/``package``, or ``None`` if not indexed."""
        found_id = self.get_unique_id(search_name, package)
        if found_id is None:
            return None
        return self.perform_lookup(found_id, manifest)

    def add_exposure(self, exposure: ParsedExposure):
        """Index one exposure under its search name and package name."""
        per_package = self.storage.setdefault(exposure.search_name, {})
        per_package[exposure.package_name] = exposure.unique_id

    def populate(self, manifest):
        """Index every exposure currently in ``manifest.exposures``."""
        for parsed_exposure in manifest.exposures.values():
            self.add_exposure(parsed_exposure)

    def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedExposure:
        """Fetch the exposure by unique id.

        Raises ``dbt.exceptions.InternalException`` when the id is indexed here
        but absent from ``manifest.exposures``.
        """
        if unique_id in manifest.exposures:
            return manifest.exposures[unique_id]
        raise dbt.exceptions.InternalException(
            f"Exposure {unique_id} found in cache but not found in manifest"
        )
class RefableLookup(dbtClassMixin):
# model, seed, snapshot
_lookup_types: ClassVar[set] = set(NodeType.refable())
@@ -466,6 +498,7 @@ class Disabled(Generic[D]):
MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]]
MaybeExposure = Optional[Union[ParsedExposure, Disabled[ParsedExposure]]]
MaybeDocumentation = Optional[ParsedDocumentation]
@@ -629,6 +662,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_ref_lookup: Optional[RefableLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_exposure_lookup: Optional[ExposureLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_metric_lookup: Optional[MetricLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
@@ -885,6 +921,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._metric_lookup = MetricLookup(self)
return self._metric_lookup
@property
def exposure_lookup(self) -> ExposureLookup:
    """Return this manifest's ``ExposureLookup``, building and caching it on first access."""
    cached = self._exposure_lookup
    if cached is None:
        cached = ExposureLookup(self)
        self._exposure_lookup = cached
    return cached
def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self)
@@ -931,6 +973,36 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return Disabled(disabled[0])
return None
def resolve_exposure(
    self,
    target_exposure_name: str,
    target_exposure_package: Optional[str],
    current_project: str,
    node_package: str,
) -> MaybeExposure:
    """Resolve an exposure by name across the candidate packages.

    Candidates come from ``_search_packages``. The first enabled exposure
    found through ``self.exposure_lookup`` is returned. If none is found but
    ``self.disabled_lookup`` has an entry, the first such entry is returned
    wrapped in ``Disabled``. Otherwise the result is ``None``.
    """
    exposure: Optional[GraphMemberNode] = None
    disabled: Optional[List[GraphMemberNode]] = None

    candidates = _search_packages(current_project, node_package, target_exposure_package)
    for pkg in candidates:
        exposure = self.exposure_lookup.find(target_exposure_name, pkg, self)

        if exposure is not None and exposure.config.enabled:
            return exposure

        # it's possible that the node is disabled
        if disabled is None:
            disabled = self.disabled_lookup.find(target_exposure_name, pkg)

    if disabled:
        return Disabled(disabled[0])
    return None
# Called by dbt.parser.manifest._resolve_sources_for_exposure
# and dbt.parser.manifest._process_source_for_node
def resolve_source(
@@ -1163,6 +1235,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._doc_lookup,
self._source_lookup,
self._ref_lookup,
self._exposure_lookup,
self._metric_lookup,
self._disabled_lookup,
self._analysis_lookup,

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType,
MetricFilter,
MetricTime,
EntityRelationship,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error
@@ -64,6 +65,9 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable
meta: Dict[str, Any] = field(default_factory=dict)
data_type: Optional[str] = None
quote: Optional[bool] = None
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
datatype: Optional[str] = None
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -94,6 +98,7 @@ class MacroDependsOn(dbtClassMixin, Replaceable):
@dataclass
class DependsOn(MacroDependsOn):
nodes: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
def add_node(self, value: str):
if value not in self.nodes:
@@ -160,6 +165,8 @@ class ParsedNodeMixins(dbtClassMixin):
self.created_at = time.time()
self.description = patch.description
self.columns = patch.columns
self.is_public = patch.is_public
# self.relationships = patch.relationships
def get_materialization(self):
return self.config.materialized
@@ -210,6 +217,8 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
compiled_path: Optional[str] = None
build_path: Optional[str] = None
deferred: bool = False
is_public: Optional[bool] = False
# relationships: List[EntityRelationship] = field(default_factory=list)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
@@ -497,6 +506,8 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
@dataclass
class ParsedNodePatch(ParsedPatch):
columns: Dict[str, ColumnInfo]
is_public: Optional[bool]
# relationships: List[EntityRelationship]
@dataclass
@@ -756,6 +767,7 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
refs: List[List[str]] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
created_at: float = field(default_factory=lambda: time.time())
relationships: List[EntityRelationship] = field(default_factory=list)
@property
def depends_on_nodes(self):
@@ -786,6 +798,9 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
def same_url(self, old: "ParsedExposure") -> bool:
    """Report whether this exposure's ``url`` equals ``old``'s."""
    current_url = self.url
    previous_url = old.url
    return current_url == previous_url
def same_relationships(self, old: "ParsedExposure") -> bool:
    """Report whether this exposure's ``relationships`` equal ``old``'s."""
    current_relationships = self.relationships
    previous_relationships = old.relationships
    return current_relationships == previous_relationships
def same_config(self, old: "ParsedExposure") -> bool:
return self.config.same_contents(
self.unrendered_config,
@@ -807,6 +822,7 @@ class ParsedExposure(UnparsedBaseNode, HasUniqueID, HasFqn):
and self.same_description(old)
and self.same_label(old)
and self.same_depends_on(old)
and self.same_relationships(old)
and self.same_config(old)
and True
)
@@ -831,6 +847,7 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
dimensions: List[str]
window: Optional[MetricTime] = None
model: Optional[str] = None
exposure: Optional[str] = None
model_unique_id: Optional[str] = None
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)

View File

@@ -88,11 +88,27 @@ class Docs(dbtClassMixin, Replaceable):
node_color: Optional[str] = None
@dataclass
class EntityRelationshipType(StrEnum):
many_to_one = "many_to_one"
one_to_many = "one_to_many"
one_to_one = "one_to_one"
@dataclass
class EntityRelationship(dbtClassMixin, Replaceable):
    """A single relationship entry as declared under ``relationships``."""

    # NOTE(review): presumably the names of the two related models — confirm
    # against the YAML spec; nothing visible here resolves them yet.
    from_model: str
    to_model: str
    # NOTE(review): presumably the column the two models are joined on — confirm.
    join_key: str
    # One of the EntityRelationshipType values.
    relationship_type: EntityRelationshipType
@dataclass
class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
name: str
description: str = ""
meta: Dict[str, Any] = field(default_factory=dict)
is_public: Optional[bool] = False
data_type: Optional[str] = None
docs: Docs = field(default_factory=Docs)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -114,6 +130,8 @@ class HasTests(HasDocs):
class UnparsedColumn(HasTests):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
@dataclass
@@ -417,6 +435,7 @@ class ExposureType(StrEnum):
Analysis = "analysis"
ML = "ml"
Application = "application"
Entity = "entity"
class MaturityType(StrEnum):
@@ -444,6 +463,7 @@ class UnparsedExposure(dbtClassMixin, Replaceable):
url: Optional[str] = None
depends_on: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
relationships: List[EntityRelationship] = field(default_factory=list)
@classmethod
def validate(cls, data):
@@ -453,6 +473,12 @@ class UnparsedExposure(dbtClassMixin, Replaceable):
if not (re.match(r"[\w-]+$", data["name"])):
deprecations.warn("exposure-name", exposure=data["name"])
if "relationships" in data:
if data["type"] != "entity":
raise ParsingException(
f"The exposure '{data['name']}' is invalid. It cannot specify relationships if 'type' is not an entity exposure"
)
@dataclass
class MetricFilter(dbtClassMixin, Replaceable):
@@ -493,6 +519,7 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
dimensions: List[str] = field(default_factory=list)
window: Optional[MetricTime] = None
model: Optional[str] = None
exposure: Optional[str] = None
filters: List[MetricFilter] = field(default_factory=list)
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

View File

@@ -39,6 +39,7 @@ class NodeType(StrEnum):
cls.Model,
cls.Seed,
cls.Snapshot,
cls.Exposure,
]
@classmethod

View File

@@ -53,7 +53,7 @@ from dbt.context.providers import ParseProvider
from dbt.contracts.files import FileHash, ParseFileType, SchemaSourceFile
from dbt.parser.read_files import read_files, load_source_file
from dbt.parser.partial import PartialParsing, special_override_macros
from dbt.contracts.graph.compiled import ManifestNode
from dbt.contracts.graph.compiled import ManifestNode, GraphMemberNode
from dbt.contracts.graph.manifest import (
Manifest,
Disabled,
@@ -857,6 +857,12 @@ class ManifestLoader:
if metric.created_at < self.started_at:
continue
_process_metrics_for_node(self.manifest, current_project, metric)
for metric in self.manifest.metrics.values():
# TODO: Can we do this if the metric is derived & depends on
# some other metric for its definition? Maybe....
if metric.created_at < self.started_at:
continue
_process_exposures_for_metric(self.manifest, current_project, metric)
# nodes: node and column descriptions
# sources: source and table descriptions, column descriptions
@@ -1208,9 +1214,70 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
target_model_id = target_model.unique_id
metric.depends_on.nodes.append(target_model_id)
# this should not go here
# this checks the node columns, and adds any dimension
# declared in that model yml to the metric dimension list
# primary_dimensions = [
# col.name for col in target_model.columns.values() if col.is_dimension
# ]
# for dim in primary_dimensions:
# if dim not in metric.dimensions:
# metric.dimensions.append(dim)
# secondary_dimensions = []
# for relationship in target_model.relationships:
# to_model_name = relationship.to
# to_model = manifest.resolve_ref(
# to_model_name,
# target_model_package,
# current_project,
# metric.package_name,
# )
# new_dims = [col.name for col in to_model.columns.values() if col.is_dimension]
# secondary_dimensions.extend(new_dims)
# for dim in secondary_dimensions:
# if dim not in metric.dimensions:
# metric.dimensions.append(dim)
# joined_models = target_model.relationships
manifest.update_metric(metric)
def _process_exposures_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
    """Given a manifest and a metric in that manifest, process its exposure relationships.

    Resolves ``metric.exposure`` through ``manifest.resolve_exposure`` and, on
    success, appends the exposure's unique id to ``metric.depends_on.exposures``
    and pushes the metric back through ``manifest.update_metric``.

    A metric that declares no exposure is left untouched. A metric whose
    exposure is missing or disabled is itself disabled and reported through
    ``invalid_exposure_fail_unless_test`` (which may raise).
    """
    target_exposure_name: Optional[str] = metric.exposure
    if target_exposure_name is None:
        # ``exposure`` is optional on a metric, so there is nothing to link.
        return

    # The exposure is referenced by bare name; no package qualifier is passed.
    target_exposure_package: Optional[str] = None

    target_exposure: Optional[Union[Disabled, GraphMemberNode]] = manifest.resolve_exposure(
        target_exposure_name,
        target_exposure_package,
        current_project,
        metric.package_name,
    )

    if target_exposure is None or isinstance(target_exposure, Disabled):
        # This may raise. Even if it doesn't, we don't want to add
        # this metric to the graph b/c there is no destination exposure
        metric.config.enabled = False
        invalid_exposure_fail_unless_test(metric, target_exposure_name, target_exposure_package)
        # Stop here: neither ``None`` nor a ``Disabled`` wrapper is a usable target.
        return

    target_exposure_id = target_exposure.unique_id
    metric.depends_on.exposures.append(target_exposure_id)
    manifest.update_metric(metric)
def _process_metrics_for_node(
manifest: Manifest, current_project: str, node: Union[ManifestNode, ParsedMetric]
):

View File

@@ -140,8 +140,12 @@ class ParserRef:
quote: Optional[bool]
if isinstance(column, UnparsedColumn):
quote = column.quote
is_dimension = column.is_dimension
is_primary_key = column.is_primary_key
else:
quote = None
is_dimension = False
is_primary_key = False
self.column_info[column.name] = ColumnInfo(
name=column.name,
description=description,
@@ -150,6 +154,8 @@ class ParserRef:
tags=tags,
quote=quote,
_extra=column.extra,
is_dimension=is_dimension,
is_primary_key=is_primary_key,
)
@classmethod
@@ -866,6 +872,8 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
meta=block.target.meta,
docs=block.target.docs,
config=block.target.config,
is_public=block.target.is_public,
# relationships=block.target.relationships,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -1039,6 +1047,7 @@ class ExposureParser(YamlReader):
maturity=unparsed.maturity,
config=config,
unrendered_config=unrendered_config,
relationships=unparsed.relationships,
)
ctx = generate_parse_exposure(
parsed,
@@ -1145,6 +1154,7 @@ class MetricParser(YamlReader):
tags=unparsed.tags,
config=config,
unrendered_config=unrendered_config,
exposure=unparsed.exposure,
)
ctx = generate_parse_metrics(
@@ -1164,6 +1174,14 @@ class MetricParser(YamlReader):
node=parsed,
)
for node in parsed.depends_on.nodes:
if self.manifest.nodes[node]:
declared_dimensions = [
col.name for col in self.manifest.nodes[node]["columns"] if col.is_dimension
]
for dim in declared_dimensions:
parsed.dimensions.append(dim)
# if the metric is disabled we do not want it included in the manifest, only in the disabled dict
if parsed.config.enabled:
self.manifest.add_metric(self.yaml.file, parsed)