Compare commits

...

8 Commits

Author | SHA1 | Message | Date
Kyle Wigley | 6ceed5ce55 | fix docs test bc of schema version change | 2021-09-28 11:59:54 -04:00
Kyle Wigley | b145e0524c | fix tests! | 2021-09-28 11:59:54 -04:00
Kyle Wigley | b610b58d2e | cleanup CI | 2021-09-28 11:59:53 -04:00
Kyle Wigley | 1ba9f89c54 | rm all references to redshift except integration/100_rpc_test 🙁 | 2021-09-28 11:59:53 -04:00
Kyle Wigley | c0ae76690d | rm references to redshift | 2021-09-28 11:59:53 -04:00
Kyle Wigley | 08b762cbf2 | fix typo; rm breakpoints | 2021-09-28 11:59:53 -04:00
Kyle Wigley | 052b14cf79 | Start moving unit tests | 2021-09-28 11:59:53 -04:00
Kyle Wigley | 2d938e511d | first pass moving redshift tests | 2021-09-28 11:59:52 -04:00
293 changed files with 10821 additions and 1003 deletions

View File

@@ -34,16 +34,12 @@ first_value = 1
[bumpversion:file:plugins/postgres/setup.py]
[bumpversion:file:plugins/redshift/setup.py]
[bumpversion:file:plugins/snowflake/setup.py]
[bumpversion:file:plugins/bigquery/setup.py]
[bumpversion:file:plugins/postgres/dbt/adapters/postgres/__version__.py]
[bumpversion:file:plugins/redshift/dbt/adapters/redshift/__version__.py]
[bumpversion:file:plugins/snowflake/dbt/adapters/snowflake/__version__.py]
[bumpversion:file:plugins/bigquery/dbt/adapters/bigquery/__version__.py]

View File

@@ -21,11 +21,6 @@ updates:
schedule:
interval: "daily"
rebase-strategy: "disabled"
- package-ecosystem: "pip"
directory: "/plugins/redshift"
schedule:
interval: "daily"
rebase-strategy: "disabled"
- package-ecosystem: "pip"
directory: "/plugins/snowflake"
schedule:

View File

@@ -97,10 +97,6 @@ jobs:
bigquery:
- 'core/**'
- 'plugins/bigquery/**'
redshift:
- 'core/**'
- 'plugins/redshift/**'
- 'plugins/postgres/**'
- name: Generate integration test matrix
id: generate-matrix
@@ -191,16 +187,6 @@ jobs:
if: matrix.adapter == 'postgres'
run: tox
- name: Run tox (redshift)
if: matrix.adapter == 'redshift'
env:
REDSHIFT_TEST_DBNAME: ${{ secrets.REDSHIFT_TEST_DBNAME }}
REDSHIFT_TEST_PASS: ${{ secrets.REDSHIFT_TEST_PASS }}
REDSHIFT_TEST_USER: ${{ secrets.REDSHIFT_TEST_USER }}
REDSHIFT_TEST_PORT: ${{ secrets.REDSHIFT_TEST_PORT }}
REDSHIFT_TEST_HOST: ${{ secrets.REDSHIFT_TEST_HOST }}
run: tox
- name: Run tox (snowflake)
if: matrix.adapter == 'snowflake'
env:

View File

View File

@@ -0,0 +1,6 @@
-- Macro to rename a relation
{% macro rename_named_relation(from_name, to_name) %}
{%- set from_relation = api.Relation.create(database=target.database, schema=target.schema, identifier=from_name, type='table') -%}
{%- set to_relation = api.Relation.create(database=target.database, schema=target.schema, identifier=to_name, type='table') -%}
{% do adapter.rename_relation(from_relation, to_relation) %}
{% endmacro %}
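For orientation only: a minimal sketch (not part of this changeset) of how the macro above could be exercised through the new tests/integration harness introduced later in this diff, using dbt's run-operation command. The schema name, model name, and macro location used here are hypothetical.

from tests.integration.base import DBTIntegrationTest, use_profile


class TestRenameNamedRelation(DBTIntegrationTest):
    @property
    def schema(self):
        return "adapter_methods"  # hypothetical schema name

    @property
    def models(self):
        return "models"  # assumes a model named 'model' lives here

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'macro-paths': ['macros'],  # hypothetical location of the macro above
        }

    @use_profile('redshift')
    def test_redshift_rename_named_relation(self):
        # build the 'model' table first, then rename it through the macro
        self.run_dbt(['run'])
        self.run_dbt([
            'run-operation', 'rename_named_relation',
            '--args', '{from_name: model, to_name: model_renamed}',
        ])
        self.assertTableDoesExist('model_renamed')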

View File

@@ -0,0 +1,3 @@
-- make sure this runs after 'model'
-- {{ ref('model') }}
select 2 as id

View File

@@ -0,0 +1,19 @@
{% set upstream = ref('upstream') %}
{% if execute %}
{# don't ever do any of this #}
{%- do adapter.drop_schema(upstream) -%}
{% set existing = adapter.get_relation(upstream.database, upstream.schema, upstream.identifier) %}
{% if existing is not none %}
{% do exceptions.raise_compiler_error('expected ' ~ upstream ~ ' to not exist, but it did') %}
{% endif %}
{%- do adapter.create_schema(upstream) -%}
{% set sql = create_view_as(upstream, 'select 2 as id') %}
{% do run_query(sql) %}
{% endif %}
select * from {{ upstream }}

View File

@@ -0,0 +1 @@
select 1 as id

View File

@@ -0,0 +1,24 @@
from tests.integration.base import DBTIntegrationTest, use_profile
class TestBaseCaching(DBTIntegrationTest):
@property
def schema(self):
return "caching"
@property
def models(self):
return "models"
@property
def project_config(self):
return {
'config-version': 2,
'test-paths': ['tests']
}
@use_profile('redshift')
def test_redshift_adapter_methods(self):
self.run_dbt(['compile']) # trigger any compile-time issues
self.run_dbt()
self.assertTablesEqual('model', 'expected')

View File

@@ -0,0 +1,7 @@
{% set columns = adapter.get_columns_in_relation(ref('model')) %}
{% set limit_query = 0 %}
{% if (columns | length) == 0 %}
{% set limit_query = 1 %}
{% endif %}
select 1 as id limit {{ limit_query }}

View File

@@ -0,0 +1,946 @@
import os
import io
import random
import shutil
import sys
import tempfile
import traceback
import unittest
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
import pytest
import yaml
from unittest.mock import patch
import dbt.main as dbt
from dbt import flags
from dbt.deprecations import reset_deprecations
from dbt.adapters.factory import get_adapter, reset_adapters, register_adapter
from dbt.clients.jinja import template_cache
from dbt.config import RuntimeConfig
from dbt.context import providers
from dbt.logger import GLOBAL_LOGGER as logger, log_manager
from dbt.contracts.graph.manifest import Manifest
INITIAL_ROOT = os.getcwd()
def normalize(path):
"""On windows, neither is enough on its own:
>>> normcase('C:\\documents/ALL CAPS/subdir\\..')
'c:\\documents\\all caps\\subdir\\..'
>>> normpath('C:\\documents/ALL CAPS/subdir\\..')
'C:\\documents\\ALL CAPS'
>>> normpath(normcase('C:\\documents/ALL CAPS/subdir\\..'))
'c:\\documents\\all caps'
"""
return os.path.normcase(os.path.normpath(path))
class Normalized:
def __init__(self, value):
self.value = value
def __repr__(self):
return f'Normalized({self.value!r})'
def __str__(self):
return f'Normalized({self.value!s})'
def __eq__(self, other):
return normalize(self.value) == normalize(other)
class FakeArgs:
def __init__(self):
self.threads = 1
self.data = False
self.defer = False
self.schema = True
self.full_refresh = False
self.models = None
self.select = None
self.exclude = None
self.single_threaded = False
self.selector_name = None
self.state = None
self.defer = None
class TestArgs:
def __init__(self, kwargs):
self.which = 'run'
self.single_threaded = False
self.profiles_dir = None
self.project_dir = None
self.__dict__.update(kwargs)
def _profile_from_test_name(test_name):
adapter_names = ('redshift',)
adapters_in_name = sum(x in test_name for x in adapter_names)
if adapters_in_name != 1:
raise ValueError(
'test names must have exactly 1 profile choice embedded, {} has {}'
.format(test_name, adapters_in_name)
)
for adapter_name in adapter_names:
if adapter_name in test_name:
return adapter_name
raise ValueError(
'could not find adapter name in test name {}'.format(test_name)
)
def _pytest_test_name():
return os.environ['PYTEST_CURRENT_TEST'].split()[0]
def _pytest_get_test_root():
test_path = _pytest_test_name().split('::')[0]
relative_to = INITIAL_ROOT
head = os.path.relpath(test_path, relative_to)
path_parts = []
while head:
head, tail = os.path.split(head)
path_parts.append(tail)
path_parts.reverse()
# dbt tests are all of the form 'tests/integration/XXX_suite_name'
target = os.path.join(*path_parts[:5]) # TODO: try to not hard code this
return os.path.join(relative_to, target)
def _really_makedirs(path):
while not os.path.exists(path):
try:
os.makedirs(path)
except EnvironmentError:
raise
class DBTIntegrationTest(unittest.TestCase):
CREATE_SCHEMA_STATEMENT = 'CREATE SCHEMA {}'
DROP_SCHEMA_STATEMENT = 'DROP SCHEMA IF EXISTS {} CASCADE'
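# The prefix below is unique per test process: microseconds since the Unix epoch
# plus a random four-digit suffix, so schemas from concurrent runs don't collide.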
_randint = random.randint(0, 9999)
_runtime_timedelta = (datetime.utcnow() - datetime(1970, 1, 1, 0, 0, 0))
_runtime = (
(int(_runtime_timedelta.total_seconds() * 1e6)) +
_runtime_timedelta.microseconds
)
prefix = f'test{_runtime}{_randint:04}'
setup_alternate_db = False
def redshift_profile(self):
return {
'config': {
'send_anonymous_usage_stats': False
},
'test': {
'outputs': {
'default2': {
'type': 'redshift',
'threads': 1,
'host': os.getenv('REDSHIFT_TEST_HOST'),
'port': int(os.getenv('REDSHIFT_TEST_PORT')),
'user': os.getenv('REDSHIFT_TEST_USER'),
'pass': os.getenv('REDSHIFT_TEST_PASS'),
'dbname': os.getenv('REDSHIFT_TEST_DBNAME'),
'schema': self.unique_schema()
}
},
'target': 'default2'
}
}
@property
def packages_config(self):
return None
@property
def selectors_config(self):
return None
def unique_schema(self):
schema = self.schema
to_return = "{}_{}".format(self.prefix, schema)
return to_return.lower()
@property
def default_database(self):
database = self.config.credentials.database
return database
@property
def alternative_database(self):
return None
def get_profile(self, adapter_type):
if adapter_type == 'redshift':
return self.redshift_profile()
else:
raise ValueError('invalid adapter type {}'.format(adapter_type))
def _pick_profile(self):
test_name = self.id().split('.')[-1]
return _profile_from_test_name(test_name)
def _symlink_test_folders(self):
for entry in os.listdir(self.test_original_source_path):
src = os.path.join(self.test_original_source_path, entry)
tst = os.path.join(self.test_root_dir, entry)
if os.path.isdir(src) or src.endswith('.sql'):
# symlink all sql files and all directories.
os.symlink(src, tst)
os.symlink(self._logs_dir, os.path.join(self.test_root_dir, 'logs'))
@property
def test_root_realpath(self):
if sys.platform == 'darwin':
return os.path.realpath(self.test_root_dir)
else:
return self.test_root_dir
def _generate_test_root_dir(self):
return normalize(tempfile.mkdtemp(prefix='dbt-int-test-'))
def setUp(self):
self.dbt_core_install_root = os.path.dirname(dbt.__file__)
log_manager.reset_handlers()
self.initial_dir = INITIAL_ROOT
os.chdir(self.initial_dir)
# before we go anywhere, collect the initial path info
self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
_really_makedirs(self._logs_dir)
self.test_original_source_path = _pytest_get_test_root()
self.test_root_dir = self._generate_test_root_dir()
os.chdir(self.test_root_dir)
try:
self._symlink_test_folders()
except Exception as exc:
msg = '\n\t'.join((
'Failed to symlink test folders!',
'initial_dir={0.initial_dir}',
'test_original_source_path={0.test_original_source_path}',
'test_root_dir={0.test_root_dir}'
)).format(self)
logger.exception(msg)
# if logging isn't set up, I still really want this message.
print(msg)
traceback.print_exc()
raise
self._created_schemas = set()
reset_deprecations()
template_cache.clear()
self.use_profile(self._pick_profile())
self.use_default_project()
self.set_packages()
self.set_selectors()
self.load_config()
def use_default_project(self, overrides=None):
# create a dbt_project.yml
base_project_config = {
'name': 'test',
'version': '1.0',
'config-version': 2,
'test-paths': [],
'source-paths': [self.models],
'profile': 'test',
}
project_config = {}
project_config.update(base_project_config)
project_config.update(self.project_config)
project_config.update(overrides or {})
with open("dbt_project.yml", 'w') as f:
yaml.safe_dump(project_config, f, default_flow_style=True)
def use_profile(self, adapter_type):
self.adapter_type = adapter_type
profile_config = {}
default_profile_config = self.get_profile(adapter_type)
profile_config.update(default_profile_config)
profile_config.update(self.profile_config)
if not os.path.exists(self.test_root_dir):
os.makedirs(self.test_root_dir)
flags.PROFILES_DIR = self.test_root_dir
profiles_path = os.path.join(self.test_root_dir, 'profiles.yml')
with open(profiles_path, 'w') as f:
yaml.safe_dump(profile_config, f, default_flow_style=True)
self._profile_config = profile_config
def set_packages(self):
if self.packages_config is not None:
with open('packages.yml', 'w') as f:
yaml.safe_dump(self.packages_config, f, default_flow_style=True)
def set_selectors(self):
if self.selectors_config is not None:
with open('selectors.yml', 'w') as f:
yaml.safe_dump(self.selectors_config, f, default_flow_style=True)
def load_config(self):
# we've written our profile and project. Now we want to instantiate a
# fresh adapter for the tests.
# it's important to use a different connection handle here so
# we don't look into an incomplete transaction
kwargs = {
'profile': None,
'profiles_dir': self.test_root_dir,
'target': None,
}
config = RuntimeConfig.from_args(TestArgs(kwargs))
register_adapter(config)
adapter = get_adapter(config)
adapter.cleanup_connections()
self.adapter_type = adapter.type()
self.adapter = adapter
self.config = config
self._drop_schemas()
self._create_schemas()
def quote_as_configured(self, value, quote_key):
return self.adapter.quote_as_configured(value, quote_key)
def tearDown(self):
# get any current run adapter and clean up its connections before we
# reset them. It'll probably be different from ours because
# handle_and_check() calls reset_adapters().
register_adapter(self.config)
adapter = get_adapter(self.config)
if adapter is not self.adapter:
adapter.cleanup_connections()
if not hasattr(self, 'adapter'):
self.adapter = adapter
self._drop_schemas()
self.adapter.cleanup_connections()
reset_adapters()
os.chdir(INITIAL_ROOT)
try:
shutil.rmtree(self.test_root_dir)
except EnvironmentError:
logger.exception('Could not clean up after test - {} not removable'
.format(self.test_root_dir))
def _get_schema_fqn(self, database, schema):
schema_fqn = self.quote_as_configured(schema, 'schema')
return schema_fqn
def _create_schema_named(self, database, schema):
schema_fqn = self._get_schema_fqn(database, schema)
self.run_sql(self.CREATE_SCHEMA_STATEMENT.format(schema_fqn))
self._created_schemas.add(schema_fqn)
def _drop_schema_named(self, database, schema):
schema_fqn = self._get_schema_fqn(database, schema)
self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn))
def _create_schemas(self):
schema = self.unique_schema()
with self.adapter.connection_named('__test'):
self._create_schema_named(self.default_database, schema)
def _drop_schemas_sql(self):
schema = self.unique_schema()
# we always want to drop these if necessary, we'll clear it soon.
self._created_schemas.add(
self._get_schema_fqn(self.default_database, schema)
)
drop_alternative = (
self.setup_alternate_db and
self.adapter_type not in {'redshift'} and
self.alternative_database
)
if drop_alternative:
self._created_schemas.add(
self._get_schema_fqn(self.alternative_database, schema)
)
for schema_fqn in self._created_schemas:
self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn))
self._created_schemas.clear()
def _drop_schemas(self):
with self.adapter.connection_named('__test'):
self._drop_schemas_sql()
@property
def project_config(self):
return {
'config-version': 2,
}
@property
def profile_config(self):
return {}
def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):
res, success = self.run_dbt_and_check(args=args, profiles_dir=profiles_dir)
self.assertEqual(
success, expect_pass,
"dbt exit state did not match expected")
return res
def run_dbt_and_capture(self, *args, **kwargs):
try:
initial_stdout = log_manager.stdout
initial_stderr = log_manager.stderr
stringbuf = io.StringIO()
log_manager.set_output_stream(stringbuf)
res = self.run_dbt(*args, **kwargs)
stdout = stringbuf.getvalue()
finally:
log_manager.set_output_stream(initial_stdout, initial_stderr)
return res, stdout
def run_dbt_and_check(self, args=None, profiles_dir=True):
log_manager.reset_handlers()
if args is None:
args = ["run"]
final_args = []
if os.getenv('DBT_TEST_SINGLE_THREADED') in ('y', 'Y', '1'):
final_args.append('--single-threaded')
final_args.extend(args)
if profiles_dir:
final_args.extend(['--profiles-dir', self.test_root_dir])
final_args.append('--log-cache-events')
logger.info("Invoking dbt with {}".format(final_args))
return dbt.handle_and_check(final_args)
def run_sql_file(self, path, kwargs=None):
with open(path, 'r') as f:
statements = f.read().split(";")
for statement in statements:
self.run_sql(statement, kwargs=kwargs)
def transform_sql(self, query, kwargs=None):
to_return = query
base_kwargs = {
'schema': self.unique_schema(),
'database': self.adapter.quote(self.default_database),
}
if kwargs is None:
kwargs = {}
base_kwargs.update(kwargs)
to_return = to_return.format(**base_kwargs)
return to_return
def run_sql_common(self, sql, fetch, conn):
with conn.handle.cursor() as cursor:
try:
cursor.execute(sql)
conn.handle.commit()
if fetch == 'one':
return cursor.fetchone()
elif fetch == 'all':
return cursor.fetchall()
else:
return
except BaseException as e:
if conn.handle and not getattr(conn.handle, 'closed', True):
conn.handle.rollback()
print(sql)
print(e)
raise
finally:
conn.transaction_open = False
def run_sql(self, query, fetch='None', kwargs=None, connection_name=None):
if connection_name is None:
connection_name = '__test'
if query.strip() == "":
return
sql = self.transform_sql(query, kwargs=kwargs)
with self.get_connection(connection_name) as conn:
logger.debug('test connection "{}" executing: {}'.format(conn.name, sql))
return self.run_sql_common(sql, fetch, conn)
def _ilike(self, target, value):
return "{} ilike '{}'".format(target, value)
def get_many_table_columns_information_schema(self, tables, schema, database=None):
columns = 'table_name, column_name, data_type, character_maximum_length'
sql = """
select {columns}
from {db_string}information_schema.columns
where {schema_filter}
and ({table_filter})
order by column_name asc"""
db_string = ''
if database:
db_string = self.quote_as_configured(database, 'database') + '.'
table_filters_s = " OR ".join(
self._ilike('table_name', table.replace('"', ''))
for table in tables
)
schema_filter = self._ilike('table_schema', schema)
sql = sql.format(
columns=columns,
schema_filter=schema_filter,
table_filter=table_filters_s,
db_string=db_string)
columns = self.run_sql(sql, fetch='all')
return list(map(self.filter_many_columns, columns))
def get_many_table_columns(self, tables, schema, database=None):
result = self.get_many_table_columns_information_schema(tables, schema, database)
result.sort(key=lambda x: '{}.{}'.format(x[0], x[1]))
return result
def filter_many_columns(self, column):
if len(column) == 3:
table_name, column_name, data_type = column
char_size = None
else:
table_name, column_name, data_type, char_size = column
return (table_name, column_name, data_type, char_size)
@contextmanager
def get_connection(self, name=None):
"""Create a test connection context where all executed macros, etc will
get self.adapter as the adapter.
This allows tests to run normal adapter macros as if reset_adapters()
were not called by handle_and_check (for asserts, etc)
"""
if name is None:
name = '__test'
with patch.object(providers, 'get_adapter', return_value=self.adapter):
with self.adapter.connection_named(name):
conn = self.adapter.connections.get_thread_connection()
yield conn
def get_relation_columns(self, relation):
with self.get_connection():
columns = self.adapter.get_columns_in_relation(relation)
return sorted(((c.name, c.dtype, c.char_size) for c in columns),
key=lambda x: x[0])
def get_table_columns(self, table, schema=None, database=None):
schema = self.unique_schema() if schema is None else schema
database = self.default_database if database is None else database
relation = self.adapter.Relation.create(
database=database,
schema=schema,
identifier=table,
type='table',
quote_policy=self.config.quoting
)
return self.get_relation_columns(relation)
def get_table_columns_as_dict(self, tables, schema=None):
col_matrix = self.get_many_table_columns(tables, schema)
res = {}
for row in col_matrix:
table_name = row[0]
col_def = row[1:]
if table_name not in res:
res[table_name] = []
res[table_name].append(col_def)
return res
def get_models_in_schema(self, schema=None):
schema = self.unique_schema() if schema is None else schema
sql = """
select table_name,
case when table_type = 'BASE TABLE' then 'table'
when table_type = 'VIEW' then 'view'
else table_type
end as materialization
from information_schema.tables
where {}
order by table_name
"""
sql = sql.format(self._ilike('table_schema', schema))
result = self.run_sql(sql, fetch='all')
return {model_name: materialization for (model_name, materialization) in result}
def _assertTablesEqualSql(self, relation_a, relation_b, columns=None):
if columns is None:
columns = self.get_relation_columns(relation_a)
column_names = [c[0] for c in columns]
sql = self.adapter.get_rows_different_sql(
relation_a, relation_b, column_names
)
return sql
def assertTablesEqual(self, table_a, table_b,
table_a_schema=None, table_b_schema=None,
table_a_db=None, table_b_db=None):
if table_a_schema is None:
table_a_schema = self.unique_schema()
if table_b_schema is None:
table_b_schema = self.unique_schema()
if table_a_db is None:
table_a_db = self.default_database
if table_b_db is None:
table_b_db = self.default_database
relation_a = self._make_relation(table_a, table_a_schema, table_a_db)
relation_b = self._make_relation(table_b, table_b_schema, table_b_db)
self._assertTableColumnsEqual(relation_a, relation_b)
sql = self._assertTablesEqualSql(relation_a, relation_b)
result = self.run_sql(sql, fetch='one')
self.assertEqual(
result[0],
0,
'row_count_difference nonzero: ' + sql
)
self.assertEqual(
result[1],
0,
'num_mismatched nonzero: ' + sql
)
def _make_relation(self, identifier, schema=None, database=None):
if schema is None:
schema = self.unique_schema()
if database is None:
database = self.default_database
return self.adapter.Relation.create(
database=database,
schema=schema,
identifier=identifier,
quote_policy=self.config.quoting
)
def get_many_relation_columns(self, relations):
"""Returns a dict of (datbase, schema) -> (dict of (table_name -> list of columns))
"""
schema_fqns = {}
for rel in relations:
this_schema = schema_fqns.setdefault((rel.database, rel.schema), [])
this_schema.append(rel.identifier)
column_specs = {}
for key, tables in schema_fqns.items():
database, schema = key
columns = self.get_many_table_columns(tables, schema, database=database)
table_columns = {}
for col in columns:
table_columns.setdefault(col[0], []).append(col[1:])
for rel_name, columns in table_columns.items():
key = (database, schema, rel_name)
column_specs[key] = columns
return column_specs
def assertManyRelationsEqual(self, relations, default_schema=None, default_database=None):
if default_schema is None:
default_schema = self.unique_schema()
if default_database is None:
default_database = self.default_database
specs = []
for relation in relations:
if not isinstance(relation, (tuple, list)):
relation = [relation]
assert len(relation) <= 3
if len(relation) == 3:
relation = self._make_relation(*relation)
elif len(relation) == 2:
relation = self._make_relation(relation[0], relation[1], default_database)
elif len(relation) == 1:
relation = self._make_relation(relation[0], default_schema, default_database)
else:
raise ValueError('relation must be a sequence of 1, 2, or 3 values')
specs.append(relation)
with self.get_connection():
column_specs = self.get_many_relation_columns(specs)
# make sure everyone has equal column definitions
first_columns = None
for relation in specs:
key = (relation.database, relation.schema, relation.identifier)
# get a good error here instead of a hard-to-diagnose KeyError
self.assertIn(key, column_specs, f'No columns found for {key}')
columns = column_specs[key]
if first_columns is None:
first_columns = columns
else:
self.assertEqual(
first_columns, columns,
'{} did not match {}'.format(str(specs[0]), str(relation))
)
# make sure everyone has the same data. if we got here, everyone had
# the same column specs!
first_relation = None
for relation in specs:
if first_relation is None:
first_relation = relation
else:
sql = self._assertTablesEqualSql(first_relation, relation,
columns=first_columns)
result = self.run_sql(sql, fetch='one')
self.assertEqual(
result[0],
0,
'row_count_difference nonzero: ' + sql
)
self.assertEqual(
result[1],
0,
'num_mismatched nonzero: ' + sql
)
def assertManyTablesEqual(self, *args):
schema = self.unique_schema()
all_tables = []
for table_equivalencies in args:
all_tables += list(table_equivalencies)
all_cols = self.get_table_columns_as_dict(all_tables, schema)
for table_equivalencies in args:
first_table = table_equivalencies[0]
first_relation = self._make_relation(first_table)
# assert that all tables have the same columns
base_result = all_cols[first_table]
self.assertTrue(len(base_result) > 0)
for other_table in table_equivalencies[1:]:
other_result = all_cols[other_table]
self.assertTrue(len(other_result) > 0)
self.assertEqual(base_result, other_result)
other_relation = self._make_relation(other_table)
sql = self._assertTablesEqualSql(first_relation,
other_relation,
columns=base_result)
result = self.run_sql(sql, fetch='one')
self.assertEqual(
result[0],
0,
'row_count_difference nonzero: ' + sql
)
self.assertEqual(
result[1],
0,
'num_mismatched nonzero: ' + sql
)
def _assertTableRowCountsEqual(self, relation_a, relation_b):
cmp_query = """
with table_a as (
select count(*) as num_rows from {}
), table_b as (
select count(*) as num_rows from {}
)
select table_a.num_rows - table_b.num_rows as difference
from table_a, table_b
""".format(str(relation_a), str(relation_b))
res = self.run_sql(cmp_query, fetch='one')
self.assertEqual(int(res[0]), 0, "Row count of table {} doesn't match row count of table {}. ({} rows different)".format(
relation_a.identifier,
relation_b.identifier,
res[0]
)
)
def assertTableDoesNotExist(self, table, schema=None, database=None):
columns = self.get_table_columns(table, schema, database)
self.assertEqual(
len(columns),
0
)
def assertTableDoesExist(self, table, schema=None, database=None):
columns = self.get_table_columns(table, schema, database)
self.assertGreater(
len(columns),
0
)
def _assertTableColumnsEqual(self, relation_a, relation_b):
table_a_result = self.get_relation_columns(relation_a)
table_b_result = self.get_relation_columns(relation_b)
text_types = {'text', 'character varying', 'character', 'varchar'}
self.assertEqual(len(table_a_result), len(table_b_result))
for a_column, b_column in zip(table_a_result, table_b_result):
a_name, a_type, a_size = a_column
b_name, b_type, b_size = b_column
self.assertEqual(a_name, b_name,
'{} vs {}: column "{}" != "{}"'.format(
relation_a, relation_b, a_name, b_name
))
self.assertEqual(a_type, b_type,
'{} vs {}: column "{}" has type "{}" != "{}"'.format(
relation_a, relation_b, a_name, a_type, b_type
))
self.assertEqual(a_size, b_size,
'{} vs {}: column "{}" has size "{}" != "{}"'.format(
relation_a, relation_b, a_name, a_size, b_size
))
def assertEquals(self, *args, **kwargs):
# assertEquals is deprecated. This makes the warnings less chatty
self.assertEqual(*args, **kwargs)
def assertBetween(self, timestr, start, end=None):
datefmt = '%Y-%m-%dT%H:%M:%S.%fZ'
if end is None:
end = datetime.utcnow()
parsed = datetime.strptime(timestr, datefmt)
self.assertLessEqual(start, parsed,
'parsed date {} happened before {}'.format(
parsed,
start.strftime(datefmt))
)
self.assertGreaterEqual(end, parsed,
'parsed date {} happened after {}'.format(
parsed,
end.strftime(datefmt))
)
def use_profile(profile_name):
"""A decorator to declare a test method as using a particular profile.
Handles both setting the nose attr and calling self.use_profile.
Use like this:
class TestSomething(DBTIntegrationTest):
@use_profile('postgres')
def test_postgres_thing(self):
self.assertEqual(self.adapter_type, 'postgres')
@use_profile('snowflake')
def test_snowflake_thing(self):
self.assertEqual(self.adapter_type, 'snowflake')
"""
def outer(wrapped):
@getattr(pytest.mark, 'profile_'+profile_name)
@wraps(wrapped)
def func(self, *args, **kwargs):
return wrapped(self, *args, **kwargs)
# sanity check at import time
assert _profile_from_test_name(wrapped.__name__) == profile_name
return func
return outer
class AnyFloat:
"""Any float. Use this in assertEqual() calls to assert that it is a float.
"""
def __eq__(self, other):
return isinstance(other, float)
class AnyString:
"""Any string. Use this in assertEqual() calls to assert that it is a string.
"""
def __eq__(self, other):
return isinstance(other, str)
class AnyStringWith:
def __init__(self, contains=None):
self.contains = contains
def __eq__(self, other):
if not isinstance(other, str):
return False
if self.contains is None:
return True
return self.contains in other
def __repr__(self):
return 'AnyStringWith<{!r}>'.format(self.contains)
def get_manifest():
path = './target/partial_parse.msgpack'
if os.path.exists(path):
with open(path, 'rb') as fp:
manifest_mp = fp.read()
manifest: Manifest = Manifest.from_msgpack(manifest_mp)
return manifest
else:
return None

View File

@@ -0,0 +1,8 @@
{{ config(materialized=var('materialized')) }}
select '{{ var("materialized") }}' as materialization
{% if var('materialized') == 'incremental' and is_incremental() %}
where 'abc' != (select max(materialization) from {{ this }})
{% endif %}

View File

@@ -0,0 +1,44 @@
from tests.integration.base import DBTIntegrationTest, use_profile
class TestChangingRelationType(DBTIntegrationTest):
@property
def schema(self):
return "changing_relation_type"
@staticmethod
def dir(path):
return path.lstrip("/")
@property
def models(self):
return self.dir("models")
def swap_types_and_test(self):
# test that dbt is able to do intelligent things when changing
# between materializations that create tables and views.
results = self.run_dbt(['run', '--vars', 'materialized: view'])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: table'])
self.assertEqual(results[0].node.config.materialized, 'table')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: view'])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: incremental'])
self.assertEqual(results[0].node.config.materialized, 'incremental')
self.assertEqual(len(results), 1)
results = self.run_dbt(['run', '--vars', 'materialized: view'])
self.assertEqual(results[0].node.config.materialized, 'view')
self.assertEqual(len(results), 1)
@use_profile("redshift")
def test__redshift__switch_materialization(self):
self.swap_types_and_test()

View File

@@ -0,0 +1 @@
select 1 as {{ adapter.quote("2id") }}

View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: quote_model
description: "model to test column quotes and comments"
columns:
- name: 2id
description: "XXX My description"
quote: true

View File

@@ -0,0 +1,43 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import json
class TestColumnComment(DBTIntegrationTest):
@property
def schema(self):
return "column_comment"
@property
def models(self):
return "models"
@property
def project_config(self):
return {
'config-version': 2,
'models': {
'test': {
'materialized': 'table',
'+persist_docs': {
"relation": True,
"columns": True,
},
}
}
}
def run_has_comments(self):
self.run_dbt()
self.run_dbt(['docs', 'generate'])
with open('target/catalog.json') as fp:
catalog_data = json.load(fp)
assert 'nodes' in catalog_data
assert len(catalog_data['nodes']) == 1
column_node = catalog_data['nodes']['model.test.quote_model']
column_comment = column_node['columns']['2id']['comment']
assert column_comment.startswith('XXX')
@use_profile('redshift')
def test_redshift_comments(self):
self.run_has_comments()

View File

@@ -0,0 +1,4 @@
col_A,col_B
1,2
3,4
5,6

View File

@@ -0,0 +1,19 @@
{% set col_a = '"col_a"' %}
{% set col_b = '"col_b"' %}
{% if adapter.type() == 'bigquery' %}
{% set col_a = '`col_a`' %}
{% set col_b = '`col_b`' %}
{% elif adapter.type() == 'snowflake' %}
{% set col_a = '"COL_A"' %}
{% set col_b = '"COL_B"' %}
{% endif %}
{{config(
materialized = 'incremental',
unique_key = col_a,
incremental_strategy = var('strategy')
)}}
select
{{ col_a }}, {{ col_b }}
from {{ref('seed')}}

View File

@@ -0,0 +1,16 @@
{% set col_a = '"col_A"' %}
{% set col_b = '"col_B"' %}
{% if adapter.type() == 'bigquery' %}
{% set col_a = '`col_A`' %}
{% set col_b = '`col_B`' %}
{% endif %}
{{config(
materialized = 'incremental',
unique_key = col_a,
incremental_strategy = var('strategy')
)}}
select
{{ col_a }}, {{ col_b }}
from {{ref('seed')}}

View File

@@ -0,0 +1,78 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import os
class BaseColumnQuotingTest(DBTIntegrationTest):
def column_quoting(self):
raise NotImplementedError('column_quoting not implemented')
@property
def schema(self):
return 'dbt_column_quoting'
@staticmethod
def dir(value):
return os.path.normpath(value)
def _run_columnn_quotes(self, strategy='delete+insert'):
strategy_vars = '{{"strategy": "{}"}}'.format(strategy)
self.run_dbt(['seed', '--vars', strategy_vars])
self.run_dbt(['run', '--vars', strategy_vars])
self.run_dbt(['run', '--vars', strategy_vars])
class TestColumnQuotingDefault(BaseColumnQuotingTest):
@property
def project_config(self):
return {
'config-version': 2
}
@property
def models(self):
return self.dir('models-unquoted')
def run_dbt(self, *args, **kwargs):
return super().run_dbt(*args, **kwargs)
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()
class TestColumnQuotingDisabled(BaseColumnQuotingTest):
@property
def models(self):
return self.dir('models-unquoted')
@property
def project_config(self):
return {
'config-version': 2,
'seeds': {
'quote_columns': False,
},
}
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()
class TestColumnQuotingEnabled(BaseColumnQuotingTest):
@property
def models(self):
return self.dir('models')
@property
def project_config(self):
return {
'config-version': 2,
'seeds': {
'quote_columns': True,
},
}
@use_profile('redshift')
def test_redshift_column_quotes(self):
self._run_columnn_quotes()

View File

@@ -0,0 +1,5 @@
select
CAST(1 as int64) as int64_col,
CAST(2.0 as float64) as float64_col,
CAST(3.0 as numeric) as numeric_col,
CAST('3' as string) as string_col,

View File

@@ -0,0 +1,10 @@
version: 2
models:
- name: model
tests:
- is_type:
column_map:
int64_col: ['integer', 'number']
float64_col: ['float', 'number']
numeric_col: ['numeric', 'number']
string_col: ['string', 'not number']

View File

@@ -0,0 +1,10 @@
version: 2
models:
- name: model
tests:
- is_type:
column_map:
int64_col: ['string', 'not number']
float64_col: ['float', 'number']
numeric_col: ['numeric', 'number']
string_col: ['string', 'not number']

View File

@@ -0,0 +1,6 @@
{{ config(materialized='table') }}
select
CAST(1 as int64) as int64_col,
CAST(2.0 as float64) as float64_col,
CAST(3.0 as numeric) as numeric_col,
CAST('3' as string) as string_col,

View File

@@ -0,0 +1,5 @@
-- Macro to alter a column type
{% macro test_alter_column_type(model_name, column_name, new_column_type) %}
{% set relation = ref(model_name) %}
{{ alter_column_type(relation, column_name, new_column_type) }}
{% endmacro %}

View File

@@ -0,0 +1,72 @@
{% macro simple_type_check_column(column, check) %}
{% if check == 'string' %}
{{ return(column.is_string()) }}
{% elif check == 'float' %}
{{ return(column.is_float()) }}
{% elif check == 'number' %}
{{ return(column.is_number()) }}
{% elif check == 'numeric' %}
{{ return(column.is_numeric()) }}
{% elif check == 'integer' %}
{{ return(column.is_integer()) }}
{% else %}
{% do exceptions.raise_compiler_error('invalid type check value: ' ~ check) %}
{% endif %}
{% endmacro %}
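{# type_checks is a list like ['string', 'not number']: a plain entry must pass simple_type_check_column, while an entry prefixed with 'not ' must fail it; any violations are collected as failures. #}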
{% macro type_check_column(column, type_checks) %}
{% set failures = [] %}
{% for type_check in type_checks %}
{% if type_check.startswith('not ') %}
{% if simple_type_check_column(column, type_check[4:]) %}
{% do log('simple_type_check_column got ', True) %}
{% do failures.append(type_check) %}
{% endif %}
{% else %}
{% if not simple_type_check_column(column, type_check) %}
{% do failures.append(type_check) %}
{% endif %}
{% endif %}
{% endfor %}
{% if (failures | length) > 0 %}
{% do log('column ' ~ column.name ~ ' had failures: ' ~ failures, info=True) %}
{% endif %}
{% do return((failures | length) == 0) %}
{% endmacro %}
{% test is_type(model, column_map) %}
{% if not execute %}
{{ return(None) }}
{% endif %}
{% if not column_map %}
{% do exceptions.raise_compiler_error('test_is_type must have a column name') %}
{% endif %}
{% set columns = adapter.get_columns_in_relation(model) %}
{% if (column_map | length) != (columns | length) %}
{% set column_map_keys = (column_map | list | string) %}
{% set column_names = (columns | map(attribute='name') | list | string) %}
{% do exceptions.raise_compiler_error('did not get all the columns/all columns not specified:\n' ~ column_map_keys ~ '\nvs\n' ~ column_names) %}
{% endif %}
{% set bad_columns = [] %}
{% for column in columns %}
{% set column_key = (column.name | lower) %}
{% if column_key in column_map %}
{% set type_checks = column_map[column_key] %}
{% if not type_checks %}
{% do exceptions.raise_compiler_error('no type checks?') %}
{% endif %}
{% if not type_check_column(column, type_checks) %}
{% do bad_columns.append(column.name) %}
{% endif %}
{% else %}
{% do exceptions.raise_compiler_error('column key ' ~ column_key ~ ' not found in ' ~ (column_map | list | string)) %}
{% endif %}
{% endfor %}
{% do log('bad columns: ' ~ bad_columns, info=True) %}
{% for bad_column in bad_columns %}
select '{{ bad_column }}' as bad_column
{{ 'union all' if not loop.last }}
{% endfor %}
select * from (select 1 limit 0) as nothing
{% endtest %}

View File

@@ -0,0 +1,9 @@
select
1::smallint as smallint_col,
2::integer as int_col,
3::bigint as bigint_col,
4.0::real as real_col,
5.0::double precision as double_col,
6.0::numeric as numeric_col,
'7'::text as text_col,
'8'::varchar(20) as varchar_col

View File

@@ -0,0 +1,14 @@
version: 2
models:
- name: model
tests:
- is_type:
column_map:
smallint_col: ['integer', 'number']
int_col: ['integer', 'number']
bigint_col: ['integer', 'number']
real_col: ['float', 'number']
double_col: ['float', 'number']
numeric_col: ['numeric', 'number']
text_col: ['string', 'not number']
varchar_col: ['string', 'not number']

View File

@@ -0,0 +1,17 @@
select
1::smallint as smallint_col,
2::int as int_col,
3::bigint as bigint_col,
4::int2 as int2_col,
5::int4 as int4_col,
6::int8 as int8_col,
7::integer as integer_col,
8.0::real as real_col,
9.0::float4 as float4_col,
10.0::float8 as float8_col,
11.0::float as float_col,
12.0::double precision as double_col,
13.0::numeric as numeric_col,
14.0::decimal as decimal_col,
'15'::varchar(20) as varchar_col,
'16'::text as text_col

View File

@@ -0,0 +1,22 @@
version: 2
models:
- name: model
tests:
- is_type:
column_map:
smallint_col: ['integer', 'number']
int_col: ['integer', 'number']
bigint_col: ['integer', 'number']
int2_col: ['integer', 'number']
int4_col: ['integer', 'number']
int8_col: ['integer', 'number']
integer_col: ['integer', 'number']
real_col: ['float', 'number']
double_col: ['float', 'number']
float4_col: ['float', 'number']
float8_col: ['float', 'number']
float_col: ['float', 'number']
numeric_col: ['numeric', 'number']
decimal_col: ['numeric', 'number']
varchar_col: ['string', 'not number']
text_col: ['string', 'not number']

View File

@@ -0,0 +1,18 @@
select
1::smallint as smallint_col,
2::int as int_col,
3::bigint as bigint_col,
4::integer as integer_col,
5::tinyint as tinyint_col,
6::byteint as byteint_col,
7.0::float as float_col,
8.0::float4 as float4_col,
9.0::float8 as float8_col,
10.0::double as double_col,
11.0::double precision as double_p_col,
12.0::real as real_col,
13.0::numeric as numeric_col,
14.0::decimal as decimal_col,
15.0::number as number_col,
'16'::text as text_col,
'17'::varchar(20) as varchar_col

View File

@@ -0,0 +1,23 @@
version: 2
models:
- name: model
tests:
- is_type:
column_map:
smallint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
int_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
bigint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
integer_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
tinyint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
byteint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
float_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
float4_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
float8_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
double_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
double_p_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
real_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
numeric_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
decimal_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
number_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
text_col: ['string', 'not number']
varchar_col: ['string', 'not number']

View File

@@ -0,0 +1,21 @@
from tests.integration.base import DBTIntegrationTest, use_profile
class TestColumnTypes(DBTIntegrationTest):
@property
def schema(self):
return 'column_types'
def run_and_test(self):
self.assertEqual(len(self.run_dbt(['run'])), 1)
self.assertEqual(len(self.run_dbt(['test'])), 1)
class TestRedshiftColumnTypes(TestColumnTypes):
@property
def models(self):
return 'rs_models'
@use_profile('redshift')
def test_redshift_column_types(self):
self.run_and_test()

View File

@@ -1,4 +1,4 @@
from test.integration.base import DBTIntegrationTest, use_profile
from tests.integration.base import DBTIntegrationTest, use_profile
import threading
from dbt.adapters.factory import FACTORY
@@ -28,7 +28,7 @@ class BaseTestConcurrentTransaction(DBTIntegrationTest):
@property
def schema(self):
return "concurrent_transaction_032"
return "concurrent_transaction"
@property
def project_config(self):

View File

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}

View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique
- not_null
- name: name

View File

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}
-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

View File

@@ -0,0 +1 @@
select * from no.such.table

View File

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from no.such.table

View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique
- not_null
- name: name

View File

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}
-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

View File

@@ -0,0 +1 @@
select * from no.such.table

View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique
- not_null
- name: name

View File

@@ -0,0 +1,2 @@
{{ config(materialized='table') }}
select 1 as fun

View File

@@ -0,0 +1 @@
select * from {{ ref('seed') }}

View File

@@ -0,0 +1,3 @@
id,name
1,Alice
2,Bob

View File

@@ -0,0 +1,3 @@
{% macro my_macro() %}
{% do log('in a macro' ) %}
{% endmacro %}

View File

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}

View File

@@ -0,0 +1,8 @@
version: 2
exposures:
- name: my_exposure
type: application
depends_on:
- ref('view_model')
owner:
email: test@example.com

View File

@@ -0,0 +1,9 @@
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique
- not_null
- name: name

View File

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}
-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

View File

@@ -0,0 +1 @@
select * from {{ ref('seed') }}

View File

@@ -0,0 +1,14 @@
{% snapshot my_cool_snapshot %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='check',
check_cols=['id'],
)
}}
select * from {{ ref('view_model') }}
{% endsnapshot %}

View File

@@ -0,0 +1,153 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import copy
import json
import os
import shutil
class TestDeferState(DBTIntegrationTest):
@property
def schema(self):
return "defer_state"
@property
def models(self):
return "models"
def setUp(self):
self.other_schema = None
super().setUp()
self._created_schemas.add(self.other_schema)
@property
def project_config(self):
return {
'config-version': 2,
'seeds': {
'test': {
'quote_columns': False,
}
}
}
def get_profile(self, adapter_type):
if self.other_schema is None:
self.other_schema = self.unique_schema() + '_other'
if self.adapter_type == 'snowflake':
self.other_schema = self.other_schema.upper()
profile = super().get_profile(adapter_type)
default_name = profile['test']['target']
profile['test']['outputs']['otherschema'] = copy.deepcopy(profile['test']['outputs'][default_name])
profile['test']['outputs']['otherschema']['schema'] = self.other_schema
return profile
def copy_state(self):
assert not os.path.exists('state')
os.makedirs('state')
shutil.copyfile('target/manifest.json', 'state/manifest.json')
def run_and_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['run'])
assert len(results) == 2
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['test'])
assert len(results) == 2
# copy files over from the happy times when we had a good target
self.copy_state()
# test tests first, because run will change things
# no state, wrong schema, failure.
self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False)
# no state, run also fails
self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False)
# defer test, it succeeds
results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--target', 'otherschema'])
# with state it should work though
results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
with open('target/manifest.json') as fp:
data = json.load(fp)
assert data['nodes']['seed.test.seed']['deferred']
assert len(results) == 1
def run_switchdirs_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2
# copy files over from the happy times when we had a good target
self.copy_state()
self.use_default_project({'source-paths': ['changed_models']})
# the sql here is just wrong, so it should fail
self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'],
expect_pass=False,
)
# but this should work since we just use the old happy model
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
expect_pass=True,
)
self.use_default_project({'source-paths': ['changed_models_bad']})
# this should fail because the table model refs a broken ephemeral
# model, which it should see
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
expect_pass=False,
)
def run_defer_iff_not_exists(self):
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
results = self.run_dbt(['run', '--target', 'otherschema'])
assert len(results) == 2
# copy files over from the happy times when we had a good target
self.copy_state()
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run', '--state', 'state', '--defer'])
assert len(results) == 2
# because the seed now exists in our schema, we shouldn't defer it
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
def run_defer_deleted_upstream(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2
# copy files over from the happy times when we had a good target
self.copy_state()
self.use_default_project({'source-paths': ['changed_models_missing']})
# ephemeral_model is now gone. previously this caused a
# keyerror (dbt#2875), now it should pass
self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'],
expect_pass=True,
)
# despite deferral, test should use models just created in our schema
results = self.run_dbt(['test', '--state', 'state', '--defer'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
@use_profile('redshift')
def test_redshift_state_changetarget(self):
self.run_and_defer()

View File

@@ -0,0 +1,17 @@
{% macro some_macro(arg1, arg2) -%}
{{ adapter_macro('some_macro', arg1, arg2) }}
{%- endmacro %}
{% macro default__some_macro(arg1, arg2) %}
{% do exceptions.raise_compiler_error('not allowed') %}
{% endmacro %}
{% macro postgres__some_macro(arg1, arg2) -%}
{{ arg1 }}{{ arg2 }}
{%- endmacro %}
{% macro some_other_macro(arg1, arg2) -%}
{{ adapter_macro('test.some_macro', arg1, arg2) }}
{%- endmacro %}

View File

@@ -0,0 +1,4 @@
{% if some_other_macro('foo', 'bar') != 'foobar' %}
{% do exceptions.raise_compiler_error('invalid foobar') %}
{% endif %}
select 1 as id

View File

@@ -0,0 +1,4 @@
{% if some_macro('foo', 'bar') != 'foobar' %}
{% do exceptions.raise_compiler_error('invalid foobar') %}
{% endif %}
select 1 as id

View File

@@ -0,0 +1 @@
select 1 as id

View File

@@ -0,0 +1,65 @@
{%- materialization custom, default -%}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database,
type='view') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, database=database, type='view') -%}
/*
This relation (probably) doesn't exist yet. If it does exist, it's a leftover from
a previous run, and we're going to try to drop it immediately. At the end of this
materialization, we're going to rename the "old_relation" to this identifier,
and then we're going to drop it. In order to make sure we run the correct one of:
- drop view ...
- drop table ...
We need to set the type of this relation to be the type of the old_relation, if it exists,
or else "view" as a sane default if it does not. Note that if the old_relation does not
exist, then there is nothing to move out of the way and subsequently drop. In that case,
this relation will be effectively unused.
*/
{%- set backup_relation_type = 'view' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, database=database,
type=backup_relation_type) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
-- drop the temp relations if they exist for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ adapter.drop_relation(backup_relation) }}
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
-- build model
{% call statement('main') -%}
{{ create_view_as(intermediate_relation, sql) }}
{%- endcall %}
-- cleanup
-- move the existing view out of the way
{% if old_relation is not none %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{{ drop_relation_if_exists(backup_relation) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{# do not return anything! #}
{# {{ return({'relations': [target_relation]}) }} #}
{%- endmaterialization -%}

View File

@@ -0,0 +1,2 @@
{{ config(materialized='custom') }}
select 1 as id

View File

@@ -0,0 +1,5 @@
id,dupe
1,a
2,a
3,a
4,a

View File

@@ -0,0 +1,3 @@
a,b
1,hello
2,goodbye

View File

@@ -0,0 +1,13 @@
{% macro string_literal(s) -%}
{{ adapter.dispatch('string_literal', packages=['test'])(s) }}
{%- endmacro %}
{% macro default__string_literal(s) %}
'{{ s }}'::text
{% endmacro %}
{% macro bigquery__string_literal(s) %}
cast('{{ s }}' as string)
{% endmacro %}

View File

@@ -0,0 +1,10 @@
-- cross-db compatible test, similar to accepted_values
{% test expect_value(model, field, value) %}
select *
from {{ model }}
where {{ field }} != '{{ value }}'
{% endtest %}

View File

@@ -0,0 +1,2 @@
select {{ string_literal(this.name) }} as tablename

View File

@@ -0,0 +1,4 @@
{{ config(alias='override_alias') }}
select {{ string_literal(this.name) }} as tablename

View File

@@ -0,0 +1,9 @@
{{
config(
alias='foo',
materialized='table'
)
}}
select {{ string_literal(this.name) }} as tablename

View File

@@ -0,0 +1,16 @@
{{
config(
materialized='table'
)
}}
with trigger_ref as (
-- we should still be able to ref a model by its filepath
select * from {{ ref('foo_alias') }}
)
-- this name should still be the filename
select {{ string_literal(this.name) }} as tablename

View File

@@ -0,0 +1,22 @@
version: 2
models:
- name: foo_alias
tests:
- expect_value:
field: tablename
value: foo
- name: ref_foo_alias
tests:
- expect_value:
field: tablename
value: ref_foo_alias
- name: alias_in_project
tests:
- expect_value:
field: tablename
value: project_alias
- name: alias_in_project_with_override
tests:
- expect_value:
field: tablename
value: override_alias

View File

@@ -0,0 +1,4 @@
version: 2
models:
- name: seed
description: my cool seed

View File

@@ -0,0 +1,5 @@
select 1 as id
{% if adapter.already_exists(this.schema, this.identifier) and not should_full_refresh() %}
where id > (select max(id) from {{this}})
{% endif %}

View File

@@ -0,0 +1,60 @@
from tests.integration.base import DBTIntegrationTest, use_profile
from dbt import deprecations
class BaseTestDeprecations(DBTIntegrationTest):
def setUp(self):
super().setUp()
deprecations.reset_deprecations()
@property
def schema(self):
return "deprecation_test"
@staticmethod
def dir(path):
return path.lstrip("/")
class TestAdapterMacroDeprecation(BaseTestDeprecations):
@property
def models(self):
return self.dir('adapter-macro-models')
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': [self.dir('adapter-macro-macros')]
}
@use_profile('redshift')
def test_redshift_adapter_macro(self):
self.assertEqual(deprecations.active_deprecations, set())
# pick up the postgres macro
self.run_dbt()
expected = {'adapter-macro'}
self.assertEqual(expected, deprecations.active_deprecations)
class TestAdapterMacroDeprecationPackages(BaseTestDeprecations):
@property
def models(self):
return self.dir('adapter-macro-models-package')
@property
def project_config(self):
return {
'config-version': 2,
'macro-paths': [self.dir('adapter-macro-macros')]
}
@use_profile('redshift')
def test_redshift_adapter_macro_pkg(self):
self.assertEqual(deprecations.active_deprecations, set())
# pick up the postgres macro
self.assertEqual(deprecations.active_deprecations, set())
self.run_dbt()
expected = {'adapter-macro'}
self.assertEqual(expected, deprecations.active_deprecations)

View File

@@ -0,0 +1 @@
Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

View File

@@ -0,0 +1,9 @@
{{
config(
materialized='table',
partition_by={'field': 'updated_at', 'data_type': 'date'},
cluster_by=['first_name']
)
}}
select id,first_name,email,ip_address,DATE(updated_at) as updated_at from {{ ref('seed') }}

View File

@@ -0,0 +1,9 @@
{{
config(
materialized='table',
partition_by={'field': 'updated_at', 'data_type': 'date'},
cluster_by=['first_name','email']
)
}}
select id,first_name,email,ip_address,DATE(updated_at) as updated_at from {{ ref('seed') }}

View File

@@ -0,0 +1,15 @@
{{
config(
materialized='table'
)
}}
select
1 as field_1,
2 as field_2,
3 as field_3,
struct(
5 as field_5,
6 as field_6
) as nested_field

View File

@@ -0,0 +1,7 @@
{{
config(
materialized='view'
)
}}
select * from {{ ref('nested_table') }}

View File

@@ -0,0 +1,44 @@
version: 2
models:
- name: nested_view
description: "The test model"
columns:
- name: field_1
description: The first field
- name: field_2
description: The second field
- name: field_3
description: The third field
- name: nested_field.field_4
description: The first nested field
- name: nested_field.field_5
description: The second nested field
- name: clustered
description: "A clustered and partitioned copy of the test model"
columns:
- name: id
description: The user id
- name: first_name
description: The user's name
- name: email
description: The user's email
- name: ip_address
description: The user's IP address
- name: updated_at
description: When the user was updated
- name: multi_clustered
description: "A clustered and partitioned copy of the test model, clustered on multiple columns"
columns:
- name: id
description: The user id
- name: first_name
description: The user's name
- name: email
description: The user's email
- name: ip_address
description: The user's IP address
- name: updated_at
description: When the user was updated

View File

@@ -0,0 +1,2 @@
{{ config(disabled=true, schema='notrealnotreal') }}
select 1 as id

View File

@@ -0,0 +1,2 @@
{{ config(schema=var('extra_schema')) }}
select 1 as id

View File

@@ -0,0 +1,8 @@
{{
config(
materialized='view',
database=var('alternate_db')
)
}}
select * from {{ ref('seed') }}

View File

@@ -0,0 +1,21 @@
version: 2
models:
- name: model
description: "The test model"
columns:
- name: id
description: The user ID number
tests:
- unique
- not_null
- name: first_name
description: The user's first name
- name: email
description: The user's email
- name: ip_address
description: The user's IP address
- name: updated_at
description: The last time this user's email was updated
tests:
- test.nothing

View File

@@ -0,0 +1,3 @@
{% macro get_catalog(information_schema, schemas) %}
{% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}

View File

@@ -0,0 +1,9 @@
{% test nothing(model) %}
-- a silly test to make sure that table-level tests show up in the manifest
-- without a column_name field
select 0
{% endtest %}

View File

@@ -0,0 +1,8 @@
{% docs macro_info %}
My custom test that I wrote that does nothing
{% enddocs %}
{% docs macro_arg_info %}
The model for my custom test
{% enddocs %}

View File

@@ -0,0 +1,10 @@
version: 2
macros:
- name: test_nothing
description: "{{ doc('macro_info') }}"
meta:
some_key: 100
arguments:
- name: model
type: Relation
description: "{{ doc('macro_arg_info') }}"

View File

@@ -0,0 +1,8 @@
{{
config(
materialized='view',
database=var('alternate_db')
)
}}
select * from {{ ref('seed') }}

View File

@@ -0,0 +1 @@
This is a readme.md file with {{ invalid-ish jinja }} in it

View File

@@ -0,0 +1,82 @@
version: 2
models:
- name: model
description: "The test model"
docs:
show: false
columns:
- name: id
description: The user ID number
tests:
- unique
- not_null
- name: first_name
description: The user's first name
- name: email
description: The user's email
- name: ip_address
description: The user's IP address
- name: updated_at
description: The last time this user's email was updated
tests:
- test.nothing
- name: second_model
description: "The second test model"
docs:
show: false
columns:
- name: id
description: The user ID number
- name: first_name
description: The user's first name
- name: email
description: The user's email
- name: ip_address
description: The user's IP address
- name: updated_at
description: The last time this user's email was updated
sources:
- name: my_source
description: "My source"
loader: a_loader
schema: "{{ var('test_schema') }}"
tables:
- name: my_table
description: "My table"
identifier: seed
quoting:
identifier: True
columns:
- name: id
description: "An ID field"
exposures:
- name: simple_exposure
type: dashboard
depends_on:
- ref('model')
- source('my_source', 'my_table')
owner:
email: something@example.com
- name: notebook_exposure
type: notebook
depends_on:
- ref('model')
- ref('second_model')
owner:
email: something@example.com
name: Some name
description: >
A description of the complex exposure
maturity: medium
meta:
tool: 'my_tool'
languages:
- python
tags: ['my_department']
url: http://example.com/notebook/1

View File

@@ -0,0 +1,13 @@
{%- if adapter.type() == 'snowflake' -%}
{%- set schema_suffix = 'TEST' -%}
{%- else -%}
{%- set schema_suffix = 'test' -%}
{%- endif -%}
{{
config(
materialized='view',
schema=schema_suffix,
)
}}
select * from {{ ref('seed') }}

View File

@@ -0,0 +1,31 @@
{% docs ephemeral_summary %}
A summary table of the ephemeral copy of the seed data
{% enddocs %}
{% docs summary_first_name %}
The first name being summarized
{% enddocs %}
{% docs summary_count %}
The number of instances of the first name
{% enddocs %}
{% docs view_summary %}
A view of the summary of the ephemeral copy of the seed data
{% enddocs %}
{% docs source_info %}
My source
{% enddocs %}
{% docs table_info %}
My table
{% enddocs %}
{% docs column_info %}
An ID field
{% enddocs %}
{% docs notebook_info %}
A description of the complex exposure
{% enddocs %}

Some files were not shown because too many files have changed in this diff.