Compare commits

...

7 Commits

Author SHA1 Message Date
Colin
3ce8d1490e Merge branch 'main' into externalCatalogConfig 2024-10-10 14:44:35 -07:00
Colin
d76fc5751a add catalog to provider and components 2024-10-10 12:51:30 -07:00
Colin
418ce4d62f add functional test and add catalogs to source resolution 2024-10-09 12:55:45 -07:00
Colin
288be96754 explore where/how to ingest the catalog config 2024-10-03 15:52:06 -07:00
Colin
706ff326e9 add catalog config to manifest.py 2024-10-02 08:58:51 -07:00
Colin
b5bf57b910 add catalog config to manifest.py 2024-10-02 08:57:51 -07:00
Colin
666cc3bb72 add py.typed 2024-10-01 13:58:54 -07:00
12 changed files with 161 additions and 7 deletions

View File

@@ -3,6 +3,8 @@ from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union
from dbt_config.catalog_config import ExternalCatalog
from dbt.artifacts.resources.base import Docs, FileHash, GraphResource
from dbt.artifacts.resources.types import NodeType, TimePeriod
from dbt.artifacts.resources.v1.config import NodeConfig
@@ -164,6 +166,7 @@ class DeferRelation(HasRelationMetadata):
meta: Dict[str, Any]
tags: List[str]
config: Optional[NodeConfig]
external_catalog: Optional[ExternalCatalog]
@property
def identifier(self):

View File

@@ -0,0 +1,24 @@
from typing import Dict, Optional
from dbt.clients.yaml_helper import load_yaml_text
from dbt.constants import EXTERNAL_CATALOG_FILE_NAME
from dbt_common.clients.system import load_file_contents, path_exists
def _load_yaml(path):
    """Read the file at ``path`` and return its content parsed as YAML."""
    return load_yaml_text(load_file_contents(path))
def _load_yml_dict(file_path):
    """Return the parsed YAML found at ``file_path``, or ``None`` if the path is absent.

    When the path exists but the parsed result is falsy (such as ``None``),
    an empty dict is returned instead, so "missing" and "empty" stay distinct.
    """
    if not path_exists(file_path):
        return None
    parsed = _load_yaml(file_path)
    return parsed if parsed else {}
def load_external_catalog_config(project) -> Optional[Dict]:
    """Load the raw external catalog configuration for ``project``.

    The file named by ``EXTERNAL_CATALOG_FILE_NAME`` is looked up directly
    under ``project.project_root``.

    :param project: Object exposing a ``project_root`` attribute.
    :returns: The parsed YAML content, or ``None`` when the file does not exist.
    """
    catalog_path = f"{project.project_root}/{EXTERNAL_CATALOG_FILE_NAME}"
    # _load_yml_dict already yields None for a missing file; pass it through.
    return _load_yml_dict(catalog_path)

View File

@@ -229,3 +229,9 @@ class PackageRenderer(SecretRenderer):
@property
def name(self):
return "Packages config"
class CatalogRenderer(SecretRenderer):
    """Secret-aware renderer that identifies itself as "Catalog config"."""

    @property
    def name(self) -> str:
        # Label reported for this renderer.
        return "Catalog config"

View File

@@ -15,6 +15,8 @@ from typing import (
Type,
)
from dbt_config.catalog_config import ExternalCatalogConfig
from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
@@ -39,6 +41,7 @@ from dbt_common.dataclass_schema import ValidationError
from dbt_common.events.functions import warn_or_error
from dbt_common.helper_types import DictDefaultEmptyStr, FQNPath, PathSet
from .external_config import load_external_catalog_config
from .profile import Profile
from .project import Project
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
@@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
catalogs: Optional[ExternalCatalogConfig] = None
def __post_init__(self):
self.validate()
@@ -125,12 +129,15 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile: Profile,
args: Any,
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None,
catalogs: Optional[ExternalCatalogConfig] = None,
) -> "RuntimeConfig":
"""Instantiate a RuntimeConfig from its components.
:param profile: A parsed dbt Profile.
:param project: A parsed dbt Project.
:param args: The parsed command-line arguments.
:param dependencies: A mapping of project names to RuntimeConfigs.
:param catalogs: A parsed dbt ExternalCatalogConfig.
:returns RuntimeConfig: The new configuration.
"""
quoting: Dict[str, Any] = (
@@ -194,6 +201,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
catalogs=catalogs,
)
# Called by 'load_projects' in this class
@@ -253,7 +261,9 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
# Called by RuntimeConfig.from_args
@classmethod
def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]:
def collect_parts(
cls: Type["RuntimeConfig"], args: Any
) -> Tuple[Project, Profile, Optional[ExternalCatalogConfig]]:
# profile_name from the project
project_root = args.project_dir if args.project_dir else os.getcwd()
cli_vars: Dict[str, Any] = getattr(args, "vars", {})
@@ -264,7 +274,9 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
)
flags = get_flags()
project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars)
return project, profile
catalog_yml = load_external_catalog_config(project)
catalogs = ExternalCatalogConfig.model_validate(catalog_yml) if catalog_yml else None
return project, profile, catalogs
# Called in task/base.py, in BaseTask.from_args
@classmethod
@@ -278,12 +290,13 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
:raises DbtProfileError: If the profile is invalid or missing.
:raises DbtValidationError: If the cli variables are invalid.
"""
project, profile = cls.collect_parts(args)
project, profile, catalogs = cls.collect_parts(args)
return cls.from_parts(
project=project,
profile=profile,
args=args,
catalogs=catalogs,
)
def get_metadata(self) -> ManifestMetadata:

View File

@@ -15,6 +15,7 @@ DBT_PROJECT_FILE_NAME = "dbt_project.yml"
PACKAGES_FILE_NAME = "packages.yml"
DEPENDENCIES_FILE_NAME = "dependencies.yml"
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
# File read from the project root by load_external_catalog_config.
EXTERNAL_CATALOG_FILE_NAME = "catalog.yml"
MANIFEST_FILE_NAME = "manifest.json"
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"

View File

@@ -19,6 +19,7 @@ from typing import (
from typing_extensions import Protocol
from dbt import selected_resources
from dbt.adapters.base.catalog import ExternalCatalogIntegrations
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
@@ -890,6 +891,9 @@ class ProviderContext(ManifestContext):
self.context_config: Optional[ContextConfig] = context_config
self.provider: Provider = provider
self.adapter = get_adapter(self.config)
self.catalog_integrations = ExternalCatalogIntegrations.from_json_strings(
self.manifest.catalogs.values(), self.adapter.ExternalCatalogIntegration
)
# The macro namespace is used in creating the DatabaseWrapper
self.db_wrapper = self.provider.DatabaseWrapper(self.adapter, self.namespace)
@@ -1287,6 +1291,7 @@ class ProviderContext(ManifestContext):
return {
"Relation": self.db_wrapper.Relation,
"Column": self.adapter.Column,
"catalogs": self.catalog_integrations,
}
@contextproperty()

View File

@@ -20,6 +20,7 @@ from typing import (
Union,
)
from dbt_config.catalog_config import ExternalCatalog
from typing_extensions import Protocol
import dbt_common.exceptions
@@ -882,6 +883,7 @@ class Manifest(MacroMethods, dbtClassMixin):
unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict)
saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict)
fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict)
catalogs: MutableMapping[str, str] = field(default_factory=dict)
_doc_lookup: Optional[DocLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
@@ -1379,6 +1381,26 @@ class Manifest(MacroMethods, dbtClassMixin):
current_project: str,
node_package: str,
) -> MaybeParsedSource:
if target_source_name in self.catalogs:
catalog = ExternalCatalog.model_validate_json(self.catalogs[target_source_name])
identifier = f"{target_source_name}.{target_table_name}"
catalog_database = catalog.configuration.internal_namespace.database
catalog_schema = catalog.configuration.internal_namespace.schema_
return SourceDefinition(
database=catalog_database,
schema=catalog_schema,
fqn=[catalog_database, catalog_schema, catalog.name, target_table_name],
name=target_table_name,
source_description=f"External Catalog source for {target_source_name}.{target_table_name}",
source_name=target_source_name,
unique_id=identifier,
identifier=identifier,
package_name="dbt",
path="/root/catalogs.yml",
loader=catalog.type.value,
resource_type=NodeType.Source,
original_file_path="/root/catalogs.yml",
)
search_name = f"{target_source_name}.{target_table_name}"
candidates = _packages_to_search(current_project, node_package)

View File

@@ -10,6 +10,7 @@ from itertools import chain
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union
import msgpack
from dbt_config.catalog_config import ExternalCatalogConfig
import dbt.deprecations
import dbt.exceptions
@@ -29,6 +30,7 @@ from dbt.artifacts.schemas.base import Writable
from dbt.clients.jinja import MacroStack, get_rendered
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.config import Project, RuntimeConfig
from dbt.config.external_config import load_external_catalog_config
from dbt.constants import (
MANIFEST_FILE_NAME,
PARTIAL_PARSE_FILE_NAME,
@@ -444,6 +446,13 @@ class ManifestLoader:
self.manifest.sources = patcher.sources
self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch
# Get catalog.yml and update the manifest
raw_catalog = load_external_catalog_config(self.root_project)
if raw_catalog:
catalog_config = ExternalCatalogConfig.model_validate(raw_catalog)
self.manifest.catalogs = {
c.name: c.model_dump_json(by_alias=True) for c in catalog_config.catalogs
}
# We need to rebuild disabled in order to include disabled sources
self.manifest.rebuild_disabled_lookup()
@@ -466,6 +475,7 @@ class ManifestLoader:
self.process_docs(self.root_project)
self.process_metrics(self.root_project)
self.process_saved_queries(self.root_project)
self.process_catalog(self.root_project)
self.process_model_inferred_primary_keys()
self.check_valid_group_config()
self.check_valid_access_property()
@@ -1140,6 +1150,11 @@ class ManifestLoader:
continue
_process_metrics_for_node(self.manifest, current_project, exposure)
def process_catalog(self, config: RuntimeConfig):
    """Copy every catalog from ``config.catalogs`` into ``self.manifest.catalogs``.

    Each catalog is stored under its ``name`` as the JSON string produced by
    ``model_dump_json(by_alias=True)``. Nothing happens when
    ``config.catalogs`` is falsy.
    """
    external_config = config.catalogs
    if not external_config:
        return
    manifest_catalogs = self.manifest.catalogs
    for entry in external_config.catalogs:
        manifest_catalogs[entry.name] = entry.model_dump_json(by_alias=True)
def process_saved_queries(self, config: RuntimeConfig):
"""Processes SavedQuery nodes to populate their `depends_on`."""
# Note: This will also capture various nodes which have been re-parsed

View File

@@ -1,6 +1,7 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#egg=dbt-config&subdirectory=config
git+https://github.com/dbt-labs/dbt-postgres.git@main
# black must match what's in .pre-commit-config.yaml to be sure local env matches CI
black==24.3.0

View File

@@ -32,7 +32,7 @@ models:
models__untagged_sql = """
{{
config(materialized='table')
config(materialized=table)
}}
select id, value from {{ source('raw', 'seed') }}

View File

@@ -0,0 +1,48 @@
import pytest
import yaml
from dbt_config.catalog_config import ExternalCatalog
from dbt.tests.util import run_dbt, write_file
@pytest.fixture(scope="class", autouse=True)
def dbt_catalog_config(project_root):
    """Write a ``catalog.yml`` describing one iceberg catalog into ``project_root``."""
    internal_namespace = {
        "database": "my_db",
        "schema": "my_schema",
    }
    configuration = {
        "table_format": "iceberg",
        "catalog_namespace": "dbt",
        "internal_namespace": internal_namespace,
        "external_location": "s3://my-bucket/my-path",
    }
    management = {
        "enabled": True,
        "create_if_not_exists": False,
        "alter_if_different": False,
        "read_only": True,
        "refresh": "on-start",
    }
    catalog = {
        "name": "my_external_catalog",
        "type": "iceberg",
        "configuration": configuration,
        "management": management,
    }
    catalog_yaml = yaml.safe_dump({"catalogs": [catalog]})
    write_file(catalog_yaml, project_root, "catalog.yml")
class TestCatalogConfig:
    """Functional test: parsing a model whose source() names an external catalog."""

    @pytest.fixture(scope="class")
    def models(self):
        model_sql = "select 1 as id from {{ source('my_external_catalog', 'my_table') }}"
        return {"model.sql": model_sql}

    def test_supplying_external_catalog(self, project):
        manifest = run_dbt(["parse"])
        # The catalog from catalog.yml must have been recorded on the manifest.
        assert manifest.catalogs != {}
        expected_sources = [["my_external_catalog", "my_table"]]
        assert manifest.nodes["model.test.model"].sources == expected_sources
        # Raises if the stored JSON does not validate as an ExternalCatalog.
        stored_json = manifest.catalogs["my_external_catalog"]
        ExternalCatalog.model_validate_json(stored_json)

View File

@@ -6,12 +6,15 @@ issues.
import os
import string
from typing import Dict
from unittest import TestCase, mock
import agate
import pytest
from dbt_config.catalog_config import ExternalCatalogConfig
from dbt.config.project import PartialProject
from dbt.config.renderer import CatalogRenderer
from dbt.contracts.graph.manifest import Manifest
from dbt_common.dataclass_schema import ValidationError
@@ -57,6 +60,14 @@ def profile_from_dict(profile, profile_name, cli_vars="{}"):
)
def catalog_from_dict(catalog, cli_vars=None):
    """Render ``catalog`` with a ``CatalogRenderer`` and validate the result.

    :param catalog: Raw catalog configuration to render.
    :param cli_vars: Variables handed to the renderer; ``None`` means an empty dict.
    :returns: The result of ``ExternalCatalogConfig.model_validate`` on the rendered value.
    """
    renderer = CatalogRenderer({} if cli_vars is None else cli_vars)
    return ExternalCatalogConfig.model_validate(renderer.render_value(catalog))
def project_from_dict(project, profile, packages=None, selectors=None, cli_vars="{}"):
from dbt.config.renderer import DbtProjectYamlRenderer
from dbt.config.utils import parse_cli_vars
@@ -77,7 +88,9 @@ def project_from_dict(project, profile, packages=None, selectors=None, cli_vars=
return partial.render(renderer)
def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars={}):
def config_from_parts_or_dicts(
project, profile, packages=None, selectors=None, cli_vars={}, catalogs=None
):
from copy import deepcopy
from dbt.config import Profile, Project, RuntimeConfig
@@ -103,10 +116,13 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None,
cli_vars,
)
if isinstance(catalogs, Dict):
catalogs = catalog_from_dict(catalogs, cli_vars)
args = Obj()
args.vars = cli_vars
args.profile_dir = "/dev/null"
return RuntimeConfig.from_parts(project=project, profile=profile, args=args)
return RuntimeConfig.from_parts(project=project, profile=profile, args=args, catalogs=catalogs)
def inject_plugin(plugin):