Custom API Integration
Build a custom data source for any REST API.
What You'll Build
By the end of this tutorial, you'll have:
- A custom dlt source for a REST API
- Data synced from the API to DuckDB
- An understanding of custom source patterns
Duration: ~30 minutes
Prerequisites
- Dango project initialized
- Python basics
- An API you want to integrate (we'll use a sample)
Approach Options
Dango offers two ways to integrate custom APIs:
| Method | Best For | Effort |
|---|---|---|
| REST API source | Simple APIs, quick setup | Low |
| Custom dlt source | Complex APIs, full control | Medium |
We'll cover both approaches.
Option 1: REST API Source (Quick)
For simple REST APIs, use Dango's built-in REST API source.
Step 1: Configure Source
# .dango/sources.yml
sources:
  - name: my_api
    type: rest_api
    enabled: true
    rest_api:
      base_url: "https://api.example.com"
      headers:
        Authorization: "Bearer ${MY_API_TOKEN}"
      resources:
        - name: users
          endpoint: /users
          method: GET
        - name: orders
          endpoint: /orders
          method: GET
          params:
            status: completed
Step 2: Add Credentials
Step 3: Sync
Pagination Support
For paginated APIs:
rest_api:
  base_url: "https://api.example.com"
  resources:
    - name: users
      endpoint: /users
      pagination:
        type: page_number  # or: offset, cursor
        param: page
        start: 1
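To make the `page_number` setting concrete, the loop below sketches what a page-number paginator typically does: request page 1, then page 2, and so on until a page comes back empty. It is an illustration only, not Dango's implementation. `fetch_page`, `paginate_by_page_number`, and the in-memory `FAKE_PAGES` are hypothetical stand-ins for real HTTP calls.

```python
from typing import Any, Callable, Dict, Iterator, List

# Hypothetical stand-in for an API: page number -> rows on that page.
FAKE_PAGES: Dict[int, List[Dict[str, Any]]] = {
    1: [{"id": 1}, {"id": 2}],
    2: [{"id": 3}],
}


def fetch_page(page: int) -> List[Dict[str, Any]]:
    """Pretend to call GET /users?page=<page>; unknown pages are empty."""
    return FAKE_PAGES.get(page, [])


def paginate_by_page_number(
    fetch: Callable[[int], List[Dict[str, Any]]],
    start: int = 1,
) -> Iterator[Dict[str, Any]]:
    """Yield rows page by page, stopping at the first empty page."""
    page = start
    while True:
        rows = fetch(page)
        if not rows:
            break
        yield from rows
        page += 1


all_users = list(paginate_by_page_number(fetch_page))
```

Some APIs signal the last page with a `total_pages` field or a short page instead of an empty one, so the stop condition may need adjusting for the API at hand.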
Option 2: Custom dlt Source (Full Control)
For complex APIs or specific requirements, create a custom dlt source.
Step 1: Create Source File
Create custom_sources/my_api.py:
"""
Custom dlt source for My API.
"""
import dlt
from dlt.sources.helpers import requests
from typing import Iterable, Iterator, Dict, Any


@dlt.source
def my_api_source(
    api_key: str = dlt.secrets.value,
    base_url: str = "https://api.example.com"
) -> Iterable[dlt.sources.DltResource]:
    """
    Load data from My API.

    Args:
        api_key: API authentication key
        base_url: API base URL
    """
    # Shared session with auth header
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    })

    @dlt.resource(write_disposition="replace")
    def users() -> Iterator[Dict[str, Any]]:
        """Load all users."""
        url = f"{base_url}/users"
        response = session.get(url)
        response.raise_for_status()
        for user in response.json()["data"]:
            yield user

    @dlt.resource(
        write_disposition="merge",
        primary_key="id"
    )
    def orders(
        updated_after: dlt.sources.incremental[str] = dlt.sources.incremental(
            "updated_at",
            initial_value="2024-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Load orders incrementally."""
        url = f"{base_url}/orders"
        params = {"updated_after": updated_after.last_value}
        while url:
            response = session.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            for order in data["data"]:
                yield order
            # Handle pagination: follow the next-page URL until there is none
            url = data.get("next_page_url")
            params = {}  # Next page URL includes params

    # Return resources to load
    return users, orders
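The `orders` resource relies on an incremental cursor: the highest `updated_at` seen so far is remembered and handed back as `last_value` on the next run. The sketch below imitates that bookkeeping in plain Python so the idea is visible without running a pipeline. It is a simplification under stated assumptions, not dlt's implementation; `load_incrementally` and the sample `ORDERS` rows are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple


def load_incrementally(
    rows: Iterable[Dict[str, Any]],
    last_value: str,
    cursor_field: str = "updated_at",
) -> Tuple[List[Dict[str, Any]], str]:
    """Return the rows at or after `last_value` plus the new cursor value.

    ISO 8601 timestamps written in one fixed format and time zone sort
    lexicographically, so plain string comparison is enough here.
    """
    fresh = [row for row in rows if row[cursor_field] >= last_value]
    new_last = max([last_value] + [row[cursor_field] for row in fresh])
    return fresh, new_last


# Hypothetical sample data standing in for the /orders response.
ORDERS = [
    {"id": 1, "updated_at": "2024-01-05T00:00:00Z"},
    {"id": 2, "updated_at": "2024-02-01T00:00:00Z"},
    {"id": 3, "updated_at": "2024-03-10T00:00:00Z"},
]

# First run starts from the initial value and sees everything.
first_batch, cursor = load_incrementally(ORDERS, "2024-01-01T00:00:00Z")
# A later run that starts from a newer cursor only sees order 3.
second_batch, cursor_after = load_incrementally(ORDERS, "2024-02-15T00:00:00Z")
```

Rows exactly at the cursor value are delivered again in this sketch, which is why pairing the cursor with `write_disposition="merge"` and a `primary_key` matters: a re-delivered row overwrites its earlier copy instead of duplicating it.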
Step 2: Configure in Dango
# .dango/sources.yml
sources:
  - name: my_custom_api
    type: dlt_native
    enabled: true
    dlt_native:
      source_module: "custom_sources.my_api"
      source_name: "my_api_source"
Step 3: Add Credentials
Step 4: Sync
Real-World Example: HubSpot
Here's a complete example for HubSpot contacts:
Source Code
# custom_sources/hubspot_contacts.py
import dlt
from dlt.sources.helpers import requests
from typing import Iterator, Dict, Any


@dlt.source
def hubspot_contacts_source(
    api_key: str = dlt.secrets.value,
) -> dlt.sources.DltResource:
    """Load contacts from HubSpot."""
    base_url = "https://api.hubapi.com"

    @dlt.resource(
        write_disposition="merge",
        primary_key="id"
    )
    def contacts(
        # The cursor path must name a key of the yielded rows ("updated_at"),
        # and HubSpot returns timestamps as ISO 8601 strings, not integers.
        updated_after: dlt.sources.incremental[str] = dlt.sources.incremental(
            "updated_at",
            initial_value="1970-01-01T00:00:00Z"
        )
    ) -> Iterator[Dict[str, Any]]:
        """Load contacts with incremental updates."""
        url = f"{base_url}/crm/v3/objects/contacts"
        headers = {"Authorization": f"Bearer {api_key}"}
        # The list endpoint takes no filters (filterGroups belongs to the
        # POST .../contacts/search endpoint), so every page is fetched and
        # the incremental cursor drops rows older than its last value.
        params = {
            "limit": 100,
            "properties": "email,firstname,lastname,company,phone",
        }
        while True:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
            for contact in data.get("results", []):
                yield {
                    "id": contact["id"],
                    "email": contact["properties"].get("email"),
                    "first_name": contact["properties"].get("firstname"),
                    "last_name": contact["properties"].get("lastname"),
                    "company": contact["properties"].get("company"),
                    "phone": contact["properties"].get("phone"),
                    "created_at": contact["createdAt"],
                    "updated_at": contact["updatedAt"],
                }
            # Pagination: pass the "after" token until no next page is reported
            if "paging" in data and "next" in data["paging"]:
                params["after"] = data["paging"]["next"]["after"]
            else:
                break

    return contacts
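The field mapping inside `contacts` is the part most likely to break when a property is missing, so it can help to pull it into a plain function and check it against a sample payload. The sketch below does that. `flatten_contact` and `SAMPLE_CONTACT` are hypothetical; the sample is only shaped like the fields the code above reads and is not a real HubSpot response.

```python
from typing import Any, Dict, Optional


def flatten_contact(contact: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Flatten one contact record into the row layout used above."""
    props = contact.get("properties") or {}
    return {
        "id": contact["id"],
        "email": props.get("email"),
        "first_name": props.get("firstname"),
        "last_name": props.get("lastname"),
        "company": props.get("company"),
        "phone": props.get("phone"),
        "created_at": contact["createdAt"],
        "updated_at": contact["updatedAt"],
    }


# Hypothetical sample record with no company or phone set.
SAMPLE_CONTACT = {
    "id": "101",
    "properties": {
        "email": "ada@example.com",
        "firstname": "Ada",
        "lastname": "Lovelace",
    },
    "createdAt": "2024-01-02T10:00:00.000Z",
    "updatedAt": "2024-03-04T12:30:00.000Z",
}

row = flatten_contact(SAMPLE_CONTACT)
```

Missing properties come through as `None` rather than raising `KeyError`, which keeps the column set stable from row to row.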
Configuration
# .dango/sources.yml
sources:
  - name: hubspot
    type: dlt_native
    enabled: true
    dlt_native:
      source_module: "custom_sources.hubspot_contacts"
      source_name: "hubspot_contacts_source"
Handling Common Patterns
Pagination
# Offset-based: advance the offset until a short page signals the end
params = {"offset": 0, "limit": 100}
while True:
    response = session.get(url, params=params)
    response.raise_for_status()
    data = response.json()
    yield from data["items"]
    if len(data["items"]) < params["limit"]:
        break
    params["offset"] += params["limit"]

# Cursor-based: send back the cursor the API returned until it is empty
cursor = None
while True:
    params = {"cursor": cursor} if cursor else {}
    response = session.get(url, params=params)
    response.raise_for_status()
    data = response.json()
    yield from data["items"]
    cursor = data.get("next_cursor")
    if not cursor:
        break
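A pagination loop is easier to trust when it can be exercised without a network. One way to do that, sketched below, is to pass the page-fetching step in as a function and drive the loop with an in-memory fake. `paginate_by_cursor`, `fake_fetch`, and `FAKE_API` are hypothetical names; the `items` and `next_cursor` keys follow the snippet above.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def paginate_by_cursor(fetch: Callable[[Optional[str]], Page]) -> Iterator[Any]:
    """Yield items from every page, following `next_cursor` until it is empty."""
    cursor: Optional[str] = None
    while True:
        page = fetch(cursor)
        yield from page["items"]
        cursor = page.get("next_cursor")
        if not cursor:
            break


# Hypothetical fake API: cursor sent by the client -> page returned.
FAKE_API: Dict[Optional[str], Page] = {
    None: {"items": ["a", "b"], "next_cursor": "c1"},
    "c1": {"items": ["c"], "next_cursor": "c2"},
    "c2": {"items": ["d"], "next_cursor": None},
}


def fake_fetch(cursor: Optional[str]) -> Page:
    """Return the canned page for a cursor instead of calling the network."""
    return FAKE_API[cursor]


items = list(paginate_by_cursor(fake_fetch))
```

In a real resource, `fetch` would wrap `session.get(url, params=...)` and return `response.json()`.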
Rate Limiting
import time
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient

# Fixed pause after every response (basic rate limit)
session = requests.Session()
session.hooks["response"].append(
    lambda r, *args, **kwargs: time.sleep(0.1)
)

# RESTClient has no rate_limit argument; pass the throttled session
# instead so its requests are paced the same way
client = RESTClient(
    base_url="https://api.example.com",
    session=session,
)
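When a fixed sleep is too blunt, a small limiter that spaces calls evenly is one alternative. The sketch below uses only the standard library and is not part of dlt or Dango; `MinIntervalLimiter` is a hypothetical helper, and the clock and sleep functions are injectable so the pacing can be checked without waiting.

```python
import time
from typing import Callable


class MinIntervalLimiter:
    """Allow at most `rate` calls per second by spacing calls evenly."""

    def __init__(
        self,
        rate: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        if rate <= 0:
            raise ValueError("rate must be positive")
        self._interval = 1.0 / rate
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()  # the first call never waits

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay > 0:
            self._sleep(delay)
        # Schedule the following call one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self._interval
        return delay


limiter = MinIntervalLimiter(rate=10)  # roughly 10 requests per second
```

Calling `limiter.wait()` immediately before each `session.get(...)` keeps the request rate under the configured limit.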
Authentication
# API Key in header
session.headers["X-API-Key"] = api_key

# Bearer token
session.headers["Authorization"] = f"Bearer {token}"

# Basic auth
session.auth = (username, password)

# OAuth 2.0 client credentials (use dlt's OAuth support)
from dlt.sources.helpers.rest_client.auth import OAuth2ClientCredentials
auth = OAuth2ClientCredentials(
    access_token_url="https://api.example.com/oauth/token",
    client_id=client_id,
    client_secret=client_secret,
)
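When a request is rejected with 401, it often helps to know exactly which header was sent. As a minimal sketch, the functions below build the `Authorization` values for the Bearer and Basic schemes by hand; setting `session.auth = (username, password)` produces the same Basic header for ASCII credentials. `bearer_header` and `basic_auth_header` are hypothetical helper names.

```python
import base64


def bearer_header(token: str) -> str:
    """Authorization value for a bearer token."""
    return f"Bearer {token}"


def basic_auth_header(username: str, password: str) -> str:
    """Authorization value for HTTP Basic auth: base64 of 'user:password'."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


print(basic_auth_header("user", "pass"))  # Basic dXNlcjpwYXNz
```

Base64 is an encoding, not encryption, so Basic credentials should only travel over HTTPS.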
Error Handling
@dlt.resource
def robust_resource():
    try:
        response = session.get(url)
        response.raise_for_status()
        yield from response.json()
    except requests.HTTPError as e:
        if e.response.status_code == 429:
            # Rate limited: re-raise so the failure is visible and the run can be retried
            raise
        elif e.response.status_code == 404:
            # Not found: skip this resource and yield nothing
            return
        else:
            raise
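The status-code policy above can also be written as a small pure function, which makes it easy to test and to pair with a backoff schedule for retries. The sketch below is one possible shape, not dlt's retry logic. `classify_status` and `backoff_delays` are hypothetical helpers, and treating 5xx responses as retryable is an assumption that goes beyond the snippet above.

```python
from typing import Iterator


def classify_status(status_code: int) -> str:
    """Map an HTTP status code to 'ok', 'skip', 'retry', or 'fail'."""
    if 200 <= status_code < 300:
        return "ok"
    if status_code == 404:
        return "skip"   # missing resource: yield nothing
    if status_code == 429 or 500 <= status_code < 600:
        return "retry"  # assumption: server errors are treated as transient
    return "fail"


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield exponentially growing delays: base, 2*base, 4*base, ... up to cap."""
    for attempt in range(attempts):
        yield min(cap, base * (2 ** attempt))


delays = list(backoff_delays(4))
```

A retry loop would sleep for each delay in turn while `classify_status` keeps returning `"retry"`, and give up once the delays run out.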
Testing Your Source
Local Test
# test_source.py
from custom_sources.my_api import my_api_source

# Test locally
if __name__ == "__main__":
    import dlt

    pipeline = dlt.pipeline(
        pipeline_name="test_my_api",
        destination="duckdb",
        dataset_name="test_data"
    )
    source = my_api_source(api_key="your-test-key")
    load_info = pipeline.run(source)
    print(load_info)
Run it from the project root so that custom_sources can be imported:
python test_source.py
Verify Data
Project Structure
my-project/
├── custom_sources/
│   ├── __init__.py
│   ├── my_api.py
│   └── hubspot_contacts.py
├── .dango/
│   └── sources.yml
├── .dlt/
│   └── secrets.toml
└── data/
    └── warehouse.duckdb
Debugging Tips
Enable Verbose Logging
Check dlt State
Inspect Raw Response
# Add debug output
response = session.get(url)
print(f"Status: {response.status_code}")
print(f"Response: {response.text[:500]}")
Summary
You've learned to integrate custom APIs:
- REST API source for simple cases
- Custom dlt source for full control
- Pagination, auth, and error handling
- Testing and debugging
When to Use Each
| Use Case | Approach |
|---|---|
| Simple GET endpoints | REST API source |
| Complex auth (OAuth) | Custom dlt source |
| Custom pagination | Custom dlt source |
| Data transformation | Custom dlt source |
| Quick prototype | REST API source |
Next Steps
- dlt Workflows - Advanced dlt usage
- Custom Sources Reference - Full documentation
- Multi-Source Integration - Combine with other sources