"""
News fetcher — correlates bills with news articles.

Triggered after LLM brief creation and on a 6-hour schedule for active bills.
"""
import logging
from datetime import date, datetime, timedelta, timezone

from sqlalchemy import and_

from app.database import get_sync_db
from app.models import Bill, BillBrief, NewsArticle
from app.services import news_service
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)

# Seconds Celery waits before re-running a failed fetch task.
_RETRY_COUNTDOWN_SECONDS = 300
# A bill counts as "active" if its latest action is within this many days.
_ACTIVE_WINDOW_DAYS = 7
# Upper bound on how many active bills a single scheduled run fans out to.
_MAX_ACTIVE_BILLS = 80
# Stored article source names are truncated to this many characters.
_SOURCE_MAX_LEN = 200


def _parse_published_at(value) -> "datetime | None":
    """Parse an article's ``published_at`` value into a datetime, or return None.

    Accepts an existing ``datetime`` as-is, or an ISO-8601 string where a
    trailing ``Z`` is treated as ``+00:00`` (``datetime.fromisoformat`` only
    understands ``Z`` from Python 3.11 on). Empty or unparseable values yield
    None, so one bad timestamp never prevents the article from being stored.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (TypeError, ValueError, AttributeError):
        logger.debug("Ignoring unparseable published_at value %r", value)
        return None


def _save_articles(db, bill_id: str, articles: list[dict]) -> int:
    """Persist a list of article dicts for a bill, skipping duplicates. Returns saved count.

    An article is skipped when it has no ``url``, when its URL already appeared
    earlier in ``articles`` (NewsAPI and Google News often return the same
    story), or when a ``NewsArticle`` row with the same ``(bill_id, url)``
    already exists. New rows are only added to the session; committing is the
    caller's responsibility.
    """
    saved = 0
    seen_urls: set[str] = set()
    for article in articles:
        url = article.get("url")
        if not url or url in seen_urls:
            continue
        seen_urls.add(url)

        if db.query(NewsArticle).filter_by(bill_id=bill_id, url=url).first():
            continue

        db.add(NewsArticle(
            bill_id=bill_id,
            # `or ""` also covers keys present with a None value, which
            # previously raised TypeError on slicing and failed the whole task.
            source=(article.get("source") or "")[:_SOURCE_MAX_LEN],
            headline=article.get("headline") or "",
            url=url,
            published_at=_parse_published_at(article.get("published_at")),
            relevance_score=1.0,
        ))
        saved += 1
    return saved


def _build_query(bill):
    """Build the news search query for ``bill`` (shared by the single and batch tasks)."""
    return news_service.build_news_query(
        bill_title=bill.title,
        short_title=bill.short_title,
        sponsor_name=None,
        bill_type=bill.bill_type,
        bill_number=bill.bill_number,
    )


@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill")
def fetch_news_for_bill(self, bill_id: str):
    """Fetch news articles for a specific bill.

    Queries NewsAPI and Google News with the bill's query and stores any new
    articles.

    Returns:
        ``{"status": "not_found"}`` when no Bill has ``bill_id``; otherwise
        ``{"status": "ok", "saved": <number of new articles stored>}``.

    On any error the session is rolled back and the task is re-queued via
    ``self.retry`` with a ``_RETRY_COUNTDOWN_SECONDS`` countdown (bounded by
    ``max_retries`` on the decorator).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "not_found"}

        query = _build_query(bill)
        newsapi_articles = news_service.fetch_newsapi_articles(query)
        gnews_articles = news_service.fetch_gnews_articles(query)
        saved = _save_articles(db, bill_id, newsapi_articles + gnews_articles)
        db.commit()
        logger.info("Saved %s news articles for bill %s", saved, bill_id)
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.exception("News fetch failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=_RETRY_COUNTDOWN_SECONDS)
    finally:
        db.close()


@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill_batch")
def fetch_news_for_bill_batch(self, bill_ids: list):
    """
    Fetch news for a batch of bills in ONE NewsAPI call using OR query syntax
    (up to NEWSAPI_BATCH_SIZE bills per call).

    Google News is fetched per-bill but served from the 2-hour Redis cache
    so the RSS is only hit once per query.

    Unknown bill ids are silently dropped. Returns ``{"status": "no_bills"}``
    if none of them exist, otherwise
    ``{"status": "ok", "bills": <bills processed>, "saved": <articles stored>}``.
    Errors roll back the session and re-queue the task via ``self.retry``.
    """
    db = get_sync_db()
    try:
        bills = [bill for bill in (db.get(Bill, bid) for bid in bill_ids) if bill]
        if not bills:
            return {"status": "no_bills"}

        # (bill_id, query) pairs, consumed both by the batch NewsAPI call and
        # by the per-bill loop below (avoids re-searching for each query).
        bill_queries = [(bill.bill_id, _build_query(bill)) for bill in bills]

        # One NewsAPI call for the whole batch
        newsapi_batch = news_service.fetch_newsapi_articles_batch(bill_queries)

        total_saved = 0
        for bill_id, query in bill_queries:
            newsapi_articles = newsapi_batch.get(bill_id, [])
            # Google News is cached — fine to call per-bill (cache hit after first)
            gnews_articles = news_service.fetch_gnews_articles(query)
            total_saved += _save_articles(db, bill_id, newsapi_articles + gnews_articles)

        db.commit()
        logger.info("Batch saved %s articles for %s bills", total_saved, len(bills))
        return {"status": "ok", "bills": len(bills), "saved": total_saved}
    except Exception as exc:
        db.rollback()
        logger.exception("Batch news fetch failed: %s", exc)
        raise self.retry(exc=exc, countdown=_RETRY_COUNTDOWN_SECONDS)
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills")
def fetch_news_for_active_bills(self):
    """
    Scheduled task: fetch news for bills with recent actions (last 7 days).

    Groups bills into batches of NEWSAPI_BATCH_SIZE to multiply effective quota.
    At most ``_MAX_ACTIVE_BILLS`` bills are considered, most recent action first.
    Each batch is queued as a separate ``fetch_news_for_bill_batch`` task.

    Returns:
        ``{"queued_batches": <batches queued>, "total_bills": <active bills found>}``.
    """
    db = get_sync_db()
    try:
        cutoff = date.today() - timedelta(days=_ACTIVE_WINDOW_DAYS)
        # Only the ids are needed, so avoid loading full Bill rows.
        rows = (
            db.query(Bill.bill_id)
            .filter(Bill.latest_action_date >= cutoff)
            .order_by(Bill.latest_action_date.desc())
            .limit(_MAX_ACTIVE_BILLS)
            .all()
        )
        bill_ids = [row[0] for row in rows]

        batch_size = news_service.NEWSAPI_BATCH_SIZE
        batches = [bill_ids[i:i + batch_size] for i in range(0, len(bill_ids), batch_size)]
        for batch in batches:
            fetch_news_for_bill_batch.delay(batch)

        logger.info(
            "Queued %s news batches for %s active bills (%s bills/batch)",
            len(batches), len(bill_ids), batch_size,
        )
        return {"queued_batches": len(batches), "total_bills": len(bill_ids)}
    finally:
        db.close()