Compare commits

...

1 Commit

Author SHA1 Message Date
Jeremy Cohen
1d8c5af513 Experiment: column types from empty query 2023-01-20 21:15:48 +01:00
4 changed files with 95 additions and 7 deletions

View File

@@ -269,6 +269,22 @@ class BaseAdapter(metaclass=AdapterMeta):
"""
return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch)
@available.parse(lambda *a, **k: [])
def get_column_schema_from_query(
    self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> List[Tuple[str, str]]:
    """Get the (column_name, data_type) schema of the given query's result set.

    Thin wrapper around ConnectionManager.get_column_schema_from_query.

    :param str sql: The select statement whose column schema to inspect.
    :param bool auto_begin: Accepted for signature consistency with execute();
        currently NOT forwarded to the connection manager (ignored).
    :param bool fetch: Accepted for signature consistency with execute();
        currently NOT forwarded to the connection manager (ignored).
    :return: A list of (column_name, data_type) pairs.
    :rtype: List[Tuple[str, str]]
    """
    return self.connections.get_column_schema_from_query(sql=sql)
@available.parse(lambda *a, **k: ("", empty_table()))
def get_partitions_metadata(self, table: str) -> Tuple[agate.Table]:
"""Obtain partitions metadata for a BigQuery partitioned table.

View File

@@ -128,6 +128,31 @@ class SQLConnectionManager(BaseConnectionManager):
return dbt.clients.agate_helper.table_from_data_flat(data, column_names)
@classmethod
def data_type_code_to_name(cls, type_code: int) -> str:
    """Return the adapter-specific type-name string for a DB-API type_code.

    Adapters must override this; the base implementation always raises.

    :param int type_code: The type_code from a DB-API cursor.description entry.
    :raises dbt.exceptions.NotImplementedError: always, in the base class.
    """
    # https://peps.python.org/pep-0249/#type-objects
    # NOTE: the parameter was previously named `int`, shadowing the builtin
    # and losing the annotation; renamed to `type_code` to match the
    # Postgres override's signature. Positional callers are unaffected.
    raise dbt.exceptions.NotImplementedError(
        "`data_type_code_to_name` is not implemented for this adapter!"
    )
@classmethod
def get_column_schema_from_cursor(cls, cursor: Any) -> List[Tuple[str, str]]:
    """Read (column_name, data_type) pairs from a DB-API cursor.

    Returns an empty list when the cursor has no result-set description.
    """
    # https://peps.python.org/pep-0249/#description
    description = cursor.description
    if description is None:
        return []
    # TODO: ignoring size, precision, scale for now
    # (though it is part of DB-API standard, and our Column class does have these attributes)
    # IMO user-defined contracts shouldn't have to match an exact size/precision/scale
    schema: List[Tuple[str, str]] = []
    for column_info in description:
        column_name = column_info[0]
        data_type = cls.data_type_code_to_name(column_info[1])
        schema.append((column_name, data_type))
    return schema
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, agate.Table]:
@@ -140,6 +165,20 @@ class SQLConnectionManager(BaseConnectionManager):
table = dbt.clients.agate_helper.empty_table()
return response, table
# TODO: do we need to care about auto_begin here?
def get_column_schema_from_query(self, sql: str) -> List[Tuple[str, str]]:
    """Run the query and return its result schema as (column_name, data_type) pairs."""
    commented_sql = self._add_query_comment(sql)
    _connection, cursor = self.add_query(commented_sql)
    return self.get_column_schema_from_cursor(cursor)
# For dbt-bigquery
# def get_column_schema_from_query(self, sql: str) -> List[Tuple[str, str]]:
#     sql = self._add_query_comment(sql)
#     # auto_begin is ignored on bigquery, and only included for consistency
#     _query_job, iterator = self.raw_execute(sql)
#     # NOTE(review): previously read `resp.iterator`, but no `resp` is bound here;
#     # presumably the fields come from `iterator` — confirm against dbt-bigquery
#     columns = [(field.name, field.field_type) for field in iterator]
#     return columns
def add_begin_query(self):
    """Issue a BEGIN statement to open a transaction.

    auto_begin=False prevents add_query from recursively trying to begin one.
    """
    begin_sql = "BEGIN"
    return self.add_query(begin_sql, auto_begin=False)

View File

@@ -17,23 +17,47 @@
{% endmacro %}
{% macro get_empty_subquery_sql(select_sql) -%}
{# Dispatch hook: routes to the adapter-specific implementation, falling back to default__. #}
{{ return(adapter.dispatch('get_empty_subquery_sql', 'dbt')(select_sql)) }}
{% endmacro %}
{% macro default__get_empty_subquery_sql(select_sql) %}
{# Wrap the query so it returns zero rows (`where false limit 0`) while still
   exposing its column metadata — used to fetch a schema without scanning data. #}
select * from (
{{ select_sql }}
) as __dbt_sbq
where false
limit 0
{% endmacro %}
{% macro get_column_schema_from_query(select_sql) -%}
{# Dispatch hook: routes to the adapter-specific implementation, falling back to default__. #}
{{ return(adapter.dispatch('get_column_schema_from_query', 'dbt')(select_sql)) }}
{% endmacro %}
{% macro default__get_column_schema_from_query(select_sql) %}
{# Build a list of api.Column objects describing the query's result schema:
   wraps the query via get_empty_subquery_sql (zero rows), then asks the
   adapter for (name, data_type) pairs from the cursor description. #}
{% set columns = [] %}
{% set sql = get_empty_subquery_sql(select_sql) %}
{% set column_schema = adapter.get_column_schema_from_query(sql) %}
{% for col in column_schema %}
-- api.Column.create includes a step for translating data type
-- TODO: could include size, precision, scale here
{% set column = api.Column.create(col[0], col[1]) %}
{% do columns.append(column) %}
{% endfor %}
{{ return(columns) }}
{% endmacro %}
-- get_columns_in_query is kept for backwards compatibility; it returns only column names
{% macro get_columns_in_query(select_sql) -%}
{# Dispatch hook: routes to the adapter-specific implementation, falling back to default__. #}
{{ return(adapter.dispatch('get_columns_in_query', 'dbt')(select_sql)) }}
{% endmacro %}
{% macro default__get_columns_in_query(select_sql) %}
{# Back-compat macro: runs the query with zero rows and returns just the
   column NAMES (strings). Prefer get_column_schema_from_query for types. #}
{% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
{# Defect fixed: the body previously contained BOTH the old inline
   `select * from (...) where false limit 0` subquery AND this macro call,
   rendering the subquery twice and producing invalid SQL. Only the shared
   helper is kept. #}
{{ get_empty_subquery_sql(select_sql) }}
{% endcall %}
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}
{% macro alter_column_type(relation, column_name, new_column_type) -%}
{# Dispatch hook: routes to the adapter-specific implementation, falling back to default__. #}
{{ return(adapter.dispatch('alter_column_type', 'dbt')(relation, column_name, new_column_type)) }}
{% endmacro %}

View File

@@ -1,6 +1,7 @@
from contextlib import contextmanager
import psycopg2
from psycopg2.extensions import string_types
import dbt.exceptions
from dbt.adapters.base import Credentials
@@ -190,3 +191,11 @@ class PostgresConnectionManager(SQLConnectionManager):
status_messsage_strings = [part for part in status_message_parts if not part.isdigit()]
code = " ".join(status_messsage_strings)
return AdapterResponse(_message=message, code=code, rows_affected=rows)
@classmethod
def data_type_code_to_name(cls, type_code: int) -> str:
    """Map a DB-API type_code to its type-name string via psycopg2's registry.

    `string_types` maps type codes to psycopg2 type objects; each object's
    `.name` is the data type's string representation. Unknown codes raise KeyError.
    """
    type_object = string_types[type_code]
    return type_object.name
# For dbt-snowflake
# from snowflake.connector.constants import FIELD_ID_TO_NAME
# return FIELD_ID_TO_NAME[type_code]