Compare commits

...

27 Commits

Author SHA1 Message Date
Jeremy Cohen
8e810db848 Feedback on PR 5392 2022-12-14 20:14:52 +01:00
Axel Goblet
fffb329ed1 black 2022-09-23 11:32:19 +02:00
Axel Goblet
38e8a17bb4 change warning output 2022-09-23 09:54:45 +02:00
Axel Goblet
ccd0891328 add schema name generation 2022-09-23 09:43:38 +02:00
Axel Goblet
f7b53edbf6 simplify manage task 2022-09-21 08:44:42 +02:00
Axel Goblet
e2d33fb297 add changie 2022-09-20 12:26:26 +02:00
Axel Goblet
dcf5ba5061 remove todo 2022-09-20 11:24:45 +02:00
Axel Goblet
846d92a5af make SchemaManagementConfiguration extensible 2022-09-20 11:15:54 +02:00
Axel Goblet
d114313761 formatting 2022-09-20 10:59:16 +02:00
Axel Goblet
5fa4b39fb8 update tests 2022-09-20 10:51:00 +02:00
Axel Goblet
27aee8e8e6 fix adapter cache 2022-09-20 10:03:06 +02:00
Axel Goblet
7f867e61a4 make actions enum 2022-09-19 09:54:58 +02:00
Axel Goblet
6a6e387198 simplify manage task 2022-09-19 09:22:01 +02:00
Axel Goblet
445820051d remove file 2022-09-16 16:57:30 +02:00
Axel Goblet
b8d944d93f remove file 2022-09-16 16:56:01 +02:00
Axel Goblet
3a17595a08 wip: move manage logic to separate command 2022-09-16 14:59:21 +02:00
Aram Panasenco
b765429dab Include py.typed in MANIFEST.in (#5703)
This enables packages that install dbt-core from pypi to use mypy.
2022-09-16 14:59:21 +02:00
Batuhan Taskaya
713be18ede Add --target-path as a CLI option. (#5402) 2022-09-16 14:59:21 +02:00
Bram Neijt
09424d73af Manage schemas is optional 2022-09-16 14:59:20 +02:00
agoblet
4754fdc9df Delete tmp.csv 2022-09-16 14:59:20 +02:00
Axel Goblet
94d477dad5 make tests more dry 2022-09-16 14:59:20 +02:00
Axel Goblet
bb0e8bbe6a add unmanaged schema test 2022-09-16 14:59:20 +02:00
Axel Goblet
fcce48b992 add view dropping test 2022-09-16 14:59:20 +02:00
Axel Goblet
c60fa4698e rename tests 2022-09-16 14:59:20 +02:00
Axel Goblet
161d3ae976 improve tests 2022-09-16 14:59:19 +02:00
Axel Goblet
de5d13bc62 add noop and warn tests 2022-09-16 14:59:19 +02:00
Bram Neijt
4e2273c1ef Add management schema feature 2022-09-16 14:59:19 +02:00
9 changed files with 556 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
kind: Features
body: Added a `manage` CLI command that allows users to drop unused database relations
time: 2022-09-20T12:25:29.226182+02:00
custom:
Author: agoblet bneijt
Issue: "4957"
PR: "5392"

View File

@@ -36,6 +36,7 @@ from dbt.config.selectors import SelectorDict
from dbt.contracts.project import (
Project as ProjectContract,
SemverString,
SchemaManagementConfiguration,
)
from dbt.contracts.project import PackageConfig
from dbt.dataclass_schema import ValidationError
@@ -361,6 +362,7 @@ class PartialProject(RenderComponents):
model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths
)
managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, [])
docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
asset_paths: List[str] = value_or(cfg.asset_paths, [])
target_path: str = flag_or(flags.TARGET_PATH, cfg.target_path, "target")
@@ -424,6 +426,7 @@ class PartialProject(RenderComponents):
asset_paths=asset_paths,
target_path=target_path,
snapshot_paths=snapshot_paths,
managed_schemas=managed_schemas,
clean_targets=clean_targets,
log_path=log_path,
packages_install_path=packages_install_path,
@@ -531,6 +534,7 @@ class Project:
asset_paths: List[str]
target_path: str
snapshot_paths: List[str]
managed_schemas: List[SchemaManagementConfiguration]
clean_targets: List[str]
log_path: str
packages_install_path: str
@@ -604,6 +608,7 @@ class Project:
"asset-paths": self.asset_paths,
"target-path": self.target_path,
"snapshot-paths": self.snapshot_paths,
"managed-schemas": [schema.to_dict() for schema in self.managed_schemas],
"clean-targets": self.clean_targets,
"log-path": self.log_path,
"quoting": self.quoting,

View File

@@ -88,6 +88,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
asset_paths=project.asset_paths,
target_path=project.target_path,
snapshot_paths=project.snapshot_paths,
managed_schemas=project.managed_schemas,
clean_targets=project.clean_targets,
log_path=project.log_path,
packages_install_path=project.packages_install_path,
@@ -520,6 +521,7 @@ class UnsetProfileConfig(RuntimeConfig):
asset_paths=project.asset_paths,
target_path=project.target_path,
snapshot_paths=project.snapshot_paths,
managed_schemas=project.managed_schemas,
clean_targets=project.clean_targets,
log_path=project.log_path,
packages_install_path=project.packages_install_path,

View File

@@ -7,6 +7,7 @@ from dbt.dataclass_schema import (
HyphenatedDbtClassMixin,
ExtensibleDbtClassMixin,
register_pattern,
StrEnum,
)
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Union, Any
@@ -160,6 +161,35 @@ BANNED_PROJECT_NAMES = {
}
class PruneModelsAction(StrEnum):
    """Action to apply to database objects found in a managed schema that no
    longer have a matching model in the project."""

    SKIP = "skip"  # leave unexpected objects untouched
    DROP = "drop"  # drop unexpected objects from the database
    WARN = "warn"  # report unexpected objects but do not drop them
@dataclass
class SchemaManagementConfiguration(HyphenatedDbtClassMixin, Replaceable):
    """One entry of the 'managed-schemas' list in dbt_project.yml.

    Identifies a (database, schema) pair that dbt may manage, plus the action
    ('prune-models') to apply to objects found there without a matching model.
    """

    # TODOs:
    # (1) Do uniqueness validation here, to ensure that we don't get the same 'database' + 'schema' combo twice.
    # (2) Support adapter-specific aliases for 'database' + 'schema'
    #     (e.g. 'project' + 'dataset' for BigQuery, 'catalog' for Spark/Databricks)
    #     There is a 'translate_aliases' method defined on adapters for this purpose.

    # The *configured* (unresolved) database/schema values; None matches models
    # that set no explicit value for the corresponding config field.
    database: Optional[str] = None
    schema: Optional[str] = None
    # Action for unexpected objects; a missing value is treated as SKIP by the
    # manage task (see ManageTask._prune_models).
    prune_models: Optional[PruneModelsAction] = None

    @classmethod
    def validate(cls, data):
        # delegate to the mixin's schema validation
        super().validate(data)
        # (1) validate uniqueness

    @classmethod
    def __pre_deserialize__(cls, data):
        data = super().__pre_deserialize__(data)
        # (2) data = cls.translate_aliases(data)
        return data
@dataclass
class Project(HyphenatedDbtClassMixin, Replaceable):
name: Identifier
@@ -177,6 +207,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
asset_paths: Optional[List[str]] = None
target_path: Optional[str] = None
snapshot_paths: Optional[List[str]] = None
managed_schemas: Optional[List[SchemaManagementConfiguration]] = None
clean_targets: Optional[List[str]] = None
profile: Optional[str] = None
log_path: Optional[str] = None

View File

@@ -36,6 +36,7 @@ import dbt.task.seed as seed_task
import dbt.task.serve as serve_task
import dbt.task.snapshot as snapshot_task
import dbt.task.test as test_task
import dbt.task.manage as manage_task
from dbt.profiler import profiler
from dbt.adapters.factory import reset_adapters, cleanup_connections
@@ -448,6 +449,22 @@ def _build_debug_subparser(subparsers, base_subparser):
return sub
# TODO: create in new CLI after rebasing against 'main'
def _build_manage_subparser(subparsers, base_subparser):
    """Register the 'manage' subcommand on the top-level argument parser."""
    # 'manage' prunes database relations without a matching model — unlike
    # 'clean', which only removes local folders.
    manage_help = """
    Drops relations that are present in the database and absent in the DBT models.
    Not to be confused with the clean command which deletes folders rather than relations.
    """
    sub = subparsers.add_parser("manage", parents=[base_subparser], help=manage_help)
    _add_version_check(sub)
    sub.set_defaults(cls=manage_task.ManageTask, which="manage", rpc_method="manage")
    return sub
def _build_deps_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
"deps",
@@ -1150,6 +1167,7 @@ def parse_args(args, cls=DBTArgumentParser):
_build_debug_subparser(subs, base_subparser)
_build_deps_subparser(subs, base_subparser)
_build_list_subparser(subs, base_subparser)
_build_manage_subparser(subs, base_subparser)
build_sub = _build_build_subparser(subs, base_subparser)
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)

191
core/dbt/task/manage.py Normal file
View File

@@ -0,0 +1,191 @@
# coding=utf-8
from typing import Dict, Set, Tuple, List
from .runnable import ManifestTask, GraphRunnableTask
from dbt.exceptions import warn_or_error, ValidationException
from dbt.adapters.factory import get_adapter
from dbt.contracts.graph.parsed import (
ParsedModelNode,
)
from dbt.contracts.project import PruneModelsAction
from dbt.utils import executor
from concurrent.futures import as_completed, Future
from dbt.adapters.base import BaseRelation
class ManageTask(ManifestTask):
    """Task behind the 'manage' command.

    Compares the relations that exist in configured 'managed-schemas' against
    the models in the project manifest, then warns about and/or drops
    relations that no longer correspond to any model, according to each
    schema's configured PruneModelsAction.
    """

    def _runtime_initialize(self):
        # we just need to load the manifest
        # we don't actually need to 'compile' it into a DAG/queue, if we're not doing node selection
        self.load_manifest()

    def before_run(self, adapter):
        """Warm the adapter's relation cache so later 'list_relations' calls are cache hits."""
        with adapter.connection_named("master"):
            # this is a bad look! but we just want to borrow this one method
            # populating the cache will speed up calls to 'list_relations' further down
            # in all other ways, ManageTask is much more like a ManifestTask than a GraphRunnableTask
            # (e.g. it doesn't need a DAG queue)
            GraphRunnableTask.populate_adapter_cache(self, adapter)

    def run(self):
        """Entry point: load the manifest, prune unused relations, always clean up connections."""
        self._runtime_initialize()
        adapter = get_adapter(self.config)
        self.before_run(adapter)
        try:
            with adapter.connection_named("master"):
                adapter.clear_transaction()
                self._prune_models(adapter)
        finally:
            adapter.cleanup_connections()

    def _prune_models(self, adapter):
        """Warn about or drop objects in managed schemas that have no matching model."""
        # TODO: do this uniqueness check as part of config validation (= parsing),
        # instead of during task/runtime
        self._assert_schema_uniqueness()

        if len(self.config.managed_schemas) == 0:
            # TODO: turn into structured event
            warn_or_error("No schemas configured to manage")
            return

        # Each element pairs the *configured* (database, schema) — used for
        # matching against managed-schemas entries — with the *resolved*
        # (database, schema, alias) — the actual location in the database.
        # (The original annotation Set[Tuple[Tuple[str, str, str]]] was wrong.)
        models_in_codebase: Set[Tuple[Tuple[str, str], Tuple[str, str, str]]] = set(
            (
                # what the user has configured - use this one for matching
                (n.config.database, n.config.schema),
                # the resolved values - actual location in database - use this one for lookup
                (n.database, n.schema, n.alias),
            )
            for n in self.manifest.nodes.values()
            if isinstance(n, ParsedModelNode)  # TODO: seeds? tests via --store-failures?
        )

        should_drop = set()
        should_warn = set()

        for config in self.config.managed_schemas:
            # a missing 'prune-models' value defaults to 'skip'
            target_action = config.prune_models or PruneModelsAction.SKIP
            # if set to 'skip', let's skip right away!
            if target_action == PruneModelsAction.SKIP:
                continue

            # these are the 'database' and 'schema' configs set in dbt_project.yml
            managed_db_config = config.database
            managed_schema_config = config.schema

            # THE PLAN
            # match models from the manifest that have the same configured values of 'database' + 'schema'
            # then, look up their resolved 'database' + 'schema'.
            # Depending on custom behavior of generate_X_name macros,
            # this relationship (configured -> resolved) could be one -> one, one -> many, or many -> one
            resolved_schemas_to_check = set()
            models = [
                model
                for model in models_in_codebase
                if (
                    (
                        model[0][0] == managed_db_config
                        or (model[0][0] is None and managed_db_config is None)
                    )
                    and (
                        model[0][1] == managed_schema_config
                        or (model[0][1] is None and managed_schema_config is None)
                    )
                )
            ]
            if len(models) == 0:
                # TODO: turn into structured event
                warn_or_error(
                    f"While managing configured schema '{managed_db_config}.{managed_schema_config}': No models found with matching config"
                )
            for model in models:
                # now store the resolved 'database' + 'schema' values of matched nodes
                resolved_schemas_to_check.add((model[1][0], model[1][1]))

            # It's also possible that the last model from a 'managed schema' was just deleted or disabled.
            # For thoroughness, we'll also resolve the naive (not-node-specific) result of 'generate_X_name' macros, and check those too.
            generated_database_name = adapter.execute_macro(
                "generate_database_name", kwargs={"custom_database_name": config.database}
            )
            generated_schema_name = adapter.execute_macro(
                "generate_schema_name", kwargs={"custom_schema_name": config.schema}
            )
            resolved_schemas_to_check.add((generated_database_name, generated_schema_name))

            # It's possible that we've already checked this schema via another managed_schema config (the many -> one case)
            # but we still want to tag the resolved objects with the managed_schema action, in case it's 'warn' in one case and 'drop' in another
            # in any case, these calls to 'list_relations' should be cache hits
            for database, schema in resolved_schemas_to_check:
                # map resolved location -> relation object found in the database
                # (the original annotation said 'str', but the values are relations)
                models_in_database: Dict[Tuple[str, str, str], BaseRelation] = {
                    (database, schema, relation.identifier): relation
                    for relation in adapter.list_relations(database, schema)
                }
                if len(models_in_database) == 0:
                    # TODO: turn into structured event
                    warn_or_error(
                        f"No objects in managed schema '{database}.{schema}', resolved from config '{managed_db_config}.{managed_schema_config}'"
                    )

                # compare 'found' models_in_database against 'expected' values from manifest
                expected_models = set(model[1] for model in models_in_codebase)
                should_act_upon = models_in_database.keys() - expected_models

                for (target_database, target_schema, target_identifier) in sorted(should_act_upon):
                    # 'obj' instead of the original 'object' to avoid shadowing the builtin
                    obj = models_in_database[(target_database, target_schema, target_identifier)]
                    if target_action == PruneModelsAction.WARN:
                        should_warn.add(obj)
                    elif target_action == PruneModelsAction.DROP:
                        should_drop.add(obj)

        # If the same object appeared in both the 'warn' and 'drop' categories,
        # let's err on the safe side: warn, don't drop.
        # This can happen due to custom behavior in generate_X_name macros, such that
        # the relationship from configured -> resolved is many -> one,
        # and the same object will be found in different 'managed_schemas' with different configured actions.
        # Example pattern: `generate_schema_name_for_env` with `target.name != prod`. All custom schemas are ignored, and every model just lands in {target.schema}.
        to_drop = should_drop - should_warn

        # First, warn about everything configured to warn
        for obj in should_warn:
            # TODO: turn into structured event
            message = f"Found unused object {obj}"
            if obj in should_drop:
                message = message + ", not dropping because also configured to warn"
            warn_or_error(message)

        # Now, drop everything that needs dropping
        # with concurrency up to number of configured --threads
        with executor(self.config) as tpe:
            futures: List[Future[List[BaseRelation]]] = []
            for obj in to_drop:
                # TODO: turn into structured event
                print(f"Dropping {obj}")
                fut = tpe.submit_connected(
                    adapter,
                    f"drop_{obj.database}_{obj.schema}",
                    adapter.drop_relation,
                    obj,
                )
                futures.append(fut)
            for fut in as_completed(futures):
                # trigger/re-raise any exceptions while dropping relations
                fut.result()

    def _assert_schema_uniqueness(self):
        """Raise ValidationException if two managed-schemas entries share a (database, schema) pair."""
        schemas = set()
        for config in self.config.managed_schemas:
            schema = (config.database, config.schema)
            if schema in schemas:
                raise ValidationException(f"Duplicate schema found: {schema}")
            schemas.add(schema)

    def interpret_results(self, results):
        # if all queries succeed, this will return exit code 0 ("success")
        # if any query fails, or warning + --warn-error, it will return exit code 2 ("unhandled")
        return True

View File

@@ -430,6 +430,11 @@ class TestProjInfo:
result = self.run_sql(sql, fetch="all")
return {model_name: materialization for (model_name, materialization) in result}
    def update_models(self, models: dict):
        """Replace the test project's 'models' directory contents with *models*.

        The directory is removed entirely first, so any model file not present
        in *models* (a mapping of file name -> file contents) is deleted.
        """
        self.project_root.join("models").remove()
        write_project_files(self.project_root, "models", models)
# This is the main fixture that is used in all functional tests. It pulls in the other
# fixtures that are necessary to set up a dbt project, and saves some of the information

View File

@@ -0,0 +1 @@
Test schema management as introduced by https://github.com/dbt-labs/dbt-core/issues/4957

View File

@@ -0,0 +1,296 @@
import pytest
import os
from dbt.exceptions import CompilationException, ValidationException
from dbt.tests.util import run_dbt, check_table_does_exist, check_table_does_not_exist
def model(materialized, unique_schema=None):
    """Render a minimal model SQL body with the given materialization config
    and, optionally, a custom 'schema' config value."""
    schema_value = "None" if unique_schema is None else f'"{unique_schema}"'
    return f"""
{{{{
    config(
        materialized = "{materialized}",
        schema = {schema_value}
    )
}}}}

SELECT * FROM (
    VALUES (1, 'one'),
        (2, 'two'),
        (3, 'three')
) AS t (num,letter)
"""
class Base:
    """Shared fixtures for the schema-management functional tests: two
    models with the same materialization, run against a local postgres."""

    materialized = "table"

    @pytest.fixture(scope="class")
    def models(self):
        body = model(self.materialized)
        return {"model_a.sql": body, "model_b.sql": body}

    @pytest.fixture(scope="class")
    def dbt_profile_target(self):
        # connection details come from the standard POSTGRES_TEST_* env vars,
        # with the same defaults used elsewhere in the functional test suite
        target = {"type": "postgres", "threads": 4, "host": "localhost"}
        target["port"] = int(os.getenv("POSTGRES_TEST_PORT", 5432))
        target["user"] = os.getenv("POSTGRES_TEST_USER", "root")
        target["pass"] = os.getenv("POSTGRES_TEST_PASS", "password")
        target["dbname"] = os.getenv("POSTGRES_TEST_DATABASE", "dbt")
        return target
class TestMissingConfiguration(Base):
    """Without any 'managed-schemas' configuration, 'manage' should only warn
    (which errors under --warn-error) and must never drop anything."""

    def _seed_then_remove_model_a(self, project):
        # materialize both models, confirm they exist, then delete model_a
        # from the project files so it becomes an 'unused' database object
        run_dbt(["run"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
        project.update_models({"model_b.sql": model(self.materialized)})

    def test_should_raise_exception(self, project):
        self._seed_then_remove_model_a(project)
        # the warning is promoted to an error by --warn-error
        with pytest.raises(CompilationException):
            run_dbt(["--warn-error", "manage"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")

    def test_should_not_delete_anything(self, project):
        self._seed_then_remove_model_a(project)
        run_dbt(["manage"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
class TestUnmanagedSchema(TestMissingConfiguration):
    """A managed-schemas entry pointing at an unrelated schema must not touch
    the test models; inherits the warn / no-delete assertions."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {
            "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
            "schema": "some_other_schema",
            "prune-models": "drop",
        }
        return {"managed-schemas": [entry]}
class TestEmptyConfiguration(TestMissingConfiguration):
    """An explicitly empty managed-schemas list behaves like no configuration:
    warn, and never drop."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        config = {"managed-schemas": []}
        return config
class TestWarn(TestMissingConfiguration):
    """'prune-models: warn' reports unused objects but never drops them;
    inherits the warn / no-delete assertions."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {
            "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
            "prune-models": "warn",
        }
        return {"managed-schemas": [entry]}
class TestDrop(Base):
    """'prune-models: drop' drops objects whose model was removed from the
    project, and leaves the remaining models alone."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {
            "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
            "prune-models": "drop",
        }
        return {"managed-schemas": [entry]}

    def test(self, project):
        # materialize both models, then delete model_a from the project files
        run_dbt(["run"])
        for name in ("model_a", "model_b"):
            check_table_does_exist(project.adapter, name)
        project.update_models({"model_b.sql": model(self.materialized)})
        # 'manage' should drop only the now-unused model_a
        run_dbt(["manage"])
        check_table_does_not_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
class TestDropView(TestDrop):
    """Same as TestDrop, but with view-materialized models."""

    materialized = "view"
class TestSkip(Base):
    """'prune-models: skip' leaves unused objects alone and emits no warning
    (so --warn-error must not raise)."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {
            "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
            "prune-models": "skip",
        }
        return {"managed-schemas": [entry]}

    def _seed_then_remove_model_a(self, project):
        # materialize both models, confirm they exist, then delete model_a
        # from the project files so it becomes an 'unused' database object
        run_dbt(["run"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
        project.update_models({"model_b.sql": model(self.materialized)})

    def test_should_not_raise_exception(self, project):
        self._seed_then_remove_model_a(project)
        # no warning is emitted, so --warn-error must not turn into an error
        run_dbt(["--warn-error", "manage"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")

    def test_should_not_delete_anything(self, project):
        self._seed_then_remove_model_a(project)
        run_dbt(["manage"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
class TestDefaultAction(TestSkip):
    """Omitting 'prune-models' entirely defaults to the 'skip' behavior;
    inherits TestSkip's assertions."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt")}
        return {"managed-schemas": [entry]}
class TestCustomSchema(Base):
    """Models configured with a custom 'schema' are pruned from the resolved
    schema produced by generate_schema_name ({test_schema}_{custom_schema})."""

    custom_schema = "custom"

    @pytest.fixture(scope="class")
    def models(self):
        body = model(self.materialized, self.custom_schema)
        return {"model_a.sql": body, "model_b.sql": body}

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        entry = {
            "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
            "prune-models": "drop",
            "schema": self.custom_schema,
        }
        return {"managed-schemas": [entry]}

    def test(self, project):
        run_dbt(["run"])
        resolved = self._generate_schema_name(project)
        check_table_does_exist(project.adapter, f"{resolved}.model_a")
        check_table_does_exist(project.adapter, f"{resolved}.model_b")
        # move model_a to the default schema; model_b stays in the custom schema
        project.update_models(
            {
                "model_a.sql": model(self.materialized),
                "model_b.sql": model(self.materialized, self.custom_schema),
            }
        )
        run_dbt(["manage"])
        # the custom-schema copy of model_a is dropped, and nothing was ever
        # built at the default-schema location (no 'run' after the update)
        check_table_does_not_exist(project.adapter, f"{resolved}.model_a")
        check_table_does_not_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, f"{resolved}.model_b")

    def _generate_schema_name(self, project):
        # mirrors the default generate_schema_name macro: <target schema>_<custom schema>
        return f"{project.test_schema}_{self.custom_schema}"
class TestDuplicateConfiguration(Base):
    """Two managed-schemas entries for the same (database, schema) pair must
    raise a ValidationException and leave the database untouched."""

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        database = os.getenv("POSTGRES_TEST_DATABASE", "dbt")
        return {
            "managed-schemas": [
                {"database": database, "prune-models": "drop"},
                {"database": database, "prune-models": "warn"},
            ]
        }

    def test(self, project):
        # materialize both models, then delete model_a from the project files
        run_dbt(["run"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")
        project.update_models({"model_b.sql": model(self.materialized)})
        # the duplicate configuration is rejected before anything is dropped
        with pytest.raises(ValidationException):
            run_dbt(["manage"])
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")