Building an API Data Pipeline
Build a production-ready API scraping pipeline with extraction, transformation, storage, scheduling, and error handling.
API Scraping · #14 · Advanced · 3 min read
A data pipeline turns ad-hoc API scraping into a reliable, automated system. It extracts data from APIs, transforms it into a usable format, and loads it into storage — the classic ETL (Extract, Transform, Load) pattern.
Pipeline Architecture
[API Sources] -> [Extract] -> [Transform] -> [Load] -> [Storage]
                     |             |            |
                Retry logic   Cleaning      CSV/DB/JSON
                Rate limits   Validation    Deduplication
                Pagination    Flattening    Incremental updates
Complete Pipeline Example
import requests
import csv
import time
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class APIPipeline:
    """Extract records from a paginated JSON API, clean them, and save them as CSV.

    Pagination uses ``_page`` / ``_limit`` query parameters, the convention
    used by json-server style APIs such as jsonplaceholder.
    """

    def __init__(self, base_url, output_dir="data"):
        """Create a pipeline for the API rooted at *base_url*.

        Args:
            base_url: API root; endpoints are appended as
                ``f"{base_url}/{endpoint}"`` (a trailing slash is removed).
            output_dir: Directory for CSV output. Created, including any
                missing parent directories, if it does not exist.
        """
        self.base_url = base_url.rstrip("/")
        self.output_dir = Path(output_dir)
        # parents=True so nested paths such as "data/raw" work on first run.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "DataPipeline/1.0",
            "Accept": "application/json",
        })

    def _get_with_retry(self, url, params, max_retries, backoff, timeout=30):
        """GET *url*, retrying transient failures with exponential backoff.

        Connection errors, timeouts, HTTP 429 and 5xx responses are retried
        up to *max_retries* times, waiting ``backoff * 2**attempt`` seconds
        between attempts. Other 4xx responses are not retried because
        repeating the same request cannot fix them.

        Returns:
            The successful ``Response``, or None if the request ultimately failed.
        """
        for attempt in range(max_retries + 1):
            try:
                response = self.session.get(url, params=params, timeout=timeout)
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                # Use getattr rather than truthiness: a requests Response is
                # falsy for error statuses.
                status = getattr(getattr(e, "response", None), "status_code", None)
                retryable = status is None or status == 429 or status >= 500
                if not retryable or attempt == max_retries:
                    logger.error(f"Request failed: {e}")
                    return None
                wait = backoff * 2 ** attempt
                logger.warning(f"Request failed ({e}); retrying in {wait:.1f}s")
                time.sleep(wait)
        return None

    def extract(self, endpoint, params=None, page_size=100, delay=0.5,
                max_retries=3, backoff=1.0, max_pages=None):
        """Fetch every page of *endpoint* and return the combined records.

        Pages are requested with ``_page`` (starting at 1) and ``_limit``
        until the API returns an empty page. Extraction stops early and
        returns what has been collected so far in these cases: a request
        still fails after retries, the body is not valid JSON, the body is
        not a JSON list, or *max_pages* pages have been fetched.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Extra query parameters merged into every request. They
                override ``_page``/``_limit`` if they use the same key.
            page_size: Value sent as ``_limit``.
            delay: Seconds to sleep between pages, to stay under rate limits.
            max_retries: Retries per page for transient failures.
            backoff: Base wait in seconds for the exponential retry backoff.
            max_pages: Optional cap on pages fetched. It guards against APIs
                that ignore ``_page`` and never return an empty page.

        Returns:
            list: Records in page order.
        """
        url = f"{self.base_url}/{endpoint}"
        all_items = []
        page = 1
        while max_pages is None or page <= max_pages:
            request_params = {"_page": page, "_limit": page_size}
            if params:
                request_params.update(params)
            response = self._get_with_retry(url, request_params, max_retries, backoff)
            if response is None:
                break
            try:
                data = response.json()
            except ValueError as e:  # requests' JSONDecodeError subclasses ValueError
                logger.error(f"Invalid JSON on page {page}: {e}")
                break
            if not data:
                break
            if not isinstance(data, list):
                # extend() on a dict would add its keys, and a non-empty dict
                # would never end the pagination loop.
                logger.error(
                    f"Expected a JSON list on page {page}, got {type(data).__name__}"
                )
                break
            all_items.extend(data)
            logger.info(f"Extracted page {page}: {len(data)} items")
            page += 1
            time.sleep(delay)
        return all_items

    def transform(self, raw_items, fields=None):
        """Select fields, strip surrounding whitespace from strings, and drop duplicates.

        Nested values (dicts/lists) are passed through unchanged; this method
        does not flatten them.

        Args:
            raw_items: Records (dicts) as returned by ``extract``.
            fields: Keys to keep, in output order; a missing key becomes ``""``.
                When falsy, each record keeps all of its own keys.

        Returns:
            list[dict]: Cleaned records. Records that share an ``id`` are
            collapsed to the first occurrence. The id is read from the raw
            record, so deduplication works even when *fields* omits
            ``"id"``. Records without an id (missing, None or "") are
            always kept.
        """
        seen_ids = set()
        unique = []
        for item in raw_items:
            item_id = item.get("id")
            if isinstance(item_id, str):
                item_id = item_id.strip()
            if item_id is not None and item_id != "":
                if item_id in seen_ids:
                    continue
                seen_ids.add(item_id)
            clean = {}
            for field in fields or item.keys():
                value = item.get(field, "")
                if isinstance(value, str):
                    value = value.strip()
                clean[field] = value
            unique.append(clean)
        logger.info(f"Transformed: {len(raw_items)} -> {len(unique)} unique items")
        return unique

    def load(self, items, filename):
        """Write *items* to ``<output_dir>/<filename>_<YYYYmmdd_HHMMSS>.csv``.

        The CSV header is the union of keys across all items, in first-seen
        order. Records with differing keys are therefore written instead of
        raising ``ValueError``, and missing cells are left empty.
        Note: two runs in the same second overwrite each other's file.

        Returns:
            Path | None: The written file path, or None if *items* is empty.
        """
        if not items:
            logger.warning("No items to save")
            return None
        # dict.fromkeys de-duplicates keys while preserving first-seen order.
        fieldnames = list(dict.fromkeys(key for item in items for key in item))
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = self.output_dir / f"{filename}_{timestamp}.csv"
        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(items)
        logger.info(f"Saved {len(items)} items to {filepath}")
        return filepath

    def run(self, endpoint, fields=None, filename="output", params=None):
        """Execute extract -> transform -> load and return the cleaned records.

        Args:
            endpoint: API path to extract from.
            fields: Optional column subset (see ``transform``).
            filename: Prefix for the timestamped CSV file.
            params: Optional extra query parameters forwarded to ``extract``.

        Returns:
            list[dict]: The transformed records, which are also written to
            CSV when non-empty.
        """
        logger.info(f"Starting pipeline for {endpoint}")
        raw_data = self.extract(endpoint, params=params)
        clean_data = self.transform(raw_data, fields)
        output_path = self.load(clean_data, filename)
        logger.info(f"Pipeline complete: {output_path}")
        return clean_data
# Usage: pull every post, keep three columns, and write data/posts_<timestamp>.csv
pipeline = APIPipeline("https://jsonplaceholder.typicode.com")
posts = pipeline.run(
    "posts",
    fields=["id", "userId", "title"],
    filename="posts",
)
Adding Scheduling
Use the third-party `schedule` library for simple cron-like scheduling:
import schedule
import time
def run_daily_scrape():
    """Run the products pipeline once; the loop below schedules it daily.

    Errors are logged instead of raised. ``schedule`` does not catch job
    exceptions, so an uncaught error would propagate out of
    ``run_pending()``, end the loop, and silently stop all future runs.
    """
    try:
        pipeline = APIPipeline("https://api.example.com/v1")
        pipeline.run("products", filename="products")
    except Exception:
        logger.exception("Daily scrape failed; will try again at the next scheduled run")


schedule.every().day.at("06:00").do(run_daily_scrape)

# Check once a minute; run_pending() executes any job whose time has come.
while True:
    schedule.run_pending()
    time.sleep(60)
Install the `schedule` package first: `pip install schedule`
Production Considerations
| Concern | Solution |
|---|---|
| Network failures | Retry with exponential backoff |
| Duplicate data | Deduplicate by unique ID |
| Schema changes | Validate fields before transform |
| Large datasets | Stream to disk, process in chunks |
| Monitoring | Log metrics, alert on failures |
For pipelines hitting protected APIs, route requests through ScraperAPI to handle proxy rotation and CAPTCHAs without changing your pipeline code.
Next Steps
- Compare API scraping vs HTML scraping for your use case
- Add database storage (SQLite, PostgreSQL)
- Deploy your pipeline to a server with cron or Airflow