mirror of https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00

Compare commits: update_sql...kwigley/pu (8 commits)
| Author | SHA1 | Date |
| --- | --- | --- |
| | 6ceed5ce55 | |
| | b145e0524c | |
| | b610b58d2e | |
| | 1ba9f89c54 | |
| | c0ae76690d | |
| | 08b762cbf2 | |
| | 052b14cf79 | |
| | 2d938e511d | |

@@ -34,16 +34,12 @@ first_value = 1
 
 [bumpversion:file:plugins/postgres/setup.py]
 
-[bumpversion:file:plugins/redshift/setup.py]
-
 [bumpversion:file:plugins/snowflake/setup.py]
 
 [bumpversion:file:plugins/bigquery/setup.py]
 
 [bumpversion:file:plugins/postgres/dbt/adapters/postgres/__version__.py]
 
-[bumpversion:file:plugins/redshift/dbt/adapters/redshift/__version__.py]
-
 [bumpversion:file:plugins/snowflake/dbt/adapters/snowflake/__version__.py]
 
 [bumpversion:file:plugins/bigquery/dbt/adapters/bigquery/__version__.py]

5 .github/dependabot.yml vendored
@@ -21,11 +21,6 @@ updates:
     schedule:
       interval: "daily"
     rebase-strategy: "disabled"
-  - package-ecosystem: "pip"
-    directory: "/plugins/redshift"
-    schedule:
-      interval: "daily"
-    rebase-strategy: "disabled"
   - package-ecosystem: "pip"
     directory: "/plugins/snowflake"
     schedule:

14 .github/workflows/integration.yml vendored
@@ -97,10 +97,6 @@ jobs:
             bigquery:
               - 'core/**'
               - 'plugins/bigquery/**'
-            redshift:
-              - 'core/**'
-              - 'plugins/redshift/**'
-              - 'plugins/postgres/**'
 
       - name: Generate integration test matrix
         id: generate-matrix

@@ -191,16 +187,6 @@ jobs:
         if: matrix.adapter == 'postgres'
         run: tox
 
-      - name: Run tox (redshift)
-        if: matrix.adapter == 'redshift'
-        env:
-          REDSHIFT_TEST_DBNAME: ${{ secrets.REDSHIFT_TEST_DBNAME }}
-          REDSHIFT_TEST_PASS: ${{ secrets.REDSHIFT_TEST_PASS }}
-          REDSHIFT_TEST_USER: ${{ secrets.REDSHIFT_TEST_USER }}
-          REDSHIFT_TEST_PORT: ${{ secrets.REDSHIFT_TEST_PORT }}
-          REDSHIFT_TEST_HOST: ${{ secrets.REDSHIFT_TEST_HOST }}
-        run: tox
-
       - name: Run tox (snowflake)
         if: matrix.adapter == 'snowflake'
         env:

0 plugins/redshift/tests/__init__.py Normal file
0 plugins/redshift/tests/integration/__init__.py Normal file

@@ -0,0 +1,6 @@
-- Macro to rename a relation
{% macro rename_named_relation(from_name, to_name) %}
    {%- set from_relation = api.Relation.create(database=target.database, schema=target.schema, identifier=from_name, type='table') -%}
    {%- set to_relation = api.Relation.create(database=target.database, schema=target.schema, identifier=to_name, type='table') -%}
    {% do adapter.rename_relation(from_relation, to_relation) %}
{% endmacro %}

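For reference, a standalone macro like this is usually exercised ad hoc rather than as part of a run. A minimal sketch of such an invocation, with hypothetical relation names (dbt's run-operation command accepts a YAML dict via --args):

    dbt run-operation rename_named_relation --args '{from_name: my_table, to_name: my_table_renamed}'
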
@@ -0,0 +1,3 @@
-- make sure this runs after 'model'
-- {{ ref('model') }}
select 2 as id

@@ -0,0 +1,19 @@

{% set upstream = ref('upstream') %}

{% if execute %}
    {# don't ever do any of this #}
    {%- do adapter.drop_schema(upstream) -%}
    {% set existing = adapter.get_relation(upstream.database, upstream.schema, upstream.identifier) %}
    {% if existing is not none %}
        {% do exceptions.raise_compiler_error('expected ' ~ ' to not exist, but it did') %}
    {% endif %}

    {%- do adapter.create_schema(upstream) -%}

    {% set sql = create_view_as(upstream, 'select 2 as id') %}
    {% do run_query(sql) %}
{% endif %}


select * from {{ upstream }}

@@ -0,0 +1 @@
select 1 as id

@@ -0,0 +1,24 @@
from tests.integration.base import DBTIntegrationTest, use_profile


class TestBaseCaching(DBTIntegrationTest):
    @property
    def schema(self):
        return "caching"

    @property
    def models(self):
        return "models"

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'test-paths': ['tests']
        }

    @use_profile('redshift')
    def test_redshift_adapter_methods(self):
        self.run_dbt(['compile'])  # trigger any compile-time issues
        self.run_dbt()
        self.assertTablesEqual('model', 'expected')

@@ -0,0 +1,7 @@
{% set columns = adapter.get_columns_in_relation(ref('model')) %}
{% set limit_query = 0 %}
{% if (columns | length) == 0 %}
    {% set limit_query = 1 %}
{% endif %}

select 1 as id limit {{ limit_query }}

946 plugins/redshift/tests/integration/base.py Normal file

@@ -0,0 +1,946 @@
import os
import io
import random
import shutil
import sys
import tempfile
import traceback
import unittest
from contextlib import contextmanager
from datetime import datetime
from functools import wraps

import pytest
import yaml
from unittest.mock import patch

import dbt.main as dbt
from dbt import flags
from dbt.deprecations import reset_deprecations
from dbt.adapters.factory import get_adapter, reset_adapters, register_adapter
from dbt.clients.jinja import template_cache
from dbt.config import RuntimeConfig
from dbt.context import providers
from dbt.logger import GLOBAL_LOGGER as logger, log_manager
from dbt.contracts.graph.manifest import Manifest


INITIAL_ROOT = os.getcwd()


def normalize(path):
    """On windows, neither is enough on its own:

    >>> normcase('C:\\documents/ALL CAPS/subdir\\..')
    'c:\\documents\\all caps\\subdir\\..'
    >>> normpath('C:\\documents/ALL CAPS/subdir\\..')
    'C:\\documents\\ALL CAPS'
    >>> normpath(normcase('C:\\documents/ALL CAPS/subdir\\..'))
    'c:\\documents\\all caps'
    """
    return os.path.normcase(os.path.normpath(path))


class Normalized:
    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f'Normalized({self.value!r})'

    def __str__(self):
        return f'Normalized({self.value!s})'

    def __eq__(self, other):
        return normalize(self.value) == normalize(other)


class FakeArgs:
    def __init__(self):
        self.threads = 1
        self.data = False
        self.defer = False
        self.schema = True
        self.full_refresh = False
        self.models = None
        self.select = None
        self.exclude = None
        self.single_threaded = False
        self.selector_name = None
        self.state = None
        self.defer = None


class TestArgs:
    def __init__(self, kwargs):
        self.which = 'run'
        self.single_threaded = False
        self.profiles_dir = None
        self.project_dir = None
        self.__dict__.update(kwargs)


def _profile_from_test_name(test_name):
    adapter_names = ('redshift',)
    adapters_in_name = sum(x in test_name for x in adapter_names)
    if adapters_in_name != 1:
        raise ValueError(
            'test names must have exactly 1 profile choice embedded, {} has {}'
            .format(test_name, adapters_in_name)
        )

    for adapter_name in adapter_names:
        if adapter_name in test_name:
            return adapter_name

    raise ValueError(
        'could not find adapter name in test name {}'.format(test_name)
    )


def _pytest_test_name():
    return os.environ['PYTEST_CURRENT_TEST'].split()[0]


def _pytest_get_test_root():
    test_path = _pytest_test_name().split('::')[0]
    relative_to = INITIAL_ROOT
    head = os.path.relpath(test_path, relative_to)

    path_parts = []
    while head:
        head, tail = os.path.split(head)
        path_parts.append(tail)
    path_parts.reverse()
    # dbt tests are all of the form 'test/integration/XXX_suite_name'
    target = os.path.join(*path_parts[:5])  # TODO: try to not hard code this
    return os.path.join(relative_to, target)


def _really_makedirs(path):
    while not os.path.exists(path):
        try:
            os.makedirs(path)
        except EnvironmentError:
            raise


class DBTIntegrationTest(unittest.TestCase):
    CREATE_SCHEMA_STATEMENT = 'CREATE SCHEMA {}'
    DROP_SCHEMA_STATEMENT = 'DROP SCHEMA IF EXISTS {} CASCADE'

    _randint = random.randint(0, 9999)
    _runtime_timedelta = (datetime.utcnow() - datetime(1970, 1, 1, 0, 0, 0))
    _runtime = (
        (int(_runtime_timedelta.total_seconds() * 1e6)) +
        _runtime_timedelta.microseconds
    )

    prefix = f'test{_runtime}{_randint:04}'
    setup_alternate_db = False

    def redshift_profile(self):
        return {
            'config': {
                'send_anonymous_usage_stats': False
            },
            'test': {
                'outputs': {
                    'default2': {
                        'type': 'redshift',
                        'threads': 1,
                        'host': os.getenv('REDSHIFT_TEST_HOST'),
                        'port': int(os.getenv('REDSHIFT_TEST_PORT')),
                        'user': os.getenv('REDSHIFT_TEST_USER'),
                        'pass': os.getenv('REDSHIFT_TEST_PASS'),
                        'dbname': os.getenv('REDSHIFT_TEST_DBNAME'),
                        'schema': self.unique_schema()
                    }
                },
                'target': 'default2'
            }
        }

    @property
    def packages_config(self):
        return None

    @property
    def selectors_config(self):
        return None

    def unique_schema(self):
        schema = self.schema

        to_return = "{}_{}".format(self.prefix, schema)

        return to_return.lower()

    @property
    def default_database(self):
        database = self.config.credentials.database
        return database

    @property
    def alternative_database(self):
        return None

    def get_profile(self, adapter_type):
        if adapter_type == 'redshift':
            return self.redshift_profile()
        else:
            raise ValueError('invalid adapter type {}'.format(adapter_type))

    def _pick_profile(self):
        test_name = self.id().split('.')[-1]
        return _profile_from_test_name(test_name)

    def _symlink_test_folders(self):
        for entry in os.listdir(self.test_original_source_path):
            src = os.path.join(self.test_original_source_path, entry)
            tst = os.path.join(self.test_root_dir, entry)
            if os.path.isdir(src) or src.endswith('.sql'):
                # symlink all sql files and all directories.
                os.symlink(src, tst)
        os.symlink(self._logs_dir, os.path.join(self.test_root_dir, 'logs'))

    @property
    def test_root_realpath(self):
        if sys.platform == 'darwin':
            return os.path.realpath(self.test_root_dir)
        else:
            return self.test_root_dir

    def _generate_test_root_dir(self):
        return normalize(tempfile.mkdtemp(prefix='dbt-int-test-'))

    def setUp(self):
        self.dbt_core_install_root = os.path.dirname(dbt.__file__)
        log_manager.reset_handlers()
        self.initial_dir = INITIAL_ROOT
        os.chdir(self.initial_dir)
        # before we go anywhere, collect the initial path info
        self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix)
        _really_makedirs(self._logs_dir)
        self.test_original_source_path = _pytest_get_test_root()
        self.test_root_dir = self._generate_test_root_dir()

        os.chdir(self.test_root_dir)
        try:
            self._symlink_test_folders()
        except Exception as exc:
            msg = '\n\t'.join((
                'Failed to symlink test folders!',
                'initial_dir={0.initial_dir}',
                'test_original_source_path={0.test_original_source_path}',
                'test_root_dir={0.test_root_dir}'
            )).format(self)
            logger.exception(msg)

            # if logging isn't set up, I still really want this message.
            print(msg)
            traceback.print_exc()

            raise

        self._created_schemas = set()
        reset_deprecations()
        template_cache.clear()

        self.use_profile(self._pick_profile())
        self.use_default_project()
        self.set_packages()
        self.set_selectors()
        self.load_config()

    def use_default_project(self, overrides=None):
        # create a dbt_project.yml
        base_project_config = {
            'name': 'test',
            'version': '1.0',
            'config-version': 2,
            'test-paths': [],
            'source-paths': [self.models],
            'profile': 'test',
        }

        project_config = {}
        project_config.update(base_project_config)
        project_config.update(self.project_config)
        project_config.update(overrides or {})

        with open("dbt_project.yml", 'w') as f:
            yaml.safe_dump(project_config, f, default_flow_style=True)

    def use_profile(self, adapter_type):
        self.adapter_type = adapter_type

        profile_config = {}
        default_profile_config = self.get_profile(adapter_type)

        profile_config.update(default_profile_config)
        profile_config.update(self.profile_config)

        if not os.path.exists(self.test_root_dir):
            os.makedirs(self.test_root_dir)

        flags.PROFILES_DIR = self.test_root_dir
        profiles_path = os.path.join(self.test_root_dir, 'profiles.yml')
        with open(profiles_path, 'w') as f:
            yaml.safe_dump(profile_config, f, default_flow_style=True)
        self._profile_config = profile_config

    def set_packages(self):
        if self.packages_config is not None:
            with open('packages.yml', 'w') as f:
                yaml.safe_dump(self.packages_config, f, default_flow_style=True)

    def set_selectors(self):
        if self.selectors_config is not None:
            with open('selectors.yml', 'w') as f:
                yaml.safe_dump(self.selectors_config, f, default_flow_style=True)

    def load_config(self):
        # we've written our profile and project. Now we want to instantiate a
        # fresh adapter for the tests.
        # it's important to use a different connection handle here so
        # we don't look into an incomplete transaction
        kwargs = {
            'profile': None,
            'profiles_dir': self.test_root_dir,
            'target': None,
        }

        config = RuntimeConfig.from_args(TestArgs(kwargs))

        register_adapter(config)
        adapter = get_adapter(config)
        adapter.cleanup_connections()
        self.adapter_type = adapter.type()
        self.adapter = adapter
        self.config = config

        self._drop_schemas()
        self._create_schemas()

    def quote_as_configured(self, value, quote_key):
        return self.adapter.quote_as_configured(value, quote_key)

    def tearDown(self):
        # get any current run adapter and clean up its connections before we
        # reset them. It'll probably be different from ours because
        # handle_and_check() calls reset_adapters().
        register_adapter(self.config)
        adapter = get_adapter(self.config)
        if adapter is not self.adapter:
            adapter.cleanup_connections()
        if not hasattr(self, 'adapter'):
            self.adapter = adapter

        self._drop_schemas()

        self.adapter.cleanup_connections()
        reset_adapters()
        os.chdir(INITIAL_ROOT)
        try:
            shutil.rmtree(self.test_root_dir)
        except EnvironmentError:
            logger.exception('Could not clean up after test - {} not removable'
                             .format(self.test_root_dir))

    def _get_schema_fqn(self, database, schema):
        schema_fqn = self.quote_as_configured(schema, 'schema')
        return schema_fqn

    def _create_schema_named(self, database, schema):
        schema_fqn = self._get_schema_fqn(database, schema)
        self.run_sql(self.CREATE_SCHEMA_STATEMENT.format(schema_fqn))
        self._created_schemas.add(schema_fqn)

    def _drop_schema_named(self, database, schema):
        schema_fqn = self._get_schema_fqn(database, schema)
        self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn))

    def _create_schemas(self):
        schema = self.unique_schema()
        with self.adapter.connection_named('__test'):
            self._create_schema_named(self.default_database, schema)

    def _drop_schemas_sql(self):
        schema = self.unique_schema()
        # we always want to drop these if necessary, we'll clear it soon.
        self._created_schemas.add(
            self._get_schema_fqn(self.default_database, schema)
        )
        drop_alternative = (
            self.setup_alternate_db and
            self.adapter_type not in {'redshift'} and
            self.alternative_database
        )
        if drop_alternative:
            self._created_schemas.add(
                self._get_schema_fqn(self.alternative_database, schema)
            )

        for schema_fqn in self._created_schemas:
            self.run_sql(self.DROP_SCHEMA_STATEMENT.format(schema_fqn))

        self._created_schemas.clear()

    def _drop_schemas(self):
        with self.adapter.connection_named('__test'):
            self._drop_schemas_sql()

    @property
    def project_config(self):
        return {
            'config-version': 2,
        }

    @property
    def profile_config(self):
        return {}

    def run_dbt(self, args=None, expect_pass=True, profiles_dir=True):
        res, success = self.run_dbt_and_check(args=args, profiles_dir=profiles_dir)
        self.assertEqual(
            success, expect_pass,
            "dbt exit state did not match expected")

        return res

    def run_dbt_and_capture(self, *args, **kwargs):
        try:
            initial_stdout = log_manager.stdout
            initial_stderr = log_manager.stderr
            stringbuf = io.StringIO()
            log_manager.set_output_stream(stringbuf)

            res = self.run_dbt(*args, **kwargs)
            stdout = stringbuf.getvalue()

        finally:
            log_manager.set_output_stream(initial_stdout, initial_stderr)

        return res, stdout

    def run_dbt_and_check(self, args=None, profiles_dir=True):
        log_manager.reset_handlers()
        if args is None:
            args = ["run"]

        final_args = []

        if os.getenv('DBT_TEST_SINGLE_THREADED') in ('y', 'Y', '1'):
            final_args.append('--single-threaded')

        final_args.extend(args)

        if profiles_dir:
            final_args.extend(['--profiles-dir', self.test_root_dir])
        final_args.append('--log-cache-events')

        logger.info("Invoking dbt with {}".format(final_args))
        return dbt.handle_and_check(final_args)

    def run_sql_file(self, path, kwargs=None):
        with open(path, 'r') as f:
            statements = f.read().split(";")
            for statement in statements:
                self.run_sql(statement, kwargs=kwargs)

    def transform_sql(self, query, kwargs=None):
        to_return = query

        base_kwargs = {
            'schema': self.unique_schema(),
            'database': self.adapter.quote(self.default_database),
        }
        if kwargs is None:
            kwargs = {}
        base_kwargs.update(kwargs)

        to_return = to_return.format(**base_kwargs)

        return to_return

    def run_sql_common(self, sql, fetch, conn):
        with conn.handle.cursor() as cursor:
            try:
                cursor.execute(sql)
                conn.handle.commit()
                if fetch == 'one':
                    return cursor.fetchone()
                elif fetch == 'all':
                    return cursor.fetchall()
                else:
                    return
            except BaseException as e:
                if conn.handle and not getattr(conn.handle, 'closed', True):
                    conn.handle.rollback()
                print(sql)
                print(e)
                raise
            finally:
                conn.transaction_open = False

    def run_sql(self, query, fetch='None', kwargs=None, connection_name=None):
        if connection_name is None:
            connection_name = '__test'

        if query.strip() == "":
            return

        sql = self.transform_sql(query, kwargs=kwargs)

        with self.get_connection(connection_name) as conn:
            logger.debug('test connection "{}" executing: {}'.format(conn.name, sql))
            return self.run_sql_common(sql, fetch, conn)

    def _ilike(self, target, value):
        return "{} ilike '{}'".format(target, value)

    def get_many_table_columns_information_schema(self, tables, schema, database=None):
        columns = 'table_name, column_name, data_type, character_maximum_length'

        sql = """
                select {columns}
                from {db_string}information_schema.columns
                where {schema_filter}
                  and ({table_filter})
                order by column_name asc"""

        db_string = ''
        if database:
            db_string = self.quote_as_configured(database, 'database') + '.'

        table_filters_s = " OR ".join(
            self._ilike('table_name', table.replace('"', ''))
            for table in tables
        )
        schema_filter = self._ilike('table_schema', schema)

        sql = sql.format(
            columns=columns,
            schema_filter=schema_filter,
            table_filter=table_filters_s,
            db_string=db_string)

        columns = self.run_sql(sql, fetch='all')
        return list(map(self.filter_many_columns, columns))

    def get_many_table_columns(self, tables, schema, database=None):
        result = self.get_many_table_columns_information_schema(tables, schema, database)
        result.sort(key=lambda x: '{}.{}'.format(x[0], x[1]))
        return result

    def filter_many_columns(self, column):
        if len(column) == 3:
            table_name, column_name, data_type = column
            char_size = None
        else:
            table_name, column_name, data_type, char_size = column
        return (table_name, column_name, data_type, char_size)

    @contextmanager
    def get_connection(self, name=None):
        """Create a test connection context where all executed macros, etc will
        get self.adapter as the adapter.

        This allows tests to run normal adapter macros as if reset_adapters()
        were not called by handle_and_check (for asserts, etc)
        """
        if name is None:
            name = '__test'
        with patch.object(providers, 'get_adapter', return_value=self.adapter):
            with self.adapter.connection_named(name):
                conn = self.adapter.connections.get_thread_connection()
                yield conn

    def get_relation_columns(self, relation):
        with self.get_connection():
            columns = self.adapter.get_columns_in_relation(relation)

        return sorted(((c.name, c.dtype, c.char_size) for c in columns),
                      key=lambda x: x[0])

    def get_table_columns(self, table, schema=None, database=None):
        schema = self.unique_schema() if schema is None else schema
        database = self.default_database if database is None else database
        relation = self.adapter.Relation.create(
            database=database,
            schema=schema,
            identifier=table,
            type='table',
            quote_policy=self.config.quoting
        )
        return self.get_relation_columns(relation)

    def get_table_columns_as_dict(self, tables, schema=None):
        col_matrix = self.get_many_table_columns(tables, schema)
        res = {}
        for row in col_matrix:
            table_name = row[0]
            col_def = row[1:]
            if table_name not in res:
                res[table_name] = []
            res[table_name].append(col_def)
        return res

    def get_models_in_schema(self, schema=None):
        schema = self.unique_schema() if schema is None else schema
        sql = """
                select table_name,
                        case when table_type = 'BASE TABLE' then 'table'
                             when table_type = 'VIEW' then 'view'
                             else table_type
                        end as materialization
                from information_schema.tables
                where {}
                order by table_name
                """

        sql = sql.format(self._ilike('table_schema', schema))
        result = self.run_sql(sql, fetch='all')

        return {model_name: materialization for (model_name, materialization) in result}

    def _assertTablesEqualSql(self, relation_a, relation_b, columns=None):
        if columns is None:
            columns = self.get_relation_columns(relation_a)
        column_names = [c[0] for c in columns]

        sql = self.adapter.get_rows_different_sql(
            relation_a, relation_b, column_names
        )

        return sql

    def assertTablesEqual(self, table_a, table_b,
                          table_a_schema=None, table_b_schema=None,
                          table_a_db=None, table_b_db=None):
        if table_a_schema is None:
            table_a_schema = self.unique_schema()

        if table_b_schema is None:
            table_b_schema = self.unique_schema()

        if table_a_db is None:
            table_a_db = self.default_database

        if table_b_db is None:
            table_b_db = self.default_database

        relation_a = self._make_relation(table_a, table_a_schema, table_a_db)
        relation_b = self._make_relation(table_b, table_b_schema, table_b_db)

        self._assertTableColumnsEqual(relation_a, relation_b)

        sql = self._assertTablesEqualSql(relation_a, relation_b)
        result = self.run_sql(sql, fetch='one')

        self.assertEqual(
            result[0],
            0,
            'row_count_difference nonzero: ' + sql
        )
        self.assertEqual(
            result[1],
            0,
            'num_mismatched nonzero: ' + sql
        )

    def _make_relation(self, identifier, schema=None, database=None):
        if schema is None:
            schema = self.unique_schema()
        if database is None:
            database = self.default_database
        return self.adapter.Relation.create(
            database=database,
            schema=schema,
            identifier=identifier,
            quote_policy=self.config.quoting
        )

    def get_many_relation_columns(self, relations):
        """Returns a dict of (database, schema) -> (dict of (table_name -> list of columns))
        """
        schema_fqns = {}
        for rel in relations:
            this_schema = schema_fqns.setdefault((rel.database, rel.schema), [])
            this_schema.append(rel.identifier)

        column_specs = {}
        for key, tables in schema_fqns.items():
            database, schema = key
            columns = self.get_many_table_columns(tables, schema, database=database)
            table_columns = {}
            for col in columns:
                table_columns.setdefault(col[0], []).append(col[1:])
            for rel_name, columns in table_columns.items():
                key = (database, schema, rel_name)
                column_specs[key] = columns

        return column_specs

    def assertManyRelationsEqual(self, relations, default_schema=None, default_database=None):
        if default_schema is None:
            default_schema = self.unique_schema()
        if default_database is None:
            default_database = self.default_database

        specs = []
        for relation in relations:
            if not isinstance(relation, (tuple, list)):
                relation = [relation]

            assert len(relation) <= 3

            if len(relation) == 3:
                relation = self._make_relation(*relation)
            elif len(relation) == 2:
                relation = self._make_relation(relation[0], relation[1], default_database)
            elif len(relation) == 1:
                relation = self._make_relation(relation[0], default_schema, default_database)
            else:
                raise ValueError('relation must be a sequence of 1, 2, or 3 values')

            specs.append(relation)

        with self.get_connection():
            column_specs = self.get_many_relation_columns(specs)

        # make sure everyone has equal column definitions
        first_columns = None
        for relation in specs:
            key = (relation.database, relation.schema, relation.identifier)
            # get a good error here instead of a hard-to-diagnose KeyError
            self.assertIn(key, column_specs, f'No columns found for {key}')
            columns = column_specs[key]
            if first_columns is None:
                first_columns = columns
            else:
                self.assertEqual(
                    first_columns, columns,
                    '{} did not match {}'.format(str(specs[0]), str(relation))
                )

        # make sure everyone has the same data. if we got here, everyone had
        # the same column specs!
        first_relation = None
        for relation in specs:
            if first_relation is None:
                first_relation = relation
            else:
                sql = self._assertTablesEqualSql(first_relation, relation,
                                                 columns=first_columns)
                result = self.run_sql(sql, fetch='one')

                self.assertEqual(
                    result[0],
                    0,
                    'row_count_difference nonzero: ' + sql
                )
                self.assertEqual(
                    result[1],
                    0,
                    'num_mismatched nonzero: ' + sql
                )

    def assertManyTablesEqual(self, *args):
        schema = self.unique_schema()

        all_tables = []
        for table_equivalencies in args:
            all_tables += list(table_equivalencies)

        all_cols = self.get_table_columns_as_dict(all_tables, schema)

        for table_equivalencies in args:
            first_table = table_equivalencies[0]
            first_relation = self._make_relation(first_table)

            # assert that all tables have the same columns
            base_result = all_cols[first_table]
            self.assertTrue(len(base_result) > 0)

            for other_table in table_equivalencies[1:]:
                other_result = all_cols[other_table]
                self.assertTrue(len(other_result) > 0)
                self.assertEqual(base_result, other_result)

                other_relation = self._make_relation(other_table)
                sql = self._assertTablesEqualSql(first_relation,
                                                 other_relation,
                                                 columns=base_result)
                result = self.run_sql(sql, fetch='one')

                self.assertEqual(
                    result[0],
                    0,
                    'row_count_difference nonzero: ' + sql
                )
                self.assertEqual(
                    result[1],
                    0,
                    'num_mismatched nonzero: ' + sql
                )

    def _assertTableRowCountsEqual(self, relation_a, relation_b):
        cmp_query = """
            with table_a as (

                select count(*) as num_rows from {}

            ), table_b as (

                select count(*) as num_rows from {}

            )

            select table_a.num_rows - table_b.num_rows as difference
            from table_a, table_b

        """.format(str(relation_a), str(relation_b))

        res = self.run_sql(cmp_query, fetch='one')

        self.assertEqual(int(res[0]), 0, "Row count of table {} doesn't match row count of table {}. ({} rows different)".format(
            relation_a.identifier,
            relation_b.identifier,
            res[0]
        ))

    def assertTableDoesNotExist(self, table, schema=None, database=None):
        columns = self.get_table_columns(table, schema, database)

        self.assertEqual(
            len(columns),
            0
        )

    def assertTableDoesExist(self, table, schema=None, database=None):
        columns = self.get_table_columns(table, schema, database)

        self.assertGreater(
            len(columns),
            0
        )

    def _assertTableColumnsEqual(self, relation_a, relation_b):
        table_a_result = self.get_relation_columns(relation_a)
        table_b_result = self.get_relation_columns(relation_b)

        text_types = {'text', 'character varying', 'character', 'varchar'}

        self.assertEqual(len(table_a_result), len(table_b_result))
        for a_column, b_column in zip(table_a_result, table_b_result):
            a_name, a_type, a_size = a_column
            b_name, b_type, b_size = b_column
            self.assertEqual(a_name, b_name,
                             '{} vs {}: column "{}" != "{}"'.format(
                                 relation_a, relation_b, a_name, b_name
                             ))

            self.assertEqual(a_type, b_type,
                             '{} vs {}: column "{}" has type "{}" != "{}"'.format(
                                 relation_a, relation_b, a_name, a_type, b_type
                             ))

            self.assertEqual(a_size, b_size,
                             '{} vs {}: column "{}" has size "{}" != "{}"'.format(
                                 relation_a, relation_b, a_name, a_size, b_size
                             ))

    def assertEquals(self, *args, **kwargs):
        # assertEquals is deprecated. This makes the warnings less chatty
        self.assertEqual(*args, **kwargs)

    def assertBetween(self, timestr, start, end=None):
        datefmt = '%Y-%m-%dT%H:%M:%S.%fZ'
        if end is None:
            end = datetime.utcnow()

        parsed = datetime.strptime(timestr, datefmt)

        self.assertLessEqual(start, parsed,
                             'parsed date {} happened before {}'.format(
                                 parsed,
                                 start.strftime(datefmt))
                             )
        self.assertGreaterEqual(end, parsed,
                                'parsed date {} happened after {}'.format(
                                    parsed,
                                    end.strftime(datefmt))
                                )


def use_profile(profile_name):
    """A decorator to declare a test method as using a particular profile.
    Handles both setting the nose attr and calling self.use_profile.

    Use like this:

    class TestSomething(DBTIntegrationTest):
        @use_profile('postgres')
        def test_postgres_thing(self):
            self.assertEqual(self.adapter_type, 'postgres')

        @use_profile('snowflake')
        def test_snowflake_thing(self):
            self.assertEqual(self.adapter_type, 'snowflake')
    """
    def outer(wrapped):
        @getattr(pytest.mark, 'profile_' + profile_name)
        @wraps(wrapped)
        def func(self, *args, **kwargs):
            return wrapped(self, *args, **kwargs)
        # sanity check at import time
        assert _profile_from_test_name(wrapped.__name__) == profile_name
        return func
    return outer


class AnyFloat:
    """Any float. Use this in assertEqual() calls to assert that it is a float.
    """
    def __eq__(self, other):
        return isinstance(other, float)


class AnyString:
    """Any string. Use this in assertEqual() calls to assert that it is a string.
    """
    def __eq__(self, other):
        return isinstance(other, str)


class AnyStringWith:
    def __init__(self, contains=None):
        self.contains = contains

    def __eq__(self, other):
        if not isinstance(other, str):
            return False

        if self.contains is None:
            return True

        return self.contains in other

    def __repr__(self):
        return 'AnyStringWith<{!r}>'.format(self.contains)


def get_manifest():
    path = './target/partial_parse.msgpack'
    if os.path.exists(path):
        with open(path, 'rb') as fp:
            manifest_mp = fp.read()
        manifest: Manifest = Manifest.from_msgpack(manifest_mp)
        return manifest
    else:
        return None

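A note on running these tests: since use_profile() above attaches a pytest.mark.profile_<name> marker to each decorated test, a profile-scoped run can filter on that marker. A sketch, assuming the test layout added in this change:

    pytest -m profile_redshift plugins/redshift/tests/integration/
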
@@ -0,0 +1,8 @@

{{ config(materialized=var('materialized')) }}

select '{{ var("materialized") }}' as materialization

{% if var('materialized') == 'incremental' and is_incremental() %}
    where 'abc' != (select max(materialization) from {{ this }})
{% endif %}

@@ -0,0 +1,44 @@
from tests.integration.base import DBTIntegrationTest, use_profile


class TestChangingRelationType(DBTIntegrationTest):

    @property
    def schema(self):
        return "changing_relation_type"

    @staticmethod
    def dir(path):
        return path.lstrip("/")

    @property
    def models(self):
        return self.dir("models")

    def swap_types_and_test(self):
        # test that dbt is able to do intelligent things when changing
        # between materializations that create tables and views.

        results = self.run_dbt(['run', '--vars', 'materialized: view'])
        self.assertEqual(results[0].node.config.materialized, 'view')
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['run', '--vars', 'materialized: table'])
        self.assertEqual(results[0].node.config.materialized, 'table')
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['run', '--vars', 'materialized: view'])
        self.assertEqual(results[0].node.config.materialized, 'view')
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['run', '--vars', 'materialized: incremental'])
        self.assertEqual(results[0].node.config.materialized, 'incremental')
        self.assertEqual(len(results), 1)

        results = self.run_dbt(['run', '--vars', 'materialized: view'])
        self.assertEqual(results[0].node.config.materialized, 'view')
        self.assertEqual(len(results), 1)

    @use_profile("redshift")
    def test__redshift__switch_materialization(self):
        self.swap_types_and_test()

@@ -0,0 +1 @@
select 1 as {{ adapter.quote("2id") }}

@@ -0,0 +1,9 @@
version: 2
models:
  - name: quote_model
    description: "model to test column quotes and comments"
    columns:
      - name: 2id
        description: "XXX My description"
        quote: true

@@ -0,0 +1,43 @@
from tests.integration.base import DBTIntegrationTest, use_profile

import json


class TestColumnComment(DBTIntegrationTest):
    @property
    def schema(self):
        return "column_comment"

    @property
    def models(self):
        return "models"

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'models': {
                'test': {
                    'materialized': 'table',
                    '+persist_docs': {
                        "relation": True,
                        "columns": True,
                    },
                }
            }
        }

    def run_has_comments(self):
        self.run_dbt()
        self.run_dbt(['docs', 'generate'])
        with open('target/catalog.json') as fp:
            catalog_data = json.load(fp)
        assert 'nodes' in catalog_data
        assert len(catalog_data['nodes']) == 1
        column_node = catalog_data['nodes']['model.test.quote_model']
        column_comment = column_node['columns']['2id']['comment']
        assert column_comment.startswith('XXX')

    @use_profile('redshift')
    def test_redshift_comments(self):
        self.run_has_comments()

@@ -0,0 +1,4 @@
col_A,col_B
1,2
3,4
5,6

@@ -0,0 +1,19 @@
{% set col_a = '"col_a"' %}
{% set col_b = '"col_b"' %}
{% if adapter.type() == 'bigquery' %}
    {% set col_a = '`col_a`' %}
    {% set col_b = '`col_b`' %}
{% elif adapter.type() == 'snowflake' %}
    {% set col_a = '"COL_A"' %}
    {% set col_b = '"COL_B"' %}
{% endif %}

{{config(
    materialized = 'incremental',
    unique_key = col_a,
    incremental_strategy = var('strategy')
)}}

select
{{ col_a }}, {{ col_b }}
from {{ref('seed')}}

@@ -0,0 +1,16 @@
{% set col_a = '"col_A"' %}
{% set col_b = '"col_B"' %}
{% if adapter.type() == 'bigquery' %}
    {% set col_a = '`col_A`' %}
    {% set col_b = '`col_B`' %}
{% endif %}

{{config(
    materialized = 'incremental',
    unique_key = col_a,
    incremental_strategy = var('strategy')
)}}

select
{{ col_a }}, {{ col_b }}
from {{ref('seed')}}

@@ -0,0 +1,78 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import os


class BaseColumnQuotingTest(DBTIntegrationTest):
    def column_quoting(self):
        raise NotImplementedError('column_quoting not implemented')

    @property
    def schema(self):
        return 'dbt_column_quoting'

    @staticmethod
    def dir(value):
        return os.path.normpath(value)

    def _run_column_quotes(self, strategy='delete+insert'):
        strategy_vars = '{{"strategy": "{}"}}'.format(strategy)
        self.run_dbt(['seed', '--vars', strategy_vars])
        self.run_dbt(['run', '--vars', strategy_vars])
        self.run_dbt(['run', '--vars', strategy_vars])


class TestColumnQuotingDefault(BaseColumnQuotingTest):
    @property
    def project_config(self):
        return {
            'config-version': 2
        }

    @property
    def models(self):
        return self.dir('models-unquoted')

    def run_dbt(self, *args, **kwargs):
        return super().run_dbt(*args, **kwargs)

    @use_profile('redshift')
    def test_redshift_column_quotes(self):
        self._run_column_quotes()


class TestColumnQuotingDisabled(BaseColumnQuotingTest):
    @property
    def models(self):
        return self.dir('models-unquoted')

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'seeds': {
                'quote_columns': False,
            },
        }

    @use_profile('redshift')
    def test_redshift_column_quotes(self):
        self._run_column_quotes()


class TestColumnQuotingEnabled(BaseColumnQuotingTest):
    @property
    def models(self):
        return self.dir('models')

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'seeds': {
                'quote_columns': True,
            },
        }

    @use_profile('redshift')
    def test_redshift_column_quotes(self):
        self._run_column_quotes()

@@ -0,0 +1,5 @@
select
    CAST(1 as int64) as int64_col,
    CAST(2.0 as float64) as float64_col,
    CAST(3.0 as numeric) as numeric_col,
    CAST('3' as string) as string_col,

@@ -0,0 +1,10 @@
version: 2
models:
  - name: model
    tests:
      - is_type:
          column_map:
            int64_col: ['integer', 'number']
            float64_col: ['float', 'number']
            numeric_col: ['numeric', 'number']
            string_col: ['string', 'not number']

@@ -0,0 +1,10 @@
version: 2
models:
  - name: model
    tests:
      - is_type:
          column_map:
            int64_col: ['string', 'not number']
            float64_col: ['float', 'number']
            numeric_col: ['numeric', 'number']
            string_col: ['string', 'not number']

@@ -0,0 +1,6 @@
{{ config(materialized='table') }}
select
    CAST(1 as int64) as int64_col,
    CAST(2.0 as float64) as float64_col,
    CAST(3.0 as numeric) as numeric_col,
    CAST('3' as string) as string_col,

@@ -0,0 +1,5 @@
-- Macro to alter a column type
{% macro test_alter_column_type(model_name, column_name, new_column_type) %}
    {% set relation = ref(model_name) %}
    {{ alter_column_type(relation, column_name, new_column_type) }}
{% endmacro %}

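Like the rename macro earlier, this wrapper is intended to be driven by hand or from a test harness. A sketch with hypothetical argument values:

    dbt run-operation test_alter_column_type --args '{model_name: model, column_name: int_col, new_column_type: bigint}'
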
@@ -0,0 +1,72 @@

{% macro simple_type_check_column(column, check) %}
    {% if check == 'string' %}
        {{ return(column.is_string()) }}
    {% elif check == 'float' %}
        {{ return(column.is_float()) }}
    {% elif check == 'number' %}
        {{ return(column.is_number()) }}
    {% elif check == 'numeric' %}
        {{ return(column.is_numeric()) }}
    {% elif check == 'integer' %}
        {{ return(column.is_integer()) }}
    {% else %}
        {% do exceptions.raise_compiler_error('invalid type check value: ' ~ check) %}
    {% endif %}
{% endmacro %}

{% macro type_check_column(column, type_checks) %}
    {% set failures = [] %}
    {% for type_check in type_checks %}
        {% if type_check.startswith('not ') %}
            {% if simple_type_check_column(column, type_check[4:]) %}
                {% do log('simple_type_check_column got ', True) %}
                {% do failures.append(type_check) %}
            {% endif %}
        {% else %}
            {% if not simple_type_check_column(column, type_check) %}
                {% do failures.append(type_check) %}
            {% endif %}
        {% endif %}
    {% endfor %}
    {% if (failures | length) > 0 %}
        {% do log('column ' ~ column.name ~ ' had failures: ' ~ failures, info=True) %}
    {% endif %}
    {% do return((failures | length) == 0) %}
{% endmacro %}

{% test is_type(model, column_map) %}
    {% if not execute %}
        {{ return(None) }}
    {% endif %}
    {% if not column_map %}
        {% do exceptions.raise_compiler_error('test_is_type must have a column name') %}
    {% endif %}
    {% set columns = adapter.get_columns_in_relation(model) %}
    {% if (column_map | length) != (columns | length) %}
        {% set column_map_keys = (column_map | list | string) %}
        {% set column_names = (columns | map(attribute='name') | list | string) %}
        {% do exceptions.raise_compiler_error('did not get all the columns/all columns not specified:\n' ~ column_map_keys ~ '\nvs\n' ~ column_names) %}
    {% endif %}
    {% set bad_columns = [] %}
    {% for column in columns %}
        {% set column_key = (column.name | lower) %}
        {% if column_key in column_map %}
            {% set type_checks = column_map[column_key] %}
            {% if not type_checks %}
                {% do exceptions.raise_compiler_error('no type checks?') %}
            {% endif %}
            {% if not type_check_column(column, type_checks) %}
                {% do bad_columns.append(column.name) %}
            {% endif %}
        {% else %}
            {% do exceptions.raise_compiler_error('column key ' ~ column_key ~ ' not found in ' ~ (column_map | list | string)) %}
        {% endif %}
    {% endfor %}
    {% do log('bad columns: ' ~ bad_columns, info=True) %}
    {% for bad_column in bad_columns %}
        select '{{ bad_column }}' as bad_column
        {{ 'union all' if not loop.last }}
    {% endfor %}
    select * from (select 1 limit 0) as nothing
{% endtest %}

@@ -0,0 +1,9 @@
select
    1::smallint as smallint_col,
    2::integer as int_col,
    3::bigint as bigint_col,
    4.0::real as real_col,
    5.0::double precision as double_col,
    6.0::numeric as numeric_col,
    '7'::text as text_col,
    '8'::varchar(20) as varchar_col

@@ -0,0 +1,14 @@
version: 2
models:
  - name: model
    tests:
      - is_type:
          column_map:
            smallint_col: ['integer', 'number']
            int_col: ['integer', 'number']
            bigint_col: ['integer', 'number']
            real_col: ['float', 'number']
            double_col: ['float', 'number']
            numeric_col: ['numeric', 'number']
            text_col: ['string', 'not number']
            varchar_col: ['string', 'not number']

@@ -0,0 +1,17 @@
select
    1::smallint as smallint_col,
    2::int as int_col,
    3::bigint as bigint_col,
    4::int2 as int2_col,
    5::int4 as int4_col,
    6::int8 as int8_col,
    7::integer as integer_col,
    8.0::real as real_col,
    9.0::float4 as float4_col,
    10.0::float8 as float8_col,
    11.0::float as float_col,
    12.0::double precision as double_col,
    13.0::numeric as numeric_col,
    14.0::decimal as decimal_col,
    '15'::varchar(20) as varchar_col,
    '16'::text as text_col

@@ -0,0 +1,22 @@
version: 2
models:
  - name: model
    tests:
      - is_type:
          column_map:
            smallint_col: ['integer', 'number']
            int_col: ['integer', 'number']
            bigint_col: ['integer', 'number']
            int2_col: ['integer', 'number']
            int4_col: ['integer', 'number']
            int8_col: ['integer', 'number']
            integer_col: ['integer', 'number']
            real_col: ['float', 'number']
            double_col: ['float', 'number']
            float4_col: ['float', 'number']
            float8_col: ['float', 'number']
            float_col: ['float', 'number']
            numeric_col: ['numeric', 'number']
            decimal_col: ['numeric', 'number']
            varchar_col: ['string', 'not number']
            text_col: ['string', 'not number']

@@ -0,0 +1,18 @@
select
    1::smallint as smallint_col,
    2::int as int_col,
    3::bigint as bigint_col,
    4::integer as integer_col,
    5::tinyint as tinyint_col,
    6::byteint as byteint_col,
    7.0::float as float_col,
    8.0::float4 as float4_col,
    9.0::float8 as float8_col,
    10.0::double as double_col,
    11.0::double precision as double_p_col,
    12.0::real as real_col,
    13.0::numeric as numeric_col,
    14.0::decimal as decimal_col,
    15.0::number as number_col,
    '16'::text as text_col,
    '17'::varchar(20) as varchar_col

@@ -0,0 +1,23 @@
version: 2
models:
  - name: model
    tests:
      - is_type:
          column_map:
            smallint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            int_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            bigint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            integer_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            tinyint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            byteint_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            float_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            float4_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            float8_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            double_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            double_p_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            real_col: ['float', 'number', 'not string', 'not integer', 'not numeric']
            numeric_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            decimal_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            number_col: ['numeric', 'number', 'not string', 'not float', 'not integer']
            text_col: ['string', 'not number']
            varchar_col: ['string', 'not number']

@@ -0,0 +1,21 @@
from tests.integration.base import DBTIntegrationTest, use_profile


class TestColumnTypes(DBTIntegrationTest):
    @property
    def schema(self):
        return 'column_types'

    def run_and_test(self):
        self.assertEqual(len(self.run_dbt(['run'])), 1)
        self.assertEqual(len(self.run_dbt(['test'])), 1)


class TestRedshiftColumnTypes(TestColumnTypes):
    @property
    def models(self):
        return 'rs_models'

    @use_profile('redshift')
    def test_redshift_column_types(self):
        self.run_and_test()

@@ -1,4 +1,4 @@
-from test.integration.base import DBTIntegrationTest, use_profile
+from tests.integration.base import DBTIntegrationTest, use_profile
 import threading
 from dbt.adapters.factory import FACTORY

@@ -28,7 +28,7 @@ class BaseTestConcurrentTransaction(DBTIntegrationTest):
 
     @property
     def schema(self):
-        return "concurrent_transaction_032"
+        return "concurrent_transaction"
 
     @property
     def project_config(self):

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}

@@ -0,0 +1,9 @@
version: 2
models:
  - name: view_model
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}

-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

@@ -0,0 +1 @@
select * from no.such.table

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from no.such.table

@@ -0,0 +1,9 @@
version: 2
models:
  - name: view_model
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}

-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

@@ -0,0 +1 @@
select * from no.such.table

@@ -0,0 +1,9 @@
version: 2
models:
  - name: view_model
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name

@@ -0,0 +1,2 @@
{{ config(materialized='table') }}
select 1 as fun

@@ -0,0 +1 @@
select * from {{ ref('seed') }}

@@ -0,0 +1,3 @@
id,name
1,Alice
2,Bob

@@ -0,0 +1,3 @@
{% macro my_macro() %}
    {% do log('in a macro') %}
{% endmacro %}

@@ -0,0 +1,2 @@
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}

@@ -0,0 +1,8 @@
version: 2
exposures:
  - name: my_exposure
    type: application
    depends_on:
      - ref('view_model')
    owner:
      email: test@example.com

@@ -0,0 +1,9 @@
version: 2
models:
  - name: view_model
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name

@@ -0,0 +1,5 @@
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}

-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}

@@ -0,0 +1,14 @@
{% snapshot my_cool_snapshot %}

    {{
        config(
            target_database=database,
            target_schema=schema,
            unique_key='id',
            strategy='check',
            check_cols=['id'],
        )
    }}
    select * from {{ ref('view_model') }}

{% endsnapshot %}

@@ -0,0 +1,153 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import copy
import json
import os
import shutil


class TestDeferState(DBTIntegrationTest):
    @property
    def schema(self):
        return "defer_state"

    @property
    def models(self):
        return "models"

    def setUp(self):
        self.other_schema = None
        super().setUp()
        self._created_schemas.add(self.other_schema)

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'seeds': {
                'test': {
                    'quote_columns': False,
                }
            }
        }

    def get_profile(self, adapter_type):
        if self.other_schema is None:
            self.other_schema = self.unique_schema() + '_other'
        if self.adapter_type == 'snowflake':
            self.other_schema = self.other_schema.upper()
        profile = super().get_profile(adapter_type)
        default_name = profile['test']['target']
        profile['test']['outputs']['otherschema'] = copy.deepcopy(profile['test']['outputs'][default_name])
        profile['test']['outputs']['otherschema']['schema'] = self.other_schema
        return profile

    def copy_state(self):
        assert not os.path.exists('state')
        os.makedirs('state')
        shutil.copyfile('target/manifest.json', 'state/manifest.json')

    def run_and_defer(self):
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        assert not any(r.node.deferred for r in results)
        results = self.run_dbt(['run'])
        assert len(results) == 2
        assert not any(r.node.deferred for r in results)
        results = self.run_dbt(['test'])
        assert len(results) == 2

        # copy files over from the happy times when we had a good target
        self.copy_state()

        # test tests first, because run will change things
        # no state, wrong schema, failure.
        self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False)

        # no state, run also fails
        self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False)

        # defer test, it succeeds
        results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--target', 'otherschema'])

        # with state it should work though
        results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'])
        assert self.other_schema not in results[0].node.compiled_sql
        assert self.unique_schema() in results[0].node.compiled_sql

        with open('target/manifest.json') as fp:
            data = json.load(fp)
        assert data['nodes']['seed.test.seed']['deferred']

        assert len(results) == 1

    def run_switchdirs_defer(self):
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        results = self.run_dbt(['run'])
        assert len(results) == 2

        # copy files over from the happy times when we had a good target
        self.copy_state()

        self.use_default_project({'source-paths': ['changed_models']})
        # the sql here is just wrong, so it should fail
        self.run_dbt(
            ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=False,
        )
        # but this should work since we just use the old happy model
        self.run_dbt(
            ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=True,
        )

        self.use_default_project({'source-paths': ['changed_models_bad']})
        # this should fail because the table model refs a broken ephemeral
        # model, which it should see
        self.run_dbt(
            ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=False,
        )

    def run_defer_iff_not_exists(self):
        results = self.run_dbt(['seed', '--target', 'otherschema'])
        assert len(results) == 1
        results = self.run_dbt(['run', '--target', 'otherschema'])
        assert len(results) == 2

        # copy files over from the happy times when we had a good target
        self.copy_state()
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        results = self.run_dbt(['run', '--state', 'state', '--defer'])
        assert len(results) == 2

        # because the seed now exists in our schema, we shouldn't defer it
        assert self.other_schema not in results[0].node.compiled_sql
        assert self.unique_schema() in results[0].node.compiled_sql

    def run_defer_deleted_upstream(self):
        results = self.run_dbt(['seed'])
        assert len(results) == 1
        results = self.run_dbt(['run'])
        assert len(results) == 2

        # copy files over from the happy times when we had a good target
        self.copy_state()

        self.use_default_project({'source-paths': ['changed_models_missing']})
        # ephemeral_model is now gone. previously this caused a
        # keyerror (dbt#2875), now it should pass
        self.run_dbt(
            ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'],
            expect_pass=True,
        )

        # despite deferral, test should use models just created in our schema
        results = self.run_dbt(['test', '--state', 'state', '--defer'])
        assert self.other_schema not in results[0].node.compiled_sql
        assert self.unique_schema() in results[0].node.compiled_sql

    @use_profile('redshift')
    def test_redshift_state_changetarget(self):
        self.run_and_defer()
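The manifest assertion in run_and_defer can be reproduced outside the test harness; a minimal sketch (assumes a completed deferred run has written target/manifest.json in the working directory):

    # After a deferred run, nodes that were resolved against the --state
    # manifest carry a 'deferred' flag in the run's own manifest.
    import json

    with open('target/manifest.json') as fp:
        manifest = json.load(fp)

    deferred = [uid for uid, node in manifest['nodes'].items() if node.get('deferred')]
    print(deferred)  # e.g. ['seed.test.seed'] when the seed was deferred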
@@ -0,0 +1,17 @@
{% macro some_macro(arg1, arg2) -%}
    {{ adapter_macro('some_macro', arg1, arg2) }}
{%- endmacro %}


{% macro default__some_macro(arg1, arg2) %}
    {% do exceptions.raise_compiler_error('not allowed') %}
{% endmacro %}

{% macro postgres__some_macro(arg1, arg2) -%}
    {{ arg1 }}{{ arg2 }}
{%- endmacro %}


{% macro some_other_macro(arg1, arg2) -%}
    {{ adapter_macro('test.some_macro', arg1, arg2) }}
{%- endmacro %}
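Note on resolution order: adapter_macro (deprecated in favor of adapter.dispatch) tries an adapter-prefixed implementation first and falls back to default__; the deprecation tests further down rely on the Postgres implementation being picked up under a Redshift profile. A toy model of that lookup (names illustrative, not dbt's actual implementation):

    # Adapter-prefixed name first, then the default__ fallback.
    def postgres__some_macro(arg1, arg2):
        return f'{arg1}{arg2}'

    def default__some_macro(arg1, arg2):
        raise RuntimeError('not allowed')

    MACROS = {
        'postgres__some_macro': postgres__some_macro,
        'default__some_macro': default__some_macro,
    }

    def dispatch(adapter_type, name):
        for prefix in (adapter_type, 'default'):
            impl = MACROS.get(f'{prefix}__{name}')
            if impl is not None:
                return impl
        raise KeyError(name)

    print(dispatch('postgres', 'some_macro')('foo', 'bar'))  # foobar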
@@ -0,0 +1,4 @@
{% if some_other_macro('foo', 'bar') != 'foobar' %}
    {% do exceptions.raise_compiler_error('invalid foobar') %}
{% endif %}
select 1 as id
@@ -0,0 +1,4 @@
{% if some_macro('foo', 'bar') != 'foobar' %}
    {% do exceptions.raise_compiler_error('invalid foobar') %}
{% endif %}
select 1 as id
@@ -0,0 +1 @@
select 1 as id
@@ -0,0 +1,65 @@
{%- materialization custom, default -%}

  {%- set identifier = model['alias'] -%}
  {%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
  {%- set backup_identifier = model['name'] + '__dbt_backup' -%}

  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database,
                                                type='view') -%}
  {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
                                                      schema=schema, database=database, type='view') -%}

  /*
     This relation (probably) doesn't exist yet. If it does exist, it's a leftover from
     a previous run, and we're going to try to drop it immediately. At the end of this
     materialization, we're going to rename the "old_relation" to this identifier,
     and then we're going to drop it. In order to make sure we run the correct one of:
       - drop view ...
       - drop table ...

     We need to set the type of this relation to be the type of the old_relation, if it exists,
     or else "view" as a sane default if it does not. Note that if the old_relation does not
     exist, then there is nothing to move out of the way and subsequently drop. In that case,
     this relation will be effectively unused.
  */
  {%- set backup_relation_type = 'view' if old_relation is none else old_relation.type -%}
  {%- set backup_relation = api.Relation.create(identifier=backup_identifier,
                                                schema=schema, database=database,
                                                type=backup_relation_type) -%}

  {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- drop the temp relations if they exist for some reason
  {{ adapter.drop_relation(intermediate_relation) }}
  {{ adapter.drop_relation(backup_relation) }}

  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  -- build model
  {% call statement('main') -%}
    {{ create_view_as(intermediate_relation, sql) }}
  {%- endcall %}

  -- cleanup
  -- move the existing view out of the way
  {% if old_relation is not none %}
    {{ adapter.rename_relation(target_relation, backup_relation) }}
  {% endif %}
  {{ adapter.rename_relation(intermediate_relation, target_relation) }}

  {{ run_hooks(post_hooks, inside_transaction=True) }}

  {{ adapter.commit() }}

  {{ drop_relation_if_exists(backup_relation) }}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {# do not return anything! #}
  {# {{ return({'relations': [target_relation]}) }} #}

{%- endmaterialization -%}
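The comment block in this materialization describes a build-and-swap dance; a toy sketch of the ordering, with a dict standing in for the warehouse catalog (names illustrative):

    # Build under a temp name, move the old object to a backup name, rename
    # the temp into place, then drop the backup. A failed build therefore
    # never clobbers the existing relation.
    def swap(catalog, name, new_definition):
        tmp, backup = name + '__dbt_tmp', name + '__dbt_backup'
        catalog.pop(tmp, None)       # drop leftovers from a failed prior run
        catalog.pop(backup, None)
        catalog[tmp] = new_definition          # build under the temp name
        if name in catalog:                    # move the old object aside
            catalog[backup] = catalog.pop(name)
        catalog[name] = catalog.pop(tmp)       # rename temp into place
        catalog.pop(backup, None)              # after commit, drop the backup
        return catalog

    print(swap({'model': 'old definition'}, 'model', 'new definition'))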
@@ -0,0 +1,2 @@
{{ config(materialized='custom') }}
select 1 as id
@@ -0,0 +1,5 @@
id,dupe
1,a
2,a
3,a
4,a
@@ -0,0 +1,3 @@
a,b
1,hello
2,goodbye
@@ -0,0 +1,13 @@


{% macro string_literal(s) -%}
    {{ adapter.dispatch('string_literal', packages=['test'])(s) }}
{%- endmacro %}

{% macro default__string_literal(s) %}
    '{{ s }}'::text
{% endmacro %}

{% macro bigquery__string_literal(s) %}
    cast('{{ s }}' as string)
{% endmacro %}
@@ -0,0 +1,10 @@

-- cross-db compatible test, similar to accepted_values

{% test expect_value(model, field, value) %}

select *
from {{ model }}
where {{ field }} != '{{ value }}'

{% endtest %}
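A generic test like expect_value passes when its rendered query returns zero rows. A minimal sketch of the rendering and the pass condition (the relation name is illustrative):

    # The rendered SQL selects offending rows; zero rows means the test passes.
    def render_expect_value(model, field, value):
        return f"select * from {model} where {field} != '{value}'"

    print(render_expect_value('analytics.foo_alias', 'tablename', 'foo'))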
@@ -0,0 +1,2 @@

select {{ string_literal(this.name) }} as tablename
@@ -0,0 +1,4 @@

{{ config(alias='override_alias') }}

select {{ string_literal(this.name) }} as tablename
@@ -0,0 +1,9 @@

{{
    config(
        alias='foo',
        materialized='table'
    )
}}

select {{ string_literal(this.name) }} as tablename
@@ -0,0 +1,16 @@

{{
    config(
        materialized='table'
    )
}}

with trigger_ref as (

  -- we should still be able to ref a model by its filepath
  select * from {{ ref('foo_alias') }}

)

-- this name should still be the filename
select {{ string_literal(this.name) }} as tablename
@@ -0,0 +1,22 @@
version: 2
models:
  - name: foo_alias
    tests:
      - expect_value:
          field: tablename
          value: foo
  - name: ref_foo_alias
    tests:
      - expect_value:
          field: tablename
          value: ref_foo_alias
  - name: alias_in_project
    tests:
      - expect_value:
          field: tablename
          value: project_alias
  - name: alias_in_project_with_override
    tests:
      - expect_value:
          field: tablename
          value: override_alias
@@ -0,0 +1,4 @@
version: 2
models:
  - name: seed
    description: my cool seed
@@ -0,0 +1,5 @@
select 1 as id

{% if adapter.already_exists(this.schema, this.identifier) and not should_full_refresh() %}
    where id > (select max(id) from {{this}})
{% endif %}
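This model appends only rows with ids above the current max on re-runs. A sketch of the two renderings, first run versus incremental run (illustrative relation name, not dbt's renderer):

    # The where-clause guard is only emitted when the target table already
    # exists and --full-refresh was not requested.
    def render(already_exists, full_refresh, this='analytics.my_model'):
        sql = 'select 1 as id'
        if already_exists and not full_refresh:
            sql += f'\nwhere id > (select max(id) from {this})'
        return sql

    print(render(already_exists=False, full_refresh=False))  # first run
    print(render(already_exists=True, full_refresh=False))   # incremental run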
@@ -0,0 +1,60 @@
from tests.integration.base import DBTIntegrationTest, use_profile

from dbt import deprecations


class BaseTestDeprecations(DBTIntegrationTest):
    def setUp(self):
        super().setUp()
        deprecations.reset_deprecations()

    @property
    def schema(self):
        return "deprecation_test"

    @staticmethod
    def dir(path):
        return path.lstrip("/")


class TestAdapterMacroDeprecation(BaseTestDeprecations):
    @property
    def models(self):
        return self.dir('adapter-macro-models')

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'macro-paths': [self.dir('adapter-macro-macros')]
        }

    @use_profile('redshift')
    def test_redshift_adapter_macro(self):
        self.assertEqual(deprecations.active_deprecations, set())
        # pick up the postgres macro
        self.run_dbt()
        expected = {'adapter-macro'}
        self.assertEqual(expected, deprecations.active_deprecations)


class TestAdapterMacroDeprecationPackages(BaseTestDeprecations):
    @property
    def models(self):
        return self.dir('adapter-macro-models-package')

    @property
    def project_config(self):
        return {
            'config-version': 2,
            'macro-paths': [self.dir('adapter-macro-macros')]
        }

    @use_profile('redshift')
    def test_redshift_adapter_macro_pkg(self):
        self.assertEqual(deprecations.active_deprecations, set())
        # pick up the postgres macro
        self.assertEqual(deprecations.active_deprecations, set())
        self.run_dbt()
        expected = {'adapter-macro'}
        self.assertEqual(expected, deprecations.active_deprecations)
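These tests depend on deprecation warnings being fire-once and process-global, which is why setUp calls reset_deprecations(). A sketch of that pattern (not dbt's actual module, just the shape it exposes):

    # Fire-once warnings tracked in a module-level set, cleared between tests.
    active_deprecations = set()

    def warn(name):
        if name not in active_deprecations:
            active_deprecations.add(name)
            print(f'deprecation warning: {name}')

    def reset_deprecations():
        active_deprecations.clear()

    warn('adapter-macro')  # prints once
    warn('adapter-macro')  # silent on repeat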
@@ -0,0 +1 @@
Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
@@ -0,0 +1,9 @@
{{
    config(
        materialized='table',
        partition_by={'field': 'updated_at', 'data_type': 'date'},
        cluster_by=['first_name']
    )
}}

select id,first_name,email,ip_address,DATE(updated_at) as updated_at from {{ ref('seed') }}
@@ -0,0 +1,9 @@
{{
    config(
        materialized='table',
        partition_by={'field': 'updated_at', 'data_type': 'date'},
        cluster_by=['first_name','email']
    )
}}

select id,first_name,email,ip_address,DATE(updated_at) as updated_at from {{ ref('seed') }}
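For orientation, the partition_by/cluster_by configs above surface in BigQuery DDL roughly as sketched below; the exact statements dbt's BigQuery materialization emits differ, and the table name is illustrative:

    # Rough shape of the DDL implied by the configs; sketch only.
    def ddl(table, partition_field, cluster_fields):
        return (
            f'create or replace table {table}\n'
            f'partition by {partition_field}\n'
            f"cluster by {', '.join(cluster_fields)}\n"
            'as (select ...)'
        )

    print(ddl('dataset.multi_clustered', 'updated_at', ['first_name', 'email']))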
@@ -0,0 +1,15 @@
{{
    config(
        materialized='table'
    )
}}

select
    1 as field_1,
    2 as field_2,
    3 as field_3,

    struct(
        5 as field_5,
        6 as field_6
    ) as nested_field
@@ -0,0 +1,7 @@
{{
    config(
        materialized='view'
    )
}}

select * from {{ ref('nested_table') }}
@@ -0,0 +1,44 @@
version: 2

models:
  - name: nested_view
    description: "The test model"
    columns:
      - name: field_1
        description: The first field
      - name: field_2
        description: The second field
      - name: field_3
        description: The third field
      - name: nested_field.field_4
        description: The first nested field
      - name: nested_field.field_5
        description: The second nested field
  - name: clustered
    description: "A clustered and partitioned copy of the test model"
    columns:
      - name: id
        description: The user id
      - name: first_name
        description: The user's name
      - name: email
        description: The user's email
      - name: ip_address
        description: The user's IP address
      - name: updated_at
        description: When the user was updated
  - name: multi_clustered
    description: "A clustered and partitioned copy of the test model, clustered on multiple columns"
    columns:
      - name: id
        description: The user id
      - name: first_name
        description: The user's name
      - name: email
        description: The user's email
      - name: ip_address
        description: The user's IP address
      - name: updated_at
        description: When the user was updated


@@ -0,0 +1,2 @@
{{ config(disabled=true, schema='notrealnotreal') }}
select 1 as id
@@ -0,0 +1,2 @@
{{ config(schema=var('extra_schema')) }}
select 1 as id
@@ -0,0 +1,8 @@
{{
    config(
        materialized='view',
        database=var('alternate_db')
    )
}}

select * from {{ ref('seed') }}
@@ -0,0 +1,21 @@
version: 2

models:
  - name: model
    description: "The test model"
    columns:
      - name: id
        description: The user ID number
        tests:
          - unique
          - not_null
      - name: first_name
        description: The user's first name
      - name: email
        description: The user's email
      - name: ip_address
        description: The user's IP address
      - name: updated_at
        description: The last time this user's email was updated
        tests:
          - test.nothing
@@ -0,0 +1,3 @@
{% macro get_catalog(information_schema, schemas) %}
    {% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}
@@ -0,0 +1,9 @@

{% test nothing(model) %}

-- a silly test to make sure that table-level tests show up in the manifest
-- without a column_name field
select 0

{% endtest %}

@@ -0,0 +1,8 @@

{% docs macro_info %}
My custom test that I wrote that does nothing
{% enddocs %}

{% docs macro_arg_info %}
The model for my custom test
{% enddocs %}
@@ -0,0 +1,10 @@
version: 2
macros:
  - name: test_nothing
    description: "{{ doc('macro_info') }}"
    meta:
      some_key: 100
    arguments:
      - name: model
        type: Relation
        description: "{{ doc('macro_arg_info') }}"
@@ -0,0 +1,8 @@
{{
    config(
        materialized='view',
        database=var('alternate_db')
    )
}}

select * from {{ ref('seed') }}
@@ -0,0 +1 @@
This is a readme.md file with {{ invalid-ish jinja }} in it
@@ -0,0 +1,82 @@
version: 2

models:
  - name: model
    description: "The test model"
    docs:
      show: false
    columns:
      - name: id
        description: The user ID number
        tests:
          - unique
          - not_null
      - name: first_name
        description: The user's first name
      - name: email
        description: The user's email
      - name: ip_address
        description: The user's IP address
      - name: updated_at
        description: The last time this user's email was updated
        tests:
          - test.nothing

  - name: second_model
    description: "The second test model"
    docs:
      show: false
    columns:
      - name: id
        description: The user ID number
      - name: first_name
        description: The user's first name
      - name: email
        description: The user's email
      - name: ip_address
        description: The user's IP address
      - name: updated_at
        description: The last time this user's email was updated


sources:
  - name: my_source
    description: "My source"
    loader: a_loader
    schema: "{{ var('test_schema') }}"
    tables:
      - name: my_table
        description: "My table"
        identifier: seed
        quoting:
          identifier: True
        columns:
          - name: id
            description: "An ID field"


exposures:
  - name: simple_exposure
    type: dashboard
    depends_on:
      - ref('model')
      - source('my_source', 'my_table')
    owner:
      email: something@example.com
  - name: notebook_exposure
    type: notebook
    depends_on:
      - ref('model')
      - ref('second_model')
    owner:
      email: something@example.com
      name: Some name
    description: >
      A description of the complex exposure
    maturity: medium
    meta:
      tool: 'my_tool'
      languages:
        - python
    tags: ['my_department']
    url: http://example.com/notebook/1
@@ -0,0 +1,13 @@
{%- if adapter.type() == 'snowflake' -%}
    {%- set schema_suffix = 'TEST' -%}
{%- else -%}
    {%- set schema_suffix = 'test' -%}
{%- endif -%}
{{
    config(
        materialized='view',
        schema=schema_suffix,
    )
}}

select * from {{ ref('seed') }}
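The schema=schema_suffix config lands this view in a suffixed schema: dbt's default generate_schema_name concatenates the target schema and the custom schema with an underscore (hence the upper-cased 'TEST' branch for Snowflake above). A sketch of that default (target schema name illustrative):

    # dbt's default custom-schema behavior: '<target_schema>_<custom_schema>'.
    def generate_schema_name(custom_schema_name, target_schema):
        if custom_schema_name is None:
            return target_schema
        return f'{target_schema}_{custom_schema_name}'

    print(generate_schema_name('test', 'dbt_test_schema'))  # dbt_test_schema_test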
@@ -0,0 +1,31 @@
{% docs ephemeral_summary %}
A summary table of the ephemeral copy of the seed data
{% enddocs %}

{% docs summary_first_name %}
The first name being summarized
{% enddocs %}

{% docs summary_count %}
The number of instances of the first name
{% enddocs %}

{% docs view_summary %}
A view of the summary of the ephemeral copy of the seed data
{% enddocs %}

{% docs source_info %}
My source
{% enddocs %}

{% docs table_info %}
My table
{% enddocs %}

{% docs column_info %}
An ID field
{% enddocs %}

{% docs notebook_info %}
A description of the complex exposure
{% enddocs %}
Some files were not shown because too many files have changed in this diff.