mirror of
https://github.com/dlt-hub/dlt.git
synced 2025-12-17 19:31:30 +00:00
Implement bigquery column descriptions
This commit is contained in:
@@ -25,6 +25,7 @@ from dlt.common.schema.utils import get_inherited_table_hint, get_columns_names_
|
||||
from dlt.common.storages.load_package import destination_state
|
||||
from dlt.common.storages.load_storage import ParsedLoadJobFileName
|
||||
from dlt.common.typing import DictStrAny
|
||||
from dlt.common.data_writers.escape import escape_bigquery_literal
|
||||
from dlt.destinations.exceptions import (
|
||||
DatabaseTransientException,
|
||||
DatabaseUndefinedRelation,
|
||||
@@ -465,10 +466,22 @@ SELECT {",".join(self._get_storage_table_query_columns())}
|
||||
|
||||
def _get_column_def_sql(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
    """Return the column definition SQL with at most one trailing ``OPTIONS (...)`` clause.

    The base definition is produced by the parent implementation. Rounding-mode
    hints and the column description are collected first and then emitted together
    as a single clause, so a column that carries several of them does not end up
    with multiple ``OPTIONS`` clauses.

    Args:
        column: Column schema; read for the two rounding-mode hints and for
            ``description``.
        table: Forwarded unchanged to the parent implementation.

    Returns:
        The parent's column definition, followed by `` OPTIONS (<opt>, ...)`` when
        at least one option applies.
    """
    column_def_sql = super()._get_column_def_sql(column, table)

    # generate additional column options clause
    # see: https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#alter_column_set_options_statement
    options = []
    if column.get(ROUND_HALF_EVEN_HINT, False):
        options.append("rounding_mode='ROUND_HALF_EVEN'")
    if column.get(ROUND_HALF_AWAY_FROM_ZERO_HINT, False):
        options.append("rounding_mode='ROUND_HALF_AWAY_FROM_ZERO'")

    description = column.get("description")
    if description:
        # The escaper's result is inserted verbatim as the option value, so it is
        # expected to already be a complete quoted literal -- TODO confirm against
        # escape_bigquery_literal.
        escaped_description = escape_bigquery_literal(description)
        options.append(f"description={escaped_description}")

    if options:
        option_arguments = ", ".join(options)
        column_def_sql += f" OPTIONS ({option_arguments})"
    return column_def_sql
|
||||
|
||||
def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigquery.LoadJob:
|
||||
|
||||
@@ -534,6 +534,68 @@ def test_bigquery_no_partition_by_integer(
|
||||
assert not has_partitions
|
||||
|
||||
|
||||
def test_bigquery_column_description(gcp_client: BigQueryClient) -> None:
    """Each column description must show up as a column-level OPTIONS clause in the DDL."""
    described_columns = deepcopy(TABLE_UPDATE)
    for col in described_columns:
        col["description"] = f"This is a test description for column {col['name']}"

    statements = gcp_client._get_table_update_sql("event_test_table", described_columns, False)
    sql = statements[0]
    # run the statement through the BigQuery dialect parser
    sqlfluff.parse(sql, dialect="bigquery")

    assert sql.startswith("ALTER TABLE")
    assert "event_test_table" in sql
    for col in described_columns:
        expected_clause = (
            f"OPTIONS (description='This is a test description for column {col['name']}')"
        )
        assert expected_clause in sql
|
||||
|
||||
|
||||
def test_bigquery_column_description_character_escaping(gcp_client: BigQueryClient) -> None:
    """Single quotes inside a description must come out backslash-escaped in the DDL."""
    columns = deepcopy(TABLE_UPDATE)
    for col in columns:
        col["description"] = "'';) DROP TABLE --"

    statements = gcp_client._get_table_update_sql("event_test_table", columns, False)
    sql = statements[0]
    # run the statement through the BigQuery dialect parser
    sqlfluff.parse(sql, dialect="bigquery")

    assert sql.startswith("ALTER TABLE")
    assert "event_test_table" in sql
    # the quotes from the description stay inside the literal, each preceded by a backslash
    expected_clause = r"OPTIONS (description='\'\';) DROP TABLE --')"
    assert expected_clause in sql
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_bigquery_with_column_description_and_rounding_mode_hints(
    destination_config: DestinationTestConfiguration,
) -> None:
    """Load a column carrying both a description and a rounding-mode hint.

    After the pipeline run, the row for ``col1`` in
    ``INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`` must report both values.
    """

    @dlt.resource(
        columns=[
            {
                "name": "col1",
                "data_type": "bigint",
                "description": "This is a test description for column col1",
            }
        ]
    )
    def some_data() -> Iterator[Dict[str, int]]:
        for i in range(10):
            yield {"col1": i}

    # attach the rounding-mode hint to the same column that carries the description
    bigquery_adapter(some_data, round_half_away_from_zero="col1")

    pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True)
    pipeline.run(some_data())

    column_query = (
        "SELECT * FROM INFORMATION_SCHEMA.COLUMN_FIELD_PATHS WHERE table_name = 'some_data' AND"
        " column_name = 'col1'"
    )
    with pipeline.sql_client() as client:
        with client.execute_query(column_query) as cursor:
            column_info = cursor.fetchone()
            assert column_info is not None
            assert column_info["description"] == "This is a test description for column col1"  # type: ignore[call-overload]
            assert column_info["rounding_mode"] == "ROUND_HALF_AWAY_FROM_ZERO"  # type: ignore[call-overload]
|
||||
|
||||
|
||||
def test_adapter_no_hints_parsing() -> None:
|
||||
@dlt.resource(columns=[{"name": "int_col", "data_type": "bigint"}])
|
||||
def some_data() -> Iterator[Dict[str, str]]:
|
||||
|
||||
Reference in New Issue
Block a user