Compare commits

...

3 Commits

Author SHA1 Message Date
Callum McCann
fb8b161351 adding entity inheritance 2022-12-06 16:20:12 -06:00
Callum McCann
7ecb431278 making dimensions possible! 2022-12-06 13:54:28 -06:00
Callum McCann
792150ff6a let there be entity 2022-12-05 16:02:14 -06:00
40 changed files with 808 additions and 19 deletions

View File

@@ -56,6 +56,7 @@ def print_compile_stats(stats):
NodeType.Source: "source",
NodeType.Exposure: "exposure",
NodeType.Metric: "metric",
NodeType.Entity: "entity",
}
results = {k: 0 for k in names.keys()}
@@ -91,6 +92,8 @@ def _generate_stats(manifest: Manifest):
stats[exposure.resource_type] += 1
for metric in manifest.metrics.values():
stats[metric.resource_type] += 1
for entity in manifest.entities.values():
stats[entity.resource_type] += 1
for macro in manifest.macros.values():
stats[macro.resource_type] += 1
return stats

View File

@@ -381,6 +381,7 @@ class PartialProject(RenderComponents):
sources: Dict[str, Any]
tests: Dict[str, Any]
metrics: Dict[str, Any]
entities: Dict[str, Any]
exposures: Dict[str, Any]
vars_value: VarProvider
@@ -391,6 +392,7 @@ class PartialProject(RenderComponents):
sources = cfg.sources
tests = cfg.tests
metrics = cfg.metrics
entities = cfg.entities
exposures = cfg.exposures
if cfg.vars is None:
vars_dict: Dict[str, Any] = {}
@@ -446,6 +448,7 @@ class PartialProject(RenderComponents):
sources=sources,
tests=tests,
metrics=metrics,
entities=entities,
exposures=exposures,
vars=vars_value,
config_version=cfg.config_version,
@@ -550,6 +553,7 @@ class Project:
sources: Dict[str, Any]
tests: Dict[str, Any]
metrics: Dict[str, Any]
entities: Dict[str, Any]
exposures: Dict[str, Any]
vars: VarProvider
dbt_version: List[VersionSpecifier]
@@ -624,6 +628,7 @@ class Project:
"sources": self.sources,
"tests": self.tests,
"metrics": self.metrics,
"entities": self.entities,
"exposures": self.exposures,
"vars": self.vars.to_dict(),
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],

View File

@@ -116,6 +116,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
sources=project.sources,
tests=project.tests,
metrics=project.metrics,
entities=project.entities,
exposures=project.exposures,
vars=project.vars,
config_version=project.config_version,
@@ -311,6 +312,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
"sources": self._get_config_paths(self.sources),
"tests": self._get_config_paths(self.tests),
"metrics": self._get_config_paths(self.metrics),
"entities": self._get_config_paths(self.entities),
"exposures": self._get_config_paths(self.exposures),
}
@@ -506,6 +508,7 @@ class UnsetProfileConfig(RuntimeConfig):
"sources": self.sources,
"tests": self.tests,
"metrics": self.metrics,
"entities": self.entities,
"exposures": self.exposures,
"vars": self.vars.to_dict(),
"require-dbt-version": [v.to_version_string() for v in self.dbt_version],
@@ -568,6 +571,7 @@ class UnsetProfileConfig(RuntimeConfig):
sources=project.sources,
tests=project.tests,
metrics=project.metrics,
entities=project.entities,
exposures=project.exposures,
vars=project.vars,
config_version=project.config_version,

View File

@@ -45,6 +45,8 @@ class UnrenderedConfig(ConfigSource):
model_configs = unrendered.get("tests")
elif resource_type == NodeType.Metric:
model_configs = unrendered.get("metrics")
elif resource_type == NodeType.Entity:
model_configs = unrendered.get("entities")
elif resource_type == NodeType.Exposure:
model_configs = unrendered.get("exposures")
else:
@@ -70,6 +72,8 @@ class RenderedConfig(ConfigSource):
model_configs = self.project.tests
elif resource_type == NodeType.Metric:
model_configs = self.project.metrics
elif resource_type == NodeType.Entity:
model_configs = self.project.entities
elif resource_type == NodeType.Exposure:
model_configs = self.project.exposures
else:

View File

@@ -37,6 +37,7 @@ from dbt.contracts.graph.parsed import (
ParsedMacro,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedSeedNode,
ParsedSourceDefinition,
)
@@ -301,12 +302,10 @@ class BaseMetricResolver(BaseResolver):
self.validate_args(name, package)
return self.resolve(name, package)
class Config(Protocol):
def __init__(self, model, context_config: Optional[ContextConfig]):
...
# Implementation of "config(..)" calls in models
class ParseConfigObject(Config):
def __init__(self, model, context_config: Optional[ContextConfig]):
@@ -1492,7 +1491,6 @@ class MetricRefResolver(BaseResolver):
"the name argument to ref() must be a string"
)
def generate_parse_metrics(
metric: ParsedMetric,
config: RuntimeConfig,
@@ -1515,6 +1513,41 @@ def generate_parse_metrics(
),
}
class EntityRefResolver(BaseResolver):
    """Parse-time handler for ref() calls inside entity definitions.

    Records each ref on the entity (self.model) for later resolution and
    renders as an empty string, since entities produce no SQL at parse time.
    """

    def __call__(self, *args) -> str:
        package = None
        if len(args) == 2:
            package, name = args
        elif len(args) == 1:
            name = args[0]
        else:
            # raises; refs accept exactly one or two arguments
            ref_invalid_args(self.model, args)
        self.validate_args(name, package)
        self.model.refs.append(list(args))
        return ""

    def validate_args(self, name, package):
        # only the name is validated here; package may legitimately be None
        if not isinstance(name, str):
            raise ParsingException(
                f"In the entity associated with {self.model.original_file_path} "
                "the name argument to ref() must be a string"
            )
def generate_parse_entities(
    entity: ParsedEntity,
    config: RuntimeConfig,
    manifest: Manifest,
    package_name: str,
) -> Dict[str, Any]:
    """Build the parse-time jinja context for an entity.

    The context exposes only ``ref``, wired to an EntityRefResolver that
    records refs on the entity rather than rendering SQL.
    """
    project = config.load_dependencies()[package_name]
    resolver = EntityRefResolver(None, entity, project, manifest)
    return {"ref": resolver}
# This class is currently used by the schema parser in order
# to limit the number of macros in the context by using

View File

@@ -227,6 +227,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.

View File

@@ -7,6 +7,7 @@ from dbt.contracts.graph.parsed import (
ParsedModelNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedResource,
ParsedRPCNode,
ParsedSqlNode,
@@ -233,4 +234,5 @@ GraphMemberNode = Union[
CompileResultNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
]

View File

@@ -36,6 +36,7 @@ from dbt.contracts.graph.parsed import (
ParsedGenericTestNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
HasUniqueID,
UnpatchedSourceDefinition,
ManifestNodes,
@@ -216,8 +217,39 @@ class MetricLookup(dbtClassMixin):
)
return manifest.metrics[unique_id]
class EntityLookup(dbtClassMixin):
    """Cache for resolving entity names to unique_ids, per package.

    Mirrors MetricLookup: storage maps search_name -> {package_name: unique_id}.
    """

    def __init__(self, manifest: "Manifest"):
        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
        self.populate(manifest)

    # Resolve a name (optionally qualified by package) to a unique_id, or None.
    # (Previous comment here was copy-pasted from another lookup class and
    # referred to models/seeds/snapshots; this lookup handles entities only.)
    def get_unique_id(self, search_name, package: Optional[PackageName]):
        return find_unique_id_for_package(self.storage, search_name, package)

    def find(self, search_name, package: Optional[PackageName], manifest: "Manifest"):
        # two-step: name -> unique_id via the cache, then unique_id -> node
        unique_id = self.get_unique_id(search_name, package)
        if unique_id is not None:
            return self.perform_lookup(unique_id, manifest)
        return None

    def add_entity(self, entity: ParsedEntity):
        if entity.search_name not in self.storage:
            self.storage[entity.search_name] = {}
        self.storage[entity.search_name][entity.package_name] = entity.unique_id

    def populate(self, manifest):
        for entity in manifest.entities.values():
            # only index entries that actually carry a name
            if hasattr(entity, "name"):
                self.add_entity(entity)

    def perform_lookup(self, unique_id: UniqueID, manifest: "Manifest") -> ParsedEntity:
        # a cached id whose node is gone indicates a stale cache -- internal error
        if unique_id not in manifest.entities:
            raise dbt.exceptions.InternalException(
                f"Entity {unique_id} found in cache but not found in manifest"
            )
        return manifest.entities[unique_id]
# This handles both models/seeds/snapshots and sources/metrics/entities/exposures
class DisabledLookup(dbtClassMixin):
def __init__(self, manifest: "Manifest"):
self.storage: Dict[str, Dict[PackageName, List[Any]]] = {}
@@ -467,6 +499,7 @@ class Disabled(Generic[D]):
MaybeMetricNode = Optional[Union[ParsedMetric, Disabled[ParsedMetric]]]
MaybeEntityNode = Optional[Union[ParsedEntity, Disabled[ParsedEntity]]]
MaybeDocumentation = Optional[ParsedDocumentation]
@@ -611,6 +644,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict)
exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict)
metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict)
entities: MutableMapping[str, ParsedEntity] = field(default_factory=dict)
selectors: MutableMapping[str, Any] = field(default_factory=dict)
files: MutableMapping[str, AnySourceFile] = field(default_factory=dict)
metadata: ManifestMetadata = field(default_factory=ManifestMetadata)
@@ -632,6 +666,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
_metric_lookup: Optional[MetricLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_entity_lookup: Optional[EntityLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
_disabled_lookup: Optional[DisabledLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
)
@@ -682,6 +719,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
def update_metric(self, new_metric: ParsedMetric):
_update_into(self.metrics, new_metric)
def update_entity(self, new_entity: ParsedEntity):
    # replace the stored entity by unique_id; mirrors update_metric/update_node
    _update_into(self.entities, new_entity)
def update_node(self, new_node: ManifestNode):
_update_into(self.nodes, new_node)
@@ -697,6 +737,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.flat_graph = {
"exposures": {k: v.to_dict(omit_none=False) for k, v in self.exposures.items()},
"metrics": {k: v.to_dict(omit_none=False) for k, v in self.metrics.items()},
"entities": {k: v.to_dict(omit_none=False) for k, v in self.entities.items()},
"nodes": {k: v.to_dict(omit_none=False) for k, v in self.nodes.items()},
"sources": {k: v.to_dict(omit_none=False) for k, v in self.sources.items()},
}
@@ -759,6 +800,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.nodes.values(),
self.sources.values(),
self.metrics.values(),
self.entities.values(),
)
for resource in all_resources:
resource_type_plural = resource.resource_type.pluralize()
@@ -787,6 +829,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs={k: _deepcopy(v) for k, v in self.docs.items()},
exposures={k: _deepcopy(v) for k, v in self.exposures.items()},
metrics={k: _deepcopy(v) for k, v in self.metrics.items()},
entities={k: _deepcopy(v) for k, v in self.entities.items()},
selectors={k: _deepcopy(v) for k, v in self.selectors.items()},
metadata=self.metadata,
disabled={k: _deepcopy(v) for k, v in self.disabled.items()},
@@ -803,6 +846,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.sources.values(),
self.exposures.values(),
self.metrics.values(),
self.entities.values(),
)
)
forward_edges, backward_edges = build_node_edges(edge_members)
@@ -828,6 +872,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs=self.docs,
exposures=self.exposures,
metrics=self.metrics,
entities=self.entities,
selectors=self.selectors,
metadata=self.metadata,
disabled=self.disabled,
@@ -849,6 +894,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return self.exposures[unique_id]
elif unique_id in self.metrics:
return self.metrics[unique_id]
elif unique_id in self.entities:
return self.entities[unique_id]
else:
# something terrible has happened
raise dbt.exceptions.InternalException(
@@ -885,6 +932,12 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._metric_lookup = MetricLookup(self)
return self._metric_lookup
@property
def entity_lookup(self) -> EntityLookup:
    # lazily build (and then reuse) the entity lookup cache; the backing
    # field is excluded from serialization
    if self._entity_lookup is None:
        self._entity_lookup = EntityLookup(self)
    return self._entity_lookup
def rebuild_ref_lookup(self):
self._ref_lookup = RefableLookup(self)
@@ -985,6 +1038,32 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
return Disabled(disabled[0])
return None
def resolve_entity(
    self,
    target_entity_name: str,
    target_entity_package: Optional[str],
    current_project: str,
    node_package: str,
) -> MaybeEntityNode:
    """Find an enabled entity by name across candidate packages.

    Returns the ParsedEntity if an enabled match is found, ``Disabled(...)``
    wrapping the first disabled match if only disabled ones exist, or None.
    """
    # Fix: dropped the dead `entity: Optional[ParsedEntity] = None`
    # pre-assignment (always reassigned in the loop) and the no-op
    # f-string around target_entity_name.
    disabled: Optional[List[ParsedEntity]] = None

    candidates = _search_packages(current_project, node_package, target_entity_package)
    for pkg in candidates:
        entity = self.entity_lookup.find(target_entity_name, pkg, self)

        if entity is not None and entity.config.enabled:
            return entity

        # it's possible that the node is disabled
        if disabled is None:
            disabled = self.disabled_lookup.find(target_entity_name, pkg)

    if disabled:
        return Disabled(disabled[0])
    return None
# Called by DocsRuntimeContext.doc
def resolve_doc(
self,
@@ -1099,6 +1178,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
source_file.add_test(node.unique_id, test_from)
if isinstance(node, ParsedMetric):
source_file.metrics.append(node.unique_id)
if isinstance(node, ParsedEntity):
source_file.entities.append(node.unique_id)
if isinstance(node, ParsedExposure):
source_file.exposures.append(node.unique_id)
else:
@@ -1114,6 +1195,11 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.metrics[metric.unique_id] = metric
source_file.metrics.append(metric.unique_id)
def add_entity(self, source_file: SchemaSourceFile, entity: ParsedEntity):
    # presumably raises on a duplicate unique_id -- TODO confirm _check_duplicates;
    # then registers the entity on both the manifest and its schema file
    # (mirrors add_metric above)
    _check_duplicates(entity, self.entities)
    self.entities[entity.unique_id] = entity
    source_file.entities.append(entity.unique_id)
def add_disabled_nofile(self, node: GraphMemberNode):
# There can be multiple disabled nodes for the same unique_id
if node.unique_id in self.disabled:
@@ -1129,6 +1215,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
source_file.add_test(node.unique_id, test_from)
if isinstance(node, ParsedMetric):
source_file.metrics.append(node.unique_id)
if isinstance(node, ParsedEntity):
source_file.entities.append(node.unique_id)
if isinstance(node, ParsedExposure):
source_file.exposures.append(node.unique_id)
else:
@@ -1156,6 +1244,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self.docs,
self.exposures,
self.metrics,
self.entities,
self.selectors,
self.files,
self.metadata,
@@ -1168,6 +1257,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
self._source_lookup,
self._ref_lookup,
self._metric_lookup,
self._entity_lookup,
self._disabled_lookup,
self._analysis_lookup,
)
@@ -1209,6 +1299,9 @@ class WritableManifest(ArtifactMixin):
metrics: Mapping[UniqueID, ParsedMetric] = field(
metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
)
entities: Mapping[UniqueID, ParsedEntity] = field(
metadata=dict(description=("The entities defined in the dbt project and its dependencies"))
)
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=("The selectors defined in selectors.yml"))
)

View File

@@ -367,6 +367,9 @@ class BaseConfig(AdditionalPropertiesAllowed, Replaceable):
class MetricConfig(BaseConfig):
enabled: bool = True
@dataclass
class EntityConfig(BaseConfig):
    # Entities currently support only the `enabled` flag (mirrors MetricConfig).
    enabled: bool = True
@dataclass
class ExposureConfig(BaseConfig):
@@ -604,6 +607,7 @@ class SnapshotConfig(EmptySnapshotConfig):
RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
NodeType.Metric: MetricConfig,
NodeType.Entity: EntityConfig,
NodeType.Exposure: ExposureConfig,
NodeType.Source: SourceConfig,
NodeType.Seed: SeedConfig,

View File

@@ -38,6 +38,7 @@ from dbt.contracts.graph.unparsed import (
MaturityType,
MetricFilter,
MetricTime,
EntityDimension
)
from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin
from dbt.events.proto_types import NodeInfo
@@ -58,6 +59,7 @@ from .model_config import (
TestConfig,
SourceConfig,
MetricConfig,
EntityConfig,
ExposureConfig,
EmptySnapshotConfig,
SnapshotConfig,
@@ -209,6 +211,7 @@ class ParsedNodeDefaults(NodeInfoMixin, ParsedNodeMandatory):
refs: List[List[str]] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
metrics: List[List[str]] = field(default_factory=list)
entities: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
description: str = field(default="")
columns: Dict[str, ColumnInfo] = field(default_factory=dict)
@@ -252,7 +255,7 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
@classmethod
def _deserialize(cls, dct: Dict[str, int]):
# The serialized ParsedNodes do not differ from each other
# in fields that would allow 'from_dict' to distinguis
# in fields that would allow 'from_dict' to distinguish
# between them.
resource_type = dct["resource_type"]
if resource_type == "model":
@@ -845,7 +848,7 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
resource_type: NodeType = NodeType.Metric
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
config: MetricConfig = field(default_factory=MetricConfig)
config: EntityConfig = field(default_factory=EntityConfig)
unrendered_config: Dict[str, Any] = field(default_factory=dict)
sources: List[List[str]] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)
@@ -918,6 +921,60 @@ class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
and True
)
@dataclass
class ParsedEntity(UnparsedBaseNode, HasUniqueID, HasFqn):
    """Parsed representation of an entity from a schema (yaml) file.

    Mirrors ParsedMetric: tracks the backing model, declared dimensions,
    refs / depends_on used for graph resolution, and a config carrying the
    `enabled` flag.
    """

    name: str
    model: str
    description: str
    dimensions: Dict[str, EntityDimension] = field(default_factory=dict)
    model_unique_id: Optional[str] = None
    # BUG FIX: was NodeType.Metric (copy-paste from ParsedMetric) -- entities
    # must report their own resource type or stats counting, selector methods,
    # and pluralization all misclassify them.
    resource_type: NodeType = NodeType.Entity
    meta: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    # BUG FIX: was MetricConfig -- EntityConfig is the config class registered
    # for NodeType.Entity in RESOURCE_TYPES.
    config: EntityConfig = field(default_factory=EntityConfig)
    unrendered_config: Dict[str, Any] = field(default_factory=dict)
    sources: List[List[str]] = field(default_factory=list)
    depends_on: DependsOn = field(default_factory=DependsOn)
    refs: List[List[str]] = field(default_factory=list)
    entities: List[List[str]] = field(default_factory=list)
    created_at: float = field(default_factory=lambda: time.time())

    @property
    def depends_on_nodes(self):
        return self.depends_on.nodes

    @property
    def search_name(self):
        # entities are looked up by their declared name
        return self.name

    def same_model(self, old: "ParsedEntity") -> bool:
        return self.model == old.model

    def same_description(self, old: "ParsedEntity") -> bool:
        return self.description == old.description

    def same_dimensions(self, old: "ParsedEntity") -> bool:
        return self.dimensions == old.dimensions

    def same_config(self, old: "ParsedEntity") -> bool:
        return self.config.same_contents(
            self.unrendered_config,
            old.unrendered_config,
        )

    def same_contents(self, old: Optional["ParsedEntity"]) -> bool:
        # existing when it didn't before is a change!
        # metadata/tags changes are not "changes"
        if old is None:
            return True

        return (
            self.same_model(old)
            and self.same_description(old)
            and self.same_dimensions(old)
            and self.same_config(old)
            and True
        )
ManifestNodes = Union[
ParsedAnalysisNode,
@@ -938,5 +995,6 @@ ParsedResource = Union[
ParsedNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedSourceDefinition,
]

View File

@@ -523,3 +523,47 @@ class UnparsedMetric(dbtClassMixin, Replaceable):
if data.get("model") is not None and data.get("calculation_method") == "derived":
raise ValidationError("Derived metrics cannot have a 'model' property")
@dataclass
class EntityDimension(dbtClassMixin, Mergeable):
    """This class is used for the dimension information at the entity level. It
    closely matches the implementation of columns for models."""

    name: str
    description: str = ""
    # physical column backing the dimension; presumably falls back to `name`
    # when None -- TODO confirm against the parser
    column_name: Optional[str] = None
    date_type: Optional[str] = None
    default_timestamp: Optional[bool] = None
    primary_key: Optional[bool] = None
    # NOTE(review): annotated Optional but defaults to [] -- confirm whether
    # None is ever expected here
    time_grains: Optional[List[str]] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)
@dataclass
class EntityInheritence(EntityDimension):  # [sic] "Inheritance" -- name is public API, not renamed here
    """This class is used for entity dimension inheritance. This class is optional
    but if it is present then include needs to be present. Exclude cannot be present
    without some idea of what is being included, whereas exclude is fully optional.
    The acceptable inputs for include are either a list of columns/dimensions or *
    to represent all fields. The acceptable inputs for exclude are a list of columns/
    dimensions
    """

    # "*" selects all fields; otherwise an explicit list of columns/dimensions
    include: Union[List[str], str] = field(default_factory=list)
    # NOTE(review): annotated Optional but defaults to [] -- confirm intent
    exclude: Optional[List[str]] = field(default_factory=list)
@dataclass
class UnparsedEntity(dbtClassMixin, Replaceable):
    """This class is used for entity information as read from yaml, prior to
    parsing into a ParsedEntity."""

    name: str
    model: str
    description: str = ""
    # either an explicit sequence of dimensions or an inheritance spec
    # (annotation simplified from redundant nested Optionals -- same type)
    dimensions: Optional[Union[Sequence[EntityDimension], EntityInheritence]] = None
    meta: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def validate(cls, data):
        super(UnparsedEntity, cls).validate(data)
        errors = []
        ## TODO: Add validation here around include/exclude and others

View File

@@ -208,6 +208,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
sources: Dict[str, Any] = field(default_factory=dict)
tests: Dict[str, Any] = field(default_factory=dict)
metrics: Dict[str, Any] = field(default_factory=dict)
entities: Dict[str, Any] = field(default_factory=dict)
exposures: Dict[str, Any] = field(default_factory=dict)
vars: Optional[Dict[str, Any]] = field(
default=None,

View File

@@ -908,7 +908,6 @@ class PartialParsingDeletedMetric(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
unique_id: str = betterproto.string_field(2)
@dataclass
class ManifestWrongMetadataVersion(betterproto.Message):
"""I022"""
@@ -1249,6 +1248,12 @@ class JinjaLogWarning(betterproto.Message):
info: "EventInfo" = betterproto.message_field(1)
msg: str = betterproto.string_field(2)
@dataclass
class PartialParsingDeletedEntity(betterproto.Message):
"""I062"""
info: "EventInfo" = betterproto.message_field(1)
unique_id: str = betterproto.string_field(2)
@dataclass
class GitSparseCheckoutSubdirectory(betterproto.Message):

View File

@@ -1526,6 +1526,14 @@ class JinjaLogWarning(WarnLevel, pt.JinjaLogWarning):
def message(self) -> str:
return self.msg
@dataclass
class PartialParsingDeletedEntity(DebugLevel, pt.PartialParsingDeletedEntity):
    """Debug-level event emitted when partial parsing removes an entity."""

    def code(self):
        return "I062"

    def message(self) -> str:
        return f"Partial parsing: deleted entity {self.unique_id}"
# =======================================================
# M - Deps generation

View File

@@ -20,7 +20,7 @@ from .selector_spec import (
INTERSECTION_DELIMITER = ","
DEFAULT_INCLUDES: List[str] = ["fqn:*", "source:*", "exposure:*", "metric:*"]
DEFAULT_INCLUDES: List[str] = ["fqn:*", "source:*", "exposure:*", "metric:*", "entity:*"]
DEFAULT_EXCLUDES: List[str] = []

View File

@@ -5,7 +5,7 @@ from queue import PriorityQueue
from typing import Dict, Set, List, Generator, Optional
from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure, ParsedMetric, ParsedEntity
from dbt.contracts.graph.compiled import GraphMemberNode
from dbt.contracts.graph.manifest import Manifest
from dbt.node_types import NodeType
@@ -48,7 +48,7 @@ class GraphQueue:
if node.resource_type != NodeType.Model:
return False
# must be a Model - tell mypy this won't be a Source or Exposure or Metric
assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric))
assert not isinstance(node, (ParsedSourceDefinition, ParsedExposure, ParsedMetric, ParsedEntity))
if node.is_ephemeral:
return False
return True

View File

@@ -161,6 +161,9 @@ class NodeSelector(MethodManager):
elif unique_id in self.manifest.metrics:
metric = self.manifest.metrics[unique_id]
return metric.config.enabled
elif unique_id in self.manifest.entities:
metric = self.manifest.entities[unique_id]
return metric.config.enabled
node = self.manifest.nodes[unique_id]
return not node.empty and node.config.enabled
@@ -180,6 +183,8 @@ class NodeSelector(MethodManager):
node = self.manifest.exposures[unique_id]
elif unique_id in self.manifest.metrics:
node = self.manifest.metrics[unique_id]
elif unique_id in self.manifest.entities:
node = self.manifest.entities[unique_id]
else:
raise InternalException(f"Node {unique_id} not found in the manifest!")
return self.node_is_match(node)

View File

@@ -19,6 +19,7 @@ from dbt.contracts.graph.parsed import (
ParsedSingularTestNode,
ParsedExposure,
ParsedMetric,
ParsedEntity,
ParsedGenericTestNode,
ParsedSourceDefinition,
)
@@ -48,6 +49,7 @@ class MethodName(StrEnum):
State = "state"
Exposure = "exposure"
Metric = "metric"
Entity = "entity"
Result = "result"
SourceStatus = "source_status"
@@ -76,7 +78,7 @@ def is_selected_node(fqn: List[str], node_selector: str):
return True
SelectorTarget = Union[ParsedSourceDefinition, ManifestNode, ParsedExposure, ParsedMetric]
SelectorTarget = Union[ParsedSourceDefinition, ManifestNode, ParsedExposure, ParsedMetric, ParsedEntity]
class SelectorMethod(metaclass=abc.ABCMeta):
@@ -127,6 +129,16 @@ class SelectorMethod(metaclass=abc.ABCMeta):
continue
yield unique_id, metric
def entity_nodes(
    self, included_nodes: Set[UniqueId]
) -> Iterator[Tuple[UniqueId, ParsedEntity]]:
    """Yield (unique_id, entity) pairs for manifest entities in the selection."""
    for key, entity in self.manifest.entities.items():
        uid = UniqueId(key)
        if uid in included_nodes:
            yield uid, entity
def all_nodes(
self, included_nodes: Set[UniqueId]
) -> Iterator[Tuple[UniqueId, SelectorTarget]]:
@@ -135,6 +147,7 @@ class SelectorMethod(metaclass=abc.ABCMeta):
self.source_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes),
self.entity_nodes(included_nodes),
)
def configurable_nodes(
@@ -145,11 +158,12 @@ class SelectorMethod(metaclass=abc.ABCMeta):
def non_source_nodes(
self,
included_nodes: Set[UniqueId],
) -> Iterator[Tuple[UniqueId, Union[ParsedExposure, ManifestNode, ParsedMetric]]]:
) -> Iterator[Tuple[UniqueId, Union[ParsedExposure, ManifestNode, ParsedMetric, ParsedEntity]]]:
yield from chain(
self.parsed_nodes(included_nodes),
self.exposure_nodes(included_nodes),
self.metric_nodes(included_nodes),
self.entity_nodes(included_nodes),
)
@abc.abstractmethod
@@ -278,6 +292,30 @@ class MetricSelectorMethod(SelectorMethod):
yield node
class EntitySelectorMethod(SelectorMethod):
    """Select entities by name, optionally qualified by package.

    Accepts ``entity_name`` or ``package_name.entity_name``; either part
    may be the glob to match anything.
    """

    def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[UniqueId]:
        target_package = SELECTOR_GLOB
        parts = selector.split(".")
        if len(parts) == 1:
            (target_name,) = parts
        elif len(parts) == 2:
            target_package, target_name = parts
        else:
            raise RuntimeException(
                (
                    'Invalid entity selector value "{}". Entities must be of '
                    "the form ${{entity_name}} or "
                    "${{entity_package.entity_name}}"
                ).format(selector)
            )

        for unique_id, entity in self.entity_nodes(included_nodes):
            package_ok = target_package in (entity.package_name, SELECTOR_GLOB)
            name_ok = target_name in (entity.name, SELECTOR_GLOB)
            if package_ok and name_ok:
                yield unique_id
class PathSelectorMethod(SelectorMethod):
def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[UniqueId]:
@@ -536,6 +574,8 @@ class StateSelectorMethod(SelectorMethod):
previous_node = manifest.exposures[node]
elif node in manifest.metrics:
previous_node = manifest.metrics[node]
elif node in manifest.entities:
previous_node = manifest.entities[node]
if checker(previous_node, real_node):
yield node
@@ -624,6 +664,7 @@ class MethodManager:
MethodName.State: StateSelectorMethod,
MethodName.Exposure: ExposureSelectorMethod,
MethodName.Metric: MetricSelectorMethod,
MethodName.Entity: EntitySelectorMethod,
MethodName.Result: ResultSelectorMethod,
MethodName.SourceStatus: SourceStatusSelectorMethod,
}

View File

@@ -18,6 +18,7 @@ class NodeType(StrEnum):
Macro = "macro"
Exposure = "exposure"
Metric = "metric"
Entity = "entity"
@classmethod
def executable(cls) -> List["NodeType"]:
@@ -52,11 +53,14 @@ class NodeType(StrEnum):
cls.Analysis,
cls.Exposure,
cls.Metric,
cls.Entity,
]
def pluralize(self) -> str:
if self is self.Analysis:
return "analyses"
if self is self.Entity:
return "entities"
return f"{self}s"

View File

@@ -68,6 +68,7 @@ from dbt.contracts.graph.parsed import (
ColumnInfo,
ParsedExposure,
ParsedMetric,
ParsedEntity,
)
from dbt.contracts.util import Writable
from dbt.exceptions import (
@@ -344,7 +345,7 @@ class ManifestLoader:
project, project_parser_files[project.project_name], parser_types
)
# Now that we've loaded most of the nodes (except for schema tests, sources, metrics)
# Now that we've loaded most of the nodes (except for schema tests, sources, metrics, entities)
# load up the Lookup objects to resolve them by name, so the SourceFiles store
# the unique_id instead of the name. Sources are loaded from yaml files, so
# aren't in place yet
@@ -388,6 +389,7 @@ class ManifestLoader:
self.process_refs(self.root_project.project_name)
self.process_docs(self.root_project)
self.process_metrics(self.root_project)
self.process_entities(self.root_project)
# update tracking data
self._perf_info.process_manifest_elapsed = time.perf_counter() - start_process
@@ -546,6 +548,7 @@ class ManifestLoader:
ManifestWrongMetadataVersion(version=self.manifest.metadata.dbt_version)
)
self.manifest.metadata.dbt_version = __version__
breakpoint()
manifest_msgpack = self.manifest.to_msgpack()
make_directory(os.path.dirname(path))
with open(path, "wb") as fp:
@@ -836,6 +839,10 @@ class ManifestLoader:
if metric.created_at < self.started_at:
continue
_process_refs_for_metric(self.manifest, current_project, metric)
for entity in self.manifest.entities.values():
if entity.created_at < self.started_at:
continue
_process_refs_for_entity(self.manifest, current_project, entity)
# Takes references in 'metrics' array of nodes and exposures, finds the target
# node, and updates 'depends_on.nodes' with the unique id
@@ -856,6 +863,23 @@ class ManifestLoader:
continue
_process_metrics_for_node(self.manifest, current_project, exposure)
# Takes references in 'entities' array of nodes and exposures, finds the target
# node, and updates 'depends_on.nodes' with the unique id
def process_entities(self, config: RuntimeConfig):
    current_project = config.project_name
    # nodes, entities, and exposures may all reference entities; process each
    # collection in turn (same order as the original three loops)
    for collection in (
        self.manifest.nodes,
        self.manifest.entities,
        self.manifest.exposures,
    ):
        for member in collection.values():
            # skip members untouched by this parse run
            if member.created_at < self.started_at:
                continue
            _process_entities_for_node(self.manifest, current_project, member)
# nodes: node and column descriptions
# sources: source and table descriptions, column descriptions
# macros: macro argument descriptions
@@ -911,6 +935,16 @@ class ManifestLoader:
config.project_name,
)
_process_docs_for_metrics(ctx, metric)
for entity in self.manifest.entities.values():
if entity.created_at < self.started_at:
continue
ctx = generate_runtime_docs_context(
config,
entity,
self.manifest,
config.project_name,
)
_process_docs_for_entities(ctx, entity)
# Loops through all nodes and exposures, for each element in
# 'sources' array finds the source node and updates the
@@ -1097,6 +1131,8 @@ def _process_docs_for_exposure(context: Dict[str, Any], exposure: ParsedExposure
def _process_docs_for_metrics(context: Dict[str, Any], metric: ParsedMetric) -> None:
metric.description = get_rendered(metric.description, context)
def _process_docs_for_entities(context: Dict[str, Any], entity: ParsedEntity) -> None:
entity.description = get_rendered(entity.description, context)
def _process_refs_for_exposure(manifest: Manifest, current_project: str, exposure: ParsedExposure):
"""Given a manifest and exposure in that manifest, process its refs"""
@@ -1182,6 +1218,46 @@ def _process_refs_for_metric(manifest: Manifest, current_project: str, metric: P
metric.depends_on.nodes.append(target_model_id)
manifest.update_metric(metric)
def _process_refs_for_entity(manifest: Manifest, current_project: str, entity: ParsedEntity):
    """Given a manifest and an entity in that manifest, process its refs.

    For each ref, resolves the target model and records its unique_id on
    entity.depends_on.nodes; a ref that cannot be resolved (or resolves to a
    disabled node) disables the entity instead.
    """
    for ref in entity.refs:
        target_model: Optional[Union[Disabled, ManifestNode]] = None
        target_model_name: str
        target_model_package: Optional[str] = None

        # A ref is stored as a 1-element list (name) or 2-element list
        # (package, name); anything else is a parser invariant violation.
        if len(ref) == 1:
            target_model_name = ref[0]
        elif len(ref) == 2:
            target_model_package, target_model_name = ref
        else:
            raise dbt.exceptions.InternalException(
                f"Refs should always be 1 or 2 arguments - got {len(ref)}"
            )

        target_model = manifest.resolve_ref(
            target_model_name,
            target_model_package,
            current_project,
            entity.package_name,
        )

        if target_model is None or isinstance(target_model, Disabled):
            # This may raise. Even if it doesn't, we don't want to add
            # this entity to the graph b/c there is no destination entity
            entity.config.enabled = False
            invalid_target_fail_unless_test(
                node=entity,
                target_name=target_model_name,
                target_kind="node",
                target_package=target_model_package,
                disabled=(isinstance(target_model, Disabled)),
            )
            continue

        target_model_id = target_model.unique_id

        entity.depends_on.nodes.append(target_model_id)
        manifest.update_entity(entity)
def _process_metrics_for_node(
manifest: Manifest,
@@ -1227,6 +1303,49 @@ def _process_metrics_for_node(
node.depends_on.nodes.append(target_metric_id)
def _process_entities_for_node(
    manifest: Manifest,
    current_project: str,
    node: Union[ManifestNode, ParsedEntity, ParsedExposure],
):
    """Given a manifest and a node in that manifest, process its entities.

    For each entity reference, resolves the target entity and records its
    unique_id on node.depends_on.nodes; an unresolvable (or disabled) target
    disables this node instead.
    """
    for entity in node.entities:
        target_entity: Optional[Union[Disabled, ParsedEntity]] = None
        target_entity_name: str
        target_entity_package: Optional[str] = None

        # Entity references are stored as [name] or [package, name].
        if len(entity) == 1:
            target_entity_name = entity[0]
        elif len(entity) == 2:
            target_entity_package, target_entity_name = entity
        else:
            raise dbt.exceptions.InternalException(
                f"Entity references should always be 1 or 2 arguments - got {len(entity)}"
            )

        target_entity = manifest.resolve_entity(
            target_entity_name,
            target_entity_package,
            current_project,
            node.package_name,
        )

        if target_entity is None or isinstance(target_entity, Disabled):
            # This may raise. Even if it doesn't, we don't want to add
            # this node to the graph b/c there is no destination node
            node.config.enabled = False
            invalid_target_fail_unless_test(
                node=node,
                target_name=target_entity_name,
                # Fix: was "source" (copied from the metrics variant); this is
                # an entity lookup, so report the target kind accurately in
                # the resulting error message.
                target_kind="entity",
                target_package=target_entity_package,
                disabled=(isinstance(target_entity, Disabled)),
            )
            continue

        target_entity_id = target_entity.unique_id
        node.depends_on.nodes.append(target_entity_id)
def _process_refs_for_node(manifest: Manifest, current_project: str, node: ManifestNode):
"""Given a manifest and a node in that manifest, process its refs"""
@@ -1298,7 +1417,7 @@ def _process_sources_for_exposure(
exposure.depends_on.nodes.append(target_source_id)
manifest.update_exposure(exposure)
## TODO: Remove this code because metrics can't be based on sources
def _process_sources_for_metric(manifest: Manifest, current_project: str, metric: ParsedMetric):
target_source: Optional[Union[Disabled, ParsedSourceDefinition]] = None
for source_name, table_name in metric.sources:

View File

@@ -20,6 +20,7 @@ from dbt.events.types import (
PartialParsingDeletedSource,
PartialParsingDeletedExposure,
PartialParsingDeletedMetric,
PartialParsingDeletedEntity,
)
from dbt.constants import DEFAULT_ENV_PLACEHOLDER
from dbt.node_types import NodeType
@@ -246,7 +247,7 @@ class PartialParsing:
self.remove_source_override_target(source)
def delete_disabled(self, unique_id, file_id):
# This node/metric/exposure is disabled. Find it and remove it from disabled dictionary.
# This node/metric/entity/exposure is disabled. Find it and remove it from disabled dictionary.
for dis_index, dis_node in enumerate(self.saved_manifest.disabled[unique_id]):
if dis_node.file_id == file_id:
node = dis_node
@@ -452,6 +453,18 @@ class PartialParsing:
if metric_element:
self.delete_schema_metric(schema_file, metric_element)
self.merge_patch(schema_file, "metrics", metric_element)
elif unique_id in self.saved_manifest.entities:
entity = self.saved_manifest.entities[unique_id]
file_id = entity.file_id
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
schema_file = self.saved_files[file_id]
entities = []
if "entities" in schema_file.dict_from_yaml:
entities = schema_file.dict_from_yaml["entities"]
entity_element = self.get_schema_element(entities, entity.name)
if entity_element:
self.delete_schema_entity(schema_file, entity_element)
self.merge_patch(schema_file, "entities", entity_element)
elif unique_id in self.saved_manifest.macros:
macro = self.saved_manifest.macros[unique_id]
file_id = macro.file_id
@@ -757,6 +770,29 @@ class PartialParsing:
self.delete_schema_metric(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)
# entities
dict_key = "entities"
entity_diff = self.get_diff_for("entities", saved_yaml_dict, new_yaml_dict)
if entity_diff["changed"]:
for entity in entity_diff["changed"]:
self.delete_schema_entity(schema_file, entity)
self.merge_patch(schema_file, dict_key, entity)
if entity_diff["deleted"]:
for entity in entity_diff["deleted"]:
self.delete_schema_entity(schema_file, entity)
if entity_diff["added"]:
for entity in entity_diff["added"]:
self.merge_patch(schema_file, dict_key, entity)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in entity_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_entity(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)
# Take a "section" of the schema file yaml dictionary from saved and new schema files
# and determine which parts have changed
def get_diff_for(self, key, saved_yaml_dict, new_yaml_dict):
@@ -935,6 +971,25 @@ class PartialParsing:
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
# entities are created only from schema files, but also can be referred to by other nodes
def delete_schema_entity(self, schema_file, entity_dict):
    """Remove the named entity from the saved manifest / schema file state.

    Anything downstream of the deleted entity is scheduled for re-parsing;
    disabled entries are cleaned up via delete_disabled.
    """
    name_to_delete = entity_dict["name"]
    for unique_id in list(schema_file.entities):
        if unique_id in self.saved_manifest.entities:
            saved_entity = self.saved_manifest.entities[unique_id]
            if saved_entity.name != name_to_delete:
                continue
            # Everything that referenced this entity must be re-parsed.
            if unique_id in self.saved_manifest.child_map:
                self.schedule_nodes_for_parsing(self.saved_manifest.child_map[unique_id])
            self.deleted_manifest.entities[unique_id] = self.saved_manifest.entities.pop(
                unique_id
            )
            schema_file.entities.remove(unique_id)
            fire_event(PartialParsingDeletedEntity(unique_id=unique_id))
        elif unique_id in self.saved_manifest.disabled:
            self.delete_disabled(unique_id, schema_file.file_id)
def get_schema_element(self, elem_list, elem_name):
for element in elem_list:
if "name" in element and element["name"] == elem_name:

View File

@@ -22,11 +22,12 @@ from dbt.context.configured import generate_schema_yml_context, SchemaYamlVars
from dbt.context.providers import (
generate_parse_exposure,
generate_parse_metrics,
generate_parse_entities,
generate_test_context,
)
from dbt.context.macro_resolver import MacroResolver
from dbt.contracts.files import FileHash, SchemaSourceFile
from dbt.contracts.graph.model_config import MetricConfig, ExposureConfig
from dbt.contracts.graph.model_config import MetricConfig, ExposureConfig, EntityConfig
from dbt.contracts.graph.parsed import (
ParsedNodePatch,
ColumnInfo,
@@ -35,6 +36,7 @@ from dbt.contracts.graph.parsed import (
UnpatchedSourceDefinition,
ParsedExposure,
ParsedMetric,
ParsedEntity,
)
from dbt.contracts.graph.unparsed import (
HasColumnDocs,
@@ -47,6 +49,8 @@ from dbt.contracts.graph.unparsed import (
UnparsedNodeUpdate,
UnparsedExposure,
UnparsedMetric,
UnparsedEntity,
EntityDimension,
UnparsedSourceDefinition,
)
from dbt.exceptions import (
@@ -88,6 +92,7 @@ schema_file_keys = (
"analyses",
"exposures",
"metrics",
"entities",
)
@@ -127,6 +132,7 @@ class ParserRef:
def __init__(self):
self.column_info: Dict[str, ColumnInfo] = {}
## TODO: Mimic this for dimension information at the entity level
def add(
self,
column: Union[HasDocs, UnparsedColumn],
@@ -162,6 +168,7 @@ class ParserRef:
return refs
def _trimmed(inp: str) -> str:
if len(inp) < 50:
return inp
@@ -500,8 +507,8 @@ class SchemaParser(SimpleParser[GenericTestBlock, ParsedGenericTestNode]):
parser: YamlDocsReader
# There are 7 kinds of parsers:
# Model, Seed, Snapshot, Source, Macro, Analysis, Exposures
# There are 9 kinds of parsers:
# Model, Seed, Snapshot, Source, Macro, Analysis, Exposures, Metrics, & Entities
# NonSourceParser.parse(), TestablePatchParser is a variety of
# NodePatchParser
@@ -551,6 +558,10 @@ class SchemaParser(SimpleParser[GenericTestBlock, ParsedGenericTestNode]):
metric_parser = MetricParser(self, yaml_block)
metric_parser.parse()
# parse entities
if "entities" in dct:
entity_parser = EntityParser(self, yaml_block)
entity_parser.parse()
def check_format_version(file_path, yaml_dct) -> None:
if "version" not in yaml_dct:
@@ -1209,3 +1220,120 @@ class MetricParser(YamlReader):
msg = error_context(self.yaml.path, self.key, data, exc)
raise ParsingException(msg) from exc
self.parse_metric(unparsed)
class EntityParser(YamlReader):
    """Parses the `entities:` section of schema yaml files into ParsedEntity
    objects and registers them on the manifest (or the disabled dict)."""

    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
        super().__init__(schema_parser, yaml, NodeType.Entity.pluralize())
        self.schema_parser = schema_parser
        self.yaml = yaml

    def clean_dimensions(self, unparsed: UnparsedEntity):
        """Normalize dimension entries, mimicking the format of UnparsedColumn.

        TODO: not yet implemented — currently a no-op placeholder.
        """
        # Fix: removed leftover debug print("filler"); this stub intentionally
        # does nothing until dimension normalization is implemented.

    def parse_entity(self, unparsed: UnparsedEntity):
        """Convert one UnparsedEntity into a ParsedEntity and add it to the
        manifest; disabled entities go to the disabled dict instead."""
        package_name = self.project.project_name
        unique_id = f"{NodeType.Entity}.{package_name}.{unparsed.name}"
        path = self.yaml.path.relative_path

        fqn = self.schema_parser.get_fqn_prefix(path)
        fqn.append(unparsed.name)

        # Rendered config participates in validation; the unrendered variant is
        # kept alongside for state comparison.
        config = self._generate_entity_config(
            target=unparsed,
            fqn=fqn,
            package_name=package_name,
            rendered=True,
        )
        config = config.finalize_and_validate()

        unrendered_config = self._generate_entity_config(
            target=unparsed,
            fqn=fqn,
            package_name=package_name,
            rendered=False,
        )

        if not isinstance(config, EntityConfig):
            raise InternalException(
                f"Calculated a {type(config)} for an entity, but expected a EntityConfig"
            )

        parsed = ParsedEntity(
            package_name=package_name,
            path=path,
            original_file_path=self.yaml.path.original_file_path,
            unique_id=unique_id,
            fqn=fqn,
            model=unparsed.model,
            name=unparsed.name,
            description=unparsed.description,
            # Key dimensions by name for direct lookup; empty when none defined.
            dimensions={dimension.name: dimension for dimension in unparsed.dimensions}
            if unparsed.dimensions
            else {},
            meta=unparsed.meta,
            tags=unparsed.tags,
            config=config,
            unrendered_config=unrendered_config,
        )

        ctx = generate_parse_entities(
            parsed,
            self.root_project,
            self.schema_parser.manifest,
            package_name,
        )

        if parsed.model is not None:
            # Render the ref() expression so the model dependency is captured
            # on `parsed` via the ref-capturing context.
            model_ref = "{{ " + parsed.model + " }}"
            get_rendered(model_ref, ctx, parsed)

        # if the entity is disabled we do not want it included in the manifest,
        # only in the disabled dict
        if parsed.config.enabled:
            self.manifest.add_entity(self.yaml.file, parsed)
        else:
            self.manifest.add_disabled(self.yaml.file, parsed)

    def _generate_entity_config(
        self, target: UnparsedEntity, fqn: List[str], package_name: str, rendered: bool
    ):
        """Compute the entity's config, with the yaml-declared config taking
        precedence over project-level config; `rendered` selects whether jinja
        in config values is evaluated."""
        generator: BaseContextConfigGenerator
        if rendered:
            generator = ContextConfigGenerator(self.root_project)
        else:
            generator = UnrenderedConfigGenerator(self.root_project)

        # configs with precedence set
        precedence_configs = dict()
        # first apply entity configs declared in the yaml
        precedence_configs.update(target.config)

        return generator.calculate_node_config(
            config_call_dict={},
            fqn=fqn,
            resource_type=NodeType.Entity,
            project_name=package_name,
            base=False,
            patch_config_dict=precedence_configs,
        )

    def parse(self):
        """Validate and parse every entity dict in this yaml block."""
        # Fix: removed a leftover breakpoint() call that dropped every parse
        # run into the debugger.
        for data in self.get_key_dicts():
            try:
                UnparsedEntity.validate(data)
                unparsed = UnparsedEntity.from_dict(data)
            except (ValidationError, JSONValidationException) as exc:
                msg = error_context(self.yaml.path, self.key, data, exc)
                raise ParsingException(msg) from exc
            self.parse_entity(unparsed)

View File

@@ -1,6 +1,6 @@
import json
from dbt.contracts.graph.parsed import ParsedExposure, ParsedSourceDefinition, ParsedMetric
from dbt.contracts.graph.parsed import ParsedExposure, ParsedSourceDefinition, ParsedMetric, ParsedEntity
from dbt.graph import ResourceTypeSelector
from dbt.task.runnable import GraphRunnableTask, ManifestTask
from dbt.task.test import TestSelector
@@ -23,6 +23,7 @@ class ListTask(GraphRunnableTask):
NodeType.Source,
NodeType.Exposure,
NodeType.Metric,
NodeType.Entity,
)
)
ALL_RESOURCE_VALUES = DEFAULT_RESOURCE_VALUES | frozenset((NodeType.Analysis,))
@@ -84,6 +85,8 @@ class ListTask(GraphRunnableTask):
yield self.manifest.exposures[node]
elif node in self.manifest.metrics:
yield self.manifest.metrics[node]
elif node in self.manifest.entities:
yield self.manifest.entities[node]
else:
raise RuntimeException(
f'Got an unexpected result from node selection: "{node}"'
@@ -107,6 +110,11 @@ class ListTask(GraphRunnableTask):
# metrics are searched for by pkg.metric_name
metric_selector = ".".join([node.package_name, node.name])
yield f"metric:{metric_selector}"
elif node.resource_type == NodeType.Entity:
assert isinstance(node, ParsedEntity)
# entities are searched for by pkg.entity_name
entity_selector = ".".join([node.package_name, node.name])
yield f"entity:{entity_selector}"
else:
# everything else is from `fqn`
yield ".".join(node.fqn)

4
testing-project/postgres/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
target/
dbt_packages/
logs/

View File

@@ -0,0 +1,15 @@
Welcome to your new dbt project!
### Using the starter project
Try running the following commands:
- dbt run
- dbt test
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices

View File

@@ -0,0 +1,35 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'postgres'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'postgres'
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
postgres:

View File

View File

@@ -0,0 +1,21 @@
with orders as (
select * from {{ ref('fact_orders') }}
)
,
customers as (
select * from {{ ref('dim_customers') }}
)
,
final as (
select *
from orders
left join customers using (customer_id)
)
select * from final

View File

@@ -0,0 +1 @@
select * from {{ref('dim_customers_source')}}

View File

@@ -0,0 +1,21 @@
version: 2
models:
- name: dim_customers
columns:
- name: customer_id
description: TBD
- name: first_name
description: TBD
- name: last_name
description: TBD
- name: email
description: TBD
- name: gender
description: TBD
- name: is_new_customer
description: TBD

View File

@@ -0,0 +1,19 @@
version: 2
entities:
- name: customers
model: ref('dim_customers')
description: "Our customers entity"
dimensions:
      - include: "*"
exclude:
- first_name
- name: first_name_v3_testing
description: string
column_name: first_name
        data_type: string
default_timestamp: true
primary_key: true
time_grains: [day, week, month]

View File

@@ -0,0 +1,6 @@
version: 2
entities:
- name: orders
model: ref('fact_orders')
description: "Our orders entity"

View File

@@ -0,0 +1,4 @@
select
*
,round(order_total - (order_total/2)) as discount_total
from {{ref('fact_orders_source')}}

View File

@@ -0,0 +1,21 @@
version: 2
models:
- name: fact_orders
columns:
- name: order_id
description: TBD
- name: order_country
description: TBD
- name: order_total
description: TBD
- name: had_discount
description: TBD
- name: customer_id
description: TBD
- name: order_date
description: TBD

View File

View File

@@ -0,0 +1,6 @@
customer_id,first_name,last_name,email,gender,is_new_customer,date_added
1,Geodude,Hills,bhills0@altervista.org,Male,FALSE,2022-01-01
2,Mew,Coxhead,mcoxhead1@symantec.com,Genderfluid,TRUE,2022-01-06
3,Mewtwo,Redish,aredish2@last.fm,Genderqueer,FALSE,2022-01-13
4,Charizard,Basant,lbasant3@dedecms.com,Female,TRUE,2022-02-01
5,Snorlax,Pokemon,the_email@dedecms.com,Male,TRUE,2022-02-03
1 customer_id first_name last_name email gender is_new_customer date_added
2 1 Geodude Hills bhills0@altervista.org Male FALSE 2022-01-01
3 2 Mew Coxhead mcoxhead1@symantec.com Genderfluid TRUE 2022-01-06
4 3 Mewtwo Redish aredish2@last.fm Genderqueer FALSE 2022-01-13
5 4 Charizard Basant lbasant3@dedecms.com Female TRUE 2022-02-01
6 5 Snorlax Pokemon the_email@dedecms.com Male TRUE 2022-02-03

View File

@@ -0,0 +1,11 @@
order_id,order_country,order_total,had_discount,customer_id,order_date
1,Russia,2,false,1,01/28/2022
2,Mauritius,1,false,2,01/20/2022
3,Peru,1,false,1,01/13/2022
4,Kazakhstan,1,true,3,01/06/2022
5,Portugal,1,false,4,01/08/2022
6,China,1,false,5,01/21/2022
7,Germany,1,true,2,01/22/2022
8,Greenland,0,true,1,02/15/2022
9,Bangladesh,1,false,2,02/03/2022
10,Sweden,1,false,3,02/13/2022
1 order_id order_country order_total had_discount customer_id order_date
2 1 Russia 2 false 1 01/28/2022
3 2 Mauritius 1 false 2 01/20/2022
4 3 Peru 1 false 1 01/13/2022
5 4 Kazakhstan 1 true 3 01/06/2022
6 5 Portugal 1 false 4 01/08/2022
7 6 China 1 false 5 01/21/2022
8 7 Germany 1 true 2 01/22/2022
9 8 Greenland 0 true 1 02/15/2022
10 9 Bangladesh 1 false 2 02/03/2022
11 10 Sweden 1 false 3 02/13/2022

View File