Remove dbt-postgres and dbt-tests-adapter from dbt-core (#9492)

* delete dbt-tests-adapter and dbt-postgres from dbt-core

* update non-code files to reflect change

* add changie

* update build-dist.sh

* update tox.ini

* fix build-dist.sh

* move adapter tests into /functional dir

* remove adapter unit tests

* update code comments and README.md
Author: colin-rogers-dbt
Date: 2024-02-05 12:28:57 -08:00
Committed by: GitHub
Parent: 7885e874c6
Commit: 15dcb9a19d
249 changed files with 133 additions and 4176 deletions

View File

@@ -36,12 +36,4 @@ first_value = 1
[bumpversion:file:core/dbt/version.py]
[bumpversion:file:plugins/postgres/setup.py]
[bumpversion:file:plugins/postgres/dbt/adapters/postgres/__version__.py]
[bumpversion:file:docker/Dockerfile]
[bumpversion:file:tests/adapter/setup.py]
[bumpversion:file:tests/adapter/dbt/tests/adapter/__version__.py]

View File

@@ -1,6 +1,6 @@
# dbt Core Changelog
-- This file provides a full account of all changes to `dbt-core` and `dbt-postgres`
+- This file provides a full account of all changes to `dbt-core`
- Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases.
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)

View File

@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Remove dbt-tests-adapter and dbt-postgres packages from dbt-core
time: 2024-01-30T14:05:50.291291-08:00
custom:
Author: colin-rogers-dbt
Issue: "9455"

View File

@@ -11,11 +11,6 @@ updates:
schedule:
interval: "daily"
rebase-strategy: "disabled"
- package-ecosystem: "pip"
directory: "/plugins/postgres"
schedule:
interval: "daily"
rebase-strategy: "disabled"
# docker dependencies
- package-ecosystem: "docker"

View File

@@ -31,7 +31,8 @@ This is the docs website code. It comes from the dbt-docs repository, and is gen
## Adapters
-dbt uses an adapter-plugin pattern to extend support to different databases, warehouses, query engines, etc. For testing and development purposes, the dbt-postgres plugin lives alongside the dbt-core codebase, in the [`plugins`](plugins) subdirectory. Like other adapter plugins, it is a self-contained codebase and package that builds on top of dbt-core.
+dbt uses an adapter-plugin pattern to extend support to different databases, warehouses, query engines, etc.
+Note: dbt-postgres used to exist in dbt-core but is now in [its own repo](https://github.com/dbt-labs/dbt-postgres)
Each adapter is a mix of python, Jinja2, and SQL. The adapter code also makes heavy use of Jinja2 to wrap modular chunks of SQL functionality, define default implementations, and allow plugins to override it.
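
For context, each adapter plugin exposes a module-level `AdapterPlugin` that ties together its credentials class, adapter implementation, and the package path holding its bundled Jinja2/SQL macros; the removed `dbt/adapters/postgres/__init__.py` further down in this commit is the Postgres version of this entry point. A minimal sketch of that pattern, mirroring the removed module:

```
# Sketch of an adapter plugin's entry point, mirroring the removed
# dbt/adapters/postgres/__init__.py shown later in this commit.
from dbt.adapters.base import AdapterPlugin
from dbt.adapters.postgres.connections import PostgresCredentials
from dbt.adapters.postgres.impl import PostgresAdapter
from dbt.include import postgres  # exposes PACKAGE_PATH pointing at the bundled Jinja2/SQL macros

# dbt's adapter factory imports dbt.adapters.<name> and picks up this Plugin object.
Plugin = AdapterPlugin(
    adapter=PostgresAdapter,
    credentials=PostgresCredentials,
    include_path=postgres.PACKAGE_PATH,
)
```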

View File

@@ -1,6 +1,6 @@
# dbt Core Changelog
-- This file provides a full account of all changes to `dbt-core` and `dbt-postgres`
+- This file provides a full account of all changes to `dbt-core`
- Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases.
- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version.
- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-core/blob/main/CONTRIBUTING.md#adding-changelog-entry)

View File

@@ -22,7 +22,7 @@ If you get stuck, we're happy to help! Drop us a line in the `#dbt-core-developm
### Notes
-- **Adapters:** Is your issue or proposed code change related to a specific [database adapter](https://docs.getdbt.com/docs/available-adapters)? If so, please open issues, PRs, and discussions in that adapter's repository instead. The sole exception is Postgres; the `dbt-postgres` plugin lives in this repository (`dbt-core`).
+- **Adapters:** Is your issue or proposed code change related to a specific [database adapter](https://docs.getdbt.com/docs/available-adapters)? If so, please open issues, PRs, and discussions in that adapter's repository instead.
- **CLA:** Please note that anyone contributing code to `dbt-core` must sign the [Contributor License Agreement](https://docs.getdbt.com/docs/contributor-license-agreements). If you are unable to sign the CLA, the `dbt-core` maintainers will unfortunately be unable to merge any of your Pull Requests. We welcome you to participate in discussions, open issues, and comment on existing ones.
- **Branches:** All pull requests from community contributors should target the `main` branch (default). If the change is needed as a patch for a minor version of dbt that has already been released (or is already a release candidate), a maintainer will backport the changes in your PR to the relevant "latest" release branch (`1.0.latest`, `1.1.latest`, ...). If an issue fix applies to a release branch, that fix should be first committed to the development branch and then to the release branch (rarely release-branch fixes may not apply to `main`).
- **Releases**: Before releasing a new minor version of Core, we prepare a series of alphas and release candidates to allow users (especially employees of dbt Labs!) to test the new version in live environments. This is an important quality assurance step, as it exposes the new code to a wide variety of complicated deployments and can surface bugs before official release. Releases are accessible via pip, homebrew, and dbt Cloud.

View File

@@ -86,12 +86,12 @@ test: .env ## Runs unit tests with py and code checks against staged changes.
$(DOCKER_CMD) pre-commit run mypy-check --hook-stage manual | grep -v "INFO"
.PHONY: integration
-integration: .env ## Runs postgres integration tests with py-integration
+integration: .env ## Runs core integration tests using postgres with py-integration
@\
$(CI_FLAGS) $(DOCKER_CMD) tox -e py-integration -- -nauto
.PHONY: integration-fail-fast
-integration-fail-fast: .env ## Runs postgres integration tests with py-integration in "fail fast" mode.
+integration-fail-fast: .env ## Runs core integration tests using postgres with py-integration in "fail fast" mode.
@\
$(DOCKER_CMD) tox -e py-integration -- -x -nauto

View File

@@ -398,7 +398,7 @@ def logs_dir(request, prefix):
# This fixture is for customizing tests that need overrides in adapter
-# repos. Example in dbt.tests.adapter.basic.test_base.
+# repos. Example in tests.functional.adapter.basic.test_base.
@pytest.fixture(scope="class")
def test_config():
return {}
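
For adapter repositories that consume the shared test suite, this fixture is meant to be overridden at the test-class level. A minimal sketch of that pattern, assuming the `dbt-tests-adapter` package layout (`dbt.tests.adapter.basic.test_base`); the class name and config key below are illustrative, not prescribed:

```
# Hypothetical adapter-repo test module; the config key is illustrative.
import pytest

from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations


class TestSimpleMaterializationsMyAdapter(BaseSimpleMaterializations):
    @pytest.fixture(scope="class")
    def test_config(self):
        # override the empty default defined in the conftest.py fixture above
        return {"some_adapter_specific_flag": True}
```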

View File

@@ -1,4 +1,5 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
black==23.3.0
bumpversion
ddtrace==2.1.7

View File

@@ -14,10 +14,8 @@ ARG build_for=linux/amd64
FROM --platform=$build_for python:3.10.7-slim-bullseye as base
# N.B. The refs updated automagically every release via bumpversion
-# N.B. dbt-postgres is currently found in the core codebase so a value of dbt-core@<some_version> is correct
ARG dbt_core_ref=dbt-core@v1.8.0a1
-ARG dbt_postgres_ref=dbt-core@v1.8.0a1
+ARG dbt_postgres_ref=dbt-postgres@v1.8.0a1
ARG dbt_redshift_ref=dbt-redshift@v1.8.0a1
ARG dbt_bigquery_ref=dbt-bigquery@v1.8.0a1
ARG dbt_snowflake_ref=dbt-snowflake@v1.8.0a1

View File

@@ -82,7 +82,6 @@ docker build --tag my-third-party-dbt \
There are a few special cases worth noting:
* The `dbt-spark` database adapter comes in three different versions named `PyHive`, `ODBC`, and the default `all`. If you wish to override this you can use the `--build-arg` flag with the value of `dbt_spark_version=<version_name>`. See the [docs](https://docs.getdbt.com/reference/warehouse-profiles/spark-profile) for more information.
* The `dbt-postgres` database adapter is released as part of the `dbt-core` codebase. If you wish to override the version used, make sure you use the gitref for `dbt-core`:
```
docker build --tag my_dbt \
--target dbt-postgres \

View File

@@ -1,3 +1 @@
-e ./core
-e ./plugins/postgres
-e ./tests/adapter

View File

@@ -1 +0,0 @@
recursive-include dbt/include *.sql *.yml

View File

@@ -1,36 +0,0 @@
<p align="center">
<img src="https://raw.githubusercontent.com/dbt-labs/dbt-core/ec7dee39f793aa4f7dd3dae37282cc87664813e4/etc/dbt-logo-full.svg" alt="dbt logo" width="500"/>
</p>
<p align="center">
<a href="https://github.com/dbt-labs/dbt-core/actions/workflows/main.yml">
<img src="https://github.com/dbt-labs/dbt-core/actions/workflows/main.yml/badge.svg?event=push" alt="CI Badge"/>
</a>
</p>
**[dbt](https://www.getdbt.com/)** enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
## dbt-postgres
The `dbt-postgres` package contains all of the code enabling dbt to work with a Postgres database. For
more information on using dbt with Postgres, consult [the docs](https://docs.getdbt.com/docs/profile-postgres).
## Getting started
- [Install dbt](https://docs.getdbt.com/docs/installation)
- Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/)
## Join the dbt Community
- Be part of the conversation in the [dbt Community Slack](http://community.getdbt.com/)
- Read more on the [dbt Community Discourse](https://discourse.getdbt.com)
## Reporting bugs and contributing code
- Want to report a bug or request a feature? Let us know on [Slack](http://community.getdbt.com/), or open [an issue](https://github.com/dbt-labs/dbt-core/issues/new)
- Want to help us build dbt? Check out the [Contributing Guide](https://github.com/dbt-labs/dbt-core/blob/HEAD/CONTRIBUTING.md)
## Code of Conduct
Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [dbt Code of Conduct](https://community.getdbt.com/code-of-conduct).

View File

@@ -1,7 +0,0 @@
# N.B.
# This will add to the packages __path__ all subdirectories of directories on sys.path named after the package which effectively combines both modules into a single namespace (dbt.adapters)
# The matching statement is in core/dbt/__init__.py
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

View File

@@ -1,7 +0,0 @@
# N.B.
# This will add to the packages __path__ all subdirectories of directories on sys.path named after the package which effectively combines both modules into a single namespace (dbt.adapters)
# The matching statement is in core/dbt/adapters/__init__.py
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

View File

@@ -1,13 +0,0 @@
# these are mostly just exports, #noqa them so flake8 will be happy
from dbt.adapters.postgres.connections import PostgresConnectionManager # noqa
from dbt.adapters.postgres.connections import PostgresCredentials
from dbt.adapters.postgres.column import PostgresColumn # noqa
from dbt.adapters.postgres.relation import PostgresRelation # noqa: F401
from dbt.adapters.postgres.impl import PostgresAdapter
from dbt.adapters.base import AdapterPlugin
from dbt.include import postgres
Plugin = AdapterPlugin(
adapter=PostgresAdapter, credentials=PostgresCredentials, include_path=postgres.PACKAGE_PATH
)

View File

@@ -1 +0,0 @@
version = "1.8.0a1"

View File

@@ -1,12 +0,0 @@
from dbt.adapters.base import Column
class PostgresColumn(Column):
@property
def data_type(self):
# on postgres, do not convert 'text' or 'varchar' to 'varchar()'
if self.dtype.lower() == "text" or (
self.dtype.lower() == "character varying" and self.char_size is None
):
return self.dtype
return super().data_type

View File

@@ -1,209 +0,0 @@
from contextlib import contextmanager
import psycopg2
from psycopg2.extensions import string_types
import dbt_common.exceptions
from dbt.adapters.sql import SQLConnectionManager
from dbt.adapters.contracts.connection import AdapterResponse, Credentials
from dbt.adapters.events.logging import AdapterLogger
from dbt_common.helper_types import Port
from dataclasses import dataclass
from typing import Optional
from typing_extensions import Annotated
from mashumaro.jsonschema.annotations import Maximum, Minimum
logger = AdapterLogger("Postgres")
@dataclass
class PostgresCredentials(Credentials):
host: str
user: str
# Annotated is used by mashumaro for jsonschema generation
port: Annotated[Port, Minimum(0), Maximum(65535)]
password: str # on postgres the password is mandatory
connect_timeout: int = 10
role: Optional[str] = None
search_path: Optional[str] = None
keepalives_idle: int = 0 # 0 means to use the default value
sslmode: Optional[str] = None
sslcert: Optional[str] = None
sslkey: Optional[str] = None
sslrootcert: Optional[str] = None
application_name: Optional[str] = "dbt"
retries: int = 1
_ALIASES = {"dbname": "database", "pass": "password"}
@property
def type(self):
return "postgres"
@property
def unique_field(self):
return self.host
def _connection_keys(self):
return (
"host",
"port",
"user",
"database",
"schema",
"connect_timeout",
"role",
"search_path",
"keepalives_idle",
"sslmode",
"sslcert",
"sslkey",
"sslrootcert",
"application_name",
"retries",
)
class PostgresConnectionManager(SQLConnectionManager):
TYPE = "postgres"
@contextmanager
def exception_handler(self, sql):
try:
yield
except psycopg2.DatabaseError as e:
logger.debug("Postgres error: {}".format(str(e)))
try:
self.rollback_if_open()
except psycopg2.Error:
logger.debug("Failed to release connection!")
pass
raise dbt_common.exceptions.DbtDatabaseError(str(e).strip()) from e
except Exception as e:
logger.debug("Error running SQL: {}", sql)
logger.debug("Rolling back transaction.")
self.rollback_if_open()
if isinstance(e, dbt_common.exceptions.DbtRuntimeError):
# during a sql query, an internal to dbt exception was raised.
# this sounds a lot like a signal handler and probably has
# useful information, so raise it without modification.
raise
raise dbt_common.exceptions.DbtRuntimeError(e) from e
@classmethod
def open(cls, connection):
if connection.state == "open":
logger.debug("Connection is already open, skipping open.")
return connection
credentials = cls.get_credentials(connection.credentials)
kwargs = {}
# we don't want to pass 0 along to connect() as postgres will try to
# call an invalid setsockopt() call (contrary to the docs).
if credentials.keepalives_idle:
kwargs["keepalives_idle"] = credentials.keepalives_idle
# psycopg2 doesn't support search_path officially,
# see https://github.com/psycopg/psycopg2/issues/465
search_path = credentials.search_path
if search_path is not None and search_path != "":
# see https://postgresql.org/docs/9.5/libpq-connect.html
kwargs["options"] = "-c search_path={}".format(search_path.replace(" ", "\\ "))
if credentials.sslmode:
kwargs["sslmode"] = credentials.sslmode
if credentials.sslcert is not None:
kwargs["sslcert"] = credentials.sslcert
if credentials.sslkey is not None:
kwargs["sslkey"] = credentials.sslkey
if credentials.sslrootcert is not None:
kwargs["sslrootcert"] = credentials.sslrootcert
if credentials.application_name:
kwargs["application_name"] = credentials.application_name
def connect():
handle = psycopg2.connect(
dbname=credentials.database,
user=credentials.user,
host=credentials.host,
password=credentials.password,
port=credentials.port,
connect_timeout=credentials.connect_timeout,
**kwargs,
)
if credentials.role:
handle.cursor().execute("set role {}".format(credentials.role))
return handle
retryable_exceptions = [
# OperationalError is subclassed by all psycopg2 Connection Exceptions and it's raised
# by generic connection timeouts without an error code. This is a limitation of
# psycopg2 which doesn't provide subclasses for errors without a SQLSTATE error code.
# The limitation has been known for a while and there are no efforts to tackle it.
# See: https://github.com/psycopg/psycopg2/issues/682
psycopg2.errors.OperationalError,
]
def exponential_backoff(attempt: int):
return attempt * attempt
return cls.retry_connection(
connection,
connect=connect,
logger=logger,
retry_limit=credentials.retries,
retry_timeout=exponential_backoff,
retryable_exceptions=retryable_exceptions,
)
def cancel(self, connection):
connection_name = connection.name
try:
pid = connection.handle.get_backend_pid()
except psycopg2.InterfaceError as exc:
# if the connection is already closed, not much to cancel!
if "already closed" in str(exc):
logger.debug(f"Connection {connection_name} was already closed")
return
# probably bad, re-raise it
raise
sql = "select pg_terminate_backend({})".format(pid)
logger.debug("Cancelling query '{}' ({})".format(connection_name, pid))
_, cursor = self.add_query(sql)
res = cursor.fetchone()
logger.debug("Cancel query '{}': {}".format(connection_name, res))
@classmethod
def get_credentials(cls, credentials):
return credentials
@classmethod
def get_response(cls, cursor) -> AdapterResponse:
message = str(cursor.statusmessage)
rows = cursor.rowcount
status_message_parts = message.split() if message is not None else []
status_message_strings = [part for part in status_message_parts if not part.isdigit()]
code = " ".join(status_message_strings)
return AdapterResponse(_message=message, code=code, rows_affected=rows)
@classmethod
def data_type_code_to_name(cls, type_code: int) -> str:
if type_code in string_types:
return string_types[type_code].name
else:
return f"unknown type_code {type_code}"

View File

@@ -1,152 +0,0 @@
from datetime import datetime
from dataclasses import dataclass
from typing import Any, Optional, Set, List
from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.capability import CapabilitySupport, Support, CapabilityDict, Capability
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres.column import PostgresColumn
from dbt.adapters.postgres import PostgresRelation
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.dataclass_schema import dbtClassMixin, ValidationError
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.exceptions import (
CrossDbReferenceProhibitedError,
IndexConfigNotDictError,
IndexConfigError,
UnexpectedDbReferenceError,
)
from dbt_common.utils import encoding as dbt_encoding
GET_RELATIONS_MACRO_NAME = "postgres__get_relations"
@dataclass
class PostgresIndexConfig(dbtClassMixin):
columns: List[str]
unique: bool = False
type: Optional[str] = None
def render(self, relation):
# We append the current timestamp to the index name because otherwise
# the index will only be created on every other run. See
# https://github.com/dbt-labs/dbt-core/issues/1945#issuecomment-576714925
# for an explanation.
now = datetime.utcnow().isoformat()
inputs = self.columns + [relation.render(), str(self.unique), str(self.type), now]
string = "_".join(inputs)
return dbt_encoding.md5(string)
@classmethod
def parse(cls, raw_index) -> Optional["PostgresIndexConfig"]:
if raw_index is None:
return None
try:
cls.validate(raw_index)
return cls.from_dict(raw_index)
except ValidationError as exc:
raise IndexConfigError(exc)
except TypeError:
raise IndexConfigNotDictError(raw_index)
@dataclass
class PostgresConfig(AdapterConfig):
unlogged: Optional[bool] = None
indexes: Optional[List[PostgresIndexConfig]] = None
class PostgresAdapter(SQLAdapter):
Relation = PostgresRelation
ConnectionManager = PostgresConnectionManager
Column = PostgresColumn
AdapterSpecificConfigs = PostgresConfig
CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.ENFORCED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
ConstraintType.unique: ConstraintSupport.ENFORCED,
ConstraintType.primary_key: ConstraintSupport.ENFORCED,
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}
CATALOG_BY_RELATION_SUPPORT = True
_capabilities: CapabilityDict = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)
@classmethod
def date_function(cls):
return "now()"
@available
def verify_database(self, database):
if database.startswith('"'):
database = database.strip('"')
expected = self.config.credentials.database
if database.lower() != expected.lower():
raise UnexpectedDbReferenceError(self.type(), database, expected)
# return an empty string on success so macros can call this
return ""
@available
def parse_index(self, raw_index: Any) -> Optional[PostgresIndexConfig]:
return PostgresIndexConfig.parse(raw_index)
def _link_cached_database_relations(self, schemas: Set[str]):
"""
:param schemas: The set of schemas that should have links added.
"""
database = self.config.credentials.database
table = self.execute_macro(GET_RELATIONS_MACRO_NAME)
for (dep_schema, dep_name, refed_schema, refed_name) in table:
dependent = self.Relation.create(
database=database, schema=dep_schema, identifier=dep_name
)
referenced = self.Relation.create(
database=database, schema=refed_schema, identifier=refed_name
)
# don't record in cache if this relation isn't in a relevant
# schema
if refed_schema.lower() in schemas:
self.cache.add_link(referenced, dependent)
def _get_catalog_schemas(self, manifest):
# postgres only allows one database (the main one)
schema_search_map = super()._get_catalog_schemas(manifest)
try:
return schema_search_map.flatten()
except DbtRuntimeError as exc:
raise CrossDbReferenceProhibitedError(self.type(), exc.msg)
def _link_cached_relations(self, manifest):
schemas: Set[str] = set()
relations_schemas = self._get_cache_schemas(manifest)
for relation in relations_schemas:
self.verify_database(relation.database)
schemas.add(relation.schema.lower())
self._link_cached_database_relations(schemas)
def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
super()._relations_cache_for_schemas(manifest, cache_schemas)
self._link_cached_relations(manifest)
def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"{add_to} + interval '{number} {interval}'"
def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge"]
def debug_query(self):
self.execute("select 1 as id")

View File

@@ -1,103 +0,0 @@
from dataclasses import dataclass
from typing import Optional, Set, FrozenSet
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import (
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.contracts.relation import RelationType, RelationConfig
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.postgres.relation_configs import (
PostgresIndexConfig,
PostgresIndexConfigChange,
PostgresMaterializedViewConfig,
PostgresMaterializedViewConfigChangeCollection,
MAX_CHARACTERS_IN_IDENTIFIER,
)
@dataclass(frozen=True, eq=False, repr=False)
class PostgresRelation(BaseRelation):
renameable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
RelationType.MaterializedView,
}
)
replaceable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
}
)
def __post_init__(self):
# Check for length of Postgres table/view names.
# Check self.type to exclude test relation identifiers
if (
self.identifier is not None
and self.type is not None
and len(self.identifier) > self.relation_max_name_length()
):
raise DbtRuntimeError(
f"Relation name '{self.identifier}' "
f"is longer than {self.relation_max_name_length()} characters"
)
def relation_max_name_length(self):
return MAX_CHARACTERS_IN_IDENTIFIER
def get_materialized_view_config_change_collection(
self, relation_results: RelationResults, relation_config: RelationConfig
) -> Optional[PostgresMaterializedViewConfigChangeCollection]:
config_change_collection = PostgresMaterializedViewConfigChangeCollection()
existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = PostgresMaterializedViewConfig.from_config(relation_config)
config_change_collection.indexes = self._get_index_config_changes(
existing_materialized_view.indexes, new_materialized_view.indexes
)
# we return `None` instead of an empty `PostgresMaterializedViewConfigChangeCollection` object
# so that it's easier and more extensible to check in the materialization:
# `core/../materializations/materialized_view.sql` :
# {% if configuration_changes is none %}
if config_change_collection.has_changes:
return config_change_collection
def _get_index_config_changes(
self,
existing_indexes: FrozenSet[PostgresIndexConfig],
new_indexes: FrozenSet[PostgresIndexConfig],
) -> Set[PostgresIndexConfigChange]:
"""
Get the index updates that will occur as a result of a new run
There are four scenarios:
1. Indexes are equal -> don't return these
2. Index is new -> create these
3. Index is old -> drop these
4. Indexes are not equal -> drop old, create new -> two actions
Returns: a set of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
"""
drop_changes = set(
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.drop, "context": index}
)
for index in existing_indexes.difference(new_indexes)
)
create_changes = set(
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.create, "context": index}
)
for index in new_indexes.difference(existing_indexes)
)
return set().union(drop_changes, create_changes)

View File

@@ -1,11 +0,0 @@
from dbt.adapters.postgres.relation_configs.constants import ( # noqa: F401
MAX_CHARACTERS_IN_IDENTIFIER,
)
from dbt.adapters.postgres.relation_configs.index import ( # noqa: F401
PostgresIndexConfig,
PostgresIndexConfigChange,
)
from dbt.adapters.postgres.relation_configs.materialized_view import ( # noqa: F401
PostgresMaterializedViewConfig,
PostgresMaterializedViewConfigChangeCollection,
)

View File

@@ -1 +0,0 @@
MAX_CHARACTERS_IN_IDENTIFIER = 63

View File

@@ -1,165 +0,0 @@
from dataclasses import dataclass, field
from typing import Set, FrozenSet
import agate
from dbt_common.dataclass_schema import StrEnum
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigValidationMixin,
RelationConfigValidationRule,
RelationConfigChangeAction,
RelationConfigChange,
)
class PostgresIndexMethod(StrEnum):
btree = "btree"
hash = "hash"
gist = "gist"
spgist = "spgist"
gin = "gin"
brin = "brin"
@classmethod
def default(cls) -> "PostgresIndexMethod":
return cls.btree
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class PostgresIndexConfig(RelationConfigBase, RelationConfigValidationMixin):
"""
This config follows the specs found here:
https://www.postgresql.org/docs/current/sql-createindex.html
The following parameters are configurable by dbt:
- name: the name of the index in the database, this isn't predictable since we apply a timestamp
- unique: checks for duplicate values when the index is created and on data updates
- method: the index method to be used
- column_names: the columns in the index
Applicable defaults for non-configurable parameters:
- concurrently: `False`
- nulls_distinct: `True`
"""
name: str = field(default=None, hash=False, compare=False)
column_names: FrozenSet[str] = field(default_factory=frozenset, hash=True)
unique: bool = field(default=False, hash=True)
method: PostgresIndexMethod = field(default=PostgresIndexMethod.default(), hash=True)
@property
def validation_rules(self) -> Set[RelationConfigValidationRule]:
return {
RelationConfigValidationRule(
validation_check=self.column_names is not None,
validation_error=DbtRuntimeError(
"Indexes require at least one column, but none were provided"
),
),
}
@classmethod
def from_dict(cls, config_dict) -> "PostgresIndexConfig":
# TODO: include the QuotePolicy instead of defaulting to lower()
kwargs_dict = {
"name": config_dict.get("name"),
"column_names": frozenset(
column.lower() for column in config_dict.get("column_names", set())
),
"unique": config_dict.get("unique"),
"method": config_dict.get("method"),
}
index: "PostgresIndexConfig" = super().from_dict(kwargs_dict) # type: ignore
return index
@classmethod
def parse_model_node(cls, model_node_entry: dict) -> dict:
config_dict = {
"column_names": set(model_node_entry.get("columns", set())),
"unique": model_node_entry.get("unique"),
"method": model_node_entry.get("type"),
}
return config_dict
@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> dict:
config_dict = {
"name": relation_results_entry.get("name"),
"column_names": set(relation_results_entry.get("column_names", "").split(",")),
"unique": relation_results_entry.get("unique"),
"method": relation_results_entry.get("method"),
}
return config_dict
@property
def as_node_config(self) -> dict:
"""
Returns: a dictionary that can be passed into `get_create_index_sql()`
"""
node_config = {
"columns": list(self.column_names),
"unique": self.unique,
"type": self.method.value,
}
return node_config
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class PostgresIndexConfigChange(RelationConfigChange, RelationConfigValidationMixin):
"""
Example of an index change:
{
"action": "create",
"context": {
"name": "", # we don't know the name since it gets created as a hash at runtime
"columns": ["column_1", "column_3"],
"type": "hash",
"unique": True
}
},
{
"action": "drop",
"context": {
"name": "index_abc", # we only need this to drop, but we need the rest to compare
"columns": ["column_1"],
"type": "btree",
"unique": True
}
}
"""
context: PostgresIndexConfig
@property
def requires_full_refresh(self) -> bool:
return False
@property
def validation_rules(self) -> Set[RelationConfigValidationRule]:
return {
RelationConfigValidationRule(
validation_check=self.action
in {RelationConfigChangeAction.create, RelationConfigChangeAction.drop},
validation_error=DbtRuntimeError(
"Invalid operation, only `drop` and `create` changes are supported for indexes."
),
),
RelationConfigValidationRule(
validation_check=not (
self.action == RelationConfigChangeAction.drop and self.context.name is None
),
validation_error=DbtRuntimeError(
"Invalid operation, attempting to drop an index with no name."
),
),
RelationConfigValidationRule(
validation_check=not (
self.action == RelationConfigChangeAction.create
and self.context.column_names == set()
),
validation_error=DbtRuntimeError(
"Invalid operations, attempting to create an index with no columns."
),
),
}

View File

@@ -1,112 +0,0 @@
from dataclasses import dataclass, field
from typing import Set, FrozenSet, List, Dict
from typing_extensions import Self
import agate
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.postgres.relation_configs.constants import MAX_CHARACTERS_IN_IDENTIFIER
from dbt.adapters.postgres.relation_configs.index import (
PostgresIndexConfig,
PostgresIndexConfigChange,
)
@dataclass(frozen=True, eq=True, unsafe_hash=True)
class PostgresMaterializedViewConfig(RelationConfigBase, RelationConfigValidationMixin):
"""
This config follows the specs found here:
https://www.postgresql.org/docs/current/sql-creatematerializedview.html
The following parameters are configurable by dbt:
- table_name: name of the materialized view
- query: the query that defines the view
- indexes: the collection (set) of indexes on the materialized view
Applicable defaults for non-configurable parameters:
- method: `heap`
- tablespace_name: `default_tablespace`
- with_data: `True`
"""
table_name: str = ""
query: str = ""
indexes: FrozenSet[PostgresIndexConfig] = field(default_factory=frozenset)
@property
def validation_rules(self) -> Set[RelationConfigValidationRule]:
# index rules get run by default with the mixin
return {
RelationConfigValidationRule(
validation_check=self.table_name is None
or len(self.table_name) <= MAX_CHARACTERS_IN_IDENTIFIER,
validation_error=DbtRuntimeError(
f"The materialized view name is more than {MAX_CHARACTERS_IN_IDENTIFIER} "
f"characters: {self.table_name}"
),
),
}
@classmethod
def from_dict(cls, config_dict: dict) -> Self:
kwargs_dict = {
"table_name": config_dict.get("table_name"),
"query": config_dict.get("query"),
"indexes": frozenset(
PostgresIndexConfig.from_dict(index) for index in config_dict.get("indexes", {})
),
}
materialized_view: Self = super().from_dict(kwargs_dict) # type: ignore
return materialized_view
@classmethod
def from_config(cls, relation_config: RelationConfig) -> Self:
materialized_view_config = cls.parse_config(relation_config)
materialized_view = cls.from_dict(materialized_view_config)
return materialized_view
@classmethod
def parse_config(cls, relation_config: RelationConfig) -> Dict:
indexes: List[dict] = relation_config.config.extra.get("indexes", [])
config_dict = {
"table_name": relation_config.identifier,
"query": relation_config.compiled_code,
"indexes": [PostgresIndexConfig.parse_model_node(index) for index in indexes],
}
return config_dict
@classmethod
def from_relation_results(cls, relation_results: RelationResults) -> Self:
materialized_view_config = cls.parse_relation_results(relation_results)
materialized_view = cls.from_dict(materialized_view_config)
return materialized_view
@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
indexes: agate.Table = relation_results.get("indexes", agate.Table(rows={}))
config_dict = {
"indexes": [
PostgresIndexConfig.parse_relation_results(index) for index in indexes.rows
],
}
return config_dict
@dataclass
class PostgresMaterializedViewConfigChangeCollection:
indexes: Set[PostgresIndexConfigChange] = field(default_factory=set)
@property
def requires_full_refresh(self) -> bool:
return any(index.requires_full_refresh for index in self.indexes)
@property
def has_changes(self) -> bool:
return self.indexes != set()

View File

@@ -1,3 +0,0 @@
import os
PACKAGE_PATH = os.path.dirname(__file__)

View File

@@ -1,5 +0,0 @@
config-version: 2
name: dbt_postgres
version: 1.0
macro-paths: ["macros"]

View File

@@ -1,254 +0,0 @@
{% macro postgres__create_table_as(temporary, relation, sql) -%}
{%- set unlogged = config.get('unlogged', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{%- endmacro %}
{% macro postgres__get_create_index_sql(relation, index_dict) -%}
{%- set index_config = adapter.parse_index(index_dict) -%}
{%- set comma_separated_columns = ", ".join(index_config.columns) -%}
{%- set index_name = index_config.render(relation) -%}
create {% if index_config.unique -%}
unique
{%- endif %} index if not exists
"{{ index_name }}"
on {{ relation }} {% if index_config.type -%}
using {{ index_config.type }}
{%- endif %}
({{ comma_separated_columns }});
{%- endmacro %}
{% macro postgres__create_schema(relation) -%}
{% if relation.database -%}
{{ adapter.verify_database(relation.database) }}
{%- endif -%}
{%- call statement('create_schema') -%}
create schema if not exists {{ relation.without_identifier().include(database=False) }}
{%- endcall -%}
{% endmacro %}
{% macro postgres__drop_schema(relation) -%}
{% if relation.database -%}
{{ adapter.verify_database(relation.database) }}
{%- endif -%}
{%- call statement('drop_schema') -%}
drop schema if exists {{ relation.without_identifier().include(database=False) }} cascade
{%- endcall -%}
{% endmacro %}
{% macro postgres__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from {{ relation.information_schema('columns') }}
where table_name = '{{ relation.identifier }}'
{% if relation.schema %}
and table_schema = '{{ relation.schema }}'
{% endif %}
order by ordinal_position
{% endcall %}
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{% endmacro %}
{% macro postgres__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'{{ schema_relation.database }}' as database,
tablename as name,
schemaname as schema,
'table' as type
from pg_tables
where schemaname ilike '{{ schema_relation.schema }}'
union all
select
'{{ schema_relation.database }}' as database,
viewname as name,
schemaname as schema,
'view' as type
from pg_views
where schemaname ilike '{{ schema_relation.schema }}'
union all
select
'{{ schema_relation.database }}' as database,
matviewname as name,
schemaname as schema,
'materialized_view' as type
from pg_matviews
where schemaname ilike '{{ schema_relation.schema }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
{% macro postgres__information_schema_name(database) -%}
{% if database_name -%}
{{ adapter.verify_database(database_name) }}
{%- endif -%}
information_schema
{%- endmacro %}
{% macro postgres__list_schemas(database) %}
{% if database -%}
{{ adapter.verify_database(database) }}
{%- endif -%}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
select distinct nspname from pg_namespace
{% endcall %}
{{ return(load_result('list_schemas').table) }}
{% endmacro %}
{% macro postgres__check_schema_exists(information_schema, schema) -%}
{% if information_schema.database -%}
{{ adapter.verify_database(information_schema.database) }}
{%- endif -%}
{% call statement('check_schema_exists', fetch_result=True, auto_begin=False) %}
select count(*) from pg_namespace where nspname = '{{ schema }}'
{% endcall %}
{{ return(load_result('check_schema_exists').table) }}
{% endmacro %}
{#
Postgres tables have a maximum length of 63 characters, anything longer is silently truncated.
Temp and backup relations add a lot of extra characters to the end of table names to ensure uniqueness.
To prevent this going over the character limit, the base_relation name is truncated to ensure
that name + suffix + uniquestring is < 63 characters.
#}
{% macro postgres__make_relation_with_suffix(base_relation, suffix, dstring) %}
{% if dstring %}
{% set dt = modules.datetime.datetime.now() %}
{% set dtstring = dt.strftime("%H%M%S%f") %}
{% set suffix = suffix ~ dtstring %}
{% endif %}
{% set suffix_length = suffix|length %}
{% set relation_max_name_length = base_relation.relation_max_name_length() %}
{% if suffix_length > relation_max_name_length %}
{% do exceptions.raise_compiler_error('Relation suffix is too long (' ~ suffix_length ~ ' characters). Maximum length is ' ~ relation_max_name_length ~ ' characters.') %}
{% endif %}
{% set identifier = base_relation.identifier[:relation_max_name_length - suffix_length] ~ suffix %}
{{ return(base_relation.incorporate(path={"identifier": identifier })) }}
{% endmacro %}
{% macro postgres__make_intermediate_relation(base_relation, suffix) %}
{{ return(postgres__make_relation_with_suffix(base_relation, suffix, dstring=False)) }}
{% endmacro %}
{% macro postgres__make_temp_relation(base_relation, suffix) %}
{% set temp_relation = postgres__make_relation_with_suffix(base_relation, suffix, dstring=True) %}
{{ return(temp_relation.incorporate(path={"schema": none,
"database": none})) }}
{% endmacro %}
{% macro postgres__make_backup_relation(base_relation, backup_relation_type, suffix) %}
{% set backup_relation = postgres__make_relation_with_suffix(base_relation, suffix, dstring=False) %}
{{ return(backup_relation.incorporate(type=backup_relation_type)) }}
{% endmacro %}
{#
By using dollar-quoting like this, users can embed anything they want into their comments
(including nested dollar-quoting), as long as they do not use this exact dollar-quoting
label. It would be nice to just pick a new one but eventually you do have to give up.
#}
{% macro postgres_escape_comment(comment) -%}
{% if comment is not string %}
{% do exceptions.raise_compiler_error('cannot escape a non-string: ' ~ comment) %}
{% endif %}
{%- set magic = '$dbt_comment_literal_block$' -%}
{%- if magic in comment -%}
{%- do exceptions.raise_compiler_error('The string ' ~ magic ~ ' is not allowed in comments.') -%}
{%- endif -%}
{{ magic }}{{ comment }}{{ magic }}
{%- endmacro %}
{% macro postgres__alter_relation_comment(relation, comment) %}
{% set escaped_comment = postgres_escape_comment(comment) %}
comment on {{ relation.type }} {{ relation }} is {{ escaped_comment }};
{% endmacro %}
{% macro postgres__alter_column_comment(relation, column_dict) %}
{% set existing_columns = adapter.get_columns_in_relation(relation) | map(attribute="name") | list %}
{% for column_name in column_dict if (column_name in existing_columns) %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = postgres_escape_comment(comment) %}
comment on column {{ relation }}.{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }} is {{ escaped_comment }};
{% endfor %}
{% endmacro %}
{%- macro postgres__get_show_grant_sql(relation) -%}
select grantee, privilege_type
from {{ relation.information_schema('role_table_grants') }}
where grantor = current_role
and grantee != current_role
and table_schema = '{{ relation.schema }}'
and table_name = '{{ relation.identifier }}'
{%- endmacro -%}
{% macro postgres__copy_grants() %}
{{ return(False) }}
{% endmacro %}
{% macro postgres__get_show_indexes_sql(relation) %}
select
i.relname as name,
m.amname as method,
ix.indisunique as "unique",
array_to_string(array_agg(a.attname), ',') as column_names
from pg_index ix
join pg_class i
on i.oid = ix.indexrelid
join pg_am m
on m.oid=i.relam
join pg_class t
on t.oid = ix.indrelid
join pg_namespace n
on n.oid = t.relnamespace
join pg_attribute a
on a.attrelid = t.oid
and a.attnum = ANY(ix.indkey)
where t.relname = '{{ relation.identifier }}'
and n.nspname = '{{ relation.schema }}'
and t.relkind in ('r', 'm')
group by 1, 2, 3
order by 1, 2, 3
{% endmacro %}
{%- macro postgres__get_drop_index_sql(relation, index_name) -%}
drop index if exists "{{ relation.schema }}"."{{ index_name }}"
{%- endmacro -%}

View File

@@ -1,67 +0,0 @@
{% macro postgres__get_catalog_relations(information_schema, relations) -%}
{%- call statement('catalog', fetch_result=True) -%}
{#
If the user has multiple databases set and the first one is wrong, this will fail.
But we won't fail in the case where there are multiple quoting-difference-only dbs, which is better.
#}
{% set database = information_schema.database %}
{{ adapter.verify_database(database) }}
select
'{{ database }}' as table_database,
sch.nspname as table_schema,
tbl.relname as table_name,
case tbl.relkind
when 'v' then 'VIEW'
when 'm' then 'MATERIALIZED VIEW'
else 'BASE TABLE'
end as table_type,
tbl_desc.description as table_comment,
col.attname as column_name,
col.attnum as column_index,
pg_catalog.format_type(col.atttypid, col.atttypmod) as column_type,
col_desc.description as column_comment,
pg_get_userbyid(tbl.relowner) as table_owner
from pg_catalog.pg_namespace sch
join pg_catalog.pg_class tbl on tbl.relnamespace = sch.oid
join pg_catalog.pg_attribute col on col.attrelid = tbl.oid
left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0)
left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum)
where (
{%- for relation in relations -%}
{%- if relation.identifier -%}
(upper(sch.nspname) = upper('{{ relation.schema }}') and
upper(tbl.relname) = upper('{{ relation.identifier }}'))
{%- else-%}
upper(sch.nspname) = upper('{{ relation.schema }}')
{%- endif -%}
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session
and tbl.relpersistence in ('p', 'u') -- [p]ermanent table or [u]nlogged table. Exclude [t]emporary tables
and tbl.relkind in ('r', 'v', 'f', 'p', 'm') -- o[r]dinary table, [v]iew, [f]oreign table, [p]artitioned table, [m]aterialized view. Other values are [i]ndex, [S]equence, [c]omposite type, [t]OAST table
and col.attnum > 0 -- negative numbers are used for system columns such as oid
and not col.attisdropped -- column has not been dropped
order by
sch.nspname,
tbl.relname,
col.attnum
{%- endcall -%}
{{ return(load_result('catalog').table) }}
{%- endmacro %}
{% macro postgres__get_catalog(information_schema, schemas) -%}
{%- set relations = [] -%}
{%- for schema in schemas -%}
{%- set dummy = relations.append({'schema': schema}) -%}
{%- endfor -%}
{{ return(postgres__get_catalog_relations(information_schema, relations)) }}
{%- endmacro %}

View File

@@ -1,9 +0,0 @@
{% macro postgres__get_incremental_default_sql(arg_dict) %}
{% if arg_dict["unique_key"] %}
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
{% do return(get_incremental_append_sql(arg_dict)) %}
{% endif %}
{% endmacro %}

View File

@@ -1,18 +0,0 @@
{% macro postgres__snapshot_merge_sql(target, source, insert_cols) -%}
{%- set insert_cols_csv = insert_cols | join(', ') -%}
update {{ target }}
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
from {{ source }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text)
and {{ target }}.dbt_valid_to is null;
insert into {{ target }} ({{ insert_cols_csv }})
select {% for column in insert_cols -%}
DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %}
{%- endfor %}
from {{ source }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text;
{% endmacro %}

View File

@@ -1,80 +0,0 @@
{% macro postgres__get_relations() -%}
{#
-- in pg_depend, objid is the dependent, refobjid is the referenced object
-- > a pg_depend entry indicates that the referenced object cannot be
-- > dropped without also dropping the dependent object.
#}
{%- call statement('relations', fetch_result=True) -%}
with relation as (
select
pg_rewrite.ev_class as class,
pg_rewrite.oid as id
from pg_rewrite
),
class as (
select
oid as id,
relname as name,
relnamespace as schema,
relkind as kind
from pg_class
),
dependency as (
select distinct
pg_depend.objid as id,
pg_depend.refobjid as ref
from pg_depend
),
schema as (
select
pg_namespace.oid as id,
pg_namespace.nspname as name
from pg_namespace
where nspname != 'information_schema' and nspname not like 'pg\_%'
),
referenced as (
select
relation.id AS id,
referenced_class.name ,
referenced_class.schema ,
referenced_class.kind
from relation
join class as referenced_class on relation.class=referenced_class.id
where referenced_class.kind in ('r', 'v', 'm')
),
relationships as (
select
referenced.name as referenced_name,
referenced.schema as referenced_schema_id,
dependent_class.name as dependent_name,
dependent_class.schema as dependent_schema_id,
referenced.kind as kind
from referenced
join dependency on referenced.id=dependency.id
join class as dependent_class on dependency.ref=dependent_class.id
where
(referenced.name != dependent_class.name or
referenced.schema != dependent_class.schema)
)
select
referenced_schema.name as referenced_schema,
relationships.referenced_name as referenced_name,
dependent_schema.name as dependent_schema,
relationships.dependent_name as dependent_name
from relationships
join schema as dependent_schema on relationships.dependent_schema_id=dependent_schema.id
join schema as referenced_schema on relationships.referenced_schema_id=referenced_schema.id
group by referenced_schema, referenced_name, dependent_schema, dependent_name
order by referenced_schema, referenced_name, dependent_schema, dependent_name;
{%- endcall -%}
{{ return(load_result('relations').table) }}
{% endmacro %}
{% macro postgres_get_relations() %}
{{ return(postgres__get_relations()) }}
{% endmacro %}

View File

@@ -1,50 +0,0 @@
{% macro postgres__get_alter_materialized_view_as_sql(
relation,
configuration_changes,
sql,
existing_relation,
backup_relation,
intermediate_relation
) %}
-- apply a full refresh immediately if needed
{% if configuration_changes.requires_full_refresh %}
{{ get_replace_sql(existing_relation, relation, sql) }}
-- otherwise apply individual changes as needed
{% else %}
{{ postgres__update_indexes_on_materialized_view(relation, configuration_changes.indexes) }}
{%- endif -%}
{% endmacro %}
{%- macro postgres__update_indexes_on_materialized_view(relation, index_changes) -%}
{{- log("Applying UPDATE INDEXES to: " ~ relation) -}}
{%- for _index_change in index_changes -%}
{%- set _index = _index_change.context -%}
{%- if _index_change.action == "drop" -%}
{{ postgres__get_drop_index_sql(relation, _index.name) }};
{%- elif _index_change.action == "create" -%}
{{ postgres__get_create_index_sql(relation, _index.as_node_config) }}
{%- endif -%}
{%- endfor -%}
{%- endmacro -%}
{% macro postgres__get_materialized_view_configuration_changes(existing_relation, new_config) %}
{% set _existing_materialized_view = postgres__describe_materialized_view(existing_relation) %}
{% set _configuration_changes = existing_relation.get_materialized_view_config_change_collection(_existing_materialized_view, new_config.model) %}
{% do return(_configuration_changes) %}
{% endmacro %}

View File

@@ -1,8 +0,0 @@
{% macro postgres__get_create_materialized_view_as_sql(relation, sql) %}
create materialized view if not exists {{ relation }} as {{ sql }};
{% for _index_dict in config.get('indexes', []) -%}
{{- get_create_index_sql(relation, _index_dict) -}}
{%- endfor -%}
{% endmacro %}

View File

@@ -1,5 +0,0 @@
{% macro postgres__describe_materialized_view(relation) %}
-- for now just get the indexes, we don't need the name or the query yet
{% set _indexes = run_query(get_show_indexes_sql(relation)) %}
{% do return({'indexes': _indexes}) %}
{% endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__drop_materialized_view(relation) -%}
drop materialized view if exists {{ relation }} cascade
{%- endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__refresh_materialized_view(relation) %}
refresh materialized view {{ relation }}
{% endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__get_rename_materialized_view_sql(relation, new_name) %}
alter materialized view {{ relation }} rename to {{ new_name }}
{% endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__drop_table(relation) -%}
drop table if exists {{ relation }} cascade
{%- endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__get_rename_table_sql(relation, new_name) %}
alter table {{ relation }} rename to {{ new_name }}
{% endmacro %}

View File

@@ -1,17 +0,0 @@
{% macro postgres__get_replace_table_sql(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{{ get_table_columns_and_constraints() }}
{%- set sql = get_select_subquery(sql) %}
{% endif %}
as (
{{ sql }}
);
{%- endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__drop_view(relation) -%}
drop view if exists {{ relation }} cascade
{%- endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro postgres__get_rename_view_sql(relation, new_name) %}
alter view {{ relation }} rename to {{ new_name }}
{% endmacro %}

View File

@@ -1,15 +0,0 @@
{% macro postgres__get_replace_view_sql(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace view {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
);
{%- endmacro %}

View File

@@ -1,20 +0,0 @@
{% macro postgres__current_timestamp() -%}
now()
{%- endmacro %}
{% macro postgres__snapshot_string_as_time(timestamp) -%}
{%- set result = "'" ~ timestamp ~ "'::timestamp without time zone" -%}
{{ return(result) }}
{%- endmacro %}
{% macro postgres__snapshot_get_time() -%}
{{ current_timestamp() }}::timestamp without time zone
{%- endmacro %}
{% macro postgres__current_timestamp_backcompat() %}
current_timestamp::{{ type_timestamp() }}
{% endmacro %}
{% macro postgres__current_timestamp_in_utc_backcompat() %}
(current_timestamp at time zone 'utc')::{{ type_timestamp() }}
{% endmacro %}

View File

@@ -1,7 +0,0 @@
{#- /*Postgres doesn't support any_value, so we're using min() to get the same result*/ -#}
{% macro postgres__any_value(expression) -%}
min({{ expression }})
{%- endmacro %}

View File

@@ -1,5 +0,0 @@
{% macro postgres__dateadd(datepart, interval, from_date_or_timestamp) %}
{{ from_date_or_timestamp }} + ((interval '1 {{ datepart }}') * ({{ interval }}))
{% endmacro %}

View File

@@ -1,32 +0,0 @@
{% macro postgres__datediff(first_date, second_date, datepart) -%}
{% if datepart == 'year' %}
(date_part('year', ({{second_date}})::date) - date_part('year', ({{first_date}})::date))
{% elif datepart == 'quarter' %}
({{ datediff(first_date, second_date, 'year') }} * 4 + date_part('quarter', ({{second_date}})::date) - date_part('quarter', ({{first_date}})::date))
{% elif datepart == 'month' %}
({{ datediff(first_date, second_date, 'year') }} * 12 + date_part('month', ({{second_date}})::date) - date_part('month', ({{first_date}})::date))
{% elif datepart == 'day' %}
(({{second_date}})::date - ({{first_date}})::date)
{% elif datepart == 'week' %}
({{ datediff(first_date, second_date, 'day') }} / 7 + case
when date_part('dow', ({{first_date}})::timestamp) <= date_part('dow', ({{second_date}})::timestamp) then
case when {{first_date}} <= {{second_date}} then 0 else -1 end
else
case when {{first_date}} <= {{second_date}} then 1 else 0 end
end)
{% elif datepart == 'hour' %}
({{ datediff(first_date, second_date, 'day') }} * 24 + date_part('hour', ({{second_date}})::timestamp) - date_part('hour', ({{first_date}})::timestamp))
{% elif datepart == 'minute' %}
({{ datediff(first_date, second_date, 'hour') }} * 60 + date_part('minute', ({{second_date}})::timestamp) - date_part('minute', ({{first_date}})::timestamp))
{% elif datepart == 'second' %}
({{ datediff(first_date, second_date, 'minute') }} * 60 + floor(date_part('second', ({{second_date}})::timestamp)) - floor(date_part('second', ({{first_date}})::timestamp)))
{% elif datepart == 'millisecond' %}
({{ datediff(first_date, second_date, 'minute') }} * 60000 + floor(date_part('millisecond', ({{second_date}})::timestamp)) - floor(date_part('millisecond', ({{first_date}})::timestamp)))
{% elif datepart == 'microsecond' %}
({{ datediff(first_date, second_date, 'minute') }} * 60000000 + floor(date_part('microsecond', ({{second_date}})::timestamp)) - floor(date_part('microsecond', ({{first_date}})::timestamp)))
{% else %}
{{ exceptions.raise_compiler_error("Unsupported datepart for macro datediff in postgres: {!r}".format(datepart)) }}
{% endif %}
{%- endmacro %}

View File

@@ -1,14 +0,0 @@
{% macro postgres__last_day(date, datepart) -%}
{%- if datepart == 'quarter' -%}
-- postgres dateadd does not support quarter interval.
cast(
{{dbt.dateadd('day', '-1',
dbt.dateadd('month', '3', dbt.date_trunc(datepart, date))
)}}
as date)
{%- else -%}
{{dbt.default_last_day(date, datepart)}}
{%- endif -%}
{%- endmacro %}

View File

@@ -1,23 +0,0 @@
{% macro postgres__listagg(measure, delimiter_text, order_by_clause, limit_num) -%}
{% if limit_num -%}
array_to_string(
(array_agg(
{{ measure }}
{% if order_by_clause -%}
{{ order_by_clause }}
{%- endif %}
))[1:{{ limit_num }}],
{{ delimiter_text }}
)
{%- else %}
string_agg(
{{ measure }},
{{ delimiter_text }}
{% if order_by_clause -%}
{{ order_by_clause }}
{%- endif %}
)
{%- endif %}
{%- endmacro %}

View File

@@ -1,9 +0,0 @@
{% macro postgres__split_part(string_text, delimiter_text, part_number) %}
{% if part_number >= 0 %}
{{ dbt.default__split_part(string_text, delimiter_text, part_number) }}
{% else %}
{{ dbt._split_part_negative(string_text, delimiter_text, part_number) }}
{% endif %}
{% endmacro %}

View File

@@ -1,21 +0,0 @@
fixed:
type: postgres
prompts:
host:
hint: 'hostname for the instance'
port:
default: 5432
type: 'int'
user:
hint: 'dev username'
pass:
hint: 'dev password'
hide_input: true
dbname:
hint: 'default database that dbt will build objects in'
schema:
hint: 'default schema that dbt will build objects in'
threads:
hint: '1 or more'
type: 'int'
default: 1

View File

@@ -1,24 +0,0 @@
default:
outputs:
dev:
type: postgres
threads: [1 or more]
host: [host]
port: [port]
user: [dev_username]
pass: [dev_password]
dbname: [dbname]
schema: [dev_schema]
prod:
type: postgres
threads: [1 or more]
host: [host]
port: [port]
user: [prod_username]
pass: [prod_password]
dbname: [dbname]
schema: [prod_schema]
target: dev

View File

@@ -1,83 +0,0 @@
#!/usr/bin/env python
import os
import sys
if sys.version_info < (3, 8):
print("Error: dbt does not support this version of Python.")
print("Please upgrade to Python 3.8 or higher.")
sys.exit(1)
from setuptools import setup
try:
from setuptools import find_namespace_packages
except ImportError:
# the user has a downlevel version of setuptools.
print("Error: dbt requires setuptools v40.1.0 or higher.")
print('Please upgrade setuptools with "pip install --upgrade setuptools" ' "and try again")
sys.exit(1)
PSYCOPG2_MESSAGE = """
No package name override was set.
Using 'psycopg2-binary' package to satisfy 'psycopg2'
If you experience segmentation faults, silent crashes, or installation errors,
consider retrying with the 'DBT_PSYCOPG2_NAME' environment variable set to
'psycopg2'. It may require a compiler toolchain and development libraries!
""".strip()
def _dbt_psycopg2_name():
# if the user chose something, use that
package_name = os.getenv("DBT_PSYCOPG2_NAME", "")
if package_name:
return package_name
# default to psycopg2-binary for all OSes/versions
print(PSYCOPG2_MESSAGE)
return "psycopg2-binary"
package_name = "dbt-postgres"
package_version = "1.8.0a1"
description = """The postgres adapter plugin for dbt (data build tool)"""
this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, "README.md")) as f:
long_description = f.read()
DBT_PSYCOPG2_NAME = _dbt_psycopg2_name()
setup(
name=package_name,
version=package_version,
description=description,
long_description=long_description,
long_description_content_type="text/markdown",
author="dbt Labs",
author_email="info@dbtlabs.com",
url="https://github.com/dbt-labs/dbt-core",
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
include_package_data=True,
install_requires=[
"dbt-core=={}".format(package_version),
"{}~=2.8".format(DBT_PSYCOPG2_NAME),
# installed via dbt-core, but referenced directly, don't pin to avoid version conflicts with dbt-core
"agate",
],
zip_safe=False,
classifiers=[
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
"Operating System :: Microsoft :: Windows",
"Operating System :: MacOS :: MacOS X",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
python_requires=">=3.8",
)

View File

@@ -1,2 +1 @@
./core
./plugins/postgres

View File

@@ -14,13 +14,11 @@ rm -rf "$DBT_PATH"/dist
rm -rf "$DBT_PATH"/build
mkdir -p "$DBT_PATH"/dist
for SUBPATH in core plugins/postgres tests/adapter
do
rm -rf "$DBT_PATH"/"$SUBPATH"/dist
rm -rf "$DBT_PATH"/"$SUBPATH"/build
cd "$DBT_PATH"/"$SUBPATH"
$PYTHON_BIN setup.py sdist bdist_wheel
cp -r "$DBT_PATH"/"$SUBPATH"/dist/* "$DBT_PATH"/dist/
done
rm -rf "$DBT_PATH"/core/dist
rm -rf "$DBT_PATH"core/build
cd "$DBT_PATH"/core
$PYTHON_BIN setup.py sdist bdist_wheel
cp -r "$DBT_PATH"/"core"/dist/* "$DBT_PATH"/dist/
set +x

View File

@@ -1,41 +0,0 @@
<p align="center">
<img src="https://raw.githubusercontent.com/dbt-labs/dbt-core/fa1ea14ddfb1d5ae319d5141844910dd53ab2834/etc/dbt-core.svg" alt="dbt logo" width="750"/>
</p>
# dbt-tests-adapter
For context and guidance on using this package, please read: ["Testing a new adapter"](https://docs.getdbt.com/docs/contributing/testing-a-new-adapter)
## What is it?
This package includes reusable test cases that reinforce behaviors common to all or many adapter plugins. There are two categories of tests:
1. **Basic tests** that every adapter plugin is expected to pass. These are defined in `tests.adapter.basic`. Given differences across data platforms, these may require slight modification or reimplementation. Significantly overriding or disabling these tests should only be done with good reason, since each one represents basic functionality expected by dbt users. For example, if your adapter does not support incremental models, you should disable the test [by marking it with `skip` or `xfail`](https://docs.pytest.org/en/latest/how-to/skipping.html) (see the sketch after this list), and note that limitation in any documentation, READMEs, and usage guides that accompany your adapter.
2. **Optional tests**, for second-order functionality that is common across plugins, but not required for basic use. Your plugin can opt into these test cases by inheriting existing ones, or reimplementing them with adjustments. For now, this category includes all tests located outside the `basic` subdirectory. More tests will be added as we convert older tests defined on dbt-core and mature plugins to use the standard framework.
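As a concrete illustration of the first category, here is a minimal sketch of opting out of a basic test case on a platform that cannot support it (the `Base`/`Test` inheritance pattern is described in the next section; the adapter and class names here are illustrative, not taken from any real plugin):
```python
import pytest

# Hypothetical adapter that does not support incremental models: inherit the
# basic case as usual, but mark it skipped so the limitation is explicit
# rather than a silent failure.
from dbt.tests.adapter.basic.test_incremental import BaseIncremental


@pytest.mark.skip(reason="this adapter does not support incremental models")
class TestIncrementalMyAdapter(BaseIncremental):
    pass
```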
## How to use it?
Each test case in this repo is packaged as a class, prefixed `Base`. To enable a test case to run with your adapter plugin, inherit the base class into a new class, prefixed `Test`. That test class will be discovered and run by `pytest`, and you can make modifications in it if needed:
```python
class TestSimpleMaterializations(BaseSimpleMaterializations):
pass
```
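If a case needs platform-specific adjustment rather than a straight pass-through, the inheriting class can override fixtures or helper methods from the base class while reusing the rest. A minimal sketch, assuming the standard `project_config_update` fixture from dbt's functional testing framework (the config values shown are purely illustrative):
```python
import pytest

from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations


class TestSimpleMaterializationsMyAdapter(BaseSimpleMaterializations):
    # Illustrative override: supply adapter-specific project configuration
    # while keeping the base class's models, seeds, and assertions unchanged.
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"seeds": {"+quote_columns": False}}
```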
## Distribution
To install:
```sh
pip install dbt-tests-adapter
```
This package is versioned in lockstep with `dbt-core`, and [the same versioning guidelines](https://docs.getdbt.com/docs/core-versions) apply:
- New "basic" test cases MAY be added in minor versions ONLY. They may not be included in patch releases.
- Breaking changes to existing test cases MAY be included and communicated as part of minor version upgrades ONLY. They MAY NOT be included in patch releases. We will aim to avoid these whenever possible.
- New "optional" test cases, and non-breaking fixes to existing test cases, MAY be added in minor or patch versions.
Assuming your adapter plugin is pinned to a specific minor version of `dbt-core` (e.g. `~=1.1.0`), you can use the same pin for `dbt-tests-adapter`.
**Note:** This is distributed as a Python namespace package. It cannot have an `__init__.py` file in the part of the hierarchy to which it needs to be attached.

View File

@@ -1,3 +0,0 @@
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

View File

@@ -1 +0,0 @@
version = "1.8.0a1"

View File

@@ -1,33 +0,0 @@
MY_SEED = """
id,value,record_valid_date
1,100,2023-01-01 00:00:00
2,200,2023-01-02 00:00:00
3,300,2023-01-02 00:00:00
""".strip()
MY_TABLE = """
{{ config(
materialized='table',
) }}
select *
from {{ ref('my_seed') }}
"""
MY_VIEW = """
{{ config(
materialized='view',
) }}
select *
from {{ ref('my_seed') }}
"""
MY_MATERIALIZED_VIEW = """
{{ config(
materialized='materialized_view',
) }}
select *
from {{ ref('my_seed') }}
"""

View File

@@ -1,58 +0,0 @@
import pytest
import os
from dbt.tests.util import (
relation_from_name,
get_connection,
)
from dbt.context.base import BaseContext # diff_of_two_dicts only
TEST_USER_ENV_VARS = ["DBT_TEST_USER_1", "DBT_TEST_USER_2", "DBT_TEST_USER_3"]
def replace_all(text, dic):
for i, j in dic.items():
text = text.replace(i, j)
return text
class BaseGrants:
def privilege_grantee_name_overrides(self):
# these privilege and grantee names are valid on most databases, but not all!
# looking at you, BigQuery
# optionally use this to map from "select" --> "other_select_name", "insert" --> ...
return {
"select": "select",
"insert": "insert",
"fake_privilege": "fake_privilege",
"invalid_user": "invalid_user",
}
def interpolate_name_overrides(self, yaml_text):
return replace_all(yaml_text, self.privilege_grantee_name_overrides())
@pytest.fixture(scope="class", autouse=True)
def get_test_users(self, project):
test_users = []
for env_var in TEST_USER_ENV_VARS:
user_name = os.getenv(env_var)
if user_name:
test_users.append(user_name)
return test_users
def get_grants_on_relation(self, project, relation_name):
relation = relation_from_name(project.adapter, relation_name)
adapter = project.adapter
with get_connection(adapter):
kwargs = {"relation": relation}
show_grant_sql = adapter.execute_macro("get_show_grant_sql", kwargs=kwargs)
_, grant_table = adapter.execute(show_grant_sql, fetch=True)
actual_grants = adapter.standardize_grants_dict(grant_table)
return actual_grants
def assert_expected_grants_match_actual(self, project, relation_name, expected_grants):
actual_grants = self.get_grants_on_relation(project, relation_name)
# need a case-insensitive comparison
# so just a simple "assert expected == actual_grants" won't work
diff_a = BaseContext.diff_of_two_dicts(actual_grants, expected_grants)
diff_b = BaseContext.diff_of_two_dicts(expected_grants, actual_grants)
assert diff_a == diff_b == {}

View File

@@ -1,102 +0,0 @@
import pytest
from dbt.tests.util import (
run_dbt,
run_dbt_and_capture,
get_manifest,
write_file,
relation_from_name,
get_connection,
)
from dbt.tests.adapter.grants.base_grants import BaseGrants
my_incremental_model_sql = """
select 1 as fun
"""
incremental_model_schema_yml = """
version: 2
models:
- name: my_incremental_model
config:
materialized: incremental
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
"""
user2_incremental_model_schema_yml = """
version: 2
models:
- name: my_incremental_model
config:
materialized: incremental
grants:
select: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
class BaseIncrementalGrants(BaseGrants):
@pytest.fixture(scope="class")
def models(self):
updated_schema = self.interpolate_name_overrides(incremental_model_schema_yml)
return {
"my_incremental_model.sql": my_incremental_model_sql,
"schema.yml": updated_schema,
}
def test_incremental_grants(self, project, get_test_users):
# we want the test to fail, not silently skip
test_users = get_test_users
select_privilege_name = self.privilege_grantee_name_overrides()["select"]
assert len(test_users) == 3
# Incremental materialization, single select grant
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model_id = "model.test.my_incremental_model"
model = manifest.nodes[model_id]
assert model.config.materialized == "incremental"
expected = {select_privilege_name: [test_users[0]]}
self.assert_expected_grants_match_actual(project, "my_incremental_model", expected)
# Incremental materialization, run again without changes
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
assert "revoke " not in log_output
assert "grant " not in log_output # with space to disambiguate from 'show grants'
self.assert_expected_grants_match_actual(project, "my_incremental_model", expected)
# Incremental materialization, change select grant user
updated_yaml = self.interpolate_name_overrides(user2_incremental_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
assert "revoke " in log_output
manifest = get_manifest(project.project_root)
model = manifest.nodes[model_id]
assert model.config.materialized == "incremental"
expected = {select_privilege_name: [test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_incremental_model", expected)
# Incremental materialization, same config, now with --full-refresh
run_dbt(["--debug", "run", "--full-refresh"])
assert len(results) == 1
# whether grants or revokes happened will vary by adapter
self.assert_expected_grants_match_actual(project, "my_incremental_model", expected)
# Now drop the schema (with the table in it)
adapter = project.adapter
relation = relation_from_name(adapter, "my_incremental_model")
with get_connection(adapter):
adapter.drop_schema(relation)
# Incremental materialization, same config, rebuild now that table is missing
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
assert "grant " in log_output
assert "revoke " not in log_output
self.assert_expected_grants_match_actual(project, "my_incremental_model", expected)
class TestIncrementalGrants(BaseIncrementalGrants):
pass

View File

@@ -1,68 +0,0 @@
import pytest
from dbt.tests.util import (
run_dbt_and_capture,
write_file,
)
from dbt.tests.adapter.grants.base_grants import BaseGrants
my_invalid_model_sql = """
select 1 as fun
"""
invalid_user_table_model_schema_yml = """
version: 2
models:
- name: my_invalid_model
config:
materialized: table
grants:
select: ['invalid_user']
"""
invalid_privilege_table_model_schema_yml = """
version: 2
models:
- name: my_invalid_model
config:
materialized: table
grants:
fake_privilege: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
class BaseInvalidGrants(BaseGrants):
# The purpose of this test is to understand the user experience when providing
# an invalid 'grants' configuration. dbt will *not* try to intercept or interpret
# the database's own error at runtime -- it will just return those error messages.
# Hopefully they're helpful!
@pytest.fixture(scope="class")
def models(self):
return {
"my_invalid_model.sql": my_invalid_model_sql,
}
# Adapters will need to reimplement these methods with the specific
# language of their database
def grantee_does_not_exist_error(self):
return "does not exist"
def privilege_does_not_exist_error(self):
return "unrecognized privilege"
def test_invalid_grants(self, project, get_test_users, logs_dir):
# failure when grant to a user/role that doesn't exist
yaml_file = self.interpolate_name_overrides(invalid_user_table_model_schema_yml)
write_file(yaml_file, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"], expect_pass=False)
assert self.grantee_does_not_exist_error() in log_output
# failure when grant to a privilege that doesn't exist
yaml_file = self.interpolate_name_overrides(invalid_privilege_table_model_schema_yml)
write_file(yaml_file, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"], expect_pass=False)
assert self.privilege_does_not_exist_error() in log_output
class TestInvalidGrants(BaseInvalidGrants):
pass

View File

@@ -1,156 +0,0 @@
import pytest
from dbt.tests.util import (
run_dbt_and_capture,
get_manifest,
write_file,
)
from dbt.tests.adapter.grants.base_grants import BaseGrants
my_model_sql = """
select 1 as fun
"""
model_schema_yml = """
version: 2
models:
- name: my_model
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
"""
user2_model_schema_yml = """
version: 2
models:
- name: my_model
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
table_model_schema_yml = """
version: 2
models:
- name: my_model
config:
materialized: table
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
"""
user2_table_model_schema_yml = """
version: 2
models:
- name: my_model
config:
materialized: table
grants:
select: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
multiple_users_table_model_schema_yml = """
version: 2
models:
- name: my_model
config:
materialized: table
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}", "{{ env_var('DBT_TEST_USER_2') }}"]
"""
multiple_privileges_table_model_schema_yml = """
version: 2
models:
- name: my_model
config:
materialized: table
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
insert: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
class BaseModelGrants(BaseGrants):
@pytest.fixture(scope="class")
def models(self):
updated_schema = self.interpolate_name_overrides(model_schema_yml)
return {
"my_model.sql": my_model_sql,
"schema.yml": updated_schema,
}
def test_view_table_grants(self, project, get_test_users):
# we want the test to fail, not silently skip
test_users = get_test_users
select_privilege_name = self.privilege_grantee_name_overrides()["select"]
insert_privilege_name = self.privilege_grantee_name_overrides()["insert"]
assert len(test_users) == 3
# View materialization, single select grant
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model_id = "model.test.my_model"
model = manifest.nodes[model_id]
expected = {select_privilege_name: [test_users[0]]}
assert model.config.grants == expected
assert model.config.materialized == "view"
self.assert_expected_grants_match_actual(project, "my_model", expected)
# View materialization, change select grant user
updated_yaml = self.interpolate_name_overrides(user2_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
expected = {select_privilege_name: [get_test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_model", expected)
# Table materialization, single select grant
updated_yaml = self.interpolate_name_overrides(table_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model_id = "model.test.my_model"
model = manifest.nodes[model_id]
assert model.config.materialized == "table"
expected = {select_privilege_name: [test_users[0]]}
self.assert_expected_grants_match_actual(project, "my_model", expected)
# Table materialization, change select grant user
updated_yaml = self.interpolate_name_overrides(user2_table_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model = manifest.nodes[model_id]
assert model.config.materialized == "table"
expected = {select_privilege_name: [test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_model", expected)
# Table materialization, multiple grantees
updated_yaml = self.interpolate_name_overrides(multiple_users_table_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model = manifest.nodes[model_id]
assert model.config.materialized == "table"
expected = {select_privilege_name: [test_users[0], test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_model", expected)
# Table materialization, multiple privileges
updated_yaml = self.interpolate_name_overrides(multiple_privileges_table_model_schema_yml)
write_file(updated_yaml, project.project_root, "models", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "run"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
model = manifest.nodes[model_id]
assert model.config.materialized == "table"
expected = {select_privilege_name: [test_users[0]], insert_privilege_name: [test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_model", expected)
class TestModelGrants(BaseModelGrants):
pass

View File

@@ -1,143 +0,0 @@
import pytest
from dbt.tests.util import (
run_dbt,
run_dbt_and_capture,
get_manifest,
write_file,
)
from dbt.tests.adapter.grants.base_grants import BaseGrants
seeds__my_seed_csv = """
id,name,some_date
1,Easton,1981-05-20T06:46:51
2,Lillian,1978-09-03T18:10:33
""".lstrip()
schema_base_yml = """
version: 2
seeds:
- name: my_seed
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
"""
user2_schema_base_yml = """
version: 2
seeds:
- name: my_seed
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
ignore_grants_yml = """
version: 2
seeds:
- name: my_seed
config:
grants: {}
"""
zero_grants_yml = """
version: 2
seeds:
- name: my_seed
config:
grants:
select: []
"""
class BaseSeedGrants(BaseGrants):
def seeds_support_partial_refresh(self):
return True
@pytest.fixture(scope="class")
def seeds(self):
updated_schema = self.interpolate_name_overrides(schema_base_yml)
return {
"my_seed.csv": seeds__my_seed_csv,
"schema.yml": updated_schema,
}
def test_seed_grants(self, project, get_test_users):
test_users = get_test_users
select_privilege_name = self.privilege_grantee_name_overrides()["select"]
# seed command
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
seed_id = "seed.test.my_seed"
seed = manifest.nodes[seed_id]
expected = {select_privilege_name: [test_users[0]]}
assert seed.config.grants == expected
assert "grant " in log_output
self.assert_expected_grants_match_actual(project, "my_seed", expected)
# run it again, with no config changes
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
if self.seeds_support_partial_refresh():
# grants carried over -- nothing should have changed
assert "revoke " not in log_output
assert "grant " not in log_output
else:
# seeds are always full-refreshed on this adapter, so we need to re-grant
assert "grant " in log_output
self.assert_expected_grants_match_actual(project, "my_seed", expected)
# change the grantee, assert it updates
updated_yaml = self.interpolate_name_overrides(user2_schema_base_yml)
write_file(updated_yaml, project.project_root, "seeds", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
expected = {select_privilege_name: [test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_seed", expected)
# run it again, with --full-refresh, grants should be the same
run_dbt(["seed", "--full-refresh"])
self.assert_expected_grants_match_actual(project, "my_seed", expected)
# change config to 'grants: {}' -- should be completely ignored
updated_yaml = self.interpolate_name_overrides(ignore_grants_yml)
write_file(updated_yaml, project.project_root, "seeds", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
assert "revoke " not in log_output
assert "grant " not in log_output
manifest = get_manifest(project.project_root)
seed_id = "seed.test.my_seed"
seed = manifest.nodes[seed_id]
expected_config = {}
expected_actual = {select_privilege_name: [test_users[1]]}
assert seed.config.grants == expected_config
if self.seeds_support_partial_refresh():
# ACTUAL grants will NOT match expected grants
self.assert_expected_grants_match_actual(project, "my_seed", expected_actual)
else:
# there should be ZERO grants on the seed
self.assert_expected_grants_match_actual(project, "my_seed", expected_config)
# now run with ZERO grants -- all grants should be removed
# whether explicitly (revoke) or implicitly (recreated without any grants added on)
updated_yaml = self.interpolate_name_overrides(zero_grants_yml)
write_file(updated_yaml, project.project_root, "seeds", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
if self.seeds_support_partial_refresh():
assert "revoke " in log_output
expected = {}
self.assert_expected_grants_match_actual(project, "my_seed", expected)
# run it again -- dbt shouldn't try to grant or revoke anything
(results, log_output) = run_dbt_and_capture(["--debug", "seed"])
assert len(results) == 1
assert "revoke " not in log_output
assert "grant " not in log_output
self.assert_expected_grants_match_actual(project, "my_seed", expected)
class TestSeedGrants(BaseSeedGrants):
pass

View File

@@ -1,78 +0,0 @@
import pytest
from dbt.tests.util import (
run_dbt,
run_dbt_and_capture,
get_manifest,
write_file,
)
from dbt.tests.adapter.grants.base_grants import BaseGrants
my_snapshot_sql = """
{% snapshot my_snapshot %}
{{ config(
check_cols='all', unique_key='id', strategy='check',
target_database=database, target_schema=schema
) }}
select 1 as id, cast('blue' as {{ type_string() }}) as color
{% endsnapshot %}
""".strip()
snapshot_schema_yml = """
version: 2
snapshots:
- name: my_snapshot
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_1') }}"]
"""
user2_snapshot_schema_yml = """
version: 2
snapshots:
- name: my_snapshot
config:
grants:
select: ["{{ env_var('DBT_TEST_USER_2') }}"]
"""
class BaseSnapshotGrants(BaseGrants):
@pytest.fixture(scope="class")
def snapshots(self):
return {
"my_snapshot.sql": my_snapshot_sql,
"schema.yml": self.interpolate_name_overrides(snapshot_schema_yml),
}
def test_snapshot_grants(self, project, get_test_users):
test_users = get_test_users
select_privilege_name = self.privilege_grantee_name_overrides()["select"]
# run the snapshot
results = run_dbt(["snapshot"])
assert len(results) == 1
manifest = get_manifest(project.project_root)
snapshot_id = "snapshot.test.my_snapshot"
snapshot = manifest.nodes[snapshot_id]
expected = {select_privilege_name: [test_users[0]]}
assert snapshot.config.grants == expected
self.assert_expected_grants_match_actual(project, "my_snapshot", expected)
# run it again, nothing should have changed
(results, log_output) = run_dbt_and_capture(["--debug", "snapshot"])
assert len(results) == 1
assert "revoke " not in log_output
assert "grant " not in log_output
self.assert_expected_grants_match_actual(project, "my_snapshot", expected)
# change the grantee, assert it updates
updated_yaml = self.interpolate_name_overrides(user2_snapshot_schema_yml)
write_file(updated_yaml, project.project_root, "snapshots", "schema.yml")
(results, log_output) = run_dbt_and_capture(["--debug", "snapshot"])
assert len(results) == 1
expected = {select_privilege_name: [test_users[1]]}
self.assert_expected_grants_match_actual(project, "my_snapshot", expected)
class TestSnapshotGrants(BaseSnapshotGrants):
pass

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env python
import os
import sys
if sys.version_info < (3, 8):
print("Error: dbt does not support this version of Python.")
print("Please upgrade to Python 3.8 or higher.")
sys.exit(1)
from setuptools import setup
try:
from setuptools import find_namespace_packages
except ImportError:
# the user has a downlevel version of setuptools.
print("Error: dbt requires setuptools v40.1.0 or higher.")
print('Please upgrade setuptools with "pip install --upgrade setuptools" ' "and try again")
sys.exit(1)
package_name = "dbt-tests-adapter"
package_version = "1.8.0a1"
description = """The dbt adapter tests for adapter plugins"""
this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, "README.md")) as f:
long_description = f.read()
setup(
name=package_name,
version=package_version,
description=description,
long_description=long_description,
long_description_content_type="text/markdown",
author="dbt Labs",
author_email="info@dbtlabs.com",
url="https://github.com/dbt-labs/dbt-core/tree/main/tests/adapter",
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
install_requires=[
"dbt-core=={}".format(package_version),
"pytest>=7.0.0",
],
zip_safe=False,
classifiers=[
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
"Operating System :: Microsoft :: Windows",
"Operating System :: MacOS :: MacOS X",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
python_requires=">=3.8",
)

View File

View File

@@ -1,6 +1,6 @@
import pytest
from dbt.tests.util import run_dbt
from dbt.tests.adapter.aliases.fixtures import (
from tests.functional.adapter.aliases.fixtures import (
MACROS__CAST_SQL,
MACROS__EXPECT_VALUE_SQL,
MODELS__SCHEMA_YML,

View File

@@ -6,7 +6,7 @@ from dbt.tests.util import (
check_relation_types,
check_relations_equal,
)
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
base_view_sql,
base_table_sql,

View File

@@ -5,7 +5,7 @@ import dbt
from dbt.tests.util import run_dbt, rm_file, get_artifact, check_datetime_between
from dbt.tests.fixtures.project import write_project_files
from dbt.tests.adapter.basic.expected_catalog import (
from tests.functional.adapter.basic.expected_catalog import (
base_expected_catalog,
no_stats,
expected_references_catalog,
@@ -452,9 +452,9 @@ class BaseDocsGenerate(BaseGenerateProject):
verify_catalog(project, expected_catalog, start_time)
# Check that assets have been copied to the target directory for use in the docs html page
assert os.path.exists(os.path.join(".", "target", "assets"))
assert os.path.exists(os.path.join(".", "target", "assets", "lorem-ipsum.txt"))
assert not os.path.exists(os.path.join(".", "target", "non-existent-assets"))
assert os.path.exists(os.path.join("", "target", "assets"))
assert os.path.exists(os.path.join("", "target", "assets", "lorem-ipsum.txt"))
assert not os.path.exists(os.path.join("", "target", "non-existent-assets"))
class TestDocsGenerate(BaseDocsGenerate):

View File

@@ -7,7 +7,7 @@ from dbt.tests.util import (
check_result_nodes_by_name,
relation_from_name,
)
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
base_ephemeral_sql,
ephemeral_view_sql,

View File

@@ -1,6 +1,6 @@
import pytest
from dbt.tests.util import run_dbt
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
generic_test_seed_yml,
base_view_sql,

View File

@@ -1,7 +1,7 @@
import pytest
from dbt.tests.util import run_dbt, check_relations_equal, relation_from_name
from dbt.artifacts.schemas.results import RunStatus
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
seeds_added_csv,
schema_base_yml,

View File

@@ -1,5 +1,5 @@
import pytest
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
test_passing_sql,
test_failing_sql,
)

View File

@@ -1,7 +1,7 @@
import pytest
from dbt.tests.util import run_dbt, check_result_nodes_by_name
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
ephemeral_with_cte_sql,
test_ephemeral_passing_sql,

View File

@@ -1,6 +1,6 @@
import pytest
from dbt.tests.util import run_dbt, update_rows, relation_from_name
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
seeds_added_csv,
cc_all_snapshot_sql,

View File

@@ -1,6 +1,6 @@
import pytest
from dbt.tests.util import run_dbt, relation_from_name, update_rows
from dbt.tests.adapter.basic.files import (
from tests.functional.adapter.basic.files import (
seeds_base_csv,
seeds_newcolumns_csv,
seeds_added_csv,

View File

@@ -2,7 +2,7 @@ from dbt.artifacts.schemas.catalog import CatalogArtifact
from dbt.tests.util import run_dbt
import pytest
from dbt.tests.adapter.catalog import files
from tests.functional.adapter.catalog import files
class CatalogRelationTypes:

View File

@@ -1,6 +1,10 @@
import pytest
from dbt.tests.util import run_dbt
from dbt.tests.adapter.column_types.fixtures import macro_test_is_type_sql, model_sql, schema_yml
from tests.functional.adapter.column_types.fixtures import (
macro_test_is_type_sql,
model_sql,
schema_yml,
)
class BaseColumnTypes:

Some files were not shown because too many files have changed in this diff.