mirror of https://github.com/dlt-hub/dlt.git
docs: lifecycle of @dlt.hub.transformation and dlt.Relation (#3329)
* Lifecycle of a dlt transformation
* Added test to match lifecycle docs
@@ -162,7 +162,7 @@ class Dataset:
        return self._table_client

    # TODO remove method; need to update `dlthub` to avoid conflict
    # this is only used by `dlthub.transformation` currently
    # this is only used by `dlt.hub.transformation` currently
    def is_same_physical_destination(self, other: dlt.Dataset) -> bool:
        """
        Returns true if the other dataset is on the same physical destination
@@ -11,8 +11,8 @@ import { DltHubFeatureAdmonition } from '@theme/DltHubFeatureAdmonition';

`dlt transformations` let you build new tables or full datasets from datasets that have _already_ been ingested with `dlt`. `dlt transformations` are written and run in a very similar fashion to dlt sources and resources. `dlt transformations` require you to have loaded data to a location on which the transformations can be executed, for example a local DuckDB database, a bucket, or a warehouse. `dlt transformations` are fully supported for all of our SQL destinations, including all filesystem and bucket formats.

You create them with the `@dlt.hub.transformation` decorator which has the same signature as the `@dlt.resource` decorator, but does not yield items but rather a SQL query including the resulting column schema. dlt transformations support the same write_dispositions per destination as dlt resources do.
You create them with the `@dlt.hub.transformation` decorator, which has the same signature as the `@dlt.resource` decorator but yields a SQL query, including the resulting column schema, rather than data items. dlt transformations support the same `write_disposition` values per destination as dlt resources do.
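
As a rough sketch of what such a transformation can look like (the `customers` table and its `id` and `name` columns are borrowed from the fruitshop example used below; data is assumed to be loaded already):

```py
import dlt

@dlt.hub.transformation
def customer_names(dataset: dlt.Dataset):
    # yield a SELECT (as a Relation built from the input dataset) instead of rows;
    # dlt materializes the result into a destination table named after the function
    yield dataset("SELECT id, name FROM customers")
```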

## Motivations

@@ -31,7 +31,11 @@ A few real-world scenarios where dlt transformations can be useful:

## Quick-start in three simple steps

For the example below you can copy–paste everything into one script and run it. It is useful to know how to use dlt [Datasets and Relations](../../../general-usage/dataset-access/dataset.md), since these are heavily used in transformations.
For the example below, you can copy–paste everything into one script and run it.

:::note
It is useful to know how to use dlt [Datasets and Relations](../../../general-usage/dataset-access/dataset.md), since these are heavily used in transformations.
:::

### 1. Load some example data

@@ -39,15 +43,6 @@ The snippets below assume that we have a simple fruitshop dataset as produced by

<!--@@@DLT_SNIPPET ./transformation-snippets.py::quick_start_example-->


### 1.1 Use the fruitshop template as a starting point

Alternatively, you can follow the code examples below by creating a new pipeline with the fruitshop template and running transformations on the resulting dataset:

```sh
dlt init fruitshop duckdb
```

### 2. Inspect the dataset

<!--@@@DLT_SNIPPET ./transformation-snippets.py::dataset_inspection-->
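
If you prefer to poke around without the snippet, a small sketch along the same lines (assuming `pipeline` is the fruitshop pipeline you just ran) could be:

```py
dataset = pipeline.dataset()
# list tables with their row counts, then peek at a few customer rows
print(dataset.row_counts().df())
print(dataset["customers"].limit(5).df())
```
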
@@ -75,7 +70,7 @@ Most of the following examples will be using the ibis expressions of the `dlt.Da

* **Decorator arguments** mirror those accepted by `@dlt.resource`.
* The transformation function signature must contain at least one `dlt.Dataset`, which is used inside the function to create the transformation SQL statements and calculate the resulting schema update.
* Yields a `Relation` created with ibis expressions or a select query which will be materialized into the destination table. If the first item yielded is a valid sql query or relation object, data will be interpreted as a transformation. In all other cases, the transformation decorator will work like any other resource.
* A transformation yields a `Relation` created with ibis expressions or a select query, which will be materialized into the destination table (see the sketch below). If the first item yielded is a valid SQL query or `Relation` object, the data will be interpreted as a transformation. In all other cases, the transformation decorator works like any other resource.
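
A minimal sketch of a transformation that yields a `Relation` built from expressions (the `purchases` table and its `quantity` column are assumptions based on the fruitshop template):

```py
from typing import Any

import dlt

@dlt.hub.transformation(write_disposition="append")
def big_purchases(dataset: dlt.Dataset) -> Any:
    purchases = dataset["purchases"]
    # a Relation built with expressions; dlt derives the resulting schema from it
    yield purchases.filter(purchases.quantity > 10)
```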

## Loading to other datasets

@@ -105,13 +100,13 @@ Below we load the data from our local DuckDB instance to a Postgres instance. dl

### Yielding multiple transformations from one transformation resource

`dlt transformations` may also yield more than one transformation instruction. If no further table name hints are supplied, the result will be a union of the yielded transformation instructions. dlt will take care of the necessary schema migrations, you will just need to ensure that no columns are marked as non-nullable that are missing from one of the transformation instructions:
`dlt transformations` may also yield more than one transformation instruction. If no further table name hints are supplied, the result will be a union of the yielded transformation instructions. `dlt` will take care of the necessary schema migrations; you just need to ensure that no columns are marked as non-nullable that are missing from one of the transformation instructions:

<!--@@@DLT_SNIPPET ./transformation-snippets.py::multiple_transformation_instructions-->
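
As an illustrative sketch (the `suppliers` table is hypothetical and not part of the fruitshop template), two yielded selects end up unioned into one destination table:

```py
@dlt.hub.transformation
def all_contacts(dataset: dlt.Dataset) -> Any:
    # both selects land in the same table; the result is their union
    yield dataset("SELECT id, name FROM customers")
    yield dataset("SELECT id, name FROM suppliers")
```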

### Supplying additional hints

You may supply column and table hints the same way you do for regular resources. `dlt` will derive schema hints from your query, but in some cases you may need to change or amend hints, such as making columns nullable for the example above or change the precision or type of a column to make it work with a given target destination (if different from the source)
You may supply column and table hints the same way you do for regular resources. `dlt` will derive schema hints from your query, but in some cases you may need to modify or extend them — for example, making columns nullable as in the example above, or adjusting the precision or type of a column to ensure compatibility with a specific target destination (if it differs from the source).

<!--@@@DLT_SNIPPET ./transformation-snippets.py::supply_hints-->
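
Since the decorator accepts the same arguments as `@dlt.resource`, hints can be passed through `columns` and `table_name`; a hedged sketch building on the hypothetical union above:

```py
@dlt.hub.transformation(
    table_name="all_contacts",
    columns={"name": {"nullable": True}},  # relax the derived schema for the union
)
def all_contacts(dataset: dlt.Dataset) -> Any:
    yield dataset("SELECT id, name FROM customers")
    yield dataset("SELECT id, name FROM suppliers")
```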

@@ -125,14 +120,14 @@ The identifiers (table and column names) used in these raw SQL expressions must

## Using pandas dataframes or arrow tables

You can also directly write transformations using pandas or arrow. Please note that your transformation resource will act like a regular resource in this case, you will not have column level hints forward but rather dlt will just see the dataframes or arrow tables you yield and process them like from any other resource. This may change in the future.
You can also write transformations directly using pandas or arrow. Note that in this case your transformation resource behaves like a regular resource: column-level hints will not be propagated, and `dlt` will simply treat the yielded dataframes or arrow tables like data from any other resource. This behavior may change in the future.

<!--@@@DLT_SNIPPET ./transformation-snippets.py::arrow_dataframe_operations-->
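
A small sketch of the dataframe route (column names are again assumptions from the fruitshop template):

```py
@dlt.hub.transformation
def customer_name_lengths(dataset: dlt.Dataset) -> Any:
    # yielding a dataframe makes this behave like a regular resource:
    # no column-level hint forwarding, regular extract/normalize/load
    df = dataset["customers"].df()
    df["name_length"] = df["name"].str.len()
    yield df
```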

## Schema evolution and hints lineage

When executing transformations, dlt computes the resulting schema before the transformation is executed. This allows dlt to:
When executing transformations, `dlt` computes the resulting schema before the transformation is executed. This allows `dlt` to:

1. Migrate the destination schema accordingly, creating new columns or tables as needed
2. Fail early if there are schema mismatches that cannot be resolved
@@ -140,7 +135,7 @@ When executing transformations, dlt computes the resulting schema before the tra

### Schema evolution

For example, if your transformation joins two tables and creates new columns, dlt will automatically update the destination schema to accommodate these changes. If your transformation would result in incompatible schema changes (like changing a column's data type in a way that could lose data), dlt will fail before executing the transformation, protecting your data and saving execution and debug time.
For example, if your transformation joins two tables and creates new columns, `dlt` will automatically update the destination schema to accommodate these changes. If your transformation would result in incompatible schema changes (like changing a column's data type in a way that could lose data), `dlt` will fail before executing the transformation, protecting your data and saving execution and debug time.

You can inspect the computed result schema during development by looking at the result of `compute_columns_schema` on your `Relation`:
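
For example, a quick check during development might look like this (a sketch using the fruitshop tables from above):

```py
relation = dataset["customers"][["id", "name"]]
# the column schema dlt will use to create or migrate the destination table
print(relation.compute_columns_schema())
```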

@@ -148,22 +143,38 @@ You can inspect the computed result schema during development by looking at the

### Column level hint forwarding

When creating or updating tables with transformation resources, dlt will also forward certain column hints to the new tables. In our fruitshop source, we have applied a custom hint named `x-annotation-pii` set to True for the `name` column, which indicates that this column contains PII (personally identifiable information). We might want to know downstream of our transformation layer which columns resulted from origin columns that contain private data:
When creating or updating tables with transformation resources, `dlt` will also forward certain column hints to the new tables. In our fruitshop source, we have applied a custom hint named `x-annotation-pii` set to True for the `name` column, which indicates that this column contains PII (personally identifiable information). Downstream of the transformation layer, we may want to know which columns originate from columns that contain private data:

<!--@@@DLT_SNIPPET ./transformation-snippets.py::column_level_lineage-->
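
As a rough sketch of such an inspection (assuming a transformation named `copied_customers` that selects the `name` column has already been run with `pipeline`):

```py
columns = pipeline.default_schema.tables["copied_customers"]["columns"]
# the custom x-annotation hint travels with the column into the new table
print(columns["name"].get("x-annotation-pii"))
```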

Features and limitations:
#### Features and limitations:

* `dlt` will only forward certain types of hints to the resulting tables: custom hints starting with `x-annotation...` and type hints such as `nullable`, `data_type`, `precision`, `scale`, and `timezone`. Other hints, such as `primary_key` or `merge_keys`, will need to be set via the `columns` argument on the transformation decorator, since dlt does not know how the transformed tables will be used.
* `dlt` will not be able to forward hints for columns that are the result of combining two origin columns, for example by concatenating them or similar sql operations.
* `dlt` will only forward certain types of hints to the resulting tables: custom hints starting with `x-annotation...` and type hints such as `nullable`, `data_type`, `precision`, `scale`, and `timezone`. Other hints, such as `primary_key` or `merge_keys`, will need to be set via the `columns` argument on the transformation decorator, since `dlt` does not know how the transformed tables will be used.
* `dlt` cannot forward hints for columns that result from combining multiple origin columns, such as when they are concatenated or produced through other SQL operations.

## Query Normalization

### `dlt` columns
## Lifecycle of a SQL transformation

When executing transformations, dlt will add internal dlt columns to your SQL queries depending on the configuration:
In this section, we focus on the lifecycle of transformations that yield a `Relation` object, which we call SQL transformations here. This is in contrast to Python-based transformations that yield dataframes or arrow tables, which go through the regular extract, normalize, and load lifecycle of a `dlt` resource.

### Extract

In the extract stage, a `Relation` yielded by a transformation is converted into a SQL string and saved as a `.model` file along with its source SQL dialect. At this stage, the SQL string is just the user's original query — either the string that was explicitly provided or the one generated by `Relation.to_sql()`. No `dlt`-specific columns like `_dlt_id` or `_dlt_load_id` are added yet.
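
To see the SQL that would end up in the `.model` file, you can render a relation yourself (a sketch using the example tables from this page):

```py
rel = dataset["customers"][["id", "name"]]
# the raw query, before any dlt columns or identifier qualification are added
print(rel.to_sql())
```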

### Normalize

In the normalize stage, `.model` files are read and processed. The normalization process modifies your SQL queries to ensure they execute correctly and integrate with `dlt`'s features.

:::info
The normalization described here applies only to SQL-based transformations. Python-based transformations, such as those using dataframes or arrow tables, follow the [regular normalization process](../../../reference/explainers/how-dlt-works.md#normalize).
:::

#### Adding `dlt` columns

During normalization, `dlt` adds internal `dlt` columns to your SQL queries depending on the configuration:

- `_dlt_load_id`, which tracks which load operation created or modified each row, is **added by default**. Even if present in your query, the `_dlt_load_id` column will be **replaced with a constant value** corresponding to the current load ID. To disable this behavior, set:
```toml
@@ -183,33 +194,48 @@ When executing transformations, dlt will add internal dlt columns to your SQL qu

- In **Redshift**, `_dlt_id` is generated using an `MD5` hash of the load ID and row number.
- In **SQLite**, `_dlt_id` is simulated using `lower(hex(randomblob(16)))`.

Additionally, column names are normalized according to the naming schema selected and the identifier capabilities of the destinations. This ensures compatibility and consistent naming conventions across different data sources and destination systems.

This allows dlt to maintain data lineage and enables features like incremental loading and merging, even when working with raw SQL queries.
#### Query transformations

:::info
The normalization described here, including automatic injection or replacement of dlt columns, applies only to SQL-based transformations. Python-based transformations, such as those using dataframes or arrow tables, follow the [regular normalization process](../../../reference/explainers/how-dlt-works.md#normalize).
:::
The normalization process also applies the following transformations to ensure your queries work correctly:

### Query Processing
1. Fully qualifies all identifiers with database and dataset prefixes
2. Quotes and adjusts identifier casing to match destination requirements
3. Normalizes column names according to the selected naming convention
4. Aliases columns and tables to handle naming convention differences
5. Reorders columns to match the destination table schema
6. Fills in `NULL` values for columns that exist in the destination but aren't in your query

When you run your transformations, `dlt` takes care of several important steps to ensure your queries are executed smoothly and correctly on the input dataset. Here’s what happens behind the scenes:
### Load

1. Expands any `*` (star) selects to include all relevant columns.
2. Adds special dlt columns (see below for details).
3. Fully qualifies all identifiers by adding database and dataset prefixes, so tables are always referenced unambiguously during query execution.
4. Properly quotes and, if necessary, adjusts the case of your identifiers to match the destination’s requirements.
5. Handles differences in naming conventions by aliasing columns and tables as needed, so names always match those in the destination.
6. Reorders columns to match the expected order in the destination table.
7. Fills in default `NULL` values for any columns that exist in the destination table but are not selected in your query.

Given a table of the name `my_table` with the columns `id` and `value` on `duckdb`, on a dataset with the name `my_dataset`, loaded into a dataset named `transformed_dataset`, the following query:
In the load stage, the normalized queries from `.model` files are wrapped in INSERT statements and executed on the destination.
For example, given this query from the extract stage:

```sql
SELECT id, value FROM table
SELECT
  "my_table"."id" AS "id",
  "my_table"."value" AS "value"
FROM "my_pipeline_dataset"."my_table" AS "my_table"
```

Will be translated to
The normalize stage processes it (adding dlt columns, wrapping it in a subquery, etc.), resulting in:

```sql
SELECT
  _dlt_subquery."id" AS "id",
  _dlt_subquery."value" AS "value",
  '1749134128.17655' AS "_dlt_load_id",
  UUID() AS "_dlt_id"
FROM (
  SELECT
    "my_table"."id" AS "id",
    "my_table"."value" AS "value"
  FROM "my_pipeline_dataset"."my_table" AS "my_table"
)
AS _dlt_subquery
```

The load stage executes:

```sql
INSERT INTO
@@ -228,6 +254,8 @@ FROM (
AS _dlt_subquery
```

The query is executed via the destination's SQL client, materializing the transformation result directly in the database.
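
Once loaded, the materialized table can be read back through the dataset like any other table (a sketch, assuming a transformation named `copied_customers` was run with `pipeline`):

```py
print(pipeline.dataset()["copied_customers"].df())
```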

## Examples

### Local in-transit transformations example

@@ -14,6 +14,7 @@ def fruitshop_pipeline() -> dlt.Pipeline:

# @@@DLT_SNIPPET_START quick_start_example

import dlt
from dlt.destinations import duckdb
from dlt._workspace._templates._single_file_templates.fruitshop_pipeline import (
    fruitshop as fruitshop_source,
@@ -30,6 +31,7 @@ def fruitshop_pipeline() -> dlt.Pipeline:

def basic_transformation_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
    # @@@DLT_SNIPPET_START basic_transformation
    from typing import Any

    @dlt.hub.transformation
    def copied_customers(dataset: dlt.Dataset) -> Any:
@@ -176,7 +178,7 @@ def sql_queries_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:

    # @@@DLT_SNIPPET_END sql_queries_short

    # Joins and other more complex queries are also possible of course
    # Joins and other more complex queries are also possible
    @dlt.hub.transformation
    def enriched_purchases(dataset: dlt.Dataset) -> Any:
        enriched_purchases = dataset(
@@ -2,7 +2,7 @@ import os
import io

from typing import Any, List
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
import pytest
import dlt

@@ -925,3 +925,79 @@ def test_data_contract_on_data_type(
    )
    assert py_exc.value.step == "load"
    assert isinstance(py_exc.value.__context__, LoadClientJobException)


def test_relation_lifecycle() -> None:
    """Test showing the lifecycle of a model: what it looks like after each pipeline step."""
    pipeline = dlt.pipeline(pipeline_name=f"rel_lifecycle_{uniq_id()}", destination="duckdb")
    pipeline.run(
        [{"a": string, "b": i} for i, string in enumerate(["I", "love", "dlt"])],
        table_name="example_table",
    )

    dataset = pipeline.dataset()
    rel = dataset["example_table"][["a", "_dlt_load_id", "_dlt_id"]]

    @dlt.resource
    def my_resource() -> Any:
        yield dlt.mark.with_hints(
            rel,
            hints=make_hints(
                columns={
                    k: v
                    for k, v in pipeline.default_schema.tables["example_table"]["columns"].items()
                }
            ),
        )

    load_info = pipeline.extract(my_resource())

    # Get the extracted model file
    load_id = load_info.loads_ids[0]
    normalize_storage = pipeline._get_normalize_storage()
    model_job_file = normalize_storage.extracted_packages.list_new_jobs(load_id)[0]
    assert model_job_file.endswith(".model")

    # Ensure it contains the dialect and the query string
    with normalize_storage.extracted_packages.storage.open_file(model_job_file, "r") as f:
        content = f.read()
    extracted_query = rel.to_sql()
    assert f"dialect: duckdb\n{extracted_query}\n" == content

    pipeline.normalize()

    # Get the normalized model file
    load_storage = pipeline._get_load_storage()
    model_job_file = load_storage.normalized_packages.list_new_jobs(load_id)[0]
    assert model_job_file.endswith(".model")

    # Ensure it contains the normalized query that, for example:
    # - has the _dlt_load_id column
    # - has NULL set as column b since we selected only a
    # - is wrapped in a subquery
    with load_storage.normalized_packages.storage.open_file(model_job_file, "r") as f:
        content = f.read()

    normalized_query = f"""SELECT _dlt_subquery."a" AS "a", NULL AS "b", \'{load_id}\' AS "_dlt_load_id", _dlt_subquery."_dlt_id" AS "_dlt_id" FROM ({extracted_query}) AS _dlt_subquery"""
    assert f"dialect: duckdb\n{normalized_query}\n" == content

    from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient

    # Get the insert query that is executed during load
    captured_sqls = []
    _original_execute = DuckDbSqlClient.execute_sql

    def spy_execute_sql(self, sql, *args, **kwargs):
        captured_sqls.append(sql)
        return _original_execute(self, sql, *args, **kwargs)

    with patch.object(DuckDbSqlClient, "execute_sql", spy_execute_sql):
        pipeline.load()

    # Assert exact match with the actual INSERT statement
    loaded_query = f"""INSERT INTO "{pipeline.dataset_name}"."my_resource" ("a", "b", "_dlt_load_id", "_dlt_id") {normalized_query}"""
    all_insert_stmts = [
        sql for sql in captured_sqls if isinstance(sql, str) and sql.startswith("INSERT INTO")
    ]
    my_resource_insert = [stmt for stmt in all_insert_stmts if '"my_resource"' in stmt][0]
    assert loaded_query == my_resource_insert