Mirror of https://github.com/dbt-labs/dbt-core (synced 2025-12-17 19:31:34 +00:00)

Compare commits: v1.7.13...project-re (41 commits; author and date columns were not captured in this view)

fa05afd0c9, 505023fbaa, 4473436b67, ea1f725edc, d541edf006, 88f9e61916,
77e91fd2ad, 350b012af6, dfe0e63876, 642922f3e7, 5ca1f194e9, 120e4d3edc,
290eab7766, 8dcabb1e6f, 43d94a6e56, bf6a44f4ac, 412673a252, 77e57afdea,
62e5c8e1f3, 3d343ee991, b08b72e20e, 87972f4c29, 860fbf6caf, ff6e554689,
a92d580895, 2439e891a0, 5b1f329245, 6df97d9348, 23cc3dc532, 61478b09e0,
2ea007ff5b, 14c5cd2a38, d187f5ff2c, 2681c3885d, b6c2b12978, 3f51109e88,
c075e35d7f, 2814cb93e5, 3e5db87fc6, e13ab0aaa9, f59da0cbea
@@ -419,12 +419,16 @@ class Compiler:
             linker.dependency(node.unique_id, (manifest.sources[dependency].unique_id))
         elif dependency in manifest.metrics:
             linker.dependency(node.unique_id, (manifest.metrics[dependency].unique_id))
+        elif dependency in manifest.consumers:
+            linker.dependency(node.unique_id, (manifest.consumers[dependency].unique_id))
         else:
             dependency_not_found(node, dependency)
 
     def link_graph(self, linker: Linker, manifest: Manifest, add_test_edges: bool = False):
         for source in manifest.sources.values():
             linker.add_node(source.unique_id)
+        for consumer in manifest.consumers.values():
+            linker.add_node(consumer.unique_id)
         for node in manifest.nodes.values():
             self.link_node(linker, node, manifest)
         for exposure in manifest.exposures.values():
@@ -156,6 +156,28 @@ def value_or(value: Optional[T], default: T) -> T:
     return value
 
 
+# TODO: replicate this function for dbt_contracts.yml
+# def _raw_contracts_from(project_root: str) -> Dict[str, Any]:
+
+#     project_root = os.path.normpath(project_root)
+#     project_yaml_filepath = os.path.join(project_root, "dbt_contracts.yml")
+
+#     # get the dbt_contracts.yml contents
+#     if not path_exists(project_yaml_filepath):
+#         raise DbtProjectError(
+#             "no dbt_contracts.yml found at expected path {}".format(
+#                 project_yaml_filepath
+#             )
+#         )
+
+#     project_dict = _load_yaml(project_yaml_filepath)
+
+#     if not isinstance(project_dict, dict):
+#         raise DbtProjectError("dbt_contracts.yml does not parse to a dictionary")
+
+#     return project_dict
+
+
 def _raw_project_from(project_root: str) -> Dict[str, Any]:
 
     project_root = os.path.normpath(project_root)
@@ -195,7 +217,8 @@ def validate_version(dbt_version: List[VersionSpecifier], project_name: str):
     installed = get_installed_version()
     if not versions_compatible(*dbt_version):
         msg = IMPOSSIBLE_VERSION_ERROR.format(
-            package=project_name, version_spec=[x.to_version_string() for x in dbt_version]
+            package=project_name,
+            version_spec=[x.to_version_string() for x in dbt_version],
         )
         raise DbtProjectError(msg)
@@ -351,7 +374,8 @@ class PartialProject(RenderComponents):
         # `data_paths` is deprecated but still allowed. Copy it into
         # `seed_paths` to simplify logic throughout the rest of the system.
         seed_paths: List[str] = value_or(
-            cfg.seed_paths if "seed-paths" in rendered.project_dict else cfg.data_paths, ["seeds"]
+            cfg.seed_paths if "seed-paths" in rendered.project_dict else cfg.data_paths,
+            ["seeds"],
         )
         test_paths: List[str] = value_or(cfg.test_paths, ["tests"])
         analysis_paths: List[str] = value_or(cfg.analysis_paths, ["analyses"])
@@ -46,6 +46,7 @@ class BaseRenderer:
 
     def render_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
         try:
+            # print(deep_map_render(self.render_entry, data))  # dbt_contracts.yml should output a typed dictionary
             return deep_map_render(self.render_entry, data)
         except RecursionException:
             raise DbtProjectError(
@@ -1,8 +1,10 @@
+import json
 import enum
 from dataclasses import dataclass, field
 from itertools import chain, islice
 from mashumaro.mixins.msgpack import DataClassMessagePackMixin
 from multiprocessing.synchronize import Lock
+from pathlib import Path
 from typing import (
     Dict,
     List,
@@ -23,6 +25,7 @@ from typing import (
 from typing_extensions import Protocol
 from uuid import UUID
 
+from dbt.clients.yaml_helper import load_yaml_text
 from dbt.contracts.graph.compiled import (
     CompileResultNode,
     ManifestNode,
@@ -182,6 +185,23 @@ class RefableLookup(dbtClassMixin):
                 f"Node {unique_id} found in cache but not found in manifest"
             )
         return manifest.nodes[unique_id]
 
 
+class ConsumerLookup(RefableLookup):
+    def __init__(self, manifest: "Manifest"):
+        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
+        self.populate(manifest)
+
+    def populate(self, manifest):
+        for node in manifest.consumers.values():
+            self.add_node(node)
+
+    def perform_lookup(self, unique_id: UniqueID, manifest) -> ManifestNode:
+        if unique_id not in manifest.consumers:
+            raise dbt.exceptions.InternalException(
+                f"Node {unique_id} found in cache but not found in manifest"
+            )
+        return manifest.consumers[unique_id]
+
+
 class MetricLookup(dbtClassMixin):
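Aside: the lookup classes above all share one storage pattern, keyed first by search name and then by package. The sketch below is illustrative only (the node shape and class name are assumptions, not dbt-core code), but it shows why `find` can resolve a consumer ref with or without a package qualifier.

from types import SimpleNamespace
from typing import Dict, Optional

# Stand-ins for dbt's PackageName / UniqueID string aliases.
PackageName = str
UniqueID = str


class NameAndPackageLookup:
    """Illustrative sketch of the RefableLookup/ConsumerLookup storage pattern."""

    def __init__(self, nodes):
        self.storage: Dict[str, Dict[PackageName, UniqueID]] = {}
        for node in nodes:
            self.add_node(node)

    def add_node(self, node) -> None:
        # One bucket per search name; inside it, one unique_id per package.
        self.storage.setdefault(node.name, {})[node.package_name] = node.unique_id

    def find(self, name: str, package: Optional[PackageName]) -> Optional[UniqueID]:
        bucket = self.storage.get(name, {})
        if package is not None:
            return bucket.get(package)
        # With no package qualifier, fall back to any single match.
        return next(iter(bucket.values()), None)


nodes = [
    SimpleNamespace(
        name="fct_orders", package_name="core_only", unique_id="model.core_only.fct_orders"
    )
]
lookup = NameAndPackageLookup(nodes)
assert lookup.find("fct_orders", "core_only") == "model.core_only.fct_orders"
assert lookup.find("fct_orders", None) == "model.core_only.fct_orders"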
@@ -619,7 +639,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
     source_patches: MutableMapping[SourceKey, SourcePatch] = field(default_factory=dict)
     disabled: MutableMapping[str, List[GraphMemberNode]] = field(default_factory=dict)
     env_vars: MutableMapping[str, str] = field(default_factory=dict)
+    consumers: MutableMapping[str, ManifestNode] = field(default_factory=dict)
 
     _doc_lookup: Optional[DocLookup] = field(
         default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
     )
@@ -629,6 +650,9 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
     _ref_lookup: Optional[RefableLookup] = field(
         default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
     )
+    _consumer_lookup: Optional[ConsumerLookup] = field(
+        default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
+    )
     _metric_lookup: Optional[MetricLookup] = field(
         default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
     )
@@ -735,6 +759,48 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
             MaterializationCandidate.from_macro(m, specificity)
             for m in self._find_macros_by_name(full_name, project_name)
         )
 
+    def get_consumers(self, project_root: str):
+        from dbt.contracts.graph.compiled import ParsedModelNode
+
+        contract_config_path = Path(project_root) / "dbt_contracts.yml"
+
+        # Exit if our root project doesn't contain a dbt_contracts.yml
+        if not contract_config_path.exists():
+            return
+
+        with contract_config_path.open() as f:
+            contracts_dct = load_yaml_text(f)
+
+        # Exit if our root project is only a producer
+        if "consumer" not in contracts_dct:
+            return
+
+        for consumer in contracts_dct["consumer"]:
+
+            # We'll use the `get` method to retrieve data from the dictionary; this should be
+            # a dataclass, though, to validate the schema and ensure required properties exist
+            consumer_name = consumer.get("name")
+            consumer_version = consumer.get("version")
+            consumer_file = f"{consumer_name}_{consumer_version}_contract.json"
+
+            # I'm skipping the simulated step here where we're grabbing the file from external
+            # storage and bringing it into our project
+            consumer_path = consumer.get("path")
+            consumer_file_path = Path(project_root) / consumer_path / consumer_file
+            with consumer_file_path.open() as f:
+                try:
+                    consumer_json = json.load(f)
+                except ValueError as e:
+                    print(f"Contract {consumer_file} is not valid JSON!")
+                    raise e
+
+            consumer_nodes = consumer_json["nodes"]
+            self.consumers.update(
+                **{k: ParsedModelNode.from_dict(v) for k, v in consumer_nodes.items()
+                   if v["resource_type"] == "model"}
+            )
+
     def find_materialization_macro_by_name(
         self, project_name: str, materialization_name: str, adapter_type: str
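For reference, this is the structure `get_consumers` assumes after `load_yaml_text` parses `dbt_contracts.yml`; the values below are placeholders rather than a schema the branch formally defines.

# Illustrative shape of contracts_dct as read by Manifest.get_consumers
contracts_dct = {
    "consumer": [
        {
            "name": "core-only",   # becomes part of "<name>_<version>_contract.json"
            "version": "0.2.0",
            "path": "contracts",   # subdirectory of project_root holding the artifact
        },
    ],
}

# For the entry above, the method opens
#   <project_root>/contracts/core-only_0.2.0_contract.json
# and registers every entry under its top-level "nodes" key whose
# resource_type is "model" into manifest.consumers.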
@@ -825,6 +891,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
             macros=self.macros,
             docs=self.docs,
             exposures=self.exposures,
+            consumers=self.consumers,
             metrics=self.metrics,
             selectors=self.selectors,
             metadata=self.metadata,
@@ -847,6 +914,8 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
             return self.exposures[unique_id]
         elif unique_id in self.metrics:
             return self.metrics[unique_id]
+        elif unique_id in self.consumers:
+            return self.consumers[unique_id]
         else:
             # something terrible has happened
             raise dbt.exceptions.InternalException(
@@ -876,6 +945,15 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
         if self._ref_lookup is None:
             self._ref_lookup = RefableLookup(self)
         return self._ref_lookup
 
+    @property
+    def consumer_lookup(self) -> ConsumerLookup:
+        if self._consumer_lookup is None:
+            self._consumer_lookup = ConsumerLookup(self)
+        return self._consumer_lookup
+
+    def rebuild_consumer_lookup(self):
+        self._consumer_lookup = ConsumerLookup(self)
+
     @property
     def metric_lookup(self) -> MetricLookup:
@@ -916,7 +994,10 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
 
         candidates = _search_packages(current_project, node_package, target_model_package)
         for pkg in candidates:
-            node = self.ref_lookup.find(target_model_name, pkg, self)
+            node = (
+                self.ref_lookup.find(target_model_name, pkg, self)
+                or self.consumer_lookup.find(target_model_name, pkg, self)
+            )
 
             if node is not None and node.config.enabled:
                 return node
@@ -1153,9 +1234,11 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
             self.source_patches,
             self.disabled,
             self.env_vars,
+            self.consumers,
             self._doc_lookup,
             self._source_lookup,
             self._ref_lookup,
+            self._consumer_lookup,
             self._metric_lookup,
             self._disabled_lookup,
             self._analysis_lookup,
@@ -1195,6 +1278,9 @@ class WritableManifest(ArtifactMixin):
             description=("The exposures defined in the dbt project and its dependencies")
         )
     )
+    consumers: Mapping[UniqueID, ManifestNode] = field(
+        metadata=dict(description=("The consumer nodes made available to the dbt project via contracts"))
+    )
     metrics: Mapping[UniqueID, ParsedMetric] = field(
         metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
     )
@@ -622,6 +622,24 @@ class SnapshotConfig(EmptySnapshotConfig):
         return self.from_dict(data)
 
 
+# TODO: add a contract config to store the yaml configs in python memory
+@dataclass
+class ContractConfig(NodeAndTestConfig):
+    # this is repeated because of a different default
+    schema: Optional[str] = field(
+        default="dbt_test__audit",
+        metadata=CompareBehavior.Exclude.meta(),
+    )
+    materialized: str = "test"
+    severity: Severity = Severity("ERROR")
+    store_failures: Optional[bool] = None
+    where: Optional[str] = None
+    limit: Optional[int] = None
+    fail_calc: str = "count(*)"
+    warn_if: str = "!= 0"
+    error_if: str = "!= 0"
+
+
 RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
     NodeType.Metric: MetricConfig,
     NodeType.Exposure: ExposureConfig,
@@ -644,4 +662,5 @@ def get_config_for(resource_type: NodeType, base=False) -> Type[BaseConfig]:
         lookup = BASE_RESOURCE_TYPES
     else:
         lookup = RESOURCE_TYPES
+    # print(f"lookup config {lookup.get(resource_type, NodeConfig)}")
     return lookup.get(resource_type, NodeConfig)
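The dispatch above is a plain dictionary lookup with `NodeConfig` as the fallback. A self-contained sketch of the same pattern follows; the class names are stand-ins, and note that the diff defines `ContractConfig` without ever registering it in `RESOURCE_TYPES`, so a contract resource type would currently fall back to `NodeConfig`.

from typing import Dict, Type


class NodeConfig:  # fallback config (stand-in)
    pass


class MetricConfig(NodeConfig):  # stand-in
    pass


class ContractConfig(NodeConfig):  # stand-in for the new config above
    pass


# Keys would be NodeType members in dbt-core; strings keep this sketch self-contained.
RESOURCE_TYPES: Dict[str, Type[NodeConfig]] = {
    "metric": MetricConfig,
    "contract": ContractConfig,  # hypothetical registration; the diff does not add this
}


def get_config_for(resource_type: str) -> Type[NodeConfig]:
    # Unknown resource types silently fall back to NodeConfig.
    return RESOURCE_TYPES.get(resource_type, NodeConfig)


assert get_config_for("contract") is ContractConfig
assert get_config_for("exposure") is NodeConfig  # falls back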
@@ -88,6 +88,14 @@ class Docs(dbtClassMixin, Replaceable):
     node_color: Optional[str] = None
 
 
+@dataclass
+class Contracts(dbtClassMixin, Replaceable):
+    # TODO: need strict typing here for the various configs
+    # TODO: make these optional?
+    producer: Dict[str, Any] = field(default_factory=dict)  # similar to meta
+    consumer: Dict[str, Any] = field(default_factory=dict)
+
+
 @dataclass
 class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
     name: str
@@ -352,6 +352,11 @@ class DbtProjectError(DbtConfigError):
     pass
 
 
+# TODO: need a config error for contracts
+# class DbtContractsError(DbtConfigError):
+#     pass
+
+
 class DbtSelectorsError(DbtConfigError):
     pass
@@ -167,6 +167,8 @@ class NodeSelector(MethodManager):
         elif unique_id in self.manifest.metrics:
             metric = self.manifest.metrics[unique_id]
             return metric.config.enabled
+        elif unique_id in self.manifest.consumers:
+            return True
         node = self.manifest.nodes[unique_id]
         return not node.empty and node.config.enabled
@@ -26,6 +26,7 @@ import dbt.task.clean as clean_task
 import dbt.task.compile as compile_task
 import dbt.task.debug as debug_task
 import dbt.task.deps as deps_task
+import dbt.task.contracts as contracts_task
 import dbt.task.freshness as freshness_task
 import dbt.task.generate as generate_task
 import dbt.task.init as init_task
@@ -66,7 +67,11 @@ class DBTVersion(argparse.Action):
         help="show program's version number and exit",
     ):
         super().__init__(
-            option_strings=option_strings, dest=dest, default=default, nargs=0, help=help
+            option_strings=option_strings,
+            dest=dest,
+            default=default,
+            nargs=0,
+            help=help,
         )
 
     def __call__(self, parser, namespace, values, option_string=None):
@@ -418,6 +423,8 @@ def _build_build_subparser(subparsers, base_subparser):
     return sub
 
 
+# TODO: Will this main.py file be completely refactored based on the latest roadmap update?
+# Use this function as a template for the new contracts command.
 def _build_clean_subparser(subparsers, base_subparser):
     sub = subparsers.add_parser(
         "clean",
@@ -465,6 +472,38 @@ def _build_deps_subparser(subparsers, base_subparser):
     return sub
 
 
+def _build_contracts_subparser(subparsers, base_subparser):
+    sub = subparsers.add_parser(
+        "contracts",
+        parents=[base_subparser],
+        help="""
+        Pull the most recent version of the projects to consume listed in dbt_contracts.yml
+        """,
+    )
+
+    # TODO: add arguments for consumer/producer
+    # sub.add_argument(
+    #     "--preview",
+    #     action="store_true",
+    #     help="""
+    #     If specified, DBT will show path information for this project
+    #     """,
+    # )
+    # _add_version_check(sub)
+
+    # sub.add_argument(
+    #     "--publish",
+    #     action="store_true",
+    #     help="""
+    #     If specified, DBT will show path information for this project
+    #     """,
+    # )
+    # _add_version_check(sub)
+
+    sub.set_defaults(cls=contracts_task.DepsTask, which="contracts", rpc_method="contracts")
+    return sub
+
+
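The subparser wiring above reduces to a small argparse pattern: `set_defaults` stashes the task class and routing strings on the parsed namespace, and the entry point later instantiates `args.cls`. A minimal runnable sketch, with a stub standing in for `contracts_task.DepsTask`:

import argparse


class DepsTask:  # stub for contracts_task.DepsTask
    def __init__(self, args):
        self.args = args

    def run(self):
        print(f"running {self.args.which!r} via rpc method {self.args.rpc_method!r}")


parser = argparse.ArgumentParser(prog="dbt")
subs = parser.add_subparsers(dest="command")

sub = subs.add_parser("contracts", help="Pull contract artifacts listed in dbt_contracts.yml")
# set_defaults attaches arbitrary attributes to the parsed namespace:
sub.set_defaults(cls=DepsTask, which="contracts", rpc_method="contracts")

args = parser.parse_args(["contracts"])
args.cls(args).run()  # -> running 'contracts' via rpc method 'contracts'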
 def _build_snapshot_subparser(subparsers, base_subparser):
     sub = subparsers.add_parser(
         "snapshot",
@@ -893,7 +932,9 @@ def _build_run_operation_subparser(subparsers, base_subparser):
         """,
     )
     sub.set_defaults(
-        cls=run_operation_task.RunOperationTask, which="run-operation", rpc_method="run-operation"
+        cls=run_operation_task.RunOperationTask,
+        which="run-operation",
+        rpc_method="run-operation",
     )
     return sub
@@ -1153,6 +1194,7 @@ def parse_args(args, cls=DBTArgumentParser):
     _build_clean_subparser(subs, base_subparser)
     _build_debug_subparser(subs, base_subparser)
     _build_deps_subparser(subs, base_subparser)
+    _build_contracts_subparser(subs, base_subparser)
     _build_list_subparser(subs, base_subparser)
 
     build_sub = _build_build_subparser(subs, base_subparser)
@@ -307,6 +307,11 @@ class ConfiguredParser(
         else:
             parsed_node.docs = Docs(show=docs_show)
 
+        # If we have contracts in the config, merge with the node level, for backwards
+        # compatibility with earlier node-only config.
+        if "contracts" in config_dict and config_dict["contracts"]:
+            parsed_node.meta = config_dict["contracts"]
+
         # unrendered_config is used to compare the original database/schema/alias
         # values and to handle 'same_config' and 'same_contents' calls
         parsed_node.unrendered_config = config.build_config_dict(
@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 from dataclasses import field
 from datetime import datetime
+import json
 import os
 import traceback
 from typing import Dict, Optional, Mapping, Callable, Any, List, Type, Union, Tuple
@@ -52,7 +53,7 @@ from dbt.context.providers import ParseProvider
 from dbt.contracts.files import FileHash, ParseFileType, SchemaSourceFile
 from dbt.parser.read_files import read_files, load_source_file
 from dbt.parser.partial import PartialParsing, special_override_macros
-from dbt.contracts.graph.compiled import ManifestNode
+from dbt.contracts.graph.compiled import ParsedModelNode, ManifestNode
 from dbt.contracts.graph.manifest import (
     Manifest,
     Disabled,
@@ -247,6 +248,9 @@ class ManifestLoader:
         orig_project_parser_files = project_parser_files
         self._perf_info.path_count = len(self.manifest.files)
         self._perf_info.read_files_elapsed = time.perf_counter() - start_read_files
 
+        # Get consumer contracts if they exist
+        self.manifest.get_consumers(self.root_project.project_root)
+
         skip_parsing = False
         if self.saved_manifest is not None:
@@ -358,6 +362,7 @@ class ManifestLoader:
         self.manifest.rebuild_ref_lookup()
         self.manifest.rebuild_doc_lookup()
         self.manifest.rebuild_disabled_lookup()
+        self.manifest.rebuild_consumer_lookup()
 
         # Load yaml files
         parser_types = [SchemaParser]
@@ -834,6 +834,9 @@ class NonSourceParser(YamlDocsReader, Generic[NonSourceTarget, Parsed]):
     def normalize_docs_attribute(self, data, path):
         return self.normalize_attribute(data, path, "docs")
 
+    def normalize_contracts_attribute(self, data, path):
+        return self.normalize_attribute(data, path, "contracts")
+
     def patch_node_config(self, node, patch):
         # Get the ContextConfig that's used in calculating the config
         # This must match the model resource_type that's being patched
core/dbt/task/contracts.py (new file, 179 lines)
@@ -0,0 +1,179 @@
# coding=utf-8
# TODO: use the dbt deps and debug code as a template to print out helpful information
# for the dbt contracts command
import os
import shutil
import json
import dbt.utils
import dbt.deprecations
import dbt.exceptions
import dbt.tracking  # needed for track_package_install below; missing in the original PoC
from dbt.ui import green, red

from dbt.config import UnsetProfileConfig

# from dbt.config.renderer import DbtProjectYamlRenderer
# from dbt.deps.base import downloads_directory
# from dbt.deps.resolver import resolve_packages

# from dbt.events.functions import fire_event
# from dbt.events.types import (
#     DepsNoPackagesFound,
#     DepsStartPackageInstall,
#     DepsUpdateAvailable,
#     DepsUTD,
#     DepsInstallInfo,
#     DepsListSubdirectory,
#     DepsNotifyUpdatesAvailable,
#     EmptyLine,
# )
# from dbt.clients import system

from dbt.task.base import BaseTask, move_to_nearest_project_dir
from dbt.clients.yaml_helper import load_yaml_text

# from dbt.clients.git import clone_and_checkout

# TODO: point to a github repo to consume, using the existing mechanic for packages
# TODO: run a dbt compile to output the consumed manifest.json
# TODO: integrate Doug's consumer ref code
# TODO: what if I included this directly in the deps command? No; keep this separate.
# Remember, we aren't doing a real implementation of contracts, just a proof of concept.
# Therefore, I can create net-new scripts knowing they will be thrown away. The goal is
# understanding the general structure of the code and how it will be used.


class DepsTask(BaseTask):
    ConfigType = UnsetProfileConfig

    def __init__(self, args, config: UnsetProfileConfig):
        super().__init__(args=args, config=config)

    def track_package_install(self, package_name: str, source_type: str, version: str) -> None:
        # Hub packages do not need to be hashed, as they are public
        # Use the string 'local' for local package versions
        if source_type == "local":
            package_name = dbt.utils.md5(package_name)
            version = "local"
        elif source_type != "hub":
            package_name = dbt.utils.md5(package_name)
            version = dbt.utils.md5(version)
        dbt.tracking.track_package_install(
            self.config,
            self.config.args,
            {"name": package_name, "source": source_type, "version": version},
        )

    def run(self):
        print("xxxxxxxxxxxxxxxxxxxx")
        # system.make_directory(self.config.packages_install_path)
        # packages = self.config.packages.packages
        # TODO: Locate the dbt_contracts.yml file
        project_dir = os.getcwd()  # running a dbt project locally
        default_directory_location = os.path.join(project_dir, "dbt_contracts.yml")
        print(f"default_directory_location: {default_directory_location}")

        # TODO: read in the dbt_contracts.yml as a dictionary
        with open(default_directory_location, "r") as stream:
            contracts_consumed_rendered = load_yaml_text(stream)
        print(f"contracts_consumed_rendered: {contracts_consumed_rendered}")
        consumer = contracts_consumed_rendered.get("consumer")
        print("xxxxxxxxxxxxxxxxxxxx\n")
        # TODO: verify the api private key works (print statement for now; fire_event later)
        # We'll have to create a menu of options such as gcs, s3, API key, etc. to authenticate
        contract_validation = {}
        for x in consumer:
            contract_validation.update({x.get("contract_location"): x.get("credentials")})
            print(f'{x.get("name")}: contract credentials verified {green("[OK connection ok]")}')

        # TODO: output the consumed code to a `contracts/projects/consumed` directory
        contracts_dir = project_dir + "/dbt_contracts"
        if not os.path.exists(contracts_dir):
            os.mkdir(contracts_dir)

        # Download the contracts from the contract_location and store them in the contracts_dir.
        # In the short term, we copy the contracts from the local test directory into
        # contracts_dir. This contracts.json consolidates a subset of the manifest.json,
        # catalog.json, run_results.json, and sources.json files, and then merges that with the
        # consumer's manifest.json and catalog.json (run_results.json and sources.json are for
        # validating contract requirements only).
        dummy_contracts_file_location = "../tests/functional/dbt_contracts/contracts.json"
        for x in consumer:
            contract_name = x.get("name")
            contract_version_expected = x.get("contract_version")
            contract_destination = f"{contracts_dir}/{contract_name}-contracts.json"
            with open(dummy_contracts_file_location) as json_file:
                contract_data = json.load(json_file)
            contract_version_actual = contract_data.get("metadata").get("contract_version")
            if contract_version_expected == contract_version_actual:
                shutil.copyfile(dummy_contracts_file_location, contract_destination)
                print(f"Successful contract consumed [{contract_name}]: {contract_destination}")
                # TODO: output the consumed contracts.json to a `contracts/consumed` directory
                # within the respective consumed project directory
                # TODO: read in the consumed `contracts.json` to produce a report card in the
                # terminal output
                # What's published vs. private nodes?
                print(f"  Published Nodes: {contract_data.get('contracts').get('published')}")
                print(f"  Private Nodes: {contract_data.get('contracts').get('private')}")
                # What are the contract expectations vs. actuals?
                print(
                    f"  Test Coverage: {contract_data.get('contracts').get('requirements').get('test_coverage')} {green('[OK and valid]')}"
                )
                print(
                    f"  Freshness Coverage: {contract_data.get('contracts').get('requirements').get('freshness_coverage')} {green('[OK and valid]')}"
                )
                print(
                    f"  Max Upgrade Time Between Versions: {contract_data.get('contracts').get('requirements').get('max_upgrade_time')}"
                )
                # What permissions do I need to select published nodes?
                print(
                    f"  Published Node Permissions: {contract_data.get('contracts').get('permissions')}"
                )
                # How do I select them?
                contract_name = contract_data.get("contracts").get("name")
                print("  Published Node Selection:")
                print(f"    select * from {{{{ ref('{contract_name}','my_first_model') }}}}")
                print(f"    select * from {{{{ ref('{contract_name}','my_second_model') }}}}")
            else:
                print(
                    f"Contract version mismatch, will not consume [{contract_name}]. Expected: {contract_version_expected}, Actual: {contract_version_actual} {red('[Not Compatible]')} \n"
                )

        # A git clone may not be necessary, because the contracts.json will contain all the
        # info from the manifest.json and catalog.json.
        # for x in consumer:
        #     project_location = x.get("path")
        #     print(f"project_location: {project_location}")
        #     clone_and_checkout(repo=project_location, cwd=contracts_dir)

        # if not packages:
        #     fire_event(DepsNoPackagesFound())
        #     return

        # with downloads_directory():
        #     final_deps = resolve_packages(packages, self.config)

        # renderer = DbtProjectYamlRenderer(self.config, self.config.cli_vars)

        # packages_to_upgrade = []
        # for package in final_deps:
        #     package_name = package.name
        #     source_type = package.source_type()
        #     version = package.get_version()

        #     fire_event(DepsStartPackageInstall(package_name=package_name))
        #     package.install(self.config, renderer)
        #     fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
        #     if source_type == "hub":
        #         version_latest = package.get_version_latest()
        #         if version_latest != version:
        #             packages_to_upgrade.append(package_name)
        #             fire_event(DepsUpdateAvailable(version_latest=version_latest))
        #         else:
        #             fire_event(DepsUTD())
        #     if package.get_subdirectory():
        #         fire_event(DepsListSubdirectory(subdirectory=package.get_subdirectory()))

        #     self.track_package_install(
        #         package_name=package_name, source_type=source_type, version=version
        #     )
        # if packages_to_upgrade:
        #     fire_event(EmptyLine())
        #     fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))

    @classmethod
    def from_args(cls, args):
        # deps needs to move to the project directory, as it does put files
        # into the modules directory
        move_to_nearest_project_dir(args)
        return super().from_args(args)
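Note that the version gate in `run` is an exact string match on `contract_version`. A consumer will often want "at least this version, same major" semantics instead; one possible refinement is sketched here with the third-party `packaging` library (an assumption of this note, not something the branch implements).

from packaging.version import Version


def contract_is_compatible(expected: str, actual: str) -> bool:
    """Accept when the producer's version shares the consumer's pinned major
    version and is at least as new. Illustrative policy, not the PoC's."""
    exp, act = Version(expected), Version(actual)
    return act.major == exp.major and act >= exp


assert contract_is_compatible("0.1.0", "0.2.0")      # newer minor: compatible
assert not contract_is_compatible("0.1.0", "1.0.0")  # major bump: breaking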
@@ -204,6 +204,7 @@ def _deep_map_render(
     return ret
 
 
+# TODO: We'll have to reuse this function for dbt_contracts.yml rendering
 def deep_map_render(func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any) -> Any:
     """This function renders a nested dictionary derived from a yaml
     file. It is used to render dbt_project.yml, profiles.yml, and
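For readers unfamiliar with the helper, this is roughly what a `deep_map_render`-style traversal does: apply a function at every leaf of a nested dict/list structure, threading the key path along. A simplified sketch follows (the real helper also guards against deep recursion and atomic-type edge cases):

from typing import Any, Callable, Tuple, Union

KeyPath = Tuple[Union[str, int], ...]


def deep_map(func: Callable[[Any, KeyPath], Any], value: Any, keypath: KeyPath = ()) -> Any:
    # Recurse into mappings and sequences; apply func at every leaf.
    if isinstance(value, dict):
        return {k: deep_map(func, v, keypath + (k,)) for k, v in value.items()}
    if isinstance(value, list):
        return [deep_map(func, v, keypath + (i,)) for i, v in enumerate(value)]
    return func(value, keypath)


rendered = deep_map(
    lambda v, path: f"<{v}>" if isinstance(v, str) else v,
    {"consumer": [{"name": "core-only"}]},
)
assert rendered == {"consumer": [{"name": "<core-only>"}]}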
tests/functional/dbt_contracts/contracts.json (new file, 279 lines)
@@ -0,0 +1,279 @@
{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/contracts/v1.json",
    "dbt_version": "1.3.0b1",
    "contract_version": "0.1.0",
    "generated_at": "2022-08-17T13:50:25.756889Z",
    "invocation_id": "0d8bd1ee-7cf9-4569-87e6-f03be091edf0",
    "env": {},
    "project_id": "55d83c99b23a2f84fa11d1299c6f9272",
    "user_id": "3e8c7b85-8948-45a1-9da8-3c4956a6d938",
    "send_anonymous_usage_stats": true,
    "adapter_type": "snowflake"
  },
  "contracts": {
    "name": "core-only",
    "published": [
      "upstream_producer.model.tpch.my_first_model",
      "seed.upstream_project.my_first_dbt_seed"
    ],
    "private": [
      "model.upstream_project.my_private_model",
      "seed.upstream_project.my_private_seed"
    ],
    "requirements": {
      "test_coverage": { "expected": 0.8, "actual": 0.85 },
      "freshness_coverage": { "expected": 0.8, "actual": 0.85 },
      "max_upgrade_time": { "days": 7 }
    },
    "permissions": ["select: ['user_a', 'user_b']"]
  },
  "nodes_manifest": {
    "upstream_producer.model.tpch.my_first_model": {
      "raw_sql": "/*\n    Welcome to your first dbt model!\n    Did you know that you can also configure models directly within SQL files?\n    This will override configurations stated in dbt_project.yml\n    Try changing \"table\" to \"view\" below\n*/\n\n\n{{ config(materialized='view') }}\n\n\nwith source_data as (\n\n    select 1 as id\n    union all\n    select 2 as id\n\n)\n\nselect *\nfrom source_data\n\n/*\n    Uncomment the line below to remove records with null `id` values\n*/\n\n-- where id is not null",
      "resource_type": "model",
      "depends_on": { "macros": [], "nodes": [] },
      "config": {
        "enabled": true,
        "alias": null,
        "schema": null,
        "database": null,
        "tags": [],
        "meta": {},
        "materialized": "view",
        "persist_docs": {},
        "quoting": {},
        "column_types": {},
        "full_refresh": null,
        "unique_key": null,
        "on_schema_change": "ignore",
        "post-hook": [],
        "pre-hook": []
      },
      "database": "ANALYTICS",
      "schema": "dbt_sung",
      "fqn": ["tpch", "demo_examples", "my_first_model"],
      "unique_id": "model.tpch.my_first_model",
      "package_name": "tpch",
      "root_path": "/Users/sung/Desktop/dbt/dbt_env",
      "path": "demo_examples/my_first_model.sql",
      "original_file_path": "models/demo_examples/my_first_model.sql",
      "name": "my_first_model",
      "alias": "my_first_model",
      "checksum": {
        "name": "sha256",
        "checksum": "727d4af05e45dc7556410e97db98eb49597270b9f9e0976773a3efe4a40deb47"
      },
      "tags": [],
      "refs": [],
      "sources": [],
      "description": "",
      "columns": {},
      "meta": {},
      "docs": { "show": true },
      "patch_path": null,
      "compiled_path": null,
      "build_path": null,
      "deferred": false,
      "unrendered_config": { "materialized": "view" },
      "created_at": 1648488021.2328372
    },
    "upstream_producer.model.tpch.my_second_model": {
      "raw_sql": "-- Use the `ref` function to select from other models\n\nselect *\nfrom {{ ref('my_first_model') }}\nwhere id = 2",
      "resource_type": "model",
      "depends_on": { "macros": [], "nodes": ["model.tpch.my_first_model"] },
      "config": {
        "enabled": true,
        "alias": null,
        "schema": null,
        "database": null,
        "tags": [],
        "meta": {},
        "materialized": "view",
        "persist_docs": {},
        "quoting": {},
        "column_types": {},
        "full_refresh": null,
        "unique_key": null,
        "on_schema_change": "ignore",
        "post-hook": [],
        "pre-hook": []
      },
      "database": "ANALYTICS",
      "schema": "dbt_sung",
      "fqn": ["tpch", "demo_examples", "my_second_model"],
      "unique_id": "model.tpch.my_second_model",
      "package_name": "tpch",
      "root_path": "/Users/sung/Desktop/dbt/dbt_env",
      "path": "demo_examples/my_second_model.sql",
      "original_file_path": "models/demo_examples/my_second_model.sql",
      "name": "my_second_model",
      "alias": "my_second_model",
      "checksum": {
        "name": "sha256",
        "checksum": "f60b137bc59426bbf3b690a71e534735c72cc3f3567c2a4def1f81c578bd0c67"
      },
      "tags": [],
      "refs": [["my_first_model"]],
      "sources": [],
      "description": "",
      "columns": {},
      "meta": {},
      "docs": { "show": true },
      "patch_path": null,
      "compiled_path": null,
      "build_path": null,
      "deferred": false,
      "unrendered_config": {},
      "created_at": 1648488021.2358248
    }
  },
  "nodes_catalog": {
    "model.tpch.my_first_model": {
      "metadata": {
        "type": "VIEW",
        "schema": "DBT_SUNG",
        "name": "MY_FIRST_MODEL",
        "database": "ANALYTICS",
        "comment": null,
        "owner": "TRANSFORMER"
      },
      "columns": {
        "ID": { "type": "NUMBER", "index": 1, "name": "ID", "comment": null }
      },
      "stats": {
        "has_stats": {
          "id": "has_stats",
          "label": "Has Stats?",
          "value": false,
          "include": false,
          "description": "Indicates whether there are statistics for this table"
        }
      },
      "unique_id": "model.tpch.my_first_model"
    },
    "model.tpch.my_second_model": {
      "metadata": {
        "type": "VIEW",
        "schema": "DBT_SUNG",
        "name": "MY_SECOND_MODEL",
        "database": "ANALYTICS",
        "comment": null,
        "owner": "TRANSFORMER"
      },
      "columns": {
        "ID": { "type": "NUMBER", "index": 1, "name": "ID", "comment": null }
      },
      "stats": {
        "has_stats": {
          "id": "has_stats",
          "label": "Has Stats?",
          "value": false,
          "include": false,
          "description": "Indicates whether there are statistics for this table"
        }
      },
      "unique_id": "model.tpch.my_second_model"
    }
  },
  "nodes_run_results": [
    {
      "status": "success",
      "timing": [
        {
          "name": "compile",
          "started_at": "2022-09-22T21:16:18.812819Z",
          "completed_at": "2022-09-22T21:16:18.951459Z"
        },
        {
          "name": "execute",
          "started_at": "2022-09-22T21:16:18.966860Z",
          "completed_at": "2022-09-22T21:16:18.966873Z"
        }
      ],
      "thread_id": "Thread-7",
      "execution_time": 0.19610309600830078,
      "adapter_response": {},
      "message": null,
      "failures": null,
      "unique_id": "model.tpch.my_first_model"
    },
    {
      "status": "success",
      "timing": [
        {
          "name": "compile",
          "started_at": "2022-09-22T21:16:19.684269Z",
          "completed_at": "2022-09-22T21:16:19.737776Z"
        },
        {
          "name": "execute",
          "started_at": "2022-09-22T21:16:19.813574Z",
          "completed_at": "2022-09-22T21:16:19.813583Z"
        }
      ],
      "thread_id": "Thread-17",
      "execution_time": 0.29882001876831055,
      "adapter_response": {},
      "message": null,
      "failures": null,
      "unique_id": "model.tpch.my_second_model"
    }
  ],
  "nodes_sources_results": [
    {
      "unique_id": "source.tpch.tpch_test.lineitem",
      "max_loaded_at": "1998-11-29T00:00:00+00:00",
      "snapshotted_at": "2022-04-12T14:08:42.647000+00:00",
      "max_loaded_at_time_ago_in_s": 737474922.647,
      "status": "warn",
      "criteria": {
        "warn_after": { "count": 6, "period": "hour" },
        "error_after": { "count": null, "period": null },
        "filter": null
      },
      "adapter_response": {},
      "timing": [
        {
          "name": "compile",
          "started_at": "2022-04-12T14:08:41.488132Z",
          "completed_at": "2022-04-12T14:08:41.488138Z"
        },
        {
          "name": "execute",
          "started_at": "2022-04-12T14:08:41.489064Z",
          "completed_at": "2022-04-12T14:08:43.139148Z"
        }
      ],
      "thread_id": "Thread-3",
      "execution_time": 1.6662919521331787
    },
    {
      "unique_id": "source.tpch.tpch_test.orders",
      "max_loaded_at": "1998-08-02T00:00:00+00:00",
      "snapshotted_at": "2022-04-12T14:08:42.656000+00:00",
      "max_loaded_at_time_ago_in_s": 747756522.656,
      "status": "warn",
      "criteria": {
        "warn_after": { "count": 6, "period": "hour" },
        "error_after": { "count": null, "period": null },
        "filter": null
      },
      "adapter_response": {},
      "timing": [
        {
          "name": "compile",
          "started_at": "2022-04-12T14:08:41.475146Z",
          "completed_at": "2022-04-12T14:08:41.475159Z"
        },
        {
          "name": "execute",
          "started_at": "2022-04-12T14:08:41.475848Z",
          "completed_at": "2022-04-12T14:08:43.570703Z"
        }
      ],
      "thread_id": "Thread-4",
      "execution_time": 2.0986380577087402
    }
  ]
}
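A note on this fixture: `Manifest.get_consumers` reads a top-level `nodes` key, while the artifact above nests its nodes under `nodes_manifest`, so one side would need to change for the end-to-end flow to work. A small sanity-check sketch for the keys the PoC code actually touches (illustrative; no such validator exists on the branch):

import json

REQUIRED_TOP_LEVEL = ("metadata", "contracts", "nodes_manifest", "nodes_catalog")


def validate_contract_artifact(path: str) -> dict:
    with open(path) as f:
        data = json.load(f)
    missing = [k for k in REQUIRED_TOP_LEVEL if k not in data]
    if missing:
        raise ValueError(f"contracts.json missing keys: {missing}")
    # Fields the task code reads when printing the report card:
    data["metadata"]["contract_version"]
    data["contracts"]["published"]
    data["contracts"]["requirements"]["test_coverage"]
    return data


contract = validate_contract_artifact("tests/functional/dbt_contracts/contracts.json")
print(contract["metadata"]["contract_version"])  # -> 0.1.0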
tests/functional/dbt_contracts/dbt_contracts.yml (new file, 90 lines)
@@ -0,0 +1,90 @@
# dbt_contracts.yml
# upstream-only config for illustration
consumer:
  core-only: # give the project a plain name to ref
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    version: 0.2.0 # versioning is at the project level, NOT the model level
    models:
      - ref('fct_orders')
      - ref('dim_customers')
    requirements: # these can be LESS strict than the upstream contract, but NEVER more
      - tests
      - '>5 run history' # I don't trust data until it's been run a few times; this is tracked statefully within the artifacts_location via run_results.json file counts
    api_private_key: 'jioq2hfj28338' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private api keys?
    artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too

# there's definitely a use case for a contracts config to contain BOTH upstream and downstream contracts
# ex: a finance-only project that depends on core-only and flows data downstream to marketing-only

# dbt_contracts.yml
# core-only config for illustration
producer:
  version: 0.2.0 # version at the top level only; forced to update if the config differs from the dbt_contracts.json state file
  finance-only: # give the project a plain name to ref
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    share_permissions:
      full_share: # share everything: code, docs, nodes, data in lineage
        models: # I have 5 models but I only expose data for 4
          - ref('fct_orders')
          - ref('dim_customers')
          - ref('dim_parts')
          - ref('dbt_metric')
      nodes_only: # share node lineage but not docs, code, or data
        models:
          +except: # share everything except the below
            - ref('stg_sensitive_code')
    requirements:
      - tests
      - freshness
      - '>5 run history' # I don't expect downstream users to trust data until it's been run a few times
    max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
      - date: 11/11/2022 # date to upgrade by; defaults to 12am UTC
        version: 0.1.0 # version to upgrade from
    api_public_key: 'asfawef3' # replace with env_var for security
    artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too AND different from upstream; dbt needs to read those files in memory to compare them

  marketing-only: # give the project a plain name to ref
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    models: # I have 5 models but I only expose data for 1
      - ref('dim_customers')
    requirements:
      - tests
      - freshness
      - '>5 run history' # I don't expect downstream users to trust data until it's been run a few times
    max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
      - days: 0 # No time to upgrade; I'm breaking the contract
    api_public_key: 'asfawef3' # replace with env_var for security
    artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too AND different from upstream; dbt needs to read those files in memory to compare them

  # TODO: how do we enforce least privilege across the `any` config and the stricter contracts above?
  any: # any downstream project can ref this contract
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    models: # I have 5 models but I only expose data for 1
      - ref('example_analysis')
    requirements:
      - tests
    max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
      - days: 10
    api_public_key: 'asfawef3' # replace with env_var for security
    artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too AND different from upstream; dbt needs to read those files in memory to compare them

  multi-project: # unique config for multi-project contracts
    contract_list: # define a list of projects to enforce in a single contract; these have their own configs mapping to this list
      - operations-only
      - sales-only
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    models: # I have 5 models but I only expose data for 1
      - ref('example_analysis')
    requirements:
      - tests
    max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
      - days: 10
    api_public_key: 'asfawef3' # replace with env_var for security
    artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too AND different from upstream; dbt needs to read those files in memory to compare them

  # if the project configs only contain a path, then dbt searches for the multi-project config
  sales-only:
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/

  operations-only:
    path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
tests/functional/dbt_contracts/dbt_contracts_consumer.yml (new file, 16 lines)
@@ -0,0 +1,16 @@
consumer: # akin to a generic top-level import, like how people work with python
  # this presumes the upstream producer solely dictates contract terms
  # it is the responsibility of the producer to validate that the contract is met
  # the consumer is responsible for validating the contract to more stringent standards if needed
  - name: 'core-only' # give the project a plain name to ref
    # path: https://github.com/sungchun12/dbt_bigquery_example.git # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    contract_version: 0.2.0 # versioning is at the project level, NOT the model level
    contract_location: 's3://core-only-bucket/dbt-contracts'
    credentials: '{"aws_access_key_id": "YOUR_ACCESS_KEY_ID", "aws_secret_access_key": "YOUR_SECRET_ACCESS_KEY"}' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private api keys?
    # artifacts_location: argument NOT required, as it inherits from the producer
  - name: 'finance-only' # give the project a plain name to ref
    # path: https://github.com/sungchun12/snowflake_dbt_demo_project.git # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
    contract_version: 0.1.0 # versioning is at the project level, NOT the model level
    contract_location: 's3://finance-only-bucket/dbt-contracts'
    credentials: '{"aws_access_key_id": "YOUR_ACCESS_KEY_ID", "aws_secret_access_key": "YOUR_SECRET_ACCESS_KEY"}' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private api keys?
    # artifacts_location: argument NOT required, as it inherits from the producer
@@ -0,0 +1,52 @@
version: 2

models:
  - name: dim_customers
    contracts:
      producer: # this is implied, but should be explicit for clarity
        finance-only:
          version: 0.1.0
          nodes_only: false # optional, default is false
          requirements:
            test_coverage: # how many of the models are required to be tested
              enabled: true # optional, default is false
              # threshold: .80 # 80% of the models must be tested; optional, default is 1.0
            freshness_coverage: # how many of the sources are required to be fresh
              enabled: true # optional, default is false
              # threshold: .80 # 80% of the sources must be fresh; optional, default is 1.0
            run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
            success_only: true # only produce on successful runs, else error for consumers
            max_upgrade_time:
              days: 10 # how many days a project has to upgrade before it is considered stale
          security:
            api_public_key: 'asfawef3' # replace with env_var for security
    description: Customer dimensions table
    docs:
      node_color: 'red'
    columns:
      - name: customer_key
        description: Primary key on the customers table
        tests:
          - unique
          - not_null
      - name: region
        description: region name
        tests:
          - accepted_values:
              values: ['AFRICA', 'MIDDLE EAST', 'ASIA', 'EUROPE', 'AMERICA']
              severity: warn
      - name: name
        description: customer id
      - name: address
        description: address of the customer
      - name: nation
        description: nation name
      - name: phone_number
        description: phone number of the customer
      - name: account_balance
        description: '{{ doc("account_balance") }}'
      - name: market_segment
        description: market segment of the customer
        tests:
          - unique
          - not_null
tests/functional/dbt_contracts/dbt_contracts_sql_example.sql (new file, 43 lines)
@@ -0,0 +1,43 @@
/*
    Welcome to your first dbt model!
    Did you know that you can also configure models directly within SQL files?
    This will override configurations stated in dbt_project.yml
    Try changing "table" to "view" below
*/


{{ config(materialized='view',
    contracts={
        "producer": {
            "finance-only": {
                "version": "0.1.0",
                "requirements": {
                    "test_coverage": { "enabled": true },
                    "freshness_coverage": { "enabled": true },
                    "run_history": 5,
                    "success_only": true,
                    "max_upgrade_time": { "days": 10 }
                },
                "security": { "api_public_key": "asfawef3" }
            }
        }
    }
) }}


with source_data as (

    select 1 as id
    union all
    select 2 as id

)

select *
from source_data

/*
    Uncomment the line below to remove records with null `id` values
*/

-- where id is not null
tests/functional/dbt_contracts/dbt_project.yml (new file, 142 lines)
@@ -0,0 +1,142 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'tpch'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'tpch'

# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

# TODO: add an api key config here?
# one project = one remote contract artifact location
contract-paths:
  local: "contracts"
  remote: "s3://my-bucket/dbt-contracts"
target-path: "custom_target_path" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_modules"

on-run-start:
  - "alter warehouse transforming set warehouse_size=small;"
  - '{{create_udfs()}}' # comment / uncomment this line to build UDFs called in the create_udfs macro

on-run-end:
  - "alter warehouse transforming set warehouse_size=xsmall;"
  - "{{ grant_all_on_schemas(schemas, 'transformer') }}"

vars:
  load_type: 'I'
  start_date: '1999-01-01'
  test: 'false' # to trigger runs for unit testing; override via a CLI param in the testing job
  fct_order_items: 'mock_source__fct_order_items' # this is a map for unit testing
  dbt_artifacts:
    dbt_artifacts_schema: dbt_artifacts_sung # optional, default is 'dbt_artifacts'
    dbt_artifacts_table: artifacts # optional, default is 'artifacts'

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

models:
  contracts:
    producer: # this top-level producer contract is for ANY consumer to access
      version: 0.1.0
      +grants:
        select: ['user_a', 'user_b']
      models: # I have 5 models but I only expose data for 1
        - ref('example_analysis')
        - entity('orders')
      metrics:
        - metric('revenue')
      nodes_only:
        +except: # share everything except the below
          - ref('stg_sensitive_code')
      requirements:
        test_coverage: # how many of the models are required to be tested
          enabled: true # optional, default is false
          threshold: .80 # 80% of the models must be tested; optional, default is 1.0
        freshness_coverage: # how many of the sources are required to be fresh
          enabled: true # optional, default is false
          threshold: .80 # 80% of the sources must be fresh; optional, default is 1.0
        run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
        success_only: true # only produce on successful runs, else error for consumers
        max_upgrade_time:
          days: 10 # how many days a project has to upgrade before it is considered stale
      security:
        api_public_key: 'asfawef3' # replace with env_var for security
  dbt_artifacts:
    +docs:
      show: false
    +schema: dbt_artifacts_sung
    staging:
      +schema: dbt_artifacts_sung
  tpch:
    staging:
      +materialized: view
      +docs:
        # show: false
        node_color: "#cd7f32"

    marts:
      core:
        contracts:
          producer: # this is implied, but should be explicit for clarity
            finance-only:
              version: 0.1.0
              models: # I have 5 models but I only expose data for 1
                - ref('example_analysis')
              nodes_only:
                +except: # share everything except the below
                  - ref('stg_sensitive_code')
              requirements:
                test_coverage: .80 # how many of the models are required to be tested
                freshness_coverage: .80 # how many of the sources are required to be fresh
                run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
                success_only: true # only produce on successful runs, else error for consumers
                max_upgrade_time:
                  days: 10 # how many days a project has to upgrade before it is considered stale
              security:
                api_public_key: 'asfawef3' # replace with env_var for security

            multi-project:
              sales-only:
                path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
              operations-only:
                path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
                version: 0.3.0
                models: # I have 5 models but I only expose data for 1
                  - ref('example_analysis')
                nodes_only:
                  +except: # share everything except the below
                    - ref('stg_sensitive_code')
                requirements:
                  test_coverage: .80 # how many of the models are required to be tested
                  freshness_coverage: .80 # how many of the sources are required to be fresh
                  run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
                  success_only: true # only produce on successful runs, else error for consumers
                  max_upgrade_time:
                    days: 10 # how many days a project has to upgrade before it is considered stale
                security:
                  api_public_key: 'asfawef3' # replace with env_var for security
        materialized: table
        +docs:
          node_color: "blue"

seeds:
  tpch:
    snowflake_contract_rates:
      +column_types:
        effective_date: DATE
        rate: NUMBER
@@ -0,0 +1,55 @@
{
  "name": "tpch",
  "version": "1.0.0",
  "config-version": 2,
  "profile": "tpch",
  "model-paths": ["models"],
  "analysis-paths": ["analysis"],
  "test-paths": ["tests"],
  "seed-paths": ["data"],
  "macro-paths": ["macros"],
  "snapshot-paths": ["snapshots"],
  "target-path": "custom_target_path",
  "clean-targets": ["target", "dbt_modules"],
  "on-run-start": [
    "alter warehouse transforming set warehouse_size=small;",
    "{{create_udfs()}}"
  ],
  "on-run-end": [
    "alter warehouse transforming set warehouse_size=xsmall;",
    "{{ grant_all_on_schemas(schemas, 'transformer') }}"
  ],
  "vars": {
    "load_type": "I",
    "start_date": "1999-01-01",
    "test": "false",
    "fct_order_items": "mock_source__fct_order_items",
    "dbt_artifacts": {
      "dbt_artifacts_schema": "dbt_artifacts_sung",
      "dbt_artifacts_table": "artifacts"
    }
  },
  "models": {
    "dbt_artifacts": {
      "+docs": { "show": false },
      "+schema": "dbt_artifacts_sung",
      "staging": { "+schema": "dbt_artifacts_sung" }
    },
    "tpch": {
      "staging": {
        "+materialized": "view",
        "+docs": { "node_color": "#cd7f32" }
      },
      "marts": {
        "core": { "materialized": "table", "+docs": { "node_color": "blue" } }
      }
    }
  },
  "seeds": {
    "tpch": {
      "snowflake_contract_rates": {
        "+column_types": { "effective_date": "DATE", "rate": "NUMBER" }
      }
    }
  }
}
tests/functional/dbt_contracts/v8_new_manifest.json (new file, 6576 lines)
File diff suppressed because it is too large.