Compare commits

...

23 Commits

Author SHA1 Message Date
Dave Connors
f34644a613 add timegrains as column value and process timegrains if not specified in metric 2022-10-31 16:00:27 -05:00
Callum McCann
b17f8f51ba dave feedback 2022-10-31 11:42:43 -05:00
Callum McCann
d812a3bcf8 updating based on comments 2022-10-31 11:40:53 -05:00
Callum McCann
2eeabad16e adding the primary_keys property in manifest 2022-10-30 15:19:54 -05:00
Callum McCann
c5aa06ef4b renaming to process semantic information 2022-10-30 13:00:48 -05:00
Callum McCann
e628a8e9c3 adding join_keys 2022-10-29 17:33:29 -05:00
Callum McCann
e48c51e238 first draft 2022-10-28 17:39:31 -05:00
Dave Connors
ac7c8209cc remove trace 2022-10-26 09:12:53 -05:00
Dave Connors
d545c47112 parse list of dimensions to dictionary 2022-10-26 08:26:22 -05:00
Dave Connors
3cd5595482 dimensions 2022-10-25 08:18:16 -05:00
Dave Connors
d8f3586e56 process inverse relationships 2022-10-24 14:00:12 -05:00
Dave Connors
ab9d0fec55 process inverse relationships -- broken manifest 2022-10-14 15:52:17 -05:00
Dave Connors
f7255f2598 add optional metric config allow_joins to bypass relationship logic 2022-10-14 12:01:01 -05:00
Dave Connors
2e310d6e01 dimensions as dictionaries take 1 2022-10-13 15:40:59 -05:00
Dave Connors
1cd996a2c8 dataype, partial parsing support for model schema changes 2022-10-13 14:47:17 -05:00
Dave Connors
335b3062be Merge branch 'main' into simple-entity-attributes 2022-10-12 16:14:25 -05:00
Dave Connors
ce0c706102 add related model to depends on 2022-10-11 16:52:21 -05:00
Dave Connors
b18139497f dirty dimension inheritance from joined models 2022-10-06 14:52:02 -05:00
Dave Connors
ad5570cd6c allow all dimensions to be passed to metric object 2022-10-06 14:12:43 -05:00
Dave Connors
ef9f4d6cf9 futile attempt to check for node dimensions 2022-10-06 10:05:54 -05:00
Dave Connors
70f7dc24ce add datatype column attribute 2022-10-06 09:21:40 -05:00
Dave Connors
206b220e3d simple relationships for models 2022-10-06 09:11:18 -05:00
Dave Connors
5ed168c47b add is entity as top level model attribute, is_entity_dimension and is_primary_key as column attributes 2022-10-06 08:47:36 -05:00
5 changed files with 276 additions and 6 deletions

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType,
MetricFilter,
MetricTime,
EntityRelationship,
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.exceptions import warn_or_error
@@ -64,6 +65,9 @@ class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable
meta: Dict[str, Any] = field(default_factory=dict)
data_type: Optional[str] = None
quote: Optional[bool] = None
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
time_grains: Optional[List[str]] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -160,6 +164,8 @@ class ParsedNodeMixins(dbtClassMixin):
self.created_at = time.time()
self.description = patch.description
self.columns = patch.columns
self.is_public = patch.is_public
self.relationships = patch.relationships
def get_materialization(self):
return self.config.materialized
@@ -210,6 +216,9 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
compiled_path: Optional[str] = None
build_path: Optional[str] = None
deferred: bool = False
is_public: Optional[bool] = False
relationships: List[EntityRelationship] = field(default_factory=list)
primary_keys: List[str] = field(default_factory=list)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
config_call_dict: Dict[str, Any] = field(default_factory=dict)
@@ -244,7 +253,7 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
@classmethod
def _deserialize(cls, dct: Dict[str, int]):
# The serialized ParsedNodes do not differ from each other
# in fields that would allow 'from_dict' to distinguis
# in fields that would allow 'from_dict' to distinguish
# between them.
resource_type = dct["resource_type"]
if resource_type == "model":
@@ -497,6 +506,8 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
@dataclass
class ParsedNodePatch(ParsedPatch):
columns: Dict[str, ColumnInfo]
is_public: Optional[bool]
relationships: List[EntityRelationship]
@dataclass
@@ -828,10 +839,11 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
expression: str
filters: List[MetricFilter]
time_grains: List[str]
dimensions: List[str]
dimensions: Dict[str, Any] = field(default_factory=dict)
window: Optional[MetricTime] = None
model: Optional[str] = None
model_unique_id: Optional[str] = None
allow_joins: Optional[bool] = True
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

View File

@@ -88,12 +88,36 @@ class Docs(dbtClassMixin, Replaceable):
node_color: Optional[str] = None
@dataclass
class EntityRelationshipType(StrEnum):
many_to_one = "many_to_one"
one_to_many = "one_to_many"
one_to_one = "one_to_one"
def inverse(self) -> str:
if self == "many_to_one":
return "one_to_many"
elif self == "one_to_many":
return "many_to_one"
else:
return self
@dataclass
class EntityRelationship(dbtClassMixin, Replaceable):
    """A user-declared join from one model to another public model.

    to: name of the related model (resolved with manifest.resolve_ref at
        manifest-processing time).
    join_keys: a column name or list of column names used to join the two
        models; a bare string is normalized to a one-element list during
        manifest processing.
    relationship_type: cardinality of the join (see EntityRelationshipType).
    """

    to: str
    join_keys: Union[str, List[str]]
    relationship_type: EntityRelationshipType
@dataclass
class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
name: str
description: str = ""
meta: Dict[str, Any] = field(default_factory=dict)
is_public: Optional[bool] = False
data_type: Optional[str] = None
relationships: List[EntityRelationship] = field(default_factory=list)
docs: Docs = field(default_factory=Docs)
_extra: Dict[str, Any] = field(default_factory=dict)
@@ -114,6 +138,10 @@ class HasTests(HasDocs):
class UnparsedColumn(HasTests):
quote: Optional[bool] = None
tags: List[str] = field(default_factory=list)
is_dimension: Optional[bool] = False
is_primary_key: Optional[bool] = False
time_grains: Optional[List[str]] = field(default_factory=list)
data_type: Optional[str] = None
@dataclass
@@ -489,10 +517,11 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
timestamp: str
expression: str
description: str = ""
time_grains: List[str] = field(default_factory=list)
dimensions: List[str] = field(default_factory=list)
time_grains: Optional[List[str]] = field(default_factory=list)
dimensions: Union[Dict[str, Any], List[str]] = field(default_factory=dict)
window: Optional[MetricTime] = None
model: Optional[str] = None
allow_joins: Optional[bool] = True
filters: List[MetricFilter] = field(default_factory=list)
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

View File

@@ -69,6 +69,7 @@ from dbt.contracts.graph.parsed import (
ParsedExposure,
ParsedMetric,
)
from dbt.contracts.graph.unparsed import EntityRelationshipType, EntityRelationship
from dbt.contracts.util import Writable
from dbt.exceptions import (
ref_target_not_found,
@@ -392,6 +393,7 @@ class ManifestLoader:
start_process = time.perf_counter()
self.process_sources(self.root_project.project_name)
self.process_refs(self.root_project.project_name)
# self.process_inverse_relationships(self.root_project.project_name)
self.process_docs(self.root_project)
self.process_metrics(self.root_project)
@@ -832,6 +834,7 @@ class ManifestLoader:
if node.created_at < self.started_at:
continue
_process_refs_for_node(self.manifest, current_project, node)
_process_semantic_information_for_node(self.manifest, current_project, node)
for exposure in self.manifest.exposures.values():
if exposure.created_at < self.started_at:
continue
@@ -1167,6 +1170,83 @@ def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposur
manifest.update_exposure(exposure)
def _process_semantic_information_for_metric(
    manifest: Manifest,
    target_model: ManifestNode,
    metric: ParsedMetric,
    target_model_package: Optional[str],
    current_project: Optional[str],
):
    """Attach model-declared dimension metadata to a metric.

    Validates that the metric's primary model is public before exposing its
    declared dimensions, fills in the metric's dimensions dict from the model
    yml when the metric declared none, and -- when allow_joins is enabled --
    inherits dimensions from related models via the model's relationships.

    Raises InternalException when relationships are declared on a non-public
    model, when a dimension column lacks a data_type, or when a metric
    dimension is not declared as a dimension in the model yml.
    """
    # Relationships may only be declared between public models.
    if target_model.relationships and not target_model.is_public:
        raise dbt.exceptions.InternalException(
            f"Model relationships must be declared between public models - {metric.unique_id} depends on {target_model.unique_id}, which is not a public model."
        )
    # Only allow dimensions to be accessed if the model is declared to be a public model
    if target_model.is_public:
        # Dimensions declared on the metric's primary model.
        primary_model_dimensions = [
            col for col in target_model.columns.values() if col.is_dimension
        ]
        # Every declared dimension column must carry a data_type.
        for dim in primary_model_dimensions:
            if not dim.data_type:
                raise dbt.exceptions.InternalException(
                    f"Dimension columns must declare a `data_type` attribute. {dim.name} is missing this configuration."
                )
        # If the metric declared no dimensions, inherit them all from the model yml.
        if not metric.dimensions:
            metric.dimensions[target_model.name] = [
                col.name for col in primary_model_dimensions
            ]
        else:
            # Declared metric dimensions must be a subset of the model's dimensions.
            for metric_dim in metric.dimensions[target_model.name]:
                if metric_dim not in [dim.name for dim in primary_model_dimensions]:
                    raise dbt.exceptions.InternalException(
                        f"Metric dimensions on public models must be declared as dimensions in the model yml file. Dimension `{metric_dim}` declared on metric `{metric.name}` is missing this configuration."
                    )
            # TODO -- do we actually want to append missing dims here?
            for dim in primary_model_dimensions:
                if dim.name not in metric.dimensions[target_model.name]:
                    metric.dimensions[target_model.name].append(dim.name)
        if metric.allow_joins:
            # Walk the model's declared relationships and inherit dimensions
            # from each related (joined) model.
            for relationship in target_model.relationships:
                to_model = manifest.resolve_ref(
                    relationship.to,
                    target_model_package,
                    current_project,
                    metric.package_name,
                )
                metric.depends_on.nodes.append(to_model.unique_id)
                new_dims = [col for col in to_model.columns.values() if col.is_dimension]
                if new_dims:
                    metric.dimensions[to_model.name] = []
                    for col in new_dims:
                        if not col.data_type:
                            raise dbt.exceptions.InternalException(
                                f"Dimension columns must declare a `data_type` attribute. {col.name} is missing this configuration."
                            )
                        metric.dimensions[to_model.name].append(col.name)
        # TODO: when the metric declares no time_grains, infer them from the
        # timestamp column's time_grains
        # (target_model.columns[metric.timestamp].time_grains).
def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
"""Given a manifest and a metric in that manifest, process its refs"""
for ref in metric.refs:
@@ -1206,6 +1286,17 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
target_model_id = target_model.unique_id
metric.depends_on.nodes.append(target_model_id)
_process_semantic_information_for_metric(
manifest, target_model, metric, target_model_package, current_project
)
# this should not go here
# this checks the node columns, and adds any dimensions
# declared in that model yml to the metric dimension list
# joined_models = target_model.relationships
manifest.update_metric(metric)
@@ -1298,6 +1389,107 @@ def _process_refs_for_node(manifest: Manifest, current_project: str, node: Manif
manifest.update_node(node)
def _process_semantic_information_for_node(
    manifest: Manifest, current_project: str, node: ManifestNode
):
    """Given a manifest and a node in that manifest, process all of the semantic
    information for the related nodes. This occurs in multiple steps:

    1. If the node is a model and is_public is true, continue with processing.
       Otherwise ignore. (NOTE(review): earlier drafts mentioned seeds as well,
       but only resource_type "model" is handled below -- confirm scope.)
    2. Loop through all of the models listed in relationships and check if they
       also have is_public set to true. If not, collect them and raise a single
       CompilationException naming them all.
    3. Collect the columns where is_primary_key is set to true into a composite
       primary_keys list at the model level.
    4. Create the inverse relationship of each relationship and validate it
       against the other model. If it doesn't exist, add it. If a relationship
       back to this node exists with different properties, raise a
       CompilationException.
    """
    target_model_package: Optional[str] = None
    # Limit this parsing to public models that declare relationships
    if node.resource_type == "model" and node.is_public:
        # Models referenced by a relationship that are not themselves public;
        # collected so we can raise one error naming every offender at once.
        non_public_models = []
        # The model-level primary key is the composite of every column flagged
        # with is_primary_key.
        for column in node.columns.values():
            if column.is_primary_key:
                node.primary_keys.append(column.name)
        for relationship in node.relationships:
            # Normalize a bare string join key to a one-element list so the
            # manifest always stores join_keys as a list.
            if isinstance(relationship.join_keys, str):
                relationship.join_keys = [relationship.join_keys]
            to_model = manifest.resolve_ref(
                relationship.to, target_model_package, current_project, node.package_name
            )
            # Track related models that have not been marked public.
            if not to_model.is_public:
                non_public_models.append(to_model.name)
            # Mirror this relationship on the related model.
            inverse_relationship = EntityRelationship(
                to=node.name,
                join_keys=relationship.join_keys,
                relationship_type=EntityRelationshipType(
                    relationship.relationship_type.inverse()
                ),
            )
            # Find any existing relationship on the related model that points
            # back at this node. Matching on `to` first distinguishes "already
            # declared" from "declared with conflicting properties", and doing
            # the lookup before appending avoids mutating the list while
            # iterating it (which previously could append duplicates).
            existing = next(
                (rel for rel in to_model.relationships if rel.to == inverse_relationship.to),
                None,
            )
            if existing is None:
                # No relationship back to this node yet: add the inverse.
                to_model.relationships.append(inverse_relationship)
                manifest.update_node(to_model)
            elif existing.join_keys != inverse_relationship.join_keys or str(
                existing.relationship_type
            ) != str(inverse_relationship.relationship_type):
                # A relationship back to this node exists but disagrees.
                raise dbt.exceptions.CompilationException(
                    f"""The relationship between {relationship.to} and {inverse_relationship.to} does not match. Please ensure that the join_key(s) and relationship_types match"""
                )
            # else: an identical inverse relationship already exists; nothing to do.
        # Raise once, naming every non-public model referenced by a relationship.
        if non_public_models:
            raise dbt.exceptions.CompilationException(
                f"""The model(s) {', '.join(non_public_models)} have relationships established but are not public models. Please set the `is_public` property to true for these model(s)."""
            )
def _process_sources_for_exposure(
manifest: Manifest, current_project: str, exposure: ParsedExposure
):

View File

@@ -602,7 +602,6 @@ class PartialParsing:
else:
saved_schema_file.pp_dict = {}
self.handle_schema_file_changes(saved_schema_file, saved_yaml_dict, new_yaml_dict)
# copy from new schema_file to saved_schema_file to preserve references
# that weren't removed
saved_schema_file.contents = new_schema_file.contents
@@ -855,6 +854,16 @@ class PartialParsing:
# remove from patches
schema_file.node_patches.remove(elem_unique_id)
# for models, reparse children metrics if applicable:
if dict_key == "models":
for unique_id in schema_file.node_patches:
children = [
child
for child in self.saved_manifest.child_map[unique_id]
if child.split(".")[0] == "metric"
]
self.schedule_nodes_for_parsing(children)
# for models, seeds, snapshots (not analyses)
if dict_key in ["models", "seeds", "snapshots"]:
# find related tests and remove them

View File

@@ -140,8 +140,14 @@ class ParserRef:
quote: Optional[bool]
if isinstance(column, UnparsedColumn):
quote = column.quote
is_dimension = column.is_dimension
is_primary_key = column.is_primary_key
time_grains = column.time_grains
else:
quote = None
is_dimension = False
is_primary_key = False
time_grains = None
self.column_info[column.name] = ColumnInfo(
name=column.name,
description=description,
@@ -150,6 +156,9 @@ class ParserRef:
tags=tags,
quote=quote,
_extra=column.extra,
is_dimension=is_dimension,
is_primary_key=is_primary_key,
time_grains=time_grains,
)
@classmethod
@@ -865,6 +874,8 @@ class NodePatchParser(NonSourceParser[NodeTarget, ParsedNodePatch], Generic[Node
meta=block.target.meta,
docs=block.target.docs,
config=block.target.config,
is_public=block.target.is_public,
relationships=block.target.relationships,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
@@ -1097,6 +1108,22 @@ class MetricParser(YamlReader):
self.schema_parser = schema_parser
self.yaml = yaml
def process_dimension_lists(
    self,
    unparsed_dimension_block: Union[List[str], Dict[str, Any]],
    unparsed_model: str,
):
    """Normalize a metric's dimensions into a {model_name: [dimensions]} dict.

    A bare list of dimension names (the legacy format) is keyed by the model
    name extracted from the metric's `model` ref string; a dict is returned
    unchanged.

    NOTE(review): the previous annotation `List[str] or Dict[str, any]`
    short-circuited to just `List[str]`; fixed to a proper Union.
    """
    if isinstance(unparsed_dimension_block, list):
        # TODO: resolve the ref through the real ref machinery instead of
        # string-stripping; this does not handle two-argument refs like
        # ref('package', 'model').
        model_name = unparsed_model.strip()
        # Peel off the surrounding ref(...) call, if present. (The previous
        # `.replace("ref", "")` also mangled model names that merely contain
        # "ref", e.g. ref('refunds') became "unds".)
        if model_name.startswith("ref(") and model_name.endswith(")"):
            model_name = model_name[len("ref(") : -1]
        # Drop surrounding whitespace and quotes from the ref argument.
        model_name = model_name.strip().strip("'\"").strip()
        return {model_name: unparsed_dimension_block}
    else:
        return unparsed_dimension_block
def parse_metric(self, unparsed: UnparsedMetric):
package_name = self.project.project_name
unique_id = f"{NodeType.Metric}.{package_name}.{unparsed.name}"
@@ -1140,7 +1167,8 @@ class MetricParser(YamlReader):
calculation_method=unparsed.calculation_method,
expression=str(unparsed.expression),
timestamp=unparsed.timestamp,
dimensions=unparsed.dimensions,
dimensions=self.process_dimension_lists(unparsed.dimensions, unparsed.model),
allow_joins=unparsed.allow_joins,
window=unparsed.window,
time_grains=unparsed.time_grains,
filters=unparsed.filters,