mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
27 Commits
jerco/pyth
...
jerco/mana
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e810db848 | ||
|
|
fffb329ed1 | ||
|
|
38e8a17bb4 | ||
|
|
ccd0891328 | ||
|
|
f7b53edbf6 | ||
|
|
e2d33fb297 | ||
|
|
dcf5ba5061 | ||
|
|
846d92a5af | ||
|
|
d114313761 | ||
|
|
5fa4b39fb8 | ||
|
|
27aee8e8e6 | ||
|
|
7f867e61a4 | ||
|
|
6a6e387198 | ||
|
|
445820051d | ||
|
|
b8d944d93f | ||
|
|
3a17595a08 | ||
|
|
b765429dab | ||
|
|
713be18ede | ||
|
|
09424d73af | ||
|
|
4754fdc9df | ||
|
|
94d477dad5 | ||
|
|
bb0e8bbe6a | ||
|
|
fcce48b992 | ||
|
|
c60fa4698e | ||
|
|
161d3ae976 | ||
|
|
de5d13bc62 | ||
|
|
4e2273c1ef |
7
.changes/unreleased/Features-20220920-122529.yaml
Normal file
7
.changes/unreleased/Features-20220920-122529.yaml
Normal file
@@ -0,0 +1,7 @@
|
||||
kind: Features
|
||||
body: Added a `manage` CLI command that allows users to drop unused database relations
|
||||
time: 2022-09-20T12:25:29.226182+02:00
|
||||
custom:
|
||||
Author: agoblet bneijt
|
||||
Issue: "4957"
|
||||
PR: "5392"
|
||||
@@ -36,6 +36,7 @@ from dbt.config.selectors import SelectorDict
|
||||
from dbt.contracts.project import (
|
||||
Project as ProjectContract,
|
||||
SemverString,
|
||||
SchemaManagementConfiguration,
|
||||
)
|
||||
from dbt.contracts.project import PackageConfig
|
||||
from dbt.dataclass_schema import ValidationError
|
||||
@@ -361,6 +362,7 @@ class PartialProject(RenderComponents):
|
||||
model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths
|
||||
)
|
||||
|
||||
managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, [])
|
||||
docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
|
||||
asset_paths: List[str] = value_or(cfg.asset_paths, [])
|
||||
target_path: str = flag_or(flags.TARGET_PATH, cfg.target_path, "target")
|
||||
@@ -424,6 +426,7 @@ class PartialProject(RenderComponents):
|
||||
asset_paths=asset_paths,
|
||||
target_path=target_path,
|
||||
snapshot_paths=snapshot_paths,
|
||||
managed_schemas=managed_schemas,
|
||||
clean_targets=clean_targets,
|
||||
log_path=log_path,
|
||||
packages_install_path=packages_install_path,
|
||||
@@ -531,6 +534,7 @@ class Project:
|
||||
asset_paths: List[str]
|
||||
target_path: str
|
||||
snapshot_paths: List[str]
|
||||
managed_schemas: List[SchemaManagementConfiguration]
|
||||
clean_targets: List[str]
|
||||
log_path: str
|
||||
packages_install_path: str
|
||||
@@ -604,6 +608,7 @@ class Project:
|
||||
"asset-paths": self.asset_paths,
|
||||
"target-path": self.target_path,
|
||||
"snapshot-paths": self.snapshot_paths,
|
||||
"managed-schemas": [schema.to_dict() for schema in self.managed_schemas],
|
||||
"clean-targets": self.clean_targets,
|
||||
"log-path": self.log_path,
|
||||
"quoting": self.quoting,
|
||||
|
||||
@@ -88,6 +88,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
|
||||
asset_paths=project.asset_paths,
|
||||
target_path=project.target_path,
|
||||
snapshot_paths=project.snapshot_paths,
|
||||
managed_schemas=project.managed_schemas,
|
||||
clean_targets=project.clean_targets,
|
||||
log_path=project.log_path,
|
||||
packages_install_path=project.packages_install_path,
|
||||
@@ -520,6 +521,7 @@ class UnsetProfileConfig(RuntimeConfig):
|
||||
asset_paths=project.asset_paths,
|
||||
target_path=project.target_path,
|
||||
snapshot_paths=project.snapshot_paths,
|
||||
managed_schemas=project.managed_schemas,
|
||||
clean_targets=project.clean_targets,
|
||||
log_path=project.log_path,
|
||||
packages_install_path=project.packages_install_path,
|
||||
|
||||
@@ -7,6 +7,7 @@ from dbt.dataclass_schema import (
|
||||
HyphenatedDbtClassMixin,
|
||||
ExtensibleDbtClassMixin,
|
||||
register_pattern,
|
||||
StrEnum,
|
||||
)
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional, List, Dict, Union, Any
|
||||
@@ -160,6 +161,35 @@ BANNED_PROJECT_NAMES = {
|
||||
}
|
||||
|
||||
|
||||
class PruneModelsAction(StrEnum):
|
||||
SKIP = "skip"
|
||||
DROP = "drop"
|
||||
WARN = "warn"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SchemaManagementConfiguration(HyphenatedDbtClassMixin, Replaceable):
|
||||
# TODOs:
|
||||
# (1) Do uniqueness validation here, to ensure that we don't get the same 'database' + 'schema' combo twice.
|
||||
# (2) Support adapter-specific aliases for 'database' + 'schema'
|
||||
# (e.g. 'project' + 'dataset' for BigQuery, 'catalog' for Spark/Databricks)
|
||||
# There is a 'translate_aliases' method defined on adapters for this purpose.
|
||||
database: Optional[str] = None
|
||||
schema: Optional[str] = None
|
||||
prune_models: Optional[PruneModelsAction] = None
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data):
|
||||
super().validate(data)
|
||||
# (1) validate uniqueness
|
||||
|
||||
@classmethod
|
||||
def __pre_deserialize__(cls, data):
|
||||
data = super().__pre_deserialize__(data)
|
||||
# (2) data = cls.translate_aliases(data)
|
||||
return data
|
||||
|
||||
|
||||
@dataclass
|
||||
class Project(HyphenatedDbtClassMixin, Replaceable):
|
||||
name: Identifier
|
||||
@@ -177,6 +207,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable):
|
||||
asset_paths: Optional[List[str]] = None
|
||||
target_path: Optional[str] = None
|
||||
snapshot_paths: Optional[List[str]] = None
|
||||
managed_schemas: Optional[List[SchemaManagementConfiguration]] = None
|
||||
clean_targets: Optional[List[str]] = None
|
||||
profile: Optional[str] = None
|
||||
log_path: Optional[str] = None
|
||||
|
||||
@@ -36,6 +36,7 @@ import dbt.task.seed as seed_task
|
||||
import dbt.task.serve as serve_task
|
||||
import dbt.task.snapshot as snapshot_task
|
||||
import dbt.task.test as test_task
|
||||
import dbt.task.manage as manage_task
|
||||
from dbt.profiler import profiler
|
||||
from dbt.adapters.factory import reset_adapters, cleanup_connections
|
||||
|
||||
@@ -448,6 +449,22 @@ def _build_debug_subparser(subparsers, base_subparser):
|
||||
return sub
|
||||
|
||||
|
||||
# TODO: create in new CLI after rebasing against 'main'
|
||||
def _build_manage_subparser(subparsers, base_subparser):
|
||||
sub = subparsers.add_parser(
|
||||
"manage",
|
||||
parents=[base_subparser],
|
||||
help="""
|
||||
Drops relations that are present in the database and absent in the DBT models.
|
||||
|
||||
Not to be confused with the clean command which deletes folders rather than relations.
|
||||
""",
|
||||
)
|
||||
_add_version_check(sub)
|
||||
sub.set_defaults(cls=manage_task.ManageTask, which="manage", rpc_method="manage")
|
||||
return sub
|
||||
|
||||
|
||||
def _build_deps_subparser(subparsers, base_subparser):
|
||||
sub = subparsers.add_parser(
|
||||
"deps",
|
||||
@@ -1150,6 +1167,7 @@ def parse_args(args, cls=DBTArgumentParser):
|
||||
_build_debug_subparser(subs, base_subparser)
|
||||
_build_deps_subparser(subs, base_subparser)
|
||||
_build_list_subparser(subs, base_subparser)
|
||||
_build_manage_subparser(subs, base_subparser)
|
||||
|
||||
build_sub = _build_build_subparser(subs, base_subparser)
|
||||
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)
|
||||
|
||||
191
core/dbt/task/manage.py
Normal file
191
core/dbt/task/manage.py
Normal file
@@ -0,0 +1,191 @@
|
||||
# coding=utf-8
|
||||
from typing import Dict, Set, Tuple, List
|
||||
|
||||
from .runnable import ManifestTask, GraphRunnableTask
|
||||
from dbt.exceptions import warn_or_error, ValidationException
|
||||
from dbt.adapters.factory import get_adapter
|
||||
from dbt.contracts.graph.parsed import (
|
||||
ParsedModelNode,
|
||||
)
|
||||
from dbt.contracts.project import PruneModelsAction
|
||||
from dbt.utils import executor
|
||||
from concurrent.futures import as_completed, Future
|
||||
from dbt.adapters.base import BaseRelation
|
||||
|
||||
|
||||
class ManageTask(ManifestTask):
|
||||
def _runtime_initialize(self):
|
||||
# we just need to load the manifest
|
||||
# we don't actually need to 'compile' it into a DAG/queue, if we're not doing node selection
|
||||
self.load_manifest()
|
||||
|
||||
def before_run(self, adapter):
|
||||
with adapter.connection_named("master"):
|
||||
# this is a bad look! but we just want to borrow this one method
|
||||
# populating the cache will speed up calls to 'list_relations' further down
|
||||
# in all other ways, ManageTask is much more like a ManifestTask than a GraphRunnableTask
|
||||
# (e.g. it doesn't need a DAG queue)
|
||||
GraphRunnableTask.populate_adapter_cache(self, adapter)
|
||||
|
||||
def run(self):
|
||||
self._runtime_initialize()
|
||||
adapter = get_adapter(self.config)
|
||||
self.before_run(adapter)
|
||||
try:
|
||||
with adapter.connection_named("master"):
|
||||
adapter.clear_transaction()
|
||||
self._prune_models(adapter)
|
||||
finally:
|
||||
adapter.cleanup_connections()
|
||||
|
||||
def _prune_models(self, adapter):
|
||||
# TODO: do this uniqueness check as part of config validation (= parsing),
|
||||
# instead of during task/runtime
|
||||
self._assert_schema_uniqueness()
|
||||
|
||||
if len(self.config.managed_schemas) == 0:
|
||||
# TODO: turn into structured event
|
||||
warn_or_error("No schemas configured to manage")
|
||||
return
|
||||
|
||||
models_in_codebase: Set[Tuple[Tuple[str, str, str]]] = set(
|
||||
(
|
||||
# what the user has configured - use this one for matching
|
||||
(n.config.database, n.config.schema),
|
||||
# the resolved values - actual location in database - use this one for lookup
|
||||
(n.database, n.schema, n.alias),
|
||||
)
|
||||
for n in self.manifest.nodes.values()
|
||||
if isinstance(n, ParsedModelNode) # TODO: seeds? tests via --store-failures?
|
||||
)
|
||||
|
||||
should_drop = set()
|
||||
should_warn = set()
|
||||
|
||||
for config in self.config.managed_schemas:
|
||||
target_action = config.prune_models or PruneModelsAction.SKIP
|
||||
|
||||
# if set to 'skip', let's skip right away!
|
||||
if target_action == PruneModelsAction.SKIP:
|
||||
continue
|
||||
|
||||
# these are the 'database' and 'schema' configs set in dbt_project.yml
|
||||
managed_db_config = config.database
|
||||
managed_schema_config = config.schema
|
||||
|
||||
# THE PLAN
|
||||
# match models from the manifest that have the same configured values of 'database' + 'schema'
|
||||
# then, look up their resolved 'database' + 'schema'.
|
||||
# Depending on custom behavior of generate_X_name macros,
|
||||
# this relationship (configured -> resolved) could be one -> one, one -> many, or many -> one
|
||||
|
||||
resolved_schemas_to_check = set()
|
||||
|
||||
models = [
|
||||
model
|
||||
for model in models_in_codebase
|
||||
if (
|
||||
(
|
||||
model[0][0] == managed_db_config
|
||||
or (model[0][0] is None and managed_db_config is None)
|
||||
)
|
||||
and (
|
||||
model[0][1] == managed_schema_config
|
||||
or (model[0][1] is None and managed_schema_config is None)
|
||||
)
|
||||
)
|
||||
]
|
||||
if len(models) == 0:
|
||||
# TODO: turn into structured event
|
||||
warn_or_error(
|
||||
f"While managing configured schema '{managed_db_config}.{managed_schema_config}': No models found with matching config"
|
||||
)
|
||||
for model in models:
|
||||
# now store the resolved 'database' + 'schema' values of matched nodes
|
||||
resolved_schemas_to_check.add((model[1][0], model[1][1]))
|
||||
|
||||
# It's also possible that the last model from a 'managed schema' was just deleted or disabled.
|
||||
# For thoroughness, we'll also resolve the naive (not-node-specific) result of 'generate_X_name' macros, and check those too.
|
||||
generated_database_name = adapter.execute_macro(
|
||||
"generate_database_name", kwargs={"custom_database_name": config.database}
|
||||
)
|
||||
generated_schema_name = adapter.execute_macro(
|
||||
"generate_schema_name", kwargs={"custom_schema_name": config.schema}
|
||||
)
|
||||
resolved_schemas_to_check.add((generated_database_name, generated_schema_name))
|
||||
|
||||
# It's possible that we've already checked this schema via another managed_schema config (the many -> one case)
|
||||
# but we still want to tag the resolved objects with the managed_schema action, in case it's 'warn' in one case and 'drop' in another
|
||||
# in any case, these calls to 'list_relations' should be cache hits
|
||||
for database, schema in resolved_schemas_to_check:
|
||||
models_in_database: Dict[Tuple[str, str, str], str] = {
|
||||
(database, schema, relation.identifier): relation
|
||||
for relation in adapter.list_relations(database, schema)
|
||||
}
|
||||
if len(models_in_database) == 0:
|
||||
# TODO: turn into structured event
|
||||
warn_or_error(
|
||||
f"No objects in managed schema '{database}.{schema}', resolved from config '{managed_db_config}.{managed_schema_config}'"
|
||||
)
|
||||
|
||||
# compare 'found' models_in_database against 'expected' values from manifest
|
||||
expected_models = set([model[1] for model in models_in_codebase])
|
||||
should_act_upon = models_in_database.keys() - expected_models
|
||||
|
||||
for (target_database, target_schema, target_identifier) in sorted(should_act_upon):
|
||||
object = models_in_database[
|
||||
(target_database, target_schema, target_identifier)
|
||||
]
|
||||
if target_action == PruneModelsAction.WARN:
|
||||
should_warn.add(object)
|
||||
elif target_action == PruneModelsAction.DROP:
|
||||
should_drop.add(object)
|
||||
|
||||
# If the same object appeared in both the 'warn' and 'drop' categories,
|
||||
# let's err on the safe side: warn, don't drop.
|
||||
# This can happen due to custom behavior in generate_X_name macros, such that
|
||||
# the relationship from configured -> resolved is many -> one,
|
||||
# and the same object will be found in different 'managed_schemas' with different configured actions.
|
||||
# Example pattern: `generate_schema_name_for_env` with `target.name != prod`. All custom schemas are ignored, and every model just lands in {target.schema}.
|
||||
to_drop = should_drop - should_warn
|
||||
|
||||
# First, warn about everything configured to warn
|
||||
for object in should_warn:
|
||||
# TODO: turn into structured event
|
||||
message = f"Found unused object {object}"
|
||||
if object in should_drop:
|
||||
message = message + ", not dropping because also configured to warn"
|
||||
warn_or_error(message)
|
||||
|
||||
# Now, drop everything that needs dropping
|
||||
# with concurrency up to number of configured --threads
|
||||
with executor(self.config) as tpe:
|
||||
futures: List[Future[List[BaseRelation]]] = []
|
||||
for object in to_drop:
|
||||
# TODO: turn into structured event
|
||||
print(f"Dropping {object}")
|
||||
fut = tpe.submit_connected(
|
||||
adapter,
|
||||
f"drop_{object.database}_{object.schema}",
|
||||
adapter.drop_relation,
|
||||
object,
|
||||
)
|
||||
futures.append(fut)
|
||||
|
||||
for fut in as_completed(futures):
|
||||
# trigger/re-raise any exceptions while dropping relations
|
||||
fut.result()
|
||||
|
||||
def _assert_schema_uniqueness(self):
|
||||
schemas = set()
|
||||
|
||||
for config in self.config.managed_schemas:
|
||||
schema = (config.database, config.schema)
|
||||
if schema in schemas:
|
||||
raise ValidationException(f"Duplicate schema found: {schema}")
|
||||
schemas.add(schema)
|
||||
|
||||
def interpret_results(self, results):
|
||||
# if all queries succeed, this will return exit code 0 ("success")
|
||||
# if any query fails, or warning + --warn-error, it will return exit code 2 ("unhandled")
|
||||
return True
|
||||
5
core/dbt/tests/fixtures/project.py
vendored
5
core/dbt/tests/fixtures/project.py
vendored
@@ -430,6 +430,11 @@ class TestProjInfo:
|
||||
result = self.run_sql(sql, fetch="all")
|
||||
return {model_name: materialization for (model_name, materialization) in result}
|
||||
|
||||
def update_models(self, models: dict):
|
||||
"""Update the modules in the test project"""
|
||||
self.project_root.join("models").remove()
|
||||
write_project_files(self.project_root, "models", models)
|
||||
|
||||
|
||||
# This is the main fixture that is used in all functional tests. It pulls in the other
|
||||
# fixtures that are necessary to set up a dbt project, and saves some of the information
|
||||
|
||||
1
tests/functional/schema_management/README.md
Normal file
1
tests/functional/schema_management/README.md
Normal file
@@ -0,0 +1 @@
|
||||
Test schema management as introduced by https://github.com/dbt-labs/dbt-core/issues/4957
|
||||
296
tests/functional/schema_management/test_schema_management.py
Normal file
296
tests/functional/schema_management/test_schema_management.py
Normal file
@@ -0,0 +1,296 @@
|
||||
import pytest
|
||||
import os
|
||||
from dbt.exceptions import CompilationException, ValidationException
|
||||
from dbt.tests.util import run_dbt, check_table_does_exist, check_table_does_not_exist
|
||||
|
||||
|
||||
def model(materialized, unique_schema=None):
|
||||
return f"""
|
||||
{{{{
|
||||
config(
|
||||
materialized = "{materialized}",
|
||||
schema = {f'"{unique_schema}"' if unique_schema is not None else "None"}
|
||||
)
|
||||
}}}}
|
||||
SELECT * FROM (
|
||||
VALUES (1, 'one'),
|
||||
(2, 'two'),
|
||||
(3, 'three')
|
||||
) AS t (num,letter)
|
||||
"""
|
||||
|
||||
|
||||
class Base:
|
||||
materialized = "table"
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {
|
||||
"model_a.sql": model(self.materialized),
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def dbt_profile_target(self):
|
||||
return {
|
||||
"type": "postgres",
|
||||
"threads": 4,
|
||||
"host": "localhost",
|
||||
"port": int(os.getenv("POSTGRES_TEST_PORT", 5432)),
|
||||
"user": os.getenv("POSTGRES_TEST_USER", "root"),
|
||||
"pass": os.getenv("POSTGRES_TEST_PASS", "password"),
|
||||
"dbname": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
}
|
||||
|
||||
|
||||
class TestMissingConfiguration(Base):
|
||||
def test_should_raise_exception(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
with pytest.raises(CompilationException):
|
||||
run_dbt(["--warn-error", "manage"])
|
||||
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
def test_should_not_delete_anything(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
run_dbt(["manage"])
|
||||
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
|
||||
class TestUnmanagedSchema(TestMissingConfiguration):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"schema": "some_other_schema",
|
||||
"prune-models": "drop",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class TestEmptyConfiguration(TestMissingConfiguration):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {"managed-schemas": []}
|
||||
|
||||
|
||||
class TestWarn(TestMissingConfiguration):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "warn",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class TestDrop(Base):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "drop",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def test(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
run_dbt(["manage"])
|
||||
|
||||
check_table_does_not_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
|
||||
class TestDropView(TestDrop):
|
||||
materialized = "view"
|
||||
|
||||
|
||||
class TestSkip(Base):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "skip",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def test_should_not_raise_exception(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
|
||||
run_dbt(["--warn-error", "manage"])
|
||||
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
def test_should_not_delete_anything(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
run_dbt(["manage"])
|
||||
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
|
||||
class TestDefaultAction(TestSkip):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class TestCustomSchema(Base):
|
||||
custom_schema = "custom"
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def models(self):
|
||||
return {
|
||||
"model_a.sql": model(self.materialized, self.custom_schema),
|
||||
"model_b.sql": model(self.materialized, self.custom_schema),
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "drop",
|
||||
"schema": self.custom_schema,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def test(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_a")
|
||||
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_a.sql": model(self.materialized),
|
||||
"model_b.sql": model(self.materialized, self.custom_schema),
|
||||
}
|
||||
)
|
||||
run_dbt(["manage"])
|
||||
|
||||
check_table_does_not_exist(
|
||||
project.adapter, f"{self._generate_schema_name(project)}.model_a"
|
||||
)
|
||||
check_table_does_not_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b")
|
||||
|
||||
def _generate_schema_name(self, project):
|
||||
return f"{project.test_schema}_{self.custom_schema}"
|
||||
|
||||
|
||||
class TestDuplicateConfiguration(Base):
|
||||
@pytest.fixture(scope="class")
|
||||
def project_config_update(self, unique_schema):
|
||||
return {
|
||||
"managed-schemas": [
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "drop",
|
||||
},
|
||||
{
|
||||
"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
|
||||
"prune-models": "warn",
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
def test(
|
||||
self,
|
||||
project,
|
||||
):
|
||||
run_dbt(["run"])
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
|
||||
project.update_models(
|
||||
{
|
||||
"model_b.sql": model(self.materialized),
|
||||
}
|
||||
)
|
||||
with pytest.raises(ValidationException):
|
||||
run_dbt(["manage"])
|
||||
|
||||
check_table_does_exist(project.adapter, "model_a")
|
||||
check_table_does_exist(project.adapter, "model_b")
|
||||
Reference in New Issue
Block a user