105 lines
3.3 KiB
Python
105 lines
3.3 KiB
Python
"""
News fetcher — correlates bills with news articles.

Triggered after LLM brief creation and on a 6-hour schedule for active bills.
"""
|
|
import logging
|
|
from datetime import date, datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import and_
|
|
|
|
from app.database import get_sync_db
|
|
from app.models import Bill, BillBrief, NewsArticle
|
|
from app.services import news_service
|
|
from app.workers.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _parse_published_at(raw_value):
    """Best-effort parse of an ISO-8601 timestamp; return None when unusable."""
    if not raw_value:
        return None
    try:
        # datetime.fromisoformat() on older Python versions rejects a trailing
        # "Z", so rewrite it as an explicit UTC offset first.
        return datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # A bad timestamp must not prevent the article from being stored.
        logger.debug(f"Unparseable published_at value: {raw_value!r}")
        return None


@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill")
def fetch_news_for_bill(self, bill_id: str):
    """Fetch news articles for a specific bill and store the new ones.

    Args:
        bill_id: Identifier passed to ``db.get(Bill, ...)`` to load the bill.

    Returns:
        ``{"status": "not_found"}`` when no bill exists for ``bill_id``,
        otherwise ``{"status": "ok", "saved": <number of rows added>}``.

    Raises:
        Whatever ``self.retry`` raises. Any failure rolls the session back,
        is logged, and the task is re-submitted with a 300 second countdown
        (the task is declared with ``max_retries=2``).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "not_found"}

        query = news_service.build_news_query(
            bill_title=bill.title,
            short_title=bill.short_title,
            sponsor_name=None,
            bill_type=bill.bill_type,
            bill_number=bill.bill_number,
        )

        articles = news_service.fetch_newsapi_articles(query)
        saved = 0
        # URLs already handled in this batch. The database check below only
        # sees pending rows if the session autoflushes, so guard explicitly.
        seen_urls = set()
        for article in articles:
            url = article.get("url")
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)

            # Idempotency: skip URLs that are already stored.
            existing = db.query(NewsArticle).filter_by(url=url).first()
            if existing is not None:
                continue

            db.add(NewsArticle(
                bill_id=bill_id,
                # `or ""` covers keys that are present but set to None, which
                # a .get() default alone does not handle.
                source=(article.get("source") or "")[:200],
                headline=article.get("headline") or "",
                url=url,
                published_at=_parse_published_at(article.get("published_at")),
                relevance_score=1.0,
            ))
            saved += 1

        db.commit()
        logger.info(f"Saved {saved} news articles for bill {bill_id}")
        return {"status": "ok", "saved": saved}

    except Exception as exc:
        db.rollback()
        logger.error(f"News fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills")
def fetch_news_for_active_bills(self):
    """
    Scheduled task: queue a news fetch for every recently active bill.

    A bill is selected when its ``latest_action_date`` falls within the last
    7 days. The most recently active bills come first and at most 80 are
    queued per run, each through ``fetch_news_for_bill.delay``.

    Returns ``{"queued": <number of bills queued>}``.

    NOTE(review): the 80-per-run cap is meant to respect the 100/day NewsAPI
    limit, but the module docstring mentions a 6-hour schedule, so several
    runs in one day could exceed it. Confirm the quota is enforced elsewhere.
    """
    since = date.today() - timedelta(days=7)
    session = get_sync_db()
    try:
        recent = session.query(Bill).filter(Bill.latest_action_date >= since)
        newest_first = recent.order_by(Bill.latest_action_date.desc())
        active_bills = newest_first.limit(80).all()

        # Fan out: one independent task per bill.
        for active in active_bills:
            fetch_news_for_bill.delay(active.bill_id)

        logger.info(f"Queued news fetch for {len(active_bills)} active bills")
        return {"queued": len(active_bills)}
    finally:
        session.close()
|