Compare commits

...

16 Commits

Author  SHA1  Message  Date
Dave Connors  ac7c8209cc  remove trace  2022-10-26 09:12:53 -05:00
Dave Connors  d545c47112  parse list of dimensions to dictionary  2022-10-26 08:26:22 -05:00
Dave Connors  3cd5595482  dimensions  2022-10-25 08:18:16 -05:00
Dave Connors  d8f3586e56  process inverse relationships  2022-10-24 14:00:12 -05:00
Dave Connors  ab9d0fec55  process inverse relationships -- broken manifest  2022-10-14 15:52:17 -05:00
Dave Connors  f7255f2598  add optional metric config allow_joins to bypass relationship logic  2022-10-14 12:01:01 -05:00
Dave Connors  2e310d6e01  dimensions as dictionaries take 1  2022-10-13 15:40:59 -05:00
Dave Connors  1cd996a2c8  dataype, partial parsing support for model schema changes  2022-10-13 14:47:17 -05:00
Dave Connors  335b3062be  Merge branch 'main' into simple-entity-attributes  2022-10-12 16:14:25 -05:00
Dave Connors  ce0c706102  add related model to depends on  2022-10-11 16:52:21 -05:00
Dave Connors  b18139497f  dirty dimension inheritance from joined models  2022-10-06 14:52:02 -05:00
Dave Connors  ad5570cd6c  allow all dimensions to be passed to metric object  2022-10-06 14:12:43 -05:00
Dave Connors  ef9f4d6cf9  futile attempt to check for node dimensions  2022-10-06 10:05:54 -05:00
Dave Connors  70f7dc24ce  add datatype column attribute  2022-10-06 09:21:40 -05:00
Dave Connors  206b220e3d  simple relationships for models  2022-10-06 09:11:18 -05:00
Dave Connors  5ed168c47b  add is entity as top level model attribute, is_entity_dimension and is_primary_key as column attributes  2022-10-06 08:47:36 -05:00
5 changed files with 181 additions and 4 deletions

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType,
MetricFilter,
MetricTime,
EntityRelationship,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error
@@ -64,6 +65,8 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
meta: Dict[str, Any] = field(default_factory=dict)
data_type: Optional[str] = None
quote: Optional[bool] = None
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -160,6 +163,8 @@ class ParsedNodeMixins(dbtClassMixin):
self.created_at = time.time()
self.description = patch.description
self.columns = patch.columns
self.is_public = patch.is_public
self.relationships = patch.relationships
def get_materialization(self):
return self.config.materialized
@@ -210,6 +215,8 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
compiled_path: Optional[str] = None
build_path: Optional[str] = None
deferred: bool = False
is_public: Optional[bool] = False
relationships: List[EntityRelationship] = field(default_factory=list)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
@@ -497,6 +504,8 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
@dataclass
class ParsedNodePatch(ParsedPatch):
columns: Dict[str, ColumnInfo]
is_public: Optional[bool]
relationships: List[EntityRelationship]
@dataclass
@@ -828,10 +837,11 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
expression: str
filters: List[MetricFilter]
time_grains: List[str]
dimensions: List[str]
dimensions: Dict[str, Any] = field(default_factory=dict)
window: Optional[MetricTime] = None
model: Optional[str] = None
model_unique_id: Optional[str] = None
allow_joins: Optional[bool] = True
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
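
The hunk above also changes `ParsedMetric.dimensions` from a flat `List[str]` to a `Dict[str, Any]` and adds the `allow_joins` flag. A minimal sketch of the old versus new shape, using a plain dataclass rather than the real dbt contract (the model and column names below are made up):

```python
# Hypothetical sketch of the dimensions shape change on ParsedMetric; this is not
# the real dbt contract class, only an illustration of the old vs. new field type.
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MetricDimensionsSketch:
    # old shape: a flat list of column names
    #   dimensions: List[str] = field(default_factory=list)
    # new shape: column names grouped under the model they come from
    dimensions: Dict[str, List[str]] = field(default_factory=dict)
    allow_joins: bool = True


metric = MetricDimensionsSketch(
    dimensions={
        "customers": ["region", "plan_type"],  # dimensions on the metric's own model
        "orders": ["order_status"],            # dimensions inherited via a relationship
    }
)
assert metric.allow_joins and "orders" in metric.dimensions
```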

View File

@@ -88,12 +88,36 @@ class Docs(dbtClassMixin, Replaceable):
node_color: Optional[str] = None
@dataclass
class EntityRelationshipType(StrEnum):
many_to_one = "many_to_one"
one_to_many = "one_to_many"
one_to_one = "one_to_one"
def inverse(self) -> str:
if self == "many_to_one":
return "one_to_many"
elif self == "one_to_many":
return "many_to_one"
else:
return self
@dataclass
class EntityRelationship(dbtClassMixin, Replaceable):
to: str
join_key: str
relationship_type: EntityRelationshipType
@dataclass
class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
name: str
description: str = ""
meta: Dict[str, Any] = field(default_factory=dict)
is_public: Optional[bool] = False
data_type: Optional[str] = None
relationships: List[EntityRelationship] = field(default_factory=list)
docs: Docs = field(default_factory=Docs)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -114,6 +138,9 @@ class HasTests(HasDocs):
class UnparsedColumn(HasTests):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
data_type: Optional[str] = None
@dataclass
@@ -490,9 +517,10 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
expression: str
description: str = ""
time_grains: List[str] = field(default_factory=list)
dimensions: List[str] = field(default_factory=list)
dimensions: Union[Dict[str, Any], List[str]] = field(default_factory=dict)
window: Optional[MetricTime] = None
model: Optional[str] = None
allow_joins: Optional[bool] = True
filters: List[MetricFilter] = field(default_factory=list)
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
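
For reference, a standalone sketch of the `inverse()` behavior added on `EntityRelationshipType`, using the stdlib `str`-mixin `Enum` in place of dbt's `StrEnum` so the string comparisons in the diff behave the same way:

```python
# Standalone sketch of EntityRelationshipType.inverse(): the str mixin makes
# members compare equal to their string values, exactly as the diff relies on.
from enum import Enum


class EntityRelationshipTypeSketch(str, Enum):
    many_to_one = "many_to_one"
    one_to_many = "one_to_many"
    one_to_one = "one_to_one"

    def inverse(self) -> str:
        if self == "many_to_one":
            return "one_to_many"
        elif self == "one_to_many":
            return "many_to_one"
        else:
            # one_to_one is its own inverse
            return self


assert EntityRelationshipTypeSketch.many_to_one.inverse() == "one_to_many"
assert EntityRelationshipTypeSketch.one_to_one.inverse() == EntityRelationshipTypeSketch.one_to_one
```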

View File

@@ -69,6 +69,7 @@ from dbt.contracts.graph.parsed import (
ParsedExposure,
ParsedMetric,
)
from dbt.contracts.graph.unparsed import EntityRelationshipType, EntityRelationship
from dbt.contracts.util import Writable
from dbt.exceptions import (
ref_target_not_found,
@@ -392,6 +393,7 @@ class ManifestLoader:
start_process = time.perf_counter()
self.process_sources(self.root_project.project_name)
self.process_refs(self.root_project.project_name)
# self.process_inverse_relationships(self.root_project.project_name)
self.process_docs(self.root_project)
self.process_metrics(self.root_project)
@@ -832,6 +834,7 @@ class ManifestLoader:
if node.created_at < self.started_at:
continue
_process_refs_for_node(self.manifest, current_project, node)
_process_inverse_relationships_for_node(self.manifest, current_project, node)
for exposure in self.manifest.exposures.values():
if exposure.created_at < self.started_at:
continue
@@ -953,6 +956,12 @@ class ManifestLoader:
self.manifest.rebuild_ref_lookup()
# def process_inverse_relationships(self, current_project: str):
# for node in self.manifest.nodes.values():
# if node.created_at < self.started_at:
# continue
# _process_inverse_relationships_for_node(self.manifest, current_project, node)
def invalid_ref_fail_unless_test(node, target_model_name, target_model_package, disabled):
@@ -1167,6 +1176,72 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposure: ParsedExposure):
manifest.update_exposure(exposure)
def _process_dimensions_and_relationships_for_metric(
manifest: Manifest,
target_model: ManifestNode,
metric: ParsedMetric,
target_model_package: Optional[str],
current_project: Optional[str],
):
# ensure that relationships are not declared on non-public models
if target_model.relationships and not target_model.is_public:
raise dbt.exceptions.InternalException(
f"Model relationships must be declared between public models - {metric.unique_id} depends on {target_model.unique_id}, which is not a public model."
)
# only allow dimensions to be accessed if the model is declared to be a public model
if target_model.is_public:
# get declared dimensions from primary model
primary_model_dimensions = [
col for col in target_model.columns.values() if col.is_dimension
]
# validate declared dims from primary model
for dim in primary_model_dimensions:
if not dim.data_type:
raise dbt.exceptions.InternalException(
f"Dimension columns must declare a `data_type` attribute. {dim.name} is missing this configuration."
)
# check if dimensions declared, if not, supply dimensions from model yml
if not metric.dimensions:
metric.dimensions[target_model.name] = [col.name for col in primary_model_dimensions]
else:
# TODO -- do we actually want to append missing dims here?
for metric_dim in metric.dimensions[target_model.name]:
if metric_dim not in [dim.name for dim in primary_model_dimensions]:
raise dbt.exceptions.InternalException(
f"Metric dimensions on public models must be declared as dimensions in the model yml file. Dimension `{metric_dim}` declared on metric `{metric.name}` is missing this configuration."
)
for dim in primary_model_dimensions:
if dim.name not in metric.dimensions[target_model.name]:
metric.dimensions[target_model.name].append(dim.name)
if metric.allow_joins:
for relationship in target_model.relationships:
to_model_name = relationship.to
to_model = manifest.resolve_ref(
to_model_name,
target_model_package,
current_project,
metric.package_name,
)
if not to_model.is_public:
raise dbt.exceptions.InternalException(
f"Model relationships must be declared between public models - {metric.unique_id} depends on {to_model.unique_id}, which is not a public model."
)
metric.depends_on.nodes.append(to_model.unique_id)
new_dims = [col for col in to_model.columns.values() if col.is_dimension]
if new_dims:
metric.dimensions[to_model.name] = []
for col in new_dims:
if not col.data_type:
raise dbt.exceptions.InternalException(
f"Dimension columns must declare a `data_type` attribute. {col.name} is missing this configuration."
)
metric.dimensions[to_model.name].append(col.name)
def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
"""Given a manifest and a metric in that manifest, process its refs"""
for ref in metric.refs:
@@ -1206,6 +1281,17 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
target_model_id = target_model.unique_id
metric.depends_on.nodes.append(target_model_id)
_process_dimensions_and_relationships_for_metric(
manifest, target_model, metric, target_model_package, current_project
)
# this should not go here
# this checks the node columns, and adds any dimensions
# declared in that model yml to the metric dimension list
# joined_models = target_model.relationships
manifest.update_metric(metric)
@@ -1298,6 +1384,25 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: ManifestNode):
manifest.update_node(node)
def _process_inverse_relationships_for_node(
manifest: Manifest, current_project: str, node: ManifestNode
):
"""Given a manifest and a node in that manifest, process the inverse relationships for the related nodes"""
target_model_package: Optional[str] = None
if node.resource_type == "model" and len(node.relationships) > 0:
for relationship in node.relationships:
to_model = manifest.resolve_ref(
relationship.to, target_model_package, current_project, node.package_name
)
inverse_relationship = EntityRelationship(
to=node.name,
join_key=relationship.join_key,
relationship_type=EntityRelationshipType(relationship.relationship_type.inverse()),
)
to_model.relationships.append(inverse_relationship)
manifest.update_node(to_model)
def _process_sources_for_exposure(
manifest: Manifest, current_project: str, exposure: ParsedExposure
):
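
A simplified illustration of what `_process_inverse_relationships_for_node` does above: for each relationship a model declares, the loader resolves the related node and appends the mirrored relationship onto it. Plain dataclasses stand in for `ManifestNode` here, and the model names and join key are hypothetical:

```python
# Simplified sketch of the inverse-relationship step: when `orders` declares a
# many_to_one relationship to `customers`, the mirrored one_to_many relationship
# is appended to `customers`. These classes are stand-ins, not dbt's real contracts.
from dataclasses import dataclass, field
from typing import Dict, List

INVERSE = {"many_to_one": "one_to_many", "one_to_many": "many_to_one", "one_to_one": "one_to_one"}


@dataclass
class Relationship:
    to: str
    join_key: str
    relationship_type: str


@dataclass
class Model:
    name: str
    relationships: List[Relationship] = field(default_factory=list)


orders = Model("orders", [Relationship(to="customers", join_key="customer_id",
                                       relationship_type="many_to_one")])
customers = Model("customers")
manifest_nodes: Dict[str, Model] = {"customers": customers, "orders": orders}

for rel in orders.relationships:
    to_model = manifest_nodes[rel.to]  # the real code resolves this via manifest.resolve_ref
    to_model.relationships.append(
        Relationship(to=orders.name, join_key=rel.join_key,
                     relationship_type=INVERSE[rel.relationship_type])
    )

assert customers.relationships[0].relationship_type == "one_to_many"
```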

View File

@@ -602,7 +602,6 @@ class PartialParsing:
else:
saved_schema_file.pp_dict = {}
self.handle_schema_file_changes(saved_schema_file, saved_yaml_dict, new_yaml_dict)
# copy from new schema_file to saved_schema_file to preserve references
# that weren't removed
saved_schema_file.contents = new_schema_file.contents
@@ -855,6 +854,16 @@ class PartialParsing:
# remove from patches
schema_file.node_patches.remove(elem_unique_id)
# for models, reparse children metrics if applicable:
if dict_key == "models":
for unique_id in schema_file.node_patches:
children = [
child
for child in self.saved_manifest.child_map[unique_id]
if child.split(".")[0] == "metric"
]
self.schedule_nodes_for_parsing(children)
# for models, seeds, snapshots (not analyses)
if dict_key in ["models", "seeds", "snapshots"]:
# find related tests and remove them
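
The new block above re-schedules downstream metrics for parsing when a model's schema entry changes. A minimal sketch of the child filtering it performs, with a made-up `child_map`:

```python
# Minimal sketch of selecting metric children from a node's child_map entry:
# unique_ids are prefixed with their resource type, so splitting on "." and
# checking the first segment keeps only metrics. The ids below are invented.
child_map = {
    "model.my_project.orders": [
        "metric.my_project.order_count",
        "model.my_project.orders_rollup",
        "test.my_project.not_null_orders_id",
    ],
}

children = [
    child
    for child in child_map["model.my_project.orders"]
    if child.split(".")[0] == "metric"
]
assert children == ["metric.my_project.order_count"]
```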

View File

@@ -140,8 +140,12 @@ class ParserRef:
quote: Optional[bool]
if isinstance(column, UnparsedColumn):
quote = column.quote
is_dimension = column.is_dimension
is_primary_key = column.is_primary_key
else:
quote = None
is_dimension = False
is_primary_key = False
self.column_info[column.name] = ColumnInfo(
name=column.name,
description=description,
@@ -150,6 +154,8 @@ class ParserRef:
tags=tags,
quote=quote,
_extra=column.extra,
is_dimension=is_dimension,
is_primary_key=is_primary_key,
)
@classmethod
@@ -865,6 +871,8 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[NodeTarget]):
meta=block.target.meta,
docs=block.target.docs,
config=block.target.config,
is_public=block.target.is_public,
relationships=block.target.relationships,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -1097,6 +1105,22 @@ class MetricParser(YamlReader):
self.schema_parser = schema_parser
self.yaml = yaml
def process_dimension_lists(
self, unparsed_dimension_block: Union[List[str], Dict[str, Any]], unparsed_model: str
):
if isinstance(unparsed_dimension_block, list):
# i think this has a real method to resolve refs, but for now hacking this together as POC
model_name = (
unparsed_model.replace("ref", "")
.replace("'", "")
.replace('"', "")
.replace("(", "")
.replace(")", "")
)
return dict({model_name: unparsed_dimension_block})
else:
return unparsed_dimension_block
def parse_metric(self, unparsed: UnparsedMetric):
package_name = self.project.project_name
unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}"
@@ -1140,7 +1164,8 @@ class MetricParser(YamlReader):
calculation_method=unparsed.calculation_method,
expression=str(unparsed.expression),
timestamp=unparsed.timestamp,
dimensions=unparsed.dimensions,
dimensions=self.process_dimension_lists(unparsed.dimensions, unparsed.model),
allow_joins=unparsed.allow_joins,
window=unparsed.window,
time_grains=unparsed.time_grains,
filters=unparsed.filters,
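
Finally, a standalone sketch of the list-to-dict normalization `process_dimension_lists` performs, using the same string stripping as the diff to recover the model name from a `ref()` string (the inputs here are hypothetical):

```python
# Standalone version of the dimension normalization: a legacy flat list of
# dimensions is re-keyed under the model name pulled out of the metric's ref()
# string; dict inputs pass through unchanged. The example inputs are made up.
from typing import Any, Dict, List, Union


def normalize_dimensions(
    dimensions: Union[List[str], Dict[str, Any]], model_ref: str
) -> Dict[str, Any]:
    if isinstance(dimensions, list):
        # strip the ref(...) wrapper and quotes to recover the bare model name
        model_name = (
            model_ref.replace("ref", "")
            .replace("'", "")
            .replace('"', "")
            .replace("(", "")
            .replace(")", "")
        )
        return {model_name: dimensions}
    return dimensions


assert normalize_dimensions(["region", "plan_type"], "ref('customers')") == {
    "customers": ["region", "plan_type"]
}
```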