import datetime # noqa: I251
import hashlib
from typing import Dict, List, Any, Optional, Sequence, Tuple, Literal, Union
import base64
from copy import deepcopy
from string import ascii_lowercase
import random
import secrets
from dlt.common import Decimal, pendulum, json
from dlt.common.arithmetics import numeric_default_quantize
from dlt.common.data_types import TDataType
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import new_column
from dlt.common.typing import StrAny, TDataItems
from dlt.common.wei import Wei
from dlt.common.time import (
ensure_pendulum_datetime_utc,
ensure_pendulum_datetime_non_utc,
reduce_pendulum_datetime_precision,
ensure_pendulum_time,
ensure_pendulum_date,
)
from dlt.common.libs.hexbytes import HexBytes
from dlt.common.schema import TColumnSchema, TTableSchemaColumns
from tests.utils import TPythonTableFormat, TestDataItemFormat, arrow_item_from_pandas
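
# Shared test fixtures: JSON-typed sample dicts, an "all data types" table schema and
# row, and builders for arrow/pandas test tables used across destination tests.
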
# _UUID = "c8209ee7-ee95-4b90-8c9f-f7a0f8b51014"
JSON_TYPED_DICT: StrAny = {
"str": "string",
"decimal": Decimal("21.37"),
"big_decimal": Decimal(
"115792089237316195423570985008687907853269984665640564039457584007913129639935.1"
),
"datetime": pendulum.parse("2005-04-02T20:37:37.358236Z"),
"date": ensure_pendulum_date("2022-02-02"),
# "uuid": UUID(_UUID),
"hexbytes": HexBytes("0x2137"),
"bytes": b"2137",
"wei": Wei.from_int256(2137, decimals=2),
"time": ensure_pendulum_time("20:37:37.358236"),
}

# TODO: a version after PUA decoder (time is not yet implemented end to end)
JSON_TYPED_DICT_DECODED = dict(JSON_TYPED_DICT)

JSON_TYPED_DICT_TYPES: Dict[str, TDataType] = {
"str": "text",
"decimal": "decimal",
"big_decimal": "decimal",
"datetime": "timestamp",
"date": "date",
# "uuid": "text",
"hexbytes": "binary",
"bytes": "binary",
"wei": "wei",
"time": "time",
}

JSON_TYPED_DICT_NESTED = {
"dict": dict(JSON_TYPED_DICT),
"list_dicts": [dict(JSON_TYPED_DICT), dict(JSON_TYPED_DICT)],
"list": list(JSON_TYPED_DICT.values()),
**JSON_TYPED_DICT,
}

JSON_TYPED_DICT_NESTED_DECODED = {
"dict": dict(JSON_TYPED_DICT_DECODED),
"list_dicts": [dict(JSON_TYPED_DICT_DECODED), dict(JSON_TYPED_DICT_DECODED)],
"list": list(JSON_TYPED_DICT_DECODED.values()),
**JSON_TYPED_DICT_DECODED,
}
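
# column schemas covering all dlt data types, plus nullable and precision variants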
TABLE_UPDATE: List[TColumnSchema] = [
{"name": "col1", "data_type": "bigint", "nullable": False},
{"name": "col2", "data_type": "double", "nullable": False},
{"name": "col3", "data_type": "bool", "nullable": False},
{"name": "col4", "data_type": "timestamp", "nullable": False},
{"name": "col5", "data_type": "text", "nullable": False},
{"name": "col6", "data_type": "decimal", "nullable": False},
{"name": "col7", "data_type": "binary", "nullable": False},
{"name": "col8", "data_type": "wei", "nullable": False},
{"name": "col9", "data_type": "json", "nullable": False, "variant": True},
{"name": "col10", "data_type": "date", "nullable": False},
{"name": "col11", "data_type": "time", "nullable": False},
{"name": "col1_null", "data_type": "bigint", "nullable": True},
{"name": "col2_null", "data_type": "double", "nullable": True},
{"name": "col3_null", "data_type": "bool", "nullable": True},
{"name": "col4_null", "data_type": "timestamp", "nullable": True},
{"name": "col5_null", "data_type": "text", "nullable": True},
{"name": "col6_null", "data_type": "decimal", "nullable": True},
{"name": "col7_null", "data_type": "binary", "nullable": True},
{"name": "col8_null", "data_type": "wei", "nullable": True},
{"name": "col9_null", "data_type": "json", "nullable": True, "variant": True},
{"name": "col10_null", "data_type": "date", "nullable": True},
{"name": "col11_null", "data_type": "time", "nullable": True},
{"name": "col1_precision", "data_type": "bigint", "precision": 16, "nullable": False},
{"name": "col4_precision", "data_type": "timestamp", "precision": 3, "nullable": False},
{"name": "col5_precision", "data_type": "text", "precision": 25, "nullable": False},
{
"name": "col6_precision",
"data_type": "decimal",
"precision": 6,
"scale": 2,
"nullable": False,
},
{"name": "col7_precision", "data_type": "binary", "precision": 19, "nullable": False},
{"name": "col11_precision", "data_type": "time", "precision": 3, "nullable": False},
{"name": "col12", "data_type": "timestamp", "timezone": False, "nullable": False},
]
TABLE_UPDATE_COLUMNS_SCHEMA: TTableSchemaColumns = {c["name"]: c for c in TABLE_UPDATE}
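
# a sample row matching TABLE_UPDATE, with dates and times as ISO strings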
TABLE_ROW_ALL_DATA_TYPES = {
"col1": 989127831,
"col2": 898912.821982,
"col3": True,
"col4": "2022-05-23T13:26:45.176451+00:00",
"col5": "string data \n \r 🦆",
"col6": Decimal("2323.34"),
"col7": b"binary data \n \r ",
"col8": 2**56 + 92093890840,
"col9": {
"nested": [1, 2, 3, "a"],
"link": (
"?commen\ntU\nrn=urn%3Ali%3Acomment%3A%28acti\012 \6"
" \\vity%3A69'08444473\n\n551163392%2C6n \r 9085"
),
},
"col10": "2023-02-27",
"col11": "13:26:45.176451",
"col1_null": None,
"col2_null": None,
"col3_null": None,
"col4_null": None,
"col5_null": None,
"col6_null": None,
"col7_null": None,
"col8_null": None,
"col9_null": None,
"col10_null": None,
"col11_null": None,
"col1_precision": 22324,
"col4_precision": "2022-05-23T13:26:46.167+00:00",
"col5_precision": "string data 2 \n \r 🦆",
"col6_precision": Decimal("2323.34"),
"col7_precision": b"binary data 2 \n \r A",
"col11_precision": "13:26:45.176451",
"col12": "2299-12-31 00:00:01.000",
}
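
# the same row with datetime-like fields parsed into pendulum/datetime objects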
TABLE_ROW_ALL_DATA_TYPES_DATETIMES = deepcopy(TABLE_ROW_ALL_DATA_TYPES)
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col4"] = ensure_pendulum_datetime_utc(TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col4"]) # type: ignore[arg-type]
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col10"] = ensure_pendulum_date(TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col10"]) # type: ignore[arg-type]
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col11"] = pendulum.Time.fromisoformat(TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col11"]) # type: ignore[arg-type]
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col4_precision"] = ensure_pendulum_datetime_utc(TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col4_precision"]) # type: ignore[arg-type]
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col11_precision"] = pendulum.Time.fromisoformat(TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col11_precision"]) # type: ignore[arg-type]
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col12"] = ensure_pendulum_datetime_non_utc(
TABLE_ROW_ALL_DATA_TYPES_DATETIMES["col12"] # type: ignore[arg-type]
)

TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS = [
new_column("col1_ts", "timestamp", precision=0),
new_column("col2_ts", "timestamp", precision=3),
new_column("col3_ts", "timestamp", precision=6),
new_column("col4_ts", "timestamp", precision=9),
]

TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS_COLUMNS: TTableSchemaColumns = {
c["name"]: c for c in TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS
}

TABLE_UPDATE_ALL_INT_PRECISIONS = [
new_column("col1_int", "bigint", precision=8),
new_column("col2_int", "bigint", precision=16),
new_column("col3_int", "bigint", precision=32),
new_column("col4_int", "bigint", precision=64),
new_column("col5_int", "bigint", precision=128),
]

TABLE_UPDATE_ALL_INT_PRECISIONS_COLUMNS: TTableSchemaColumns = {
c["name"]: c for c in TABLE_UPDATE_ALL_INT_PRECISIONS
}


def table_update_and_row(
    exclude_types: Optional[Sequence[TDataType]] = None,
    exclude_columns: Optional[Sequence[str]] = None,
) -> Tuple[TTableSchemaColumns, Dict[str, Any]]:
"""Get a table schema and a row with all possible data types.
Optionally exclude some data types from the schema and row.
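
    Example (illustrative): excluding the `time` type drops all `time` columns.
        >>> columns, row = table_update_and_row(exclude_types=["time"])
        >>> "col11" in columns
        False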
"""
column_schemas = deepcopy(TABLE_UPDATE_COLUMNS_SCHEMA)
data_row = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)
exclude_col_names = list(exclude_columns or [])
if exclude_types:
exclude_col_names.extend(
[key for key, value in column_schemas.items() if value["data_type"] in exclude_types]
)
for col_name in set(exclude_col_names):
del column_schemas[col_name]
del data_row[col_name]
return column_schemas, data_row


def assert_all_data_types_row(
    caps: DestinationCapabilitiesContext,
    db_row: Union[List[Any], TDataItems],
    expected_row: Optional[Dict[str, Any]] = None,
    parse_json_strings: bool = False,
    allow_base64_binary: bool = False,
    schema: Optional[TTableSchemaColumns] = None,
    expect_filtered_null_columns: bool = False,
    allow_string_binary: bool = False,
) -> None:
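    """Assert that a row read back from a destination matches `expected_row`.

    Values are normalized per destination capabilities (timestamp precision,
    hex/base64 binary encodings, JSON returned as strings) before comparison.
    """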
schema = schema or TABLE_UPDATE_COLUMNS_SCHEMA
expected_row = expected_row or TABLE_ROW_ALL_DATA_TYPES_DATETIMES
# Include only columns requested in schema
if isinstance(db_row, dict):
db_mapping = db_row.copy()
else:
db_mapping = {col_name: db_row[i] for i, col_name in enumerate(schema)}
    # TODO: with so many exceptions, this test would be more readable if the special
    #   cases were grouped by destination type
expected_rows = {key: value for key, value in expected_row.items() if key in schema}
# prepare date to be compared: convert into pendulum instance, adjust microsecond precision
if "col4" in expected_rows:
parsed_date = ensure_pendulum_datetime_utc((db_mapping["col4"]))
db_mapping["col4"] = reduce_pendulum_datetime_precision(
parsed_date, caps.timestamp_precision
)
expected_rows["col4"] = reduce_pendulum_datetime_precision(
ensure_pendulum_datetime_utc(expected_rows["col4"]), # type: ignore[arg-type]
caps.timestamp_precision,
)
if "col12" in expected_rows:
        parsed_date = ensure_pendulum_datetime_non_utc(db_mapping["col12"])
db_mapping["col12"] = reduce_pendulum_datetime_precision(
parsed_date, caps.timestamp_precision
)
expected_rows["col12"] = reduce_pendulum_datetime_precision(
ensure_pendulum_datetime_non_utc(expected_rows["col12"]), # type: ignore[arg-type]
caps.timestamp_precision,
)
if "col4_precision" in expected_rows:
        parsed_date = ensure_pendulum_datetime_utc(db_mapping["col4_precision"])
db_mapping["col4_precision"] = reduce_pendulum_datetime_precision(parsed_date, 3)
expected_rows["col4_precision"] = reduce_pendulum_datetime_precision(
ensure_pendulum_datetime_utc(expected_rows["col4_precision"]), 3 # type: ignore[arg-type]
)
# sqlalchemy sends floats not decimals
if "col6" in expected_rows and isinstance(db_mapping["col6"], (str, float)):
db_mapping["col6"] = numeric_default_quantize(Decimal(db_mapping["col6"]))
if "col6_precision" in expected_rows and isinstance(db_mapping["col6_precision"], (str, float)):
db_mapping["col6_precision"] = numeric_default_quantize(
Decimal(db_mapping["col6_precision"])
)
if "col10" in expected_rows:
db_mapping["col10"] = ensure_pendulum_date(db_mapping["col10"])
if "col11" in expected_rows:
expected_rows["col11"] = reduce_pendulum_datetime_precision(
ensure_pendulum_time(expected_rows["col11"]), caps.timestamp_precision # type: ignore[arg-type]
).isoformat()
if "col11_precision" in expected_rows:
parsed_time = ensure_pendulum_time(db_mapping["col11_precision"])
db_mapping["col11_precision"] = reduce_pendulum_datetime_precision(parsed_time, 3)
expected_rows["col11_precision"] = reduce_pendulum_datetime_precision(
ensure_pendulum_time(expected_rows["col11_precision"]), 3 # type: ignore[arg-type]
)
# redshift and bigquery return strings from structured fields
for binary_col in ["col7", "col7_precision"]:
if binary_col in db_mapping:
if isinstance(db_mapping[binary_col], str):
try:
db_mapping[binary_col] = bytes.fromhex(
db_mapping[binary_col]
) # redshift returns binary as hex string
except ValueError:
if allow_string_binary:
db_mapping[binary_col] = db_mapping[binary_col].encode("utf-8")
elif allow_base64_binary:
db_mapping[binary_col] = base64.b64decode(
db_mapping[binary_col], validate=True
)
else:
raise
else:
db_mapping[binary_col] = bytes(db_mapping[binary_col])
# `delta` table format stores `wei` type as string
# reason: decimal256 not supported so we convert to string
if "col8" in db_mapping:
if isinstance(db_mapping["col8"], str):
db_mapping["col8"] = int(db_mapping["col8"])
if abs(db_mapping["col8"] - expected_row["col8"]) < 1000:
# loss of precision on wei: when writing or reading
db_mapping["col8"] = expected_row["col8"]
# redshift and bigquery return strings from structured fields
if "col9" in db_mapping:
if isinstance(db_mapping["col9"], str):
# then it must be json
db_mapping["col9"] = json.loads(db_mapping["col9"])
# parse again
if parse_json_strings and isinstance(db_mapping["col9"], str):
# then it must be json
db_mapping["col9"] = json.loads(db_mapping["col9"])
# if "col10" in db_mapping:
# db_mapping["col10"] = db_mapping["col10"].isoformat()
if "col11" in db_mapping:
db_mapping["col11"] = ensure_pendulum_time(db_mapping["col11"]).isoformat()
if "col12" in db_mapping:
# sqlite returns datetime as str
if isinstance(db_mapping["col12"], str):
db_mapping["col12"] = datetime.datetime.fromisoformat(db_mapping["col12"])
# some destinations do not allow or do not implement naive date times
# in that case assume that naive datetime was stored as UTC
if db_mapping["col12"].tzinfo is not None:
assert (
not caps.supports_naive_datetime
), "destination supports naive datetime, got tz-aware datetime on naive column"
db_mapping["col12"] = db_mapping["col12"].replace(tzinfo=None)
else:
            assert caps.supports_naive_datetime, (
                "destination does not support naive datetime, but got naive datetime on"
                " naive column"
            )
if expect_filtered_null_columns:
for key, expected in expected_rows.items():
if expected is None:
assert db_mapping.get(key, None) is None
db_mapping[key] = None
for key, expected in expected_rows.items():
actual = db_mapping[key]
# print(f"Expected {expected} but got {actual} for column {key}")
assert expected == actual, f"Expected {expected} but got {actual} for column {key}"
assert db_mapping == expected_rows


def arrow_table_all_data_types(
object_format: TestDataItemFormat,
include_json: bool = True,
include_time: bool = True,
include_binary: bool = True,
include_decimal: bool = True,
include_decimal_default_precision: bool = False,
include_decimal_arrow_max_precision: bool = False,
include_decimal_high_precision: bool = False,
include_date: bool = True,
include_not_normalized_name: bool = True,
include_name_clash: bool = False,
include_null: bool = True,
num_rows: int = 3,
    tz: str = "UTC",
) -> Tuple[Any, List[Dict[str, Any]], Dict[str, List[Any]]]:
"""Create an arrow object or pandas dataframe with all supported data types.
Returns the table and its records in python format
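
    Example (illustrative):
        >>> table, rows, data = arrow_table_all_data_types("arrow-table", num_rows=2)
        >>> table.num_rows
        2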
"""
import pandas as pd
import numpy as np

    data = {
"string": [secrets.token_urlsafe(8) + "\"'\\🦆\n\r" for _ in range(num_rows)],
"float": [round(random.uniform(0, 100), 4) for _ in range(num_rows)],
"int": [random.randrange(0, 100) for _ in range(num_rows)],
"datetime": pd.date_range("2021-01-01T01:02:03.1234", periods=num_rows, tz=tz, unit="us"),
"bool": [random.choice([True, False]) for _ in range(num_rows)],
"string_null": [random.choice(ascii_lowercase) for _ in range(num_rows - 1)] + [None],
"float_null": [round(random.uniform(0, 100), 4) for _ in range(num_rows - 1)] + [
None
], # decrease precision
}
if include_null:
data["null"] = pd.Series([None for _ in range(num_rows)])
if include_name_clash:
data["pre Normalized Column"] = [random.choice(ascii_lowercase) for _ in range(num_rows)]
include_not_normalized_name = True
if include_not_normalized_name:
data["Pre Normalized Column"] = [random.choice(ascii_lowercase) for _ in range(num_rows)]
if include_json:
data["json"] = [{"a": random.randrange(0, 100)} for _ in range(num_rows)]
if include_time:
# data["time"] = pd.date_range("2021-01-01", periods=num_rows, tz="UTC").time
# data["time"] = pd.date_range("2021-01-01T01:02:03.1234", periods=num_rows, tz=tz, unit="us").time
# random time objects with different hours/minutes/seconds/microseconds
data["time"] = [
datetime.time(
random.randrange(0, 24),
random.randrange(0, 60),
random.randrange(0, 60),
random.randrange(0, 1000000),
)
for _ in range(num_rows)
]
if include_binary:
# "binary": [hashlib.sha3_256(random.choice(ascii_lowercase).encode()).digest() for _ in range(num_rows)],
data["binary"] = [random.choice(ascii_lowercase).encode() for _ in range(num_rows)]
if include_decimal:
data["decimal"] = [Decimal(str(round(random.uniform(0, 100), 4))) for _ in range(num_rows)]
if include_decimal_default_precision:
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION
data["decimal_default_precision"] = [
Decimal(int("1" * DEFAULT_NUMERIC_PRECISION)) for _ in range(num_rows)
]
if include_decimal_arrow_max_precision:
from dlt.common.libs.pyarrow import ARROW_DECIMAL_MAX_PRECISION
data["decimal_arrow_max_precision"] = [
Decimal(int("1" * ARROW_DECIMAL_MAX_PRECISION)) for _ in range(num_rows)
]
if include_decimal_high_precision:
# Create high precision decimal with native decimal type (38,18)
decimal_data = [Decimal("9" * 20 + "." + "9" * 18) for _ in range(num_rows)]
data["high_decimal_precision"] = decimal_data
if include_date:
data["date"] = pd.date_range("2021-01-01", periods=num_rows, tz=tz).date
df = pd.DataFrame(data)
    # pandas stores None integers/floats as NaN; replacing NaN back with None turns
    # those columns into object dtype and can lose precision
df = df.replace(np.nan, None)
    # records use normalized identifiers for comparison
rows = (
df.rename(
columns={
"Pre Normalized Column": "pre_normalized_column",
}
)
.drop(columns=(["null"] if include_null else []))
.to_dict("records")
)
if object_format == "object":
return rows, rows, data
else:
return arrow_item_from_pandas(df, object_format), rows, data


def remove_column_from_data(object_format: TestDataItemFormat, data: Any, column_name: str) -> Any:
    """Drop `column_name` from arrow-table, pandas, or object (list of dicts) data."""
if object_format == "arrow-table":
return data.drop([column_name])
elif object_format == "pandas":
return data.drop(columns=[column_name])
elif object_format == "object":
# actually is a list of dicts
return [{k: v for k, v in row.items() if k != column_name} for row in data]
else:
raise ValueError(f"not supported: {object_format}")


def prepare_shuffled_tables() -> Tuple[Any, Any, Any]:
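    """Build an arrow table with all data types, a column-shuffled copy of it, and the
    shuffled copy with the `binary` column removed."""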
from dlt.common.libs.pyarrow import remove_columns
from dlt.common.libs.pyarrow import pyarrow as pa
table, _, _ = arrow_table_all_data_types(
"arrow-table",
include_json=False,
include_not_normalized_name=False,
tz="Europe/Berlin",
num_rows=5432,
)
# remove null column from table (it will be removed in extract)
table = remove_columns(table, "null")
# shuffled_columns = table.schema.names
shuffled_indexes = list(range(len(table.schema.names)))
random.shuffle(shuffled_indexes)
shuffled_table = pa.Table.from_arrays(
[table.column(idx) for idx in shuffled_indexes],
schema=pa.schema([table.schema.field(idx) for idx in shuffled_indexes]),
)
shuffled_removed_column = remove_columns(shuffled_table, ["binary"])
assert shuffled_table.schema.names != table.schema.names
return table, shuffled_table, shuffled_removed_column