Notifications: - New /notifications page accessible to all users (ntfy + RSS config) - ntfy now supports no-auth, Bearer token, and HTTP Basic auth (for ACL-protected self-hosted servers) - RSS enabled/disabled independently of ntfy; token auto-generated on first GET - Notification settings removed from admin-only Settings page; replaced with link card - Sidebar adds Notifications nav link for all users - notification_dispatcher.py: fan-out now marks RSS events dispatched independently Action history: - Migration 0012: deduplicates existing bill_actions rows and adds UNIQUE(bill_id, action_date, action_text) - congress_poller.py: replaces existence-check inserts with ON CONFLICT DO NOTHING (race-condition safe) - Added backfill_all_bill_actions task (no date filter) + admin endpoint POST /backfill-all-actions Authored-By: Jack Levy
253 lines
8.9 KiB
Python
253 lines
8.9 KiB
Python
"""
|
|
Member interest worker — tracks public interest in members of Congress.
|
|
|
|
Fetches news articles and calculates trend scores for members using the
|
|
same composite scoring model as bills (NewsAPI + Google News RSS + pytrends).
|
|
Runs on a schedule and can also be triggered per-member.
|
|
"""
|
|
import logging
|
|
from datetime import date, datetime, timedelta, timezone
|
|
|
|
from app.database import get_sync_db
|
|
from app.models import Member, MemberNewsArticle, MemberTrendScore
|
|
from app.services import news_service, trends_service
|
|
from app.workers.celery_app import celery_app
|
|
from app.workers.trend_scorer import calculate_composite_score
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _parse_pub_at(raw: str | None) -> datetime | None:
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.sync_member_interest")
def sync_member_interest(self, bioguide_id: str):
    """
    Fetch news and score a member in a single API pass.

    Called on first profile view — avoids the 2x NewsAPI + GNews calls that
    result from queuing fetch_member_news and calculate_member_trend_score separately.

    Args:
        bioguide_id: The member's Bioguide identifier (Member primary key).

    Returns:
        dict: ``{"status": "skipped"}`` when the member is unknown or lacks
        a usable name, otherwise ``{"status": "ok", "saved": <int>}``.

    Raises:
        celery.exceptions.Retry: on any failure; retried up to twice with a
        5-minute countdown.
    """
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        # Can't build a meaningful news query without a full name.
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}

        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )

        # Single fetch — results reused for both article storage and scoring
        newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
        gnews_articles = news_service.fetch_gnews_articles(query, days=30)
        all_articles = newsapi_articles + gnews_articles

        # Prefetch this member's stored URLs once (instead of one SELECT per
        # article) and track URLs seen in this batch, so an article returned
        # by both NewsAPI and GNews is inserted only once — the old per-row
        # existence check relied on session autoflush to catch that case.
        seen_urls = {
            row[0]
            for row in db.query(MemberNewsArticle.url)
            .filter_by(member_id=bioguide_id)
            .all()
        }
        saved = 0
        for article in all_articles:
            url = article.get("url")
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)
            db.add(MemberNewsArticle(
                member_id=bioguide_id,
                source=article.get("source", "")[:200],  # column width guard
                headline=article.get("headline", ""),
                url=url,
                published_at=_parse_pub_at(article.get("published_at")),
                relevance_score=1.0,
            ))
            saved += 1

        # Score using counts already in hand — no second API round-trip
        today = date.today()
        if not db.query(MemberTrendScore).filter_by(member_id=bioguide_id, score_date=today).first():
            keywords = trends_service.keywords_for_member(member.first_name, member.last_name)
            gtrends_score = trends_service.get_trends_score(keywords)
            composite = calculate_composite_score(
                len(newsapi_articles), len(gnews_articles), gtrends_score
            )
            db.add(MemberTrendScore(
                member_id=bioguide_id,
                score_date=today,
                newsapi_count=len(newsapi_articles),
                gnews_count=len(gnews_articles),
                gtrends_score=gtrends_score,
                composite_score=composite,
            ))

        # One commit covers both the articles and the (optional) score row.
        db.commit()
        logger.info(f"Synced member interest for {bioguide_id}: {saved} articles saved")
        return {"status": "ok", "saved": saved}

    except Exception as exc:
        db.rollback()
        logger.error(f"Member interest sync failed for {bioguide_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.fetch_member_news")
def fetch_member_news(self, bioguide_id: str):
    """Fetch and store recent news articles for a specific member.

    Args:
        bioguide_id: The member's Bioguide identifier (Member primary key).

    Returns:
        dict: ``{"status": "skipped"}`` when the member is unknown or lacks
        a usable name, otherwise ``{"status": "ok", "saved": <int>}``.

    Raises:
        celery.exceptions.Retry: on any failure; retried up to twice with a
        5-minute countdown.
    """
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        # Can't build a meaningful news query without a full name.
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}

        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )

        newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
        gnews_articles = news_service.fetch_gnews_articles(query, days=30)
        all_articles = newsapi_articles + gnews_articles

        # Prefetch this member's stored URLs once (instead of one SELECT per
        # article) and track URLs seen in this batch, so an article returned
        # by both NewsAPI and GNews is inserted only once — the old per-row
        # existence check relied on session autoflush to catch that case.
        seen_urls = {
            row[0]
            for row in db.query(MemberNewsArticle.url)
            .filter_by(member_id=bioguide_id)
            .all()
        }
        saved = 0
        for article in all_articles:
            url = article.get("url")
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)
            db.add(MemberNewsArticle(
                member_id=bioguide_id,
                source=article.get("source", "")[:200],  # column width guard
                headline=article.get("headline", ""),
                url=url,
                published_at=_parse_pub_at(article.get("published_at")),
                relevance_score=1.0,
            ))
            saved += 1

        db.commit()
        logger.info(f"Saved {saved} news articles for member {bioguide_id}")
        return {"status": "ok", "saved": saved}

    except Exception as exc:
        db.rollback()
        logger.error(f"Member news fetch failed for {bioguide_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.member_interest.calculate_member_trend_score")
def calculate_member_trend_score(self, bioguide_id: str):
    """Calculate and store today's public interest score for a member."""
    session = get_sync_db()
    try:
        target = session.get(Member, bioguide_id)
        # A usable full name is required to build the news/trends queries.
        if not (target and target.first_name and target.last_name):
            return {"status": "skipped"}

        score_date = date.today()
        prior_row = (
            session.query(MemberTrendScore)
            .filter_by(member_id=bioguide_id, score_date=score_date)
            .first()
        )
        if prior_row is not None:
            # One score per member per day — nothing to do.
            return {"status": "already_scored"}

        search_query = news_service.build_member_query(
            first_name=target.first_name,
            last_name=target.last_name,
            chamber=target.chamber,
        )
        search_terms = trends_service.keywords_for_member(target.first_name, target.last_name)

        # Gather the three signals feeding the composite model.
        n_newsapi = len(news_service.fetch_newsapi_articles(search_query, days=30))
        n_gnews = news_service.fetch_gnews_count(search_query, days=30)
        trends_val = trends_service.get_trends_score(search_terms)

        overall = calculate_composite_score(n_newsapi, n_gnews, trends_val)

        session.add(MemberTrendScore(
            member_id=bioguide_id,
            score_date=score_date,
            newsapi_count=n_newsapi,
            gnews_count=n_gnews,
            gtrends_score=trends_val,
            composite_score=overall,
        ))
        session.commit()
        logger.info(f"Scored member {bioguide_id}: composite={overall:.1f}")
        return {"status": "ok", "composite": overall}

    except Exception as exc:
        session.rollback()
        logger.error(f"Member trend scoring failed for {bioguide_id}: {exc}")
        raise
    finally:
        session.close()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.member_interest.fetch_news_for_active_members")
def fetch_news_for_active_members(self):
    """
    Scheduled task: fetch news for members who have been viewed or followed.

    Prioritises members with detail_fetched set (profile has been viewed).
    """
    session = get_sync_db()
    try:
        # Only members whose profile has been loaded and who have a name.
        eligible = (
            session.query(Member)
            .filter(
                Member.detail_fetched.isnot(None),
                Member.first_name.isnot(None),
            )
            .all()
        )
        # Fan out one fetch task per member.
        for row in eligible:
            fetch_member_news.delay(row.bioguide_id)

        logger.info(f"Queued news fetch for {len(eligible)} members")
        return {"queued": len(eligible)}
    finally:
        session.close()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.member_interest.calculate_all_member_trend_scores")
def calculate_all_member_trend_scores(self):
    """
    Scheduled nightly task: score all members that have been viewed.

    Members are scored only after their profile has been loaded at least once.
    """
    session = get_sync_db()
    try:
        # Only members whose profile has been loaded and who have a name.
        eligible = (
            session.query(Member)
            .filter(
                Member.detail_fetched.isnot(None),
                Member.first_name.isnot(None),
            )
            .all()
        )
        # Fan out one scoring task per member.
        for row in eligible:
            calculate_member_trend_score.delay(row.bioguide_id)

        logger.info(f"Queued trend scoring for {len(eligible)} members")
        return {"queued": len(eligible)}
    finally:
        session.close()
|