Docs: Simpler Examples, generate examples pages from actual examples code (#1134)

* start migrating examples

* restore chess dbt example

* bring examples into desired shape

* generate example pages from existing examples using docstrings

* fix one md link

* post merge file delete

* add some notes for test vars

* move chess example back into examples folder

* skip examples without proper header

* separate examples testing into own make command

* prepare tests for examples and run them

* fix examples test setup

* add postgres dependency to snippets tests

* ignore some folders

* add argparse plus clear flag to example test preparation
make examples raise in case of failed loads

* simplify example folder skipping

* add a template for a new example

* fix bug in deployment

* update contributing
This commit is contained in:
David Scharf
2024-04-03 12:35:45 +02:00
committed by GitHub
parent ee33548508
commit 6bf1940341
80 changed files with 695 additions and 1719 deletions

View File

@@ -58,11 +58,19 @@ jobs:
- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery --with docs,sentry-sdk --without airflow
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres --with docs,sentry-sdk --without airflow
- name: create secrets.toml
- name: create secrets.toml for examples
run: pwd && echo "$DLT_SECRETS_TOML" > docs/examples/.dlt/secrets.toml
- name: create secrets.toml for snippets
run: pwd && echo "$DLT_SECRETS_TOML" > docs/website/docs/.dlt/secrets.toml
- name: Run linter and tests
run: make test-and-lint-snippets
- name: Run linter and tests on examples
run: make lint-and-test-examples
- name: Run linter and tests on snippets
run: make lint-and-test-snippets

.gitignore vendored
View File

@@ -12,7 +12,6 @@ experiments/*
# !experiments/pipeline/
# !experiments/pipeline/*
secrets.toml
!docs/**/secrets.toml
*.session.sql
*.duckdb
*.wal
@@ -141,4 +140,7 @@ tmp
**/tmp
# Qdrant embedding models cache
local_cache/
local_cache/
# test files for examples are generated and should not be committed
docs/examples/**/test*.py

View File

@@ -27,7 +27,7 @@ help:
@echo " tests all components using local destinations: duckdb and postgres"
@echo " test-common"
@echo " tests common components"
@echo " test-and-lint-snippets"
@echo " lint-and-test-snippets"
@echo " tests and lints snippets and examples in docs"
@echo " build-library"
@echo " makes dev and then builds dlt package for distribution"
@@ -60,12 +60,22 @@ format:
poetry run black dlt docs tests --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
# poetry run isort ./
test-and-lint-snippets:
lint-and-test-snippets:
cd docs/tools && poetry run python check_embedded_snippets.py full
poetry run mypy --config-file mypy.ini docs/website docs/examples docs/tools --exclude docs/tools/lint_setup
poetry run flake8 --max-line-length=200 docs/website docs/examples docs/tools
cd docs/website/docs && poetry run pytest --ignore=node_modules
lint-and-test-examples:
poetry run mypy --config-file mypy.ini docs/examples
poetry run flake8 --max-line-length=200 docs/examples
cd docs/tools && poetry run python prepare_examples_tests.py
cd docs/examples && poetry run pytest
test-examples:
cd docs/examples && poetry run pytest
lint-security:
poetry run bandit -r dlt/ -n 3 -l

View File

@@ -4,50 +4,27 @@ Note: All paths in this guide are relative to the `dlt` repository directory.
## Add snippet
- Go to `docs/website/docs/examples/`.
- Copy one of the examples, rename scripts.
- Modify the script in `<example-name>/code/<snippet-name>-snippets.py`:
- The whole example code should be inside of `def <snippet-name>_snippet()` function.
- Use tags `# @@@DLT_SNIPPET_START example` and `# @@@DLT_SNIPPET_END example` to indicate which part of the code will be auto-generated in the final script `docs/examples/<examlple-name>/<snippet-name>.py`.
- Use additional tags as `# @@@DLT_SNIPPET_START smal_part_of_code` to indicate which part of the code will be auto-inserted into a text document `docs/website/docs/examples/<example-name>/index.md` in the form of a code snippet.
- Modify .`dlt/secrets.toml` and `configs.toml` if needed.
- Modify `<example-name>/index.md`:
- In the section `<Header info=` add the tl;dr for your example, it should be short but informative.
- Set `slug="<example-name>" run_file="<snippet-name>" />`.
- List what users will learn from this example. Use bullet points and link corresponding documentation pages.
- Use tags `<!--@@@DLT_SNIPPET ./code/<snippet-name>-snippets.py::smal_part_of_code-->` to insert example code snippets. Do not write them manually!
- Go to `docs/examples/`.
- Copy the template in `./_template/..`.
- Make sure the folder and your example script have the same name.
- Update the docstring, which will make up the generated markdown page; check the other examples to see how it is done.
- If your example requires any secrets, add the variables to `example.secrets.toml` but do not enter the values.
- Add your example code and make sure you have an `if __name__ == "__main__":` clause in which you run the example script; this will be used for testing.
- Add one or two assertions after running your example, and ideally also `load_info.raise_on_failed_jobs()`; this will help greatly with testing (see the sketch below).
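Putting these steps together, a minimal example script might look like the following sketch (the names `my_example_pipeline` and `my_example_data` are placeholders; the `_template` folder added in this commit is the authoritative starting point):

```py
"""
---
title: My example
description: Short description that shows up in the docs
keywords: [example]
---
This text will show up in the generated docs page.
"""
import dlt

if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="my_example_pipeline", destination="duckdb", dataset_name="my_example_data"
    )
    load_info = pipeline.run([1, 2, 3], table_name="player")
    print(load_info)
    # one or two assertions plus raise_on_failed_jobs() help greatly with testing
    assert pipeline.last_trace.last_normalize_info.row_counts["player"] == 3
    load_info.raise_on_failed_jobs()
```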
## Add tests
## Testing
- You can test your example simply by running your example script from your example folder. On CI a test will be automatically generated.
- Do not forget to add tests to `<example-name>/code/<snippet-name>-snippets.py`.
- They could be short asserts, code should work.
- Use `# @@@DLT_REMOVE` to remove test code from final code example.
- Test your snippets locally first with command:
- `cd docs/website/docs/examples/<example-name>/code && pytest --ignore=node_modules -s -v`.
- Add `@skipifgithubfork` decorator to your main snippet function, look [example](https://github.com/dlt-hub/dlt/blob/master/docs/website/docs/examples/chess_production/code/chess-snippets.py#L1-L4).
## Run npm start
## Checking your generated markdown
The command `npm start` starts a local development server and opens up a browser window.
- To install npm read [README](../website/README.md).
- This command will generate a clean example script in `docs/examples/<examlple-name>` folder based on `docs/website/docs/examples/<example-name>/code/<snippet-name>-snippets.py`.
- Also, this command automatically inserts code snippets to `docs/website/docs/examples/<example-name>/index.md`.
- Your example should be added automatically to the examples section in the local version of the docs. Check the rendered output and see whether it looks the way you intended.
## Add ENV variables
If you use any secrets for the code snippets, e.g. Zendesk requires credentials. You need to add them to GitHub Actions in ENV style:
- First, add the variables to `.github/workflows/test_doc_snippets.yml`:
Example:
```yaml
# zendesk vars for example
SOURCES__ZENDESK__CREDENTIALS: ${{ secrets.ZENDESK__CREDENTIALS }}
```
- Ask dlt team to add them to the GitHub Secrets.
If you use any secrets for the code snippets (e.g. Zendesk requires credentials), please talk to us. We will add them to our Google secrets vault.
## Add dependencies

View File

@@ -0,0 +1,30 @@
"""
---
title: Example Template
description: Add description here
keywords: [example]
---
This is a template for a new example. This text will show up in the docs.
With this example you will learn to:
* One
* Two
* Three
"""
import dlt
if __name__ == "__main__":
# run a pipeline
pipeline = dlt.pipeline(
pipeline_name="example_pipeline", destination="duckdb", dataset_name="example_data"
)
# Extract, normalize, and load the data
load_info = pipeline.run([1, 2, 3], table_name="player")
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -1,4 +1,3 @@
import os
import threading
from typing import Any, Iterator
@@ -49,12 +48,14 @@ def chess(
if __name__ == "__main__":
print("You must run this from the docs/examples/chess folder")
assert os.getcwd().endswith("chess")
# chess_url in config.toml, credentials for postgres in secrets.toml, credentials always under credentials key
# look for parallel run configuration in `config.toml`!
# mind the full_refresh: it makes the pipeline load to a distinct dataset each time it is run and always resets the schema and state
info = dlt.pipeline(
load_info = dlt.pipeline(
pipeline_name="chess_games", destination="postgres", dataset_name="chess", full_refresh=True
).run(chess(max_players=5, month=9))
# display where the data went
print(info)
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -0,0 +1 @@
chess_url="https://api.chess.com/pub/"

View File

@@ -1,10 +1,38 @@
"""
---
title: Run chess pipeline in production
description: Learn how to run the chess pipeline in production
keywords: [incremental loading, example]
---
In this example, you'll find a Python script that interacts with the Chess API to extract players and game data.
We'll learn how to:
- Inspect packages after they have been loaded.
- Load back load information, schema updates, and traces.
- Trigger notifications in case of schema evolution.
- Use context managers to independently retry pipeline stages.
- Run basic tests utilizing `sql_client` and `normalize_info`.
"""
import threading
from typing import Any, Iterator
from tenacity import (
Retrying,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)
import dlt
from dlt.common import sleep
from dlt.common import sleep, logger
from dlt.common.typing import StrAny, TDataItems
from dlt.sources.helpers.requests import client
from dlt.pipeline.helpers import retry_load
from dlt.common.runtime.slack import send_slack_message
@dlt.source
@@ -44,17 +72,6 @@ def chess(
return players(), players_profiles, players_games
from tenacity import (
Retrying,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)
from dlt.common import logger
from dlt.common.runtime.slack import send_slack_message
from dlt.pipeline.helpers import retry_load
MAX_PLAYERS = 5
@@ -107,6 +124,7 @@ def load_data_with_retry(pipeline, data):
logger.info("Warning: No data in players table")
else:
logger.info(f"Players table contains {count} rows")
assert count == MAX_PLAYERS
# To run simple tests with `normalize_info`, such as checking table counts and
# warning if there is no data, you can use the `row_counts` attribute.
@@ -116,13 +134,16 @@ def load_data_with_retry(pipeline, data):
logger.info("Warning: No data in players table")
else:
logger.info(f"Players table contains {count} rows")
assert count == MAX_PLAYERS
# we reuse the pipeline instance below and load to the same dataset as data
logger.info("Saving the load info in the destination")
pipeline.run([load_info], table_name="_load_info")
assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts
# save trace to destination, sensitive data will be removed
logger.info("Saving the trace in the destination")
pipeline.run([pipeline.last_trace], table_name="_trace")
assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts
# print all the new tables/columns in
for package in load_info.load_packages:
@@ -134,6 +155,7 @@ def load_data_with_retry(pipeline, data):
# save the new tables and column schemas to the destination:
table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
pipeline.run(table_updates, table_name="_new_tables")
assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts
return load_info
@@ -146,5 +168,8 @@ if __name__ == "__main__":
dataset_name="chess_data",
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
load_data_with_retry(pipeline, data)
data = chess(max_players=MAX_PLAYERS)
load_info = load_data_with_retry(pipeline, data)
# make sure nothing failed
load_info.raise_on_failed_jobs()

docs/examples/conftest.py Normal file
View File

@@ -0,0 +1,57 @@
import os
import pytest
from unittest.mock import patch
from dlt.common.configuration.container import Container
from dlt.common.configuration.providers import (
ConfigTomlProvider,
EnvironProvider,
SecretsTomlProvider,
StringTomlProvider,
)
from dlt.common.configuration.specs.config_providers_context import (
ConfigProvidersContext,
)
from dlt.common.utils import set_working_dir
from tests.utils import (
patch_home_dir,
autouse_test_storage,
preserve_environ,
duckdb_pipeline_location,
wipe_pipeline,
)
@pytest.fixture(autouse=True)
def setup_secret_providers(request):
"""Creates set of config providers where tomls are loaded from tests/.dlt"""
secret_dir = "./.dlt"
dname = os.path.dirname(request.module.__file__)
config_dir = dname + "/.dlt"
# inject provider context so the original providers are restored at the end
def _initial_providers():
return [
EnvironProvider(),
SecretsTomlProvider(project_dir=secret_dir, add_global_config=False),
ConfigTomlProvider(project_dir=config_dir, add_global_config=False),
]
glob_ctx = ConfigProvidersContext()
glob_ctx.providers = _initial_providers()
with set_working_dir(dname), Container().injectable_context(glob_ctx), patch(
"dlt.common.configuration.specs.config_providers_context.ConfigProvidersContext.initial_providers",
_initial_providers,
):
# extras work when container updated
glob_ctx.add_extras()
yield
def pytest_configure(config):
# push sentry to ci
os.environ["RUNTIME__SENTRY_DSN"] = (
"https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752"
)

View File

@@ -0,0 +1,72 @@
"""
---
title: Load mysql table with ConnectorX & Arrow
description: Load data from sql queries fast with connector x and arrow tables
keywords: [connector x, pyarrow, zero copy]
---
The example script below takes genome data from a public **mysql** instance and loads it into **duckdb**. Mind that your destination
must support loading parquet files, as this is the format that `dlt` uses to save arrow tables. [Connector X](https://github.com/sfu-db/connector-x) lets you
get data from several popular databases and creates an in-memory Arrow table, which `dlt` then saves to a load package and loads to the destination.
:::tip
You can yield several tables if your data is large and you need to partition your load.
:::
We'll learn:
- How to get arrow tables from [connector X](https://github.com/sfu-db/connector-x) and yield them.
- That merge and incremental loads work with arrow tables.
- How to enable [incremental loading](../general-usage/incremental-loading) for efficient data extraction.
- How to use the built-in `ConnectionStringCredentials`.
"""
import connectorx as cx
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
):
yield cx.read_sql(
conn_str.to_native_representation(),
query,
return_type="arrow2",
protocol="binary",
)
def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True,
)(read_sql_x)(
"mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
return genome
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
genome = genome_resource()
load_info = pipeline.run(genome)
print(load_info)
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["genome"] == 1000
# make sure nothing failed
load_info.raise_on_failed_jobs()
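As a hedged illustration of the tip above about yielding several tables to partition a large load, a hypothetical `read_sql_x_chunked` resource function could split the query with LIMIT/OFFSET instead of reading everything at once (chunk sizes and row counts here are made up):

```py
import connectorx as cx
import dlt
from dlt.sources.credentials import ConnectionStringCredentials

def read_sql_x_chunked(
    conn_str: ConnectionStringCredentials = dlt.secrets.value,
    base_query: str = dlt.config.value,
    chunk_size: int = 250,
    total_rows: int = 1000,
):
    # yield one arrow table per chunk; dlt writes each as parquet into the same load package
    for offset in range(0, total_rows, chunk_size):
        yield cx.read_sql(
            conn_str.to_native_representation(),
            f"{base_query} LIMIT {chunk_size} OFFSET {offset}",
            return_type="arrow2",
            protocol="binary",
        )
```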

View File

@@ -1,41 +0,0 @@
import connectorx as cx
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
):
yield cx.read_sql(
conn_str.to_native_representation(),
query,
return_type="arrow2",
protocol="binary",
)
def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True,
)(read_sql_x)(
"mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
return genome
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
genome = genome_resource()
print(pipeline.run(genome))
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading

View File

@@ -1,3 +1,20 @@
"""
---
title: Custom destination with BigQuery
description: Learn how to use the custom destination to load to BigQuery and use credentials
keywords: [destination, credentials, example, bigquery, custom destination]
---
In this example, you'll find a Python script that demonstrates how to load to BigQuery with the custom destination.
We'll learn how to:
- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials)
- use the [custom destination](../dlt-ecosystem/destinations/destination.md)
- use pyarrow tables to create complex column types on bigquery
- use bigquery `autodetect=True` for schema inference from parquet files
"""
import dlt
import pandas as pd
import pyarrow as pa
@@ -72,3 +89,6 @@ if __name__ == "__main__":
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -2,15 +2,15 @@
[sources.google_sheets]
credentials='''
{
"type": "set me up!",
"project_id": "set me up!",
"private_key_id": "set me up!",
"private_key": "set me up!",
"client_email": "set me up!",
"client_id": "set me up!",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "set me up!"
}
'''
"type": "set me up!",
"project_id": "set me up!",
"private_key_id": "set me up!",
"private_key": "set me up!",
"client_email": "set me up!",
"client_id": "set me up!",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "set me up!"
}
'''

View File

@@ -1,3 +1,26 @@
"""
---
title: Google Sheets minimal example
description: Learn how to work with Google services
keywords: [google sheets, credentials, example]
---
In this example, you'll find a Python script that demonstrates how to load Google Sheets data using the `dlt` library.
We'll learn how to:
- use [built-in credentials](../general-usage/credentials/config_specs#gcp-credentials);
- use [union of credentials](../general-usage/credentials/config_specs#working-with-alternatives-of-credentials-union-types);
- create [dynamically generated resources](../general-usage/source#create-resources-dynamically).
:::tip
This example is for educational purposes. For best practices, we recommend using [Google Sheets verified source](../dlt-ecosystem/verified-sources/google_sheets.md).
:::
"""
# NOTE: this line is only for dlt CI purposes, you may delete it if you are using this example
__source_name__ = "google_sheets"
from typing import Any, Iterator, Sequence, Union, cast
from googleapiclient.discovery import build
@@ -64,10 +87,18 @@ if __name__ == "__main__":
sheet_id = "1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580"
range_names = ["hidden_columns_merged_cells", "Blank Columns"]
# "2022-05", "model_metadata"
info = pipeline.run(
load_info = pipeline.run(
google_spreadsheet(
spreadsheet_id=sheet_id,
sheet_names=range_names,
)
)
print(info)
print(load_info)
row_counts = pipeline.last_trace.last_normalize_info.row_counts
print(row_counts.keys())
assert row_counts["hidden_columns_merged_cells"] == 7
assert row_counts["blank_columns"] == 21
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -1,4 +1,4 @@
[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
email = ""

View File

@@ -1,3 +1,25 @@
"""
---
title: Load Zendesk tickets incrementally
description: Learn how to do incremental loading in consecutive runs
keywords: [incremental loading, example]
---
In this example, you'll find a Python script that interacts with the Zendesk Support API to extract ticket events data.
We'll learn:
- How to pass [credentials](../general-usage/credentials) as dict and how to type the `@dlt.source` function arguments.
- How to set [the nesting level](../general-usage/source#reduce-the-nesting-level-of-generated-tables).
- How to enable [incremental loading](../general-usage/incremental-loading) for efficient data extraction.
- How to specify [the start and end dates](../general-usage/incremental-loading#using-dltsourcesincremental-for-backfill) for the data loading and how to [opt-in to Airflow scheduler](../general-usage/incremental-loading#using-airflow-schedule-for-backfill-and-incremental-loading) by setting `allow_external_schedulers` to `True`.
- How to work with timestamps, specifically converting them to Unix timestamps for incremental data extraction.
- How to use the `start_time` parameter in API requests to retrieve data starting from a specific timestamp.
"""
# NOTE: this line is only for dlt CI purposes, you may delete it if you are using this example
__source_name__ = "zendesk"
from typing import Optional, Dict, Any, Tuple
import dlt
@@ -121,3 +143,10 @@ if __name__ == "__main__":
load_info = pipeline.run(zendesk_support())
print(load_info)
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["ticket_events"] == 17
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -1,2 +1,2 @@
[sources.mongodb]
connection_url=""
connection_url=""

View File

@@ -1,3 +1,24 @@
"""
---
title: Control nested MongoDB data
description: Learn how to control nested data
keywords: [incremental loading, example]
---
In this example, you'll find a Python script that demonstrates how to control nested data using the `dlt` library.
We'll learn how to:
- [Adjust maximum nesting level in three ways:](../general-usage/source#reduce-the-nesting-level-of-generated-tables)
- Limit nesting levels with the dlt decorator.
- Adjust the nesting level dynamically.
- Apply data type hints.
- Work with [MongoDB](../dlt-ecosystem/verified-sources/mongodb) in Python and `dlt`.
- Enable [incremental loading](../general-usage/incremental-loading) for efficient data extraction.
"""
# NOTE: this line is only for dlt CI purposes, you may delete it if you are using this example
__source_name__ = "mongodb"
from itertools import islice
from typing import Any, Dict, Iterator, Optional
@@ -103,6 +124,12 @@ if __name__ == "__main__":
source_data = mongodb_collection(collection="movies", write_disposition="replace")
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 7, pipeline.last_trace.last_normalize_info
# make sure nothing failed
load_info.raise_on_failed_jobs()
# The second method involves setting the max_table_nesting attribute directly
# on the source data object.
@@ -118,6 +145,12 @@ if __name__ == "__main__":
source_data.max_table_nesting = 0
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 1, pipeline.last_trace.last_normalize_info
# make sure nothing failed
load_info.raise_on_failed_jobs()
# The third method involves applying data type hints to specific columns in the data.
# In this case, we tell dlt that column 'cast' (containing a list of actors)
@@ -132,3 +165,9 @@ if __name__ == "__main__":
source_data.movies.apply_hints(columns={"cast": {"data_type": "complex"}})
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts
tables.pop("_dlt_pipeline_state")
assert len(tables) == 6, pipeline.last_trace.last_normalize_info
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -1,5 +1,29 @@
import os
"""
---
title: Load PDFs to Weaviate
description: Extract text from PDF and load it into a vector database
keywords: [pdf, weaviate, vector store, vector database, ]
---
We'll use PyPDF2 to extract text from PDFs. Make sure you have it installed:
```sh
pip install PyPDF2
```
We start with a simple resource that lists files in a specified folder. To that we add a **filter** function that removes all files that are not PDFs.
To parse PDFs we use [PyPDF](https://pypdf2.readthedocs.io/en/3.0.0/user/extract-text.html) and return each page from a given PDF as a separate data item.
Parsing happens in a `@dlt.transformer`, which receives data from the `list_files` resource. It splits each PDF into pages, extracts the text and yields the pages separately,
so each PDF corresponds to many items in the Weaviate `InvoiceText` class. We set the primary key and use the merge write disposition so that if the same PDF comes in twice,
we just update the vectors rather than duplicate them.
Note how we pipe data from the `list_files` resource (which is deselected, so we do not load raw file items to the destination) into `pdf_to_text` using the **|** operator.
"""
import os
import dlt
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader
@@ -31,27 +55,31 @@ def pdf_to_text(file_item, separate_pages: bool = False):
yield page_item
pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
if __name__ == "__main__":
pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)
# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)
# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"
# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"
# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
import weaviate
import weaviate
client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -5,4 +5,4 @@ api_key = ""
[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
email = ""

View File

@@ -1,3 +1,33 @@
"""
---
title: Similarity Searching with Qdrant
description: Learn how to use the Zendesk dlt source and the Qdrant dlt destination to conduct a similarity search on your tickets data.
keywords: [similarity search, example]
---
This article outlines a system to map vectorized ticket data from Zendesk to Qdrant, similar to our guide on the topic concerning [Weaviate](https://dlthub.com/docs/dlt-ecosystem/destinations/qdrant). In this example, we will:
- Connect to our [Zendesk source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/zendesk).
- Extract tickets data from our Zendesk source.
- [Create a dlt pipeline](https://dlthub.com/docs/walkthroughs/create-a-pipeline) with Qdrant as destination.
- Vectorize/embed the tickets data from Zendesk.
- Pass the vectorized data to be stored in Qdrant via the dlt pipeline.
- Query data that we stored in Qdrant.
- Explore the similarity search results.
First, configure the destination credentials for [Qdrant](https://dlthub.com/docs/dlt-ecosystem/destinations/qdrant#setup-guide) and [Zendesk](https://dlthub.com/docs/walkthroughs/zendesk-weaviate#configuration) in `.dlt/secrets.toml`.
Next, make sure you have the following dependencies installed:
```sh
pip install qdrant-client>=1.6.9
pip install fastembed>=0.1.1
```
"""
# NOTE: this line is only for dlt CI purposes, you may delete it if you are using this example
__source_name__ = "zendesk"
from typing import Optional, Dict, Any, Tuple
import dlt
@@ -148,6 +178,9 @@ if __name__ == "__main__":
print(load_info)
# make sure nothing failed
load_info.raise_on_failed_jobs()
# running the Qdrant client to connect to your Qdrant database
@with_config(sections=("destination", "qdrant", "credentials"))
@@ -169,3 +202,8 @@ if __name__ == "__main__":
query_text=["cancel", "cancel subscription"], # prompt to search
limit=3, # limit the number of results to the nearest 3 embeddings
)
assert len(response) <= 3 and len(response) > 0
# make sure nothing failed
load_info.raise_on_failed_jobs()

View File

@@ -13,4 +13,4 @@ workers=3
[load]
# have 50 concurrent load jobs
workers=50
workers=50

View File

@@ -1,11 +1,28 @@
"""
---
title: Pokemon details in parallel using transformers
description: Learn how to use dlt transformers and how to speed up your loads with parallelism
keywords: [transformers, parallelism, example]
---
In this example, we load Pokemon data from the [PokeAPI](https://pokeapi.co/), using transformers to fetch
Pokemon details in parallel.
We'll learn how to:
- create 2 [transformers](../general-usage/resource.md#feeding-data-from-one-resource-into-another) and connect them to a resource with the pipe operator `|`;
- [load these transformers in parallel](../reference/performance.md#parallelism) using the `@dlt.defer` decorator;
- [configure parallelism](../reference/performance.md#parallel-pipeline-config-example) in the `config.toml` file;
- deselect the main resource, so it will not be loaded into the database;
- import and use a pre-configured `requests` library with automatic retries (`from dlt.sources.helpers import requests`).
"""
import dlt
from dlt.sources.helpers import requests
@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
""""""
# note that we deselect `pokemon_list` - we do not want it to be loaded
@dlt.resource(write_disposition="replace", selected=False)
def pokemon_list():
@@ -55,3 +72,12 @@ if __name__ == "__main__":
# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
print(load_info)
# verify that all went well
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["pokemon"] == 20
assert row_counts["species"] == 20
assert "pokemon_list" not in row_counts
# make sure nothing failed
load_info.raise_on_failed_jobs()
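The hunk above shows only the docstring and the final assertions. As a rough sketch of the parallelized-transformer pattern the docstring describes (not necessarily the example's exact code), a transformer whose work is deferred to dlt's thread pool could look like this:

```py
import dlt
from dlt.sources.helpers import requests

@dlt.transformer
def pokemon(pokemons):
    """Fetches detail pages for a list of pokemon in parallel."""

    @dlt.defer
    def _get_pokemon(_pokemon):
        # each call is scheduled on dlt's worker thread pool
        return requests.get(_pokemon["url"]).json()

    # yield deferred items; dlt resolves them in parallel
    for _pokemon in pokemons:
        yield _get_pokemon(_pokemon)
```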

View File

@@ -0,0 +1,76 @@
"""
Creates the pytest files for our example tests. These will not be committed.
"""
import os
import argparse
import dlt.cli.echo as fmt
EXAMPLES_DIR = "../examples"
# settings
SKIP_FOLDERS = ["archive", ".", "_", "local_cache"]
# the entry point for the script
MAIN_CLAUSE = 'if __name__ == "__main__":'
# header inserted at the top of each generated test file
TEST_HEADER = """
from tests.utils import skipifgithubfork
"""
if __name__ == "__main__":
# setup cli
parser = argparse.ArgumentParser(
description="Prepares examples in docs/examples for testing.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-c", "--clear", help="Remove all generated test files", action="store_true"
)
# get args
args = parser.parse_args()
count = 0
for example in next(os.walk(EXAMPLES_DIR))[1]:
# skip some
if any(map(lambda skip: example.startswith(skip), SKIP_FOLDERS)):
continue
count += 1
example_file = f"{EXAMPLES_DIR}/{example}/{example}.py"
test_example_file = f"{EXAMPLES_DIR}/{example}/test_{example}.py"
if args.clear:
os.unlink(test_example_file)
continue
with open(example_file, "r", encoding="utf-8") as f:
lines = f.read().split("\n")
processed_lines = TEST_HEADER.split("\n")
main_clause_found = False
for line in lines:
# convert the main clause to a test function
if line.startswith(MAIN_CLAUSE):
main_clause_found = True
processed_lines.append("@skipifgithubfork")
processed_lines.append(f"def test_{example}():")
else:
processed_lines.append(line)
if not main_clause_found:
fmt.error(f"No main clause defined for example {example}")
exit(1)
with open(test_example_file, "w", encoding="utf-8") as f:
f.write("\n".join(processed_lines))
if args.clear:
fmt.note("Cleared generated test files.")
else:
fmt.note(f"Prepared {count} examples for testing.")

View File

@@ -1,21 +0,0 @@
import Admonition from "@theme/Admonition";
import CodeBlock from '@theme/CodeBlock';
<Admonition>
The source code for this example can be found in our repository at: <a href={"https://github.com/dlt-hub/dlt/tree/devel/docs/examples/" + props.slug}>{"https://github.com/dlt-hub/dlt/tree/devel/docs/examples/" + props.slug}</a>.
</Admonition>
## TLDR
<div>{props.intro}</div>
## Setup: Running this example on your machine
<CodeBlock language="sh">
{`# clone the dlt repository
git clone git@github.com:dlt-hub/dlt.git
# go to example directory
cd ./dlt/docs/examples/${props.slug}
# install dlt with ${props.destination}
pip install "dlt[${props.destination}]"
# run the example script
python ${props.run_file}.py`}
</CodeBlock>

View File

@@ -1,2 +0,0 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_END example

View File

@@ -1,171 +0,0 @@
from tests.utils import skipifgithubfork
@skipifgithubfork
def incremental_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START markdown_source
import threading
from typing import Any, Iterator
import dlt
from dlt.common import sleep
from dlt.common.typing import StrAny, TDataItems
from dlt.sources.helpers.requests import client
@dlt.source
def chess(
chess_url: str = dlt.config.value,
title: str = "GM",
max_players: int = 2,
year: int = 2022,
month: int = 10,
) -> Any:
def _get_data_with_retry(path: str) -> StrAny:
r = client.get(f"{chess_url}{path}")
return r.json() # type: ignore
@dlt.resource(write_disposition="replace")
def players() -> Iterator[TDataItems]:
# return players one by one, you could also return a list
# that would be faster but we want to pass players item by item to the transformer
yield from _get_data_with_retry(f"titled/{title}")["players"][:max_players]
# this resource takes data from players and returns profiles
# it uses `paralellized` flag to enable parallel run in thread pool.
@dlt.transformer(data_from=players, write_disposition="replace", parallelized=True)
def players_profiles(username: Any) -> TDataItems:
print(f"getting {username} profile via thread {threading.current_thread().name}")
sleep(1) # add some latency to show parallel runs
return _get_data_with_retry(f"player/{username}")
# this resource takes data from players and returns games for the last month
# if not specified otherwise
@dlt.transformer(data_from=players, write_disposition="append")
def players_games(username: Any) -> Iterator[TDataItems]:
# https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
path = f"player/{username}/games/{year:04d}/{month:02d}"
yield _get_data_with_retry(path)["games"]
return players(), players_profiles, players_games
# @@@DLT_SNIPPET_END markdown_source
# @@@DLT_SNIPPET_START markdown_retry_cm
from tenacity import (
Retrying,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)
from dlt.common import logger
from dlt.common.runtime.slack import send_slack_message
from dlt.pipeline.helpers import retry_load
MAX_PLAYERS = 5
def load_data_with_retry(pipeline, data):
try:
for attempt in Retrying(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1.5, min=4, max=10),
retry=retry_if_exception(retry_load(())),
reraise=True,
):
with attempt:
logger.info(
f"Running the pipeline, attempt={attempt.retry_state.attempt_number}"
)
load_info = pipeline.run(data)
logger.info(str(load_info))
# raise on failed jobs
load_info.raise_on_failed_jobs()
# send notification
send_slack_message(
pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!"
)
except Exception:
# we get here after all the failed retries
# send notification
send_slack_message(pipeline.runtime_config.slack_incoming_hook, "Something went wrong!")
raise
# we get here after a successful attempt
# see when load was started
logger.info(f"Pipeline was started: {load_info.started_at}")
# print the information on the first load package and all jobs inside
logger.info(f"First load package info: {load_info.load_packages[0]}")
# print the information on the first completed job in first load package
logger.info(
f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}"
)
# check for schema updates:
schema_updates = [p.schema_update for p in load_info.load_packages]
# send notifications if there are schema updates
if schema_updates:
# send notification
send_slack_message(pipeline.runtime_config.slack_incoming_hook, "Schema was updated!")
# To run simple tests with `sql_client`, such as checking table counts and
# warning if there is no data, you can use the `execute_query` method
with pipeline.sql_client() as client:
with client.execute_query("SELECT COUNT(*) FROM players") as cursor:
count = cursor.fetchone()[0]
if count == 0:
logger.info("Warning: No data in players table")
else:
logger.info(f"Players table contains {count} rows")
assert count == MAX_PLAYERS # @@@DLT_REMOVE
# To run simple tests with `normalize_info`, such as checking table counts and
# warning if there is no data, you can use the `row_counts` attribute.
normalize_info = pipeline.last_trace.last_normalize_info
count = normalize_info.row_counts.get("players", 0)
if count == 0:
logger.info("Warning: No data in players table")
else:
logger.info(f"Players table contains {count} rows")
assert count == MAX_PLAYERS # @@@DLT_REMOVE
# we reuse the pipeline instance below and load to the same dataset as data
logger.info("Saving the load info in the destination")
pipeline.run([load_info], table_name="_load_info")
assert "_load_info" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
# save trace to destination, sensitive data will be removed
logger.info("Saving the trace in the destination")
pipeline.run([pipeline.last_trace], table_name="_trace")
assert "_trace" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
# print all the new tables/columns in
for package in load_info.load_packages:
for table_name, table in package.schema_update.items():
logger.info(f"Table {table_name}: {table.get('description')}")
for column_name, column in table["columns"].items():
logger.info(f"\tcolumn {column_name}: {column['data_type']}")
# save the new tables and column schemas to the destination:
table_updates = [p.asdict()["tables"] for p in load_info.load_packages]
pipeline.run(table_updates, table_name="_new_tables")
assert "_new_tables" in pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
return load_info
# @@@DLT_SNIPPET_END markdown_retry_cm
# @@@DLT_SNIPPET_START markdown_pipeline
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# create dlt pipeline
pipeline = dlt.pipeline(
pipeline_name="chess_pipeline",
destination="duckdb",
dataset_name="chess_data",
)
# get data for a few famous players
data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS)
load_data_with_retry(pipeline, data)
# @@@DLT_SNIPPET_END markdown_pipeline
# @@@DLT_SNIPPET_END example

View File

@@ -1,47 +0,0 @@
---
title: Run chess pipeline in production
description: Learn how run chess pipeline in production
keywords: [incremental loading, example]
---
import Header from '../_examples-header.md';
<Header
intro="In this tutorial, you will learn how to investigate, track, retry and test your loads."
slug="chess_production"
run_file="chess"
destination="duckdb" />
## Run chess pipeline in production
In this example, you'll find a Python script that interacts with the Chess API to extract players and game data.
We'll learn how to:
- Inspecting packages after they have been loaded.
- Loading back load information, schema updates, and traces.
- Triggering notifications in case of schema evolution.
- Using context managers to independently retry pipeline stages.
- Run basic tests utilizing `sql_client` and `normalize_info`.
### Init chess source
<!--@@@DLT_SNIPPET ./code/chess-snippets.py::markdown_source-->
### Using context managers to retry pipeline stages separately
<!--@@@DLT_SNIPPET ./code/chess-snippets.py::markdown_retry_cm-->
:::warning
To run this example you need to provide Slack incoming hook in `.dlt/secrets.toml`:
```py
[runtime]
slack_incoming_hook="https://hooks.slack.com/services/***"
```
Read [Using Slack to send messages.](https://dlthub.com/docs/running-in-production/running#using-slack-to-send-messages)
:::
### Run the pipeline
<!--@@@DLT_SNIPPET ./code/chess-snippets.py::markdown_pipeline-->

View File

@@ -1,52 +0,0 @@
def connector_x_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START markdown_source
import connectorx as cx
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
):
yield cx.read_sql(
conn_str.to_native_representation(),
query,
return_type="arrow2",
protocol="binary",
)
def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
name="genome",
write_disposition="merge",
primary_key="upid",
standalone=True,
)(read_sql_x)(
"mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type]
"SELECT * FROM genome ORDER BY created LIMIT 1000",
)
# add incremental on created at
genome.apply_hints(incremental=dlt.sources.incremental("created"))
return genome
# @@@DLT_SNIPPET_END markdown_source
# @@@DLT_SNIPPET_START markdown_pipeline
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
genome = genome_resource()
print(pipeline.run(genome))
print(pipeline.last_trace.last_normalize_info)
# NOTE: run pipeline again to see that no more records got loaded thanks to incremental loading
# @@@DLT_SNIPPET_END markdown_pipeline
# check that stuff was loaded # @@@DLT_REMOVE
row_counts = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
assert row_counts["genome"] == 1000 # @@@DLT_REMOVE
# @@@DLT_SNIPPET_END example

View File

@@ -1,44 +0,0 @@
---
title: Load mysql table with ConnectorX & Arrow
description: Load data from sql queries fast with connector x and arrow tables
keywords: [connector x, pyarrow, zero copy]
---
import Header from '../_examples-header.md';
<Header
intro="In this example, you will learn how to use arrow tables to load data from sql queries.
This method creates arrow tables in memory using Connector X and then loads them into destination
supporting parquet files without copying data.
"
slug="connector_x_arrow"
run_file="load_arrow"
destination="duckdb" />
## Load mysql table with ConnectorX and Arrow
Example script below takes genome data from public **mysql** instance and then loads it into **duckdb**. Mind that your destination
must support loading of parquet files as this is the format that `dlt` uses to save arrow tables. [Connector X](https://github.com/sfu-db/connector-x) allows to
get data from several popular databases and creates in memory Arrow table which `dlt` then saves to load package and loads to the destination.
:::tip
You can yield several tables if your data is large and you need to partition your load.
:::
We'll learn:
- How to get arrow tables from [connector X](https://github.com/sfu-db/connector-x) and yield them.
- That merge and incremental loads work with arrow tables.
- How to enable [incremental loading](../../general-usage/incremental-loading) for efficient data extraction.
- How to use build in ConnectionString credentials.
### Loading code
<!--@@@DLT_SNIPPET ./code/load_arrow-snippets.py::markdown_source-->
Run the pipeline:
<!--@@@DLT_SNIPPET ./code/load_arrow-snippets.py::markdown_pipeline-->

View File

@@ -1,2 +0,0 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_END example

View File

@@ -1,10 +0,0 @@
# @@@DLT_SNIPPET_START example
[destination.bigquery.credentials]
client_email = ""
private_key = ""
project_id = ""
token_uri = ""
refresh_token = ""
client_id = ""
client_secret = ""
# @@@DLT_SNIPPET_END example

View File

@@ -1,81 +0,0 @@
from tests.utils import skipifgithubfork
from tests.pipeline.utils import assert_load_info
@skipifgithubfork
def custom_destination_biquery_snippet() -> None:
# @@@DLT_SNIPPET_START example
import dlt
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery
from dlt.common.configuration.specs import GcpServiceAccountCredentials
# constants
OWID_DISASTERS_URL = (
"https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
"Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
"Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
)
# this table needs to be manually created in your gc account
# format: "your-project.your_dataset.your_table"
BIGQUERY_TABLE_ID = "chat-analytics-rasa-ci.ci_streaming_insert.natural-disasters"
# dlt sources
@dlt.resource(name="natural_disasters")
def resource(url: str):
# load pyarrow table with pandas
table = pa.Table.from_pandas(pd.read_csv(url))
# we add a list type column to demontrate bigquery lists
table = table.append_column(
"tags",
pa.array(
[["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
pa.list_(pa.string()),
),
)
# we add a struct type column to demonstrate bigquery structs
table = table.append_column(
"meta",
pa.array(
[{"loaded_by": "dlt"}] * len(table),
pa.struct([("loaded_by", pa.string())]),
),
)
yield table
# dlt biquery custom destination
# we can use the dlt provided credentials class
# to retrieve the gcp credentials from the secrets
@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0)
def bigquery_insert(
items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
client = bigquery.Client(
credentials.project_id, credentials.to_native_credentials(), location="US"
)
job_config = bigquery.LoadJobConfig(
autodetect=True,
source_format=bigquery.SourceFormat.PARQUET,
schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
)
# since we have set the batch_size to 0, we get a filepath and can load the file directly
with open(items, "rb") as f:
load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
load_job.result() # Waits for the job to complete.
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# run the pipeline and print load results
pipeline = dlt.pipeline(
pipeline_name="csv_to_bigquery_insert",
destination=bigquery_insert,
dataset_name="mydata",
full_refresh=True,
)
load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))
print(load_info)
# @@@DLT_SNIPPET_END example
assert_load_info(load_info)

View File

@@ -1,30 +0,0 @@
---
title: Custom destination with BigQuery
description: Learn how use the custom destination to load to bigquery and use credentials
keywords: [destination, credentials, example, bigquery, custom destination]
---
import Header from '../_examples-header.md';
<Header
intro="This example demonstrates how to use the custom destination to load to BigQuery with automatic schema inference."
slug="custom_destination_bigquery"
run_file="custom_destination_bigquery"
destination="biqquery"/>
## Custom destination BigQuery pipeline
In this example, you'll find a Python script that demonstrates how to load Google Sheets data using the `dlt` library.
We'll learn how to:
- use [built-in credentials](../../general-usage/credentials/config_specs#gcp-credentials)
- use the [custom destination](../../dlt-ecosystem/destinations/destination.md)
- Use pyarrow tables to create complex column types on bigquery
- Use bigquery autodetect=True for schema inference from parquet files
### Your bigquery credentials in secrets.toml
<!--@@@DLT_SNIPPET code/.dlt/example.secrets.toml::example-->
### Pipeline code
<!--@@@DLT_SNIPPET code/custom_destination_bigquery-snippets.py::example-->

View File

@@ -1,2 +0,0 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_END example

View File

@@ -1,18 +0,0 @@
# @@@DLT_SNIPPET_START example
# you can just paste services.json as credentials
[sources.google_sheets]
credentials='''
{
"type": "set me up!",
"project_id": "set me up!",
"private_key_id": "set me up!",
"private_key": "set me up!",
"client_email": "set me up!",
"client_id": "set me up!",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "set me up!"
}
'''
# @@@DLT_SNIPPET_END example

View File

@@ -1,88 +0,0 @@
from tests.utils import skipifgithubfork
__source_name__ = "google_sheets"
@skipifgithubfork
def google_sheets_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START google_sheets
from typing import Any, Iterator, Sequence, Union, cast
from googleapiclient.discovery import build
import dlt
from dlt.common.configuration.specs import (
GcpOAuthCredentials,
GcpServiceAccountCredentials,
)
from dlt.common.typing import DictStrAny, StrAny
def _initialize_sheets(
credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
) -> Any:
# Build the service object.
service = build("sheets", "v4", credentials=credentials.to_native_credentials())
return service
@dlt.source
def google_spreadsheet(
spreadsheet_id: str,
sheet_names: Sequence[str],
credentials: Union[
GcpServiceAccountCredentials, GcpOAuthCredentials, str, StrAny
] = dlt.secrets.value,
) -> Any:
sheets = _initialize_sheets(cast(GcpServiceAccountCredentials, credentials))
def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
# get list of list of typed values
result = (
sheets.spreadsheets()
.values()
.get(
spreadsheetId=spreadsheet_id,
range=sheet_name,
# unformatted returns typed values
valueRenderOption="UNFORMATTED_VALUE",
# will return formatted dates
dateTimeRenderOption="FORMATTED_STRING",
)
.execute()
)
# pprint.pprint(result)
values = result.get("values")
# yield dicts assuming row 0 contains headers and following rows values and all rows have identical length
for v in values[1:]:
yield {h: v for h, v in zip(values[0], v)}
# create resources from supplied sheet names
return [
dlt.resource(get_sheet(name), name=name, write_disposition="replace")
for name in sheet_names
]
# @@@DLT_SNIPPET_END google_sheets
# @@@DLT_SNIPPET_START google_sheets_run
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
# see example.secrets.toml to where to put credentials
sheet_id = "1HhWHjqouQnnCIZAFa2rL6vT91YRN8aIhts22SUUR580"
range_names = ["hidden_columns_merged_cells", "Blank Columns"]
# "2022-05", "model_metadata"
info = pipeline.run(
google_spreadsheet(
spreadsheet_id=sheet_id,
sheet_names=range_names,
)
)
print(info)
# @@@DLT_SNIPPET_END google_sheets_run
# @@@DLT_SNIPPET_END example
row_counts = pipeline.last_trace.last_normalize_info.row_counts
print(row_counts.keys())
assert row_counts["hidden_columns_merged_cells"] == 7
assert row_counts["blank_columns"] == 21

View File

@@ -1,42 +0,0 @@
---
title: Google Sheets minimal example
description: Learn how work with Google services
keywords: [google sheets, credentials, example]
---
import Header from '../_examples-header.md';
<Header
intro="This example demonstrates how to load Google Sheets data using Python and the dlt library. It covers working with Google API, using built in credentials, using union of credentials, and creating dynamically generated resources."
slug="google_sheets"
run_file="google_sheets"
destination="duckdb"/>
## Google Sheets data pipeline
In this example, you'll find a Python script that demonstrates how to load Google Sheets data using the `dlt` library.
We'll learn how to:
- use [built-in credentials](../../general-usage/credentials/config_specs#gcp-credentials);
- use [union of credentials](../../general-usage/credentials/config_specs#working-with-alternatives-of-credentials-union-types);
- create [dynamically generated resources](../../general-usage/source#create-resources-dynamically).
:::tip
This example is for educational purposes. For best practices, we recommend using [Google Sheets verified source](../../dlt-ecosystem/verified-sources/google_sheets.md).
:::
### Install Google client library
```sh
pip install google-api-python-client
```
### Loading code
<!--@@@DLT_SNIPPET code/google_sheets-snippets.py::google_sheets-->
### Run the pipeline
<!--@@@DLT_SNIPPET code/google_sheets-snippets.py::google_sheets_run-->

View File

@@ -1,2 +0,0 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_END example

View File

@@ -1,6 +0,0 @@
# @@@DLT_SNIPPET_START example
[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
# @@@DLT_SNIPPET_END example

View File

@@ -1,143 +0,0 @@
from tests.utils import skipifgithubfork
# because the example below uses credentials and it is copied to the module zendesk.py
# we force the same config section name
__source_name__ = "zendesk"
@skipifgithubfork
def incremental_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START markdown_source
from typing import Optional, Dict, Any, Tuple
import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client
@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
end_date: Optional[TAnyDateTime] = None,
):
"""
Retrieves data from Zendesk Support for tickets events.
Args:
credentials: Zendesk credentials (default: dlt.secrets.value)
start_date: Start date for data extraction (default: 2000-01-01)
end_date: End date for data extraction (default: None).
If end time is not provided, the incremental loading will be
enabled, and after the initial run, only new data will be retrieved.
Returns:
DltResource.
"""
# Convert start_date and end_date to Pendulum datetime objects
start_date_obj = ensure_pendulum_datetime(start_date)
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
# Convert Pendulum datetime objects to Unix timestamps
start_date_ts = start_date_obj.int_timestamp
end_date_ts: Optional[int] = None
if end_date_obj:
end_date_ts = end_date_obj.int_timestamp
# Extract credentials from secrets dictionary
auth = (credentials["email"], credentials["password"])
subdomain = credentials["subdomain"]
url = f"https://{subdomain}.zendesk.com"
# we use `append` write disposition, because objects in ticket_events endpoint are never updated
# so we do not need to merge
# we set primary_key so allow deduplication of events by the `incremental` below in the rare case
# when two events have the same timestamp
@dlt.resource(primary_key="id", write_disposition="append")
def ticket_events(
timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
"timestamp",
initial_value=start_date_ts,
end_value=end_date_ts,
allow_external_schedulers=True,
),
):
# URL For ticket events
# 'https://d3v-dlthub.zendesk.com/api/v2/incremental/ticket_events.json?start_time=946684800'
event_pages = get_pages(
url=url,
endpoint="/api/v2/incremental/ticket_events.json",
auth=auth,
data_point_name="ticket_events",
params={"start_time": timestamp.last_value},
)
for page in event_pages:
yield page
# stop loading when using end_value and end is reached.
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
if timestamp.end_out_of_range:
return
return ticket_events
# @@@DLT_SNIPPET_END markdown_source
def get_pages(
url: str,
endpoint: str,
auth: Tuple[str, str],
data_point_name: str,
params: Optional[Dict[str, Any]] = None,
):
"""
Makes a request to a paginated endpoint and returns a generator of data items per page.
Args:
url: The base URL.
endpoint: The url to the endpoint, e.g. /api/v2/calls
auth: Credentials for authentication.
data_point_name: The key which data items are nested under in the response object (e.g. calls)
params: Optional dict of query params to include in the request.
Returns:
Generator of pages, each page is a list of dict data items.
"""
# update the page size to enable cursor pagination
params = params or {}
params["per_page"] = 1000
headers = None
# make request and keep looping until there is no next page
get_url = f"{url}{endpoint}"
while get_url:
response = client.get(get_url, headers=headers, auth=auth, params=params)
response.raise_for_status()
response_json = response.json()
result = response_json[data_point_name]
yield result
get_url = None
# See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]
# @@@DLT_SNIPPET_START markdown_pipeline
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# create dlt pipeline
pipeline = dlt.pipeline(
pipeline_name="zendesk", destination="duckdb", dataset_name="zendesk_data"
)
load_info = pipeline.run(zendesk_support())
print(load_info)
# @@@DLT_SNIPPET_END markdown_pipeline
# @@@DLT_SNIPPET_END example
# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["ticket_events"] == 17

View File

@@ -1,42 +0,0 @@
---
title: Load Zendesk tickets incrementally
description: Learn how to do incremental loading in consecutive runs
keywords: [incremental loading, example]
---
import Header from '../_examples-header.md';
<Header
intro="In this tutorial, you will learn how to do incremental loading in consecutive runs with dlt.
The state of your incremental loads will be persisted in
your selected destination and restored and used on each new load,
making it very easy to keep your loaded dataset up to date with the source."
slug="incremental_loading"
run_file="zendesk"
destination="duckdb" />
## Incremental loading with the Zendesk API
In this example, you'll find a Python script that interacts with the Zendesk Support API to extract ticket events data.
We'll learn:
- How to pass [credentials](../../general-usage/credentials) as a dict and how to type the `@dlt.source` function arguments.
- How to set [the nesting level](../../general-usage/source#reduce-the-nesting-level-of-generated-tables).
- How to enable [incremental loading](../../general-usage/incremental-loading) for efficient data extraction.
- How to specify [the start and end dates](../../general-usage/incremental-loading#using-dltsourcesincremental-for-backfill) for the data loading and how to [opt-in to Airflow scheduler](../../general-usage/incremental-loading#using-airflow-schedule-for-backfill-and-incremental-loading) by setting `allow_external_schedulers` to `True`.
- How to work with timestamps, specifically converting them to Unix timestamps for incremental data extraction.
- How to use the `start_time` parameter in API requests to retrieve data starting from a specific timestamp.
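Before diving into the full script, here is a stripped-down sketch of the incremental declaration the list above refers to (the subdomain, endpoint, and resource name are placeholders; the complete, tested code follows below):
```py
import dlt
from dlt.sources.helpers.requests import client


@dlt.resource(primary_key="id", write_disposition="append")
def ticket_events(
    # the cursor value is stored in the pipeline state and restored on the next run
    timestamp: dlt.sources.incremental[int] = dlt.sources.incremental(
        "timestamp",
        initial_value=946684800,         # Unix timestamp for 2000-01-01, the backfill start
        end_value=None,                  # set an end timestamp to run a bounded backfill
        allow_external_schedulers=True,  # let e.g. the Airflow scheduler supply the window
    ),
):
    # request only records newer than the last seen cursor value
    response = client.get(
        "https://your-subdomain.zendesk.com/api/v2/incremental/ticket_events.json",
        params={"start_time": timestamp.last_value},
    )
    response.raise_for_status()
    yield response.json()["ticket_events"]
```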
### Loading code
<!--@@@DLT_SNIPPET ./code/zendesk-snippets.py::markdown_source-->
Run the pipeline:
<!--@@@DLT_SNIPPET ./code/zendesk-snippets.py::markdown_pipeline-->

View File

@@ -1,2 +0,0 @@
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_END example

View File

@@ -1,4 +0,0 @@
# @@@DLT_SNIPPET_START example
[sources.mongodb]
connection_url=""
# @@@DLT_SNIPPET_END example

View File

@@ -1,156 +0,0 @@
from tests.utils import skipifgithubfork
__source_name__ = "mongodb"
@skipifgithubfork
def nested_data_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START nested_data
from itertools import islice
from typing import Any, Dict, Iterator, Optional
from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from pendulum import _datetime
from pymongo import MongoClient
import dlt
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TDataItem
from dlt.common.utils import map_nested_in_place
CHUNK_SIZE = 10000
# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
# In this example, we specify that we only want to generate child tables up to level 2,
# so there will be only one level of child tables within child tables.
@dlt.source(max_table_nesting=2)
def mongodb_collection(
connection_url: str = dlt.secrets.value,
database: Optional[str] = dlt.config.value,
collection: str = dlt.config.value,
incremental: Optional[dlt.sources.incremental] = None, # type: ignore[type-arg]
write_disposition: Optional[str] = dlt.config.value,
) -> Any:
# set up mongo client
client: Any = MongoClient(connection_url, uuidRepresentation="standard", tz_aware=True)
mongo_database = client.get_default_database() if not database else client[database]
collection_obj = mongo_database[collection]
def collection_documents(
client: Any,
collection: Any,
incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> Iterator[TDataItem]:
LoaderClass = CollectionLoader
loader = LoaderClass(client, collection, incremental=incremental)
yield from loader.load_documents()
return dlt.resource( # type: ignore
collection_documents,
name=collection_obj.name,
primary_key="_id",
write_disposition=write_disposition,
)(client, collection_obj, incremental=incremental)
# @@@DLT_SNIPPET_END nested_data
class CollectionLoader:
def __init__(
self,
client: Any,
collection: Any,
incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> None:
self.client = client
self.collection = collection
self.incremental = incremental
if incremental:
self.cursor_field = incremental.cursor_path
self.last_value = incremental.last_value
else:
self.cursor_column = None
self.last_value = None
@property
def _filter_op(self) -> Dict[str, Any]:
if not self.incremental or not self.last_value:
return {}
if self.incremental.last_value_func is max:
return {self.cursor_field: {"$gte": self.last_value}}
elif self.incremental.last_value_func is min:
return {self.cursor_field: {"$lt": self.last_value}}
return {}
def load_documents(self) -> Iterator[TDataItem]:
cursor = self.collection.find(self._filter_op)
while docs_slice := list(islice(cursor, CHUNK_SIZE)):
yield map_nested_in_place(convert_mongo_objs, docs_slice)
def convert_mongo_objs(value: Any) -> Any:
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
if isinstance(value, _datetime.datetime):
return ensure_pendulum_datetime(value)
return value
# @@@DLT_SNIPPET_START nested_data_run
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# When we created the source, we set max_table_nesting to 2.
# This ensures that the generated tables do not have more than two
# levels of nesting, even if the original data structure is more deeply nested.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="unpacked_data",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
tables.pop("_dlt_pipeline_state") # @@@DLT_REMOVE
assert len(tables) == 7, pipeline.last_trace.last_normalize_info # @@@DLT_REMOVE
# The second method involves setting the max_table_nesting attribute directly
# on the source data object.
# This allows for dynamic control over the maximum nesting
# level for a specific data source.
# Here the nesting level is adjusted before running the pipeline.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="not_unpacked_data",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
source_data.max_table_nesting = 0
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
tables.pop("_dlt_pipeline_state") # @@@DLT_REMOVE
assert len(tables) == 1, pipeline.last_trace.last_normalize_info # @@@DLT_REMOVE
# The third method involves applying data type hints to specific columns in the data.
# In this case, we tell dlt that the column 'cast' (containing a list of actors)
# in the 'movies' table should have the type 'complex', which means
# that it will be loaded as JSON/struct and not as a child table.
pipeline = dlt.pipeline(
pipeline_name="mongodb_pipeline",
destination="duckdb",
dataset_name="unpacked_data_without_cast",
)
source_data = mongodb_collection(collection="movies", write_disposition="replace")
source_data.movies.apply_hints(columns={"cast": {"data_type": "complex"}})
load_info = pipeline.run(source_data)
print(load_info)
tables = pipeline.last_trace.last_normalize_info.row_counts # @@@DLT_REMOVE
tables.pop("_dlt_pipeline_state") # @@@DLT_REMOVE
assert len(tables) == 6, pipeline.last_trace.last_normalize_info # @@@DLT_REMOVE
# @@@DLT_SNIPPET_END nested_data_run
# @@@DLT_SNIPPET_END example

View File

@@ -1,41 +0,0 @@
---
title: Control nested MongoDB data
description: Learn how to control nested data
keywords: [incremental loading, example]
---
import Header from '../_examples-header.md';
<Header
intro="This example demonstrates how to control nested data using Python and the dlt library. It covers working with MongoDB, incremental loading, limiting nesting levels, and applying data type hints."
slug="nested_data"
run_file="nested_data"
destination="duckdb"/>
## Control nested data
In this example, you'll find a Python script that demonstrates how to control nested data using the `dlt` library.
We'll learn how to:
- [Adjust the maximum nesting level in three ways:](../../general-usage/source#reduce-the-nesting-level-of-generated-tables)
- Limit nesting levels with the dlt decorator.
- Adjust the nesting level dynamically on the source.
- Apply data type hints.
- Work with [MongoDB](../../dlt-ecosystem/verified-sources/mongodb) in Python and `dlt`.
- Enable [incremental loading](../../general-usage/incremental-loading) for efficient data extraction.
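As a quick orientation before the MongoDB script, the three nesting controls can be sketched on toy data roughly like this (the source, table, and pipeline names are made up for illustration, and in practice you would pick one of the approaches):
```py
import dlt


@dlt.source(max_table_nesting=2)  # approach 1: cap the nesting level in the decorator
def toy_source():
    @dlt.resource(name="movies", write_disposition="replace")
    def movies():
        yield {"title": "Arrival", "cast": [{"name": "Amy Adams", "roles": ["Louise"]}]}

    return movies


source_data = toy_source()

# approach 2: adjust the nesting level dynamically on the source object
# (0 keeps all nested data as JSON, so no child tables are generated)
source_data.max_table_nesting = 0

# approach 3: hint a single column so only "cast" is kept as JSON
# source_data.movies.apply_hints(columns={"cast": {"data_type": "complex"}})

pipeline = dlt.pipeline(pipeline_name="nesting_demo", destination="duckdb")
print(pipeline.run(source_data))
```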
### Install pymongo
```sh
pip install pymongo>=4.3.3
```
### Loading code
<!--@@@DLT_SNIPPET code/nested_data-snippets.py::nested_data-->
### Run the pipeline
<!--@@@DLT_SNIPPET code/nested_data-snippets.py::nested_data_run-->

View File

@@ -1,67 +0,0 @@
from tests.pipeline.utils import assert_load_info
from tests.utils import skipifgithubfork
@skipifgithubfork
def pdf_to_weaviate_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START pdf_to_weaviate
import os
import dlt
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader
@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path),
}
@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
raise NotImplementedError()
# extract data from PDF page by page
reader = PdfReader(file_item["file_path"])
for page_no in range(len(reader.pages)):
# add page content to file item
page_item = dict(file_item)
page_item["text"] = reader.pages[page_no].extract_text()
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item
pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")
# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)
# set the name of the destination table to receive pages
# NOTE: in Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"
# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text"))
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
# @@@DLT_SNIPPET_END pdf_to_weaviate
# @@@DLT_SNIPPET_START pdf_to_weaviate_read
import weaviate
client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
# @@@DLT_SNIPPET_END pdf_to_weaviate_read
# @@@DLT_SNIPPET_END example
assert_load_info(load_info)

View File

@@ -1,56 +0,0 @@
---
title: Load PDFs to Weaviate
description: Extract text from PDF and load it into a vector database
keywords: [pdf, weaviate, vector store, vector database, ]
---
import Header from '../_examples-header.md';
<Header
intro="This example demonstrates how to extract text from PDF files and load it into Weaviate vector database."
slug="pdf_to_weaviate"
destination="weaviate"
run_file="pdf_to_weaviate" />
Additionally, we'll use PyPDF2 to extract text from PDFs. Make sure you have it installed:
```sh
pip install PyPDF2
```
## Example code
<!--@@@DLT_SNIPPET ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate -->
We start with a simple resource that lists files in a specified folder. To that we add a **filter** function that removes all files that are not PDFs.
To parse PDFs we use [PyPDF](https://pypdf2.readthedocs.io/en/3.0.0/user/extract-text.html) and return each page from a given PDF as a separate data item.
Parsing happens in a `@dlt.transformer` which receives data from the `list_files` resource. It splits each PDF into pages, extracts the text, and yields the pages separately,
so each PDF corresponds to many items in the Weaviate `InvoiceText` class. We set the primary key and use the merge write disposition, so if the same PDF arrives twice
we just update its vectors instead of duplicating them.
Note how we pipe data from the `list_files` resource (which is deselected, so raw file items are not loaded to the destination) into `pdf_to_text` using the **|** operator.
Just before the load, `weaviate_adapter` is used to tell the `weaviate` destination which fields to vectorize.
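The piping pattern itself is independent of PDFs and Weaviate. A self-contained toy version of the same wiring (deselected resource, filter, merge transformer; all names below are made up) might look like this:
```py
import dlt


@dlt.resource(selected=False)  # deselected: raw file items are not loaded themselves
def list_items():
    yield from [{"file_name": "a.pdf"}, {"file_name": "b.txt"}]


@dlt.transformer(primary_key="file_name", write_disposition="merge")
def enrich(item):
    # one enriched row per surviving file item
    yield {**item, "name_length": len(item["file_name"])}


# keep only PDFs, then pipe the remaining items into the transformer
pipe = list_items.add_filter(lambda i: i["file_name"].endswith(".pdf")) | enrich
print(list(pipe))
```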
Now it is time to query our documents.
<!--@@@DLT_SNIPPET ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate_read-->
Above, we provide the URL of a local cluster. We also use `contextionary` to vectorize the data. You can find information on our setup in the links below.
:::tip
Change the destination to `duckdb` if you do not have access to a Weaviate cluster or are not able to run it locally.
:::
Learn more:
- [Set up the Weaviate destination - local or cluster](dlt-ecosystem/destinations/weaviate.md).
- [Connect the transformers to the resources](general-usage/resource#feeding-data-from-one-resource-into-another)
to load additional data or enrich it.
- [Transform your data before loading](general-usage/resource#customize-resources) and see some
[examples of customizations like column renames and anonymization](general-usage/customising-pipelines/renaming_columns).

View File

@@ -1,10 +0,0 @@
# @@@DLT_SNIPPET_START example
[destination.qdrant.credentials]
location = ""
api_key = ""
[sources.zendesk.credentials]
password = ""
subdomain = ""
email = ""
# @@@DLT_SNIPPET_END example

View File

@@ -1,191 +0,0 @@
from tests.utils import skipifgithubfork
__source_name__ = "zendesk"
@skipifgithubfork
def qdrant_snippet():
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START zendesk_conn
from typing import Optional, Dict, Any, Tuple
import dlt
from dlt.common import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client
from dlt.destinations.adapters import qdrant_adapter
from qdrant_client import QdrantClient
from dlt.common.configuration.inject import with_config
# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
end_date: Optional[TAnyDateTime] = None,
):
"""
Retrieves data from Zendesk Support for tickets.
Args:
credentials: Zendesk credentials (default: dlt.secrets.value)
start_date: Start date for data extraction (default: 2000-01-01)
end_date: End date for data extraction (default: None).
If end time is not provided, the incremental loading will be
enabled, and after the initial run, only new data will be retrieved.
Returns:
DltResource.
"""
# Convert start_date and end_date to Pendulum datetime objects
start_date_obj = ensure_pendulum_datetime(start_date)
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
# Extract credentials from secrets dictionary
auth = (credentials["email"], credentials["password"])
subdomain = credentials["subdomain"]
url = f"https://{subdomain}.zendesk.com"
# we use the `append` write disposition because objects in the tickets endpoint are never updated,
# so we do not need to merge
# we set primary_key to allow deduplication of events by the `incremental` below in the rare case
# when two events have the same timestamp
@dlt.resource(primary_key="id", write_disposition="append")
def tickets_data(
updated_at: dlt.sources.incremental[pendulum.DateTime] = dlt.sources.incremental(
"updated_at",
initial_value=start_date_obj,
end_value=end_date_obj,
allow_external_schedulers=True,
)
):
# URL for tickets
# 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets_data.json?start_time=946684800'
event_pages = get_pages(
url=url,
endpoint="/api/v2/incremental/tickets",
auth=auth,
data_point_name="tickets",
params={"start_time": updated_at.last_value.int_timestamp},
)
for page in event_pages:
yield ([_fix_date(ticket) for ticket in page])
# stop loading when using end_value and end is reached.
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
if updated_at.end_out_of_range:
return
return tickets_data
# @@@DLT_SNIPPET_END zendesk_conn
# helper function to fix the datetime format
def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
if not value:
return None
return ensure_pendulum_datetime(value)
# modify dates to return datetime objects instead
def _fix_date(ticket):
ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
return ticket
# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
url: str,
endpoint: str,
auth: Tuple[str, str],
data_point_name: str,
params: Optional[Dict[str, Any]] = None,
):
"""
Makes a request to a paginated endpoint and returns a generator of data items per page.
Args:
url: The base URL.
endpoint: The url to the endpoint, e.g. /api/v2/calls
auth: Credentials for authentication.
data_point_name: The key which data items are nested under in the response object (e.g. calls)
params: Optional dict of query params to include in the request.
Returns:
Generator of pages, each page is a list of dict data items.
"""
# update the page size to enable cursor pagination
params = params or {}
params["per_page"] = 1000
headers = None
# make request and keep looping until there is no next page
get_url = f"{url}{endpoint}"
while get_url:
response = client.get(get_url, headers=headers, auth=auth, params=params)
response.raise_for_status()
response_json = response.json()
result = response_json[data_point_name]
yield result
get_url = None
# See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]
# @@@DLT_SNIPPET_START main_code
__name__ = "__main__" # @@@DLT_REMOVE
if __name__ == "__main__":
# create a pipeline with an appropriate name
pipeline = dlt.pipeline(
pipeline_name="qdrant_zendesk_pipeline",
destination="qdrant",
dataset_name="zendesk_data",
)
# run the dlt pipeline and save info about the load process
load_info = pipeline.run(
# here we use a special function to tell Qdrant which fields to embed
qdrant_adapter(
zendesk_support(), # retrieve tickets data
embed=["subject", "description"],
)
)
print(load_info)
# @@@DLT_SNIPPET_END main_code
# @@@DLT_SNIPPET_START declare_qdrant_client
# a small helper that reads Qdrant credentials from dlt secrets and returns a Qdrant client
@with_config(sections=("destination", "qdrant", "credentials"))
def get_qdrant_client(location=dlt.secrets.value, api_key=dlt.secrets.value):
return QdrantClient(
url=location,
api_key=api_key,
)
# running the Qdrant client to connect to your Qdrant database
qdrant_client = get_qdrant_client()
# view the Qdrant collections; you'll find your dataset here:
print(qdrant_client.get_collections())
# @@@DLT_SNIPPET_END declare_qdrant_client
# @@@DLT_SNIPPET_START get_response
# query Qdrant with prompt: getting tickets info close to "cancellation"
response = qdrant_client.query(
"zendesk_data_content", # collection/dataset name with the 'content' suffix -> tickets content table
query_text=["cancel", "cancel subscription"], # prompt to search
limit=3, # limit the number of results to the nearest 3 embeddings
)
# @@@DLT_SNIPPET_END get_response
assert len(response) <= 3 and len(response) > 0 # @@@DLT_REMOVE
# @@@DLT_SNIPPET_END example

View File

@@ -1,87 +0,0 @@
---
title: Similarity Searching with Qdrant
description: Learn how to use the Zendesk dlt source and the Qdrant dlt destination to conduct a similarity search on your tickets data.
keywords: [similarity search, example]
---
import Header from '../_examples-header.md';
<Header
intro="In this tutorial, you will learn how to do use dlt to store your
vectorized Zendesk tickets data in the dlt destination: Qdrant. You can
use Qdrant's vectorization and similarity searching capabilities on your tickets data,
while using dlt as a medium to automate your pipeline."
slug="qdrant_zendesk"
run_file="qdrant"
destination="qdrant"
/>
This article outlines a system to map vectorized ticket data from Zendesk to Qdrant, similar to our guide on the same topic for [Weaviate](https://dlthub.com/docs/dlt-ecosystem/destinations/qdrant). In this example, we will:
- Connect to our [Zendesk source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/zendesk).
- Extract tickets data from our Zendesk source.
- [Create a dlt pipeline](https://dlthub.com/docs/walkthroughs/create-a-pipeline) with Qdrant as destination.
- Vectorize/embed the tickets data from Zendesk.
- Pass the vectorized data to be stored in Qdrant via the dlt pipeline.
- Query data that we stored in Qdrant.
- Explore the similarity search results.
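In miniature, the embed-and-load step boils down to something like the sketch below (the ticket dicts are fake stand-ins and the pipeline name is arbitrary; a running Qdrant instance or cloud credentials are still required):
```py
import dlt
from dlt.destinations.adapters import qdrant_adapter

# stand-ins for the tickets coming from the Zendesk source
tickets = [
    {"id": 1, "subject": "Unable to cancel subscription", "description": "Please cancel my plan."},
    {"id": 2, "subject": "Billing question", "description": "I was charged twice this month."},
]

pipeline = dlt.pipeline(
    pipeline_name="qdrant_sketch",
    destination="qdrant",
    dataset_name="zendesk_data",
)

# qdrant_adapter marks which fields get embedded before they are written to Qdrant
load_info = pipeline.run(
    qdrant_adapter(tickets, embed=["subject", "description"]),
    table_name="tickets",
)
print(load_info)
```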
First, configure the destination credentials for [Qdrant](https://dlthub.com/docs/dlt-ecosystem/destinations/qdrant#setup-guide) and [Zendesk](https://dlthub.com/docs/walkthroughs/zendesk-weaviate#configuration) in `.dlt/secrets.toml`.
Next, make sure you have the following dependencies installed:
```sh
pip install qdrant-client>=1.6.9
pip install fastembed>=0.1.1
```
## Connect to Zendesk and load tickets data
<!--@@@DLT_SNIPPET ./code/qdrant-snippets.py::zendesk_conn-->
## Initiating a pipeline with Qdrant
<!--@@@DLT_SNIPPET ./code/qdrant-snippets.py::main_code-->
## Querying the data
<!--@@@DLT_SNIPPET ./code/qdrant-snippets.py::declare_qdrant_client-->
<!--@@@DLT_SNIPPET ./code/qdrant-snippets.py::get_response-->
The query above stores the following results in the `response` variable:
```py
[QueryResponse(id='6aeacd21-b3d0-5174-97ef-5aaa59486414', embedding=None, metadata={'_dlt_id': 'Nx3wBiL29xTgaQ', '_dlt_load_id': '1700130284.002391', 'allow_attachments': True, 'allow_channelback': False, 'assignee_id': 12765072569105, 'brand_id': 12765073054225, 'created_at': '2023-09-01T11:19:25+00:00', 'custom_status_id': 12765028278545, 'description': 'I have been trying to cancel my subscription but the system wont let me do it. Can you please help?', 'from_messaging_channel': False, 'generated_timestamp': 1693567167, 'group_id': 12765036328465, 'has_incidents': False, 'id': 12, 'is_public': True, 'organization_id': 12765041119505, 'raw_subject': 'Unable to Cancel Subscription', 'requester_id': 12765072569105, 'status': 'open', 'subject': 'Unable to Cancel Subscription', 'submitter_id': 12765072569105, 'tags': ['test1'], 'test_field': 'test1', 'ticket_form_id': 12765054772497, 'updated_at': '2023-09-01T11:19:25+00:00', 'url': 'https://d3v-dlthub.zendesk.com/api/v2/tickets/12.json', 'via__channel': 'web'}, document='', score=0.89545774),
QueryResponse(id='a22189c1-70ab-5421-938b-1caae3e7d6d8', embedding=None, metadata={'_dlt_id': 'bc/xloksL89EUg', '_dlt_load_id': '1700130284.002391', 'allow_attachments': True, 'allow_channelback': False, 'assignee_id': 12765072569105, 'brand_id': 12765073054225, 'created_at': '2023-07-18T17:23:42+00:00', 'custom_status_id': 12765028278545, 'description': 'ABCDEF', 'from_messaging_channel': False, 'generated_timestamp': 1689701023, 'group_id': 12765036328465, 'has_incidents': False, 'id': 4, 'is_public': True, 'organization_id': 12765041119505, 'raw_subject': 'What is this ticket', 'requester_id': 12765072569105, 'status': 'open', 'subject': 'What is this ticket', 'submitter_id': 12765072569105, 'tags': ['test1'], 'test_field': 'test1', 'ticket_form_id': 12765054772497, 'updated_at': '2023-07-18T17:23:42+00:00', 'url': 'https://d3v-dlthub.zendesk.com/api/v2/tickets/4.json', 'via__channel': 'web'}, document='', score=0.8643349),
QueryResponse(id='ce2f1c5c-41c3-56c3-a31d-2399a7a9239d', embedding=None, metadata={'_dlt_id': 'ZMuFJZo0AJxV4A', '_dlt_load_id': '1700130284.002391', 'allow_attachments': True, 'allow_channelback': False, 'assignee_id': 12765072569105, 'brand_id': 12765073054225, 'created_at': '2023-03-14T10:52:28+00:00', 'custom_status_id': 12765028278545, 'description': 'X', 'from_messaging_channel': False, 'generated_timestamp': 1696163084, 'group_id': 12765036328465, 'has_incidents': False, 'id': 2, 'is_public': True, 'priority': 'high', 'raw_subject': 'SCRUBBED', 'requester_id': 13726460510097, 'status': 'deleted', 'subject': 'SCRUBBED', 'submitter_id': 12765072569105, 'tags': [], 'ticket_form_id': 13726337882769, 'type': 'question', 'updated_at': '2023-09-01T12:10:35+00:00', 'url': 'https://d3v-dlthub.zendesk.com/api/v2/tickets/2.json', 'via__channel': 'web'}, document='', score=0.8467072)]
```
To get a closer look at what the Zendesk ticket was, and how dlt dealt with it, we can index into the metadata of the first `QueryResponse` object:
```py
{'_dlt_id': 'Nx3wBiL29xTgaQ',
'_dlt_load_id': '1700130284.002391',
'allow_attachments': True,
'allow_channelback': False,
'assignee_id': 12765072569105,
'brand_id': 12765073054225,
'created_at': '2023-09-01T11:19:25+00:00',
'custom_status_id': 12765028278545,
'description': 'I have been trying to cancel my subscription but the system wont let me do it. Can you please help?',
'from_messaging_channel': False,
'generated_timestamp': 1693567167,
'group_id': 12765036328465,
'has_incidents': False,
'id': 12,
'is_public': True,
'organization_id': 12765041119505,
'raw_subject': 'Unable to Cancel Subscription',
'requester_id': 12765072569105,
'status': 'open',
'subject': 'Unable to Cancel Subscription',
'submitter_id': 12765072569105,
'tags': ['test1'],
'test_field': 'test1',
'ticket_form_id': 12765054772497,
'updated_at': '2023-09-01T11:19:25+00:00',
'url': 'https://d3v-dlthub.zendesk.com/api/v2/tickets/12.json',
'via__channel': 'web'}
```

View File

@@ -1,18 +0,0 @@
# @@@DLT_SNIPPET_START example
[runtime]
log_level="WARNING"
[extract]
# use 2 workers to extract sources in parallel
worker=2
# allow 10 async items to be processed in parallel
max_parallel_items=10
[normalize]
# use 3 worker processes to process 3 files in parallel
workers=3
[load]
# have 50 concurrent load jobs
workers=50
# @@@DLT_SNIPPET_END example

View File

@@ -1,72 +0,0 @@
def transformers_snippet() -> None:
# @@@DLT_SNIPPET_START example
import dlt
from dlt.sources.helpers import requests
@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
""""""
# note that we deselect `pokemon_list` - we do not want it to be loaded
@dlt.resource(write_disposition="replace", selected=False)
def pokemon_list():
"""Retrieve a first page of Pokemons and yield it. We do not retrieve all the pages in this example"""
yield requests.get(pokemon_api_url).json()["results"]
# transformer that retrieves a list of objects in parallel
@dlt.transformer
def pokemon(pokemons):
"""Yields details for a list of `pokemons`"""
# @dlt.defer marks a function to be executed in parallel
# in a thread pool
@dlt.defer
def _get_pokemon(_pokemon):
return requests.get(_pokemon["url"]).json()
# call and yield the function result normally, the @dlt.defer takes care of parallelism
for _pokemon in pokemons:
yield _get_pokemon(_pokemon)
# a special case where just one item is retrieved in transformer
# a whole transformer may be marked for parallel execution
@dlt.transformer(parallelized=True)
def species(pokemon_details):
"""Yields species details for a pokemon"""
species_data = requests.get(pokemon_details["species"]["url"]).json()
# link back to pokemon so we have a relation in loaded data
species_data["pokemon_id"] = pokemon_details["id"]
# You can return the result instead of yielding it since the transformer only generates one result
return species_data
# create two simple pipelines with | operator
# 1. send list of pokemons into `pokemon` transformer to get pokemon details
# 2. send pokemon details into `species` transformer to get species details
# NOTE: dlt is smart enough to extract data from pokemon_list and pokemon only once, even though they feed both pipes
return (pokemon_list | pokemon, pokemon_list | pokemon | species)
if __name__ == "__main__":
# build duck db pipeline
pipeline = dlt.pipeline(
pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
)
# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
print(load_info)
# @@@DLT_SNIPPET_END example
# Run without __main__
pipeline = dlt.pipeline(
pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
)
# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
# test assertions
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["pokemon"] == 20
assert row_counts["species"] == 20
assert "pokemon_list" not in row_counts

View File

@@ -1,37 +0,0 @@
---
title: Pokemon details in parallel using transformers
description: Learn how to use dlt transformers and how to speed up your loads with parallelism
keywords: [transformers, parallelism, example]
---
import Header from '../_examples-header.md';
<Header
intro="In this example, you will learn how to load a list of Pokemon from the PokeAPI and with the help of dlt transformers
automatically query additional data per retrieved Pokemon. You will also learn how to harness parallelism with a thread pool."
slug="transformers"
run_file="pokemon"
destination="duckdb"/>
## Using transformers with the Pokemon API
For this example, we will load Pokemon data from the [PokeAPI](https://pokeapi.co/) and use transformers to fetch
Pokemon details in parallel.
We'll learn how to:
- create 2 [transformers](../../general-usage/resource.md#feeding-data-from-one-resource-into-another) and connect them to a resource with the pipe operator `|`;
- [load these transformers in parallel](../../reference/performance.md#parallelism) using the `@dlt.defer` decorator;
- [configure parallelism](../../reference/performance.md#parallel-pipeline-config-example) in the `config.toml` file;
- deselect the main resource, so it will not be loaded into the database;
- import and use a pre-configured `requests` library with automatic retries (`from dlt.sources.helpers import requests`).
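If you just want the shape of the parallel pattern, here is a tiny self-contained sketch (the names and the simulated slow call are made up; the real example below applies the same idea to PokeAPI requests):
```py
import time

import dlt


@dlt.resource(selected=False)  # the raw list itself is not loaded
def number_list():
    yield list(range(10))  # one "page" of items


@dlt.transformer
def number_details(numbers):
    # @dlt.defer schedules each call in dlt's thread pool,
    # so the simulated slow lookups for one page run in parallel
    @dlt.defer
    def _get_details(n):
        time.sleep(0.1)  # stand-in for a slow API request
        return {"n": n, "square": n * n}

    for n in numbers:
        yield _get_details(n)


pipeline = dlt.pipeline(pipeline_name="defer_demo", destination="duckdb", dataset_name="demo")
print(pipeline.run(number_list | number_details))
```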
### Loading code
<!--@@@DLT_SNIPPET ./code/pokemon-snippets.py::example-->
### config.toml with examples how to configure parallelism
<!--@@@DLT_SNIPPET ./code/.dlt/config.toml::example-->

View File

@@ -11,6 +11,19 @@
// @ts-check
const fs = require('fs');
const path = require('path');
function *walkSync(dir) {
const files = fs.readdirSync(dir, { withFileTypes: true });
for (const file of files) {
if (file.isDirectory()) {
yield* walkSync(path.join(dir, file.name));
} else {
yield path.join(dir, file.name);
}
}
}
/** @type {import('@docusaurus/plugin-content-docs').SidebarsConfig} */
const sidebars = {
@@ -273,15 +286,6 @@ const sidebars = {
keywords: ['examples'],
},
items: [
'examples/transformers/index',
'examples/incremental_loading/index',
'examples/connector_x_arrow/index',
'examples/chess_production/index',
'examples/nested_data/index',
'examples/qdrant_zendesk/index',
'examples/google_sheets/index',
'examples/pdf_to_weaviate/index',
'examples/custom_destination_bigquery/index'
],
},
{
@@ -309,6 +313,19 @@ const sidebars = {
]
};
// insert examples
for (const item of sidebars.tutorialSidebar) {
if (item.label === 'Code examples') {
for (let examplePath of walkSync("./docs_processed/examples")) {
examplePath = examplePath.replace("docs_processed/", "");
examplePath = examplePath.replace(".md", "");
item.items.push(examplePath);
}
}
}
// inject api reference if it exists
if (fs.existsSync('./docs_processed/api_reference/sidebar.json')) {
for (const item of sidebars.tutorialSidebar) {

View File

@@ -15,10 +15,9 @@ const DOCS_EXTENSIONS = [".md", ".mdx"];
const SNIPPETS_FILE_SUFFIX = "-snippets.py"
// examples settings
const EXAMPLES_SOURCE_DIR = "./docs/examples/";
const EXAMPLES_DESTINATION_DIR = "../examples/";
const EXAMPLES_MAIN_SNIPPET_NAME = "example";
const EXAMPLES_CODE_SUBDIR = "/code";
const EXAMPLES_DESTINATION_DIR = `./${MD_TARGET_DIR}examples/`;
const EXAMPLES_SOURCE_DIR = "../examples/";
const EXAMPLES_EXCLUSIONS = [".", "_", "archive", "local_cache"]
// markers
const DLT_MARKER = "@@@DLT";
@@ -244,35 +243,110 @@ function preprocess_docs() {
}
function trimArray(lines) {
if (lines.length == 0) {
return lines;
}
while (!lines[0].trim()) {
lines.shift();
}
while (!lines[lines.length-1].trim()) {
lines.pop();
}
return lines;
}
/**
* Sync examples into examples folder
* Sync examples into docs
*/
function syncExamples() {
let count = 0;
for (const exampleDir of listDirsSync(EXAMPLES_SOURCE_DIR)) {
const exampleName = exampleDir.split("/").slice(-1)[0];
const exampleDestinationDir = EXAMPLES_DESTINATION_DIR + exampleName;
// clear example destination dir
fs.rmSync(exampleDestinationDir, { recursive: true, force: true });
// create __init__.py
fs.mkdirSync(exampleDestinationDir, { recursive: true });
fs.writeFileSync(exampleDestinationDir + "/__init__.py", "");
const exampleName = exampleDir.split("/").slice(-1)[0];
// walk all files of example and copy to example destination
const exampleCodeDir = exampleDir + EXAMPLES_CODE_SUBDIR;
for (const fileName of walkSync(exampleCodeDir)) {
let lines = getSnippetFromFile(fileName, EXAMPLES_MAIN_SNIPPET_NAME);
if (!lines) {
continue;
}
lines = removeRemainingMarkers(lines);
// exclude some folders
if (EXAMPLES_EXCLUSIONS.some(ex => exampleName.startsWith(ex))) {
continue;
}
// write file
const destinationFileName = exampleDestinationDir + fileName.replace(exampleCodeDir, "").replace("-snippets", "");
fs.mkdirSync(path.dirname(destinationFileName), { recursive: true });
fs.writeFileSync(destinationFileName, lines.join("\n"));
const exampleFile = `${EXAMPLES_SOURCE_DIR}${exampleName}/${exampleName}.py`;
const targetFileName = `${EXAMPLES_DESTINATION_DIR}/${exampleName}.md`;
const lines = fs.readFileSync(exampleFile, 'utf8').split(/\r?\n/);
let commentCount = 0;
let headerCount = 0;
// separate file content
const header = []
const markdown = []
const code = []
for (const line of lines) {
// find file docstring boundaries
if (line.startsWith(`"""`)) {
commentCount += 1
if (commentCount > 2) {
throw new Error(`found more than one docstring in ${exampleFile}`);
}
continue;
}
// find header boundaries
if (line.startsWith(`---`)) {
headerCount += 1;
if (headerCount > 2) {
throw new Error(`found more than one header section in ${exampleFile}`);
}
continue;
}
if (headerCount == 1) {
header.push(line);
}
else if (commentCount == 1) {
markdown.push(line)
}
else if (commentCount == 2) {
code.push(line);
}
}
// if there is no header, do not generate a page
if (headerCount == 0) {
continue;
}
let output = [];
output.push("---")
output = output.concat(header);
output.push("---")
// add an info box linking to the example source
output.push(":::info")
const url = `https://github.com/dlt-hub/dlt/tree/devel/docs/examples/${exampleName}`
output.push(`The source code for this example can be found in our repository at: `)
output.push(url);
output.push(":::")
output.push("## About this Example")
output = output.concat(trimArray(markdown));
output.push("### Full source code")
output.push("```py");
output = output.concat(trimArray(code));
output.push("```");
fs.mkdirSync(path.dirname(targetFileName), { recursive: true });
fs.writeFileSync(targetFileName, output.join("\n"));
count += 1;
}
console.log(`Synced ${count} examples`)
}
syncExamples();