mirror of
https://github.com/dbt-labs/dbt-core
synced 2025-12-17 19:31:34 +00:00
Compare commits
1 Commits
jerco/upda
...
jerco/get-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d8c5af513 |
@@ -269,6 +269,22 @@ class BaseAdapter(metaclass=AdapterMeta):
|
||||
"""
|
||||
return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch)
|
||||
|
||||
@available.parse(lambda *a, **k: [])
|
||||
def get_column_schema_from_query(
|
||||
self, sql: str, auto_begin: bool = False, fetch: bool = False
|
||||
) -> Tuple[AdapterResponse, agate.Table]:
|
||||
"""Execute the given SQL. This is a thin wrapper around
|
||||
ConnectionManager.execute.
|
||||
|
||||
:param str sql: The sql to execute.
|
||||
:param bool auto_begin: If set, and dbt is not currently inside a
|
||||
transaction, automatically begin one.
|
||||
:param bool fetch: If set, fetch results.
|
||||
:return: A tuple of the query status and results (empty if fetch=False).
|
||||
:rtype: List[(column_name: str, data_type: str]
|
||||
"""
|
||||
return self.connections.get_column_schema_from_query(sql=sql)
|
||||
|
||||
@available.parse(lambda *a, **k: ("", empty_table()))
|
||||
def get_partitions_metadata(self, table: str) -> Tuple[agate.Table]:
|
||||
"""Obtain partitions metadata for a BigQuery partitioned table.
|
||||
|
||||
@@ -128,6 +128,31 @@ class SQLConnectionManager(BaseConnectionManager):
|
||||
|
||||
return dbt.clients.agate_helper.table_from_data_flat(data, column_names)
|
||||
|
||||
@classmethod
|
||||
def data_type_code_to_name(cls, int) -> str:
|
||||
"""Get the string representation of the data type from the type_code."""
|
||||
# https://peps.python.org/pep-0249/#type-objects
|
||||
raise dbt.exceptions.NotImplementedError(
|
||||
"`data_type_code_to_name` is not implemented for this adapter!"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_column_schema_from_cursor(cls, cursor: Any) -> List[Tuple[str, str]]:
|
||||
# (column_name, data_type)
|
||||
columns: List[Tuple[str, str]] = []
|
||||
|
||||
if cursor.description is not None:
|
||||
# https://peps.python.org/pep-0249/#description
|
||||
columns = [
|
||||
# TODO: ignoring size, precision, scale for now
|
||||
# (though it is part of DB-API standard, and our Column class does have these attributes)
|
||||
# IMO user-defined contracts shouldn't have to match an exact size/precision/scale
|
||||
(col[0], cls.data_type_code_to_name(col[1]))
|
||||
for col in cursor.description
|
||||
]
|
||||
|
||||
return columns
|
||||
|
||||
def execute(
|
||||
self, sql: str, auto_begin: bool = False, fetch: bool = False
|
||||
) -> Tuple[AdapterResponse, agate.Table]:
|
||||
@@ -140,6 +165,20 @@ class SQLConnectionManager(BaseConnectionManager):
|
||||
table = dbt.clients.agate_helper.empty_table()
|
||||
return response, table
|
||||
|
||||
# TODO: do we need to care about auto_begin here?
|
||||
def get_column_schema_from_query(self, sql: str) -> List[Tuple[str, str]]:
|
||||
sql = self._add_query_comment(sql)
|
||||
_, cursor = self.add_query(sql)
|
||||
return self.get_column_schema_from_cursor(cursor)
|
||||
|
||||
# For dbt-bigquery
|
||||
# def get_column_schema_from_query(cls, sql: str) -> List[Tuple[str, str]]:
|
||||
# sql = self._add_query_comment(sql)
|
||||
# # auto_begin is ignored on bigquery, and only included for consistency
|
||||
# query_job, iterator = self.raw_execute(sql)
|
||||
# columns = [(field.name, field.field_type) for field in resp.iterator]
|
||||
# return columns
|
||||
|
||||
def add_begin_query(self):
|
||||
return self.add_query("BEGIN", auto_begin=False)
|
||||
|
||||
|
||||
@@ -17,23 +17,47 @@
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro get_empty_subquery_sql(select_sql) -%}
|
||||
{{ return(adapter.dispatch('get_empty_subquery_sql', 'dbt')(select_sql)) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__get_empty_subquery_sql(select_sql) %}
|
||||
select * from (
|
||||
{{ select_sql }}
|
||||
) as __dbt_sbq
|
||||
where false
|
||||
limit 0
|
||||
{% endmacro %}
|
||||
|
||||
{% macro get_column_schema_from_query(select_sql) -%}
|
||||
{{ return(adapter.dispatch('get_column_schema_from_query', 'dbt')(select_sql)) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__get_column_schema_from_query(select_sql) %}
|
||||
{% set columns = [] %}
|
||||
{% set sql = get_empty_subquery_sql(select_sql) %}
|
||||
{% set column_schema = adapter.get_column_schema_from_query(sql) %}
|
||||
{% for col in column_schema %}
|
||||
-- api.Column.create includes a step for translating data type
|
||||
-- TODO: could include size, precision, scale here
|
||||
{% set column = api.Column.create(col[0], col[1]) %}
|
||||
{% do columns.append(column) %}
|
||||
{% endfor %}
|
||||
{{ return(columns) }}
|
||||
{% endmacro %}
|
||||
|
||||
-- here for back compat
|
||||
{% macro get_columns_in_query(select_sql) -%}
|
||||
{{ return(adapter.dispatch('get_columns_in_query', 'dbt')(select_sql)) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro default__get_columns_in_query(select_sql) %}
|
||||
{% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
|
||||
select * from (
|
||||
{{ select_sql }}
|
||||
) as __dbt_sbq
|
||||
where false
|
||||
limit 0
|
||||
{{ get_empty_subquery_sql(select_sql) }}
|
||||
{% endcall %}
|
||||
|
||||
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro alter_column_type(relation, column_name, new_column_type) -%}
|
||||
{{ return(adapter.dispatch('alter_column_type', 'dbt')(relation, column_name, new_column_type)) }}
|
||||
{% endmacro %}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from contextlib import contextmanager
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extensions import string_types
|
||||
|
||||
import dbt.exceptions
|
||||
from dbt.adapters.base import Credentials
|
||||
@@ -190,3 +191,11 @@ class PostgresConnectionManager(SQLConnectionManager):
|
||||
status_messsage_strings = [part for part in status_message_parts if not part.isdigit()]
|
||||
code = " ".join(status_messsage_strings)
|
||||
return AdapterResponse(_message=message, code=code, rows_affected=rows)
|
||||
|
||||
@classmethod
|
||||
def data_type_code_to_name(cls, type_code: int) -> str:
|
||||
return string_types[type_code].name
|
||||
|
||||
# For dbt-snowflake
|
||||
# from snowflake.connector.constants import FIELD_ID_TO_NAME
|
||||
# return FIELD_ID_TO_NAME[type_code]
|
||||
|
||||
Reference in New Issue
Block a user