mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-21 19:21:28 +00:00
Compare commits
27 Commits
enable-pos
...
jerco/mana
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e810db848 | ||
|
|
fffb329ed1 | ||
|
|
38e8a17bb4 | ||
|
|
ccd0891328 | ||
|
|
f7b53edbf6 | ||
|
|
e2d33fb297 | ||
|
|
dcf5ba5061 | ||
|
|
846d92a5af | ||
|
|
d114313761 | ||
|
|
5fa4b39fb8 | ||
|
|
27aee8e8e6 | ||
|
|
7f867e61a4 | ||
|
|
6a6e387198 | ||
|
|
445820051d | ||
|
|
b8d944d93f | ||
|
|
3a17595a08 | ||
|
|
b765429dab | ||
|
|
713be18ede | ||
|
|
09424d73af | ||
|
|
4754fdc9df | ||
|
|
94d477dad5 | ||
|
|
bb0e8bbe6a | ||
|
|
fcce48b992 | ||
|
|
c60fa4698e | ||
|
|
161d3ae976 | ||
|
|
de5d13bc62 | ||
|
|
4e2273c1ef |
7
.changes/unreleased/Features-20220920-122529.yaml
Normal file
7
.changes/unreleased/Features-20220920-122529.yaml
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
kind: Features
|
||||||
|
body: Added a `manage` CLI command that allows users to drop unused database relations
|
||||||
|
time: 2022-09-20T12:25:29.226182+02:00
|
||||||
|
custom:
|
||||||
|
Author: agoblet bneijt
|
||||||
|
Issue: "4957"
|
||||||
|
PR: "5392"
|
||||||
@@ -36,6 +36,7 @@ from dbt.config.selectors import SelectorDict
|
|||||||
from dbt.contracts.project import (
|
from dbt.contracts.project import (
|
||||||
Project as ProjectContract,
|
Project as ProjectContract,
|
||||||
SemverString,
|
SemverString,
|
||||||
|
SchemaManagementConfiguration,
|
||||||
)
|
)
|
||||||
from dbt.contracts.project import PackageConfig
|
from dbt.contracts.project import PackageConfig
|
||||||
from dbt.dataclass_schema import ValidationError
|
from dbt.dataclass_schema import ValidationError
|
||||||
@@ -361,6 +362,7 @@ class PartialProject(RenderComponents):
|
|||||||
model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths
|
model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths
|
||||||
)
|
)
|
||||||
|
|
||||||
|
managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, [])
|
||||||
docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
|
docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
|
||||||
asset_paths: List[str] = value_or(cfg.asset_paths, [])
|
asset_paths: List[str] = value_or(cfg.asset_paths, [])
|
||||||
target_path: str = flag_or(flags.TARGET_PATH, cfg.target_path, "target")
|
target_path: str = flag_or(flags.TARGET_PATH, cfg.target_path, "target")
|
||||||
@@ -424,6 +426,7 @@ class PartialProject(RenderComponents):
|
|||||||
asset_paths=asset_paths,
|
asset_paths=asset_paths,
|
||||||
target_path=target_path,
|
target_path=target_path,
|
||||||
snapshot_paths=snapshot_paths,
|
snapshot_paths=snapshot_paths,
|
||||||
|
managed_schemas=managed_schemas,
|
||||||
clean_targets=clean_targets,
|
clean_targets=clean_targets,
|
||||||
log_path=log_path,
|
log_path=log_path,
|
||||||
packages_install_path=packages_install_path,
|
packages_install_path=packages_install_path,
|
||||||
@@ -531,6 +534,7 @@ class Project:
|
|||||||
asset_paths: List[str]
|
asset_paths: List[str]
|
||||||
target_path: str
|
target_path: str
|
||||||
snapshot_paths: List[str]
|
snapshot_paths: List[str]
|
||||||
|
managed_schemas: List[SchemaManagementConfiguration]
|
||||||
clean_targets: List[str]
|
clean_targets: List[str]
|
||||||
log_path: str
|
log_path: str
|
||||||
packages_install_path: str
|
packages_install_path: str
|
||||||
@@ -604,6 +608,7 @@ class Project:
|
|||||||
"asset-paths": self.asset_paths,
|
"asset-paths": self.asset_paths,
|
||||||
"target-path": self.target_path,
|
"target-path": self.target_path,
|
||||||
"snapshot-paths": self.snapshot_paths,
|
"snapshot-paths": self.snapshot_paths,
|
||||||
|
"managed-schemas": [schema.to_dict() for schema in self.managed_schemas],
|
||||||
"clean-targets": self.clean_targets,
|
"clean-targets": self.clean_targets,
|
||||||
"log-path": self.log_path,
|
"log-path": self.log_path,
|
||||||
"quoting": self.quoting,
|
"quoting": self.quoting,
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
|||||||
asset_paths=project.asset_paths,
|
asset_paths=project.asset_paths,
|
||||||
target_path=project.target_path,
|
target_path=project.target_path,
|
||||||
snapshot_paths=project.snapshot_paths,
|
snapshot_paths=project.snapshot_paths,
|
||||||
|
managed_schemas=project.managed_schemas,
|
||||||
clean_targets=project.clean_targets,
|
clean_targets=project.clean_targets,
|
||||||
log_path=project.log_path,
|
log_path=project.log_path,
|
||||||
packages_install_path=project.packages_install_path,
|
packages_install_path=project.packages_install_path,
|
||||||
@@ -520,6 +521,7 @@ class UnsetProfileConfig(RuntimeConfig):
|
|||||||
asset_paths=project.asset_paths,
|
asset_paths=project.asset_paths,
|
||||||
target_path=project.target_path,
|
target_path=project.target_path,
|
||||||
snapshot_paths=project.snapshot_paths,
|
snapshot_paths=project.snapshot_paths,
|
||||||
|
managed_schemas=project.managed_schemas,
|
||||||
clean_targets=project.clean_targets,
|
clean_targets=project.clean_targets,
|
||||||
log_path=project.log_path,
|
log_path=project.log_path,
|
||||||
packages_install_path=project.packages_install_path,
|
packages_install_path=project.packages_install_path,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ from dbt.dataclass_schema import (
|
|||||||
HyphenatedDbtClassMixin,
|
HyphenatedDbtClassMixin,
|
||||||
ExtensibleDbtClassMixin,
|
ExtensibleDbtClassMixin,
|
||||||
register_pattern,
|
register_pattern,
|
||||||
|
StrEnum,
|
||||||
)
|
)
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Optional, List, Dict, Union, Any
|
from typing import Optional, List, Dict, Union, Any
|
||||||
@@ -160,6 +161,35 @@ BANNED_PROJECT_NAMES = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class PruneModelsAction(StrEnum):
|
||||||
|
SKIP = "skip"
|
||||||
|
DROP = "drop"
|
||||||
|
WARN = "warn"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SchemaManagementConfiguration(HyphenatedDbtClassMixin, Replaceable):
|
||||||
|
# TODOs:
|
||||||
|
# (1) Do uniqueness validation here, to ensure that we don't get the same 'database' + 'schema' combo twice.
|
||||||
|
# (2) Support adapter-specific aliases for 'database' + 'schema'
|
||||||
|
# (e.g. 'project' + 'dataset' for BigQuery, 'catalog' for Spark/Databricks)
|
||||||
|
# There is a 'translate_aliases' method defined on adapters for this purpose.
|
||||||
|
database: Optional[str] = None
|
||||||
|
schema: Optional[str] = None
|
||||||
|
prune_models: Optional[PruneModelsAction] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def validate(cls, data):
|
||||||
|
super().validate(data)
|
||||||
|
# (1) validate uniqueness
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def __pre_deserialize__(cls, data):
|
||||||
|
data = super().__pre_deserialize__(data)
|
||||||
|
# (2) data = cls.translate_aliases(data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Project(HyphenatedDbtClassMixin, Replaceable):
|
class Project(HyphenatedDbtClassMixin, Replaceable):
|
||||||
name: Identifier
|
name: Identifier
|
||||||
@@ -177,6 +207,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
|
|||||||
asset_paths: Optional[List[str]] = None
|
asset_paths: Optional[List[str]] = None
|
||||||
target_path: Optional[str] = None
|
target_path: Optional[str] = None
|
||||||
snapshot_paths: Optional[List[str]] = None
|
snapshot_paths: Optional[List[str]] = None
|
||||||
|
managed_schemas: Optional[List[SchemaManagementConfiguration]] = None
|
||||||
clean_targets: Optional[List[str]] = None
|
clean_targets: Optional[List[str]] = None
|
||||||
profile: Optional[str] = None
|
profile: Optional[str] = None
|
||||||
log_path: Optional[str] = None
|
log_path: Optional[str] = None
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ import dbt.task.seed as seed_task
|
|||||||
import dbt.task.serve as serve_task
|
import dbt.task.serve as serve_task
|
||||||
import dbt.task.snapshot as snapshot_task
|
import dbt.task.snapshot as snapshot_task
|
||||||
import dbt.task.test as test_task
|
import dbt.task.test as test_task
|
||||||
|
import dbt.task.manage as manage_task
|
||||||
from dbt.profiler import profiler
|
from dbt.profiler import profiler
|
||||||
from dbt.adapters.factory import reset_adapters, cleanup_connections
|
from dbt.adapters.factory import reset_adapters, cleanup_connections
|
||||||
|
|
||||||
@@ -448,6 +449,22 @@ def _build_debug_subparser(subparsers, base_subparser):
|
|||||||
return sub
|
return sub
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: create in new CLI after rebasing against 'main'
|
||||||
|
def _build_manage_subparser(subparsers, base_subparser):
|
||||||
|
sub = subparsers.add_parser(
|
||||||
|
"manage",
|
||||||
|
parents=[base_subparser],
|
||||||
|
help="""
|
||||||
|
Drops relations that are present in the database and absent in the DBT models.
|
||||||
|
|
||||||
|
Not to be confused with the clean command which deletes folders rather than relations.
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
_add_version_check(sub)
|
||||||
|
sub.set_defaults(cls=manage_task.ManageTask, which="manage", rpc_method="manage")
|
||||||
|
return sub
|
||||||
|
|
||||||
|
|
||||||
def _build_deps_subparser(subparsers, base_subparser):
|
def _build_deps_subparser(subparsers, base_subparser):
|
||||||
sub = subparsers.add_parser(
|
sub = subparsers.add_parser(
|
||||||
"deps",
|
"deps",
|
||||||
@@ -1150,6 +1167,7 @@ def parse_args(args, cls=DBTArgumentParser):
|
|||||||
_build_debug_subparser(subs, base_subparser)
|
_build_debug_subparser(subs, base_subparser)
|
||||||
_build_deps_subparser(subs, base_subparser)
|
_build_deps_subparser(subs, base_subparser)
|
||||||
_build_list_subparser(subs, base_subparser)
|
_build_list_subparser(subs, base_subparser)
|
||||||
|
_build_manage_subparser(subs, base_subparser)
|
||||||
|
|
||||||
build_sub = _build_build_subparser(subs, base_subparser)
|
build_sub = _build_build_subparser(subs, base_subparser)
|
||||||
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)
|
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)
|
||||||
|
|||||||
191
core/dbt/task/manage.py
Normal file
191
core/dbt/task/manage.py
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
# coding=utf-8
|
||||||
|
from typing import Dict, Set, Tuple, List
|
||||||
|
|
||||||
|
from .runnable import ManifestTask, GraphRunnableTask
|
||||||
|
from dbt.exceptions import warn_or_error, ValidationException
|
||||||
|
from dbt.adapters.factory import get_adapter
|
||||||
|
from dbt.contracts.graph.parsed import (
|
||||||
|
ParsedModelNode,
|
||||||
|
)
|
||||||
|
from dbt.contracts.project import PruneModelsAction
|
||||||
|
from dbt.utils import executor
|
||||||
|
from concurrent.futures import as_completed, Future
|
||||||
|
from dbt.adapters.base import BaseRelation
|
||||||
|
|
||||||
|
|
||||||
|
class ManageTask(ManifestTask):
|
||||||
|
def _runtime_initialize(self):
|
||||||
|
# we just need to load the manifest
|
||||||
|
# we don't actually need to 'compile' it into a DAG/queue, if we're not doing node selection
|
||||||
|
self.load_manifest()
|
||||||
|
|
||||||
|
def before_run(self, adapter):
|
||||||
|
with adapter.connection_named("master"):
|
||||||
|
# this is a bad look! but we just want to borrow this one method
|
||||||
|
# populating the cache will speed up calls to 'list_relations' further down
|
||||||
|
# in all other ways, ManageTask is much more like a ManifestTask than a GraphRunnableTask
|
||||||
|
# (e.g. it doesn't need a DAG queue)
|
||||||
|
GraphRunnableTask.populate_adapter_cache(self, adapter)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self._runtime_initialize()
|
||||||
|
adapter = get_adapter(self.config)
|
||||||
|
self.before_run(adapter)
|
||||||
|
try:
|
||||||
|
with adapter.connection_named("master"):
|
||||||
|
adapter.clear_transaction()
|
||||||
|
self._prune_models(adapter)
|
||||||
|
finally:
|
||||||
|
adapter.cleanup_connections()
|
||||||
|
|
||||||
|
def _prune_models(self, adapter):
|
||||||
|
# TODO: do this uniqueness check as part of config validation (= parsing),
|
||||||
|
# instead of during task/runtime
|
||||||
|
self._assert_schema_uniqueness()
|
||||||
|
|
||||||
|
if len(self.config.managed_schemas) == 0:
|
||||||
|
# TODO: turn into structured event
|
||||||
|
warn_or_error("No schemas configured to manage")
|
||||||
|
return
|
||||||
|
|
||||||
|
models_in_codebase: Set[Tuple[Tuple[str, str, str]]] = set(
|
||||||
|
(
|
||||||
|
# what the user has configured - use this one for matching
|
||||||
|
(n.config.database, n.config.schema),
|
||||||
|
# the resolved values - actual location in database - use this one for lookup
|
||||||
|
(n.database, n.schema, n.alias),
|
||||||
|
)
|
||||||
|
for n in self.manifest.nodes.values()
|
||||||
|
if isinstance(n, ParsedModelNode) # TODO: seeds? tests via --store-failures?
|
||||||
|
)
|
||||||
|
|
||||||
|
should_drop = set()
|
||||||
|
should_warn = set()
|
||||||
|
|
||||||
|
for config in self.config.managed_schemas:
|
||||||
|
target_action = config.prune_models or PruneModelsAction.SKIP
|
||||||
|
|
||||||
|
# if set to 'skip', let's skip right away!
|
||||||
|
if target_action == PruneModelsAction.SKIP:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# these are the 'database' and 'schema' configs set in dbt_project.yml
|
||||||
|
managed_db_config = config.database
|
||||||
|
managed_schema_config = config.schema
|
||||||
|
|
||||||
|
# THE PLAN
|
||||||
|
# match models from the manifest that have the same configured values of 'database' + 'schema'
|
||||||
|
# then, look up their resolved 'database' + 'schema'.
|
||||||
|
# Depending on custom behavior of generate_X_name macros,
|
||||||
|
# this relationship (configured -> resolved) could be one -> one, one -> many, or many -> one
|
||||||
|
|
||||||
|
resolved_schemas_to_check = set()
|
||||||
|
|
||||||
|
models = [
|
||||||
|
model
|
||||||
|
for model in models_in_codebase
|
||||||
|
if (
|
||||||
|
(
|
||||||
|
model[0][0] == managed_db_config
|
||||||
|
or (model[0][0] is None and managed_db_config is None)
|
||||||
|
)
|
||||||
|
and (
|
||||||
|
model[0][1] == managed_schema_config
|
||||||
|
or (model[0][1] is None and managed_schema_config is None)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
if len(models) == 0:
|
||||||
|
# TODO: turn into structured event
|
||||||
|
warn_or_error(
|
||||||
|
f"While managing configured schema '{managed_db_config}.{managed_schema_config}': No models found with matching config"
|
||||||
|
)
|
||||||
|
for model in models:
|
||||||
|
# now store the resolved 'database' + 'schema' values of matched nodes
|
||||||
|
resolved_schemas_to_check.add((model[1][0], model[1][1]))
|
||||||
|
|
||||||
|
# It's also possible that the last model from a 'managed schema' was just deleted or disabled.
|
||||||
|
# For thoroughness, we'll also resolve the naive (not-node-specific) result of 'generate_X_name' macros, and check those too.
|
||||||
|
generated_database_name = adapter.execute_macro(
|
||||||
|
"generate_database_name", kwargs={"custom_database_name": config.database}
|
||||||
|
)
|
||||||
|
generated_schema_name = adapter.execute_macro(
|
||||||
|
"generate_schema_name", kwargs={"custom_schema_name": config.schema}
|
||||||
|
)
|
||||||
|
resolved_schemas_to_check.add((generated_database_name, generated_schema_name))
|
||||||
|
|
||||||
|
# It's possible that we've already checked this schema via another managed_schema config (the many -> one case)
|
||||||
|
# but we still want to tag the resolved objects with the managed_schema action, in case it's 'warn' in one case and 'drop' in another
|
||||||
|
# in any case, these calls to 'list_relations' should be cache hits
|
||||||
|
for database, schema in resolved_schemas_to_check:
|
||||||
|
models_in_database: Dict[Tuple[str, str, str], str] = {
|
||||||
|
(database, schema, relation.identifier): relation
|
||||||
|
for relation in adapter.list_relations(database, schema)
|
||||||
|
}
|
||||||
|
if len(models_in_database) == 0:
|
||||||
|
# TODO: turn into structured event
|
||||||
|
warn_or_error(
|
||||||
|
f"No objects in managed schema '{database}.{schema}', resolved from config '{managed_db_config}.{managed_schema_config}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
# compare 'found' models_in_database against 'expected' values from manifest
|
||||||
|
expected_models = set([model[1] for model in models_in_codebase])
|
||||||
|
should_act_upon = models_in_database.keys() - expected_models
|
||||||
|
|
||||||
|
for (target_database, target_schema, target_identifier) in sorted(should_act_upon):
|
||||||
|
object = models_in_database[
|
||||||
|
(target_database, target_schema, target_identifier)
|
||||||
|
]
|
||||||
|
if target_action == PruneModelsAction.WARN:
|
||||||
|
should_warn.add(object)
|
||||||
|
elif target_action == PruneModelsAction.DROP:
|
||||||
|
should_drop.add(object)
|
||||||
|
|
||||||
|
# If the same object appeared in both the 'warn' and 'drop' categories,
|
||||||
|
# let's err on the safe side: warn, don't drop.
|
||||||
|
# This can happen due to custom behavior in generate_X_name macros, such that
|
||||||
|
# the relationship from configured -> resolved is many -> one,
|
||||||
|
# and the same object will be found in different 'managed_schemas' with different configured actions.
|
||||||
|
# Example pattern: `generate_schema_name_for_env` with `target.name != prod`. All custom schemas are ignored, and every model just lands in {target.schema}.
|
||||||
|
to_drop = should_drop - should_warn
|
||||||
|
|
||||||
|
# First, warn about everything configured to warn
|
||||||
|
for object in should_warn:
|
||||||
|
# TODO: turn into structured event
|
||||||
|
message = f"Found unused object {object}"
|
||||||
|
if object in should_drop:
|
||||||
|
message = message + ", not dropping because also configured to warn"
|
||||||
|
warn_or_error(message)
|
||||||
|
|
||||||
|
# Now, drop everything that needs dropping
|
||||||
|
# with concurrency up to number of configured --threads
|
||||||
|
with executor(self.config) as tpe:
|
||||||
|
futures: List[Future[List[BaseRelation]]] = []
|
||||||
|
for object in to_drop:
|
||||||
|
# TODO: turn into structured event
|
||||||
|
print(f"Dropping {object}")
|
||||||
|
fut = tpe.submit_connected(
|
||||||
|
adapter,
|
||||||
|
f"drop_{object.database}_{object.schema}",
|
||||||
|
adapter.drop_relation,
|
||||||
|
object,
|
||||||
|
)
|
||||||
|
futures.append(fut)
|
||||||
|
|
||||||
|
for fut in as_completed(futures):
|
||||||
|
# trigger/re-raise any exceptions while dropping relations
|
||||||
|
fut.result()
|
||||||
|
|
||||||
|
def _assert_schema_uniqueness(self):
|
||||||
|
schemas = set()
|
||||||
|
|
||||||
|
for config in self.config.managed_schemas:
|
||||||
|
schema = (config.database, config.schema)
|
||||||
|
if schema in schemas:
|
||||||
|
raise ValidationException(f"Duplicate schema found: {schema}")
|
||||||
|
schemas.add(schema)
|
||||||
|
|
||||||
|
def interpret_results(self, results):
|
||||||
|
# if all queries succeed, this will return exit code 0 ("success")
|
||||||
|
# if any query fails, or warning + --warn-error, it will return exit code 2 ("unhandled")
|
||||||
|
return True
|
||||||
5
core/dbt/tests/fixtures/project.py
vendored
5
core/dbt/tests/fixtures/project.py
vendored
@@ -430,6 +430,11 @@ class TestProjInfo:
|
|||||||
result = self.run_sql(sql, fetch="all")
|
result = self.run_sql(sql, fetch="all")
|
||||||
return {model_name: materialization for (model_name, materialization) in result}
|
return {model_name: materialization for (model_name, materialization) in result}
|
||||||
|
|
||||||
|
def update_models(self, models: dict):
|
||||||
|
"""Update the modules in the test project"""
|
||||||
|
self.project_root.join("models").remove()
|
||||||
|
write_project_files(self.project_root, "models", models)
|
||||||
|
|
||||||
|
|
||||||
# This is the main fixture that is used in all functional tests. It pulls in the other
|
# This is the main fixture that is used in all functional tests. It pulls in the other
|
||||||
# fixtures that are necessary to set up a dbt project, and saves some of the information
|
# fixtures that are necessary to set up a dbt project, and saves some of the information
|
||||||
|
|||||||
1
tests/functional/schema_management/README.md
Normal file
1
tests/functional/schema_management/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Test schema management as introduced by https://github.com/dbt-labs/dbt-core/issues/4957
|
||||||
296
tests/functional/schema_management/test_schema_management.py
Normal file
296
tests/functional/schema_management/test_schema_management.py
Normal file
@@ -0,0 +1,296 @@
|
|||||||
|
import pytest
|
||||||
|
import os
|
||||||
|
from dbt.exceptions import CompilationException, ValidationException
|
||||||
|
from dbt.tests.util import run_dbt, check_table_does_exist, check_table_does_not_exist
|
||||||
|
|
||||||
|
|
||||||
|
def model(materialized, unique_schema=None):
|
||||||
|
return f"""
|
||||||
|
{{{{
|
||||||
|
config(
|
||||||
|
materialized = "{materialized}",
|
||||||
|
schema = {f'"{unique_schema}"' if unique_schema is not None else "None"}
|
||||||
|
)
|
||||||
|
}}}}
|
||||||
|
SELECT * FROM (
|
||||||
|
VALUES (1, 'one'),
|
||||||
|
(2, 'two'),
|
||||||
|
(3, 'three')
|
||||||
|
) AS t (num,letter)
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class Base:
|
||||||
|
materialized = "table"
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def models(self):
|
||||||
|
return {
|
||||||
|
"model_a.sql": model(self.materialized),
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def dbt_profile_target(self):
|
||||||
|
return {
|
||||||
|
"type": "postgres",
|
||||||
|
"threads": 4,
|
||||||
|
"host": "localhost",
|
||||||
|
"port": int(os.getenv("POSTGRES_TEST_PORT", 5432)),
|
||||||
|
"user": os.getenv("POSTGRES_TEST_USER", "root"),
|
||||||
|
"pass": os.getenv("POSTGRES_TEST_PASS", "password"),
|
||||||
|
"dbname": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestMissingConfiguration(Base):
|
||||||
|
def test_should_raise_exception(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
with pytest.raises(CompilationException):
|
||||||
|
run_dbt(["--warn-error", "manage"])
|
||||||
|
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
def test_should_not_delete_anything(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
run_dbt(["manage"])
|
||||||
|
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
|
||||||
|
class TestUnmanagedSchema(TestMissingConfiguration):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"schema": "some_other_schema",
|
||||||
|
"prune-models": "drop",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestEmptyConfiguration(TestMissingConfiguration):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {"managed-schemas": []}
|
||||||
|
|
||||||
|
|
||||||
|
class TestWarn(TestMissingConfiguration):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "warn",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestDrop(Base):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "drop",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def test(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
run_dbt(["manage"])
|
||||||
|
|
||||||
|
check_table_does_not_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
|
||||||
|
class TestDropView(TestDrop):
|
||||||
|
materialized = "view"
|
||||||
|
|
||||||
|
|
||||||
|
class TestSkip(Base):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "skip",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_should_not_raise_exception(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
run_dbt(["--warn-error", "manage"])
|
||||||
|
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
def test_should_not_delete_anything(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
run_dbt(["manage"])
|
||||||
|
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
|
||||||
|
class TestDefaultAction(TestSkip):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestCustomSchema(Base):
|
||||||
|
custom_schema = "custom"
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def models(self):
|
||||||
|
return {
|
||||||
|
"model_a.sql": model(self.materialized, self.custom_schema),
|
||||||
|
"model_b.sql": model(self.materialized, self.custom_schema),
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "drop",
|
||||||
|
"schema": self.custom_schema,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def test(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_a")
|
||||||
|
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_a.sql": model(self.materialized),
|
||||||
|
"model_b.sql": model(self.materialized, self.custom_schema),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
run_dbt(["manage"])
|
||||||
|
|
||||||
|
check_table_does_not_exist(
|
||||||
|
project.adapter, f"{self._generate_schema_name(project)}.model_a"
|
||||||
|
)
|
||||||
|
check_table_does_not_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b")
|
||||||
|
|
||||||
|
def _generate_schema_name(self, project):
|
||||||
|
return f"{project.test_schema}_{self.custom_schema}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestDuplicateConfiguration(Base):
|
||||||
|
@pytest.fixture(scope="class")
|
||||||
|
def project_config_update(self, unique_schema):
|
||||||
|
return {
|
||||||
|
"managed-schemas": [
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "drop",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||||
|
"prune-models": "warn",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def test(
|
||||||
|
self,
|
||||||
|
project,
|
||||||
|
):
|
||||||
|
run_dbt(["run"])
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
|
|
||||||
|
project.update_models(
|
||||||
|
{
|
||||||
|
"model_b.sql": model(self.materialized),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
with pytest.raises(ValidationException):
|
||||||
|
run_dbt(["manage"])
|
||||||
|
|
||||||
|
check_table_does_exist(project.adapter, "model_a")
|
||||||
|
check_table_does_exist(project.adapter, "model_b")
|
||||||
Reference in New Issue
Block a user