feat: Schema.to_mermaid() (#3364)

* Add dlt.Schema.to_mermaid() method

---------

Co-authored-by: jayant <jayant746@gmail.com>
This commit is contained in:
Thierry Jean
2025-11-24 22:31:59 -05:00
committed by GitHub
parent 661c6c1ada
commit 382eb6bab7
9 changed files with 777 additions and 6 deletions

View File

@@ -378,6 +378,8 @@ def pipeline_command(
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
elif format_ == "mermaid":
schema_str = s.to_mermaid()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults_)

View File

@@ -489,6 +489,8 @@ The `dlt schema` command will load, validate and print out a dlt schema: `dlt sc
schema_str = s.to_dbml()
elif format_ == "dot":
schema_str = s.to_dot()
elif format == "mermaid":
schema_str = s.to_mermaid()
else:
schema_str = s.to_pretty_yaml(remove_defaults=remove_defaults)

View File

@@ -795,6 +795,42 @@ class Schema:
)
return dot
def to_mermaid(
    self,
    remove_processing_hints: bool = False,
    hide_columns: bool = False,
    hide_descriptions: bool = False,
    include_dlt_tables: bool = True,
) -> str:
    """Render this schema as a Mermaid entity-relationship diagram.

    Args:
        remove_processing_hints: If True, hints used for data processing and redundant
            information are removed before rendering, which shrinks the schema and
            improves readability.
        hide_columns: If True, tables are drawn without their columns. Useful to keep
            large diagrams readable.
        hide_descriptions: If True, column descriptions are left out of the diagram.
        include_dlt_tables: If True (the default), internal dlt tables (`_dlt_version`,
            `_dlt_loads`, `_dlt_pipeline_state`) are included in the diagram; if False
            they are left out.

    Returns:
        A string containing a Mermaid `erDiagram` of the schema.
    """
    from dlt.helpers.mermaid import schema_to_mermaid

    # Defaults are kept on purpose: per the original note, `remove_defaults=True` strips
    # the `name` fields, and the Mermaid helpers read `name` from tables and columns.
    schema_dict = self.to_dict(
        remove_defaults=False,
        remove_processing_hints=remove_processing_hints,
    )
    diagram = schema_to_mermaid(
        schema_dict,
        references=self.references,
        hide_columns=hide_columns,
        hide_descriptions=hide_descriptions,
        include_dlt_tables=include_dlt_tables,
    )
    return diagram
def clone(
self,
with_name: str = None,

View File

@@ -22,7 +22,7 @@ from dlt.common.typing import Annotated, DictStrAny, DictStrOptionalStr, get_arg
from dlt.common.utils import digest128
TSchemaFileFormat = Literal["json", "yaml", "dbml", "dot"]
TSchemaFileFormat = Literal["json", "yaml", "dbml", "dot", "mermaid"]
SCHEMA_FILES_EXTENSIONS = get_args(TSchemaFileFormat)

View File

@@ -284,6 +284,8 @@ class SchemaStorage(Mapping[str, Schema]):
raise ValueError(extension, "Schema parser for `dbml` not yet implemented")
elif extension == "dot":
raise ValueError(extension, "Schema parser for `dot` not yet implemented")
elif extension == "mermaid":
raise ValueError(extension, "Schema parser for `mermaid` not yet implemented")
else:
raise ValueError(extension)
return imported_schema

125
dlt/helpers/mermaid.py Normal file
View File

@@ -0,0 +1,125 @@
"""Build a mermaid graph representation using raw strings without additional dependencies"""
from enum import Enum
from dlt.common.schema.typing import (
TColumnSchema,
TReferenceCardinality,
TStoredSchema,
TTableReferenceStandalone,
TTableSchema,
)
INDENT = " "
def schema_to_mermaid(
    schema: TStoredSchema,
    *,
    references: list[TTableReferenceStandalone],
    hide_columns: bool = False,
    hide_descriptions: bool = False,
    include_dlt_tables: bool = True,
) -> str:
    """Build a Mermaid `erDiagram` string from a stored schema and its references.

    Args:
        schema: Stored schema; every entry of `schema["tables"]` is rendered as an entity.
        references: Table references rendered as relationship lines after the entities.
        hide_columns: Forwarded to the table renderer; if True entities have no columns.
        hide_descriptions: Forwarded to the table renderer; if True descriptions are omitted.
        include_dlt_tables: If False, tables whose name starts with `_dlt` are skipped, as
            are references whose `table` or `referenced_table` starts with `_dlt`.

    Returns:
        The diagram text: an `erDiagram` header line, then entities, then relationships.
    """

    def _is_dlt_name(name: str) -> bool:
        return name.startswith("_dlt")

    chunks = ["erDiagram\n"]

    for name, table in schema["tables"].items():
        if include_dlt_tables or not _is_dlt_name(name):
            rendered_table = _to_mermaid_table(
                table,
                hide_columns=hide_columns,
                hide_descriptions=hide_descriptions,
            )
            chunks.append(INDENT + rendered_table)

    for reference in references:
        # reference endpoints are only inspected when dlt tables are being filtered out
        if not include_dlt_tables and (
            _is_dlt_name(reference["table"]) or _is_dlt_name(reference["referenced_table"])
        ):
            continue
        chunks.append(INDENT + _to_mermaid_reference(reference))

    return "".join(chunks)
def _to_mermaid_table(
table: TTableSchema, hide_columns: bool = False, hide_descriptions: bool = False
) -> str:
mermaid_table: str = table["name"]
mermaid_table += "{\n"
if hide_columns is False:
for column in table["columns"].values():
mermaid_table += INDENT + _to_mermaid_column(
column,
hide_descriptions=hide_descriptions,
)
mermaid_table += "}\n"
return mermaid_table
# TODO add scale & precision to `data_type`
def _to_mermaid_column(column: TColumnSchema, hide_descriptions: bool = False) -> str:
mermaid_col = column["data_type"] + " " + column["name"]
keys = []
if column.get("primary_key"):
keys.append("PK")
if column.get("unique"):
keys.append("UK")
if keys:
mermaid_col += " " + ",".join(keys)
if hide_descriptions is False:
if description := column.get("description"):
mermaid_col += f' "{description}"'
mermaid_col += "\n"
return mermaid_col
class TMermaidArrows(str, Enum):
    """Relationship arrows written between two table names in a Mermaid ER diagram.

    Members are named `<LEFT>_TO_<RIGHT>`; each value is the arrow string emitted for
    that pairing.
    """

    # NOTE(review): presumably Mermaid crow's-foot markers (`|` = one, `o` = zero,
    # `{` / `}` = many) - verify against the Mermaid ER diagram documentation
    ONE_TO_MANY = "||--|{"
    MANY_TO_ONE = "}|--||"
    ZERO_TO_MANY = "|o--|{"
    MANY_TO_ZERO = "}|--o|"
    # ONE_TO_MORE and MORE_TO_ONE are not referenced by `_CARDINALITY_ARROW` in this module
    ONE_TO_MORE = "||--o{"
    MORE_TO_ONE = "}o--||"
    ONE_TO_ONE = "||--||"
    MANY_TO_MANY = "}|--|{"
    ZERO_TO_ONE = "|o--o|"
# Maps a reference cardinality to the arrow used to draw it; looked up by
# `_to_mermaid_reference` when rendering a relationship line.
_CARDINALITY_ARROW: dict[TReferenceCardinality, TMermaidArrows] = {
    "one_to_many": TMermaidArrows.ONE_TO_MANY,
    "many_to_one": TMermaidArrows.MANY_TO_ONE,
    "zero_to_many": TMermaidArrows.ZERO_TO_MANY,
    "many_to_zero": TMermaidArrows.MANY_TO_ZERO,
    "one_to_one": TMermaidArrows.ONE_TO_ONE,
    "many_to_many": TMermaidArrows.MANY_TO_MANY,
    "zero_to_one": TMermaidArrows.ZERO_TO_ONE,
    # NOTE(review): `one_to_zero` reuses the `zero_to_one` arrow instead of a mirrored
    # one - confirm this is intended
    "one_to_zero": TMermaidArrows.ZERO_TO_ONE,
}
def _to_mermaid_reference(ref: TTableReferenceStandalone) -> str:
    """Render one table reference as a Mermaid ER relationship line.

    The line uses cardinality and label to describe the relationship, in the format

        <left-entity> <relationship> <right-entity> : <relationship-label>

    Args:
        ref: Reference to render. `table` is the left entity and `referenced_table` the
            right one. `cardinality` defaults to `one_to_many`; `label` defaults to an
            empty quoted string (`""`).

    Returns:
        The relationship line, terminated by a newline. The ` : <label>` part is left out
        only when `label` is explicitly set to an empty value.

    Raises:
        ValueError: If `cardinality` has no entry in `_CARDINALITY_ARROW`.
    """
    left_table = ref.get("table")
    right_table = ref.get("referenced_table")
    cardinality = ref.get("cardinality", "one_to_many")
    label = ref.get("label", '""')

    arrow = _CARDINALITY_ARROW.get(cardinality)
    if arrow is None:
        # fail with a readable message instead of `AttributeError` on `None.value`
        raise ValueError(
            f"Unsupported reference cardinality `{cardinality}`; expected one of: "
            + ", ".join(_CARDINALITY_ARROW)
        )

    mermaid_reference = f"{left_table} {arrow.value} {right_table}"
    if label:
        already_quoted = len(label) >= 2 and label[0] == '"' and label[-1] == '"'
        is_single_word = all(ch.isalnum() or ch in "_-" for ch in label)
        if not already_quoted and not is_single_word:
            # Mermaid accepts a bare word or a double-quoted string as relationship label,
            # so multi-word labels are quoted; inner quotes would end the string early
            label = '"' + label.replace('"', "'") + '"'
        mermaid_reference += f" : {label}"
    return mermaid_reference + "\n"

View File

@@ -837,7 +837,7 @@ TableGroup "_dlt" {
## Export to Graphviz
[Graphviz](https://www.graphviz.org/) is an open soruce graph visualization engine which uses the [DOT language](https://graphviz.org/doc/info/lang.html). dlt allows you to export your `dlt.Schema` as DOT string, which can be rendered using the Python `graphviz` library, lightweight JS libraries (e.g., [d3-graphviz](https://github.com/magjac/d3-graphviz)), or IDE extensions.
[Graphviz](https://www.graphviz.org/) is an open source graph visualization engine which uses the [DOT language](https://graphviz.org/doc/info/lang.html). dlt allows you to export your `dlt.Schema` as DOT string, which can be rendered using the Python `graphviz` library, lightweight JS libraries (e.g., [d3-graphviz](https://github.com/magjac/d3-graphviz)), or IDE extensions.
Note that the conversion is lossy. You can't fully recreate `dlt.Schema` from a DOT string.
@@ -1278,3 +1278,74 @@ _dlt_version:f4:_ -> _dlt_loads:f2:_ [dir=both, penwidth=1, color="#1c1c34", arr
</details>
![graphviz dot render](https://storage.googleapis.com/dlt-blog-images/schema_dot_export.svg)
## Export to Mermaid
[Mermaid](https://www.mermaidchart.com/) is a widely-supported diagramming language. dlt allows you to export your `dlt.Schema` as a Mermaid string. This can be natively rendered by many tools (GitHub markdown, Notion, marimo notebooks).
Note that the conversion is lossy. You can't fully recreate `dlt.Schema` from a Mermaid string.
```py
schema_mermaid = pipeline.default_schema.to_mermaid()
```
```sh
# `chess_pipeline` is the name of the pipeline
dlt pipeline chess_pipeline schema --format mermaid
```
<details>
<summary>See Mermaid</summary>
```mermaid
erDiagram
_dlt_version{
bigint version
bigint engine_version
timestamp inserted_at
text schema_name
text version_hash
text schema
}
_dlt_loads{
text load_id
text schema_name
bigint status
timestamp inserted_at
text schema_version_hash
}
customers{
bigint id PK
text name
text city
text _dlt_load_id
text _dlt_id UK
}
purchases{
bigint id PK
bigint customer_id
bigint inventory_id
bigint quantity
text date
text _dlt_load_id
text _dlt_id UK
}
_dlt_pipeline_state{
bigint version
bigint engine_version
text pipeline_name
text state
timestamp created_at
text version_hash
text _dlt_load_id
text _dlt_id UK
}
customers }|--|| _dlt_loads : _dlt_load
purchases }|--|| _dlt_loads : _dlt_load
purchases ||--|{ customers : ""
_dlt_pipeline_state }|--|| _dlt_loads : _dlt_load
```
</details>
![mermaid render](https://storage.googleapis.com/dlt-blog-images/schema_mermaid_export.png)

View File

@@ -89,7 +89,7 @@ Shows, converts and upgrades schemas.
**Usage**
```sh
dlt schema [-h] [--format {json,yaml,dbml,dot}] [--remove-defaults] file
dlt schema [-h] [--format {json,yaml,dbml,dot,mermaid}] [--remove-defaults] file
```
**Description**
@@ -107,7 +107,7 @@ Inherits arguments from [`dlt`](#dlt).
**Options**
* `-h, --help` - Show this help message and exit
* `--format {json,yaml,dbml,dot}` - Display schema in this format
* `--format {json,yaml,dbml,dot,mermaid}` - Display schema in this format
* `--remove-defaults` - Does not show default hint values
</details>
@@ -334,7 +334,7 @@ Displays default schema.
**Usage**
```sh
dlt pipeline [pipeline_name] schema [-h] [--format {json,yaml,dbml,dot}]
dlt pipeline [pipeline_name] schema [-h] [--format {json,yaml,dbml,dot,mermaid}]
[--remove-defaults]
```
@@ -350,7 +350,7 @@ Inherits arguments from [`dlt pipeline`](#dlt-pipeline).
**Options**
* `-h, --help` - Show this help message and exit
* `--format {json,yaml,dbml,dot}` - Display schema in this format
* `--format {json,yaml,dbml,dot,mermaid}` - Display schema in this format
* `--remove-defaults` - Does not show default hint values
</details>

View File

@@ -0,0 +1,533 @@
import pytest
import dlt
from dlt.common.schema.typing import TColumnSchema, TTableReferenceStandalone, TTableSchema
from dlt.helpers.mermaid import (
schema_to_mermaid,
_to_mermaid_column,
_to_mermaid_reference,
_to_mermaid_table,
)
@pytest.fixture
def example_schema() -> dlt.Schema:
    """Build the `fruit_with_ref` schema shared by the Mermaid export tests.

    The schema holds the internal tables `_dlt_version`, `_dlt_loads` and
    `_dlt_pipeline_state`, the data tables `customers` and `purchases` (where
    `purchases.customer_id` references `customers.id`), and `purchases__items`,
    whose `parent` is `purchases`.
    """
    return dlt.Schema.from_dict(
        {
            "version": 2,
            "version_hash": "iW0MtTw8NXm1r/amMiYpOF63Of44Mx5VfYOh5DM6/7s=",
            "engine_version": 11,
            "name": "fruit_with_ref",
            "tables": {
                "_dlt_version": {
                    "name": "_dlt_version",
                    "columns": {
                        "version": {"name": "version", "data_type": "bigint", "nullable": False},
                        "engine_version": {
                            "name": "engine_version",
                            "data_type": "bigint",
                            "nullable": False,
                        },
                        "inserted_at": {
                            "name": "inserted_at",
                            "data_type": "timestamp",
                            "nullable": False,
                        },
                        "schema_name": {
                            "name": "schema_name",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "version_hash": {
                            "name": "version_hash",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "schema": {"name": "schema", "data_type": "text", "nullable": False},
                    },
                    "write_disposition": "skip",
                    "resource": "_dlt_version",
                    "description": "Created by DLT. Tracks schema updates",
                },
                "_dlt_loads": {
                    "name": "_dlt_loads",
                    "columns": {
                        "load_id": {"name": "load_id", "data_type": "text", "nullable": False},
                        "schema_name": {
                            "name": "schema_name",
                            "data_type": "text",
                            "nullable": True,
                        },
                        "status": {"name": "status", "data_type": "bigint", "nullable": False},
                        "inserted_at": {
                            "name": "inserted_at",
                            "data_type": "timestamp",
                            "nullable": False,
                        },
                        "schema_version_hash": {
                            "name": "schema_version_hash",
                            "data_type": "text",
                            "nullable": True,
                        },
                    },
                    "write_disposition": "skip",
                    "resource": "_dlt_loads",
                    "description": "Created by DLT. Tracks completed loads",
                },
                "customers": {
                    "columns": {
                        "id": {
                            "name": "id",
                            "nullable": False,
                            "primary_key": True,
                            "data_type": "bigint",
                        },
                        "name": {
                            "x-annotation-pii": True,
                            "name": "name",
                            "data_type": "text",
                            "nullable": True,
                        },
                        "city": {"name": "city", "data_type": "text", "nullable": True},
                        "_dlt_load_id": {
                            "name": "_dlt_load_id",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "_dlt_id": {
                            "name": "_dlt_id",
                            "data_type": "text",
                            "nullable": False,
                            "unique": True,
                            "row_key": True,
                        },
                    },
                    "write_disposition": "append",
                    "name": "customers",
                    "resource": "customers",
                    "x-normalizer": {"seen-data": True},
                },
                "purchases": {
                    "columns": {
                        "id": {
                            "name": "id",
                            "nullable": False,
                            "primary_key": True,
                            "data_type": "bigint",
                        },
                        "customer_id": {
                            "name": "customer_id",
                            "data_type": "bigint",
                            "nullable": True,
                        },
                        "inventory_id": {
                            "name": "inventory_id",
                            "data_type": "bigint",
                            "nullable": True,
                        },
                        "quantity": {"name": "quantity", "data_type": "bigint", "nullable": True},
                        "date": {"name": "date", "data_type": "text", "nullable": True},
                        "_dlt_load_id": {
                            "name": "_dlt_load_id",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "_dlt_id": {
                            "name": "_dlt_id",
                            "data_type": "text",
                            "nullable": False,
                            "unique": True,
                            "row_key": True,
                        },
                    },
                    "write_disposition": "append",
                    "references": [
                        {
                            "columns": ["customer_id"],
                            "referenced_table": "customers",
                            "referenced_columns": ["id"],
                        }
                    ],
                    "name": "purchases",
                    "resource": "purchases",
                    "x-normalizer": {"seen-data": True},
                },
                "_dlt_pipeline_state": {
                    "columns": {
                        "version": {"name": "version", "data_type": "bigint", "nullable": False},
                        "engine_version": {
                            "name": "engine_version",
                            "data_type": "bigint",
                            "nullable": False,
                        },
                        "pipeline_name": {
                            "name": "pipeline_name",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "state": {"name": "state", "data_type": "text", "nullable": False},
                        "created_at": {
                            "name": "created_at",
                            "data_type": "timestamp",
                            "nullable": False,
                        },
                        "version_hash": {
                            "name": "version_hash",
                            "data_type": "text",
                            "nullable": True,
                        },
                        "_dlt_load_id": {
                            "name": "_dlt_load_id",
                            "data_type": "text",
                            "nullable": False,
                        },
                        "_dlt_id": {
                            "name": "_dlt_id",
                            "data_type": "text",
                            "nullable": False,
                            "unique": True,
                            "row_key": True,
                        },
                    },
                    "write_disposition": "append",
                    "file_format": "preferred",
                    "name": "_dlt_pipeline_state",
                    "resource": "_dlt_pipeline_state",
                    "x-normalizer": {"seen-data": True},
                },
                "purchases__items": {
                    "name": "purchases__items",
                    "columns": {
                        "purchase_id": {
                            "name": "purchase_id",
                            "data_type": "bigint",
                            "nullable": False,
                        },
                        "name": {"name": "name", "data_type": "text", "nullable": False},
                        "price": {"name": "price", "data_type": "bigint", "nullable": False},
                        "_dlt_root_id": {
                            "name": "_dlt_root_id",
                            "data_type": "text",
                            "nullable": False,
                            "root_key": True,
                        },
                        "_dlt_parent_id": {
                            "name": "_dlt_parent_id",
                            "data_type": "text",
                            "nullable": False,
                            "parent_key": True,
                        },
                        "_dlt_list_idx": {
                            "name": "_dlt_list_idx",
                            "data_type": "bigint",
                            "nullable": False,
                        },
                        "_dlt_id": {
                            "name": "_dlt_id",
                            "data_type": "text",
                            "nullable": False,
                            "unique": True,
                            "row_key": True,
                        },
                    },
                    "parent": "purchases",
                    "x-normalizer": {"seen-data": True},
                },
            },
            "settings": {
                "detections": ["iso_timestamp"],
                "default_hints": {
                    "not_null": [
                        "_dlt_id",
                        "_dlt_root_id",
                        "_dlt_parent_id",
                        "_dlt_list_idx",
                        "_dlt_load_id",
                    ],
                    "parent_key": ["_dlt_parent_id"],
                    "root_key": ["_dlt_root_id"],
                    "unique": ["_dlt_id"],
                    "row_key": ["_dlt_id"],
                },
            },
            "normalizers": {
                "names": "snake_case",
                "json": {"module": "dlt.common.normalizers.json.relational"},
            },
            "previous_hashes": [
                "+stnjP5XdPbykNQJVpK/zpfo0iVbyRFfSIIRzuPzcI4=",
                "nTU+qnLwEmiMSWTwu+QH321j4zl8NrOVL4Hx/GxQAHE=",
            ],
        }
    )
# NOTE(review): holds only a newline and is not referenced by any test visible in this
# file - confirm whether it is still needed
EXPECTED_MERMAID_STR = """
"""
@pytest.mark.parametrize(
    "hints,expected_mermaid_col",
    [
        (
            {"name": "simple_col", "data_type": "text"},
            "text simple_col\n",
        ),
        (
            {"name": "unique_col", "data_type": "text", "unique": True},
            "text unique_col UK\n",
        ),
        (
            {"name": "unique_col", "data_type": "text", "unique": False},
            "text unique_col\n",
        ),
        (
            {"name": "primary_key_col", "data_type": "text", "primary_key": False},
            "text primary_key_col\n",
        ),
        (
            {"name": "primary_key_col", "data_type": "text", "primary_key": True},
            "text primary_key_col PK\n",
        ),
        (
            {
                "name": "unique_and_primary_col",
                "data_type": "text",
                "primary_key": True,
                "unique": True,
            },
            "text unique_and_primary_col PK,UK\n",
        ),
        (
            # same hints with `unique` listed before `primary_key`: key order must not matter
            {
                "name": "unique_and_primary_col",
                "data_type": "text",
                "unique": True,
                "primary_key": True,
            },
            "text unique_and_primary_col PK,UK\n",
        ),
        (
            {"name": "description_col", "data_type": "text", "description": "foo"},
            'text description_col "foo"\n',
        ),
    ],
)
def test_to_mermaid_column(hints: TColumnSchema, expected_mermaid_col: str) -> None:
    """Each set of dlt column hints renders to the expected Mermaid attribute line."""
    assert _to_mermaid_column(hints) == expected_mermaid_col
@pytest.mark.parametrize(
    "table,expected_mermaid_table",
    [
        (
            {
                "name": "simple_table",
                "columns": {
                    "foo": {"name": "foo", "data_type": "text"},
                    "bar": {"name": "bar", "data_type": "bigint"},
                },
            },
            "simple_table{\n text foo\n bigint bar\n}\n",
        ),
    ],
)
def test_to_and_from_dbml_table(table: TTableSchema, expected_mermaid_table: str) -> None:
    """A dlt table renders to the expected Mermaid entity.

    Despite the `dbml` in its name, this test exercises `_to_mermaid_table`.
    """
    assert _to_mermaid_table(table) == expected_mermaid_table
@pytest.mark.parametrize(
    "reference, expected_mermaid_reference",
    [
        (
            # explicit label and cardinality
            TTableReferenceStandalone(
                table="customers",
                columns=["id"],
                referenced_columns=["customer_id"],
                referenced_table="orders",
                label="ordered",
                cardinality="zero_to_many",
            ),
            "customers |o--|{ orders : ordered\n",
        ),
        (
            # no label: the default empty quoted label is emitted
            TTableReferenceStandalone(
                table="customers",
                columns=["id"],
                referenced_columns=["customer_id"],
                referenced_table="orders",
                cardinality="zero_to_many",
            ),
            'customers |o--|{ orders : ""\n',
        ),
        (
            # no cardinality: the default arrow is used
            TTableReferenceStandalone(
                table="customers",
                columns=["id"],
                referenced_columns=["customer_id"],
                referenced_table="orders",
                label="ordered",
            ),
            "customers ||--|{ orders : ordered\n",
        ),
    ],
)
def test_to_mermaid_reference(
    reference: TTableReferenceStandalone, expected_mermaid_reference: str
) -> None:
    """A standalone table reference renders to the expected Mermaid relationship line."""
    assert _to_mermaid_reference(reference) == expected_mermaid_reference
def test_schema_to_mermaid_generates_an_er_diagram(example_schema: dlt.Schema):
    """The rendered diagram opens with the `erDiagram` header."""
    rendered = schema_to_mermaid(
        example_schema.to_dict(),
        references=example_schema.references,
    )
    assert rendered.startswith("erDiagram")
@pytest.mark.parametrize("remove_process_hints", [False, True])
def test_schema_to_mermaid_invariant_to_processing_hint(
    example_schema: dlt.Schema, remove_process_hints: bool
):
    """Removing processing hints from the schema dict must not change the diagram."""
    expected = """\
erDiagram
_dlt_version{
bigint version
bigint engine_version
timestamp inserted_at
text schema_name
text version_hash
text schema
}
_dlt_loads{
text load_id
text schema_name
bigint status
timestamp inserted_at
text schema_version_hash
}
customers{
bigint id PK
text name
text city
text _dlt_load_id
text _dlt_id UK
}
purchases{
bigint id PK
bigint customer_id
bigint inventory_id
bigint quantity
text date
text _dlt_load_id
text _dlt_id UK
}
_dlt_pipeline_state{
bigint version
bigint engine_version
text pipeline_name
text state
timestamp created_at
text version_hash
text _dlt_load_id
text _dlt_id UK
}
purchases__items{
bigint purchase_id
text name
bigint price
text _dlt_root_id
text _dlt_parent_id
bigint _dlt_list_idx
text _dlt_id UK
}
customers }|--|| _dlt_loads : _dlt_load
purchases }|--|| _dlt_loads : _dlt_load
purchases ||--|{ customers : ""
_dlt_pipeline_state }|--|| _dlt_loads : _dlt_load
purchases__items }|--|| purchases : _dlt_parent
purchases__items }|--|| purchases : _dlt_root
"""
    rendered = schema_to_mermaid(
        example_schema.to_dict(remove_processing_hints=remove_process_hints),
        references=example_schema.references,
    )
    assert rendered == expected
def test_schema_to_mermaid_exclude_dlt_tables(example_schema: dlt.Schema) -> None:
    """With `include_dlt_tables=False`, `_dlt*` tables and references to them are dropped."""
    expected = """\
erDiagram
customers{
bigint id PK
text name
text city
text _dlt_load_id
text _dlt_id UK
}
purchases{
bigint id PK
bigint customer_id
bigint inventory_id
bigint quantity
text date
text _dlt_load_id
text _dlt_id UK
}
purchases__items{
bigint purchase_id
text name
bigint price
text _dlt_root_id
text _dlt_parent_id
bigint _dlt_list_idx
text _dlt_id UK
}
purchases ||--|{ customers : ""
purchases__items }|--|| purchases : _dlt_parent
purchases__items }|--|| purchases : _dlt_root
"""
    rendered = schema_to_mermaid(
        example_schema.to_dict(),
        references=example_schema.references,
        include_dlt_tables=False,
    )
    assert rendered == expected
def test_schema_to_mermaid_hide_columns(example_schema: dlt.Schema) -> None:
    """With `hide_columns=True`, entities are empty while references are still drawn."""
    expected = """\
erDiagram
_dlt_version{
}
_dlt_loads{
}
customers{
}
purchases{
}
_dlt_pipeline_state{
}
purchases__items{
}
customers }|--|| _dlt_loads : _dlt_load
purchases }|--|| _dlt_loads : _dlt_load
purchases ||--|{ customers : ""
_dlt_pipeline_state }|--|| _dlt_loads : _dlt_load
purchases__items }|--|| purchases : _dlt_parent
purchases__items }|--|| purchases : _dlt_root
"""
    rendered = schema_to_mermaid(
        example_schema.to_dict(),
        references=example_schema.references,
        hide_columns=True,
    )
    assert rendered == expected