"""
Trend scorer — calculates the daily zeitgeist score for bills.
Runs nightly via Celery Beat.
"""
import logging
from datetime import date, timedelta

from sqlalchemy import and_

from app.database import get_sync_db
from app.models import Bill, BillBrief, TrendScore
from app.services import news_service, trends_service
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)

_PYTRENDS_BATCH = 5  # max keywords pytrends accepts per call


def _clamp_unit(ratio: float) -> float:
    """Clamp *ratio* into the closed interval [0.0, 1.0]."""
    return max(0.0, min(ratio, 1.0))


def calculate_composite_score(newsapi_count: int, gnews_count: int, gtrends_score: float) -> float:
    """
    Weighted composite score (0–100):
      NewsAPI article count  → 0–40 pts  (saturates at 20 articles)
      Google News RSS count  → 0–30 pts  (saturates at 50 articles)
      Google Trends score    → 0–30 pts  (0–100 input)

    Every component is clamped to its own range, so a negative count or a
    trends value outside 0–100 cannot push the total outside 0–100.

    Args:
        newsapi_count: Number of NewsAPI articles found for the bill.
        gnews_count: Number of Google News RSS articles found for the bill.
        gtrends_score: Google Trends interest score, expected in 0–100.

    Returns:
        The composite score rounded to two decimal places.
    """
    newsapi_pts = _clamp_unit(newsapi_count / 20) * 40
    gnews_pts = _clamp_unit(gnews_count / 50) * 30
    gtrends_pts = _clamp_unit(gtrends_score / 100) * 30
    return round(newsapi_pts + gnews_pts + gtrends_pts, 2)


@celery_app.task(bind=True, name="app.workers.trend_scorer.calculate_all_trend_scores")
def calculate_all_trend_scores(self):
    """Nightly task: calculate trend scores for bills active in the last 90 days.

    Bills that already have a TrendScore row for today are skipped. The
    remaining bills are processed in batches of ``_PYTRENDS_BATCH`` so one
    pytrends call covers several bills. A failure while scoring a single bill
    is logged and that bill is skipped; any other failure rolls back the
    session and is re-raised.

    Returns:
        dict: ``{"scored": <number of bills scored in this run>}``.

    Raises:
        Exception: Any error outside the per-bill scoring step, re-raised
            after the session has been rolled back.
    """
    db = get_sync_db()
    try:
        # Read the date once so the cutoff and the score_date stay consistent
        # even if the task runs across midnight.
        today = date.today()
        cutoff = today - timedelta(days=90)

        active_bills = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= cutoff)
            .all()
        )

        # Filter to bills not yet scored today. One query collects every
        # bill_id scored today instead of issuing one lookup per active bill.
        scored_today = {
            row[0]
            for row in db.query(TrendScore.bill_id)
            .filter(TrendScore.score_date == today)
            .all()
        }
        bills_to_score = [
            bill for bill in active_bills if bill.bill_id not in scored_today
        ]

        scored = 0

        # Process in batches of _PYTRENDS_BATCH so one pytrends call covers multiple bills
        for batch_start in range(0, len(bills_to_score), _PYTRENDS_BATCH):
            batch = bills_to_score[batch_start: batch_start + _PYTRENDS_BATCH]

            # Collect keyword groups for pytrends batch call
            keyword_groups = []
            bill_queries = []
            for bill in batch:
                latest_brief = (
                    db.query(BillBrief)
                    .filter_by(bill_id=bill.bill_id)
                    .order_by(BillBrief.created_at.desc())
                    .first()
                )
                # No brief, or a brief without tags, both mean "no topic tags".
                topic_tags = (latest_brief.topic_tags or []) if latest_brief else []

                query = news_service.build_news_query(
                    bill_title=bill.title,
                    short_title=bill.short_title,
                    sponsor_name=None,
                    bill_type=bill.bill_type,
                    bill_number=bill.bill_number,
                )
                keywords = trends_service.keywords_for_bill(
                    title=bill.title or "",
                    short_title=bill.short_title or "",
                    topic_tags=topic_tags,
                )
                keyword_groups.append(keywords)
                bill_queries.append(query)

            # One pytrends call for the whole batch
            gtrends_scores = trends_service.get_trends_scores_batch(keyword_groups)

            for i, bill in enumerate(batch):
                try:
                    query = bill_queries[i]

                    # NewsAPI + Google News counts (gnews served from 2-hour cache)
                    newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
                    newsapi_count = len(newsapi_articles)
                    gnews_count = news_service.fetch_gnews_count(query, days=30)
                    # An IndexError here (fewer scores than bills) is handled
                    # below like any other per-bill failure.
                    gtrends_score = gtrends_scores[i]

                    composite = calculate_composite_score(
                        newsapi_count, gnews_count, gtrends_score
                    )

                    db.add(TrendScore(
                        bill_id=bill.bill_id,
                        score_date=today,
                        newsapi_count=newsapi_count,
                        gnews_count=gnews_count,
                        gtrends_score=gtrends_score,
                        composite_score=composite,
                    ))
                    scored += 1
                except Exception as exc:
                    # Best effort: one bad bill must not abort the nightly run.
                    logger.warning("Trend scoring skipped for %s: %s", bill.bill_id, exc)

            # Commit per batch so a failure in a later batch does not discard
            # the scores already computed.
            db.commit()

        logger.info("Scored %s bills", scored)
        return {"scored": scored}

    except Exception as exc:
        db.rollback()
        logger.exception("Trend scoring failed: %s", exc)
        raise
    finally:
        db.close()