Compare commits

...

29 Commits

Author  SHA1  Message  Date
Sung Won Chung  dfe0e63876  distinguish logs  2022-09-27 12:30:38 -07:00
Sung Won Chung  642922f3e7  color code logs  2022-09-23 13:46:19 -07:00
Sung Won Chung  5ca1f194e9  example logs UX  2022-09-23 13:38:58 -07:00
Sung Won Chung  120e4d3edc  simulate downloading contracts.json  2022-09-22 14:55:31 -07:00
Sung Won Chung  290eab7766  update key name  2022-09-22 14:30:05 -07:00
Sung Won Chung  8dcabb1e6f  filthy code to get the wheels turning  2022-09-22 14:27:51 -07:00
Sung Won Chung  43d94a6e56  add permission contract config  2022-09-22 10:42:04 -07:00
Sung Won Chung  bf6a44f4ac  update consumer configs  2022-09-22 10:30:37 -07:00
Sung Won Chung  412673a252  todo for additional args  2022-09-21 14:32:56 -07:00
Sung Won Chung  77e57afdea  cli command works  2022-09-21 14:29:17 -07:00
Sung Won Chung  3d343ee991  we need a new manifest  2022-09-21 12:41:26 -07:00
Sung Won Chung  b08b72e20e  placeholder functions similar to meta  2022-09-21 12:41:04 -07:00
Sung Won Chung  87972f4c29  placeholder contract config  2022-09-21 11:54:07 -07:00
Sung Won Chung  860fbf6caf  Merge branch 'main' of https://github.com/dbt-labs/dbt into blueprint-dbt-contracts  2022-09-21 10:46:07 -07:00
Sung Won Chung  ff6e554689  consumer contract in dedicated file  2022-09-20 13:53:31 -07:00
Sung Won Chung  a92d580895  clean up coverage configs  2022-09-20 13:51:56 -07:00
Sung Won Chung  2439e891a0  schema contract example  2022-09-20 12:38:00 -07:00
Sung Won Chung  5b1f329245  delete old configs  2022-09-20 12:24:34 -07:00
Sung Won Chung  6df97d9348  add sql example  2022-09-20 12:24:20 -07:00
Sung Won Chung  23cc3dc532  add Callum's suggestions  2022-09-20 12:09:37 -07:00
Sung Won Chung  61478b09e0  more example configs  2022-09-20 10:59:49 -07:00
Sung Won Chung  2ea007ff5b  top level producer/consumer project config  2022-09-20 10:38:10 -07:00
Sung Won Chung  b6c2b12978  new cli selector placement file  2022-09-19 10:14:42 -07:00
Sung Won Chung  3f51109e88  new cli selector placement  2022-09-19 10:14:13 -07:00
Sung Won Chung  c075e35d7f  better names for upstream and downstream  2022-09-19 10:06:02 -07:00
Sung Won Chung  2814cb93e5  comment over code to touch  2022-09-15 12:40:24 -07:00
Sung Won Chung  3e5db87fc6  explain configs  2022-09-15 12:23:50 -07:00
Sung Won Chung  e13ab0aaa9  starter template  2022-09-13 11:41:32 -07:00
Sung Won Chung  f59da0cbea  starter template  2022-09-13 10:15:58 -07:00
18 changed files with 7549 additions and 5 deletions

View File

@@ -156,6 +156,28 @@ def value_or(value: Optional[T], default: T) -> T:
return value
# TODO: replicate function for dbt_contracts.yml
# def _raw_contracts_from(project_root: str) -> Dict[str, Any]:
# project_root = os.path.normpath(project_root)
# project_yaml_filepath = os.path.join(project_root, "dbt_contracts.yml")
# # get the project.yml contents
# if not path_exists(project_yaml_filepath):
# raise DbtProjectError(
# "no dbt_contracts.yml found at expected path {}".format(
# project_yaml_filepath
# )
# )
# project_dict = _load_yaml(project_yaml_filepath)
# if not isinstance(project_dict, dict):
# raise DbtProjectError("dbt_contracts.yml does not parse to a dictionary")
# return project_dict
def _raw_project_from(project_root: str) -> Dict[str, Any]:
project_root = os.path.normpath(project_root)
@@ -195,7 +217,8 @@ def validate_version(dbt_version: List[VersionSpecifier], project_name: str):
installed = get_installed_version()
if not versions_compatible(*dbt_version):
msg = IMPOSSIBLE_VERSION_ERROR.format(
package=project_name, version_spec=[x.to_version_string() for x in dbt_version]
package=project_name,
version_spec=[x.to_version_string() for x in dbt_version],
)
raise DbtProjectError(msg)
@@ -351,7 +374,8 @@ class PartialProject(RenderComponents):
# `data_paths` is deprecated but still allowed. Copy it into
# `seed_paths` to simplify logic throughout the rest of the system.
seed_paths: List[str] = value_or(
cfg.seed_paths if "seed-paths" in rendered.project_dict else cfg.data_paths, ["seeds"]
cfg.seed_paths if "seed-paths" in rendered.project_dict else cfg.data_paths,
["seeds"],
)
test_paths: List[str] = value_or(cfg.test_paths, ["tests"])
analysis_paths: List[str] = value_or(cfg.analysis_paths, ["analyses"])
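
For reference, a runnable version of the commented-out `_raw_contracts_from` above might look like the sketch below. It is not part of this diff; it assumes it lives in core/dbt/config/project.py next to the same module-level helpers (`path_exists`, `_load_yaml`, `DbtProjectError`) that `_raw_project_from` relies on.

# A hedged sketch of the TODO above: load dbt_contracts.yml the same way
# _raw_project_from loads dbt_project.yml. Assumes the existing helpers
# path_exists, _load_yaml, and DbtProjectError from this module.
import os
from typing import Any, Dict

def _raw_contracts_from(project_root: str) -> Dict[str, Any]:
    project_root = os.path.normpath(project_root)
    contracts_yaml_filepath = os.path.join(project_root, "dbt_contracts.yml")
    # get the dbt_contracts.yml contents
    if not path_exists(contracts_yaml_filepath):
        raise DbtProjectError(
            "no dbt_contracts.yml found at expected path {}".format(contracts_yaml_filepath)
        )
    contracts_dict = _load_yaml(contracts_yaml_filepath)
    if not isinstance(contracts_dict, dict):
        raise DbtProjectError("dbt_contracts.yml does not parse to a dictionary")
    return contracts_dict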

View File

@@ -46,6 +46,7 @@ class BaseRenderer:
def render_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
try:
# print(deep_map_render(self.render_entry, data)) # dbt_contracts.yml should output a typed dictionary
return deep_map_render(self.render_entry, data)
except RecursionException:
raise DbtProjectError(
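
A hedged sketch of how that rendering step could be exercised for dbt_contracts.yml, mirroring the `DbtProjectYamlRenderer(self.config, self.config.cli_vars)` call commented out in core/dbt/task/contracts.py below; the `config` and `project_root` wiring here are assumptions.

# Hypothetical usage: resolve {{ env_var(...) }} and other Jinja entries in a
# raw dbt_contracts.yml dictionary before validating it.
from dbt.config.renderer import DbtProjectYamlRenderer

raw_contracts = _raw_contracts_from(project_root)  # sketch shown earlier
renderer = DbtProjectYamlRenderer(config, config.cli_vars)  # assumed wiring
rendered_contracts = renderer.render_data(raw_contracts)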

View File

@@ -622,6 +622,24 @@ class SnapshotConfig(EmptySnapshotConfig):
return self.from_dict(data)
# TODO: add a contract config to store the yaml configs in python memory
@dataclass
class ContractConfig(NodeAndTestConfig):
# this is repeated because of a different default
schema: Optional[str] = field(
default="dbt_test__audit",
metadata=CompareBehavior.Exclude.meta(),
)
materialized: str = "test"
severity: Severity = Severity("ERROR")
store_failures: Optional[bool] = None
where: Optional[str] = None
limit: Optional[int] = None
fail_calc: str = "count(*)"
warn_if: str = "!= 0"
error_if: str = "!= 0"
RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
NodeType.Metric: MetricConfig,
NodeType.Exposure: ExposureConfig,
@@ -644,4 +662,5 @@ def get_config_for(resource_type: NodeType, base=False) -> Type[BaseConfig]:
lookup = BASE_RESOURCE_TYPES
else:
lookup = RESOURCE_TYPES
# print(f"lookup config {lookup.get(resource_type, NodeConfig)}")
return lookup.get(resource_type, NodeConfig)
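
Note that `ContractConfig` is defined above but never registered, so `get_config_for` would still fall back to `NodeConfig` for contracts. A hypothetical wiring is sketched below; `NodeType.Contract` does not exist in dbt today and is assumed purely for illustration.

# Hypothetical: register the new config class so the lookup can resolve it.
# NodeType.Contract is an assumed enum member, not real dbt API.
RESOURCE_TYPES[NodeType.Contract] = ContractConfig

assert get_config_for(NodeType.Contract) is ContractConfig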

View File

@@ -88,6 +88,14 @@ class Docs(dbtClassMixin, Replaceable):
node_color: Optional[str] = None
@dataclass
class Contracts(dbtClassMixin, Replaceable):
# TODO: need strict typing here for various configs
# TODO: make these optional?
producer: Dict[str, Any] = field(default_factory=dict) # similar to meta
consumer: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HasDocs(AdditionalPropertiesMixin, ExtensibleDbtClassMixin, Replaceable):
name: str
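
A quick hedged illustration of the `Contracts` mixin carrying the raw config dictionaries; the values are made up, and real configs would come from dbt_contracts.yml or model-level configs.

# Both sides default to empty dicts via default_factory, mirroring how `meta`
# behaves elsewhere in unparsed.py.
contracts = Contracts(
    producer={"finance-only": {"version": "0.1.0"}},
    consumer={},
)
assert contracts.consumer == {}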

View File

@@ -352,6 +352,11 @@ class DbtProjectError(DbtConfigError):
pass
# TODO: need a config error for contracts
# class DbtContractsError(DbtConfigError):
# pass
class DbtSelectorsError(DbtConfigError):
pass

View File

@@ -25,6 +25,7 @@ import dbt.task.clean as clean_task
import dbt.task.compile as compile_task
import dbt.task.debug as debug_task
import dbt.task.deps as deps_task
import dbt.task.contracts as contracts_task
import dbt.task.freshness as freshness_task
import dbt.task.generate as generate_task
import dbt.task.init as init_task
@@ -43,7 +44,11 @@ import dbt.tracking
from dbt.utils import ExitCodes, args_to_dict
from dbt.config.profile import read_user_config
from dbt.exceptions import InternalException, NotImplementedException, FailedToConnectException
from dbt.exceptions import (
InternalException,
NotImplementedException,
FailedToConnectException,
)
class DBTVersion(argparse.Action):
@@ -60,7 +65,11 @@ class DBTVersion(argparse.Action):
help="show program's version number and exit",
):
super().__init__(
option_strings=option_strings, dest=dest, default=default, nargs=0, help=help
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
def __call__(self, parser, namespace, values, option_string=None):
@@ -411,6 +420,8 @@ def _build_build_subparser(subparsers, base_subparser):
return sub
# TODO: Will this main.py file be completely refactored based on the latest roadmap update?
# use this function as a template for the new contract command
def _build_clean_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
"clean",
@@ -458,6 +469,38 @@ def _build_deps_subparser(subparsers, base_subparser):
return sub
def _build_contracts_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
"contracts",
parents=[base_subparser],
help="""
Pull the most recent version of each consumed project listed in dbt_contracts.yml
""",
)
# TODO: add arguments for consumer/producer
# sub.add_argument(
# "--preview",
# action="store_true",
# help="""
# If specified, DBT will show path information for this project
# """,
# )
# _add_version_check(sub)
# sub.add_argument(
# "--publish",
# action="store_true",
# help="""
# If specified, DBT will show path information for this project
# """,
# )
# _add_version_check(sub)
sub.set_defaults(cls=contracts_task.DepsTask, which="contracts", rpc_method="contracts")
return sub
def _build_snapshot_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
"snapshot",
@@ -885,7 +928,9 @@ def _build_run_operation_subparser(subparsers, base_subparser):
""",
)
sub.set_defaults(
cls=run_operation_task.RunOperationTask, which="run-operation", rpc_method="run-operation"
cls=run_operation_task.RunOperationTask,
which="run-operation",
rpc_method="run-operation",
)
return sub
@@ -1145,6 +1190,7 @@ def parse_args(args, cls=DBTArgumentParser):
_build_clean_subparser(subs, base_subparser)
_build_debug_subparser(subs, base_subparser)
_build_deps_subparser(subs, base_subparser)
_build_contracts_subparser(subs, base_subparser)
_build_list_subparser(subs, base_subparser)
build_sub = _build_build_subparser(subs, base_subparser)
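
A hedged smoke test of the new wiring, assuming `parse_args` behaves for this subcommand the way it does for the others; this is not part of the diff.

# Hypothetical check that `dbt contracts` resolves to the new task class.
from dbt.main import parse_args
from dbt.task.contracts import DepsTask

parsed = parse_args(["contracts"])
assert parsed.which == "contracts"
assert parsed.rpc_method == "contracts"
assert parsed.cls is DepsTask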

View File

@@ -307,6 +307,11 @@ class ConfiguredParser(
else:
parsed_node.docs = Docs(show=docs_show)
# If contracts are present in the config, merge them with the node level for
# backwards compatibility with the earlier node-only config.
if "contracts" in config_dict and config_dict["contracts"]:
parsed_node.meta = config_dict["contracts"]  # POC: stashes contracts on meta rather than truly merging
# unrendered_config is used to compare the original database/schema/alias
# values and to handle 'same_config' and 'same_contents' calls
parsed_node.unrendered_config = config.build_config_dict(

View File

@@ -834,6 +834,9 @@ class NonSourceParser(YamlDocsReader, Generic[NonSourceTarget, Parsed]):
def normalize_docs_attribute(self, data, path):
return self.normalize_attribute(data, path, "docs")
def normalize_contracts_attribute(self, data, path):
return self.normalize_attribute(data, path, "contracts")
def patch_node_config(self, node, patch):
# Get the ContextConfig that's used in calculating the config
# This must match the model resource_type that's being patched

core/dbt/task/contracts.py (new file, 179 lines)
View File

@@ -0,0 +1,179 @@
# coding=utf-8
# TODO: use the dbt deps and debug code as a template to print helpful information for the dbt contracts command
import os
import shutil
import json
import dbt.utils
import dbt.deprecations
import dbt.exceptions
import dbt.tracking  # track_package_install below calls dbt.tracking
from dbt.ui import green, red
from dbt.config import UnsetProfileConfig
# from dbt.config.renderer import DbtProjectYamlRenderer
# from dbt.deps.base import downloads_directory
# from dbt.deps.resolver import resolve_packages
# from dbt.events.functions import fire_event
# from dbt.events.types import (
# DepsNoPackagesFound,
# DepsStartPackageInstall,
# DepsUpdateAvailable,
# DepsUTD,
# DepsInstallInfo,
# DepsListSubdirectory,
# DepsNotifyUpdatesAvailable,
# EmptyLine,
# )
# from dbt.clients import system
from dbt.task.base import BaseTask, move_to_nearest_project_dir
from dbt.clients.yaml_helper import load_yaml_text
# from dbt.clients.git import clone_and_checkout
# TODO: point to github repo to consume using existing mechanic for packages
# TODO: run a dbt compile to output the consumed manifest.json
# TODO: integrate Doug's consumer ref code
# TODO: what if I included this directly in the deps command? no, keep this separate
# Remember: we aren't doing a real implementation of contracts, just a proof of
# concept, so net-new scripts here are throwaway. The goal is understanding the
# general structure of the code and how it will be used.
class DepsTask(BaseTask):
ConfigType = UnsetProfileConfig
def __init__(self, args, config: UnsetProfileConfig):
super().__init__(args=args, config=config)
def track_package_install(self, package_name: str, source_type: str, version: str) -> None:
# Hub packages do not need to be hashed, as they are public
# Use the string 'local' for local package versions
if source_type == "local":
package_name = dbt.utils.md5(package_name)
version = "local"
elif source_type != "hub":
package_name = dbt.utils.md5(package_name)
version = dbt.utils.md5(version)
dbt.tracking.track_package_install(
self.config,
self.config.args,
{"name": package_name, "source": source_type, "version": version},
)
def run(self):
print("xxxxxxxxxxxxxxxxxxxx")
# system.make_directory(self.config.packages_install_path)
# packages = self.config.packages.packages
# TODO: Locate the dbt_contracts.yml file
project_dir = os.getcwd() # running a dbt project locally
default_directory_location = os.path.join(project_dir, "dbt_contracts.yml")
print(f"default_directory_location: {default_directory_location}")
# TODO: read in the dbt_contracts.yml as a dictionary
with open(default_directory_location, "r") as stream:
contracts_consumed_rendered = load_yaml_text(stream)
print(f"contracts_consumed_rendered: {contracts_consumed_rendered}")
consumer = contracts_consumed_rendered.get("consumer")
print("xxxxxxxxxxxxxxxxxxxx\n")
# TODO: verify the API private key works (print statement for now; use fire_event later)
# Will have to create a menu of authentication options such as GCS, S3, API key, etc.
contract_validation = {}
for x in consumer:
contract_validation.update({x.get("contract_location"): x.get("credentials")})
print(f'{x.get("name")}: contract credentials verified {green("[OK connection ok]")}')
# TODO: output the consumed code to a `contracts/projects/consumed` directory
contracts_dir = project_dir + "/dbt_contracts"
if not os.path.exists(contracts_dir):
os.mkdir(contracts_dir)
# download the contracts from the contract_location and store them in the contracts_dir
# in the short-term, we will copy the contracts from the local test directory to the contracts_dir
# this contracts.json consolidates a subset of the manifest.json, catalog.json,
# run_results.json, and sources.json files, then merges that with the consumer's
# manifest.json and catalog.json (run_results.json and sources.json are for
# validating contract requirements only)
dummy_contracts_file_location = "../tests/functional/dbt_contracts/contracts.json"
for x in consumer:
contract_name = x.get("name")
contract_version_expected = x.get("contract_version")
contract_destination = f"{contracts_dir}/{contract_name}-contracts.json"
with open(dummy_contracts_file_location) as json_file:
contract_data = json.load(json_file)
contract_version_actual = contract_data.get("metadata").get("contract_version")
if contract_version_expected == contract_version_actual:
shutil.copyfile(dummy_contracts_file_location, contract_destination)
print(f"Successful contract consumed[{contract_name}]: {contract_destination}")
# TODO: output the consumed contracts.json to a `contracts/consumed` directory within the respective consumed project directory
# TODO: Read in the consumed `contracts.json` to produce a report card in a terminal output
# Which nodes are published vs. private?
print(f" Published Nodes: {contract_data.get('contracts').get('published')}")
print(f" Private Nodes: {contract_data.get('contracts').get('private')}")
# What are the contract expectations vs. actuals?
print(
f" Test Coverage: {contract_data.get('contracts').get('requirements').get('test_coverage')} {green('[OK and valid]')}"
)
print(
f" Freshness Coverage: {contract_data.get('contracts').get('requirements').get('freshness_coverage')} {green('[OK and valid]')}"
)
print(
f" Max Upgrade Time Between Versions: {contract_data.get('contracts').get('requirements').get('max_upgrade_time')}"
)
# What permissions do I need to select published nodes?
print(
f" Published Node Permissions: {contract_data.get('contracts').get('permissions')}"
)
# How do I select them?
contract_name = contract_data.get("contracts").get("name")
print(" Published Node Selection:")
print(f" select * from {{{{ ref('{contract_name}','my_first_model') }}}}")
print(f" select * from {{{{ ref('{contract_name}','my_second_model') }}}}")
else:
print(
f"Contract version mismatch, will not consume[{contract_name}]. Expected: {contract_version_expected}, Actual: {contract_version_actual} {red('[Not Compatible]')} \n"
)
# git clone may not be necessary because the contracts.json will contain all the info from the manifest.json and catalog.json
# for x in consumer:
# project_location = x.get("path")
# print(f"project_location: {project_location}")
# clone_and_checkout(repo=project_location, cwd=contracts_dir)
# if not packages:
# fire_event(DepsNoPackagesFound())
# return
# with downloads_directory():
# final_deps = resolve_packages(packages, self.config)
# renderer = DbtProjectYamlRenderer(self.config, self.config.cli_vars)
# packages_to_upgrade = []
# for package in final_deps:
# package_name = package.name
# source_type = package.source_type()
# version = package.get_version()
# fire_event(DepsStartPackageInstall(package_name=package_name))
# package.install(self.config, renderer)
# fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
# if source_type == "hub":
# version_latest = package.get_version_latest()
# if version_latest != version:
# packages_to_upgrade.append(package_name)
# fire_event(DepsUpdateAvailable(version_latest=version_latest))
# else:
# fire_event(DepsUTD())
# if package.get_subdirectory():
# fire_event(DepsListSubdirectory(subdirectory=package.get_subdirectory()))
# self.track_package_install(
# package_name=package_name, source_type=source_type, version=version
# )
# if packages_to_upgrade:
# fire_event(EmptyLine())
# fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))
@classmethod
def from_args(cls, args):
# deps needs to move to the project directory, as it does put files
# into the modules directory
move_to_nearest_project_dir(args)
return super().from_args(args)
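
One note on the version check in run(): it compares contract_version strings for exact equality, so "0.2" and "0.2.0" would be treated as a mismatch. A hedged alternative sketch using the packaging library follows (an assumption, not part of this diff; dbt-core's own dbt.semver helpers could serve the same purpose).

from packaging.version import Version

def contract_versions_match(expected: str, actual: str) -> bool:
    # Parse both sides so that trailing-zero releases compare as equal.
    return Version(expected) == Version(actual)

assert contract_versions_match("0.2", "0.2.0")
assert not contract_versions_match("0.1.0", "0.2.0")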

View File

@@ -204,6 +204,7 @@ def _deep_map_render(
return ret
# TODO: We'll have to re-use this function for dbt_contracts.yml rendering
def deep_map_render(func: Callable[[Any, Tuple[Union[str, int], ...]], Any], value: Any) -> Any:
"""This function renders a nested dictionary derived from a yaml
file. It is used to render dbt_project.yml, profiles.yml, and

View File

@@ -0,0 +1,279 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/contracts/v1.json",
"dbt_version": "1.3.0b1",
"contract_version": "0.1.0",
"generated_at": "2022-08-17T13:50:25.756889Z",
"invocation_id": "0d8bd1ee-7cf9-4569-87e6-f03be091edf0",
"env": {},
"project_id": "55d83c99b23a2f84fa11d1299c6f9272",
"user_id": "3e8c7b85-8948-45a1-9da8-3c4956a6d938",
"send_anonymous_usage_stats": true,
"adapter_type": "snowflake"
},
"contracts": {
"name": "core-only",
"published": [
"upstream_producer.model.tpch.my_first_model",
"seed.upstream_project.my_first_dbt_seed"
],
"private": [
"model.upstream_project.my_private_model",
"seed.upstream_project.my_private_seed"
],
"requirements": {
"test_coverage": { "expected": 0.8, "actual": 0.85 },
"freshness_coverage": { "expected": 0.8, "actual": 0.85 },
"max_upgrade_time": { "days": 7 }
},
"permissions": ["select: ['user_a', 'user_b']"]
},
"nodes_manifest": {
"upstream_producer.model.tpch.my_first_model": {
"raw_sql": "/*\n Welcome to your first dbt model!\n Did you know that you can also configure models directly within SQL files?\n This will override configurations stated in dbt_project.yml\n Try changing \"table\" to \"view\" below\n*/\n\n\n{{ config(materialized='view') }}\n\n\nwith source_data as (\n\n select 1 as id\n union all\n select 2 as id\n\n)\n\nselect *\nfrom source_data\n\n/*\n Uncomment the line below to remove records with null `id` values\n*/\n\n-- where id is not null",
"resource_type": "model",
"depends_on": { "macros": [], "nodes": [] },
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [],
"meta": {},
"materialized": "view",
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"post-hook": [],
"pre-hook": []
},
"database": "ANALYTICS",
"schema": "dbt_sung",
"fqn": ["tpch", "demo_examples", "my_first_model"],
"unique_id": "model.tpch.my_first_model",
"package_name": "tpch",
"root_path": "/Users/sung/Desktop/dbt/dbt_env",
"path": "demo_examples/my_first_model.sql",
"original_file_path": "models/demo_examples/my_first_model.sql",
"name": "my_first_model",
"alias": "my_first_model",
"checksum": {
"name": "sha256",
"checksum": "727d4af05e45dc7556410e97db98eb49597270b9f9e0976773a3efe4a40deb47"
},
"tags": [],
"refs": [],
"sources": [],
"description": "",
"columns": {},
"meta": {},
"docs": { "show": true },
"patch_path": null,
"compiled_path": null,
"build_path": null,
"deferred": false,
"unrendered_config": { "materialized": "view" },
"created_at": 1648488021.2328372
},
"upstream_producer.model.tpch.my_second_model": {
"raw_sql": "-- Use the `ref` function to select from other models\n\nselect *\nfrom {{ ref('my_first_model') }}\nwhere id = 2",
"resource_type": "model",
"depends_on": { "macros": [], "nodes": ["model.tpch.my_first_model"] },
"config": {
"enabled": true,
"alias": null,
"schema": null,
"database": null,
"tags": [],
"meta": {},
"materialized": "view",
"persist_docs": {},
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "ignore",
"post-hook": [],
"pre-hook": []
},
"database": "ANALYTICS",
"schema": "dbt_sung",
"fqn": ["tpch", "demo_examples", "my_second_model"],
"unique_id": "model.tpch.my_second_model",
"package_name": "tpch",
"root_path": "/Users/sung/Desktop/dbt/dbt_env",
"path": "demo_examples/my_second_model.sql",
"original_file_path": "models/demo_examples/my_second_model.sql",
"name": "my_second_model",
"alias": "my_second_model",
"checksum": {
"name": "sha256",
"checksum": "f60b137bc59426bbf3b690a71e534735c72cc3f3567c2a4def1f81c578bd0c67"
},
"tags": [],
"refs": [["my_first_model"]],
"sources": [],
"description": "",
"columns": {},
"meta": {},
"docs": { "show": true },
"patch_path": null,
"compiled_path": null,
"build_path": null,
"deferred": false,
"unrendered_config": {},
"created_at": 1648488021.2358248
}
},
"nodes_catalog": {
"model.tpch.my_first_model": {
"metadata": {
"type": "VIEW",
"schema": "DBT_SUNG",
"name": "MY_FIRST_MODEL",
"database": "ANALYTICS",
"comment": null,
"owner": "TRANSFORMER"
},
"columns": {
"ID": { "type": "NUMBER", "index": 1, "name": "ID", "comment": null }
},
"stats": {
"has_stats": {
"id": "has_stats",
"label": "Has Stats?",
"value": false,
"include": false,
"description": "Indicates whether there are statistics for this table"
}
},
"unique_id": "model.tpch.my_first_model"
},
"model.tpch.my_second_model": {
"metadata": {
"type": "VIEW",
"schema": "DBT_SUNG",
"name": "MY_SECOND_MODEL",
"database": "ANALYTICS",
"comment": null,
"owner": "TRANSFORMER"
},
"columns": {
"ID": { "type": "NUMBER", "index": 1, "name": "ID", "comment": null }
},
"stats": {
"has_stats": {
"id": "has_stats",
"label": "Has Stats?",
"value": false,
"include": false,
"description": "Indicates whether there are statistics for this table"
}
},
"unique_id": "model.tpch.my_second_model"
}
},
"nodes_run_results": [
{
"status": "success",
"timing": [
{
"name": "compile",
"started_at": "2022-09-22T21:16:18.812819Z",
"completed_at": "2022-09-22T21:16:18.951459Z"
},
{
"name": "execute",
"started_at": "2022-09-22T21:16:18.966860Z",
"completed_at": "2022-09-22T21:16:18.966873Z"
}
],
"thread_id": "Thread-7",
"execution_time": 0.19610309600830078,
"adapter_response": {},
"message": null,
"failures": null,
"unique_id": "model.tpch.my_first_model"
},
{
"status": "success",
"timing": [
{
"name": "compile",
"started_at": "2022-09-22T21:16:19.684269Z",
"completed_at": "2022-09-22T21:16:19.737776Z"
},
{
"name": "execute",
"started_at": "2022-09-22T21:16:19.813574Z",
"completed_at": "2022-09-22T21:16:19.813583Z"
}
],
"thread_id": "Thread-17",
"execution_time": 0.29882001876831055,
"adapter_response": {},
"message": null,
"failures": null,
"unique_id": "model.tpch.my_second_model"
}
],
"nodes_sources_results": [
{
"unique_id": "source.tpch.tpch_test.lineitem",
"max_loaded_at": "1998-11-29T00:00:00+00:00",
"snapshotted_at": "2022-04-12T14:08:42.647000+00:00",
"max_loaded_at_time_ago_in_s": 737474922.647,
"status": "warn",
"criteria": {
"warn_after": { "count": 6, "period": "hour" },
"error_after": { "count": null, "period": null },
"filter": null
},
"adapter_response": {},
"timing": [
{
"name": "compile",
"started_at": "2022-04-12T14:08:41.488132Z",
"completed_at": "2022-04-12T14:08:41.488138Z"
},
{
"name": "execute",
"started_at": "2022-04-12T14:08:41.489064Z",
"completed_at": "2022-04-12T14:08:43.139148Z"
}
],
"thread_id": "Thread-3",
"execution_time": 1.6662919521331787
},
{
"unique_id": "source.tpch.tpch_test.orders",
"max_loaded_at": "1998-08-02T00:00:00+00:00",
"snapshotted_at": "2022-04-12T14:08:42.656000+00:00",
"max_loaded_at_time_ago_in_s": 747756522.656,
"status": "warn",
"criteria": {
"warn_after": { "count": 6, "period": "hour" },
"error_after": { "count": null, "period": null },
"filter": null
},
"adapter_response": {},
"timing": [
{
"name": "compile",
"started_at": "2022-04-12T14:08:41.475146Z",
"completed_at": "2022-04-12T14:08:41.475159Z"
},
{
"name": "execute",
"started_at": "2022-04-12T14:08:41.475848Z",
"completed_at": "2022-04-12T14:08:43.570703Z"
}
],
"thread_id": "Thread-4",
"execution_time": 2.0986380577087402
}
]
}
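
The requirements block above carries both an expected and an actual value, which makes the consumer-side check a straightforward comparison. A hedged sketch follows; the field names are taken from this contracts.json, while the helper itself is hypothetical.

import json

def requirement_satisfied(req: dict) -> bool:
    """A coverage requirement passes when actual coverage meets or exceeds the expected threshold."""
    return req["actual"] >= req["expected"]

with open("contracts.json") as json_file:
    contract_data = json.load(json_file)

requirements = contract_data["contracts"]["requirements"]
assert requirement_satisfied(requirements["test_coverage"])       # 0.85 >= 0.8
assert requirement_satisfied(requirements["freshness_coverage"])  # 0.85 >= 0.8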

View File

@@ -0,0 +1,90 @@
# dbt_contracts.yml
# upstream-only config for illustration
consumer:
core-only: # give the project a plain name to ref
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
version: 0.2.0 # Versioning is at the project level, NOT the model level
models:
- ref('fct_orders')
- ref('dim_customers')
requirements: # these can be LESS strict compared to the upstream contract, but NEVER more
- tests
- '>5 run history' # I don't trust data until it's been run a few times; this is tracked statefully by counting run_results.json files in the artifacts_location
api_private_key: 'jioq2hfj28338' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private API keys?
artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can be a local path too
# there's definitely a use case for a contracts config to contain BOTH upstream and downstream contracts
# ex: finance-only project that depends on core-only and flows data downstream to marketing-only
# dbt_contracts.yml
# core-only config for illustration
producer:
version: 0.2.0 # version at the top level only; an update is forced if the config differs from the dbt_contracts.json state file
finance-only: # give the project a plain name to ref
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
share_permissions:
full_share: # share everything, code, docs, node, data in lineage
models: # I have 5 models but I only expose data for 4
- ref('fct_orders')
- ref('dim_customers')
- ref('dim_parts')
- ref('dbt_metric')
nodes_only: # share node lineage but not docs, code, or data
models:
+except: # share everything except the below
- ref('stg_sensitive_code')
requirements:
- tests
- freshness
- '>5 run history' # I don't expect downstream users to trust data until it's been run a few times
max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
- date: 11/11/2022 # date to upgrade by, defaults to 12am UTC
version: 0.1.0 # version to upgrade from
api_public_key: 'asfawef3' # replace with env_var for security
artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can also be a local path, and it can differ from upstream because dbt reads those files into memory to compare them
marketing-only: # give the project a plain name to ref
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
models: # I have 5 models but I only expose data for 1
- ref('dim_customers')
requirements:
- tests
- freshness
- '>5 run history' # I don't expect downstream users to trust data until it's been run a few times
max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
- days: 0 # No time to upgrade, I'm breaking the contract
api_public_key: 'asfawef3' # replace with env_var for security
artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can also be a local path, and it can differ from upstream because dbt reads those files into memory to compare them
# TODO: how to enforce least privilege across the any config and stricter contracts above?
any: # any downstream project can ref this contract
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
models: # I have 5 models but I only expose data for 1
- ref('example_analysis')
requirements:
- tests
max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
- days: 10
api_public_key: 'asfawef3' # replace with env_var for security
artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can also be a local path, and it can differ from upstream because dbt reads those files into memory to compare them
multi-project: # unique config for multi-project contracts
contract_list: # define a list of projects to enforce in a single contract; these have their own configs that map to this list
- operations-only
- sales-only
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
models: # I have 5 models but I only expose data for 1
- ref('example_analysis')
requirements:
- tests
max_upgrade_time: # enforce how long someone has to upgrade from a previous version to the latest
- days: 10
api_public_key: 'asfawef3' # replace with env_var for security
artifacts_location: 's3://my-bucket/dbt-contracts' # replace with env_var for security; this can also be a local path, and it can differ from upstream because dbt reads those files into memory to compare them
# if a project config only contains a path, dbt searches for the multi-project config
sales-only:
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
operations-only:
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/

View File

@@ -0,0 +1,16 @@
consumer: # akin to a generic top-level import, like how imports work in Python
# this presumes the upstream producer solely dictates contract terms
# it is the responsibility of the producer to validate the contract is met
# the consumer is responsible for validating that the contract is met to more stringent standards if needed
- name: 'core-only' # give the project a plain name to ref
# path: https://github.com/sungchun12/dbt_bigquery_example.git # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
contract_version: 0.2.0 # Versioning is at the project level, NOT the model level
contract_location: 's3://core-only-bucket/dbt-contracts'
credentials: '{"aws_access_key_id": "YOUR_ACCESS_KEY_ID", "aws_secret_access_key":"YOUR_SECRET_ACCESS_KEY"}' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private API keys?
# artifacts_location: argument NOT required as it inherits from producer
- name: 'finance-only' # give the project a plain name to ref
# path: https://github.com/sungchun12/snowflake_dbt_demo_project.git # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
contract_version: 0.1.0 # Versioning is at the project level, NOT the model level
contract_location: 's3://finance-only-bucket/dbt-contracts'
credentials: '{"aws_access_key_id": "YOUR_ACCESS_KEY_ID", "aws_secret_access_key":"YOUR_SECRET_ACCESS_KEY"}' # replace with env_var for security. TODO: how do we store this securely? Do we read it in memory and match public and private API keys?
# artifacts_location: argument NOT required as it inherits from producer

View File

@@ -0,0 +1,52 @@
version: 2
models:
- name: dim_customers
contracts:
producer: # this is implied, but should be explicit for clarity
finance-only:
version: 0.1.0
nodes_only: false # optional, default is false
requirements:
test_coverage: # how many of the models are required to be tested
enabled: true # optional, default is false
# threshold: .80 # 80% of the models must be tested, optional, default is 1.0
freshness_coverage: # how many of the sources are required to be fresh
enabled: true # optional, default is false
# threshold: .80 # 80% of the sources must be fresh, optional, default is 1.0
run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
success_only: true # only produce the contract on successful runs; otherwise consumers see errors
max_upgrade_time:
days: 10 # how many days can a project be upgraded before it is considered stale
security:
api_public_key: 'asfawef3' # replace with env_var for security
description: Customer dimensions table
docs:
node_color: 'red'
columns:
- name: customer_key
description: Primary key on the customers table
tests:
- unique
- not_null
- name: region
description: region name
tests:
- accepted_values:
values: ['AFRICA','MIDDLE EAST','ASIA','EUROPE','AMERICA']
severity: warn
- name: name
description: customer id
- name: address
description: address of the customer
- name: nation
description: nation name
- name: phone_number
description: phone number of the customer
- name: account_balance
description: '{{ doc("account_balance") }}'
- name: market_segment
description: market segment of the customer
tests:
- unique
- not_null

View File

@@ -0,0 +1,43 @@
/*
Welcome to your first dbt model!
Did you know that you can also configure models directly within SQL files?
This will override configurations stated in dbt_project.yml
Try changing "table" to "view" below
*/
{{ config(materialized='view',
contracts={
"producer": {
"finance-only": {
"version": "0.1.0",
"requirements": {
"test_coverage": { "enabled": true },
"freshness_coverage": { "enabled": true },
"run_history": 5,
"success_only": true,
"max_upgrade_time": { "days": 10 }
},
"security": { "api_public_key": "asfawef3" }
}
}
}
) }}
with source_data as (
select 1 as id
union all
select 2 as id
)
select *
from source_data
/*
Uncomment the line below to remove records with null `id` values
*/
-- where id is not null

View File

@@ -0,0 +1,142 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'tpch'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'tpch'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
# TODO: add an API key config here?
# one project = one remote contract artifact location
contract-paths:
local: "contracts"
remote: "s3://my-bucket/dbt-contracts"
target-path: "custom_target_path" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_modules"
on-run-start:
- "alter warehouse transforming set warehouse_size=small;"
- '{{create_udfs()}}' # comment / uncomment this line to build UDFs called in the create_udfs macro
on-run-end:
- "alter warehouse transforming set warehouse_size=xsmall;"
- "{{ grant_all_on_schemas(schemas, 'transformer') }}"
vars:
load_type: 'I'
start_date: '1999-01-01'
test: 'false' # to trigger runs for unit testing - override in a CLI param in testing job
fct_order_items: 'mock_source__fct_order_items' # this is a map for unit testing
dbt_artifacts:
dbt_artifacts_schema: dbt_artifacts_sung # optional, default is 'dbt_artifacts'
dbt_artifacts_table: artifacts # optional, default is 'artifacts'
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
contracts:
producer: # this top-level producer contract is for ANY consumer to access
version: 0.1.0
+grants:
select: ['user_a', 'user_b']
models: # I have 5 models but I only expose data for 1
- ref('example_analysis')
- entity('orders')
metrics:
- metric('revenue')
nodes_only:
+except: # share everything except the below
- ref('stg_sensitive_code')
requirements:
test_coverage: # how many of the models are required to be tested
enabled: true # optional, default is false
threshold: .80 # 80% of the models must be tested, optional, default is 1.0
freshness_coverage: # how many of the sources are required to be fresh
enabled: true # optional, default is false
threshold: .80 # 80% of the sources must be fresh, optional, default is 1.0
run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
success_only: true # only produce the contract on successful runs; otherwise consumers see errors
max_upgrade_time:
days: 10 # how many days can a project be upgraded before it is considered stale
security:
api_public_key: 'asfawef3' # replace with env_var for security
dbt_artifacts:
+docs:
show: false
+schema: dbt_artifacts_sung
staging:
+schema: dbt_artifacts_sung
tpch:
staging:
+materialized: view
+docs:
# show: false
node_color: "#cd7f32"
marts:
core:
contracts:
producer: # this is implied, but should be explicit for clarity
finance-only:
version: 0.1.0
models: # I have 5 models but I only expose data for 1
- ref('example_analysis')
nodes_only:
+except: # share everything except the below
- ref('stg_sensitive_code')
requirements:
test_coverage: .80 # how many of the models are required to be tested
freshness_coverage: .80 # how many of the sources are required to be fresh
run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
success_only: true # only produce the contract on successful runs; otherwise consumers see errors
max_upgrade_time:
days: 10 # how many days can a project be upgraded before it is considered stale
security:
api_public_key: 'asfawef3' # replace with env_var for security
multi-project:
sales-only:
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
operations-only:
path: git repo OR local subfolder # example: "https://{{env_var('DBT_ENV_SECRET_GIT_CREDENTIAL')}}@github.com/dbt-labs/awesome_repo.git" OR ./models/core/
version: 0.3.0
models: # I have 5 models but I only expose data for 1
- ref('example_analysis')
nodes_only:
+except: # share everything except the below
- ref('stg_sensitive_code')
requirements:
test_coverage: .80 # how many of the models are required to be tested
freshness_coverage: .80 # how many of the sources are required to be fresh
run_history: 5 # how many successful runs are required in the run history; count successful run_results.json files in a location and compare to this number
success_only: true # only produce the contract on successful runs; otherwise consumers see errors
max_upgrade_time:
days: 10 # how many days can a project be upgraded before it is considered stale
security:
api_public_key: 'asfawef3' # replace with env_var for security
materialized: table
+docs:
node_color: "blue"
seeds:
tpch:
snowflake_contract_rates:
+column_types:
effective_date: DATE
rate: NUMBER

View File

@@ -0,0 +1,55 @@
{
"name": "tpch",
"version": "1.0.0",
"config-version": 2,
"profile": "tpch",
"model-paths": ["models"],
"analysis-paths": ["analysis"],
"test-paths": ["tests"],
"seed-paths": ["data"],
"macro-paths": ["macros"],
"snapshot-paths": ["snapshots"],
"target-path": "custom_target_path",
"clean-targets": ["target", "dbt_modules"],
"on-run-start": [
"alter warehouse transforming set warehouse_size=small;",
"{{create_udfs()}}"
],
"on-run-end": [
"alter warehouse transforming set warehouse_size=xsmall;",
"{{ grant_all_on_schemas(schemas, 'transformer') }}"
],
"vars": {
"load_type": "I",
"start_date": "1999-01-01",
"test": "false",
"fct_order_items": "mock_source__fct_order_items",
"dbt_artifacts": {
"dbt_artifacts_schema": "dbt_artifacts_sung",
"dbt_artifacts_table": "artifacts"
}
},
"models": {
"dbt_artifacts": {
"+docs": { "show": false },
"+schema": "dbt_artifacts_sung",
"staging": { "+schema": "dbt_artifacts_sung" }
},
"tpch": {
"staging": {
"+materialized": "view",
"+docs": { "node_color": "#cd7f32" }
},
"marts": {
"core": { "materialized": "table", "+docs": { "node_color": "blue" } }
}
}
},
"seeds": {
"tpch": {
"snowflake_contract_rates": {
"+column_types": { "effective_date": "DATE", "rate": "NUMBER" }
}
}
}
}

File diff suppressed because it is too large.