Item Pipelines: Validation, Deduplication, Storage
Chain processors that transform every scraped item: validate, dedupe, enrich, store. The Scrapy abstraction that scales from a CSV to a Postgres cluster.
What you’ll learn
- Write a pipeline that validates items against required fields and drops invalid ones.
- Implement an in-memory and persistent dedup pipeline.
- Stream items to Postgres safely with batched inserts.
Pipelines are sequential processors. Each item flows through every enabled pipeline in priority order. Reject early, enrich in the middle, store at the end.
The interface
A pipeline class can implement four methods, all optional except process_item:
```python
class MyPipeline:
    def open_spider(self, spider):
        # Called once when the spider starts
        ...

    def close_spider(self, spider):
        # Called once when the spider stops: flush, close connections
        ...

    def process_item(self, item, spider):
        # Called for every item. Return item to pass it on,
        # or raise DropItem to discard.
        return item

    @classmethod
    def from_crawler(cls, crawler):
        # Factory hook: receive settings, signals, etc.
        return cls()
```
process_item is the only required method. Return the item (potentially modified) to pass it down the chain. Raise DropItem to discard.
Enable pipelines in settings.py:
```python
ITEM_PIPELINES = {
    "myproject.pipelines.ValidatePipeline": 100,
    "myproject.pipelines.DedupPipeline": 200,
    "myproject.pipelines.EnrichPipeline": 300,
    "myproject.pipelines.PostgresPipeline": 500,
}
```
Lower numbers run first. The convention is 100, 200, 300, and so on — the gaps leave room to insert pipelines between them later.
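The priorities resolve to a plain sort. If in doubt, you can compute the effective order yourself — a toy check outside Scrapy, not a Scrapy API:

```python
ITEM_PIPELINES = {
    "myproject.pipelines.PostgresPipeline": 500,
    "myproject.pipelines.ValidatePipeline": 100,
    "myproject.pipelines.EnrichPipeline": 300,
    "myproject.pipelines.DedupPipeline": 200,
}

# Lower priority value = runs earlier, which is exactly how Scrapy orders them.
effective_order = [
    name.rsplit(".", 1)[-1]
    for name, prio in sorted(ITEM_PIPELINES.items(), key=lambda kv: kv[1])
]
print(effective_order)
# ['ValidatePipeline', 'DedupPipeline', 'EnrichPipeline', 'PostgresPipeline']
```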
Validation pipeline
Drop items missing required fields. Log why.
```python
from scrapy.exceptions import DropItem

REQUIRED = ["url", "title", "price"]

class ValidatePipeline:
    def process_item(self, item, spider):
        for field in REQUIRED:
            if not item.get(field):
                raise DropItem(f"missing {field}: {item.get('url')}")
        if not isinstance(item.get("price"), (int, float)):
            raise DropItem(f"price not numeric: {item.get('url')}")
        return item
```
DropItem increments `item_dropped_count` in the crawler stats. After a run, check the stats: a high drop rate usually means your selectors or item loader are wrong.
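As a quick sanity check, the drop rate falls out of two counters. The stats dict below is an illustrative snapshot, not real crawler output (in a running crawler you'd read `crawler.stats`):

```python
# Illustrative stats snapshot; a real one comes from the crawler's stats collector.
stats = {"item_scraped_count": 930, "item_dropped_count": 70}

dropped = stats.get("item_dropped_count", 0)
scraped = stats.get("item_scraped_count", 0)
drop_rate = dropped / (dropped + scraped)  # fraction of items rejected

print(f"drop rate: {drop_rate:.1%}")  # anything above a few percent deserves a look
```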
Deduplication pipeline
Two flavors: in-memory (fast, simple, per-run) and persistent (survives restarts).
In-memory dedup
```python
from scrapy.exceptions import DropItem

class InMemoryDedupPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        sku = item.get("sku") or item.get("url")
        if sku in self.seen:
            raise DropItem(f"dup: {sku}")
        self.seen.add(sku)
        return item
```
Fine for crawls up to a few million items. The set grows in memory.
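If the keys are long URLs, memory can be trimmed by storing fixed-size digests instead of the strings. A minimal sketch of the idea (pipeline boilerplate omitted; function and names are illustrative):

```python
import hashlib

seen = set()

def seen_before(key: str) -> bool:
    """Return True if key was already recorded; otherwise record it.

    Stores a 16-byte BLAKE2 digest rather than the full string, so each
    entry costs a fixed amount of memory regardless of URL length.
    """
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=16).digest()
    if digest in seen:
        return True
    seen.add(digest)
    return False

print(seen_before("https://example.com/p/1"))  # False: first sighting
print(seen_before("https://example.com/p/1"))  # True: duplicate
```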
Persistent dedup with Redis
For multi-machine or resumable crawls:
```python
import redis
from scrapy.exceptions import DropItem

class RedisDedupPipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get("REDIS_URL"))

    def __init__(self, url):
        self.url = url

    def open_spider(self, spider):
        self.r = redis.from_url(self.url)
        self.key = f"dedup:{spider.name}"

    def process_item(self, item, spider):
        sku = item.get("sku") or item.get("url")
        if not self.r.sadd(self.key, sku):
            raise DropItem(f"dup: {sku}")
        return item
```
SADD returns 1 if added (new), 0 if existed. One round-trip per item. Set a TTL on the key if you want dedup to expire (e.g. "don't re-scrape within 24h").
For huge datasets, use a Bloom filter instead, covered in §4.53.
Enrichment pipeline
Add computed fields or look up references:
```python
from datetime import datetime, timezone

class EnrichPipeline:
    def process_item(self, item, spider):
        item["scraped_at"] = datetime.now(timezone.utc).isoformat()
        item["spider"] = spider.name
        if item.get("price"):
            item["price_eur"] = item["price"] * 0.92
        return item
```
Pipelines can also call external services (geocoders, currency APIs), but be careful: a slow external call inside process_item serializes your output. For high-throughput crawls, queue enrichment as a separate job.
Storage pipeline, Postgres with batching
Don't INSERT one row at a time. Batch.
```python
import psycopg
from scrapy.exceptions import DropItem

BATCH_SIZE = 100

class PostgresPipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get("PG_DSN"))

    def __init__(self, dsn):
        self.dsn = dsn
        self.batch = []

    def open_spider(self, spider):
        self.conn = psycopg.connect(self.dsn)
        self.cur = self.conn.cursor()

    def close_spider(self, spider):
        self.flush()
        self.conn.close()

    def flush(self):
        if not self.batch:
            return
        rows = [(i["url"], i["title"], i["price"], i["scraped_at"]) for i in self.batch]
        self.cur.executemany(
            "INSERT INTO products (url, title, price, scraped_at) "
            "VALUES (%s, %s, %s, %s) "
            "ON CONFLICT (url) DO UPDATE SET title=EXCLUDED.title, "
            "price=EXCLUDED.price, scraped_at=EXCLUDED.scraped_at",
            rows,
        )
        self.conn.commit()
        self.batch.clear()

    def process_item(self, item, spider):
        self.batch.append(item)
        if len(self.batch) >= BATCH_SIZE:
            self.flush()
        return item
```
Key points:
- `executemany` batches the round-trips.
- `ON CONFLICT … DO UPDATE` is an upsert; you can also `DO NOTHING` to skip duplicates.
- Flush on `close_spider` so the last partial batch isn't lost.
- Wrap the flush in a try/except in production so one bad row doesn't kill the spider.
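The batch-and-flush mechanics can be exercised without a database. A self-contained sketch where a plain list stands in for the INSERT target (names here are illustrative, not Scrapy's):

```python
class BatchingSink:
    """Collects items and flushes every batch_size, plus once at close."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.batch = []
        self.sink = []  # stands in for rows written to Postgres

    def flush(self):
        if not self.batch:
            return
        self.sink.extend(self.batch)  # one "round-trip" per flush
        self.batch.clear()

    def process_item(self, item):
        self.batch.append(item)
        if len(self.batch) >= self.batch_size:
            self.flush()
        return item

    def close(self):
        self.flush()  # don't lose the final partial batch

p = BatchingSink(batch_size=3)
for i in range(7):
    p.process_item(i)
p.close()
print(p.sink)  # all 7 items arrive, written in flushes of 3, 3, and 1
```

Without the `close()` flush, the last item would sit in `batch` forever — the same bug as forgetting to flush in `close_spider`.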
Ordering matters
Pipelines run in numeric order. Put ValidatePipeline first (cheap, drops bad items before they hit expensive stages). Dedup second. Enrichment third. Storage last.
If you swap dedup and storage, you'll write duplicate rows. If you swap validation and storage, you'll write garbage.
When pipelines are the wrong place
Pipelines run inline with the crawler. Slow pipelines slow the spider. For:
- External API enrichment that takes >100ms
- Large file uploads (images, PDFs)
- Heavy ML inference
…queue items elsewhere (Celery, RabbitMQ) and process them out of band. Pipelines should take milliseconds. Anything slower belongs in a worker queue.
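A minimal sketch of that handoff, using a stdlib queue and thread as stand-ins for a real broker and worker (in production the queue would be Redis, RabbitMQ, or a Celery task):

```python
import json
import queue
import threading

jobs = queue.Queue()  # stand-in for a real broker

class QueueingPipeline:
    """process_item only enqueues, so the crawl never waits on enrichment."""

    def process_item(self, item, spider=None):
        jobs.put(json.dumps(item))  # serialize; a broker would persist this
        return item

enriched = []

def worker():
    # Out-of-band consumer: does the slow work (API calls, uploads, ML).
    while True:
        raw = jobs.get()
        if raw is None:  # sentinel: shut down
            break
        item = json.loads(raw)
        item["enriched"] = True  # placeholder for the slow step
        enriched.append(item)

t = threading.Thread(target=worker)
t.start()

pipe = QueueingPipeline()
for i in range(3):
    pipe.process_item({"url": f"https://example.com/p/{i}"})

jobs.put(None)  # tell the worker to stop
t.join()
print(len(enriched))  # 3
```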
Hands-on lab
Build a four-stage pipeline against /products:
- Validate: drop items missing `url`, `title`, or `price`.
- Dedup: by URL, using an in-memory set.
- Enrich: add a `scraped_at` ISO timestamp.
- Store: append to a JSONL file (use Scrapy's built-in `FEEDS` or write a custom file pipeline).
Run the spider twice in a row. The second run should drop most items in the dedup pipeline. Check that `item_dropped_count` in the stats matches your expectation.
Practice this lesson on Catalog108, our first-party scraping sandbox, at /products.