Mirror of https://github.com/dbt-labs/dbt-core, synced 2025-12-18 22:51:27 +00:00
Compare commits: check_181...simple-ent (16 commits)
| Author | SHA1 | Date |
|---|---|---|
| | ac7c8209cc | |
| | d545c47112 | |
| | 3cd5595482 | |
| | d8f3586e56 | |
| | ab9d0fec55 | |
| | f7255f2598 | |
| | 2e310d6e01 | |
| | 1cd996a2c8 | |
| | 335b3062be | |
| | ce0c706102 | |
| | b18139497f | |
| | ad5570cd6c | |
| | ef9f4d6cf9 | |
| | 70f7dc24ce | |
| | 206b220e3d | |
| | 5ed168c47b | |
@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
     MaturityType,
     MetricFilter,
     MetricTime,
+    EntityRelationship,
 )
 from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
 from dbt.exceptions import warn_or_error

@@ -64,6 +65,8 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable
     meta: Dict[str, Any] = field(default_factory=dict)
     data_type: Optional[str] = None
     quote: Optional[bool] = None
+    is_dimension: Optional[bool] = False
+    is_primary_key: Optional[bool] = False
     tags: List[str] = field(default_factory=list)
     _extra: Dict[str, Any] = field(default_factory=dict)
 
@@ -160,6 +163,8 @@ class ParsedNodeMixins(dbtClassMixin):
         self.created_at = time.time()
         self.description = patch.description
         self.columns = patch.columns
+        self.is_public = patch.is_public
+        self.relationships = patch.relationships
 
     def get_materialization(self):
         return self.config.materialized

@@ -210,6 +215,8 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
     compiled_path: Optional[str] = None
     build_path: Optional[str] = None
     deferred: bool = False
+    is_public: Optional[bool] = False
+    relationships: List[EntityRelationship] = field(default_factory=list)
     unrendered_config: Dict[str, Any] = field(default_factory=dict)
     created_at: float = field(default_factory=lambda: time.time())
     config_call_dict: Dict[str, Any] = field(default_factory=dict)

@@ -497,6 +504,8 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
 @dataclass
 class ParsedNodePatch(ParsedPatch):
     columns: Dict[str, ColumnInfo]
+    is_public: Optional[bool]
+    relationships: List[EntityRelationship]
 
 
 @dataclass

@@ -828,10 +837,11 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
     expression: str
     filters: List[MetricFilter]
     time_grains: List[str]
-    dimensions: List[str]
+    dimensions: Dict[str, Any] = field(default_factory=dict)
     window: Optional[MetricTime] = None
     model: Optional[str] = None
     model_unique_id: Optional[str] = None
+    allow_joins: Optional[bool] = True
     resource_type: NodeType = NodeType.Metric
     meta: Dict[str, Any] = field(default_factory=dict)
     tags: List[str] = field(default_factory=list)

@@ -88,12 +88,36 @@ class Docs(dbtClassMixin, Replaceable):
     node_color: Optional[str] = None
 
 
+@dataclass
+class EntityRelationshipType(StrEnum):
+    many_to_one = "many_to_one"
+    one_to_many = "one_to_many"
+    one_to_one = "one_to_one"
+
+    def inverse(self) -> str:
+        if self == "many_to_one":
+            return "one_to_many"
+        elif self == "one_to_many":
+            return "many_to_one"
+        else:
+            return self
+
+
+@dataclass
+class EntityRelationship(dbtClassMixin, Replaceable):
+    to: str
+    join_key: str
+    relationship_type: EntityRelationshipType
+
+
 @dataclass
 class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
     name: str
     description: str = ""
     meta: Dict[str, Any] = field(default_factory=dict)
+    is_public: Optional[bool] = False
     data_type: Optional[str] = None
+    relationships: List[EntityRelationship] = field(default_factory=list)
     docs: Docs = field(default_factory=Docs)
     _extra: Dict[str, Any] = field(default_factory=dict)
 
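Editor's note: the snippet below is not part of the diff. It is a minimal standalone sketch of how the `inverse()` helper above behaves, with dbt's `StrEnum`/`dbtClassMixin` replaced by stdlib equivalents and made-up model names (`orders`, `customers`):

# Standalone approximation of EntityRelationshipType / EntityRelationship above.
from dataclasses import dataclass
from enum import Enum


class EntityRelationshipType(str, Enum):
    many_to_one = "many_to_one"
    one_to_many = "one_to_many"
    one_to_one = "one_to_one"

    def inverse(self) -> str:
        # many_to_one and one_to_many swap; one_to_one is its own inverse
        if self == "many_to_one":
            return "one_to_many"
        elif self == "one_to_many":
            return "many_to_one"
        else:
            return self


@dataclass
class EntityRelationship:
    to: str
    join_key: str
    relationship_type: EntityRelationshipType


# A relationship declared on `orders` pointing at `customers` ...
orders_to_customers = EntityRelationship(
    to="customers",
    join_key="customer_id",
    relationship_type=EntityRelationshipType.many_to_one,
)
# ... and the inverse that _process_inverse_relationships_for_node (later in this
# compare) would attach to `customers`.
customers_to_orders = EntityRelationship(
    to="orders",
    join_key="customer_id",
    relationship_type=EntityRelationshipType(orders_to_customers.relationship_type.inverse()),
)
print(customers_to_orders.relationship_type)  # EntityRelationshipType.one_to_many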
@@ -114,6 +138,9 @@ class HasTests(HasDocs):
 class UnparsedColumn(HasTests):
     quote: Optional[bool] = None
     tags: List[str] = field(default_factory=list)
+    is_dimension: Optional[bool] = False
+    is_primary_key: Optional[bool] = False
+    data_type: Optional[str] = None
 
 
 @dataclass

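Editor's note, not part of the diff: a hedged example of the column properties these new fields correspond to, shown as the parsed yml dict a schema file might produce. The model and column names are invented, and the exact yml spelling on this branch is not confirmed by the diff.

# One entry of a model's `columns:` block after YAML loading, with the new
# column-level flags this branch reads into UnparsedColumn / ColumnInfo.
column_entry = {
    "name": "order_country",
    "description": "Country the order shipped to",
    "data_type": "string",      # dimension columns must declare this (see the manifest.py checks below)
    "is_dimension": True,
    "is_primary_key": False,
}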
@@ -490,9 +517,10 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
     expression: str
     description: str = ""
     time_grains: List[str] = field(default_factory=list)
-    dimensions: List[str] = field(default_factory=list)
+    dimensions: Union[Dict[str, Any], List[str]] = field(default_factory=dict)
     window: Optional[MetricTime] = None
     model: Optional[str] = None
+    allow_joins: Optional[bool] = True
     filters: List[MetricFilter] = field(default_factory=list)
     meta: Dict[str, Any] = field(default_factory=dict)
     tags: List[str] = field(default_factory=list)

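Editor's note, not part of the diff: the widened `dimensions` type means a metric can keep the old flat list of dimension names or key dimensions by model. A hedged illustration of the two shapes as parsed into Python (model and column names are made up):

# Old shape: a flat list of dimension names (still accepted via Union[..., List[str]]).
dimensions_as_list = ["customer_status", "order_country"]

# New shape: a mapping of model name -> dimension names, which is what
# ParsedMetric.dimensions (Dict[str, Any]) ultimately stores.
dimensions_as_dict = {
    "orders": ["order_country"],
    "customers": ["customer_status"],
}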
@@ -69,6 +69,7 @@ from dbt.contracts.graph.parsed import (
     ParsedExposure,
     ParsedMetric,
 )
+from dbt.contracts.graph.unparsed import EntityRelationshipType, EntityRelationship
 from dbt.contracts.util import Writable
 from dbt.exceptions import (
     ref_target_not_found,

@@ -392,6 +393,7 @@ class ManifestLoader:
         start_process = time.perf_counter()
         self.process_sources(self.root_project.project_name)
         self.process_refs(self.root_project.project_name)
+        # self.process_inverse_relationships(self.root_project.project_name)
         self.process_docs(self.root_project)
         self.process_metrics(self.root_project)
 
@@ -832,6 +834,7 @@ class ManifestLoader:
             if node.created_at < self.started_at:
                 continue
             _process_refs_for_node(self.manifest, current_project, node)
+            _process_inverse_relationships_for_node(self.manifest, current_project, node)
         for exposure in self.manifest.exposures.values():
            if exposure.created_at < self.started_at:
                 continue

@@ -953,6 +956,12 @@ class ManifestLoader:
 
         self.manifest.rebuild_ref_lookup()
 
+    # def process_inverse_relationships(self, current_project: str):
+    #     for node in self.manifest.nodes.values():
+    #         if node.created_at < self.started_at:
+    #             continue
+    #         _process_inverse_relationships_for_node(self.manifest, current_project, node)
+
 
 def invalid_ref_fail_unless_test(node, target_model_name, target_model_package, disabled):
 
@@ -1167,6 +1176,72 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposur
     manifest.update_exposure(exposure)
 
 
+def _process_dimensions_and_relationships_for_metric(
+    manifest: Manifest,
+    target_model: ManifestNode,
+    metric: ParsedMetric,
+    target_model_package: Optional[str],
+    current_project: Optional[str],
+):
+    # ensure that relationships are not declared on non-public models
+    if target_model.relationships and not target_model.is_public:
+        raise dbt.exceptions.InternalException(
+            f"Model relationships must be declared between public models - {metric.unique_id} depends on {target_model.unique_id}, which is not a public model."
+        )
+    # only allow dimensions to be accessed if the model is declared to be a public model
+    if target_model.is_public:
+        # get declared dimensions from primary model
+        primary_model_dimensions = [
+            col for col in target_model.columns.values() if col.is_dimension
+        ]
+
+        # validate declared dims from primary model
+        for dim in primary_model_dimensions:
+            if not dim.data_type:
+                raise dbt.exceptions.InternalException(
+                    f"Dimension columns must declare a `data_type` attribute. {dim.name} is missing this configuration."
+                )
+
+        # check if dimensions declared, if not, supply dimensions from model yml
+        if not metric.dimensions:
+            metric.dimensions[target_model.name] = [col.name for col in primary_model_dimensions]
+        else:
+            # TODO -- do we actually want to append missing dims here?
+            for metric_dim in metric.dimensions[target_model.name]:
+                if metric_dim not in [dim.name for dim in primary_model_dimensions]:
+                    raise dbt.exceptions.InternalException(
+                        f"Metric dimensions on public models must be declared as dimensions in the model yml file. Dimension `{metric_dim}` declared on metric `{metric.name}` is missing this configuration."
+                    )
+            for dim in primary_model_dimensions:
+                if dim.name not in metric.dimensions[target_model.name]:
+                    metric.dimensions[target_model.name].append(dim.name)
+
+        if metric.allow_joins:
+            for relationship in target_model.relationships:
+                to_model_name = relationship.to
+                to_model = manifest.resolve_ref(
+                    to_model_name,
+                    target_model_package,
+                    current_project,
+                    metric.package_name,
+                )
+                if not to_model.is_public:
+                    raise dbt.exceptions.InternalException(
+                        f"Model relationships must be declared between public models - {metric.unique_id} depends on {to_model.unique_id}, which is not a public model."
+                    )
+                metric.depends_on.nodes.append(to_model.unique_id)
+
+                new_dims = [col for col in to_model.columns.values() if col.is_dimension]
+                if new_dims:
+                    metric.dimensions[to_model.name] = []
+                    for col in new_dims:
+                        if not col.data_type:
+                            raise dbt.exceptions.InternalException(
+                                f"Dimension columns must declare a `data_type` attribute. {col.name} is missing this configuration."
+                            )
+                        metric.dimensions[to_model.name].append(col.name)
+
+
 def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
     """Given a manifest and a metric in that manifest, process its refs"""
     for ref in metric.refs:

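Editor's note, not part of the diff: a hedged illustration (made-up names) of what the function above produces when `orders` is public, declares `order_country` as a dimension, has a many_to_one relationship to the public model `customers` with dimension `customer_status`, and the metric allows joins:

metric_dimensions_before = {}  # nothing declared on the metric itself

metric_dimensions_after = {
    "orders": ["order_country"],       # filled from the primary model's is_dimension columns
    "customers": ["customer_status"],  # filled by following the relationship when allow_joins is True
}

# depends_on also grows: the joined model's unique_id is appended,
# e.g. metric.depends_on.nodes -> [..., "model.my_project.customers"]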
@@ -1206,6 +1281,17 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
         target_model_id = target_model.unique_id
 
         metric.depends_on.nodes.append(target_model_id)
+
+        _process_dimensions_and_relationships_for_metric(
+            manifest, target_model, metric, target_model_package, current_project
+        )
+
+        # this should not go here
+        # this checks the node columns, and adds any dimensions
+        # declared in that model yml to the metric dimension list
+
+        # joined_models = target_model.relationships
+
     manifest.update_metric(metric)
 
 
@@ -1298,6 +1384,25 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: Manif
     manifest.update_node(node)
 
 
+def _process_inverse_relationships_for_node(
+    manifest: Manifest, current_project: str, node: ManifestNode
+):
+    """Given a manifest and a node in that manifest, process the inverse relationships for the related nodes"""
+    target_model_package: Optional[str] = None
+    if node.resource_type == "model" and len(node.relationships) > 0:
+        for relationship in node.relationships:
+            to_model = manifest.resolve_ref(
+                relationship.to, target_model_package, current_project, node.package_name
+            )
+            inverse_relationship = EntityRelationship(
+                to=node.name,
+                join_key=relationship.join_key,
+                relationship_type=EntityRelationshipType(relationship.relationship_type.inverse()),
+            )
+            to_model.relationships.append(inverse_relationship)
+            manifest.update_node(to_model)
+
+
 def _process_sources_for_exposure(
     manifest: Manifest, current_project: str, exposure: ParsedExposure
 ):

@@ -602,7 +602,6 @@ class PartialParsing:
         else:
             saved_schema_file.pp_dict = {}
         self.handle_schema_file_changes(saved_schema_file, saved_yaml_dict, new_yaml_dict)
-
         # copy from new schema_file to saved_schema_file to preserve references
         # that weren't removed
         saved_schema_file.contents = new_schema_file.contents

@@ -855,6 +854,16 @@ class PartialParsing:
                 # remove from patches
                 schema_file.node_patches.remove(elem_unique_id)
 
+            # for models, reparse children metrics if applicable:
+            if dict_key == "models":
+                for unique_id in schema_file.node_patches:
+                    children = [
+                        child
+                        for child in self.saved_manifest.child_map[unique_id]
+                        if child.split(".")[0] == "metric"
+                    ]
+                    self.schedule_nodes_for_parsing(children)
+
             # for models, seeds, snapshots (not analyses)
             if dict_key in ["models", "seeds", "snapshots"]:
                 # find related tests and remove them

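Editor's note, not part of the diff: a hedged, standalone illustration of the child_map filtering above, using invented unique_ids:

child_map = {
    "model.my_project.orders": [
        "metric.my_project.order_count",
        "test.my_project.not_null_orders_id",
    ]
}
# Keep only children whose unique_id starts with the "metric" resource prefix,
# so changed models trigger a reparse of their dependent metrics.
children = [
    child
    for child in child_map["model.my_project.orders"]
    if child.split(".")[0] == "metric"
]
print(children)  # ['metric.my_project.order_count']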
@@ -140,8 +140,12 @@ class ParserRef:
         quote: Optional[bool]
         if isinstance(column, UnparsedColumn):
             quote = column.quote
+            is_dimension = column.is_dimension
+            is_primary_key = column.is_primary_key
         else:
             quote = None
+            is_dimension = False
+            is_primary_key = False
         self.column_info[column.name] = ColumnInfo(
             name=column.name,
             description=description,

@@ -150,6 +154,8 @@ class ParserRef:
             tags=tags,
             quote=quote,
             _extra=column.extra,
+            is_dimension=is_dimension,
+            is_primary_key=is_primary_key,
         )
 
     @classmethod

@@ -865,6 +871,8 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
             meta=block.target.meta,
             docs=block.target.docs,
             config=block.target.config,
+            is_public=block.target.is_public,
+            relationships=block.target.relationships,
         )
         assert isinstance(self.yaml.file, SchemaSourceFile)
         source_file: SchemaSourceFile = self.yaml.file

@@ -1097,6 +1105,22 @@ class MetricParser(YamlReader):
         self.schema_parser = schema_parser
         self.yaml = yaml
 
+    def process_dimension_lists(
+        self, unparsed_dimension_block: List[str] or Dict[str, any], unparsed_model: str
+    ):
+        if isinstance(unparsed_dimension_block, list):
+            # i think this has a real method to resolve refs, but for now hacking this together as POC
+            model_name = (
+                unparsed_model.replace("ref", "")
+                .replace("'", "")
+                .replace('"', "")
+                .replace("(", "")
+                .replace(")", "")
+            )
+            return dict({model_name: unparsed_dimension_block})
+        else:
+            return unparsed_dimension_block
+
     def parse_metric(self, unparsed: UnparsedMetric):
         package_name = self.project.project_name
         unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}"

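Editor's note, not part of the diff: a hedged, standalone sketch of what `process_dimension_lists` does to a legacy flat list. The `ref('orders')` string and column names are made up, and the real method lives on MetricParser (and, per its own comment, would ideally resolve refs properly rather than stripping strings):

def process_dimension_lists(unparsed_dimension_block, unparsed_model):
    # Mirror of the parser method above, minus `self`: a legacy flat list is
    # re-keyed under the model name extracted from the ref() string; a dict
    # passes through unchanged.
    if isinstance(unparsed_dimension_block, list):
        model_name = (
            unparsed_model.replace("ref", "")
            .replace("'", "")
            .replace('"', "")
            .replace("(", "")
            .replace(")", "")
        )
        return {model_name: unparsed_dimension_block}
    return unparsed_dimension_block


print(process_dimension_lists(["order_country"], "ref('orders')"))
# {'orders': ['order_country']}
print(process_dimension_lists({"orders": ["order_country"]}, "ref('orders')"))
# {'orders': ['order_country']} (already a dict, returned as-is)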
@@ -1140,7 +1164,8 @@ class MetricParser(YamlReader):
             calculation_method=unparsed.calculation_method,
             expression=str(unparsed.expression),
             timestamp=unparsed.timestamp,
-            dimensions=unparsed.dimensions,
+            dimensions=self.process_dimension_lists(unparsed.dimensions, unparsed.model),
+            allow_joins=unparsed.allow_joins,
             window=unparsed.window,
             time_grains=unparsed.time_grains,
             filters=unparsed.filters,