Celery for Python Workers
Celery is the heavyweight Python distributed-task system. When its complexity earns its keep, and when rq or raw Redis is enough.
What you’ll learn
- Set up a Celery worker with Redis or RabbitMQ broker.
- Define tasks, chain them, schedule them.
- Know when Celery is overkill vs essential.
Celery is the Python world's most powerful distributed task system. It's also the most complex. For scraping pipelines with many task types, intricate retries, and scheduled work, it's hard to beat. For simple "fetch this URL" queues, it's overkill; rq is usually a better starting point.
When Celery wins
- Multi-stage pipelines: fetch → parse → enrich → store, each as a different task type.
- Chained or branching workflows (`task_a | task_b | task_c`).
- Beat scheduler for periodic tasks.
- Need for autoscaling worker pools, complex routing.
- You're already using Django (Celery integrates well).
When Celery is overkill
- One task type with simple fan-out: rq or raw Redis suffice.
- You don't need scheduled tasks (cron + script is fine).
- Operational simplicity matters more than feature depth.
Minimal Celery setup
```python
# celery_app.py
import httpx

from celery import Celery

app = Celery(
    "scraper",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def scrape_url(self, url):
    try:
        resp = httpx.get(url, timeout=10)
        resp.raise_for_status()
        return {"url": url, "status": resp.status_code, "text": resp.text}
    except Exception as e:
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
```
```bash
# Run a worker
celery -A celery_app worker --concurrency=8 --loglevel=info
```
Producer:
```python
from celery_app import scrape_url

for url in urls:
    scrape_url.delay(url)
```
`delay` enqueues. The worker picks it up. Retries are automatic (subject to `max_retries`).
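The `countdown` in the retry call above grows exponentially with the retry count. A minimal stdlib sketch of the schedule it produces (the helper name is ours, not Celery's; the 60-second base matches the example):

```python
def retry_countdown(retries: int, base: int = 60) -> int:
    """Delay before the next attempt: base * 2**retries seconds."""
    return base * (2 ** retries)

# The first three retries wait 60s, 120s, then 240s.
schedule = [retry_countdown(n) for n in range(3)]
print(schedule)  # → [60, 120, 240]
```

Doubling the delay each attempt backs off quickly from a struggling target without giving up on transient failures.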
Routing tasks to specific workers
For mixed workloads (HTTP scraping + heavy parsing on separate machines):
```python
# Keys are task names; by default a task is named "<module>.<function>",
# so these match tasks defined in celery_app.py.
app.conf.task_routes = {
    "celery_app.scrape_url": {"queue": "fetch"},
    "celery_app.parse_html": {"queue": "parse"},
    "celery_app.enrich_item": {"queue": "enrich"},
}
```
```bash
# Run different workers for different queues
celery -A celery_app worker -Q fetch --concurrency=20
celery -A celery_app worker -Q parse --concurrency=4
celery -A celery_app worker -Q enrich --concurrency=2
```
Each queue runs on appropriately-sized hardware. Fetch is I/O-bound (high concurrency). Parse is CPU-bound (process per core). Enrich might be DB-bound (low concurrency).
Chaining tasks
```python
from celery import chain

workflow = chain(
    scrape_url.s("https://practice.scrapingcentral.com/products/1"),
    parse_html.s(),
    enrich_item.s(),
)
result = workflow.apply_async()
```
The output of one task is the input to the next. Useful for sequential dependencies.
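Conceptually, a chain is function composition over task results: each stage's return value becomes the next stage's argument. A stdlib sketch of that data flow (the stage functions here are placeholders, not the real tasks):

```python
from functools import reduce

def run_chain(*stages):
    """Feed each stage's return value into the next, like Celery's chain."""
    def runner(first_arg):
        return reduce(lambda value, stage: stage(value), stages, first_arg)
    return runner

# Placeholder stages standing in for scrape_url / parse_html / enrich_item.
scrape = lambda url: f"<html>{url}</html>"
parse = lambda html: {"raw": html}
enrich = lambda item: {**item, "enriched": True}

workflow = run_chain(scrape, parse, enrich)
result = workflow("https://example.com")
print(result)  # → {'raw': '<html>https://example.com</html>', 'enriched': True}
```

In real Celery each stage runs on whatever worker consumes its queue, with the intermediate result passing through the result backend.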
Beat scheduler, periodic tasks
```python
# celery_app.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "scrape-products-hourly": {
        # Task names default to "<module>.<function>".
        "task": "celery_app.scrape_listing",
        "schedule": crontab(minute="0"),
        "args": ("https://practice.scrapingcentral.com/products",),
    },
}
```
Run beat alongside worker:
```bash
celery -A celery_app beat
celery -A celery_app worker
```
Beat schedules tasks on the configured cron; workers consume.
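`crontab(minute="0")` fires at the top of every hour. A stdlib sketch of computing that next fire time, the way a scheduler tick would (helper name is ours, not Celery's):

```python
from datetime import datetime, timedelta

def next_top_of_hour(now: datetime) -> datetime:
    """Next fire time for a crontab(minute='0') schedule."""
    candidate = now.replace(minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(hours=1)
    return candidate

print(next_top_of_hour(datetime(2024, 1, 1, 9, 30)))  # → 2024-01-01 10:00:00
```

Beat loops doing essentially this for every schedule entry, sleeping until the nearest fire time, then publishing the task to the broker.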
Backoff and retry strategies
```python
@app.task(
    bind=True,
    autoretry_for=(httpx.HTTPError,),
    retry_backoff=True,
    retry_jitter=True,
    max_retries=5,
)
def scrape_url(self, url):
    return httpx.get(url, timeout=10).text
```
`autoretry_for` declares which exceptions trigger a retry. `retry_backoff=True` enables exponential backoff; `retry_jitter=True` adds randomness so retries from many workers don't synchronize.
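Backoff with jitter roughly corresponds to randomizing an exponential, capped delay. A stdlib sketch of the idea (an illustration of the strategy, not Celery's exact internals):

```python
import random

def backoff_delay(retries: int, base: float = 1.0, cap: float = 600.0,
                  jitter: bool = True) -> float:
    """Exponential backoff: base * 2**retries seconds, capped, optionally jittered."""
    delay = min(base * (2 ** retries), cap)
    if jitter:
        # Full jitter: pick uniformly in [0, delay] so retries spread out.
        delay = random.uniform(0, delay)
    return delay
```

The cap stops the delay from growing unboundedly; the jitter prevents a thundering herd when many tasks failed at the same moment.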
For per-request 429 handling, custom retry inside the task body:
```python
@app.task(bind=True)
def scrape_url(self, url):
    r = httpx.get(url)
    if r.status_code == 429:
        retry_after = int(r.headers.get("Retry-After", 60))
        raise self.retry(countdown=retry_after, max_retries=3)
    return r.text
```
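One wrinkle the snippet above glosses over: per RFC 9110, `Retry-After` may be either integer seconds or an HTTP-date, so `int(...)` can raise on some servers. A more defensive stdlib parser (helper name is ours):

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def retry_after_seconds(header: Optional[str], default: int = 60) -> int:
    """Parse a Retry-After header: integer seconds or an HTTP-date."""
    if header is None:
        return default
    if header.isdigit():
        return int(header)
    try:
        when = parsedate_to_datetime(header)
        return max(0, int((when - datetime.now(timezone.utc)).total_seconds()))
    except (TypeError, ValueError):
        return default

print(retry_after_seconds("120"))  # → 120
```

Feeding the result into `self.retry(countdown=...)` honors the server's rate-limit hint in either format.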
Idempotency and ACKing
By default, Celery acks tasks BEFORE running them. If a worker crashes mid-task, the task is lost. To get at-least-once semantics, use `acks_late=True`:
```python
@app.task(bind=True, acks_late=True, reject_on_worker_lost=True)
def scrape_url(self, url):
    ...
```
`acks_late` means the worker acks only after success. If the worker crashes, the broker redelivers. Required for production-grade reliability, but means your task MUST be idempotent (could run twice).
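With at-least-once delivery, a redelivered task must leave the same end state as a single run. One common approach is keying writes by a natural ID (here the URL) so a repeat overwrites instead of duplicating. A minimal in-memory sketch (the store class is illustrative; in practice this is an `INSERT ... ON CONFLICT` upsert in your database):

```python
class ItemStore:
    """Keyed store: storing the same URL twice leaves one record (idempotent)."""

    def __init__(self):
        self._items = {}

    def upsert(self, url: str, data: dict) -> None:
        self._items[url] = data  # overwrite by key, never append

    def count(self) -> int:
        return len(self._items)

store = ItemStore()
store.upsert("https://example.com/p/1", {"status": 200})
store.upsert("https://example.com/p/1", {"status": 200})  # redelivery: no duplicate
print(store.count())  # → 1
```

Append-only writes (e.g. a bare `INSERT`) are the classic way `acks_late` bites: a crash after the write but before the ack yields a duplicate row on redelivery.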
Monitoring
Flower is Celery's web dashboard:
```bash
celery -A celery_app flower
```
Shows live worker status, queue depths, task history, success/failure rates. For production, indispensable.
Celery vs rq vs Symfony Messenger
| Concern | Celery | rq | Symfony Messenger |
|---|---|---|---|
| Complexity | High | Low | Medium |
| Multi-stage pipelines | Excellent | Awkward | Good |
| Scheduled tasks | Beat (built-in) | Add-on | Native via Scheduler |
| Multi-language | Python | Python | PHP |
| Operational overhead | Significant | Low | Medium |
| Sweet spot | Complex DAGs, scheduled, multi-language | Simple workers | Symfony app integration |
Pick by team and project shape. Don't pick Celery for a simple URL queue.
Common gotchas
- Memory leaks in long-lived workers. Use `--max-tasks-per-child=1000` to recycle workers periodically.
- Forgotten `acks_late`. Default behavior loses messages on worker crash.
- Heavy results in the backend. Don't return giant HTML strings; write to DB and return small references.
- Beat duplicate schedules across multiple beats. Run ONE beat process; multiple beats schedule duplicates.
Hands-on lab
Build a 3-stage Celery scraping pipeline:
1. `scrape_url`, fetch raw HTML.
2. `parse_html`, extract items.
3. `store_items`, write to Postgres.
Chain them. Run with separate worker queues per stage. Run beat to schedule the listing scrape every 15 minutes. Monitor with Flower.
The exercise teaches both Celery's power and its weight. If the cost-benefit feels off for your project, that's the signal to use a lighter tool.
Quiz, check your understanding
Pass mark is 70%. Pick the best answer; you’ll see the explanation right after.