""" Member interest worker — tracks public interest in members of Congress. Fetches news articles and calculates trend scores for members using the same composite scoring model as bills (NewsAPI + Google News RSS + pytrends). Runs on a schedule and can also be triggered per-member. """ import logging from datetime import date, datetime, timedelta, timezone from app.database import get_sync_db from app.models import Member, MemberNewsArticle, MemberTrendScore from app.services import news_service, trends_service from app.workers.celery_app import celery_app from app.workers.trend_scorer import calculate_composite_score logger = logging.getLogger(__name__) def _parse_pub_at(raw: str | None) -> datetime | None: if not raw: return None try: return datetime.fromisoformat(raw.replace("Z", "+00:00")) except Exception: return None @celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.fetch_member_news") def fetch_member_news(self, bioguide_id: str): """Fetch and store recent news articles for a specific member.""" db = get_sync_db() try: member = db.get(Member, bioguide_id) if not member or not member.first_name or not member.last_name: return {"status": "skipped"} query = news_service.build_member_query( first_name=member.first_name, last_name=member.last_name, chamber=member.chamber, ) newsapi_articles = news_service.fetch_newsapi_articles(query, days=30) gnews_articles = news_service.fetch_gnews_articles(query, days=30) all_articles = newsapi_articles + gnews_articles saved = 0 for article in all_articles: url = article.get("url") if not url: continue existing = ( db.query(MemberNewsArticle) .filter_by(member_id=bioguide_id, url=url) .first() ) if existing: continue db.add(MemberNewsArticle( member_id=bioguide_id, source=article.get("source", "")[:200], headline=article.get("headline", ""), url=url, published_at=_parse_pub_at(article.get("published_at")), relevance_score=1.0, )) saved += 1 db.commit() logger.info(f"Saved {saved} news articles for member {bioguide_id}") return {"status": "ok", "saved": saved} except Exception as exc: db.rollback() logger.error(f"Member news fetch failed for {bioguide_id}: {exc}") raise self.retry(exc=exc, countdown=300) finally: db.close() @celery_app.task(bind=True, name="app.workers.member_interest.calculate_member_trend_score") def calculate_member_trend_score(self, bioguide_id: str): """Calculate and store today's public interest score for a member.""" db = get_sync_db() try: member = db.get(Member, bioguide_id) if not member or not member.first_name or not member.last_name: return {"status": "skipped"} today = date.today() existing = ( db.query(MemberTrendScore) .filter_by(member_id=bioguide_id, score_date=today) .first() ) if existing: return {"status": "already_scored"} query = news_service.build_member_query( first_name=member.first_name, last_name=member.last_name, chamber=member.chamber, ) keywords = trends_service.keywords_for_member(member.first_name, member.last_name) newsapi_articles = news_service.fetch_newsapi_articles(query, days=30) newsapi_count = len(newsapi_articles) gnews_count = news_service.fetch_gnews_count(query, days=30) gtrends_score = trends_service.get_trends_score(keywords) composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score) db.add(MemberTrendScore( member_id=bioguide_id, score_date=today, newsapi_count=newsapi_count, gnews_count=gnews_count, gtrends_score=gtrends_score, composite_score=composite, )) db.commit() logger.info(f"Scored member {bioguide_id}: composite={composite:.1f}") return {"status": "ok", "composite": composite} except Exception as exc: db.rollback() logger.error(f"Member trend scoring failed for {bioguide_id}: {exc}") raise finally: db.close() @celery_app.task(bind=True, name="app.workers.member_interest.fetch_news_for_active_members") def fetch_news_for_active_members(self): """ Scheduled task: fetch news for members who have been viewed or followed. Prioritises members with detail_fetched set (profile has been viewed). """ db = get_sync_db() try: members = ( db.query(Member) .filter(Member.detail_fetched.isnot(None)) .filter(Member.first_name.isnot(None)) .all() ) for member in members: fetch_member_news.delay(member.bioguide_id) logger.info(f"Queued news fetch for {len(members)} members") return {"queued": len(members)} finally: db.close() @celery_app.task(bind=True, name="app.workers.member_interest.calculate_all_member_trend_scores") def calculate_all_member_trend_scores(self): """ Scheduled nightly task: score all members that have been viewed. Members are scored only after their profile has been loaded at least once. """ db = get_sync_db() try: members = ( db.query(Member) .filter(Member.detail_fetched.isnot(None)) .filter(Member.first_name.isnot(None)) .all() ) for member in members: calculate_member_trend_score.delay(member.bioguide_id) logger.info(f"Queued trend scoring for {len(members)} members") return {"queued": len(members)} finally: db.close()