Building an API Data Pipeline

Build a production-ready API scraping pipeline with extraction, transformation, storage, scheduling, and error handling.

A data pipeline turns ad-hoc API scraping into a reliable, automated system. It extracts data from APIs, transforms it into a usable format, and loads it into storage. This is the classic ETL (Extract, Transform, Load) pattern.

Pipeline Architecture

[API Sources] -> [Extract] -> [Transform] -> [Load] -> [Storage]
                     |             |            |
                  Retry logic   Cleaning     CSV/DB/JSON
                  Rate limits   Validation   Deduplication
                  Pagination    Flattening   Incremental updates
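
The "Flattening" step listed under Transform can be illustrated with a small helper. This is a minimal sketch, assuming nested JSON objects should become dot-separated column names; the function name flatten_record and the separator are illustrative choices, not part of any library:

```python
def flatten_record(record, parent_key="", sep="."):
    """Flatten nested dicts into a single-level dict with joined keys."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse into nested objects and merge their flattened keys
            flat.update(flatten_record(value, full_key, sep))
        else:
            # Scalars and lists are kept unchanged
            flat[full_key] = value
    return flat


# Made-up sample record for illustration
user = {"id": 1, "name": "Ann", "address": {"city": "Oslo", "geo": {"lat": "59.9"}}}
print(flatten_record(user))
```

Flat keys like these map directly onto CSV columns. Lists are left untouched here; how to handle them (join, explode into rows, or serialize) depends on the dataset.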

Complete Pipeline Example

import requests
import csv
import time
import logging
from datetime import datetime
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class APIPipeline:
    def __init__(self, base_url, output_dir="data"):
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "DataPipeline/1.0",
            "Accept": "application/json",
        })

    def extract(self, endpoint, params=None):
        """Extract data from a paginated API endpoint."""
        all_items = []
        page = 1

        while True:
            request_params = {"_page": page, "_limit": 100}
            if params:
                request_params.update(params)

            try:
                response = self.session.get(
                    f"{self.base_url}/{endpoint}",
                    params=request_params,
                    timeout=30,
                )
                response.raise_for_status()
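            except requests.exceptions.RequestException as e:
                # Stop paginating on failure; items collected so far are still returned
                logger.error(f"Request failed on page {page}: {e}")
                break

            data = response.json()
            # An empty page (or a non-list payload) means there is nothing more to collect
            if not data or not isinstance(data, list):
                break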

            all_items.extend(data)
            logger.info(f"Extracted page {page}: {len(data)} items")
            page += 1
            time.sleep(0.5)

        return all_items

    def transform(self, raw_items, fields=None):
        """Select fields, strip whitespace from strings, and deduplicate by ID."""
        transformed = []
        for item in raw_items:
            clean = {}
            target_fields = fields or item.keys()
            for field in target_fields:
                value = item.get(field, "")
                # Clean string values
                if isinstance(value, str):
                    value = value.strip()
                clean[field] = value
            transformed.append(clean)

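        # Deduplicate by ID; items without an ID are kept rather than collapsed into one
        seen_ids = set()
        unique = []
        for item in transformed:
            item_id = item.get("id")
            if item_id in (None, ""):
                unique.append(item)
                continue
            if item_id not in seen_ids:
                seen_ids.add(item_id)
                unique.append(item)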

        logger.info(f"Transformed: {len(raw_items)} -> {len(unique)} unique items")
        return unique

    def load(self, items, filename):
        """Save data to CSV."""
        if not items:
            logger.warning("No items to save")
            return

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = self.output_dir / f"{filename}_{timestamp}.csv"

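        # Use the union of keys (first-seen order) so rows with extra fields do not raise
        fieldnames = list(dict.fromkeys(key for item in items for key in item))

        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
            writer.writeheader()
            writer.writerows(items)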

        logger.info(f"Saved {len(items)} items to {filepath}")
        return filepath

    def run(self, endpoint, fields=None, filename="output", params=None):
        """Execute the full ETL pipeline."""
        logger.info(f"Starting pipeline for {endpoint}")
        # Forward optional query parameters (filters, etc.) to the extract step
        raw_data = self.extract(endpoint, params)
        clean_data = self.transform(raw_data, fields)
        output_path = self.load(clean_data, filename)
        logger.info(f"Pipeline complete: {output_path}")
        return clean_data


# Usage
pipeline = APIPipeline("https://jsonplaceholder.typicode.com")
posts = pipeline.run(
    endpoint="posts",
    fields=["id", "userId", "title"],
    filename="posts",
)
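
Note that extract() above stops at the first failed request. One way to make it more resilient is to retry transient failures with exponential backoff, the "Retry logic" item in the architecture diagram. The sketch below is a generic helper built only on the standard library; call_with_backoff and its parameters are hypothetical names, not part of requests:

```python
import logging
import random
import time

logger = logging.getLogger(__name__)


def call_with_backoff(func, max_retries=3, base_delay=1.0,
                      exceptions=(Exception,), sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except exceptions as e:
            if attempt == max_retries:
                # Out of retries: let the caller see the last error
                raise
            # Delay doubles each attempt; small jitter avoids synchronized retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            logger.warning(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.1f}s")
            sleep(delay)


# Possible use inside extract() (assumes the session and params from the class above):
# response = call_with_backoff(
#     lambda: self.session.get(url, params=request_params, timeout=30),
#     exceptions=(requests.exceptions.ConnectionError, requests.exceptions.Timeout),
# )
```

Retrying only connection errors and timeouts is an assumption made here; whether HTTP 429 or 5xx responses should also be retried depends on the API.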

Adding Scheduling

Use schedule for simple cron-like scheduling:

import schedule
import time

def run_daily_scrape():
    pipeline = APIPipeline("https://api.example.com/v1")
    pipeline.run("products", filename="products")

schedule.every().day.at("06:00").do(run_daily_scrape)

while True:
    schedule.run_pending()
    time.sleep(60)

The schedule library is not part of the standard library. Install it before running the script:

pip install schedule
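
One caveat with the scheduling loop: according to the schedule documentation, the library does not catch exceptions raised inside a job, so a single failed run would end the while loop. A minimal sketch of a guard, using a hypothetical decorator named catch_job_errors built only on the standard library:

```python
import functools
import logging

logger = logging.getLogger(__name__)


def catch_job_errors(job):
    """Wrap a job so exceptions are logged instead of propagating to the scheduler loop."""
    @functools.wraps(job)
    def wrapper(*args, **kwargs):
        try:
            return job(*args, **kwargs)
        except Exception:
            # logger.exception records the full traceback for later inspection
            logger.exception(f"Scheduled job {job.__name__} failed")
            return None
    return wrapper


@catch_job_errors
def failing_job():
    # Stand-in for a scrape that hits an unavailable API
    raise RuntimeError("API unavailable")


failing_job()  # logs the error and returns None instead of raising
```

Decorating run_daily_scrape the same way would keep the scheduler alive after a bad run, with the failure recorded in the log.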

Production Considerations

Concern            Solution
Network failures   Retry with exponential backoff
Duplicate data     Deduplicate by unique ID
Schema changes     Validate fields before transform
Large datasets     Stream to disk, process in chunks
Monitoring         Log metrics, alert on failures
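
As an illustration of the "Schema changes" row, the following sketch checks that each record still carries the fields the transform step expects before any cleaning happens. The function name split_valid_items and the required-field list are assumptions for illustration:

```python
import logging

logger = logging.getLogger(__name__)


def split_valid_items(items, required_fields):
    """Return (valid, invalid); an item is valid if every required field is present and not None."""
    valid, invalid = [], []
    for item in items:
        missing = [field for field in required_fields if item.get(field) is None]
        if missing:
            logger.warning(f"Item {item.get('id', '?')} is missing fields: {missing}")
            invalid.append(item)
        else:
            valid.append(item)
    return valid, invalid


# Made-up sample records for illustration
raw = [
    {"id": 1, "userId": 7, "title": "ok"},
    {"id": 2, "title": "no user"},
]
valid, invalid = split_valid_items(raw, ["id", "userId", "title"])
print(len(valid), len(invalid))
```

Keeping the rejected items, rather than silently dropping them, makes it easier to notice when an API has renamed or removed a field.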

For pipelines that hit protected APIs, routing requests through a proxy service such as ScraperAPI can handle proxy rotation and CAPTCHAs with little change to your pipeline code.

Next Steps

  • Compare API scraping vs HTML scraping for your use case
  • Add database storage (SQLite, PostgreSQL)
  • Deploy your pipeline to a server with cron or Airflow
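
For the database storage step, a minimal sketch using Python's built-in sqlite3 module follows. It assumes the posts fields from the usage example (id, userId, title); the table name, the default database path, and the function name load_to_sqlite are hypothetical. INSERT OR REPLACE keyed on the primary key means re-running the pipeline overwrites existing rows rather than duplicating them:

```python
import sqlite3


def load_to_sqlite(items, db_path="data/pipeline.db"):
    """Upsert items into a posts table keyed on id and return the table's row count."""
    # Note: the parent directory of db_path must already exist
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS posts ("
            "id INTEGER PRIMARY KEY, userId INTEGER, title TEXT)"
        )
        # Named placeholders map directly onto the dict keys of each item
        conn.executemany(
            "INSERT OR REPLACE INTO posts (id, userId, title) "
            "VALUES (:id, :userId, :title)",
            items,
        )
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
    finally:
        conn.close()


# Made-up sample data; ":memory:" keeps the demo off disk
sample = [
    {"id": 1, "userId": 1, "title": "first"},
    {"id": 2, "userId": 1, "title": "second"},
    {"id": 1, "userId": 1, "title": "first (updated)"},
]
print(load_to_sqlite(sample, ":memory:"))
```

Every item must contain all three keys, otherwise sqlite3 raises an error for the missing binding. PostgreSQL would need a different driver and an ON CONFLICT clause instead.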