mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-19 19:31:29 +00:00
Compare commits
7 Commits
enable-pos
...
externalCa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ce8d1490e | ||
|
|
d76fc5751a | ||
|
|
418ce4d62f | ||
|
|
288be96754 | ||
|
|
706ff326e9 | ||
|
|
b5bf57b910 | ||
|
|
666cc3bb72 |
@@ -3,6 +3,8 @@ from dataclasses import dataclass, field
|
||||
from datetime import timedelta
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from dbt_config.catalog_config import ExternalCatalog
|
||||
|
||||
from dbt.artifacts.resources.base import Docs, FileHash, GraphResource
|
||||
from dbt.artifacts.resources.types import NodeType, TimePeriod
|
||||
from dbt.artifacts.resources.v1.config import NodeConfig
|
||||
@@ -164,6 +166,7 @@ class DeferRelation(HasRelationMetadata):
|
||||
meta: Dict[str, Any]
|
||||
tags: List[str]
|
||||
config: Optional[NodeConfig]
|
||||
external_catalog: Optional[ExternalCatalog]
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
|
||||
24
core/dbt/config/external_config.py
Normal file
24
core/dbt/config/external_config.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from typing import Dict, Optional
|
||||
|
||||
from dbt.clients.yaml_helper import load_yaml_text
|
||||
from dbt.constants import EXTERNAL_CATALOG_FILE_NAME
|
||||
from dbt_common.clients.system import load_file_contents, path_exists
|
||||
|
||||
|
||||
def _load_yaml(path):
|
||||
contents = load_file_contents(path)
|
||||
return load_yaml_text(contents)
|
||||
|
||||
|
||||
def _load_yml_dict(file_path):
|
||||
if path_exists(file_path):
|
||||
ret = _load_yaml(file_path) or {}
|
||||
return ret
|
||||
return None
|
||||
|
||||
|
||||
def load_external_catalog_config(project) -> Optional[Dict]:
|
||||
unparsed_config = _load_yml_dict(f"{project.project_root}/{EXTERNAL_CATALOG_FILE_NAME}")
|
||||
if unparsed_config is not None:
|
||||
return unparsed_config
|
||||
return None
|
||||
@@ -229,3 +229,9 @@ class PackageRenderer(SecretRenderer):
|
||||
@property
|
||||
def name(self):
|
||||
return "Packages config"
|
||||
|
||||
|
||||
class CatalogRenderer(SecretRenderer):
|
||||
@property
|
||||
def name(self):
|
||||
return "Catalog config"
|
||||
|
||||
@@ -15,6 +15,8 @@ from typing import (
|
||||
Type,
|
||||
)
|
||||
|
||||
from dbt_config.catalog_config import ExternalCatalogConfig
|
||||
|
||||
from dbt import tracking
|
||||
from dbt.adapters.contracts.connection import (
|
||||
AdapterRequiredConfig,
|
||||
@@ -39,6 +41,7 @@ from dbt_common.dataclass_schema import ValidationError
|
||||
from dbt_common.events.functions import warn_or_error
|
||||
from dbt_common.helper_types import DictDefaultEmptyStr, FQNPath, PathSet
|
||||
|
||||
from .external_config import load_external_catalog_config
|
||||
from .profile import Profile
|
||||
from .project import Project
|
||||
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
|
||||
@@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
profile_name: str
|
||||
cli_vars: Dict[str, Any]
|
||||
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
|
||||
catalogs: Optional[ExternalCatalogConfig] = None
|
||||
|
||||
def __post_init__(self):
|
||||
self.validate()
|
||||
@@ -125,12 +129,15 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
profile: Profile,
|
||||
args: Any,
|
||||
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None,
|
||||
catalogs: Optional[ExternalCatalogConfig] = None,
|
||||
) -> "RuntimeConfig":
|
||||
"""Instantiate a RuntimeConfig from its components.
|
||||
|
||||
:param profile: A parsed dbt Profile.
|
||||
:param project: A parsed dbt Project.
|
||||
:param args: The parsed command-line arguments.
|
||||
:param dependencies: A mapping of project names to RuntimeConfigs.
|
||||
:param catalogs: A parsed dbt ExternalCatalogConfig.
|
||||
:returns RuntimeConfig: The new configuration.
|
||||
"""
|
||||
quoting: Dict[str, Any] = (
|
||||
@@ -194,6 +201,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
dependencies=dependencies,
|
||||
dbt_cloud=project.dbt_cloud,
|
||||
flags=project.flags,
|
||||
catalogs=catalogs,
|
||||
)
|
||||
|
||||
# Called by 'load_projects' in this class
|
||||
@@ -253,7 +261,9 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
|
||||
# Called by RuntimeConfig.from_args
|
||||
@classmethod
|
||||
def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]:
|
||||
def collect_parts(
|
||||
cls: Type["RuntimeConfig"], args: Any
|
||||
) -> Tuple[Project, Profile, Optional[ExternalCatalogConfig]]:
|
||||
# profile_name from the project
|
||||
project_root = args.project_dir if args.project_dir else os.getcwd()
|
||||
cli_vars: Dict[str, Any] = getattr(args, "vars", {})
|
||||
@@ -264,7 +274,9 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
)
|
||||
flags = get_flags()
|
||||
project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars)
|
||||
return project, profile
|
||||
catalog_yml = load_external_catalog_config(project)
|
||||
catalogs = ExternalCatalogConfig.model_validate(catalog_yml) if catalog_yml else None
|
||||
return project, profile, catalogs
|
||||
|
||||
# Called in task/base.py, in BaseTask.from_args
|
||||
@classmethod
|
||||
@@ -278,12 +290,13 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
:raises DbtProfileError: If the profile is invalid or missing.
|
||||
:raises DbtValidationError: If the cli variables are invalid.
|
||||
"""
|
||||
project, profile = cls.collect_parts(args)
|
||||
project, profile, catalogs = cls.collect_parts(args)
|
||||
|
||||
return cls.from_parts(
|
||||
project=project,
|
||||
profile=profile,
|
||||
args=args,
|
||||
catalogs=catalogs,
|
||||
)
|
||||
|
||||
def get_metadata(self) -> ManifestMetadata:
|
||||
|
||||
@@ -15,6 +15,7 @@ DBT_PROJECT_FILE_NAME = "dbt_project.yml"
|
||||
PACKAGES_FILE_NAME = "packages.yml"
|
||||
DEPENDENCIES_FILE_NAME = "dependencies.yml"
|
||||
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
|
||||
EXTERNAL_CATALOG_FILE_NAME = "catalog.yml"
|
||||
MANIFEST_FILE_NAME = "manifest.json"
|
||||
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
|
||||
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
|
||||
|
||||
@@ -19,6 +19,7 @@ from typing import (
|
||||
from typing_extensions import Protocol
|
||||
|
||||
from dbt import selected_resources
|
||||
from dbt.adapters.base.catalog import ExternalCatalogIntegrations
|
||||
from dbt.adapters.base.column import Column
|
||||
from dbt.adapters.base.relation import EventTimeFilter
|
||||
from dbt.adapters.contracts.connection import AdapterResponse
|
||||
@@ -890,6 +891,9 @@ class ProviderContext(ManifestContext):
|
||||
self.context_config: Optional[ContextConfig] = context_config
|
||||
self.provider: Provider = provider
|
||||
self.adapter = get_adapter(self.config)
|
||||
self.catalog_integrations = ExternalCatalogIntegrations.from_json_strings(
|
||||
self.manifest.catalogs.values(), self.adapter.ExternalCatalogIntegration
|
||||
)
|
||||
# The macro namespace is used in creating the DatabaseWrapper
|
||||
self.db_wrapper = self.provider.DatabaseWrapper(self.adapter, self.namespace)
|
||||
|
||||
@@ -1287,6 +1291,7 @@ class ProviderContext(ManifestContext):
|
||||
return {
|
||||
"Relation": self.db_wrapper.Relation,
|
||||
"Column": self.adapter.Column,
|
||||
"catalogs": self.catalog_integrations,
|
||||
}
|
||||
|
||||
@contextproperty()
|
||||
|
||||
@@ -20,6 +20,7 @@ from typing import (
|
||||
Union,
|
||||
)
|
||||
|
||||
from dbt_config.catalog_config import ExternalCatalog
|
||||
from typing_extensions import Protocol
|
||||
|
||||
import dbt_common.exceptions
|
||||
@@ -882,6 +883,7 @@ class Manifest(MacroMethods, dbtClassMixin):
|
||||
unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict)
|
||||
saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict)
|
||||
fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict)
|
||||
catalogs: MutableMapping[str, str] = field(default_factory=dict)
|
||||
|
||||
_doc_lookup: Optional[DocLookup] = field(
|
||||
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
|
||||
@@ -1379,6 +1381,26 @@ class Manifest(MacroMethods, dbtClassMixin):
|
||||
current_project: str,
|
||||
node_package: str,
|
||||
) -> MaybeParsedSource:
|
||||
if target_source_name in self.catalogs:
|
||||
catalog = ExternalCatalog.model_validate_json(self.catalogs[target_source_name])
|
||||
identifier = f"{target_source_name}.{target_table_name}"
|
||||
catalog_database = catalog.configuration.internal_namespace.database
|
||||
catalog_schema = catalog.configuration.internal_namespace.schema_
|
||||
return SourceDefinition(
|
||||
database=catalog_database,
|
||||
schema=catalog_schema,
|
||||
fqn=[catalog_database, catalog_schema, catalog.name, target_table_name],
|
||||
name=target_table_name,
|
||||
source_description=f"External Catalog source for {target_source_name}.{target_table_name}",
|
||||
source_name=target_source_name,
|
||||
unique_id=identifier,
|
||||
identifier=identifier,
|
||||
package_name="dbt",
|
||||
path="/root/catalogs.yml",
|
||||
loader=catalog.type.value,
|
||||
resource_type=NodeType.Source,
|
||||
original_file_path="/root/catalogs.yml",
|
||||
)
|
||||
search_name = f"{target_source_name}.{target_table_name}"
|
||||
candidates = _packages_to_search(current_project, node_package)
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from itertools import chain
|
||||
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union
|
||||
|
||||
import msgpack
|
||||
from dbt_config.catalog_config import ExternalCatalogConfig
|
||||
|
||||
import dbt.deprecations
|
||||
import dbt.exceptions
|
||||
@@ -29,6 +30,7 @@ from dbt.artifacts.schemas.base import Writable
|
||||
from dbt.clients.jinja import MacroStack, get_rendered
|
||||
from dbt.clients.jinja_static import statically_extract_macro_calls
|
||||
from dbt.config import Project, RuntimeConfig
|
||||
from dbt.config.external_config import load_external_catalog_config
|
||||
from dbt.constants import (
|
||||
MANIFEST_FILE_NAME,
|
||||
PARTIAL_PARSE_FILE_NAME,
|
||||
@@ -444,6 +446,13 @@ class ManifestLoader:
|
||||
self.manifest.sources = patcher.sources
|
||||
self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch
|
||||
|
||||
# Get catalog.yml and update the manifest
|
||||
raw_catalog = load_external_catalog_config(self.root_project)
|
||||
if raw_catalog:
|
||||
catalog_config = ExternalCatalogConfig.model_validate(raw_catalog)
|
||||
self.manifest.catalogs = {
|
||||
c.name: c.model_dump_json(by_alias=True) for c in catalog_config.catalogs
|
||||
}
|
||||
# We need to rebuild disabled in order to include disabled sources
|
||||
self.manifest.rebuild_disabled_lookup()
|
||||
|
||||
@@ -466,6 +475,7 @@ class ManifestLoader:
|
||||
self.process_docs(self.root_project)
|
||||
self.process_metrics(self.root_project)
|
||||
self.process_saved_queries(self.root_project)
|
||||
self.process_catalog(self.root_project)
|
||||
self.process_model_inferred_primary_keys()
|
||||
self.check_valid_group_config()
|
||||
self.check_valid_access_property()
|
||||
@@ -1140,6 +1150,11 @@ class ManifestLoader:
|
||||
continue
|
||||
_process_metrics_for_node(self.manifest, current_project, exposure)
|
||||
|
||||
def process_catalog(self, config: RuntimeConfig):
|
||||
if config.catalogs:
|
||||
for catalog in config.catalogs.catalogs:
|
||||
self.manifest.catalogs[catalog.name] = catalog.model_dump_json(by_alias=True)
|
||||
|
||||
def process_saved_queries(self, config: RuntimeConfig):
|
||||
"""Processes SavedQuery nodes to populate their `depends_on`."""
|
||||
# Note: This will also capture various nodes which have been re-parsed
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
git+https://github.com/dbt-labs/dbt-adapters.git@main
|
||||
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
|
||||
git+https://github.com/dbt-labs/dbt-common.git@main
|
||||
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig
|
||||
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#egg=dbt-config&subdirectory=config
|
||||
git+https://github.com/dbt-labs/dbt-postgres.git@main
|
||||
# black must match what's in .pre-commit-config.yaml to be sure local env matches CI
|
||||
black==24.3.0
|
||||
|
||||
@@ -32,7 +32,7 @@ models:
|
||||
|
||||
models__untagged_sql = """
|
||||
{{
|
||||
config(materialized='table')
|
||||
config(materialized=table)
|
||||
}}
|
||||
|
||||
select id, value from {{ source('raw', 'seed') }}
|
||||
|
||||
48
tests/functional/test_external_catalog.py
Normal file
48
tests/functional/test_external_catalog.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import pytest
|
||||
import yaml
|
||||
from dbt_config.catalog_config import ExternalCatalog
|
||||
|
||||
from dbt.tests.util import run_dbt, write_file
|
||||
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def dbt_catalog_config(project_root):
|
||||
config = {
|
||||
"catalogs": [
|
||||
{
|
||||
"name": "my_external_catalog",
|
||||
"type": "iceberg",
|
||||
"configuration": {
|
||||
"table_format": "iceberg",
|
||||
"catalog_namespace": "dbt",
|
||||
"internal_namespace": {
|
||||
"database": "my_db",
|
||||
"schema": "my_schema",
|
||||
},
|
||||
"external_location": "s3://my-bucket/my-path",
|
||||
},
|
||||
"management": {
|
||||
"enabled": True,
|
||||
"create_if_not_exists": False,
|
||||
"alter_if_different": False,
|
||||
"read_only": True,
|
||||
"refresh": "on-start",
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
write_file(yaml.safe_dump(config), project_root, "catalog.yml")
|
||||
|
||||
|
||||
class TestCatalogConfig:
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {
|
||||
"model.sql": "select 1 as id from {{ source('my_external_catalog', 'my_table') }}",
|
||||
}
|
||||
|
||||
def test_supplying_external_catalog(self, project):
|
||||
manifest = run_dbt(["parse"])
|
||||
assert manifest.catalogs != {}
|
||||
assert manifest.nodes["model.test.model"].sources == [["my_external_catalog", "my_table"]]
|
||||
ExternalCatalog.model_validate_json(manifest.catalogs["my_external_catalog"])
|
||||
@@ -6,12 +6,15 @@ issues.
|
||||
|
||||
import os
|
||||
import string
|
||||
from typing import Dict
|
||||
from unittest import TestCase, mock
|
||||
|
||||
import agate
|
||||
import pytest
|
||||
from dbt_config.catalog_config import ExternalCatalogConfig
|
||||
|
||||
from dbt.config.project import PartialProject
|
||||
from dbt.config.renderer import CatalogRenderer
|
||||
from dbt.contracts.graph.manifest import Manifest
|
||||
from dbt_common.dataclass_schema import ValidationError
|
||||
|
||||
@@ -57,6 +60,14 @@ def profile_from_dict(profile, profile_name, cli_vars="{}"):
|
||||
)
|
||||
|
||||
|
||||
def catalog_from_dict(catalog, cli_vars=None):
|
||||
if cli_vars is None:
|
||||
cli_vars = {}
|
||||
renderer = CatalogRenderer(cli_vars)
|
||||
rendered = renderer.render_value(catalog)
|
||||
return ExternalCatalogConfig.model_validate(rendered)
|
||||
|
||||
|
||||
def project_from_dict(project, profile, packages=None, selectors=None, cli_vars="{}"):
|
||||
from dbt.config.renderer import DbtProjectYamlRenderer
|
||||
from dbt.config.utils import parse_cli_vars
|
||||
@@ -77,7 +88,9 @@ def project_from_dict(project, profile, packages=None, selectors=None, cli_vars=
|
||||
return partial.render(renderer)
|
||||
|
||||
|
||||
def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars={}):
|
||||
def config_from_parts_or_dicts(
|
||||
project, profile, packages=None, selectors=None, cli_vars={}, catalogs=None
|
||||
):
|
||||
from copy import deepcopy
|
||||
|
||||
from dbt.config import Profile, Project, RuntimeConfig
|
||||
@@ -103,10 +116,13 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None,
|
||||
cli_vars,
|
||||
)
|
||||
|
||||
if isinstance(catalogs, Dict):
|
||||
catalogs = catalog_from_dict(catalogs, cli_vars)
|
||||
|
||||
args = Obj()
|
||||
args.vars = cli_vars
|
||||
args.profile_dir = "/dev/null"
|
||||
return RuntimeConfig.from_parts(project=project, profile=profile, args=args)
|
||||
return RuntimeConfig.from_parts(project=project, profile=profile, args=args, catalogs=catalogs)
|
||||
|
||||
|
||||
def inject_plugin(plugin):
|
||||
|
||||
Reference in New Issue
Block a user