feat: per-user notifications (ntfy + RSS), deduplicated actions, backfill task

Notifications:
- New /notifications page accessible to all users (ntfy + RSS config)
- ntfy now supports no-auth, Bearer token, and HTTP Basic auth (for ACL-protected self-hosted servers)
- RSS enabled/disabled independently of ntfy; token auto-generated on first GET
- Notification settings removed from admin-only Settings page; replaced with link card
- Sidebar adds Notifications nav link for all users
- notification_dispatcher.py: fan-out now marks RSS events dispatched independently

Action history:
- Migration 0012: deduplicates existing bill_actions rows and adds UNIQUE(bill_id, action_date, action_text)
- congress_poller.py: replaces existence-check inserts with ON CONFLICT DO NOTHING (race-condition safe)
- Added backfill_all_bill_actions task (no date filter) + admin endpoint POST /backfill-all-actions

Authored-By: Jack Levy
This commit is contained in:
Jack Levy
2026-03-01 12:04:13 -05:00
parent 91790fd798
commit 2e2fefb795
22 changed files with 1006 additions and 164 deletions

View File

@@ -10,6 +10,7 @@ import time
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.database import get_sync_db
from app.models import Bill, BillAction, Member, AppSetting
@@ -227,30 +228,15 @@ def fetch_bill_actions(self, bill_id: str):
break
for action in actions_data:
action_date_str = action.get("actionDate")
action_text = action.get("text", "")
action_type = action.get("type")
chamber = action.get("chamber")
# Idempotency check: skip if (bill_id, action_date, action_text) exists
exists = (
db.query(BillAction)
.filter(
BillAction.bill_id == bill_id,
BillAction.action_date == action_date_str,
BillAction.action_text == action_text,
)
.first()
)
if not exists:
db.add(BillAction(
bill_id=bill_id,
action_date=action_date_str,
action_text=action_text,
action_type=action_type,
chamber=chamber,
))
inserted += 1
stmt = pg_insert(BillAction.__table__).values(
bill_id=bill_id,
action_date=action.get("actionDate"),
action_text=action.get("text", ""),
action_type=action.get("type"),
chamber=action.get("chamber"),
).on_conflict_do_nothing(constraint="uq_bill_actions_bill_date_text")
result = db.execute(stmt)
inserted += result.rowcount
db.commit()
offset += 250
@@ -297,6 +283,28 @@ def fetch_actions_for_active_bills(self):
db.close()
@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_all_bill_actions")
def backfill_all_bill_actions(self):
    """Queue a ``fetch_bill_actions`` task for every bill whose actions were never fetched.

    Intended as a one-time backfill: selects bills with ``actions_fetched_at``
    unset, ordered by ``latest_action_date`` descending, and enqueues one
    fetch task per bill with a short pause between enqueues.

    Returns:
        dict: ``{"queued": <number of tasks enqueued>}``.
    """
    session = get_sync_db()
    try:
        never_fetched = Bill.actions_fetched_at.is_(None)
        by_latest_action = Bill.latest_action_date.desc()
        candidates = session.query(Bill).filter(never_fetched).order_by(by_latest_action).all()

        queued = 0
        for queued, bill in enumerate(candidates, start=1):
            fetch_bill_actions.delay(bill.bill_id)
            # ~20 tasks/sec — workers will self-throttle against Congress.gov
            time.sleep(0.05)

        logger.info(f"backfill_all_bill_actions: queued {queued} bills")
        return {"queued": queued}
    finally:
        session.close()
def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool:
"""Update bill fields if anything has changed. Returns True if updated."""
changed = False

View File

@@ -26,6 +26,81 @@ def _parse_pub_at(raw: str | None) -> datetime | None:
return None
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.sync_member_interest")
def sync_member_interest(self, bioguide_id: str):
    """
    Fetch news and score a member in a single API pass.

    Called on first profile view — avoids the 2x NewsAPI + GNews calls that
    result from queuing fetch_member_news and calculate_member_trend_score separately.

    Args:
        bioguide_id: Identifier used to look up the Member via ``db.get``.

    Returns:
        ``{"status": "skipped"}`` when the member is missing or lacks a first
        or last name; otherwise ``{"status": "ok", "saved": <new articles stored>}``.

    Raises:
        Whatever ``self.retry`` raises: any exception triggers a rollback and a
        retry with a 300-second countdown (the task allows ``max_retries=2``).
    """
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}

        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )

        # Single fetch — results reused for both article storage and scoring
        newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
        gnews_articles = news_service.fetch_gnews_articles(query, days=30)
        all_articles = newsapi_articles + gnews_articles

        saved = 0
        # The same story can come back from both providers. Track URLs handled
        # in this pass so a duplicate is skipped even if the pending insert has
        # not been flushed yet (i.e. regardless of the session's autoflush mode).
        seen_urls = set()
        for article in all_articles:
            url = article.get("url")
            if not url or url in seen_urls:
                continue
            seen_urls.add(url)

            existing = (
                db.query(MemberNewsArticle)
                .filter_by(member_id=bioguide_id, url=url)
                .first()
            )
            if existing:
                continue

            db.add(MemberNewsArticle(
                member_id=bioguide_id,
                # "source" may be present but null; treat that like a missing value.
                source=(article.get("source") or "")[:200],
                headline=article.get("headline", ""),
                url=url,
                published_at=_parse_pub_at(article.get("published_at")),
                relevance_score=1.0,
            ))
            saved += 1

        # Score using counts already in hand — no second API round-trip
        today = date.today()
        already_scored = (
            db.query(MemberTrendScore)
            .filter_by(member_id=bioguide_id, score_date=today)
            .first()
        )
        if not already_scored:
            keywords = trends_service.keywords_for_member(member.first_name, member.last_name)
            gtrends_score = trends_service.get_trends_score(keywords)
            composite = calculate_composite_score(
                len(newsapi_articles), len(gnews_articles), gtrends_score
            )
            db.add(MemberTrendScore(
                member_id=bioguide_id,
                score_date=today,
                newsapi_count=len(newsapi_articles),
                gnews_count=len(gnews_articles),
                gtrends_score=gtrends_score,
                composite_score=composite,
            ))

        db.commit()
        logger.info(f"Synced member interest for {bioguide_id}: {saved} articles saved")
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"Member interest sync failed for {bioguide_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.fetch_member_news")
def fetch_member_news(self, bioguide_id: str):
"""Fetch and store recent news articles for a specific member."""

View File

@@ -0,0 +1,115 @@
"""
Notification dispatcher — sends pending notification events via ntfy.
RSS is pull-based, so no dispatch is needed for it. An event is marked
dispatched once the user's ntfy delivery has been attempted, or immediately
if the user has RSS enabled but no usable ntfy configuration (so the feed
can clean up old items). Events for users with neither channel enabled are
left pending.
Runs every 5 minutes on Celery Beat.
"""
import logging
from datetime import datetime, timezone
import requests
from app.database import get_sync_db
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Value passed as ``timeout=`` to requests.post() for each ntfy publish
# (requests interprets this as seconds).
NTFY_TIMEOUT = 10
# Maps an event's ``event_type`` to the ntfy "Title" header value; event types
# not listed here fall back to "Bill Update" in _send_ntfy.
_EVENT_TITLES = {
"new_document": "New Bill Text Published",
"new_amendment": "Amendment Filed",
"bill_updated": "Bill Updated",
}
def _pref_text(prefs, key):
    """Return ``prefs[key]`` stripped of surrounding whitespace, or "" if it is not a string."""
    value = prefs.get(key)
    return value.strip() if isinstance(value, str) else ""


@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark them dispatched.

    Loads up to 200 events whose ``dispatched_at`` is unset, oldest first.
    For each event:

    * if the owning user no longer exists, the event is marked dispatched;
    * if ntfy is enabled and a topic URL is set, one ntfy send is attempted
      (a failure is logged and counted, not raised);
    * the event is marked dispatched when ntfy was enabled with a URL or RSS
      is enabled; otherwise it is left pending.

    Returns:
        dict: ``{"sent": int, "failed": int, "total": int}`` where ``total`` is
        the number of events loaded in this run.
    """
    db = get_sync_db()
    try:
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )

        sent = 0
        failed = 0
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Nobody left to notify; mark it so it is not picked up again.
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            # A stored pref may be an explicit null rather than a missing key.
            # Calling .strip() on None would raise and abort the whole run,
            # so every string pref is read through the None-safe helper.
            ntfy_url = _pref_text(prefs, "ntfy_topic_url")
            ntfy_auth_method = prefs.get("ntfy_auth_method", "none")
            ntfy_token = _pref_text(prefs, "ntfy_token")
            ntfy_username = _pref_text(prefs, "ntfy_username")
            ntfy_password = _pref_text(prefs, "ntfy_password")
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            rss_enabled = prefs.get("rss_enabled", False)

            ntfy_active = bool(ntfy_enabled and ntfy_url)

            if ntfy_active:
                try:
                    _send_ntfy(event, ntfy_url, ntfy_auth_method, ntfy_token, ntfy_username, ntfy_password)
                    sent += 1
                except Exception as e:
                    logger.warning(f"ntfy dispatch failed for event {event.id}: {e}")
                    failed += 1

            # Mark dispatched once handled by at least one enabled channel.
            # RSS is pull-based — no action needed beyond creating the event record.
            # NOTE(review): a failed ntfy send is still marked dispatched, so it is
            # never retried — confirm this best-effort behaviour is intended.
            if ntfy_active or rss_enabled:
                event.dispatched_at = now
                db.commit()

        logger.info(f"dispatch_notifications: {sent} sent, {failed} failed, {len(pending)} pending")
        return {"sent": sent, "failed": failed, "total": len(pending)}
    finally:
        db.close()
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
) -> None:
    """POST a single notification event to an ntfy topic URL.

    The message body is ``"<bill label>: <bill title>"``, followed by a blank
    line and the first 280 characters of ``brief_summary`` when the payload
    has one. Title, Priority and Tags headers are always sent; a Click header
    is added when the payload carries ``bill_url``.

    Args:
        event: Event whose ``payload``, ``bill_id`` and ``event_type`` are read.
        topic_url: Full URL the message is POSTed to.
        auth_method: ``"token"`` sends a Bearer header (if ``token`` is set),
            ``"basic"`` sends an HTTP Basic header (if ``username`` is set);
            any other value sends no Authorization header.
        token: Bearer token for ``auth_method="token"``.
        username: User name for ``auth_method="basic"``.
        password: Password for ``auth_method="basic"``.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error response.
        Other exceptions raised by ``requests.post`` propagate unchanged.
    """
    import base64

    details = event.payload or {}
    label = details.get("bill_label", event.bill_id.upper())
    title = details.get("bill_title", "")
    summary = details.get("brief_summary")
    click_url = details.get("bill_url")

    body = f"{label}: {title}"
    if summary:
        body = f"{body}\n\n{summary[:280]}"

    headers = {
        "Title": _EVENT_TITLES.get(event.event_type, "Bill Update"),
        "Priority": "default",
        "Tags": "scroll",
    }
    if click_url:
        headers["Click"] = click_url

    # Work out the Authorization value first, then attach it only if one applies.
    authorization = None
    if auth_method == "token" and token:
        authorization = f"Bearer {token}"
    elif auth_method == "basic" and username:
        encoded = base64.b64encode(f"{username}:{password}".encode()).decode()
        authorization = f"Basic {encoded}"
    if authorization is not None:
        headers["Authorization"] = authorization

    response = requests.post(
        topic_url,
        data=body.encode("utf-8"),
        headers=headers,
        timeout=NTFY_TIMEOUT,
    )
    response.raise_for_status()