Compare commits

...

3 Commits

Author SHA1 Message Date
Jeremy Cohen
1bed1e9fbc Rm plugins/redshift, plugins/snowflake 2021-09-29 00:22:50 +02:00
Kyle Wigley
581d567440 Consolidate dbt-snowflake
Continue pulling apart Snowflake tests

Fixups

Rip out even more
2021-09-29 00:22:47 +02:00
Kyle Wigley
db6402d77d Consolidate dbt-redshift
Start moving unit tests

fix typo; rm breakpoints

rm references to redshift

rm all references to redshift except `integration/100_rpc_test` 🙁

cleanup CI

fix tests!

fix docs test bc of schema version change
2021-09-29 00:22:32 +02:00
120 changed files with 77 additions and 5842 deletions

View File

@@ -34,17 +34,9 @@ first_value = 1
[bumpversion:file:plugins/postgres/setup.py]
[bumpversion:file:plugins/redshift/setup.py]
[bumpversion:file:plugins/snowflake/setup.py]
[bumpversion:file:plugins/bigquery/setup.py]
[bumpversion:file:plugins/postgres/dbt/adapters/postgres/__version__.py]
[bumpversion:file:plugins/redshift/dbt/adapters/redshift/__version__.py]
[bumpversion:file:plugins/snowflake/dbt/adapters/snowflake/__version__.py]
[bumpversion:file:plugins/bigquery/dbt/adapters/bigquery/__version__.py]

View File

@@ -21,16 +21,6 @@ updates:
schedule:
interval: "daily"
rebase-strategy: "disabled"
- package-ecosystem: "pip"
directory: "/plugins/redshift"
schedule:
interval: "daily"
rebase-strategy: "disabled"
- package-ecosystem: "pip"
directory: "/plugins/snowflake"
schedule:
interval: "daily"
rebase-strategy: "disabled"
# docker dependencies
- package-ecosystem: "docker"

View File

@@ -1,7 +1,7 @@
module.exports = ({ context }) => {
const defaultPythonVersion = "3.8";
const supportedPythonVersions = ["3.6", "3.7", "3.8", "3.9"];
const supportedAdapters = ["snowflake", "postgres", "bigquery", "redshift"];
const supportedAdapters = ["postgres", "bigquery"];
// if PR, generate matrix based on files changed and PR labels
if (context.eventName.includes("pull_request")) {

View File

@@ -91,16 +91,9 @@ jobs:
- 'core/**'
- 'plugins/postgres/**'
- 'dev-requirements.txt'
snowflake:
- 'core/**'
- 'plugins/snowflake/**'
bigquery:
- 'core/**'
- 'plugins/bigquery/**'
redshift:
- 'core/**'
- 'plugins/redshift/**'
- 'plugins/postgres/**'
- name: Generate integration test matrix
id: generate-matrix
@@ -191,33 +184,6 @@ jobs:
if: matrix.adapter == 'postgres'
run: tox
- name: Run tox (redshift)
if: matrix.adapter == 'redshift'
env:
REDSHIFT_TEST_DBNAME: ${{ secrets.REDSHIFT_TEST_DBNAME }}
REDSHIFT_TEST_PASS: ${{ secrets.REDSHIFT_TEST_PASS }}
REDSHIFT_TEST_USER: ${{ secrets.REDSHIFT_TEST_USER }}
REDSHIFT_TEST_PORT: ${{ secrets.REDSHIFT_TEST_PORT }}
REDSHIFT_TEST_HOST: ${{ secrets.REDSHIFT_TEST_HOST }}
run: tox
- name: Run tox (snowflake)
if: matrix.adapter == 'snowflake'
env:
SNOWFLAKE_TEST_ACCOUNT: ${{ secrets.SNOWFLAKE_TEST_ACCOUNT }}
SNOWFLAKE_TEST_PASSWORD: ${{ secrets.SNOWFLAKE_TEST_PASSWORD }}
SNOWFLAKE_TEST_USER: ${{ secrets.SNOWFLAKE_TEST_USER }}
SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }}
SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN: ${{ secrets.SNOWFLAKE_TEST_OAUTH_REFRESH_TOKEN }}
SNOWFLAKE_TEST_OAUTH_CLIENT_ID: ${{ secrets.SNOWFLAKE_TEST_OAUTH_CLIENT_ID }}
SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET: ${{ secrets.SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET }}
SNOWFLAKE_TEST_ALT_DATABASE: ${{ secrets.SNOWFLAKE_TEST_ALT_DATABASE }}
SNOWFLAKE_TEST_ALT_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_ALT_WAREHOUSE }}
SNOWFLAKE_TEST_DATABASE: ${{ secrets.SNOWFLAKE_TEST_DATABASE }}
SNOWFLAKE_TEST_QUOTED_DATABASE: ${{ secrets.SNOWFLAKE_TEST_QUOTED_DATABASE }}
SNOWFLAKE_TEST_ROLE: ${{ secrets.SNOWFLAKE_TEST_ROLE }}
run: tox
- name: Run tox (bigquery)
if: matrix.adapter == 'bigquery'
env:

View File

@@ -26,7 +26,7 @@ This is the docs website code. It comes from the dbt-docs repository, and is gen
## Adapters
dbt uses an adapter-plugin pattern to extend support to different databases, warehouses, query engines, etc. The four core adapters that are in the main repository, contained within the [`plugins`](plugins) subdirectory, are: Postgres Redshift, Snowflake and BigQuery. Other warehouses use adapter plugins defined in separate repositories (e.g. [dbt-spark](https://github.com/dbt-labs/dbt-spark), [dbt-presto](https://github.com/dbt-labs/dbt-presto)).
dbt uses an adapter-plugin pattern to extend support to different databases, warehouses, query engines, etc. For testing and development purposes, the dbt-postgres plugin lives alongside the dbt-core codebase, in the [`plugins`](plugins) subdirectory. Like other adapter plugins, it is a self-contained codebase and package that builds on top of dbt-core.
Each adapter is a mix of python, Jinja2, and SQL. The adapter code also makes heavy use of Jinja2 to wrap modular chunks of SQL functionality, define default implementations, and allow plugins to override it.

View File

@@ -44,22 +44,6 @@ integration-postgres: .env ## Runs postgres integration tests with py38.
integration-postgres-fail-fast: .env ## Runs postgres integration tests with py38 in "fail fast" mode.
$(DOCKER_CMD) tox -e py38-postgres -- -x -nauto
.PHONY: integration-redshift
integration-redshift: .env ## Runs redshift integration tests with py38.
$(DOCKER_CMD) tox -e py38-redshift -- -nauto
.PHONY: integration-redshift-fail-fast
integration-redshift-fail-fast: .env ## Runs redshift integration tests with py38 in "fail fast" mode.
$(DOCKER_CMD) tox -e py38-redshift -- -x -nauto
.PHONY: integration-snowflake
integration-snowflake: .env ## Runs snowflake integration tests with py38.
$(DOCKER_CMD) tox -e py38-snowflake -- -nauto
.PHONY: integration-snowflake-fail-fast
integration-snowflake-fail-fast: .env ## Runs snowflake integration tests with py38 in "fail fast" mode.
$(DOCKER_CMD) tox -e py38-snowflake -- -x -nauto
.PHONY: integration-bigquery
integration-bigquery: .env ## Runs bigquery integration tests with py38.
$(DOCKER_CMD) tox -e py38-bigquery -- -nauto

View File

@@ -1,5 +1,3 @@
-e ./core
-e ./plugins/postgres
-e ./plugins/redshift
-e ./plugins/snowflake
-e ./plugins/bigquery

View File

@@ -1,32 +0,0 @@
<p align="center">
<img src="https://raw.githubusercontent.com/dbt-labs/dbt/6c6649f9129d5d108aa3b0526f634cd8f3a9d1ed/etc/dbt-logo-full.svg" alt="dbt logo" width="500"/>
</p>
**[dbt](https://www.getdbt.com/)** (data build tool) enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
## dbt-redshift
The `dbt-redshift` package contains all of the code required to make dbt operate on a Redshift database. For
more information on using dbt with Redshift, consult [the docs](https://docs.getdbt.com/docs/profile-redshift).
## Find out more
- Check out the [Introduction to dbt](https://docs.getdbt.com/docs/introduction/).
- Read the [dbt Viewpoint](https://docs.getdbt.com/docs/about/viewpoint/).
## Join thousands of analysts in the dbt community
- Join the [chat](http://community.getdbt.com/) on Slack.
- Find community posts on [dbt Discourse](https://discourse.getdbt.com).
## Reporting bugs and contributing code
- Want to report a bug or request a feature? Let us know on [Slack](http://community.getdbt.com/), or open [an issue](https://github.com/dbt-labs/dbt/issues/new).
- Want to help us build dbt? Check out the [Contributing Getting Started Guide](https://github.com/dbt-labs/dbt/blob/HEAD/CONTRIBUTING.md)
## Code of Conduct
Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [dbt Code of Conduct](https://community.getdbt.com/code-of-conduct).

View File

@@ -1,15 +0,0 @@
from dbt.adapters.redshift.connections import RedshiftConnectionManager # noqa
from dbt.adapters.redshift.connections import RedshiftCredentials
from dbt.adapters.redshift.relation import RedshiftColumn # noqa
from dbt.adapters.redshift.relation import RedshiftRelation # noqa: F401
from dbt.adapters.redshift.impl import RedshiftAdapter
from dbt.adapters.base import AdapterPlugin
from dbt.include import redshift
Plugin = AdapterPlugin(
adapter=RedshiftAdapter,
credentials=RedshiftCredentials,
include_path=redshift.PACKAGE_PATH,
dependencies=['postgres'])

View File

@@ -1 +0,0 @@
version = '0.21.0rc1'

View File

@@ -1,174 +0,0 @@
from multiprocessing import Lock
from contextlib import contextmanager
from typing import NewType
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres import PostgresCredentials
from dbt.logger import GLOBAL_LOGGER as logger # noqa
import dbt.exceptions
import dbt.flags
import boto3
from dbt.dataclass_schema import FieldEncoder, dbtClassMixin, StrEnum
from dataclasses import dataclass, field
from typing import Optional, List
drop_lock: Lock = dbt.flags.MP_CONTEXT.Lock()
IAMDuration = NewType('IAMDuration', int)
class IAMDurationEncoder(FieldEncoder):
@property
def json_schema(self):
return {'type': 'integer', 'minimum': 0, 'maximum': 65535}
dbtClassMixin.register_field_encoders({IAMDuration: IAMDurationEncoder()})
class RedshiftConnectionMethod(StrEnum):
DATABASE = 'database'
IAM = 'iam'
@dataclass
class RedshiftCredentials(PostgresCredentials):
method: RedshiftConnectionMethod = RedshiftConnectionMethod.DATABASE
password: Optional[str] = None
cluster_id: Optional[str] = field(
default=None,
metadata={'description': 'If using IAM auth, the name of the cluster'},
)
iam_profile: Optional[str] = None
iam_duration_seconds: int = 900
search_path: Optional[str] = None
keepalives_idle: int = 240
autocreate: bool = False
db_groups: List[str] = field(default_factory=list)
ra3_node: Optional[bool] = False
@property
def type(self):
return 'redshift'
def _connection_keys(self):
keys = super()._connection_keys()
return keys + (
'method',
'cluster_id',
'iam_profile',
'iam_duration_seconds'
)
class RedshiftConnectionManager(PostgresConnectionManager):
TYPE = 'redshift'
@contextmanager
def fresh_transaction(self, name=None):
"""On entrance to this context manager, hold an exclusive lock and
create a fresh transaction for redshift, then commit and begin a new
one before releasing the lock on exit.
See drop_relation in RedshiftAdapter for more information.
:param Optional[str] name: The name of the connection to use, or None
to use the default.
"""
with drop_lock:
connection = self.get_thread_connection()
if connection.transaction_open:
self.commit()
self.begin()
yield
self.commit()
self.begin()
@classmethod
def fetch_cluster_credentials(cls, db_user, db_name, cluster_id,
iam_profile, duration_s, autocreate,
db_groups):
"""Fetches temporary login credentials from AWS. The specified user
must already exist in the database, or else an error will occur"""
if iam_profile is None:
session = boto3.Session()
boto_client = session.client("redshift")
else:
logger.debug("Connecting to Redshift using 'IAM'" +
f"with profile {iam_profile}")
boto_session = boto3.Session(
profile_name=iam_profile
)
boto_client = boto_session.client('redshift')
try:
return boto_client.get_cluster_credentials(
DbUser=db_user,
DbName=db_name,
ClusterIdentifier=cluster_id,
DurationSeconds=duration_s,
AutoCreate=autocreate,
DbGroups=db_groups,)
except boto_client.exceptions.ClientError as e:
raise dbt.exceptions.FailedToConnectException(
"Unable to get temporary Redshift cluster credentials: {}"
.format(e))
@classmethod
def get_tmp_iam_cluster_credentials(cls, credentials):
cluster_id = credentials.cluster_id
# default via:
# boto3.readthedocs.io/en/latest/reference/services/redshift.html
iam_duration_s = credentials.iam_duration_seconds
if not cluster_id:
raise dbt.exceptions.FailedToConnectException(
"'cluster_id' must be provided in profile if IAM "
"authentication method selected")
cluster_creds = cls.fetch_cluster_credentials(
credentials.user,
credentials.database,
credentials.cluster_id,
credentials.iam_profile,
iam_duration_s,
credentials.autocreate,
credentials.db_groups,
)
# replace username and password with temporary redshift credentials
return credentials.replace(user=cluster_creds.get('DbUser'),
password=cluster_creds.get('DbPassword'))
@classmethod
def get_credentials(cls, credentials):
method = credentials.method
# Support missing 'method' for backwards compatibility
if method == 'database' or method is None:
logger.debug("Connecting to Redshift using 'database' credentials")
# this requirement is really annoying to encode into json schema,
# so validate it here
if credentials.password is None:
raise dbt.exceptions.FailedToConnectException(
"'password' field is required for 'database' credentials"
)
return credentials
elif method == 'iam':
logger.debug("Connecting to Redshift using 'IAM' credentials")
return cls.get_tmp_iam_cluster_credentials(credentials)
else:
raise dbt.exceptions.FailedToConnectException(
"Invalid 'method' in profile: '{}'".format(method))

View File

@@ -1,88 +0,0 @@
from dataclasses import dataclass
from typing import Optional
from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.base.meta import available
from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.redshift import RedshiftConnectionManager
from dbt.adapters.redshift import RedshiftColumn
from dbt.adapters.redshift import RedshiftRelation
from dbt.logger import GLOBAL_LOGGER as logger # noqa
import dbt.exceptions
@dataclass
class RedshiftConfig(AdapterConfig):
sort_type: Optional[str] = None
dist: Optional[str] = None
sort: Optional[str] = None
bind: Optional[bool] = None
class RedshiftAdapter(PostgresAdapter, SQLAdapter):
Relation = RedshiftRelation
ConnectionManager = RedshiftConnectionManager
Column = RedshiftColumn
AdapterSpecificConfigs = RedshiftConfig
@classmethod
def date_function(cls):
return 'getdate()'
def drop_relation(self, relation):
"""
In Redshift, DROP TABLE ... CASCADE should not be used
inside a transaction. Redshift doesn't prevent the CASCADE
part from conflicting with concurrent transactions. If we do
attempt to drop two tables with CASCADE at once, we'll often
get the dreaded:
table was dropped by a concurrent transaction
So, we need to lock around calls to the underlying
drop_relation() function.
https://docs.aws.amazon.com/redshift/latest/dg/r_DROP_TABLE.html
"""
with self.connections.fresh_transaction():
return super().drop_relation(relation)
@classmethod
def convert_text_type(cls, agate_table, col_idx):
column = agate_table.columns[col_idx]
# `lens` must be a list, so this can't be a generator expression,
# because max() raises ane exception if its argument has no members.
lens = [len(d.encode("utf-8")) for d in column.values_without_nulls()]
max_len = max(lens) if lens else 64
return "varchar({})".format(max_len)
@classmethod
def convert_time_type(cls, agate_table, col_idx):
return "varchar(24)"
@available
def verify_database(self, database):
if database.startswith('"'):
database = database.strip('"')
expected = self.config.credentials.database
ra3_node = self.config.credentials.ra3_node
if database.lower() != expected.lower() and not ra3_node:
raise dbt.exceptions.NotImplementedException(
'Cross-db references allowed only in RA3.* node. ({} vs {})'
.format(database, expected)
)
# return an empty string on success so macros can call this
return ''
def _get_catalog_schemas(self, manifest):
# redshift(besides ra3) only allow one database (the main one)
schemas = super(SQLAdapter, self)._get_catalog_schemas(manifest)
try:
return schemas.flatten(allow_multiple_databases=self.config.credentials.ra3_node)
except dbt.exceptions.RuntimeException as exc:
dbt.exceptions.raise_compiler_error(
'Cross-db references allowed only in {} RA3.* node. Got {}'
.format(self.type(), exc.msg)
)

View File

@@ -1,15 +0,0 @@
from dbt.adapters.base import Column
from dataclasses import dataclass
from dbt.adapters.postgres.relation import PostgresRelation
@dataclass(frozen=True, eq=False, repr=False)
class RedshiftRelation(PostgresRelation):
# Override the method in the Postgres Relation
# because Redshift allows longer names
def relation_max_name_length(self):
return 127
class RedshiftColumn(Column):
pass # redshift does not inherit from postgres here

View File

@@ -1,3 +0,0 @@
import os
PACKAGE_PATH = os.path.dirname(__file__)

View File

@@ -1,5 +0,0 @@
config-version: 2
name: dbt_redshift
version: 1.0
macro-paths: ["macros"]

View File

@@ -1,283 +0,0 @@
{% macro dist(dist) %}
{%- if dist is not none -%}
{%- set dist = dist.strip().lower() -%}
{%- if dist in ['all', 'even'] -%}
diststyle {{ dist }}
{%- elif dist == "auto" -%}
{%- else -%}
diststyle key distkey ({{ dist }})
{%- endif -%}
{%- endif -%}
{%- endmacro -%}
{% macro sort(sort_type, sort) %}
{%- if sort is not none %}
{{ sort_type | default('compound', boolean=true) }} sortkey(
{%- if sort is string -%}
{%- set sort = [sort] -%}
{%- endif -%}
{%- for item in sort -%}
{{ item }}
{%- if not loop.last -%},{%- endif -%}
{%- endfor -%}
)
{%- endif %}
{%- endmacro -%}
{% macro redshift__create_table_as(temporary, relation, sql) -%}
{%- set _dist = config.get('dist') -%}
{%- set _sort_type = config.get(
'sort_type',
validator=validation.any['compound', 'interleaved']) -%}
{%- set _sort = config.get(
'sort',
validator=validation.any[list, basestring]) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create {% if temporary -%}temporary{%- endif %} table
{{ relation.include(database=(not temporary), schema=(not temporary)) }}
{{ dist(_dist) }}
{{ sort(_sort_type, _sort) }}
as (
{{ sql }}
);
{%- endmacro %}
{% macro redshift__create_view_as(relation, sql) -%}
{%- set binding = config.get('bind', default=True) -%}
{% set bind_qualifier = '' if binding else 'with no schema binding' %}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create view {{ relation }} as (
{{ sql }}
) {{ bind_qualifier }};
{% endmacro %}
{% macro redshift__create_schema(relation) -%}
{{ postgres__create_schema(relation) }}
{% endmacro %}
{% macro redshift__drop_schema(relation) -%}
{{ postgres__drop_schema(relation) }}
{% endmacro %}
{% macro redshift__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
with bound_views as (
select
ordinal_position,
table_schema,
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from information_schema."columns"
where table_name = '{{ relation.identifier }}'
),
unbound_views as (
select
ordinal_position,
view_schema,
col_name,
case
when col_type ilike 'character varying%' then
'character varying'
when col_type ilike 'numeric%' then 'numeric'
else col_type
end as col_type,
case
when col_type like 'character%'
then nullif(REGEXP_SUBSTR(col_type, '[0-9]+'), '')::int
else null
end as character_maximum_length,
case
when col_type like 'numeric%'
then nullif(
SPLIT_PART(REGEXP_SUBSTR(col_type, '[0-9,]+'), ',', 1),
'')::int
else null
end as numeric_precision,
case
when col_type like 'numeric%'
then nullif(
SPLIT_PART(REGEXP_SUBSTR(col_type, '[0-9,]+'), ',', 2),
'')::int
else null
end as numeric_scale
from pg_get_late_binding_view_cols()
cols(view_schema name, view_name name, col_name name,
col_type varchar, ordinal_position int)
where view_name = '{{ relation.identifier }}'
),
external_views as (
select
columnnum,
schemaname,
columnname,
case
when external_type ilike 'character varying%' or external_type ilike 'varchar%'
then 'character varying'
when external_type ilike 'numeric%' then 'numeric'
else external_type
end as external_type,
case
when external_type like 'character%' or external_type like 'varchar%'
then nullif(
REGEXP_SUBSTR(external_type, '[0-9]+'),
'')::int
else null
end as character_maximum_length,
case
when external_type like 'numeric%'
then nullif(
SPLIT_PART(REGEXP_SUBSTR(external_type, '[0-9,]+'), ',', 1),
'')::int
else null
end as numeric_precision,
case
when external_type like 'numeric%'
then nullif(
SPLIT_PART(REGEXP_SUBSTR(external_type, '[0-9,]+'), ',', 2),
'')::int
else null
end as numeric_scale
from
pg_catalog.svv_external_columns
where
schemaname = '{{ relation.schema }}'
and tablename = '{{ relation.identifier }}'
),
unioned as (
select * from bound_views
union all
select * from unbound_views
union all
select * from external_views
)
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from unioned
{% if relation.schema %}
where table_schema = '{{ relation.schema }}'
{% endif %}
order by ordinal_position
{% endcall %}
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{% endmacro %}
{% macro redshift__list_relations_without_caching(schema_relation) %}
{{ return(postgres__list_relations_without_caching(schema_relation)) }}
{% endmacro %}
{% macro redshift__information_schema_name(database) -%}
{{ return(postgres__information_schema_name(database)) }}
{%- endmacro %}
{% macro redshift__list_schemas(database) -%}
{{ return(postgres__list_schemas(database)) }}
{%- endmacro %}
{% macro redshift__check_schema_exists(information_schema, schema) -%}
{{ return(postgres__check_schema_exists(information_schema, schema)) }}
{%- endmacro %}
{% macro redshift__current_timestamp() -%}
getdate()
{%- endmacro %}
{% macro redshift__snapshot_get_time() -%}
{{ current_timestamp() }}::timestamp
{%- endmacro %}
{% macro redshift__snapshot_string_as_time(timestamp) -%}
{%- set result = "'" ~ timestamp ~ "'::timestamp" -%}
{{ return(result) }}
{%- endmacro %}
{% macro redshift__make_temp_relation(base_relation, suffix) %}
{% do return(postgres__make_temp_relation(base_relation, suffix)) %}
{% endmacro %}
{% macro redshift__persist_docs(relation, model, for_relation, for_columns) -%}
{% if for_relation and config.persist_relation_docs() and model.description %}
{% do run_query(alter_relation_comment(relation, model.description)) %}
{% endif %}
{# Override: do not set column comments for LBVs #}
{% set is_lbv = config.get('materialized') == 'view' and config.get('bind') == false %}
{% if for_columns and config.persist_column_docs() and model.columns and not is_lbv %}
{% do run_query(alter_column_comment(relation, model.columns)) %}
{% endif %}
{% endmacro %}
{% macro redshift__alter_relation_comment(relation, comment) %}
{% do return(postgres__alter_relation_comment(relation, comment)) %}
{% endmacro %}
{% macro redshift__alter_column_comment(relation, column_dict) %}
{% do return(postgres__alter_column_comment(relation, column_dict)) %}
{% endmacro %}
{% macro redshift__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns %}
{% for column in add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} add column {{ column.name }} {{ column.data_type }}
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endif %}
{% if remove_columns %}
{% for column in remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} drop column {{ column.name }}
{% endset %}
{% do run_query(sql) %}
{% endfor %}
{% endif %}
{% endmacro %}

View File

@@ -1,242 +0,0 @@
{% macro redshift__get_base_catalog(information_schema, schemas) -%}
{%- call statement('base_catalog', fetch_result=True) -%}
{% set database = information_schema.database %}
{{ adapter.verify_database(database) }}
with late_binding as (
select
'{{ database }}'::varchar as table_database,
table_schema,
table_name,
'LATE BINDING VIEW'::varchar as table_type,
null::text as table_comment,
column_name,
column_index,
column_type,
null::text as column_comment
from pg_get_late_binding_view_cols()
cols(table_schema name, table_name name, column_name name,
column_type varchar,
column_index int)
order by "column_index"
),
early_binding as (
select
'{{ database }}'::varchar as table_database,
sch.nspname as table_schema,
tbl.relname as table_name,
case tbl.relkind
when 'v' then 'VIEW'
else 'BASE TABLE'
end as table_type,
tbl_desc.description as table_comment,
col.attname as column_name,
col.attnum as column_index,
pg_catalog.format_type(col.atttypid, col.atttypmod) as column_type,
col_desc.description as column_comment
from pg_catalog.pg_namespace sch
join pg_catalog.pg_class tbl on tbl.relnamespace = sch.oid
join pg_catalog.pg_attribute col on col.attrelid = tbl.oid
left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0)
left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum)
where (
{%- for schema in schemas -%}
upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
and tbl.relkind in ('r', 'v', 'f', 'p')
and col.attnum > 0
and not col.attisdropped
),
table_owners as (
select
'{{ database }}'::varchar as table_database,
schemaname as table_schema,
tablename as table_name,
tableowner as table_owner
from pg_tables
union all
select
'{{ database }}'::varchar as table_database,
schemaname as table_schema,
viewname as table_name,
viewowner as table_owner
from pg_views
),
unioned as (
select *
from early_binding
union all
select *
from late_binding
)
select *,
table_database || '.' || table_schema || '.' || table_name as table_id
from unioned
join table_owners using (table_database, table_schema, table_name)
where (
{%- for schema in schemas -%}
upper(table_schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
order by "column_index"
{%- endcall -%}
{{ return(load_result('base_catalog').table) }}
{%- endmacro %}
{% macro redshift__get_extended_catalog(schemas) %}
{%- call statement('extended_catalog', fetch_result=True) -%}
select
"database" || '.' || "schema" || '.' || "table" as table_id,
'Encoded'::text as "stats:encoded:label",
encoded as "stats:encoded:value",
'Indicates whether any column in the table has compression encoding defined.'::text as "stats:encoded:description",
true as "stats:encoded:include",
'Dist Style' as "stats:diststyle:label",
diststyle as "stats:diststyle:value",
'Distribution style or distribution key column, if key distribution is defined.'::text as "stats:diststyle:description",
true as "stats:diststyle:include",
'Sort Key 1' as "stats:sortkey1:label",
-- handle 0xFF byte in response for interleaved sort styles
case
when sortkey1 like 'INTERLEAVED%' then 'INTERLEAVED'::text
else sortkey1
end as "stats:sortkey1:value",
'First column in the sort key.'::text as "stats:sortkey1:description",
(sortkey1 is not null) as "stats:sortkey1:include",
'Max Varchar' as "stats:max_varchar:label",
max_varchar as "stats:max_varchar:value",
'Size of the largest column that uses a VARCHAR data type.'::text as "stats:max_varchar:description",
true as "stats:max_varchar:include",
-- exclude this, as the data is strangely returned with null-byte characters
'Sort Key 1 Encoding' as "stats:sortkey1_enc:label",
sortkey1_enc as "stats:sortkey1_enc:value",
'Compression encoding of the first column in the sort key.' as "stats:sortkey1_enc:description",
false as "stats:sortkey1_enc:include",
'# Sort Keys' as "stats:sortkey_num:label",
sortkey_num as "stats:sortkey_num:value",
'Number of columns defined as sort keys.' as "stats:sortkey_num:description",
(sortkey_num > 0) as "stats:sortkey_num:include",
'Approximate Size' as "stats:size:label",
size * 1000000 as "stats:size:value",
'Approximate size of the table, calculated from a count of 1MB blocks'::text as "stats:size:description",
true as "stats:size:include",
'Disk Utilization' as "stats:pct_used:label",
pct_used / 100.0 as "stats:pct_used:value",
'Percent of available space that is used by the table.'::text as "stats:pct_used:description",
true as "stats:pct_used:include",
'Unsorted %' as "stats:unsorted:label",
unsorted / 100.0 as "stats:unsorted:value",
'Percent of unsorted rows in the table.'::text as "stats:unsorted:description",
(unsorted is not null) as "stats:unsorted:include",
'Stats Off' as "stats:stats_off:label",
stats_off as "stats:stats_off:value",
'Number that indicates how stale the table statistics are; 0 is current, 100 is out of date.'::text as "stats:stats_off:description",
true as "stats:stats_off:include",
'Approximate Row Count' as "stats:rows:label",
tbl_rows as "stats:rows:value",
'Approximate number of rows in the table. This value includes rows marked for deletion, but not yet vacuumed.'::text as "stats:rows:description",
true as "stats:rows:include",
'Sort Key Skew' as "stats:skew_sortkey1:label",
skew_sortkey1 as "stats:skew_sortkey1:value",
'Ratio of the size of the largest non-sort key column to the size of the first column of the sort key.'::text as "stats:skew_sortkey1:description",
(skew_sortkey1 is not null) as "stats:skew_sortkey1:include",
'Skew Rows' as "stats:skew_rows:label",
skew_rows as "stats:skew_rows:value",
'Ratio of the number of rows in the slice with the most rows to the number of rows in the slice with the fewest rows.'::text as "stats:skew_rows:description",
(skew_rows is not null) as "stats:skew_rows:include"
from svv_table_info
where (
{%- for schema in schemas -%}
upper(schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endcall -%}
{{ return(load_result('extended_catalog').table) }}
{% endmacro %}
{% macro redshift__can_select_from(table_name) %}
{%- call statement('has_table_privilege', fetch_result=True) -%}
select has_table_privilege(current_user, '{{ table_name }}', 'SELECT') as can_select
{%- endcall -%}
{% set can_select = load_result('has_table_privilege').table[0]['can_select'] %}
{{ return(can_select) }}
{% endmacro %}
{% macro redshift__no_svv_table_info_warning() %}
{% set msg %}
Warning: The database user "{{ target.user }}" has insufficient permissions to
query the "svv_table_info" table. Please grant SELECT permissions on this table
to the "{{ target.user }}" user to fetch extended table details from Redshift.
{% endset %}
{{ log(msg, info=True) }}
{% endmacro %}
{% macro redshift__get_catalog(information_schema, schemas) %}
{#-- Compute a left-outer join in memory. Some Redshift queries are
-- leader-only, and cannot be joined to other compute-based queries #}
{% set catalog = redshift__get_base_catalog(information_schema, schemas) %}
{% set select_extended = redshift__can_select_from('svv_table_info') %}
{% if select_extended %}
{% set extended_catalog = redshift__get_extended_catalog(schemas) %}
{% set catalog = catalog.join(extended_catalog, 'table_id') %}
{% else %}
{{ redshift__no_svv_table_info_warning() }}
{% endif %}
{{ return(catalog.exclude(['table_id'])) }}
{% endmacro %}

View File

@@ -1,4 +0,0 @@
{% macro redshift__snapshot_merge_sql(target, source, insert_cols) -%}
{{ postgres__snapshot_merge_sql(target, source, insert_cols) }}
{% endmacro %}

View File

@@ -1,3 +0,0 @@
{% macro redshift__get_relations () -%}
{{ return(dbt.postgres__get_relations()) }}
{% endmacro %}

View File

@@ -1,25 +0,0 @@
default:
outputs:
dev:
type: redshift
threads: [1 or more]
host: [host]
port: [port]
user: [dev_username]
pass: [dev_password]
dbname: [dbname]
schema: [dev_schema]
prod:
type: redshift
method: iam
cluster_id: [cluster_id]
threads: [1 or more]
host: [host]
port: [port]
user: [prod_user]
dbname: [dbname]
schema: [prod_schema]
target: dev

View File

@@ -1,70 +0,0 @@
#!/usr/bin/env python
import os
import sys
if sys.version_info < (3, 6):
print('Error: dbt does not support this version of Python.')
print('Please upgrade to Python 3.6 or higher.')
sys.exit(1)
from setuptools import setup
try:
from setuptools import find_namespace_packages
except ImportError:
# the user has a downlevel version of setuptools.
print('Error: dbt requires setuptools v40.1.0 or higher.')
print('Please upgrade setuptools with "pip install --upgrade setuptools" '
'and try again')
sys.exit(1)
package_name = "dbt-redshift"
package_version = "0.21.0rc1"
description = """The redshift adapter plugin for dbt (data build tool)"""
this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, 'README.md')) as f:
long_description = f.read()
setup(
name=package_name,
version=package_version,
description=description,
long_description=description,
long_description_content_type='text/markdown',
author="dbt Labs",
author_email="info@dbtlabs.com",
url="https://github.com/dbt-labs/dbt",
packages=find_namespace_packages(include=['dbt', 'dbt.*']),
package_data={
'dbt': [
'include/redshift/dbt_project.yml',
'include/redshift/sample_profiles.yml',
'include/redshift/macros/*.sql',
'include/redshift/macros/**/*.sql',
]
},
install_requires=[
'dbt-core=={}'.format(package_version),
'dbt-postgres=={}'.format(package_version),
# the following are all to match snowflake-connector-python
'boto3>=1.4.4,<2.0.0',
],
zip_safe=False,
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS :: MacOS X',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
],
python_requires=">=3.6.2",
)

View File

@@ -1,32 +0,0 @@
<p align="center">
<img src="https://raw.githubusercontent.com/dbt-labs/dbt/6c6649f9129d5d108aa3b0526f634cd8f3a9d1ed/etc/dbt-logo-full.svg" alt="dbt logo" width="500"/>
</p>
**[dbt](https://www.getdbt.com/)** (data build tool) enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
dbt is the T in ELT. Organize, cleanse, denormalize, filter, rename, and pre-aggregate the raw data in your warehouse so that it's ready for analysis.
## dbt-snowflake
The `dbt-snowflake` package contains all of the code required to make dbt operate on a Snowflake database. For
more information on using dbt with Snowflake, consult [the docs](https://docs.getdbt.com/docs/profile-snowflake).
## Find out more
- Check out the [Introduction to dbt](https://docs.getdbt.com/docs/introduction/).
- Read the [dbt Viewpoint](https://docs.getdbt.com/docs/about/viewpoint/).
## Join thousands of analysts in the dbt community
- Join the [chat](http://community.getdbt.com/) on Slack.
- Find community posts on [dbt Discourse](https://discourse.getdbt.com).
## Reporting bugs and contributing code
- Want to report a bug or request a feature? Let us know on [Slack](http://community.getdbt.com/), or open [an issue](https://github.com/dbt-labs/dbt/issues/new).
- Want to help us build dbt? Check out the [Contributing Getting Started Guide](https://github.com/dbt-labs/dbt/blob/HEAD/CONTRIBUTING.md)
## Code of Conduct
Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [dbt Code of Conduct](https://community.getdbt.com/code-of-conduct).

View File

@@ -1,13 +0,0 @@
from dbt.adapters.snowflake.column import SnowflakeColumn # noqa
from dbt.adapters.snowflake.connections import SnowflakeConnectionManager # noqa
from dbt.adapters.snowflake.connections import SnowflakeCredentials
from dbt.adapters.snowflake.relation import SnowflakeRelation # noqa
from dbt.adapters.snowflake.impl import SnowflakeAdapter
from dbt.adapters.base import AdapterPlugin
from dbt.include import snowflake
Plugin = AdapterPlugin(
adapter=SnowflakeAdapter,
credentials=SnowflakeCredentials,
include_path=snowflake.PACKAGE_PATH)

View File

@@ -1 +0,0 @@
version = '0.21.0rc1'

View File

@@ -1,31 +0,0 @@
from dataclasses import dataclass
from dbt.adapters.base.column import Column
from dbt.exceptions import RuntimeException
@dataclass
class SnowflakeColumn(Column):
def is_integer(self) -> bool:
# everything that smells like an int is actually a NUMBER(38, 0)
return False
def is_numeric(self) -> bool:
return self.dtype.lower() in [
'int', 'integer', 'bigint', 'smallint', 'tinyint', 'byteint',
'numeric', 'decimal', 'number'
]
def is_float(self):
return self.dtype.lower() in [
'float', 'float4', 'float8', 'double', 'double precision', 'real',
]
def string_size(self) -> int:
if not self.is_string():
raise RuntimeException("Called string_size() on non-string field!")
if self.dtype == 'text' or self.char_size is None:
return 16777216
else:
return int(self.char_size)

View File

@@ -1,375 +0,0 @@
import base64
import datetime
import pytz
import re
from contextlib import contextmanager
from dataclasses import dataclass
from io import StringIO
from time import sleep
from typing import Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import requests
import snowflake.connector
import snowflake.connector.errors
from dbt.exceptions import (
InternalException, RuntimeException, FailedToConnectException,
DatabaseException, warn_or_error
)
from dbt.adapters.base import Credentials
from dbt.contracts.connection import AdapterResponse
from dbt.adapters.sql import SQLConnectionManager
from dbt.logger import GLOBAL_LOGGER as logger
_TOKEN_REQUEST_URL = 'https://{}.snowflakecomputing.com/oauth/token-request'
@dataclass
class SnowflakeCredentials(Credentials):
account: str
user: str
warehouse: Optional[str] = None
role: Optional[str] = None
password: Optional[str] = None
authenticator: Optional[str] = None
private_key_path: Optional[str] = None
private_key_passphrase: Optional[str] = None
token: Optional[str] = None
oauth_client_id: Optional[str] = None
oauth_client_secret: Optional[str] = None
query_tag: Optional[str] = None
client_session_keep_alive: bool = False
def __post_init__(self):
if (
self.authenticator != 'oauth' and
(self.oauth_client_secret or self.oauth_client_id or self.token)
):
# the user probably forgot to set 'authenticator' like I keep doing
warn_or_error(
'Authenticator is not set to oauth, but an oauth-only '
'parameter is set! Did you mean to set authenticator: oauth?'
)
@property
def type(self):
return 'snowflake'
@property
def unique_field(self):
return self.account
def _connection_keys(self):
return (
'account', 'user', 'database', 'schema', 'warehouse', 'role',
'client_session_keep_alive'
)
def auth_args(self):
# Pull all of the optional authentication args for the connector,
# let connector handle the actual arg validation
result = {}
if self.password:
result['password'] = self.password
if self.authenticator:
result['authenticator'] = self.authenticator
if self.authenticator == 'oauth':
token = self.token
# if we have a client ID/client secret, the token is a refresh
# token, not an access token
if self.oauth_client_id and self.oauth_client_secret:
token = self._get_access_token()
elif self.oauth_client_id:
warn_or_error(
'Invalid profile: got an oauth_client_id, but not an '
'oauth_client_secret!'
)
elif self.oauth_client_secret:
warn_or_error(
'Invalid profile: got an oauth_client_secret, but not '
'an oauth_client_id!'
)
result['token'] = token
# enable the token cache
result['client_store_temporary_credential'] = True
result['private_key'] = self._get_private_key()
return result
def _get_access_token(self) -> str:
if self.authenticator != 'oauth':
raise InternalException('Can only get access tokens for oauth')
missing = any(
x is None for x in
(self.oauth_client_id, self.oauth_client_secret, self.token)
)
if missing:
raise InternalException(
'need a client ID a client secret, and a refresh token to get '
'an access token'
)
# should the full url be a config item?
token_url = _TOKEN_REQUEST_URL.format(self.account)
# I think this is only used to redirect on success, which we ignore
# (it does not have to match the integration's settings in snowflake)
redirect_uri = 'http://localhost:9999'
data = {
'grant_type': 'refresh_token',
'refresh_token': self.token,
'redirect_uri': redirect_uri
}
auth = base64.b64encode(
f'{self.oauth_client_id}:{self.oauth_client_secret}'
.encode('ascii')
).decode('ascii')
headers = {
'Authorization': f'Basic {auth}',
'Content-type': 'application/x-www-form-urlencoded;charset=utf-8'
}
result_json = None
max_iter = 20
# Attempt to obtain JSON for 1 second before throwing an error
for i in range(max_iter):
result = requests.post(token_url, headers=headers, data=data)
try:
result_json = result.json()
break
except ValueError as e:
message = result.text
logger.debug(f"Got a non-json response ({result.status_code}): \
{e}, message: {message}")
sleep(0.05)
if result_json is None:
raise DatabaseException(f"""Did not receive valid json with access_token.
Showing json response: {result_json}""")
return result_json['access_token']
def _get_private_key(self):
"""Get Snowflake private key by path or None."""
if not self.private_key_path:
return None
if self.private_key_passphrase:
encoded_passphrase = self.private_key_passphrase.encode()
else:
encoded_passphrase = None
with open(self.private_key_path, 'rb') as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=encoded_passphrase,
backend=default_backend())
return p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
class SnowflakeConnectionManager(SQLConnectionManager):
TYPE = 'snowflake'
@contextmanager
def exception_handler(self, sql):
try:
yield
except snowflake.connector.errors.ProgrammingError as e:
msg = str(e)
logger.debug('Snowflake query id: {}'.format(e.sfqid))
logger.debug('Snowflake error: {}'.format(msg))
if 'Empty SQL statement' in msg:
logger.debug("got empty sql statement, moving on")
elif 'This session does not have a current database' in msg:
raise FailedToConnectException(
('{}\n\nThis error sometimes occurs when invalid '
'credentials are provided, or when your default role '
'does not have access to use the specified database. '
'Please double check your profile and try again.')
.format(msg))
else:
raise DatabaseException(msg)
except Exception as e:
if isinstance(e, snowflake.connector.errors.Error):
logger.debug('Snowflake query id: {}'.format(e.sfqid))
logger.debug("Error running SQL: {}", sql)
logger.debug("Rolling back transaction.")
self.rollback_if_open()
if isinstance(e, RuntimeException):
# during a sql query, an internal to dbt exception was raised.
# this sounds a lot like a signal handler and probably has
# useful information, so raise it without modification.
raise
raise RuntimeException(str(e)) from e
@classmethod
def open(cls, connection):
if connection.state == 'open':
logger.debug('Connection is already open, skipping open.')
return connection
try:
creds = connection.credentials
handle = snowflake.connector.connect(
account=creds.account,
user=creds.user,
database=creds.database,
schema=creds.schema,
warehouse=creds.warehouse,
role=creds.role,
autocommit=True,
client_session_keep_alive=creds.client_session_keep_alive,
application='dbt',
**creds.auth_args()
)
if creds.query_tag:
handle.cursor().execute(
("alter session set query_tag = '{}'")
.format(creds.query_tag))
connection.handle = handle
connection.state = 'open'
except snowflake.connector.errors.Error as e:
logger.debug("Got an error when attempting to open a snowflake "
"connection: '{}'"
.format(e))
connection.handle = None
connection.state = 'fail'
raise FailedToConnectException(str(e))
def cancel(self, connection):
handle = connection.handle
sid = handle.session_id
connection_name = connection.name
sql = 'select system$abort_session({})'.format(sid)
logger.debug("Cancelling query '{}' ({})".format(connection_name, sid))
_, cursor = self.add_query(sql)
res = cursor.fetchone()
logger.debug("Cancel query '{}': {}".format(connection_name, res))
@classmethod
def get_response(cls, cursor) -> AdapterResponse:
code = cursor.sqlstate
if code is None:
code = 'SUCCESS'
return AdapterResponse(
_message="{} {}".format(code, cursor.rowcount),
rows_affected=cursor.rowcount,
code=code
)
# disable transactional logic by default on Snowflake
# except for DML statements where explicitly defined
def add_begin_query(self, *args, **kwargs):
pass
def add_commit_query(self, *args, **kwargs):
pass
def begin(self):
pass
def commit(self):
pass
def clear_transaction(self):
pass
@classmethod
def _split_queries(cls, sql):
"Splits sql statements at semicolons into discrete queries"
sql_s = str(sql)
sql_buf = StringIO(sql_s)
split_query = snowflake.connector.util_text.split_statements(sql_buf)
return [part[0] for part in split_query]
@classmethod
def process_results(cls, column_names, rows):
# Override for Snowflake. The datetime objects returned by
# snowflake-connector-python are not pickleable, so we need
# to replace them with sane timezones
fixed = []
for row in rows:
fixed_row = []
for col in row:
if isinstance(col, datetime.datetime) and col.tzinfo:
offset = col.utcoffset()
offset_seconds = offset.total_seconds()
new_timezone = pytz.FixedOffset(offset_seconds // 60)
col = col.astimezone(tz=new_timezone)
fixed_row.append(col)
fixed.append(fixed_row)
return super().process_results(column_names, fixed)
def add_query(self, sql, auto_begin=True,
bindings=None, abridge_sql_log=False):
connection = None
cursor = None
if bindings:
# The snowflake connector is more strict than, eg., psycopg2 -
# which allows any iterable thing to be passed as a binding.
bindings = tuple(bindings)
queries = self._split_queries(sql)
for individual_query in queries:
# hack -- after the last ';', remove comments and don't run
# empty queries. this avoids using exceptions as flow control,
# and also allows us to return the status of the last cursor
without_comments = re.sub(
re.compile(
r'(\".*?\"|\'.*?\')|(/\*.*?\*/|--[^\r\n]*$)', re.MULTILINE
),
'', individual_query).strip()
if without_comments == "":
continue
connection, cursor = super().add_query(
individual_query, auto_begin,
bindings=bindings,
abridge_sql_log=abridge_sql_log
)
if cursor is None:
conn = self.get_thread_connection()
if conn is None or conn.name is None:
conn_name = '<None>'
else:
conn_name = conn.name
raise RuntimeException(
"Tried to run an empty query on model '{}'. If you are "
"conditionally running\nsql, eg. in a model hook, make "
"sure your `else` clause contains valid sql!\n\n"
"Provided SQL:\n{}"
.format(conn_name, sql)
)
return connection, cursor

View File

@@ -1,190 +0,0 @@
from dataclasses import dataclass
from typing import Mapping, Any, Optional, List, Union
import agate
from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
LIST_RELATIONS_MACRO_NAME,
)
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import (
raise_compiler_error, RuntimeException, DatabaseException
)
from dbt.utils import filter_null_values
@dataclass
class SnowflakeConfig(AdapterConfig):
transient: Optional[bool] = None
cluster_by: Optional[Union[str, List[str]]] = None
automatic_clustering: Optional[bool] = None
secure: Optional[bool] = None
copy_grants: Optional[bool] = None
snowflake_warehouse: Optional[str] = None
query_tag: Optional[str] = None
merge_update_columns: Optional[str] = None
class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
Column = SnowflakeColumn
ConnectionManager = SnowflakeConnectionManager
AdapterSpecificConfigs = SnowflakeConfig
@classmethod
def date_function(cls):
return "CURRENT_TIMESTAMP()"
@classmethod
def _catalog_filter_table(
cls, table: agate.Table, manifest: Manifest
) -> agate.Table:
# On snowflake, users can set QUOTED_IDENTIFIERS_IGNORE_CASE, so force
# the column names to their lowercased forms.
lowered = table.rename(
column_names=[c.lower() for c in table.column_names]
)
return super()._catalog_filter_table(lowered, manifest)
def _make_match_kwargs(self, database, schema, identifier):
quoting = self.config.quoting
if identifier is not None and quoting["identifier"] is False:
identifier = identifier.upper()
if schema is not None and quoting["schema"] is False:
schema = schema.upper()
if database is not None and quoting["database"] is False:
database = database.upper()
return filter_null_values(
{"identifier": identifier, "schema": schema, "database": database}
)
def _get_warehouse(self) -> str:
_, table = self.execute(
'select current_warehouse() as warehouse',
fetch=True
)
if len(table) == 0 or len(table[0]) == 0:
# can this happen?
raise RuntimeException(
'Could not get current warehouse: no results'
)
return str(table[0][0])
def _use_warehouse(self, warehouse: str):
"""Use the given warehouse. Quotes are never applied."""
self.execute('use warehouse {}'.format(warehouse))
def pre_model_hook(self, config: Mapping[str, Any]) -> Optional[str]:
default_warehouse = self.config.credentials.warehouse
warehouse = config.get('snowflake_warehouse', default_warehouse)
if warehouse == default_warehouse or warehouse is None:
return None
previous = self._get_warehouse()
self._use_warehouse(warehouse)
return previous
def post_model_hook(
self, config: Mapping[str, Any], context: Optional[str]
) -> None:
if context is not None:
self._use_warehouse(context)
def list_schemas(self, database: str) -> List[str]:
try:
results = self.execute_macro(
LIST_SCHEMAS_MACRO_NAME,
kwargs={'database': database}
)
except DatabaseException as exc:
msg = (
f'Database error while listing schemas in database '
f'"{database}"\n{exc}'
)
raise RuntimeException(msg)
# this uses 'show terse schemas in database', and the column name we
# want is 'name'
return [row['name'] for row in results]
def get_columns_in_relation(self, relation):
try:
return super().get_columns_in_relation(relation)
except DatabaseException as exc:
if 'does not exist or not authorized' in str(exc):
return []
else:
raise
def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {'schema_relation': schema_relation}
try:
results = self.execute_macro(
LIST_RELATIONS_MACRO_NAME,
kwargs=kwargs
)
except DatabaseException as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
# and skip listing the missing ones, which sounds expensive.
if 'Object does not exist' in str(exc):
return []
raise
relations = []
quote_policy = {
'database': True,
'schema': True,
'identifier': True
}
columns = ['database_name', 'schema_name', 'name', 'kind']
for _database, _schema, _identifier, _type in results.select(columns):
try:
_type = self.Relation.get_relation_type(_type.lower())
except ValueError:
_type = self.Relation.External
relations.append(self.Relation.create(
database=_database,
schema=_schema,
identifier=_identifier,
quote_policy=quote_policy,
type=_type
))
return relations
def quote_seed_column(
self, column: str, quote_config: Optional[bool]
) -> str:
quote_columns: bool = False
if isinstance(quote_config, bool):
quote_columns = quote_config
elif quote_config is None:
pass
else:
raise_compiler_error(
f'The seed configuration value of "quote_columns" has an '
f'invalid type {type(quote_config)}'
)
if quote_columns:
return self.quote(column)
else:
return column
def timestamp_add_sql(
self, add_to: str, number: int = 1, interval: str = 'hour'
) -> str:
return f'DATEADD({interval}, {number}, {add_to})'

View File

@@ -1,14 +0,0 @@
from dataclasses import dataclass
from dbt.adapters.base.relation import BaseRelation, Policy
@dataclass
class SnowflakeQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
quote_policy: SnowflakeQuotePolicy = SnowflakeQuotePolicy()

View File

@@ -1,2 +0,0 @@
import os
PACKAGE_PATH = os.path.dirname(__file__)

View File

@@ -1,5 +0,0 @@
config-version: 2
name: dbt_snowflake
version: 1.0
macro-paths: ["macros"]

View File

@@ -1,251 +0,0 @@
{% macro snowflake__create_table_as(temporary, relation, sql) -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ sql }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ sql }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- endif -%}
{% endmacro %}
{% macro snowflake__create_view_as(relation, sql) -%}
{%- set secure = config.get('secure', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
create or replace {% if secure -%}
secure
{%- endif %} view {{ relation }} {% if copy_grants -%} copy grants {%- endif %} as (
{{ sql }}
);
{% endmacro %}
{% macro snowflake__get_columns_in_relation(relation) -%}
{%- set sql -%}
describe table {{ relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{% set maximum = 10000 %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many columns in relation {{ relation }}! dbt can only get
information about relations with fewer than {{ maximum }} columns.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{% set columns = [] %}
{% for row in result %}
{% do columns.append(api.Column.from_description(row['name'], row['type'])) %}
{% endfor %}
{% do return(columns) %}
{% endmacro %}
{% macro snowflake__list_schemas(database) -%}
{# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #}
{% set maximum = 10000 %}
{% set sql -%}
show terse schemas in database {{ database }}
limit {{ maximum }}
{%- endset %}
{% set result = run_query(sql) %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many schemas in database {{ database }}! dbt can only get
information about databases with fewer than {{ maximum }} schemas.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{{ return(result) }}
{% endmacro %}
{% macro snowflake__list_relations_without_caching(schema_relation) %}
{%- set sql -%}
show terse objects in {{ schema_relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{% set maximum = 10000 %}
{% if (result | length) >= maximum %}
{% set msg %}
Too many schemas in schema {{ schema_relation }}! dbt can only get
information about schemas with fewer than {{ maximum }} objects.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}
{%- do return(result) -%}
{% endmacro %}
{% macro snowflake__check_schema_exists(information_schema, schema) -%}
{% call statement('check_schema_exists', fetch_result=True) -%}
select count(*)
from {{ information_schema }}.schemata
where upper(schema_name) = upper('{{ schema }}')
and upper(catalog_name) = upper('{{ information_schema.database }}')
{%- endcall %}
{{ return(load_result('check_schema_exists').table) }}
{%- endmacro %}
{% macro snowflake__current_timestamp() -%}
convert_timezone('UTC', current_timestamp())
{%- endmacro %}
{% macro snowflake__snapshot_string_as_time(timestamp) -%}
{%- set result = "to_timestamp_ntz('" ~ timestamp ~ "')" -%}
{{ return(result) }}
{%- endmacro %}
{% macro snowflake__snapshot_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}
{% macro snowflake__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
alter table {{ from_relation }} rename to {{ to_relation }}
{%- endcall %}
{% endmacro %}
{% macro snowflake__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter {{ adapter.quote(column_name) }} set data type {{ new_column_type }};
{% endcall %}
{% endmacro %}
{% macro snowflake__alter_relation_comment(relation, relation_comment) -%}
comment on {{ relation.type }} {{ relation }} IS $${{ relation_comment | replace('$', '[$]') }}$$;
{% endmacro %}
{% macro snowflake__alter_column_comment(relation, column_dict) -%}
{% set existing_columns = adapter.get_columns_in_relation(relation) | map(attribute="name") | list %}
alter {{ relation.type }} {{ relation }} alter
{% for column_name in column_dict if (column_name in existing_columns) or (column_name|upper in existing_columns) %}
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }} COMMENT $${{ column_dict[column_name]['description'] | replace('$', '[$]') }}$$ {{ ',' if not loop.last else ';' }}
{% endfor %}
{% endmacro %}
{% macro get_current_query_tag() -%}
{{ return(run_query("show parameters like 'query_tag' in session").rows[0]['value']) }}
{% endmacro %}
{% macro set_query_tag() -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}
{% macro unset_query_tag(original_query_tag) -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% if original_query_tag %}
{{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
{% do run_query("alter session set query_tag = '{}'".format(original_query_tag)) %}
{% else %}
{{ log("No original query_tag, unsetting parameter.") }}
{% do run_query("alter session unset query_tag") %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro snowflake__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} add column
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% if remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} drop column
{% for column in remove_columns %}
{{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
{% macro snowflake_dml_explicit_transaction(dml) %}
{#
Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
statements before passing them into run_query(), or calling in the 'main' statement
of a materialization
#}
{% set dml_transaction -%}
begin;
{{ dml }};
commit;
{%- endset %}
{% do return(dml_transaction) %}
{% endmacro %}
{% macro snowflake__truncate_relation(relation) -%}
{% set truncate_dml %}
truncate table {{ relation }}
{% endset %}
{% call statement('truncate_relation') -%}
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}

View File

@@ -1,67 +0,0 @@
{% macro snowflake__get_catalog(information_schema, schemas) -%}
{% set query %}
with tables as (
select
table_catalog as "table_database",
table_schema as "table_schema",
table_name as "table_name",
table_type as "table_type",
comment as "table_comment",
-- note: this is the _role_ that owns the table
table_owner as "table_owner",
'Clustering Key' as "stats:clustering_key:label",
clustering_key as "stats:clustering_key:value",
'The key used to cluster this table' as "stats:clustering_key:description",
(clustering_key is not null) as "stats:clustering_key:include",
'Row Count' as "stats:row_count:label",
row_count as "stats:row_count:value",
'An approximate count of rows in this table' as "stats:row_count:description",
(row_count is not null) as "stats:row_count:include",
'Approximate Size' as "stats:bytes:label",
bytes as "stats:bytes:value",
'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
(bytes is not null) as "stats:bytes:include",
'Last Modified' as "stats:last_modified:label",
to_varchar(convert_timezone('UTC', last_altered), 'yyyy-mm-dd HH24:MI'||'UTC') as "stats:last_modified:value",
'The timestamp for last update/change' as "stats:last_modified:description",
(last_altered is not null and table_type='BASE TABLE') as "stats:last_modified:include"
from {{ information_schema }}.tables
),
columns as (
select
table_catalog as "table_database",
table_schema as "table_schema",
table_name as "table_name",
column_name as "column_name",
ordinal_position as "column_index",
data_type as "column_type",
comment as "column_comment"
from {{ information_schema }}.columns
)
select *
from tables
join columns using ("table_database", "table_schema", "table_name")
where (
{%- for schema in schemas -%}
upper("table_schema") = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
order by "column_index"
{%- endset -%}
{{ return(run_query(query)) }}
{%- endmacro %}

View File

@@ -1,80 +0,0 @@
{% macro dbt_snowflake_validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="merge") -%}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'delete+insert'
{%- endset %}
{% if strategy not in ['merge', 'delete+insert'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}
{% do return(strategy) %}
{% endmacro %}
{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% if strategy == 'merge' %}
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% elif strategy == 'delete+insert' %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}
{% endmacro %}
{% materialization incremental, adapter='snowflake' -%}
{% set original_query_tag = set_query_tag() %}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{{ run_hooks(pre_hooks) }}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}
{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}
{{ run_hooks(post_hooks) }}
{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}
{% do unset_query_tag(original_query_tag) %}
{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}

View File

@@ -1,44 +0,0 @@
{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) -%}
{#
Workaround for Snowflake not being happy with a merge on a constant-false predicate.
When no unique_key is provided, this macro will do a regular insert. If a unique_key
is provided, then this macro will do a proper merge instead.
#}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set dml -%}
{%- if unique_key is none -%}
{{ sql_header if sql_header is not none }}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source_sql }}
)
{%- else -%}
{{ default__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) }}
{%- endif -%}
{%- endset -%}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}
{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}
{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) %}
{% set dml = default__snapshot_merge_sql(target, source, insert_cols) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}

View File

@@ -1,37 +0,0 @@
{% macro snowflake__load_csv_rows(model, agate_table) %}
{% set batch_size = get_batch_size() %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set bindings = [] %}
{% set statements = [] %}
{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}
{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}
{% set sql %}
insert into {{ this.render() }} ({{ cols_sql }}) values
{% for row in chunk -%}
({%- for column in agate_table.column_names -%}
%s
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}
{% do adapter.add_query('BEGIN', auto_begin=False) %}
{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}
{% do adapter.add_query('COMMIT', auto_begin=False) %}
{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}
{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}

View File

@@ -1,34 +0,0 @@
{% materialization table, adapter='snowflake' %}
{% set original_query_tag = set_query_tag() %}
{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{{ run_hooks(pre_hooks) }}
{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}
--build model
{% call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}
{{ run_hooks(post_hooks) }}
{% do persist_docs(target_relation, model) %}
{% do unset_query_tag(original_query_tag) %}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

View File

@@ -1,13 +0,0 @@
{% materialization view, adapter='snowflake' -%}
{% set original_query_tag = set_query_tag() %}
{% set to_return = create_or_replace_view() %}
{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model, for_columns=false) %}
{% do return(to_return) %}
{% do unset_query_tag(original_query_tag) %}
{%- endmaterialization %}

View File

@@ -1,29 +0,0 @@
default:
outputs:
dev: # User-Password config
type: snowflake
account: [account id + region (if applicable)]
user: [username]
password: [password]
role: [user role]
database: [database name]
warehouse: [warehouse name]
schema: [dbt schema]
threads: [1 or more]
client_session_keep_alive: False
prod: # Keypair config
type: snowflake
account: [account id + region (if applicable)]
user: [username]
role: [user role]
private_key_path: [path/to/private.key]
private_key_passphrase: [passphrase for the private key, if key is encrypted]
database: [database name]
warehouse: [warehouse name]
schema: [dbt schema]
threads: [1 or more]
client_session_keep_alive: False
target: dev

View File

@@ -1,70 +0,0 @@
#!/usr/bin/env python
import os
import sys
if sys.version_info < (3, 6):
print('Error: dbt does not support this version of Python.')
print('Please upgrade to Python 3.6 or higher.')
sys.exit(1)
from setuptools import setup
try:
from setuptools import find_namespace_packages
except ImportError:
# the user has a downlevel version of setuptools.
print('Error: dbt requires setuptools v40.1.0 or higher.')
print('Please upgrade setuptools with "pip install --upgrade setuptools" '
'and try again')
sys.exit(1)
package_name = "dbt-snowflake"
package_version = "0.21.0rc1"
description = """The snowflake adapter plugin for dbt (data build tool)"""
this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, 'README.md')) as f:
long_description = f.read()
setup(
name=package_name,
version=package_version,
description=description,
long_description=description,
long_description_content_type='text/markdown',
author="dbt Labs",
author_email="info@dbtlabs.com",
url="https://github.com/dbt-labs/dbt",
packages=find_namespace_packages(include=['dbt', 'dbt.*']),
package_data={
'dbt': [
'include/snowflake/dbt_project.yml',
'include/snowflake/sample_profiles.yml',
'include/snowflake/macros/*.sql',
'include/snowflake/macros/**/*.sql',
]
},
install_requires=[
'dbt-core=={}'.format(package_version),
'snowflake-connector-python[secure-local-storage]>=2.4.1,<2.6.0',
'requests<3.0.0',
'cryptography>=3.2,<4',
],
zip_safe=False,
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS :: MacOS X',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
],
python_requires=">=3.6.2",
)

View File

@@ -1,5 +1,3 @@
./core
./plugins/postgres
./plugins/redshift
./plugins/snowflake
./plugins/bigquery

View File

@@ -14,7 +14,7 @@ rm -rf "$DBT_PATH"/dist
rm -rf "$DBT_PATH"/build
mkdir -p "$DBT_PATH"/dist
for SUBPATH in core plugins/postgres plugins/redshift plugins/bigquery plugins/snowflake
for SUBPATH in core plugins/postgres plugins/bigquery
do
rm -rf "$DBT_PATH"/"$SUBPATH"/dist
rm -rf "$DBT_PATH"/"$SUBPATH"/build

View File

@@ -44,8 +44,6 @@ setup(
install_requires=[
'dbt-core=={}'.format(package_version),
'dbt-postgres=={}'.format(package_version),
'dbt-redshift=={}'.format(package_version),
'dbt-snowflake=={}'.format(package_version),
'dbt-bigquery=={}'.format(package_version),
],
zip_safe=False,

View File

@@ -14,9 +14,3 @@ SNOWFLAKE_TEST_OAUTH_CLIENT_SECRET=
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON=
BIGQUERY_TEST_ALT_DATABASE=
REDSHIFT_TEST_HOST=
REDSHIFT_TEST_USER=
REDSHIFT_TEST_PASS=
REDSHIFT_TEST_PORT=
REDSHIFT_TEST_DBNAME=

View File

@@ -1,9 +1,6 @@
{{ config(materialized='incremental', unique_key='id') }}
-- this will fail on snowflake with "merge" due
-- to the nondeterministic join on id
select 1 as id
union all
select 1 as id

View File

@@ -105,89 +105,6 @@ class TestSimpleCopy(BaseTestSimpleCopy):
self.assertManyTablesEqual(["seed", "view_model", "materialized"])
@use_profile("snowflake")
def test__snowflake__simple_copy(self):
self.use_default_project({
"data-paths": [self.dir("seed-initial")],
"seeds": {
'quote_columns': False,
}
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({"data-paths": [self.dir("seed-update")]})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({
"test-paths": [self.dir("tests")],
"data-paths": [self.dir("seed-update")],
})
self.run_dbt(['test'])
@use_profile("snowflake")
def test__snowflake__simple_copy__quoting_off(self):
self.use_default_project({
"quoting": {"identifier": False},
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-update")],
"quoting": {"identifier": False},
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({
"test-paths": [self.dir("tests")],
"data-paths": [self.dir("snowflake-seed-update")],
"quoting": {"identifier": False},
})
self.run_dbt(['test'])
@use_profile("snowflake")
def test__snowflake__seed__quoting_switch(self):
self.use_default_project({
"quoting": {"identifier": False},
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-update")],
"quoting": {"identifier": True},
})
results = self.run_dbt(["seed"], expect_pass=False)
self.use_default_project({
"test-paths": [self.dir("tests")],
"data-paths": [self.dir("snowflake-seed-initial")],
})
self.run_dbt(['test'])
@use_profile("bigquery")
def test__bigquery__simple_copy(self):
results = self.run_dbt(["seed"])
@@ -213,155 +130,6 @@ class TestSimpleCopy(BaseTestSimpleCopy):
self.assertTablesEqual("seed", "get_and_ref")
class TestSimpleCopyQuotingIdentifierOn(BaseTestSimpleCopy):
@property
def project_config(self):
return self.seed_quote_cfg_with({
'quoting': {
'identifier': True,
},
})
@use_profile("snowflake")
def test__snowflake__simple_copy__quoting_on(self):
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["seed", "view_model", "incremental", "materialized", "get_and_ref"])
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-update")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["seed", "view_model", "incremental", "materialized", "get_and_ref"])
# can't run the test as this one's identifiers will be the wrong case
class BaseLowercasedSchemaTest(BaseTestSimpleCopy):
def unique_schema(self):
# bypass the forced uppercasing that unique_schema() does on snowflake
return super().unique_schema().lower()
class TestSnowflakeSimpleLowercasedSchemaCopy(BaseLowercasedSchemaTest):
@use_profile('snowflake')
def test__snowflake__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("snowflake-seed-initial")]})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({"data-paths": [self.dir("snowflake-seed-update")]})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "INCREMENTAL", "MATERIALIZED", "GET_AND_REF"])
self.use_default_project({
"test-paths": [self.dir("tests")],
"data-paths": [self.dir("snowflake-seed-update")],
})
self.run_dbt(['test'])
class TestSnowflakeSimpleLowercasedSchemaQuoted(BaseLowercasedSchemaTest):
@property
def project_config(self):
return self.seed_quote_cfg_with({
'quoting': {'identifier': False, 'schema': True}
})
@use_profile("snowflake")
def test__snowflake__seed__quoting_switch_schema_lower(self):
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
# this is intentional - should not error!
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-update")],
"quoting": {"identifier": False, "schema": False},
})
results = self.run_dbt(["seed"], expect_pass=False)
class TestSnowflakeSimpleUppercasedSchemaQuoted(BaseTestSimpleCopy):
@property
def project_config(self):
return self.seed_quote_cfg_with({
'quoting': {'identifier': False, 'schema': True}
})
@use_profile("snowflake")
def test__snowflake__seed__quoting_switch_schema_upper(self):
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
# this is intentional - should not error!
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-update")],
"quoting": {"identifier": False, "schema": False},
})
results = self.run_dbt(["seed"])
class TestSnowflakeIncrementalOverwrite(BaseTestSimpleCopy):
@property
def models(self):
return self.dir("models-snowflake")
@use_profile("snowflake")
def test__snowflake__incremental_overwrite(self):
self.use_default_project({
"data-paths": [self.dir("snowflake-seed-initial")],
})
results = self.run_dbt(["run"])
self.assertEqual(len(results), 1)
results = self.run_dbt(["run"], expect_pass=False)
self.assertEqual(len(results), 1)
# Setting the incremental_strategy should make this succeed
self.use_default_project({
"models": {
"incremental_strategy": "delete+insert"
},
"data-paths": [self.dir("snowflake-seed-update")],
})
results = self.run_dbt(["run"])
self.assertEqual(len(results), 1)
class TestShouting(BaseTestSimpleCopy):
@property
def models(self):
@@ -498,21 +266,3 @@ class TestIncrementalMergeColumns(BaseTestSimpleCopy):
})
self.seed_and_run()
self.assertTablesEqual("incremental_update_cols", "expected_result")
@use_profile("snowflake")
def test__snowflake__incremental_merge_columns(self):
self.use_default_project({
"data-paths": ["seeds-merge-cols-initial"],
"seeds": {
"quote_columns": False
}
})
self.seed_and_run()
self.use_default_project({
"data-paths": ["seeds-merge-cols-update"],
"seeds": {
"quote_columns": False
}
})
self.seed_and_run()
self.assertTablesEqual("incremental_update_cols", "expected_result")

View File

@@ -27,19 +27,3 @@ class TestVarcharWidening(DBTIntegrationTest):
self.assertTablesEqual("seed","incremental")
self.assertTablesEqual("seed","materialized")
@use_profile('snowflake')
def test__snowflake__varchar_widening(self):
self.run_sql_file("seed.sql")
results = self.run_dbt()
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["SEED", "INCREMENTAL", "MATERIALIZED"])
self.run_sql_file("update.sql")
results = self.run_dbt()
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["SEED", "INCREMENTAL", "MATERIALIZED"])

View File

@@ -66,28 +66,6 @@ class TestSimpleReference(DBTIntegrationTest):
self.assertTablesEqual("summary_expected","ephemeral_summary")
self.assertTablesEqual("summary_expected","view_using_ref")
@use_profile('snowflake')
def test__snowflake__simple_reference(self):
results = self.run_dbt()
self.assertEqual(len(results), 8)
# Copies should match
self.assertManyTablesEqual(
["SEED", "INCREMENTAL_COPY", "MATERIALIZED_COPY", "VIEW_COPY"],
["SUMMARY_EXPECTED", "INCREMENTAL_SUMMARY", "MATERIALIZED_SUMMARY", "VIEW_SUMMARY", "EPHEMERAL_SUMMARY"]
)
self.run_sql_file("update.sql")
results = self.run_dbt()
self.assertEqual(len(results), 8)
self.assertManyTablesEqual(
["SEED", "INCREMENTAL_COPY", "MATERIALIZED_COPY", "VIEW_COPY"],
["SUMMARY_EXPECTED", "INCREMENTAL_SUMMARY", "MATERIALIZED_SUMMARY", "VIEW_SUMMARY", "EPHEMERAL_SUMMARY"]
)
@use_profile('postgres')
def test__postgres__simple_reference_with_models(self):
@@ -140,57 +118,6 @@ class TestSimpleReference(DBTIntegrationTest):
self.assertTrue('ephemeral_summary' in created_models)
self.assertEqual(created_models['ephemeral_summary'], 'table')
@use_profile('snowflake')
def test__snowflake__simple_reference_with_models(self):
# Run materialized_copy & ephemeral_copy
# ephemeral_copy should not actually be materialized b/c it is ephemeral
results = self.run_dbt(
['run', '--models', 'materialized_copy', 'ephemeral_copy']
)
self.assertEqual(len(results), 1)
# Copies should match
self.assertTablesEqual("SEED", "MATERIALIZED_COPY")
created_models = self.get_models_in_schema()
self.assertTrue('MATERIALIZED_COPY' in created_models)
@use_profile('snowflake')
def test__snowflake__simple_reference_with_models_and_children(self):
# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
# the dependent ephemeral_summary, however, should be materialized as a table
results = self.run_dbt(
['run', '--models', 'materialized_copy+', 'ephemeral_copy+']
)
self.assertEqual(len(results), 3)
# Copies should match
self.assertManyTablesEqual(
["SEED", "MATERIALIZED_COPY"],
["SUMMARY_EXPECTED", "MATERIALIZED_SUMMARY", "EPHEMERAL_SUMMARY"]
)
created_models = self.get_models_in_schema()
self.assertFalse('INCREMENTAL_COPY' in created_models)
self.assertFalse('INCREMENTAL_SUMMARY' in created_models)
self.assertFalse('VIEW_COPY' in created_models)
self.assertFalse('VIEW_SUMMARY' in created_models)
# make sure this wasn't errantly materialized
self.assertFalse('EPHEMERAL_COPY' in created_models)
self.assertTrue('MATERIALIZED_COPY' in created_models)
self.assertTrue('MATERIALIZED_SUMMARY' in created_models)
self.assertEqual(created_models['MATERIALIZED_COPY'], 'table')
self.assertEqual(created_models['MATERIALIZED_SUMMARY'], 'table')
self.assertTrue('EPHEMERAL_SUMMARY' in created_models)
self.assertEqual(created_models['EPHEMERAL_SUMMARY'], 'table')
class TestErrorReference(DBTIntegrationTest):
@property

View File

@@ -28,10 +28,7 @@ class BaseSimpleSnapshotTest(DBTIntegrationTest):
self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)
def assert_case_tables_equal(self, actual, expected):
if self.adapter_type == 'snowflake':
actual = actual.upper()
expected = expected.upper()
# this does something different on snowflake, but here it's just assertTablesEqual
self.assertTablesEqual(actual, expected)
def assert_expected(self):
@@ -69,34 +66,6 @@ class TestSimpleSnapshotFiles(BaseSimpleSnapshotTest):
self.assert_expected()
@use_profile('snowflake')
def test__snowflake__simple_snapshot(self):
self.dbt_run_seed_snapshot()
self.assert_expected()
self.run_sql_file("invalidate_snowflake.sql")
self.run_sql_file("update.sql")
results = self.run_snapshot()
self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)
self.assert_expected()
@use_profile('redshift')
def test__redshift__simple_snapshot(self):
self.dbt_run_seed_snapshot()
self.assert_expected()
self.run_sql_file("invalidate_postgres.sql")
self.run_sql_file("update.sql")
results = self.run_snapshot()
self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)
self.assert_expected()
class TestSimpleColumnSnapshotFiles(DBTIntegrationTest):
@@ -159,14 +128,6 @@ class TestSimpleColumnSnapshotFiles(DBTIntegrationTest):
def test_postgres_renamed_source(self):
self._run_snapshot_test()
@use_profile('snowflake')
def test_snowflake_renamed_source(self):
self._run_snapshot_test()
@use_profile('redshift')
def test_redshift_renamed_source(self):
self._run_snapshot_test()
@use_profile('bigquery')
def test_bigquery_renamed_source(self):
self._run_snapshot_test()
@@ -419,10 +380,7 @@ class TestCrossDBSnapshotFiles(DBTIntegrationTest):
@property
def project_config(self):
if self.adapter_type == 'snowflake':
paths = ['test-snapshots-pg']
else:
paths = ['test-snapshots-bq']
paths = ['test-snapshots-bq']
return {
'config-version': 2,
'snapshot-paths': paths,
@@ -432,23 +390,6 @@ class TestCrossDBSnapshotFiles(DBTIntegrationTest):
def run_snapshot(self):
return self.run_dbt(['snapshot', '--vars', '{{"target_database": {}}}'.format(self.alternative_database)])
@use_profile('snowflake')
def test__snowflake__cross_snapshot(self):
self.run_sql_file("seed.sql")
results = self.run_snapshot()
self.assertEqual(len(results), 1)
self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL", table_b_db=self.alternative_database)
self.run_sql_file("invalidate_snowflake.sql")
self.run_sql_file("update.sql")
results = self.run_snapshot()
self.assertEqual(len(results), 1)
self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL", table_b_db=self.alternative_database)
@use_profile('bigquery')
def test__bigquery__cross_snapshot(self):
self.run_sql_file("seed_bq.sql")
@@ -842,23 +783,10 @@ class TestSnapshotHardDelete(DBTIntegrationTest):
self.run_sql_file('seed_bq.sql')
self._test_snapshot_hard_delete()
@use_profile('snowflake')
def test__snowflake__snapshot_hard_delete(self):
self.run_sql_file('seed.sql')
self._test_snapshot_hard_delete()
@use_profile('redshift')
def test__redshift__snapshot_hard_delete(self):
self.run_sql_file('seed.sql')
self._test_snapshot_hard_delete()
def _test_snapshot_hard_delete(self):
self._snapshot()
if self.adapter_type == 'snowflake':
self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL")
else:
self.assertTablesEqual("snapshot_expected", "snapshot_actual")
self.assertTablesEqual("snapshot_expected", "snapshot_actual")
self._invalidated_snapshot_datetime = None
self._revived_snapshot_datetime = None

View File

@@ -1,5 +1,4 @@
from test.integration.base import DBTIntegrationTest, use_profile
import dbt.exceptions
class TestSimpleSnapshotFiles(DBTIntegrationTest):
@@ -35,11 +34,6 @@ class TestSimpleSnapshotFiles(DBTIntegrationTest):
def assert_expected(self):
self.run_dbt(['test', '--data', '--vars', 'version: 3'])
@use_profile('snowflake')
def test__snowflake__simple_snapshot(self):
self.test_snapshot_check_cols_cycle()
self.assert_expected()
@use_profile('postgres')
def test__postgres__simple_snapshot(self):
self.test_snapshot_check_cols_cycle()
@@ -49,8 +43,3 @@ class TestSimpleSnapshotFiles(DBTIntegrationTest):
def test__bigquery__simple_snapshot(self):
self.test_snapshot_check_cols_cycle()
self.assert_expected()
@use_profile('redshift')
def test__redshift__simple_snapshot(self):
self.test_snapshot_check_cols_cycle()
self.assert_expected()

View File

@@ -60,66 +60,6 @@ class TestSimpleSeedColumnOverridePostgres(TestSimpleSeedColumnOverride):
self.assertEqual(len(results), 10)
class TestSimpleSeedColumnOverrideRedshift(TestSimpleSeedColumnOverride):
@property
def models(self):
return "models-rs"
@property
def profile_config(self):
return self.redshift_profile()
def seed_enabled_types(self):
return {
"id": "text",
"birthday": "date",
}
def seed_tricky_types(self):
return {
'id_str': 'text',
'looks_like_a_bool': 'text',
'looks_like_a_date': 'text',
}
@use_profile('redshift')
def test_redshift_simple_seed_with_column_override_redshift(self):
results = self.run_dbt(["seed", "--show"])
self.assertEqual(len(results), 2)
results = self.run_dbt(["test"])
self.assertEqual(len(results), 10)
class TestSimpleSeedColumnOverrideSnowflake(TestSimpleSeedColumnOverride):
@property
def models(self):
return "models-snowflake"
def seed_enabled_types(self):
return {
"id": "FLOAT",
"birthday": "TEXT",
}
def seed_tricky_types(self):
return {
'id_str': 'TEXT',
'looks_like_a_bool': 'TEXT',
'looks_like_a_date': 'TEXT',
}
@property
def profile_config(self):
return self.snowflake_profile()
@use_profile('snowflake')
def test_snowflake_simple_seed_with_column_override_snowflake(self):
results = self.run_dbt(["seed", "--show"])
self.assertEqual(len(results), 2)
results = self.run_dbt(["test"])
self.assertEqual(len(results), 10)
class TestSimpleSeedColumnOverrideBQ(TestSimpleSeedColumnOverride):
@property
def models(self):

View File

@@ -346,8 +346,4 @@ class TestSimpleBigSeedBatched(DBTIntegrationTest):
@use_profile('postgres')
def test_postgres_big_batched_seed(self):
self.test_big_batched_seed()
@use_profile('snowflake')
def test_snowflake_big_batched_seed(self):
self.test_big_batched_seed()

View File

@@ -109,20 +109,6 @@ class TestGraphSelection(DBTIntegrationTest):
self.assertNotIn('users_rollup_dependency', created_models)
self.assert_correct_schemas()
@use_profile('snowflake')
def test__snowflake__specific_model(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(['run', '--select', 'users'])
self.assertEqual(len(results), 1)
self.assertTablesEqual("SEED", "USERS")
created_models = self.get_models_in_schema()
self.assertFalse('USERS_ROLLUP' in created_models)
self.assertFalse('BASE_USERS' in created_models)
self.assertFalse('EMAILS' in created_models)
self.assert_correct_schemas()
@use_profile('postgres')
def test__postgres__specific_model_and_children(self):
self.run_sql_file("seed.sql")
@@ -139,21 +125,6 @@ class TestGraphSelection(DBTIntegrationTest):
self.assertNotIn('emails', created_models)
self.assert_correct_schemas()
@use_profile('snowflake')
def test__snowflake__specific_model_and_children(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(['run', '--select', 'users+'])
self.assertEqual(len(results), 4)
self.assertManyTablesEqual(
["SEED", "USERS"],
["SUMMARY_EXPECTED", "USERS_ROLLUP"]
)
created_models = self.get_models_in_schema()
self.assertFalse('BASE_USERS' in created_models)
self.assertFalse('EMAILS' in created_models)
@use_profile('postgres')
def test__postgres__specific_model_and_children_limited(self):
self.run_sql_file("seed.sql")
@@ -184,22 +155,6 @@ class TestGraphSelection(DBTIntegrationTest):
self.assertFalse('emails' in created_models)
self.assert_correct_schemas()
@use_profile('snowflake')
def test__snowflake__specific_model_and_parents(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(['run', '--select', '+users_rollup'])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(
["SEED", "USERS"],
["SUMMARY_EXPECTED", "USERS_ROLLUP"]
)
created_models = self.get_models_in_schema()
self.assertFalse('BASE_USERS' in created_models)
self.assertFalse('EMAILS' in created_models)
@use_profile('postgres')
def test__postgres__specific_model_and_parents_limited(self):
self.run_sql_file("seed.sql")
@@ -230,21 +185,6 @@ class TestGraphSelection(DBTIntegrationTest):
self.assertFalse('emails' in created_models)
self.assert_correct_schemas()
@use_profile('snowflake')
def test__snowflake__specific_model_with_exclusion(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(
['run', '--select', '+users_rollup', '--exclude', 'users_rollup']
)
self.assertEqual(len(results), 1)
self.assertManyTablesEqual(["SEED", "USERS"])
created_models = self.get_models_in_schema()
self.assertFalse('BASE_USERS' in created_models)
self.assertFalse('USERS_ROLLUP' in created_models)
self.assertFalse('EMAILS' in created_models)
@use_profile('postgres')
def test__postgres__locally_qualified_name(self):
results = self.run_dbt(['run', '--select', 'test.subdir'])
@@ -326,28 +266,6 @@ class TestGraphSelection(DBTIntegrationTest):
self.assertEqual(len(results), 2)
assert sorted([r.node.name for r in results]) == ['unique_users_id', 'unique_users_rollup_gender']
@use_profile('snowflake')
def test__snowflake__skip_intermediate(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(['run', '--select', '@models/users.sql'])
# base_users, emails, users_rollup, users_rollup_dependency
self.assertEqual(len(results), 4)
# now re-run, skipping users_rollup
results = self.run_dbt(['run', '--select', '@users', '--exclude', 'users_rollup'])
self.assertEqual(len(results), 3)
# make sure that users_rollup_dependency and users don't interleave
users = [r for r in results if r.node.name == 'users'][0]
dep = [r for r in results if r.node.name == 'users_rollup_dependency'][0]
user_last_end = users.timing[1].completed_at
dep_first_start = dep.timing[0].started_at
self.assertTrue(
user_last_end <= dep_first_start,
'dependency started before its transitive parent ({} > {})'.format(user_last_end, dep_first_start)
)
@use_profile('postgres')
def test__postgres__concat(self):
self.run_sql_file("seed.sql")

View File

@@ -56,26 +56,3 @@ class TestDataTests(DBTIntegrationTest):
defined_tests = os.listdir(self.test_path)
self.assertNotEqual(len(test_results), 0)
self.assertEqual(len(test_results), len(defined_tests))
@use_profile('snowflake')
def test_snowflake_data_tests(self):
self.use_profile('snowflake')
self.run_sql_file("seed.sql")
results = self.run_dbt()
self.assertEqual(len(results), 1)
test_results = self.run_data_validations()
for result in test_results:
# assert that all deliberately failing tests actually fail
if 'fail' in result.node.name:
self.assertEqual(result.status, 'fail')
self.assertFalse(result.skipped)
self.assertTrue(result.failures > 0)
# assert that actual tests pass
else:
self.assertEqual(result.status, 'pass')
self.assertFalse(result.skipped)
self.assertEqual(result.failures, 0)

View File

@@ -62,14 +62,6 @@ class TestAdapterMacroDeprecation(BaseTestDeprecations):
exc_str = ' '.join(str(exc.exception).split()) # flatten all whitespace
assert 'The "adapter_macro" macro has been deprecated' in exc_str
@use_profile('redshift')
def test_redshift_adapter_macro(self):
self.assertEqual(deprecations.active_deprecations, set())
# pick up the postgres macro
self.run_dbt()
expected = {'adapter-macro'}
self.assertEqual(expected, deprecations.active_deprecations)
@use_profile('bigquery')
def test_bigquery_adapter_macro(self):
self.assertEqual(deprecations.active_deprecations, set())
@@ -107,15 +99,6 @@ class TestAdapterMacroDeprecationPackages(BaseTestDeprecations):
exc_str = ' '.join(str(exc.exception).split()) # flatten all whitespace
assert 'The "adapter_macro" macro has been deprecated' in exc_str
@use_profile('redshift')
def test_redshift_adapter_macro_pkg(self):
self.assertEqual(deprecations.active_deprecations, set())
# pick up the postgres macro
self.assertEqual(deprecations.active_deprecations, set())
self.run_dbt()
expected = {'adapter-macro'}
self.assertEqual(expected, deprecations.active_deprecations)
@use_profile('bigquery')
def test_bigquery_adapter_macro_pkg(self):
self.assertEqual(deprecations.active_deprecations, set())

View File

@@ -95,20 +95,6 @@ class TestAdapterMacroNoDestination(DBTIntegrationTest):
assert "In dispatch: No macro named 'dispatch_to_nowhere' found" in str(exc.value)
class TestDispatchMacroUseParent(DBTIntegrationTest):
@property
def schema(self):
return "test_macros_016"
@property
def models(self):
return "dispatch-inheritance-models"
@use_profile('redshift')
def test_redshift_inherited_macro(self):
self.run_dbt(['run'])
class TestMacroOverrideBuiltin(DBTIntegrationTest):
@property
def schema(self):
@@ -125,7 +111,6 @@ class TestMacroOverrideBuiltin(DBTIntegrationTest):
'macro-paths': ['override-get-columns-macros'],
}
@use_profile('postgres')
def test_postgres_overrides(self):
# the first time, the model doesn't exist

View File

@@ -43,17 +43,6 @@ class TestEphemeralMulti(DBTIntegrationTest):
expected_sql = "".join(expected_sql.split())
self.assertEqual(sql_file, expected_sql)
@use_profile('snowflake')
def test__snowflake(self):
self.run_sql_file("seed.sql")
results = self.run_dbt()
self.assertEqual(len(results), 3)
self.assertManyTablesEqual(
["SEED", "DEPENDENT", "DOUBLE_DEPENDENT", "SUPER_DEPENDENT"]
)
class TestEphemeralNested(DBTIntegrationTest):
@property

View File

@@ -37,19 +37,3 @@ class TestConcurrency(DBTIntegrationTest):
self.assertTableDoesNotExist("skip")
self.assertIn('PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7', output)
@use_profile('snowflake')
def test__snowflake__concurrency(self):
self.run_sql_file("seed.sql")
results = self.run_dbt(expect_pass=False)
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "DEP", "TABLE_A", "TABLE_B"])
self.run_sql_file("update.sql")
results = self.run_dbt(expect_pass=False)
self.assertEqual(len(results), 7)
self.assertManyTablesEqual(["SEED", "VIEW_MODEL", "DEP", "TABLE_A", "TABLE_B"])

View File

@@ -1,10 +0,0 @@
{% macro generate_database_name(database_name, node) %}
{% if database_name == 'alt' %}
{{ env_var('SNOWFLAKE_TEST_ALT_DATABASE') }}
{% elif database_name %}
{{ database_name }}
{% else %}
{{ target.database }}
{% endif %}
{% endmacro %}

View File

@@ -1,3 +0,0 @@
select * from {{ target.schema }}.seed

View File

@@ -1,2 +0,0 @@
{{ config(database='alt') }}
select * from {{ ref('view_1') }}

View File

@@ -1,30 +0,0 @@
{{ config(database='alt', materialized='table') }}
with v1 as (
select * from {{ ref('view_1') }}
),
v2 as (
select * from {{ ref('view_2') }}
),
combined as (
select last_name from v1
union all
select last_name from v2
)
select
last_name,
count(*) as count
from combined
group by 1

View File

@@ -1,39 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
class TestOverrideDatabase(DBTIntegrationTest):
setup_alternate_db = True
@property
def schema(self):
return "custom_schema_024"
@property
def models(self):
return "db-models"
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': ['custom-db-macros'],
}
@use_profile('snowflake')
def test_snowflake_override_generate_db_name(self):
self.run_sql_file('seed.sql')
self.assertTableDoesExist('SEED', schema=self.unique_schema(), database=self.default_database)
self.assertTableDoesExist('AGG', schema=self.unique_schema(), database=self.default_database)
results = self.run_dbt()
self.assertEqual(len(results), 3)
self.assertTableDoesExist('VIEW_1', schema=self.unique_schema(), database=self.default_database)
self.assertTableDoesExist('VIEW_2', schema=self.unique_schema(), database=self.alternative_database)
self.assertTableDoesExist('VIEW_3', schema=self.unique_schema(), database=self.alternative_database)
# not overridden
self.assertTablesEqual('SEED', 'VIEW_1', table_b_db=self.default_database)
# overridden
self.assertTablesEqual('SEED', 'VIEW_2', table_b_db=self.alternative_database)
self.assertTablesEqual('AGG', 'VIEW_3', table_b_db=self.alternative_database)

View File

@@ -131,59 +131,6 @@ class TestCustomProjectSchemaWithPrefix(DBTIntegrationTest):
self.assertTablesEqual("agg", "view_3", schema, self.xf_schema())
class TestCustomProjectSchemaWithPrefixSnowflake(DBTIntegrationTest):
def setUp(self):
super().setUp()
self._created_schemas.add(
self._get_schema_fqn(self.default_database, self.v1_schema())
)
self._created_schemas.add(
self._get_schema_fqn(self.default_database, self.v2_schema())
)
self._created_schemas.add(
self._get_schema_fqn(self.default_database, self.xf_schema())
)
@property
def schema(self):
return "sf_custom_prefix_024"
@property
def models(self):
return "models"
@property
def project_config(self):
return {
'config-version': 2,
"models": {
"schema": "dbt_test"
}
}
def v1_schema(self):
return f"{self.unique_schema()}_DBT_TEST"
def v2_schema(self):
return f"{self.unique_schema()}_CUSTOM"
def xf_schema(self):
return f"{self.unique_schema()}_TEST"
@use_profile('snowflake')
def test__snowflake__custom_schema_with_prefix(self):
self.run_sql_file("seed.sql")
results = self.run_dbt()
self.assertEqual(len(results), 3)
schema = self.unique_schema().upper()
self.assertTablesEqual("SEED", "VIEW_1", schema, self.v1_schema())
self.assertTablesEqual("SEED", "VIEW_2", schema, self.v2_schema())
self.assertTablesEqual("AGG", "VIEW_3", schema, self.xf_schema())
class TestCustomSchemaWithCustomMacro(DBTIntegrationTest):
def setUp(self):
super().setUp()

View File

@@ -39,12 +39,6 @@ class TestAliases(DBTIntegrationTest):
self.assertEqual(len(results), 4)
self.run_dbt(['test'])
@use_profile('snowflake')
def test__alias_model_name_snowflake(self):
results = self.run_dbt(['run'])
self.assertEqual(len(results), 4)
self.run_dbt(['test'])
class TestAliasErrors(DBTIntegrationTest):
@property

View File

@@ -1,12 +1,7 @@
{%- if adapter.type() == 'snowflake' -%}
{%- set schema_suffix = 'TEST' -%}
{%- else -%}
{%- set schema_suffix = 'test' -%}
{%- endif -%}
{{
config(
materialized='view',
schema=schema_suffix,
schema='test',
)
}}

View File

@@ -99,17 +99,13 @@ class TestDocsGenerate(DBTIntegrationTest):
setup_alternate_db = True
def adapter_case(self, value):
if self.adapter_type == 'snowflake':
return value.upper()
else:
return value.lower()
return value.lower()
def setUp(self):
super().setUp()
self.maxDiff = None
self.alternate_schema = self.unique_schema() + '_test'
if self.adapter_type == 'snowflake':
self.alternate_schema = self.alternate_schema.upper()
self.alternate_schema = self.alternate_schema.upper()
self._created_schemas.add(self.alternate_schema)
os.environ['DBT_ENV_CUSTOM_ENV_env_key'] = 'env_value'
@@ -181,105 +177,6 @@ class TestDocsGenerate(DBTIntegrationTest):
},
}
def _redshift_stats(self):
return {
"has_stats": {
"id": "has_stats",
"label": "Has Stats?",
"value": True,
"description": "Indicates whether there are statistics for this table",
"include": False
},
"encoded": {
"id": "encoded",
"label": "Encoded",
"value": AnyStringWith('Y'),
"description": "Indicates whether any column in the table has compression encoding defined.",
"include": True
},
"diststyle": {
"id": "diststyle",
"label": "Dist Style",
"value": AnyStringWith('AUTO'),
"description": "Distribution style or distribution key column, if key distribution is defined.",
"include": True
},
"max_varchar": {
"id": "max_varchar",
"label": "Max Varchar",
"value": AnyFloat(),
"description": "Size of the largest column that uses a VARCHAR data type.",
"include": True
},
"size": {
"id": "size",
"label": "Approximate Size",
"value": AnyFloat(),
"description": "Approximate size of the table, calculated from a count of 1MB blocks",
"include": True
},
'sortkey1': {
'id': 'sortkey1',
'label': 'Sort Key 1',
'value': AnyStringWith('AUTO'),
'description': 'First column in the sort key.',
'include': True,
},
"pct_used": {
"id": "pct_used",
"label": "Disk Utilization",
"value": AnyFloat(),
"description": "Percent of available space that is used by the table.",
"include": True
},
"stats_off": {
"id": "stats_off",
"label": "Stats Off",
"value": AnyFloat(),
"description": "Number that indicates how stale the table statistics are; 0 is current, 100 is out of date.",
"include": True
},
"rows": {
"id": "rows",
"label": "Approximate Row Count",
"value": AnyFloat(),
"description": "Approximate number of rows in the table. This value includes rows marked for deletion, but not yet vacuumed.",
"include": True
},
}
def _snowflake_stats(self):
return {
'has_stats': {
'id': 'has_stats',
'label': 'Has Stats?',
'value': True,
'description': 'Indicates whether there are statistics for this table',
'include': False,
},
'bytes': {
'id': 'bytes',
'label': 'Approximate Size',
'value': AnyFloat(),
'description': 'Approximate size of the table as reported by Snowflake',
'include': True,
},
'last_modified': {
'id': 'last_modified',
'label': 'Last Modified',
'value': AnyString(),
'description': 'The timestamp for last update/change',
'include': True,
},
'row_count': {
'id': 'row_count',
'label': 'Row Count',
'value': 1.0,
'description': 'An approximate count of rows in this table',
'include': True,
}
}
def _bigquery_stats(self, is_table, partition=None, cluster=None):
stats = {}
@@ -450,14 +347,11 @@ class TestDocsGenerate(DBTIntegrationTest):
)
def get_role(self):
if self.adapter_type in {'postgres', 'redshift'}:
if self.adapter_type in {'postgres'}:
profile = self.get_profile(self.adapter_type)
target_name = profile['test']['target']
return profile['test']['outputs'][target_name]['user']
elif self.adapter_type == 'snowflake':
return self.run_sql('select current_role()', fetch='one')[0]
else: # bigquery, presto, other dbs that have no 'role'
return None
return None
def expected_postgres_references_catalog(self):
model_database = self.default_database
@@ -570,20 +464,6 @@ class TestDocsGenerate(DBTIntegrationTest):
},
}
def expected_snowflake_catalog(self, case_columns=False):
return self._expected_catalog(
id_type='NUMBER',
text_type='TEXT',
time_type='TIMESTAMP_NTZ',
view_type='VIEW',
table_type='BASE TABLE',
model_stats=self._no_stats(),
seed_stats=self._snowflake_stats(),
case=lambda x: x.upper(),
model_database=self.alternative_database,
case_columns=case_columns,
)
def expected_bigquery_catalog(self):
return self._expected_catalog(
id_type='INT64',
@@ -761,115 +641,6 @@ class TestDocsGenerate(DBTIntegrationTest):
'sources': {},
}
def expected_redshift_catalog(self):
return self._expected_catalog(
id_type='integer',
text_type=AnyStringWith('character varying'),
time_type='timestamp without time zone',
view_type='VIEW',
table_type='BASE TABLE',
model_stats=self._no_stats(),
seed_stats=self._redshift_stats(),
)
def expected_redshift_incremental_catalog(self):
my_schema_name = self.unique_schema()
role = self.get_role()
return {
'nodes': {
'model.test.model': {
'unique_id': 'model.test.model',
'metadata': {
'schema': my_schema_name,
'database': self.default_database,
'name': 'model',
'type': 'LATE BINDING VIEW',
'comment': None,
'owner': role,
},
# incremental views have no stats
'stats': self._no_stats(),
'columns': {
'id': {
'name': 'id',
'index': 1,
'type': 'integer',
'comment': None,
},
'first_name': {
'name': 'first_name',
'index': 2,
'type': 'character varying(5)',
'comment': None,
},
'email': {
'name': 'email',
'index': 3,
'type': 'character varying(23)',
'comment': None,
},
'ip_address': {
'name': 'ip_address',
'index': 4,
'type': 'character varying(14)',
'comment': None,
},
'updated_at': {
'name': 'updated_at',
'index': 5,
'type': 'timestamp without time zone',
'comment': None,
},
},
},
'seed.test.seed': {
'unique_id': 'seed.test.seed',
'metadata': {
'schema': my_schema_name,
'database': self.default_database,
'name': 'seed',
'type': 'BASE TABLE',
'comment': None,
'owner': role,
},
'stats': self._redshift_stats(),
'columns': {
'id': {
'name': 'id',
'index': 1,
'type': 'integer',
'comment': None,
},
'first_name': {
'name': 'first_name',
'index': 2,
'type': 'character varying(5)',
'comment': None,
},
'email': {
'name': 'email',
'index': 3,
'type': 'character varying(23)',
'comment': None,
},
'ip_address': {
'name': 'ip_address',
'index': 4,
'type': 'character varying(14)',
'comment': None,
},
'updated_at': {
'name': 'updated_at',
'index': 5,
'type': 'timestamp without time zone',
'comment': None,
},
},
},
},
'sources': {},
}
def verify_catalog(self, expected):
self.assertTrue(os.path.exists('./target/catalog.json'))
@@ -1084,7 +855,7 @@ class TestDocsGenerate(DBTIntegrationTest):
target_schema=self.alternate_schema
)
quote_database = quote_schema = self.adapter_type != 'snowflake'
quote_database = quote_schema = True
relation_name_node_format = self._relation_name_format(
quote_database, quote_schema, quote_model
)
@@ -2688,242 +2459,6 @@ class TestDocsGenerate(DBTIntegrationTest):
self._quote("{2}") if quote_identifier else '{2}',
))
def expected_redshift_incremental_view_manifest(self):
model_sql_path = self.dir('rs_models/model.sql')
my_schema_name = self.unique_schema()
seed_path = self.dir('seed/seed.csv')
snapshot_path = self.dir('snapshot/snapshot_seed.sql')
return {
'dbt_schema_version': 'https://schemas.getdbt.com/dbt/manifest/v3.json',
'dbt_version': dbt.version.__version__,
'nodes': {
'model.test.model': {
'compiled_path': Normalized('target/compiled/test/rs_models/model.sql'),
'build_path': None,
'created_at': ANY,
'name': 'model',
'root_path': self.test_root_realpath,
'relation_name': '"{0}"."{1}".model'.format(
self.default_database, my_schema_name
),
'resource_type': 'model',
'path': 'model.sql',
'original_file_path': model_sql_path,
'package_name': 'test',
'raw_sql': LineIndifferent(_read_file(model_sql_path).rstrip('\r\n')),
'refs': [['seed']],
'sources': [],
'depends_on': {
'nodes': ['seed.test.seed'],
'macros': [],
},
'unique_id': 'model.test.model',
'fqn': ['test', 'model'],
'tags': [],
'meta': {},
'config': self.rendered_model_config(bind=False),
'schema': my_schema_name,
'database': self.default_database,
'alias': 'model',
'deferred': False,
'description': 'The test model',
'columns': {
'id': {
'name': 'id',
'description': 'The user ID number',
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'first_name': {
'name': 'first_name',
'description': "The user's first name",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'email': {
'name': 'email',
'description': "The user's email",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'ip_address': {
'name': 'ip_address',
'description': "The user's IP address",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'updated_at': {
'name': 'updated_at',
'description': "The last time this user's email was updated",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
},
'patch_path': 'test://' + self.dir('rs_models/schema.yml'),
'docs': {'show': True},
'compiled': True,
'compiled_sql': ANY,
'extra_ctes_injected': True,
'extra_ctes': [],
'checksum': self._checksum_file(model_sql_path),
'unrendered_config': self.unrendered_model_config(bind=False, materialized='view'),
},
'seed.test.seed': {
'compiled_path': None,
'build_path': None,
'created_at': ANY,
'patch_path': 'test://' + self.dir('seed/schema.yml'),
'path': 'seed.csv',
'name': 'seed',
'root_path': self.test_root_realpath,
'relation_name': '"{0}"."{1}".seed'.format(
self.default_database, my_schema_name
),
'resource_type': 'seed',
'raw_sql': '',
'package_name': 'test',
'original_file_path': seed_path,
'refs': [],
'sources': [],
'depends_on': {
'nodes': [],
'macros': [],
},
'unique_id': 'seed.test.seed',
'fqn': ['test', 'seed'],
'tags': [],
'meta': {},
'config': self.rendered_seed_config(),
'schema': my_schema_name,
'database': self.default_database,
'alias': 'seed',
'columns': {
'id': {
'name': 'id',
'description': 'The user ID number',
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'first_name': {
'name': 'first_name',
'description': "The user's first name",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'email': {
'name': 'email',
'description': "The user's email",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'ip_address': {
'name': 'ip_address',
'description': "The user's IP address",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
'updated_at': {
'name': 'updated_at',
'description': "The last time this user's email was updated",
'data_type': None,
'meta': {},
'quote': None,
'tags': [],
},
},
'deferred': False,
'description': 'The test seed',
'docs': {'show': True},
'compiled': True,
'compiled_sql': ANY,
'extra_ctes_injected': True,
'extra_ctes': [],
'checksum': self._checksum_file(seed_path),
'unrendered_config': self.unrendered_seed_config(),
},
'snapshot.test.snapshot_seed': {
'alias': 'snapshot_seed',
'compiled_path': None,
'build_path': None,
'created_at': ANY,
'checksum': self._checksum_file(snapshot_path),
'columns': {},
'compiled': True,
'compiled_sql': ANY,
'config': self.rendered_snapshot_config(
target_schema=self.alternate_schema
),
'database': self.default_database,
'deferred': False,
'depends_on': {'macros': [],
'nodes': ['seed.test.seed']},
'description': '',
'docs': {'show': True},
'extra_ctes': [],
'extra_ctes_injected': True,
'fqn': ['test', 'snapshot_seed', 'snapshot_seed'],
'meta': {},
'name': 'snapshot_seed',
'original_file_path': snapshot_path,
'package_name': 'test',
'patch_path': None,
'path': 'snapshot_seed.sql',
'raw_sql': ANY,
'refs': [['seed']],
'relation_name': '"{0}"."{1}".snapshot_seed'.format(
self.default_database, self.alternate_schema
),
'resource_type': 'snapshot',
'root_path': self.test_root_realpath,
'schema': self.alternate_schema,
'sources': [],
'tags': [],
'unique_id': 'snapshot.test.snapshot_seed',
'unrendered_config': self.unrendered_snapshot_config(
target_schema=self.alternate_schema
)}
},
'sources': {},
'exposures': {},
'selectors': {},
'parent_map': {
'model.test.model': ['seed.test.seed'],
'seed.test.seed': [],
'snapshot.test.snapshot_seed': ['seed.test.seed']
},
'child_map': {
'model.test.model': [],
'seed.test.seed': ['model.test.model',
'snapshot.test.snapshot_seed'],
'snapshot.test.snapshot_seed': []
},
'docs': {
'dbt.__overview__': ANY,
'test.macro_info': ANY,
'test.macro_arg_info': ANY,
},
'disabled': [],
}
def verify_metadata(self, metadata, dbt_schema_version):
assert 'generated_at' in metadata
self.assertBetween(metadata['generated_at'],
@@ -3155,38 +2690,6 @@ class TestDocsGenerate(DBTIntegrationTest):
assert not os.path.exists('./target/non-existent-assets')
@use_profile('snowflake')
def test__snowflake__run_and_generate(self):
self.run_and_generate()
self.verify_catalog(self.expected_snowflake_catalog())
self.verify_manifest(self.expected_seeded_manifest())
self.verify_run_results(self.expected_run_results())
@use_profile('snowflake')
def test__snowflake__run_and_generate_ignore_quoting_parameter(self):
# with optional adapters, this package could easily just not exist!
# accordingly, only run it when we think snowflake things should work
from dbt.adapters.snowflake import connections as snowflake_conn
old_connect = snowflake_conn.snowflake.connector.connect
def connect(*args, **kwargs):
kwargs['session_parameters'] = {
'QUOTED_IDENTIFIERS_IGNORE_CASE': True
}
return old_connect(*args, **kwargs)
with patch.object(snowflake_conn.snowflake.connector, 'connect', connect):
self.run_and_generate({
'quoting': {
'identifier': True,
}
})
self.verify_catalog(self.expected_snowflake_catalog(case_columns=True))
self.verify_manifest(self.expected_seeded_manifest(quote_model=True))
self.verify_run_results(self.expected_run_results())
@use_profile('bigquery')
def test__bigquery__run_and_generate(self):
self.run_and_generate()
@@ -3205,35 +2708,6 @@ class TestDocsGenerate(DBTIntegrationTest):
self.verify_catalog(self.expected_bigquery_complex_catalog())
self.verify_manifest(self.expected_bigquery_complex_manifest())
@use_profile('redshift')
def test__redshift__run_and_generate(self):
self.run_and_generate(alternate_db=self.default_database)
self.verify_catalog(self.expected_redshift_catalog())
self.verify_manifest(self.expected_seeded_manifest(
model_database=self.default_database
))
self.verify_run_results(self.expected_run_results())
@use_profile('redshift')
def test__redshift__incremental_view(self):
self.run_and_generate(
{'source-paths': [self.dir('rs_models')]},
alternate_db=self.default_database,
model_count=1,
)
self.verify_catalog(self.expected_redshift_incremental_catalog())
self.verify_manifest(
self.expected_redshift_incremental_view_manifest())
@use_profile('presto')
def test__presto__run_and_generate(self):
self.run_and_generate(alternate_db=self.default_database)
self.verify_catalog(self.expected_presto_catalog())
self.verify_manifest(self.expected_seeded_manifest(
model_database=self.default_database
))
self.verify_run_results(self.expected_run_results())
class TestDocsGenerateMissingSchema(DBTIntegrationTest):
@property

View File

@@ -35,17 +35,6 @@ class TestStatements(DBTIntegrationTest):
self.assertTablesEqual("statement_actual", "statement_expected")
@use_profile("snowflake")
def test_snowflake_statements(self):
self.use_default_project({"data-paths": [self.dir("snowflake-seed")]})
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 2)
results = self.run_dbt()
self.assertEqual(len(results), 1)
self.assertManyTablesEqual(["STATEMENT_ACTUAL", "STATEMENT_EXPECTED"])
@use_profile("presto")
def test_presto_statements(self):
self.use_default_project({"data-paths": [self.dir("seed")]})

View File

@@ -1,33 +0,0 @@
This test warrants some explanation. In dbt <=0.10.1, Redshift table and view materializations suffered from issues around concurrent transactions. In order to reliably reproduce this error, a query needs to select from a dbt model as the table is being rebuilt. Critically, this concurrent select needs to query the table during the drop/swap portition of the materialization. This looks like:
```sql
begin;
create table as (...);
drop table old_table cascade;
// <---- The concurrent query needs to be running here!
alter table new_table rename to old_table;
commit;
```
In order to reliably reproduce this failure, the model shown above needs to block for a long time between the `drop` and `alter` statements. We can't just stick a sleep() call in there, as this code is defined in the materialization. Instead, we can reliably reproduce the failure by:
1) creating a view that depends on this model
2) issuing a long-running query on the view before `dbt run` is invoked
3) issuing _another_ long-running query against the original model
Since long-running query (step 2) is selecting from the view, Redshift blocks on the `drop ... cascade`, of the materialization, which causes the query from step 3 time to overlap with the critical section of the materialization between the `drop` and `alter` statements.
In dbt v0.10.1, this integration test results in:
```
======================================================================
FAIL: test__redshift__concurrent_transaction (test_concurrent_transaction.TestConcurrentTransaction)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/src/app/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py", line 84, in test__redshift__concurrent_transaction
self.assertEqual(self.query_state['model_1'], 'good')
AssertionError: 'error: table 3379442 dropped by concurrent transaction\n' != 'good'
- error: table 3379442 dropped by concurrent transaction
+ good
```

View File

@@ -1,13 +0,0 @@
{% macro create_udfs() %}
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_sleep (x float)
RETURNS bool IMMUTABLE
AS
$$
from time import sleep
sleep(x)
return True
$$ LANGUAGE plpythonu;
{% endmacro %}

View File

@@ -1,9 +0,0 @@
{{ config(materialized='incremental', unique_key='id') }}
-- incremental model
select 1 as id
{% if is_incremental() %}
where TRUE
{% endif %}

View File

@@ -1,3 +0,0 @@
select * from {{ ref('model_1') }}

View File

@@ -1,5 +0,0 @@
{{ config(materialized='table') }}
-- table model
select 1 as id

View File

@@ -1,3 +0,0 @@
select * from {{ ref('model_1') }}

View File

@@ -1,5 +0,0 @@
{{ config(materialized='view') }}
-- view model
select 1 as id

View File

@@ -1,3 +0,0 @@
select * from {{ ref('model_1') }}

View File

@@ -1,136 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
import threading
from dbt.adapters.factory import FACTORY
def get_adapter_standalone(config):
plugin = FACTORY.plugins[config.credentials.type]
cls = plugin.adapter
return cls(config)
class BaseTestConcurrentTransaction(DBTIntegrationTest):
def reset(self):
self.query_state = {
'view_model': 'wait',
'model_1': 'wait',
}
def setUp(self):
super().setUp()
self._secret_adapter = get_adapter_standalone(self.config)
self.reset()
def tearDown(self):
self._secret_adapter.cleanup_connections()
super().tearDown()
@property
def schema(self):
return "concurrent_transaction_032"
@property
def project_config(self):
return {
'config-version': 2,
"macro-paths": ["macros"],
"on-run-start": [
"{{ create_udfs() }}",
],
}
def run_select_and_check(self, rel, sql):
connection_name = '__test_{}'.format(id(threading.current_thread()))
try:
with self._secret_adapter.connection_named(connection_name):
conn = self._secret_adapter.connections.get_thread_connection()
res = self.run_sql_common(self.transform_sql(sql), 'one', conn)
# The result is the output of f_sleep(), which is True
if res[0]:
self.query_state[rel] = 'good'
else:
self.query_state[rel] = 'bad'
except Exception as e:
if 'concurrent transaction' in str(e):
self.query_state[rel] = 'error: {}'.format(e)
else:
self.query_state[rel] = 'error: {}'.format(e)
def async_select(self, rel, sleep=10):
# Run the select statement in a thread. When the query returns, the global
# query_state will be update with a state of good/bad/error, and the associated
# error will be reported if one was raised.
schema = self.unique_schema()
query = '''
-- async_select: {rel}
select {schema}.f_sleep({sleep}) from {schema}.{rel}
'''.format(
schema=schema,
sleep=sleep,
rel=rel)
thread = threading.Thread(target=self.run_select_and_check, args=(rel, query))
thread.start()
return thread
def run_test(self):
self.use_profile("redshift")
# First run the project to make sure the models exist
results = self.run_dbt(args=['run'])
self.assertEqual(len(results), 2)
# Execute long-running queries in threads
t1 = self.async_select('view_model', 10)
t2 = self.async_select('model_1', 5)
# While the queries are executing, re-run the project
res = self.run_dbt(args=['run', '--threads', '8'])
self.assertEqual(len(res), 2)
# Finally, wait for these threads to finish
t1.join()
t2.join()
self.assertTrue(len(res) > 0)
# If the query succeeded, the global query_state should be 'good'
self.assertEqual(self.query_state['view_model'], 'good')
self.assertEqual(self.query_state['model_1'], 'good')
class TableTestConcurrentTransaction(BaseTestConcurrentTransaction):
@property
def models(self):
return "models-table"
@use_profile("redshift")
def test__redshift__concurrent_transaction_table(self):
self.reset()
self.run_test()
class ViewTestConcurrentTransaction(BaseTestConcurrentTransaction):
@property
def models(self):
return "models-view"
@use_profile("redshift")
def test__redshift__concurrent_transaction_view(self):
self.reset()
self.run_test()
class IncrementalTestConcurrentTransaction(BaseTestConcurrentTransaction):
@property
def models(self):
return "models-incremental"
@use_profile("redshift")
def test__redshift__concurrent_transaction_incremental(self):
self.reset()
self.run_test()

View File

@@ -1,7 +0,0 @@
{{
config(
materialized='view', bind=False
)
}}
select * from {{ ref('seed') }}

View File

@@ -1,2 +0,0 @@
id,first_name,email,ip_address,updated_at
1,Larry,lking0@miitbeian.gov.cn,69.135.206.194,2008-09-12 19:08:31
1 id first_name email ip_address updated_at
2 1 Larry lking0@miitbeian.gov.cn 69.135.206.194 2008-09-12 19:08:31

View File

@@ -1,39 +0,0 @@
import os
from test.integration.base import DBTIntegrationTest, use_profile
class TestLateBindingView(DBTIntegrationTest):
@property
def schema(self):
return 'late_binding_view_034'
@staticmethod
def dir(path):
return os.path.normpath(path)
@property
def models(self):
return self.dir("models")
@property
def project_config(self):
return {
'config-version': 2,
'data-paths': [self.dir('seed')],
'seeds': {
'quote_columns': False,
}
}
@use_profile('redshift')
def test__redshift_late_binding_view_query(self):
self.assertEqual(len(self.run_dbt(["seed"])), 1)
self.assertEqual(len(self.run_dbt()), 1)
# remove the table. Use 'cascade' here so that if late-binding views
# didn't work as advertised, the following dbt run will fail.
drop = 'drop table if exists {}.seed cascade'.format(
self.unique_schema()
)
self.run_sql(drop)
self.assertEqual(len(self.run_dbt()), 1)

View File

@@ -44,14 +44,6 @@ class TestChangingRelationType(DBTIntegrationTest):
def test__postgres__switch_materialization(self):
self.swap_types_and_test()
@use_profile("snowflake")
def test__snowflake__switch_materialization(self):
self.swap_types_and_test()
@use_profile("redshift")
def test__redshift__switch_materialization(self):
self.swap_types_and_test()
@mark.flaky(rerun_filter=bigquery_rate_limiter, max_runs=3)
@use_profile("bigquery")
def test__bigquery__switch_materialization(self):
@@ -79,19 +71,3 @@ class TestChangingRelationType(DBTIntegrationTest):
results = self.run_dbt(['run', '--vars', 'materialized: view', "--full-refresh"])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)
@use_profile('presto')
def test__presto__switch_materialization(self):
# presto can't do incremental materializations so there's less to this
results = self.run_dbt(['run', '--vars', 'materialized: view'])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: table'])
self.assertEqual(results[0].node.config.materialized, 'table')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: view'])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)

View File

@@ -1,4 +0,0 @@
id,name
1,Drew
2,Jake
3,Connor
1 id name
2 1 Drew
3 2 Jake
4 3 Connor

View File

@@ -1,9 +0,0 @@
{{ config(materialized='table') }}
select *
{% if var('add_table_field', False) %}
, 1 as new_field
{% endif %}
from {{ ref('people') }}

View File

@@ -1,8 +0,0 @@
{% if var('dependent_type', 'view') == 'view' %}
{{ config(materialized='view') }}
{% else %}
{{ config(materialized='table') }}
{% endif %}
select * from {{ ref('base_table') }}

View File

@@ -1,136 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
class TestSnowflakeLateBindingViewDependency(DBTIntegrationTest):
@property
def schema(self):
return "snowflake_view_dependency_test_036"
@property
def models(self):
return "models"
@property
def project_config(self):
return {
'config-version': 2,
'data-paths': ['data'],
'seeds': {
'quote_columns': False,
},
'quoting': {
'schema': False,
'identifier': False
}
}
"""
Snowflake views are not bound to the relations they select from. A Snowflake view
can have entirely invalid SQL if, for example, the table it selects from is dropped
and recreated with a different schema. In these scenarios, Snowflake will raise an error if:
1) The view is queried
2) The view is altered
dbt's logic should avoid running these types of queries against views in situations
where they _may_ have invalid definitions. These tests assert that views are handled
correctly in various different scenarios
"""
@use_profile('snowflake')
def test__snowflake__changed_table_schema_for_downstream_view(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt(["run"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["PEOPLE", "BASE_TABLE", "DEPENDENT_MODEL"])
# Change the schema of base_table, assert that dependent_model doesn't fail
results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: view}"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["BASE_TABLE", "DEPENDENT_MODEL"])
"""
This test is similar to the one above, except the downstream model starts as a view, and
then is changed to be a table. This checks that the table materialization does not
errantly rename a view that might have an invalid definition, which would cause an error
"""
@use_profile('snowflake')
def test__snowflake__changed_table_schema_for_downstream_view_changed_to_table(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt(["run"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["PEOPLE", "BASE_TABLE", "DEPENDENT_MODEL"])
expected_types = {
'base_table': 'table',
'dependent_model': 'view'
}
# ensure that the model actually was materialized as a table
for result in results:
node_name = result.node.name
self.assertEqual(result.node.config.materialized, expected_types[node_name])
results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: table}"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["BASE_TABLE", "DEPENDENT_MODEL"])
expected_types = {
'base_table': 'table',
'dependent_model': 'table'
}
# ensure that the model actually was materialized as a table
for result in results:
node_name = result.node.name
self.assertEqual(result.node.config.materialized, expected_types[node_name])
@use_profile('presto')
def test__presto__changed_table_schema_for_downstream_view(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt(["run"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["people", "base_table", "dependent_model"])
# Change the schema of base_table, assert that dependent_model doesn't fail
results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: view}"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["base_table", "dependent_model"])
@use_profile('presto')
def test__presto__changed_table_schema_for_downstream_view_changed_to_table(self):
results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt(["run"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["people", "base_table", "dependent_model"])
expected_types = {
'base_table': 'table',
'dependent_model': 'view'
}
# ensure that the model actually was materialized as a table
for result in results:
node_name = result.node.name
self.assertEqual(result.node.config.materialized, expected_types[node_name])
results = self.run_dbt(["run", "--vars", "{add_table_field: true, dependent_type: table}"])
self.assertEqual(len(results), 2)
self.assertManyTablesEqual(["base_table", "dependent_model"])
expected_types = {
'base_table': 'table',
'dependent_model': 'table'
}
# ensure that the model actually was materialized as a table
for result in results:
node_name = result.node.name
self.assertEqual(result.node.config.materialized, expected_types[node_name])

View File

@@ -39,10 +39,6 @@ class TestCachingLowercaseModel(TestBaseCaching):
def models(self):
return "models"
@use_profile('snowflake')
def test_snowflake_cache(self):
self.cache_run()
@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()
@@ -52,10 +48,6 @@ class TestCachingUppercaseModel(TestBaseCaching):
def models(self):
return "shouting_models"
@use_profile('snowflake')
def test_snowflake_cache(self):
self.cache_run()
@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()

View File

@@ -15,42 +15,7 @@ class BaseOverrideDatabase(DBTIntegrationTest):
@property
def alternative_database(self):
if self.adapter_type == 'snowflake':
return os.getenv('SNOWFLAKE_TEST_DATABASE')
else:
return super().alternative_database
def snowflake_profile(self):
return {
'config': {
'send_anonymous_usage_stats': False
},
'test': {
'outputs': {
'default2': {
'type': 'snowflake',
'threads': 4,
'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'),
'user': os.getenv('SNOWFLAKE_TEST_USER'),
'password': os.getenv('SNOWFLAKE_TEST_PASSWORD'),
'database': os.getenv('SNOWFLAKE_TEST_QUOTED_DATABASE'),
'schema': self.unique_schema(),
'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'),
},
'noaccess': {
'type': 'snowflake',
'threads': 4,
'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'),
'user': 'noaccess',
'password': 'password',
'database': os.getenv('SNOWFLAKE_TEST_DATABASE'),
'schema': self.unique_schema(),
'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'),
}
},
'target': 'default2'
}
}
return super().alternative_database
@property
def project_config(self):
@@ -71,10 +36,7 @@ class BaseOverrideDatabase(DBTIntegrationTest):
class TestModelOverride(BaseOverrideDatabase):
def run_database_override(self):
if self.adapter_type == 'snowflake':
func = lambda x: x.upper()
else:
func = lambda x: x
func = lambda x: x
self.run_dbt(['seed'])
@@ -91,10 +53,6 @@ class TestModelOverride(BaseOverrideDatabase):
def test_bigquery_database_override(self):
self.run_database_override()
@use_profile('snowflake')
def test_snowflake_database_override(self):
self.run_database_override()
class BaseTestProjectModelOverride(BaseOverrideDatabase):
# this is janky, but I really want to access self.default_database in
@@ -106,8 +64,6 @@ class BaseTestProjectModelOverride(BaseOverrideDatabase):
for key in ['database', 'project', 'dbname']:
if key in profile:
database = profile[key]
if self.adapter_type == 'snowflake':
return database.upper()
return database
assert False, 'No profile database found!'
@@ -117,10 +73,7 @@ class BaseTestProjectModelOverride(BaseOverrideDatabase):
self.assertExpectedRelations()
def assertExpectedRelations(self):
if self.adapter_type == 'snowflake':
func = lambda x: x.upper()
else:
func = lambda x: x
func = lambda x: x
self.assertManyRelationsEqual([
(func('seed'), self.unique_schema(), self.default_database),
@@ -163,10 +116,6 @@ class TestProjectModelOverride(BaseTestProjectModelOverride):
def test_bigquery_database_override(self):
self.run_database_override()
@use_profile('snowflake')
def test_snowflake_database_override(self):
self.run_database_override()
class TestProjectModelAliasOverride(BaseTestProjectModelOverride):
@property
@@ -203,10 +152,7 @@ class TestProjectModelAliasOverride(BaseTestProjectModelOverride):
class TestProjectSeedOverride(BaseOverrideDatabase):
def run_database_override(self):
if self.adapter_type == 'snowflake':
func = lambda x: x.upper()
else:
func = lambda x: x
func = lambda x: x
self.use_default_project({
'config-version': 2,
@@ -228,7 +174,3 @@ class TestProjectSeedOverride(BaseOverrideDatabase):
@use_profile('bigquery')
def test_bigquery_database_override(self):
self.run_database_override()
@use_profile('snowflake')
def test_snowflake_database_override(self):
self.run_database_override()

View File

@@ -357,14 +357,6 @@ class TestSourceFreshness(SuccessfulSourcesTest):
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/pass_source.json', 'pass')
@use_profile('snowflake')
def test_snowflake_source_freshness(self):
self._run_source_freshness()
@use_profile('redshift')
def test_redshift_source_freshness(self):
self._run_source_freshness()
@use_profile('bigquery')
def test_bigquery_source_freshness(self):
self._run_source_freshness()
@@ -492,8 +484,3 @@ class TestUnquotedSources(SuccessfulSourcesTest):
def test_postgres_catalog(self):
self.run_dbt_with_vars(['run'])
self.run_dbt_with_vars(['docs', 'generate'])
@use_profile('redshift')
def test_redshift_catalog(self):
self.run_dbt_with_vars(['run'])
self.run_dbt_with_vars(['docs', 'generate'])

View File

@@ -1,2 +0,0 @@
{{ config(materialized='table') }}
select '{{ env_var("SNOWFLAKE_TEST_ALT_WAREHOUSE", "DBT_TEST_ALT") }}' as warehouse

View File

@@ -1,2 +0,0 @@
{{ config(snowflake_warehouse='DBT_TEST_DOES_NOT_EXIST') }}
select current_warehouse() as warehouse

View File

@@ -1,2 +0,0 @@
{{ config(snowflake_warehouse=env_var('SNOWFLAKE_TEST_ALT_WAREHOUSE', 'DBT_TEST_ALT'), materialized='table') }}
select current_warehouse() as warehouse

View File

@@ -1,2 +0,0 @@
{{ config(materialized='table') }}
select '{{ env_var("SNOWFLAKE_TEST_ALT_WAREHOUSE", "DBT_TEST_ALT") }}' as warehouse

View File

@@ -1,2 +0,0 @@
{{ config(materialized='table') }}
select current_warehouse() as warehouse

View File

@@ -1,62 +0,0 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
class TestModelWarehouse(DBTIntegrationTest):
@property
def schema(self):
return 'dbt_warehouse_050'
@staticmethod
def dir(value):
return os.path.normpath(value)
@property
def models(self):
return self.dir('models')
@use_profile('snowflake')
def test_snowflake_override_ok(self):
self.run_dbt([
'run',
'--models', 'override_warehouse', 'expected_warehouse',
])
self.assertManyRelationsEqual([['OVERRIDE_WAREHOUSE'], ['EXPECTED_WAREHOUSE']])
@use_profile('snowflake')
def test_snowflake_override_noexist(self):
self.run_dbt(['run', '--models', 'invalid_warehouse'], expect_pass=False)
class TestConfigWarehouse(DBTIntegrationTest):
@property
def schema(self):
return 'dbt_warehouse_050'
@property
def project_config(self):
return {
'config-version': 2,
'source-paths': ['project-config-models'],
'models': {
'test': {
'snowflake_warehouse': os.getenv('SNOWFLAKE_TEST_ALT_WAREHOUSE', 'DBT_TEST_ALT'),
},
},
}
@staticmethod
def dir(value):
return os.path.normpath(value)
@property
def models(self):
return self.dir('models')
@use_profile('snowflake')
def test_snowflake_override_ok(self):
self.run_dbt([
'run',
'--models', 'override_warehouse', 'expected_warehouse',
])
self.assertManyRelationsEqual([['OVERRIDE_WAREHOUSE'], ['EXPECTED_WAREHOUSE']])

View File

@@ -12,7 +12,7 @@
{% set required = ['name', 'schema', 'type', 'threads'] %}
{# Require what we docuement at https://docs.getdbt.com/docs/target #}
{% if target.type == 'postgres' or target.type == 'redshift' %}
{% if target.type == 'postgres' %}
{% do required.extend(['dbname', 'host', 'user', 'port']) %}
{% elif target.type == 'snowflake' %}
{% do required.extend(['database', 'warehouse', 'user', 'role', 'account']) %}

View File

@@ -94,14 +94,6 @@ class TestDefaultQueryComments(DBTIntegrationTest):
def test_postgres_comments(self):
self.run_assert_comments()
@use_profile('redshift')
def test_redshift_comments(self):
self.run_assert_comments()
@use_profile('snowflake')
def test_snowflake_comments(self):
self.run_assert_comments()
@use_profile('bigquery')
def test_bigquery_comments(self):
self.run_assert_comments()

View File

@@ -39,34 +39,11 @@ class TestColumnQuotingDefault(BaseColumnQuotingTest):
def test_postgres_column_quotes(self):
self._run_columnn_quotes()
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()
@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes(strategy='merge')
class TestColumnQuotingSnowflakeDefault(BaseColumnQuotingTest):
@property
def project_config(self):
return {
'config-version': 2
}
@property
def models(self):
return self.dir('models-unquoted')
def run_dbt(self, *args, **kwargs):
return super().run_dbt(*args, **kwargs)
@use_profile('snowflake')
def test_snowflake_column_quotes(self):
self._run_columnn_quotes()
class TestColumnQuotingDisabled(BaseColumnQuotingTest):
@property
def models(self):
@@ -85,18 +62,6 @@ class TestColumnQuotingDisabled(BaseColumnQuotingTest):
def test_postgres_column_quotes(self):
self._run_columnn_quotes()
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()
@use_profile('snowflake')
def test_snowflake_column_quotes(self):
self._run_columnn_quotes()
@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')
@use_profile('bigquery')
def test_bigquery_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')
@@ -120,18 +85,6 @@ class TestColumnQuotingEnabled(BaseColumnQuotingTest):
def test_postgres_column_quotes(self):
self._run_columnn_quotes()
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()
@use_profile('snowflake')
def test_snowflake_column_quotes(self):
self._run_columnn_quotes()
@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')
@use_profile('bigquery')
def test_bigquery_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')

View File

@@ -24,18 +24,6 @@ class TestBaseCaching(DBTIntegrationTest):
self.run_dbt()
self.assertTablesEqual('model', 'expected')
@use_profile('redshift')
def test_redshift_adapter_methods(self):
self.run_dbt(['compile']) # trigger any compile-time issues
self.run_dbt()
self.assertTablesEqual('model', 'expected')
@use_profile('snowflake')
def test_snowflake_adapter_methods(self):
self.run_dbt(['compile']) # trigger any compile-time issues
self.run_dbt()
self.assertTablesEqual('MODEL', 'EXPECTED')
@use_profile('bigquery')
def test_bigquery_adapter_methods(self):
self.run_dbt(['compile']) # trigger any compile-time issues

View File

@@ -1,4 +1,3 @@
import pytest
from test.integration.base import DBTIntegrationTest, use_profile
@@ -21,25 +20,6 @@ class TestPostgresColumnTypes(TestColumnTypes):
def test_postgres_column_types(self):
self.run_and_test()
class TestRedshiftColumnTypes(TestColumnTypes):
@property
def models(self):
return 'rs_models'
@use_profile('redshift')
def test_redshift_column_types(self):
self.run_and_test()
class TestSnowflakeColumnTypes(TestColumnTypes):
@property
def models(self):
return 'sf_models'
@use_profile('snowflake')
def test_snowflake_column_types(self):
self.run_and_test()
class TestBigQueryColumnTypes(TestColumnTypes):
@property

View File

@@ -1 +0,0 @@
select 1 as id

Some files were not shown because too many files have changed in this diff Show More