Scraping Central is reader-supported. When you buy through links on our site, we may earn an affiliate commission.

Building a Scraping Pipeline with Airflow

Build a complete web scraping data pipeline with Apache Airflow for scheduling, dependency management, and monitoring.

Deployment · #15 · Advanced · 4 min read
Share: WhatsApp · LinkedIn

Apache Airflow is a workflow orchestration tool that excels at managing complex scraping pipelines with multiple steps, dependencies, and schedules. It gives you retries, monitoring, and a visual dashboard out of the box.

When to Use Airflow

  • Your scraping involves multiple sequential steps (scrape, parse, clean, load)
  • You need dependency management between tasks
  • You want a visual dashboard for monitoring
  • You need robust retry and alerting built in
  • You are already using Airflow for data engineering

Setup

AIRFLOW_VERSION=2.9.1
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
# The Airflow docs recommend installing against the published constraints file;
# an unpinned `pip install apache-airflow` can resolve incompatible dependencies.
pip install "apache-airflow==${AIRFLOW_VERSION}" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install requests beautifulsoup4   # libraries imported by the scraping DAG
airflow db migrate                    # replaces `airflow db init`, deprecated since Airflow 2.7
airflow users create --username admin --password admin --role Admin \
    --firstname Admin --lastname User --email admin@example.com

A Complete Scraping DAG

# dags/scraping_pipeline.py
import json
import os
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from bs4 import BeautifulSoup

# Arguments applied to every task in this DAG unless a task overrides them.
default_args = {
    "owner": "scraper",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    # Fail a hung task rather than letting it occupy a worker slot forever;
    # the scrape step issues five requests, each with a 30 s timeout.
    "execution_timeout": timedelta(minutes=30),
    # Failure e-mails are only delivered if SMTP is configured for Airflow.
    "email_on_failure": True,
    "email": ["alerts@example.com"],
}

dag = DAG(
    "product_scraping_pipeline",
    default_args=default_args,
    description="Scrape products, parse, and store",
    # Daily at 08:00 in Airflow's default timezone (UTC unless configured).
    # `schedule` replaces `schedule_interval`, deprecated since Airflow 2.4.
    schedule="0 8 * * *",
    # A fixed start_date is the recommended practice; dynamic values such as
    # days_ago() are deprecated and make the scheduling window drift.
    # With catchup=False only the most recent interval is scheduled.
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["scraping"],
)

# Root directory for intermediate and final files. The default matches the
# ./data volume mounted in the Docker Compose setup; set SCRAPER_DATA_DIR to
# use another location (e.g. for a local, non-Docker install).
DATA_DIR = os.environ.get("SCRAPER_DATA_DIR", "/opt/airflow/data")

def scrape_pages(**context):
    """Task 1: download the raw HTML of each listing page to DATA_DIR/raw.

    Pushes the list of written file paths to XCom under the key
    ``"scraped_files"`` so ``parse_data`` can find them. A non-2xx response
    raises ``requests.HTTPError``, which fails the task (and lets the
    configured retries kick in).

    Returns:
        A short summary string with the number of pages saved.
    """
    raw_dir = f"{DATA_DIR}/raw"
    os.makedirs(raw_dir, exist_ok=True)
    urls = [f"https://quotes.toscrape.com/page/{i}/" for i in range(1, 6)]

    scraped_files = []
    # One session reuses the underlying connection across all pages.
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0"
        })
        for page_no, url in enumerate(urls, start=1):
            response = session.get(url, timeout=30)
            response.raise_for_status()

            filepath = f"{raw_dir}/page_{page_no}.html"
            # Explicit UTF-8 so the written bytes never depend on the worker's locale.
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(response.text)
            scraped_files.append(filepath)

    # Pass file list to the next task via XCom
    context["ti"].xcom_push(key="scraped_files", value=scraped_files)
    return f"Scraped {len(scraped_files)} pages"

def parse_data(**context):
    """Task 2: extract quotes from the scraped HTML files into one JSON file.

    Reads the file paths that ``scrape_pages`` pushed to XCom (key
    ``"scraped_files"``) and turns every ``div.quote`` into a dict with
    ``text``, ``author`` and ``tags``. It writes the list to
    ``DATA_DIR/parsed/quotes.json`` and pushes that path to XCom under the
    key ``"parsed_file"``.

    Returns:
        A short summary string with the number of quotes parsed.

    Raises:
        ValueError: if no ``scraped_files`` XCom is available.
    """
    scraped_files = context["ti"].xcom_pull(
        task_ids="scrape_pages", key="scraped_files"
    )
    if scraped_files is None:
        # Fail with a clear message instead of "'NoneType' is not iterable".
        raise ValueError("No 'scraped_files' XCom found from task 'scrape_pages'")

    all_items = []
    for filepath in scraped_files:
        with open(filepath, encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")

        for quote in soup.select("div.quote"):
            all_items.append({
                "text": quote.select_one("span.text").text,
                "author": quote.select_one("small.author").text,
                "tags": [t.text for t in quote.select("a.tag")],
            })

    parsed_dir = f"{DATA_DIR}/parsed"
    os.makedirs(parsed_dir, exist_ok=True)
    output = f"{parsed_dir}/quotes.json"
    with open(output, "w", encoding="utf-8") as f:
        json.dump(all_items, f, indent=2)

    context["ti"].xcom_push(key="parsed_file", value=output)
    return f"Parsed {len(all_items)} quotes"

def validate_data(**context):
    """Task 3: sanity-check the parsed quotes before they are stored.

    Loads the JSON file whose path ``parse_data`` pushed to XCom under the
    key ``"parsed_file"``. It checks that the file holds at least one item
    and that every item has a non-empty ``text`` field and an ``author``
    field.

    Returns:
        A short summary string with the number of items validated.

    Raises:
        ValueError: if the XCom is missing or the data fails a check.
            Explicit raises are used instead of ``assert`` because
            assertions are stripped when Python runs with ``-O``, which
            would silently disable this task's checks.
    """
    parsed_file = context["ti"].xcom_pull(
        task_ids="parse_data", key="parsed_file"
    )
    if parsed_file is None:
        raise ValueError("No 'parsed_file' XCom found from task 'parse_data'")

    with open(parsed_file, encoding="utf-8") as f:
        data = json.load(f)

    if not data:
        raise ValueError("No data was parsed!")
    for index, item in enumerate(data):
        if "text" not in item:
            raise ValueError(f"Item {index}: Missing text field")
        if "author" not in item:
            raise ValueError(f"Item {index}: Missing author field")
        if not item["text"]:
            raise ValueError(f"Item {index}: Empty text")

    return f"Validated {len(data)} items"

def store_data(**context):
    """Task 4: write the parsed quotes to a dated file under DATA_DIR/final.

    The file is named ``quotes_<YYYYMMDD>.json`` using the run's logical date
    (Airflow's ``ds_nodash``), so retries and re-runs of a past run overwrite
    that run's own file instead of writing under today's wall-clock date.
    For a scheduled run the logical date is the start of its data interval.
    If the context has no ``ds_nodash`` (e.g. when called outside Airflow),
    today's date is used.

    Returns:
        A summary string with the item count and the output path.
    """
    # Only needed for the fallback date when running outside Airflow.
    from datetime import datetime

    parsed_file = context["ti"].xcom_pull(
        task_ids="parse_data", key="parsed_file"
    )

    with open(parsed_file, encoding="utf-8") as f:
        data = json.load(f)

    # In production, this would write to a database or S3
    final_dir = f"{DATA_DIR}/final"
    os.makedirs(final_dir, exist_ok=True)
    timestamp = context.get("ds_nodash") or datetime.now().strftime("%Y%m%d")
    final_path = f"{final_dir}/quotes_{timestamp}.json"

    with open(final_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

    return f"Stored {len(data)} items at {final_path}"

# One PythonOperator per pipeline step. Each task_id must match the task_ids
# that the downstream xcom_pull calls read from.
t1, t2, t3, t4 = (
    PythonOperator(task_id=step_id, python_callable=step_fn, dag=dag)
    for step_id, step_fn in (
        ("scrape_pages", scrape_pages),
        ("parse_data", parse_data),
        ("validate_data", validate_data),
        ("store_data", store_data),
    )
)

# Strictly sequential: scrape -> parse -> validate -> store.
t1.set_downstream(t2)
t2.set_downstream(t3)
t3.set_downstream(t4)

Running with Docker Compose

# docker-compose.yml
#
# Airflow refuses to run the LocalExecutor on SQLite, and the webserver and
# scheduler must share one metadata database, so a small Postgres service is
# used instead of a per-container SQLite file. The top-level `version:` key
# is omitted because Docker Compose v2 treats it as obsolete.

x-airflow-common: &airflow-common
  image: apache/airflow:2.9.1-python3.12
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    # The image's entrypoint pip-installs these on every container start.
    # Convenient for a demo; bake them into a custom image for production.
    _PIP_ADDITIONAL_REQUIREMENTS: "requests beautifulsoup4"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./data:/opt/airflow/data
  # Run as the host user so the containers can write into ./data
  # (set it with: echo "AIRFLOW_UID=$(id -u)" > .env).
  user: "${AIRFLOW_UID:-50000}:0"

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 10

  # One-shot job: create/upgrade the metadata schema and the login user.
  airflow-init:
    <<: *airflow-common
    command:
      - bash
      - -c
      - >-
        airflow db migrate &&
        airflow users create --username admin --password admin --role Admin
        --firstname Admin --lastname User --email admin@example.com
    depends_on:
      postgres:
        condition: service_healthy

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db:
mkdir -p dags data                     # directories bind-mounted into the containers
echo "AIRFLOW_UID=$(id -u)" > .env     # host UID, so the containers can write to ./data
docker compose up -d
# Visit http://localhost:8080 to see the Airflow dashboard

Tips

  • Use ScraperAPI in the scrape task to avoid managing proxies within Airflow
  • Keep tasks small and focused, one task per pipeline step
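  • Use XCom only for small data (file paths, counts) between tasks; XCom values are stored in Airflow's metadata database, so pass large payloads such as raw HTML through files or object storage instead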
  • Set meaningful retries and timeouts for each task (for example `retries`, `retry_delay` and `execution_timeout` in `default_args`)
  • Use Airflow Variables and Connections for configuration, not hardcoded values
  • Monitor task duration trends to catch performance regressions early