Building a Scraping Pipeline with Airflow
Build a complete web scraping data pipeline with Apache Airflow for scheduling, dependency management, and monitoring.
Deployment · #15 · Advanced · 4 min read
Apache Airflow is a workflow orchestration tool that excels at managing complex scraping pipelines with multiple steps, dependencies, and schedules. It gives you retries, monitoring, and a visual dashboard out of the box.
When to Use Airflow
- Your scraping involves multiple sequential steps (scrape, parse, clean, load)
- You need dependency management between tasks
- You want a visual dashboard for monitoring
- You need robust retry and alerting built in
- You are already using Airflow for data engineering
Setup
# Install Airflow with its official constraints file (unconstrained installs
# often fail on dependency conflicts), plus the libraries the DAG imports.
AIRFLOW_VERSION=2.9.1
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
pip install "apache-airflow==${AIRFLOW_VERSION}" requests beautifulsoup4 \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# `airflow db init` is deprecated since Airflow 2.7; `db migrate` creates or upgrades the metadata DB.
airflow db migrate
# Change this password for anything beyond a local experiment.
airflow users create --username admin --password admin --role Admin \
  --firstname Admin --lastname User --email admin@example.com
A Complete Scraping DAG
# dags/scraping_pipeline.py
import json
import logging
import os
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago  # deprecated in Airflow 2.x; prefer a fixed start_date
from bs4 import BeautifulSoup
# Defaults applied to every task in the DAG.
default_args = {
    "owner": "scraper",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    # Bound each task so a hung request or parse cannot block the pipeline forever.
    "execution_timeout": timedelta(minutes=30),
    # Failure emails are only delivered if SMTP is configured for Airflow.
    "email_on_failure": True,
    "email": ["alerts@example.com"],
}

# A fixed, past start_date: dynamic values such as days_ago(1) shift on every
# DAG parse and are discouraged (days_ago is also deprecated). With
# catchup=False, Airflow does not backfill runs between this date and today.
dag = DAG(
    "product_scraping_pipeline",
    default_args=default_args,
    description="Scrape products, parse, and store",
    schedule="0 8 * * *",  # Daily at 08:00 (Airflow's default timezone, UTC unless configured)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["scraping"],
)

# Working directory shared by all tasks (bind-mounted from ./data in Docker).
DATA_DIR = "/opt/airflow/data"
def scrape_pages(**context):
    """Task 1: Scrape raw HTML pages into ``DATA_DIR/raw``.

    Downloads pages 1-5 of the listing and writes each one to
    ``DATA_DIR/raw/page_<n>.html``. Only the list of file paths is pushed to
    XCom (key ``scraped_files``), never the HTML itself.

    Returns:
        A short summary string (stored by Airflow as the task's return value).

    Raises:
        requests.HTTPError: if any page answers with a 4xx/5xx status; the
            task-level ``retries`` then re-run the whole task.
    """
    raw_dir = f"{DATA_DIR}/raw"
    os.makedirs(raw_dir, exist_ok=True)

    scraped_files = []
    # One Session reuses the same connection for every page request.
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0"
        })
        for page in range(1, 6):
            url = f"https://quotes.toscrape.com/page/{page}/"
            response = session.get(url, timeout=30)
            response.raise_for_status()
            filepath = f"{raw_dir}/page_{page}.html"
            # Explicit UTF-8: scraped HTML may contain non-ASCII text, and the
            # platform default encoding is not guaranteed to support it.
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(response.text)
            scraped_files.append(filepath)

    # Pass file list to the next task via XCom
    context["ti"].xcom_push(key="scraped_files", value=scraped_files)
    return f"Scraped {len(scraped_files)} pages"
def parse_data(**context):
    """Task 2: Parse HTML into structured data.

    Reads the HTML files whose paths ``scrape_pages`` pushed to XCom (key
    ``scraped_files``). Each ``div.quote`` becomes a record with ``text``,
    ``author`` and ``tags``. All records are written to
    ``DATA_DIR/parsed/quotes.json``, and that path is pushed to XCom under
    ``parsed_file``.

    A quote block missing its ``span.text`` or ``small.author`` element is
    skipped and counted in a warning, instead of crashing on ``None.text``.
    If every block is skipped, the output is an empty list.

    Returns:
        A short summary string with the number of parsed quotes.

    Raises:
        ValueError: if no scraped file paths were found in XCom.
    """
    log = logging.getLogger(__name__)
    scraped_files = context["ti"].xcom_pull(
        task_ids="scrape_pages", key="scraped_files"
    )
    if not scraped_files:
        raise ValueError("No scraped files found in XCom from scrape_pages")

    all_items = []
    skipped = 0
    for filepath in scraped_files:
        with open(filepath, encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")
        for quote in soup.select("div.quote"):
            text_el = quote.select_one("span.text")
            author_el = quote.select_one("small.author")
            if text_el is None or author_el is None:
                skipped += 1
                continue
            all_items.append({
                "text": text_el.text,
                "author": author_el.text,
                "tags": [t.text for t in quote.select("a.tag")],
            })
    if skipped:
        log.warning("Skipped %d quote blocks missing text or author", skipped)

    parsed_dir = f"{DATA_DIR}/parsed"
    os.makedirs(parsed_dir, exist_ok=True)
    output = f"{parsed_dir}/quotes.json"
    # json.dump escapes non-ASCII by default, so the file itself is pure ASCII.
    with open(output, "w", encoding="utf-8") as f:
        json.dump(all_items, f, indent=2)
    context["ti"].xcom_push(key="parsed_file", value=output)
    return f"Parsed {len(all_items)} quotes"
def validate_data(**context):
    """Task 3: Validate the parsed data.

    Loads the JSON file whose path ``parse_data`` pushed to XCom (key
    ``parsed_file``). Checks that it holds at least one record and that every
    record has a non-empty ``text`` and ``author``.

    Explicit ``raise`` statements are used instead of ``assert``, because
    asserts are stripped when Python runs with ``-O``.

    NOTE(review): validation failures will not fix themselves on retry.
    Raising ``airflow.exceptions.AirflowFailException`` instead would skip the
    retries configured in ``default_args``.

    Returns:
        A short summary string with the number of validated items.

    Raises:
        ValueError: if the path is missing from XCom, the file holds no
            records, or a record lacks a non-empty ``text`` or ``author``.
    """
    parsed_file = context["ti"].xcom_pull(
        task_ids="parse_data", key="parsed_file"
    )
    if not parsed_file:
        raise ValueError("No parsed file path found in XCom from parse_data")
    with open(parsed_file, encoding="utf-8") as f:
        data = json.load(f)

    if not data:
        raise ValueError("No data was parsed!")
    for item in data:
        if "text" not in item:
            raise ValueError("Missing text field")
        if "author" not in item:
            raise ValueError("Missing author field")
        # `not` also catches None, which len() would turn into a TypeError.
        if not item["text"]:
            raise ValueError("Empty text")
        if not item["author"]:
            raise ValueError("Empty author")
    return f"Validated {len(data)} items"
def store_data(**context):
    """Task 4: Store data in final location.

    Copies the records from the JSON file that ``parse_data`` pushed to XCom
    (key ``parsed_file``) into ``DATA_DIR/final/quotes_<YYYYMMDD>.json``.

    The date comes from the run's ``data_interval_end`` when Airflow supplies
    it. A retry or re-run of the same DAG run therefore overwrites the same
    file, instead of creating a new one dated by wall-clock time. Outside
    Airflow (no ``data_interval_end`` in the context) it falls back to the
    current UTC time.

    The JSON is written to a ``.tmp`` file and then renamed into place with
    ``os.replace``, so the final path never holds a half-written file.

    Returns:
        A short summary string naming the stored file.
    """
    from datetime import datetime, timezone

    parsed_file = context["ti"].xcom_pull(
        task_ids="parse_data", key="parsed_file"
    )
    with open(parsed_file, encoding="utf-8") as f:
        data = json.load(f)

    # In production, this would write to a database or S3
    final_dir = f"{DATA_DIR}/final"
    os.makedirs(final_dir, exist_ok=True)
    run_date = context.get("data_interval_end") or datetime.now(timezone.utc)
    timestamp = run_date.strftime("%Y%m%d")
    final_path = f"{final_dir}/quotes_{timestamp}.json"

    tmp_path = f"{final_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, final_path)
    return f"Stored {len(data)} items at {final_path}"
# Build one PythonOperator per pipeline step. Each task_id string must match
# the task_ids referenced by the xcom_pull calls in the step functions.
t1, t2, t3, t4 = (
    PythonOperator(task_id=step_id, python_callable=step_fn, dag=dag)
    for step_id, step_fn in (
        ("scrape_pages", scrape_pages),
        ("parse_data", parse_data),
        ("validate_data", validate_data),
        ("store_data", store_data),
    )
)

# Linear chain: scrape -> parse -> validate -> store.
t1 >> t2 >> t3 >> t4
Running with Docker Compose
# docker-compose.yml
# Design notes:
# - LocalExecutor cannot run on SQLite (Airflow refuses to start), so the
#   metadata DB is a Postgres service shared by every Airflow container.
# - airflow-init migrates the schema and creates the admin user ONCE,
#   before the webserver and scheduler start.
# Before the first `docker compose up`: run `mkdir -p dags data logs`, and
# on Linux also `echo "AIRFLOW_UID=$(id -u)" > .env`, so the containers
# run as your UID and can write to ./data and ./logs.
x-airflow-common: &airflow-common
  image: apache/airflow:2.9.1-python3.12
  user: "${AIRFLOW_UID:-50000}:0"
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # Quick-start convenience: installed at every container start.
    # For anything long-lived, bake these into a custom image instead.
    _PIP_ADDITIONAL_REQUIREMENTS: "requests beautifulsoup4"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./data:/opt/airflow/data
    # Shared so the webserver can read task logs written by the scheduler.
    - ./logs:/opt/airflow/logs

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 10

  airflow-init:
    <<: *airflow-common
    command: >
      bash -c "airflow db migrate &&
      airflow users create --username admin --password admin --role Admin
      --firstname Admin --lastname User --email admin@example.com"
    depends_on:
      postgres:
        condition: service_healthy

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    restart: unless-stopped
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: unless-stopped
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db:
# Create the bind-mounted folders up front; if Docker creates them itself they
# are owned by root. On Linux the container user must also be able to write
# to ./data (see the AIRFLOW_UID notes in the official Airflow Docker guide).
mkdir -p ./dags ./data
docker compose up -d
# Visit http://localhost:8080 to see the Airflow dashboard
Tips
- Call a proxy or scraping API service (such as ScraperAPI) from the scrape task so you don't have to manage proxy rotation inside Airflow
- Keep tasks small and focused, one task per pipeline step
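- Use XCom only for passing small values (file paths, counts) between tasks; XCom values are stored in Airflow's metadata database, so pass large payloads such as raw HTML through files or object storage instead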
- Set meaningful retries and timeouts for each task (`retries`, `retry_delay`, `execution_timeout`), either per operator or once in `default_args`
- Use Airflow Variables and Connections for configuration, not hardcoded values
- Monitor task duration trends to catch performance regressions early