Building a News Aggregator Scraper
Build a complete news aggregator that collects articles from multiple sources using RSS feeds and web scraping. Deduplicate, categorize, and store results.
Python Scraping · #25 · Intermediate · 4 min read
A news aggregator collects articles from multiple sources into a single feed. This is a practical project that combines RSS parsing, web scraping, deduplication, and data storage.
The Complete News Aggregator
import hashlib
import json
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from dataclasses import dataclass, asdict
from datetime import datetime
from urllib.parse import urljoin

import feedparser
import requests
from bs4 import BeautifulSoup
@dataclass
class Article:
    """One collected news item, as stored in the ``articles`` table."""

    title: str  # headline text
    url: str  # absolute link to the article
    source: str  # feed title or configured site name
    summary: str  # short description; empty for scraped HTML links
    published: str  # publication timestamp string as provided by the source
    category: str  # category label taken from the source configuration
    content_hash: str  # deduplication key derived from title and URL
class NewsAggregator:
    """Collect articles from RSS feeds and HTML pages into a SQLite table.

    Each article is keyed by an MD5 hash of its normalised title and URL, so
    storing the same item twice is a no-op.
    """

    def __init__(self, db_path="news.db"):
        """Create the HTTP session and make sure the ``articles`` table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.session = requests.Session()
        self.session.headers["User-Agent"] = (
            "NewsAggregator/1.0 (+https://scrapingcentral.com)"
        )
        self._init_db()

    def _init_db(self):
        """Create the ``articles`` table if it is not already present."""
        # ``closing`` closes the connection; the inner ``conn`` context
        # manager only commits or rolls back the transaction.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS articles (
                    content_hash TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    url TEXT UNIQUE,
                    source TEXT,
                    summary TEXT,
                    published TEXT,
                    category TEXT,
                    collected_at TEXT
                )
            """)

    def _hash_content(self, title, url):
        """Generate a unique hash for deduplication.

        The title is lower-cased and stripped and the URL is stripped before
        hashing, so case and surrounding whitespace do not affect the key.
        """
        content = f"{title.lower().strip()}|{url.strip()}"
        return hashlib.md5(content.encode()).hexdigest()

    def fetch_rss(self, feed_url, category="general"):
        """Parse an RSS/Atom feed and return its entries as ``Article`` objects.

        Args:
            feed_url: URL of the feed.
            category: Category label assigned to every returned article.

        Returns:
            A list of articles; empty if the feed could not be read. Entries
            without a title or link are skipped.
        """
        try:
            # Send the same User-Agent that HTML requests use.
            feed = feedparser.parse(
                feed_url, agent=self.session.headers["User-Agent"]
            )
            # feedparser reports fetch/parse problems through ``bozo`` instead
            # of raising; surface them when nothing usable came back.
            if feed.get("bozo") and not feed.entries:
                print(f"Error fetching {feed_url}: {feed.get('bozo_exception')}")
                return []

            source = feed.feed.get("title", feed_url)
            articles = []
            for entry in feed.entries:
                title = entry.get("title", "").strip()
                url = entry.get("link", "").strip()
                if not title or not url:
                    continue
                articles.append(
                    Article(
                        title=title,
                        url=url,
                        source=source,
                        summary=entry.get("summary", "")[:500],
                        published=entry.get("published", ""),
                        category=category,
                        content_hash=self._hash_content(title, url),
                    )
                )
            return articles
        except Exception as e:
            # Best effort: one broken feed must not abort the whole run.
            print(f"Error fetching {feed_url}: {e}")
            return []

    def scrape_page(self, url, source_name, category="general"):
        """Scrape article links from an HTML page (for sites without RSS).

        Every ``<a href>`` whose text is 20-200 characters long is treated as
        an article headline.

        Args:
            url: Page to fetch; also the base for resolving relative links.
            source_name: Value stored as each article's ``source``.
            category: Category label assigned to every returned article.

        Returns:
            A list of articles; empty if the page could not be fetched.
        """
        try:
            response = self.session.get(url, timeout=15)
            # Do not harvest links from 4xx/5xx error pages.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # The page carries no per-link date, so use the scrape time.
            scraped_at = datetime.now().isoformat()
            articles = []
            for a in soup.select("a[href]"):
                title = a.get_text(strip=True)
                # Filter: only keep links that look like articles
                if len(title) < 20 or len(title) > 200:
                    continue
                href = a.get("href", "").strip()
                if not href.startswith(("http://", "https://")):
                    href = urljoin(url, href)
                # Drop mailto:, javascript:, tel: and similar non-web links.
                if not href.startswith(("http://", "https://")):
                    continue
                articles.append(
                    Article(
                        title=title,
                        url=href,
                        source=source_name,
                        summary="",
                        published=scraped_at,
                        category=category,
                        content_hash=self._hash_content(title, href),
                    )
                )
            return articles
        except Exception as e:
            # Best effort: one broken page must not abort the whole run.
            print(f"Error scraping {url}: {e}")
            return []

    def store_articles(self, articles):
        """Store articles in the database, skipping duplicates.

        Args:
            articles: Iterable of objects exposing the ``Article`` fields.

        Returns:
            The number of rows actually inserted.
        """
        new_count = 0
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            for article in articles:
                try:
                    conn.execute(
                        """
                        INSERT INTO articles
                            (content_hash, title, url, source, summary,
                             published, category, collected_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            article.content_hash,
                            article.title,
                            article.url,
                            article.source,
                            article.summary,
                            article.published,
                            article.category,
                            datetime.now().isoformat(),
                        ),
                    )
                except sqlite3.IntegrityError:
                    # Constraint violation (repeated content_hash or url):
                    # the article is already stored, so skip it.
                    continue
                new_count += 1
        return new_count

    def collect_all(self, sources):
        """Collect articles from all configured sources and store them.

        Args:
            sources: Iterable of dicts with ``"type"`` (``"rss"`` or
                ``"html"``) and ``"url"``, plus optional ``"category"`` and,
                for HTML sources, ``"name"`` (defaults to the URL).

        Returns:
            The number of newly stored articles.
        """

        def fetch_source(source):
            kind = source.get("type")
            category = source.get("category", "general")
            if kind == "rss":
                return self.fetch_rss(source["url"], category)
            if kind == "html":
                # Fall back to the URL so a missing "name" cannot abort the run.
                name = source.get("name", source["url"])
                return self.scrape_page(source["url"], name, category)
            print(f"Skipping source with unknown type {kind!r}: {source.get('url')}")
            return []

        all_articles = []
        # Fetching is I/O-bound, so a small thread pool overlaps the waits.
        with ThreadPoolExecutor(max_workers=5) as executor:
            for articles in executor.map(fetch_source, sources):
                all_articles.extend(articles)

        new_count = self.store_articles(all_articles)
        print(f"Collected {len(all_articles)} articles, {new_count} new.")
        return new_count

    def get_latest(self, limit=20, category=None):
        """Get the latest articles from the database.

        Args:
            limit: Maximum number of rows to return.
            category: If truthy, only return articles with this category.

        Returns:
            A list of dicts, one per row, newest ``collected_at`` first.
        """
        query = "SELECT * FROM articles"
        params = []
        if category:
            query += " WHERE category = ?"
            params.append(category)
        query += " ORDER BY collected_at DESC LIMIT ?"
        params.append(limit)

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, params).fetchall()
        return [dict(r) for r in rows]
# Configuration: every source is an RSS feed, listed as (feed URL, category).
SOURCES = [
    {"type": "rss", "url": feed_url, "category": label}
    for feed_url, label in (
        ("https://news.ycombinator.com/rss", "tech"),
        ("https://www.reddit.com/r/python/.rss", "python"),
        ("https://realpython.com/atom.xml", "python"),
    )
]
def main():
    """Collect from every source in SOURCES, then print the ten newest articles."""
    aggregator = NewsAggregator()

    # Collect from all sources
    aggregator.collect_all(SOURCES)

    # Display latest articles
    print("\n--- Latest Articles ---\n")
    for article in aggregator.get_latest(10):
        print(f"[{article['category'].upper()}] {article['title']}")
        print(f" Source: {article['source']}")
        print(f" URL: {article['url']}")
        print()


if __name__ == "__main__":
    main()
Exporting to JSON
# Export the 50 most recently collected articles to latest_news.json.
aggregator = NewsAggregator()
articles = aggregator.get_latest(50)
with open("latest_news.json", "w", encoding="utf-8") as f:
    # ensure_ascii=False writes non-ASCII characters as-is instead of \u escapes.
    json.dump(articles, f, indent=2, ensure_ascii=False)
print(f"Exported {len(articles)} articles to latest_news.json")
Tips
- Use RSS feeds whenever available; they are reliable, structured, and polite to fetch.
- Content hashing prevents duplicate articles from appearing when sources are refreshed.
- Run the aggregator on a schedule (cron or a task scheduler) to keep your news database current.
- For sources that block automated requests, route them through ScraperAPI or ScrapingAnt to fetch reliably.
- Add a `source_reliability` score to rank articles from more trusted sources higher.
Next Steps
- Explore Zyte API for advanced scraping capabilities
- Learn web scraping best practices and design patterns