
4.4 · intermediate · 4 min read

Item Pipelines: Validation, Deduplication, Storage

Chain processors that transform every scraped item: validate, dedupe, enrich, store. The Scrapy abstraction that scales from a CSV to a Postgres cluster.

What you’ll learn

  • Write a pipeline that validates items against required fields and drops invalid ones.
  • Implement both in-memory and persistent dedup pipelines.
  • Stream items to Postgres safely with batched inserts.

Pipelines are sequential processors. Each item flows through every enabled pipeline in priority order. Reject early, enrich in the middle, store at the end.

The interface

A pipeline is a plain class with up to four hook methods:

class MyPipeline:
    def open_spider(self, spider):
        # Called once when the spider starts
        ...

    def close_spider(self, spider):
        # Called once when the spider stops: flush buffers, close connections
        ...

    def process_item(self, item, spider):
        # Called for every item. Return the item to pass it on,
        # or raise DropItem to discard it.
        return item

    @classmethod
    def from_crawler(cls, crawler):
        # Factory hook: receives the crawler, so you can read settings, connect signals, etc.
        return cls()

process_item is the only required method. Return the item (potentially modified) to pass it down the chain. Raise DropItem to discard.

Enable pipelines in settings.py:

ITEM_PIPELINES = {
  "myproject.pipelines.ValidatePipeline": 100,
  "myproject.pipelines.DedupPipeline": 200,
  "myproject.pipelines.EnrichPipeline": 300,
  "myproject.pipelines.PostgresPipeline": 500,
}

Lower numbers run first. The convention is 100, 200, 300… so there's room to slot new pipelines in between later.

Validation pipeline

Drop items missing required fields. Log why.

from scrapy.exceptions import DropItem

REQUIRED = ["url", "title", "price"]

class ValidatePipeline:
    def process_item(self, item, spider):
        for field in REQUIRED:
            if not item.get(field):
                raise DropItem(f"missing {field}: {item.get('url')}")
        if not isinstance(item.get("price"), (int, float)):
            raise DropItem(f"price not numeric: {item.get('url')}")
        return item

Raising DropItem increments item_dropped_count in the crawl stats. Check the stats after a run: a high drop rate usually means your selectors or item loader are wrong.
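To make that check automatic, here's a minimal sketch of a small extension that reads the stats when the spider closes. DropRateMonitor and the 10% threshold are made-up names and values, not Scrapy built-ins; enable it under EXTENSIONS in settings.py.

from scrapy import signals

class DropRateMonitor:
    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def __init__(self, stats):
        self.stats = stats

    def spider_closed(self, spider):
        dropped = self.stats.get_value("item_dropped_count", 0)
        scraped = self.stats.get_value("item_scraped_count", 0)
        total = dropped + scraped
        if total and dropped / total > 0.1:  # arbitrary 10% threshold
            spider.logger.warning(
                "dropped %d of %d items, check your selectors/loader", dropped, total
            )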

Deduplication pipeline

Two flavors: in-memory (fast, simple, per-run) and persistent (survives restarts).

In-memory dedup

from scrapy.exceptions import DropItem

class InMemoryDedupPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        sku = item.get("sku") or item.get("url")
        if sku in self.seen:
            raise DropItem(f"dup: {sku}")
        self.seen.add(sku)
        return item

Fine for crawls up to a few million items, but the set grows without bound in memory and is gone when the run ends.

Persistent dedup with Redis

For multi-machine or resumable crawls:

import redis
from scrapy.exceptions import DropItem

class RedisDedupPipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get("REDIS_URL"))

    def __init__(self, url):
        self.url = url

    def open_spider(self, spider):
        self.r = redis.from_url(self.url)
        self.key = f"dedup:{spider.name}"

    def process_item(self, item, spider):
        sku = item.get("sku") or item.get("url")
        if not self.r.sadd(self.key, sku):
            raise DropItem(f"dup: {sku}")
        return item

SADD returns 1 if added (new), 0 if existed. One round-trip per item. Set a TTL on the key if you want dedup to expire (e.g. "don't re-scrape within 24h").
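One way to get the per-SKU "24h and then re-scrape" behaviour is to swap the shared set for per-SKU keys using SET NX EX. A minimal sketch, replacing process_item in the pipeline above; the 24-hour window is just an example value:

def process_item(self, item, spider):
    sku = item.get("sku") or item.get("url")
    # SET NX EX: the key is created only if it doesn't exist yet, and it
    # expires on its own, so the same SKU is accepted again after 24 hours.
    if not self.r.set(f"{self.key}:{sku}", 1, nx=True, ex=60 * 60 * 24):
        raise DropItem(f"dup: {sku}")
    return item

Trade-off: one Redis key per SKU instead of a single set, but every entry expires independently.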

For huge datasets, use a Bloom filter instead, covered in §4.53.

Enrichment pipeline

Add computed fields or look up references:

from datetime import datetime, timezone

class EnrichPipeline:
    def process_item(self, item, spider):
        item["scraped_at"] = datetime.now(timezone.utc).isoformat()
        item["spider"] = spider.name
        if item.get("price"):
            item["price_eur"] = item["price"] * 0.92
        return item

Pipelines can also call external services (geocoders, currency APIs), but be careful: a slow, blocking call inside process_item stalls the whole crawl, because pipelines run inline with the crawler's event loop. For high-throughput crawls, queue enrichment as a separate job.

Storage pipeline, Postgres with batching

Don't INSERT one row at a time. Batch.

import psycopg

BATCH_SIZE = 100

class PostgresPipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get("PG_DSN"))

    def __init__(self, dsn):
        self.dsn = dsn
        self.batch = []

    def open_spider(self, spider):
        self.conn = psycopg.connect(self.dsn)
        self.cur = self.conn.cursor()

    def close_spider(self, spider):
        self.flush()
        self.conn.close()

    def flush(self):
        if not self.batch:
            return
        rows = [(i["url"], i["title"], i["price"], i["scraped_at"]) for i in self.batch]
        self.cur.executemany(
            "INSERT INTO products (url, title, price, scraped_at) "
            "VALUES (%s, %s, %s, %s) "
            "ON CONFLICT (url) DO UPDATE SET title = EXCLUDED.title, "
            "price = EXCLUDED.price, scraped_at = EXCLUDED.scraped_at",
            rows,
        )
        self.conn.commit()
        self.batch.clear()

    def process_item(self, item, spider):
        self.batch.append(item)
        if len(self.batch) >= BATCH_SIZE:
            self.flush()
        return item

Key points:

  • executemany batches the round-trips.
  • ON CONFLICT … DO UPDATE is an upsert; use DO NOTHING instead if you'd rather skip duplicates.
  • Flush on close_spider so the last partial batch isn't lost.
  • Wrap in a try/except in production so one bad row doesn't kill the spider.
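A minimal sketch of that last point, replacing flush() in the pipeline above; the module-level logger is just one way to report the failure:

import logging

logger = logging.getLogger(__name__)

# inside PostgresPipeline
def flush(self):
    if not self.batch:
        return
    rows = [(i["url"], i["title"], i["price"], i["scraped_at"]) for i in self.batch]
    try:
        self.cur.executemany(
            "INSERT INTO products (url, title, price, scraped_at) "
            "VALUES (%s, %s, %s, %s) "
            "ON CONFLICT (url) DO UPDATE SET title = EXCLUDED.title, "
            "price = EXCLUDED.price, scraped_at = EXCLUDED.scraped_at",
            rows,
        )
        self.conn.commit()
    except psycopg.Error as exc:
        # One bad batch must not kill the crawl: roll back and keep going.
        self.conn.rollback()
        logger.error("batch insert failed, %d rows skipped: %s", len(rows), exc)
    finally:
        self.batch.clear()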

Ordering matters

Pipelines run in numeric order. Put ValidatePipeline first (cheap, drops bad items before they hit expensive stages). Dedup second. Enrichment third. Storage last.

If you swap dedup and storage, you'll write duplicate rows. If you swap validation and storage, you'll write garbage.

When pipelines are the wrong place

Pipelines run inline with the crawler. Slow pipelines slow the spider. For:

  • External API enrichment that takes >100ms
  • Large file uploads (images, PDFs)
  • Heavy ML inference

…queue items elsewhere (Celery, Symfony Messenger, RabbitMQ) and process them out of band. Pipelines should be milliseconds. Anything slower belongs in a worker queue.
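What the hand-off can look like: a minimal sketch that pushes raw items onto a Redis list for a separate worker to consume. QueuePipeline, the enrich:* key and REDIS_URL are assumed names; any queue backend follows the same shape.

import json

import redis

class QueuePipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get("REDIS_URL"))

    def __init__(self, url):
        self.url = url

    def open_spider(self, spider):
        self.r = redis.from_url(self.url)
        self.queue = f"enrich:{spider.name}"

    def process_item(self, item, spider):
        # One fast LPUSH per item; the slow enrichment happens in a worker
        # that BRPOPs from the same list, out of band.
        self.r.lpush(self.queue, json.dumps(dict(item)))
        return item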

Hands-on lab

Build a four-stage pipeline against /products:

  1. Validate: drop items missing url, title, or price.
  2. Dedup: by URL using an in-memory set.
  3. Enrich: add scraped_at ISO timestamp.
  4. Store: append to a JSONL file (use Scrapy's built-in FEEDS or write a custom file pipeline; see the sketch after these instructions).

Run the spider twice in a row. With the in-memory set the second run drops nothing, because the set is rebuilt each run; swap in the Redis dedup pipeline from this lesson and the second run should drop most items. Check that item_dropped_count in the crawl stats matches your expectation.
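If you take the custom-pipeline route for step 4, here is a minimal sketch; the built-in FEEDS setting (e.g. {"items.jsonl": {"format": "jsonlines"}}) gets you the same result without writing code.

import json

class JsonLinesPipeline:
    def open_spider(self, spider):
        self.file = open(f"{spider.name}.jl", "a", encoding="utf-8")

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        # One JSON object per line, appended so repeated runs accumulate.
        self.file.write(json.dumps(dict(item), ensure_ascii=False) + "\n")
        return item

Register it under ITEM_PIPELINES with the highest number so it runs last.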

