mirror of
https://github.com/dbt-labs/dbt-snowflake
synced 2025-12-17 19:31:31 +00:00
Implement get_catalog_for_single_relation macro (#1064)
* Allow enabling/disabling stats collection on catalog table using env var * Implement get_relation_metadata macro * Discard changes to dbt/include/snowflake/macros/catalog.sql * Import artifacts from dbt_common * fix dbt_common import * Use get_relation_metadata from dbt-common * Update Fixes-20240516-224134.yaml * use snowflakerelation * add broken test * wip * add test for get_catalog_for_single_relation * wip * hardcode macro name * fix docs_generate test * address pr comments * update changie, add type sigs * Use enum values for comparison * remove mv * Discard changes to dev-requirements.txt * Discard changes to setup.py * Bump pins on dbt-common and dbt-adapters --------- Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com> Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com>
This commit is contained in:
6
.changes/unreleased/Fixes-20240516-224134.yaml
Normal file
6
.changes/unreleased/Fixes-20240516-224134.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
kind: Fixes
|
||||
body: Get catalog metadata for a single relation in the most optimized way using the get_catalog_for_single_relation macro and capability
|
||||
time: 2024-05-16T22:41:34.256095+01:00
|
||||
custom:
|
||||
Author: aranke
|
||||
Issue: "1048"
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -94,3 +94,4 @@ venv/
|
||||
|
||||
# vscode
|
||||
.vscode/
|
||||
.venv/
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Mapping, Any, Optional, List, Union, Dict, FrozenSet, Tuple, TYPE_CHECKING
|
||||
|
||||
|
||||
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
|
||||
from dbt.adapters.base.meta import available
|
||||
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
|
||||
@@ -10,17 +9,27 @@ from dbt.adapters.sql.impl import (
|
||||
LIST_SCHEMAS_MACRO_NAME,
|
||||
LIST_RELATIONS_MACRO_NAME,
|
||||
)
|
||||
|
||||
from dbt.adapters.snowflake import SnowflakeConnectionManager
|
||||
from dbt.adapters.snowflake import SnowflakeRelation
|
||||
from dbt.adapters.snowflake import SnowflakeColumn
|
||||
from dbt_common.contracts.constraints import ConstraintType
|
||||
from dbt_common.contracts.metadata import (
|
||||
TableMetadata,
|
||||
StatsDict,
|
||||
StatsItem,
|
||||
CatalogTable,
|
||||
ColumnMetadata,
|
||||
)
|
||||
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
|
||||
from dbt_common.utils import filter_null_values
|
||||
|
||||
from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
|
||||
from dbt.adapters.snowflake import SnowflakeColumn
|
||||
from dbt.adapters.snowflake import SnowflakeConnectionManager
|
||||
from dbt.adapters.snowflake import SnowflakeRelation
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import agate
|
||||
|
||||
SHOW_OBJECT_METADATA_MACRO_NAME = "snowflake__show_object_metadata"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SnowflakeConfig(AdapterConfig):
|
||||
@@ -56,6 +65,7 @@ class SnowflakeAdapter(SQLAdapter):
|
||||
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
|
||||
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
|
||||
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
|
||||
Capability.GetCatalogForSingleRelation: CapabilitySupport(support=Support.Full),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -131,6 +141,84 @@ class SnowflakeAdapter(SQLAdapter):
|
||||
else:
|
||||
raise
|
||||
|
||||
def _show_object_metadata(self, relation: SnowflakeRelation) -> Optional[dict]:
|
||||
try:
|
||||
kwargs = {"relation": relation}
|
||||
results = self.execute_macro(SHOW_OBJECT_METADATA_MACRO_NAME, kwargs=kwargs)
|
||||
|
||||
if len(results) == 0:
|
||||
return None
|
||||
|
||||
return results
|
||||
except DbtDatabaseError:
|
||||
return None
|
||||
|
||||
def get_catalog_for_single_relation(
|
||||
self, relation: SnowflakeRelation
|
||||
) -> Optional[CatalogTable]:
|
||||
object_metadata = self._show_object_metadata(relation)
|
||||
|
||||
if not object_metadata:
|
||||
return None
|
||||
|
||||
row = object_metadata[0]
|
||||
|
||||
is_dynamic = row.get("is_dynamic") in ("Y", "YES")
|
||||
kind = row.get("kind")
|
||||
|
||||
if is_dynamic and kind == str(SnowflakeRelationType.Table).upper():
|
||||
table_type = str(SnowflakeRelationType.DynamicTable).upper()
|
||||
else:
|
||||
table_type = kind
|
||||
|
||||
# https://docs.snowflake.com/en/sql-reference/sql/show-views#output
|
||||
# Note: we don't support materialized views in dbt-snowflake
|
||||
is_view = kind == str(SnowflakeRelationType.View).upper()
|
||||
|
||||
table_metadata = TableMetadata(
|
||||
type=table_type,
|
||||
schema=row.get("schema_name"),
|
||||
name=row.get("name"),
|
||||
database=row.get("database_name"),
|
||||
comment=row.get("comment"),
|
||||
owner=row.get("owner"),
|
||||
)
|
||||
|
||||
stats_dict: StatsDict = {
|
||||
"has_stats": StatsItem(
|
||||
id="has_stats",
|
||||
label="Has Stats?",
|
||||
value=True,
|
||||
include=False,
|
||||
description="Indicates whether there are statistics for this table",
|
||||
),
|
||||
"row_count": StatsItem(
|
||||
id="row_count",
|
||||
label="Row Count",
|
||||
value=row.get("rows"),
|
||||
include=(not is_view),
|
||||
description="Number of rows in the table as reported by Snowflake",
|
||||
),
|
||||
"bytes": StatsItem(
|
||||
id="bytes",
|
||||
label="Approximate Size",
|
||||
value=row.get("bytes"),
|
||||
include=(not is_view),
|
||||
description="Size of the table as reported by Snowflake",
|
||||
),
|
||||
}
|
||||
|
||||
catalog_columns = {
|
||||
c.column: ColumnMetadata(type=c.dtype, index=i + 1, name=c.column)
|
||||
for i, c in enumerate(self.get_columns_in_relation(relation))
|
||||
}
|
||||
|
||||
return CatalogTable(
|
||||
metadata=table_metadata,
|
||||
columns=catalog_columns,
|
||||
stats=stats_dict,
|
||||
)
|
||||
|
||||
def list_relations_without_caching(
|
||||
self, schema_relation: SnowflakeRelation
|
||||
) -> List[SnowflakeRelation]:
|
||||
|
||||
@@ -47,6 +47,15 @@
|
||||
{% do return(columns) %}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro snowflake__show_object_metadata(relation) %}
|
||||
{%- set sql -%}
|
||||
show objects like '{{ relation.identifier }}' in {{ relation.include(identifier=False) }} limit 1
|
||||
{%- endset -%}
|
||||
|
||||
{%- set result = run_query(sql) -%}
|
||||
{{ return(result) }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro snowflake__list_schemas(database) -%}
|
||||
{# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #}
|
||||
{% set maximum = 10000 %}
|
||||
|
||||
4
setup.py
4
setup.py
@@ -57,8 +57,8 @@ setup(
|
||||
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
|
||||
include_package_data=True,
|
||||
install_requires=[
|
||||
"dbt-common>=1.0.4,<2.0",
|
||||
"dbt-adapters>=1.1.1,<2.0",
|
||||
"dbt-common>=1.3.0,<2.0",
|
||||
"dbt-adapters>=1.3.1,<2.0",
|
||||
"snowflake-connector-python[secure-local-storage]~=3.0",
|
||||
# add dbt-core to ensure backwards compatibility of installation, this is not a functional dependency
|
||||
"dbt-core>=1.8.0",
|
||||
|
||||
@@ -5,6 +5,9 @@ from dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests
|
||||
from dbt.tests.adapter.basic.test_singular_tests_ephemeral import (
|
||||
BaseSingularTestsEphemeral,
|
||||
)
|
||||
from dbt.tests.adapter.basic.test_get_catalog_for_single_relation import (
|
||||
BaseGetCatalogForSingleRelation,
|
||||
)
|
||||
from dbt.tests.adapter.basic.test_empty import BaseEmpty
|
||||
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
|
||||
from dbt.tests.adapter.basic.test_incremental import BaseIncremental
|
||||
@@ -14,6 +17,9 @@ from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestam
|
||||
from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod
|
||||
from dbt.tests.adapter.basic.test_docs_generate import BaseDocsGenerate
|
||||
from dbt.tests.adapter.basic.expected_catalog import base_expected_catalog, no_stats
|
||||
from dbt_common.contracts.metadata import CatalogTable, TableMetadata, ColumnMetadata, StatsItem
|
||||
|
||||
from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
|
||||
from tests.functional.adapter.expected_stats import snowflake_stats
|
||||
|
||||
|
||||
@@ -25,6 +31,162 @@ class TestSingularTestsSnowflake(BaseSingularTests):
|
||||
pass
|
||||
|
||||
|
||||
class TestGetCatalogForSingleRelationSnowflake(BaseGetCatalogForSingleRelation):
|
||||
@pytest.fixture(scope="class")
|
||||
def current_role(self, project):
|
||||
return project.run_sql("select current_role()", fetch="one")[0]
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def expected_catalog_my_seed(self, project, current_role):
|
||||
return CatalogTable(
|
||||
metadata=TableMetadata(
|
||||
type=SnowflakeRelationType.Table.upper(),
|
||||
schema=project.test_schema.upper(),
|
||||
name="MY_SEED",
|
||||
database=project.database,
|
||||
comment="",
|
||||
owner=current_role,
|
||||
),
|
||||
columns={
|
||||
"ID": ColumnMetadata(type="NUMBER", index=1, name="ID", comment=None),
|
||||
"FIRST_NAME": ColumnMetadata(
|
||||
type="VARCHAR", index=2, name="FIRST_NAME", comment=None
|
||||
),
|
||||
"EMAIL": ColumnMetadata(type="VARCHAR", index=3, name="EMAIL", comment=None),
|
||||
"IP_ADDRESS": ColumnMetadata(
|
||||
type="VARCHAR", index=4, name="IP_ADDRESS", comment=None
|
||||
),
|
||||
"UPDATED_AT": ColumnMetadata(
|
||||
type="TIMESTAMP_NTZ", index=5, name="UPDATED_AT", comment=None
|
||||
),
|
||||
},
|
||||
stats={
|
||||
"has_stats": StatsItem(
|
||||
id="has_stats",
|
||||
label="Has Stats?",
|
||||
value=True,
|
||||
include=False,
|
||||
description="Indicates whether there are statistics for this table",
|
||||
),
|
||||
"row_count": StatsItem(
|
||||
id="row_count",
|
||||
label="Row Count",
|
||||
value=1,
|
||||
include=True,
|
||||
description="Number of rows in the table as reported by Snowflake",
|
||||
),
|
||||
"bytes": StatsItem(
|
||||
id="bytes",
|
||||
label="Approximate Size",
|
||||
value=2048,
|
||||
include=True,
|
||||
description="Size of the table as reported by Snowflake",
|
||||
),
|
||||
},
|
||||
unique_id=None,
|
||||
)
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def expected_catalog_my_view_model(self, project, current_role):
|
||||
return CatalogTable(
|
||||
metadata=TableMetadata(
|
||||
type=SnowflakeRelationType.View.upper(),
|
||||
schema=project.test_schema.upper(),
|
||||
name="MY_VIEW_MODEL",
|
||||
database=project.database,
|
||||
comment="",
|
||||
owner=current_role,
|
||||
),
|
||||
columns={
|
||||
"ID": ColumnMetadata(type="NUMBER", index=1, name="ID", comment=None),
|
||||
"FIRST_NAME": ColumnMetadata(
|
||||
type="VARCHAR", index=2, name="FIRST_NAME", comment=None
|
||||
),
|
||||
"EMAIL": ColumnMetadata(type="VARCHAR", index=3, name="EMAIL", comment=None),
|
||||
"IP_ADDRESS": ColumnMetadata(
|
||||
type="VARCHAR", index=4, name="IP_ADDRESS", comment=None
|
||||
),
|
||||
"UPDATED_AT": ColumnMetadata(
|
||||
type="TIMESTAMP_NTZ", index=5, name="UPDATED_AT", comment=None
|
||||
),
|
||||
},
|
||||
stats={
|
||||
"has_stats": StatsItem(
|
||||
id="has_stats",
|
||||
label="Has Stats?",
|
||||
value=True,
|
||||
include=False,
|
||||
description="Indicates whether there are statistics for this table",
|
||||
),
|
||||
"row_count": StatsItem(
|
||||
id="row_count",
|
||||
label="Row Count",
|
||||
value=0,
|
||||
include=False,
|
||||
description="Number of rows in the table as reported by Snowflake",
|
||||
),
|
||||
"bytes": StatsItem(
|
||||
id="bytes",
|
||||
label="Approximate Size",
|
||||
value=0,
|
||||
include=False,
|
||||
description="Size of the table as reported by Snowflake",
|
||||
),
|
||||
},
|
||||
unique_id=None,
|
||||
)
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def expected_catalog_my_table_model(self, project, current_role):
|
||||
return CatalogTable(
|
||||
metadata=TableMetadata(
|
||||
type=SnowflakeRelationType.Table.upper(),
|
||||
schema=project.test_schema.upper(),
|
||||
name="MY_TABLE_MODEL",
|
||||
database=project.database,
|
||||
comment="",
|
||||
owner=current_role,
|
||||
),
|
||||
columns={
|
||||
"ID": ColumnMetadata(type="NUMBER", index=1, name="ID", comment=None),
|
||||
"FIRST_NAME": ColumnMetadata(
|
||||
type="VARCHAR", index=2, name="FIRST_NAME", comment=None
|
||||
),
|
||||
"EMAIL": ColumnMetadata(type="VARCHAR", index=3, name="EMAIL", comment=None),
|
||||
"IP_ADDRESS": ColumnMetadata(
|
||||
type="VARCHAR", index=4, name="IP_ADDRESS", comment=None
|
||||
),
|
||||
"UPDATED_AT": ColumnMetadata(
|
||||
type="TIMESTAMP_NTZ", index=5, name="UPDATED_AT", comment=None
|
||||
),
|
||||
},
|
||||
stats={
|
||||
"has_stats": StatsItem(
|
||||
id="has_stats",
|
||||
label="Has Stats?",
|
||||
value=True,
|
||||
include=False,
|
||||
description="Indicates whether there are statistics for this table",
|
||||
),
|
||||
"row_count": StatsItem(
|
||||
id="row_count",
|
||||
label="Row Count",
|
||||
value=1,
|
||||
include=True,
|
||||
description="Number of rows in the table as reported by Snowflake",
|
||||
),
|
||||
"bytes": StatsItem(
|
||||
id="bytes",
|
||||
label="Approximate Size",
|
||||
value=2048,
|
||||
include=True,
|
||||
description="Size of the table as reported by Snowflake",
|
||||
),
|
||||
},
|
||||
unique_id=None,
|
||||
)
|
||||
|
||||
|
||||
class TestSingularTestsEphemeralSnowflake(BaseSingularTestsEphemeral):
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from contextlib import contextmanager
|
||||
from unittest import mock
|
||||
|
||||
from dbt.adapters.base import BaseAdapter
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
def adapter_factory():
|
||||
@@ -33,6 +33,9 @@ def adapter_factory():
|
||||
def get_columns_in_relation(self, *args, **kwargs):
|
||||
return self.responder.get_columns_in_relation(*args, **kwargs)
|
||||
|
||||
def get_catalog_for_single_relation(self, *args, **kwargs):
|
||||
return self.responder.get_catalog_for_single_relation(*args, **kwargs)
|
||||
|
||||
def expand_column_types(self, *args, **kwargs):
|
||||
return self.responder.expand_column_types(*args, **kwargs)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user