dlt Workflows¶
Using dlt directly for advanced data loading scenarios.
Overview¶
Dango uses dlt (data load tool) under the hood for data ingestion. While dango sync handles most cases, direct dlt access is useful for:
- Debugging pipeline issues
- Custom source development
- Accessing pipeline state and metadata
- Advanced incremental loading configurations
dlt Pipeline Basics¶
How Dango Uses dlt¶
When you run dango sync, Dango does the following (sketched in code below):
- Reads source configuration from sources.yml
- Creates a dlt pipeline targeting DuckDB
- Runs the appropriate dlt source
- Stores state in the .dlt/ directory
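Conceptually, each sync boils down to a small dlt program like this sketch (the resource, pipeline, and dataset names here are illustrative, not Dango's internal ones):
import dlt

# A stand-in resource; dango sync runs the source configured in sources.yml
@dlt.resource(name="items")
def items():
    yield [{"id": 1}, {"id": 2}]

pipeline = dlt.pipeline(
    pipeline_name="my_source",     # one pipeline per configured source
    destination="duckdb",          # Dango loads into DuckDB
    dataset_name="raw_my_source"   # raw dataset for this source
)
print(pipeline.run(items))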
Pipeline State Location¶
.dlt/
├── config.toml # dlt configuration
├── secrets.toml # Credentials
└── pipelines/ # Pipeline state (auto-created)
└── my_source/
└── state/
Direct dlt Commands¶
View Pipeline Information¶
# List all local pipelines
dlt pipeline --list-pipelines
# Get info about a specific pipeline
dlt pipeline my_source info
# Browse loaded data and schema in a Streamlit app
dlt pipeline my_source show
Check Pipeline State¶
# View what's been synced; the state output includes
# each resource's incremental cursor
dlt pipeline my_source info
# Example (illustrative):
# resource: orders
# last_value: 2025-01-15T10:30:00
Reset Pipeline¶
# Reset a pipeline to re-sync from scratch
dlt pipeline my_source drop
# Then sync again
dango sync --source my_source --full-refresh
Debugging Sync Issues¶
Enable Verbose Logging¶
# Set environment variable for detailed logs
export RUNTIME__LOG_LEVEL=DEBUG
dango sync --source problematic_source
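The same setting can be made persistent in .dlt/config.toml (runtime.log_level is what the RUNTIME__LOG_LEVEL environment variable maps to):
# .dlt/config.toml
[runtime]
log_level = "DEBUG"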
Check dlt Logs¶
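dlt records a trace for every run. Two commands are useful when a sync misbehaves:
# Show the last run's trace (steps, timings, errors)
dlt pipeline my_source trace

# Show load jobs that failed in the last load
dlt pipeline my_source failed-jobs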
Inspect Last Load¶
# Python script to inspect the last load
import dlt

# Attach to the existing pipeline by name
pipeline = dlt.pipeline(
    pipeline_name="my_source",
    destination="duckdb",
    dataset_name="raw_my_source"
)

# Get normalize info from the last run's trace
normalize_info = pipeline.last_trace.last_normalize_info
print(normalize_info)
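The normalize info also exposes per-table row counts from the last run:
# Rows processed per table in the last run
print(pipeline.last_trace.last_normalize_info.row_counts)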
Custom Source Development¶
Creating a Custom dlt Source¶
For APIs not supported by built-in sources:
# custom_sources/my_api.py
import dlt
from dlt.sources.helpers import requests  # dlt's requests wrapper with built-in retries


@dlt.source
def my_api_source(api_key: str = dlt.secrets.value):
    """Custom source for My API."""

    @dlt.resource(write_disposition="merge", primary_key="id")
    def items():
        """Load items from API."""
        url = "https://api.example.com/items"
        headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        yield response.json()["items"]

    return items
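Before registering the source with Dango, you can smoke-test it with a standalone pipeline (a minimal sketch; the pipeline and dataset names are just examples):
import dlt
from custom_sources.my_api import my_api_source

pipeline = dlt.pipeline(
    pipeline_name="my_api_test",
    destination="duckdb",
    dataset_name="raw_my_api_test"
)
print(pipeline.run(my_api_source()))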
Register Custom Source in Dango¶
# .dango/sources.yml
sources:
  - name: my_custom_api
    type: dlt_native
    enabled: true
    dlt_native:
      source_module: "custom_sources.my_api"
      source_name: "my_api_source"
Configure Credentials¶
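dlt resolves the api_key argument from .dlt/secrets.toml. For the module above, a section keyed by the module name works (dlt also accepts more deeply nested layouts):
# .dlt/secrets.toml
[sources.my_api]
api_key = "your-api-key"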
Incremental Loading¶
Understanding Incremental State¶
dlt tracks incremental loading state automatically:
@dlt.resource(
    write_disposition="merge",
    primary_key="id"
)
def orders(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z"
    )
):
    """Load orders incrementally."""
    # Only fetch records newer than the last stored cursor value
    params = {"updated_after": updated_at.last_value}
    # ... fetch and yield data
View Incremental State¶
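The dlt pipeline my_source info command prints the cursor alongside the rest of the state. In Python, it is part of the pipeline's state dictionary (a sketch; the exact nesting depends on your source and resource names):
import dlt

# Attach to the existing local pipeline and print source-scoped state
pipeline = dlt.pipeline(pipeline_name="my_source")
print(pipeline.state.get("sources"))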
Reset Incremental State¶
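Dropping a single resource clears its tables and its state, including the incremental cursor, without touching other resources:
# Drop only the orders resource (tables + incremental state)
dlt pipeline my_source drop orders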
Advanced Configurations¶
Schema Evolution¶
dlt handles schema changes automatically. To control how the schema may evolve, attach a schema contract to a resource (or pass schema_contract to pipeline.run):
@dlt.resource(
    schema_contract={
        "tables": "evolve",     # allow new tables
        "columns": "evolve",    # allow new columns
        "data_type": "freeze",  # reject rows whose column types change
    }
)
def items():
    ...
Parallelization¶
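Worker counts for the extract, normalize, and load stages are set in .dlt/config.toml (the values below are illustrative):
# .dlt/config.toml
[extract]
workers = 2

[normalize]
workers = 2

[load]
workers = 4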
Retry Configuration¶
# .dlt/config.toml
[runtime]
# Retry failed requests
request_max_attempts = 3
request_backoff_factor = 1
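These settings apply to dlt's bundled requests helper (dlt.sources.helpers.requests), so any source that uses it, including the custom source above, inherits the retry behavior.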
Working with dlt Python API¶
Running a Pipeline Directly¶
import dlt
from dlt.sources.rest_api import rest_api_source

# Create pipeline
pipeline = dlt.pipeline(
    pipeline_name="test_pipeline",
    destination="duckdb",
    dataset_name="raw_test"
)

# Configure source (rest_api_source takes a single config dict)
source = rest_api_source({
    "client": {"base_url": "https://api.example.com"},
    "resources": [
        {"name": "users", "endpoint": "/users"},
        {"name": "orders", "endpoint": "/orders"}
    ]
})

# Run
load_info = pipeline.run(source)
print(load_info)
Accessing Load Metadata¶
# Iterate completed load jobs in each load package
for package in load_info.load_packages:
    for job in package.jobs["completed_jobs"]:
        print(f"Table: {job.job_file_info.table_name}")
        print(f"  File: {job.file_path}")
Troubleshooting dlt¶
Common Issues¶
| Issue | Cause | Solution |
|---|---|---|
| "Pipeline not found" | Pipeline hasn't run yet | Run dango sync first |
| "Invalid credentials" | Secrets not configured | Check .dlt/secrets.toml |
| "Schema mismatch" | Source schema changed | Run with --full-refresh |
| "Rate limit exceeded" | API throttling | Add retry configuration |
Getting Help¶
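- dlt documentation: https://dlthub.com/docs
- dlt --help and dlt pipeline --help list all available commands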
Next Steps¶
- Custom Sources - Build custom data sources
- dbt Workflows - Transform loaded data
- Troubleshooting - More debugging tips