Files
PocketVeto/backend/app/workers/news_fetcher.py
Jack Levy a66b5b4bcb feat(interest): add public interest tracking for members of Congress
Adds Google Trends, NewsAPI, and Google News RSS scoring for members,
mirroring the existing bill interest pipeline. Member profiles now show
a Public Interest chart (with signal breakdown) and a Related News panel.

Key changes:
- New member_trend_scores + member_news_articles tables (migration 0008)
- fetch_gnews_articles() added to news_service for unlimited RSS article storage
- Bill news fetcher now combines NewsAPI + Google News RSS (more coverage)
- New member_interest Celery worker with scheduled news + trend tasks
- GET /members/{id}/trend and /news API endpoints
- TrendChart redesigned with signal breakdown badges and bar+line combo chart
- NewsPanel accepts generic article shape (bills and members)

Co-Authored-By: Jack Levy
2026-03-01 00:36:30 -05:00

108 lines
3.5 KiB
Python

"""
News fetcher — correlates bills with news articles.
Triggered after LLM brief creation and on a 6-hour schedule for active bills.
"""
import logging
from datetime import date, datetime, timedelta, timezone
from sqlalchemy import and_
from app.database import get_sync_db
from app.models import Bill, BillBrief, NewsArticle
from app.services import news_service
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill")
def fetch_news_for_bill(self, bill_id: str):
    """Fetch and persist news articles for a specific bill.

    Combines NewsAPI and Google News RSS results for a query built from the
    bill's metadata, skipping URLs that are already stored (idempotent across
    runs) as well as duplicates within the current batch.

    Args:
        bill_id: Primary key of the Bill to correlate news for.

    Returns:
        ``{"status": "not_found"}`` if the bill does not exist, otherwise
        ``{"status": "ok", "saved": <count of newly persisted articles>}``.

    Raises:
        celery.exceptions.Retry: on any failure; retries up to 2 times with
        a 5-minute countdown (see task decorator).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "not_found"}
        # Get topic tags from latest brief.
        # NOTE(review): topic_tags is computed but never used below — either
        # wire it into build_news_query or drop this lookup; confirm intent.
        latest_brief = (
            db.query(BillBrief)
            .filter_by(bill_id=bill_id)
            .order_by(BillBrief.created_at.desc())
            .first()
        )
        topic_tags = latest_brief.topic_tags if latest_brief else []
        query = news_service.build_news_query(
            bill_title=bill.title,
            short_title=bill.short_title,
            sponsor_name=None,
            bill_type=bill.bill_type,
            bill_number=bill.bill_number,
        )
        newsapi_articles = news_service.fetch_newsapi_articles(query)
        gnews_articles = news_service.fetch_gnews_articles(query)
        all_articles = newsapi_articles + gnews_articles

        # Idempotency: load already-stored URLs in ONE query (instead of one
        # round-trip per article), and track URLs seen in this batch so the
        # same story returned by both sources isn't inserted twice — pending
        # (un-flushed) adds may not be visible to in-session queries.
        candidate_urls = [a.get("url") for a in all_articles if a.get("url")]
        seen: set = set()
        if candidate_urls:
            seen = {
                row.url
                for row in db.query(NewsArticle.url)
                .filter(NewsArticle.url.in_(candidate_urls))
                .all()
            }

        saved = 0
        for article in all_articles:
            url = article.get("url")
            if not url or url in seen:
                continue
            seen.add(url)
            pub_at = None
            raw_published = article.get("published_at")
            if raw_published:
                try:
                    # Normalize trailing "Z" (UTC) — fromisoformat only
                    # accepts it natively on Python 3.11+.
                    pub_at = datetime.fromisoformat(raw_published.replace("Z", "+00:00"))
                except (ValueError, TypeError):
                    pass  # unparseable timestamp: store the article without one
            db.add(NewsArticle(
                bill_id=bill_id,
                # `or ""` guards a present-but-None "source" key, which would
                # otherwise raise TypeError on the slice; [:200] fits column.
                source=(article.get("source") or "")[:200],
                headline=article.get("headline", ""),
                url=url,
                published_at=pub_at,
                relevance_score=1.0,
            ))
            saved += 1
        db.commit()
        logger.info("Saved %d news articles for bill %s", saved, bill_id)
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error("News fetch failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills")
def fetch_news_for_active_bills(self):
    """Queue per-bill news fetches for every recently active bill.

    Runs on a schedule: selects bills whose latest action falls within the
    last 7 days (newest first, capped at 80 to stay under the 100/day
    NewsAPI quota) and dispatches fetch_news_for_bill for each.

    Returns:
        ``{"queued": <number of bills dispatched>}``.
    """
    db = get_sync_db()
    try:
        week_ago = date.today() - timedelta(days=7)
        recent_bills = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= week_ago)
            .order_by(Bill.latest_action_date.desc())
            .limit(80)
            .all()
        )
        queued = 0
        for recent_bill in recent_bills:
            fetch_news_for_bill.delay(recent_bill.bill_id)
            queued += 1
        logger.info(f"Queued news fetch for {queued} active bills")
        return {"queued": queued}
    finally:
        db.close()