Files
PocketVeto/backend/app/workers/notification_dispatcher.py
Jack Levy 73881b2404 feat(notifications): follow modes, milestone alerts, notification enhancements
Follow Modes (neutral / pocket_veto / pocket_boost):
- Alembic migration 0013 adds follow_mode column to follows table
- FollowButton rewritten as mode-aware dropdown for bills; simple toggle for members/topics
- PATCH /api/follows/{id}/mode endpoint with validation
- Dispatcher filters pocket_veto follows (suppress new_document/new_amendment events)
- Dispatcher adds ntfy Actions header for pocket_boost follows

Change-driven (milestone) Alerts:
- New notification_utils.py with shared emit helpers and 30-min dedup
- congress_poller emits bill_updated events on milestone action text
- llm_processor replaced with shared emit util (also notifies member/topic followers)

Notification Enhancements:
- ntfy priority levels (high for bill_updated, default for others)
- Quiet hours (UTC): dispatcher holds events outside allowed window
- Digest mode (daily/weekly): send_notification_digest Celery beat task
- Notification history endpoint + Recent Alerts UI section
- Enriched following page (bill titles, member photos/details via sub-components)
- Follow mode test buttons in admin settings panel

Infrastructure:
- nginx: switch upstream blocks to set $variable proxy_pass so Docker DNS
  re-resolves upstream IPs after container rebuilds (valid=10s)
- TROUBLESHOOTING.md documenting common Docker/nginx/postgres gotchas

Authored-By: Jack Levy
2026-03-01 15:09:13 -05:00

282 lines
9.8 KiB
Python

"""
Notification dispatcher — sends pending notification events via ntfy.
RSS is pull-based so no dispatch is needed for it; events are simply
marked dispatched once ntfy is sent (or immediately if the user has no
ntfy configured but has an RSS token, so the feed can clean up old items).
Runs every 5 minutes on Celery Beat.
"""
import base64
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import requests
from app.database import get_sync_db
from app.models.follow import Follow
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
# Timeout passed to every ntfy HTTP request made by this module.
NTFY_TIMEOUT = 10

# Title prefix used in the ntfy "Title" header, keyed by event type.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
}

# Value of the ntfy "Tags" header, keyed by event type.
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}

# Value of the ntfy "Priority" header, keyed by event type.
# Milestone events are more urgent than LLM brief events.
_EVENT_PRIORITY = {
    "bill_updated": "high",
    "new_document": "default",
    "new_amendment": "default",
}
def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
    """Return True if ``now.hour`` falls within the user's quiet window.

    Args:
        prefs: The user's notification preferences. Reads the
            ``quiet_hours_start`` and ``quiet_hours_end`` keys (hours of day).
        now: The moment to test; only its ``hour`` attribute is used.

    Returns:
        True when the hour lies in the half-open window ``[start, end)``.
        A window whose start is greater than its end wraps past midnight.
        False when either bound is missing or cannot be read as an integer.
    """
    start = prefs.get("quiet_hours_start")
    end = prefs.get("quiet_hours_end")
    if start is None or end is None:
        return False
    try:
        start, end = int(start), int(end)
    except (TypeError, ValueError):
        # A malformed preference must not raise into the caller's loop;
        # treat it as "no quiet hours configured".
        return False
    hour = now.hour
    if start <= end:
        return start <= hour < end
    # Wraps midnight (e.g. 22 → 8)
    return hour >= start or hour < end
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark them dispatched.

    Loads up to 200 undispatched events, oldest first, and for each one:

    * marks it dispatched without sending when the user no longer exists, or
      when the user's follow of the bill is ``pocket_veto`` and the event is
      a ``new_document`` / ``new_amendment`` event;
    * leaves it undispatched ("held") when ntfy is configured and the user is
      in quiet hours or has digest mode enabled;
    * otherwise attempts an ntfy push if ntfy is configured, then marks the
      event dispatched whether or not the push succeeded.

    Returns:
        A dict with the ``sent``, ``failed``, ``held`` and ``total`` counts.
    """
    db = get_sync_db()
    try:
        # NOTE(review): held events stay undispatched, so this query selects
        # them again on every run. Because the batch is capped at the 200
        # oldest rows, a large backlog of held events can crowd out events
        # that are deliverable right now.
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )

        sent = 0
        failed = 0
        held = 0
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Nobody to notify: retire the event so it is not re-selected.
                event.dispatched_at = now
                db.commit()
                continue

            # Look up follow mode for this (user, bill) pair
            follow = db.query(Follow).filter_by(
                user_id=event.user_id, follow_type="bill", follow_value=event.bill_id
            ).first()
            follow_mode = follow.follow_mode if follow else "neutral"

            # Pocket Veto: only milestone (bill_updated) events; skip LLM brief events
            if follow_mode == "pocket_veto" and event.event_type in ("new_document", "new_amendment"):
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            # ``or ""`` keeps a key stored with a None value from raising on
            # .strip() and aborting the rest of the batch.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            ntfy_auth_method = prefs.get("ntfy_auth_method") or "none"
            ntfy_token = (prefs.get("ntfy_token") or "").strip()
            ntfy_username = (prefs.get("ntfy_username") or "").strip()
            ntfy_password = (prefs.get("ntfy_password") or "").strip()
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            digest_enabled = prefs.get("digest_enabled", False)

            ntfy_configured = bool(ntfy_enabled and ntfy_url)

            # Hold events when ntfy is configured but delivery should be deferred
            if ntfy_configured and (digest_enabled or _in_quiet_hours(prefs, now)):
                held += 1
                continue  # Leave undispatched — digest task or next run after quiet hours

            if ntfy_configured:
                try:
                    _send_ntfy(
                        event, ntfy_url, ntfy_auth_method, ntfy_token,
                        ntfy_username, ntfy_password, follow_mode=follow_mode,
                    )
                    sent += 1
                except Exception as e:
                    # Best effort: a failed push is logged and counted, and the
                    # event is still marked dispatched below (no retry).
                    logger.warning(f"ntfy dispatch failed for event {event.id}: {e}")
                    failed += 1

            # Mark dispatched: ntfy was attempted, or user has no ntfy (RSS-only or neither)
            event.dispatched_at = now
            db.commit()

        logger.info(
            f"dispatch_notifications: {sent} sent, {failed} failed, "
            f"{held} held (quiet hours/digest), {len(pending)} total pending"
        )
        return {"sent": sent, "failed": failed, "held": held, "total": len(pending)}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest")
def send_notification_digest(self):
    """Send a bundled ntfy digest to each user with digest mode enabled.

    Runs daily; weekly-frequency users only receive on Mondays.

    A user qualifies when ``digest_enabled`` and ``ntfy_enabled`` are set and
    a non-empty ``ntfy_topic_url`` is configured. Every undispatched event of
    a qualifying user is bundled into one message and marked dispatched once
    the send succeeds. No age cutoff is applied: the dispatcher holds events
    for digest users indefinitely, so an event skipped here (for example
    after a failed send) would otherwise never be delivered or retired.

    Returns:
        A dict with ``sent``: the number of users a digest was sent to.
    """
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)
        sent = 0

        for user in db.query(User).all():
            prefs = user.notification_prefs or {}
            # ``or ""`` tolerates a key stored with a None value.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            if not (
                prefs.get("digest_enabled", False)
                and prefs.get("ntfy_enabled", False)
                and ntfy_url
            ):
                continue

            # Weekly digests only fire on Mondays (weekday 0)
            frequency = prefs.get("digest_frequency", "daily")
            if frequency == "weekly" and now.weekday() != 0:
                continue

            # Read the id up front so the failure log below does not have to
            # touch the ORM object after a rollback.
            user_id = user.id
            try:
                events = (
                    db.query(NotificationEvent)
                    .filter_by(user_id=user_id)
                    .filter(NotificationEvent.dispatched_at.is_(None))
                    .order_by(NotificationEvent.created_at.desc())
                    .all()
                )
                if not events:
                    continue

                _send_digest_ntfy(events, ntfy_url, prefs)
                for event in events:
                    event.dispatched_at = now
                db.commit()
                sent += 1
            except Exception as e:
                # Discard any half-applied changes so the session stays usable
                # for the remaining users; the events stay undispatched and
                # are picked up by the next digest run.
                db.rollback()
                logger.warning(f"Digest send failed for user {user_id}: {e}")

        logger.info(f"send_notification_digest: digests sent to {sent} users")
        return {"sent": sent}
    finally:
        db.close()
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
    follow_mode: str = "neutral",
) -> None:
    """POST a single notification event to an ntfy topic.

    Args:
        event: The event to publish. Reads ``payload`` (``bill_label``,
            ``bill_title``, ``brief_summary``, ``bill_url``), ``bill_id`` and
            ``event_type``.
        topic_url: Full URL of the ntfy topic to POST to.
        auth_method: ``"token"`` sends a Bearer token, ``"basic"`` sends HTTP
            Basic credentials; any other value sends no Authorization header.
        token: Bearer token, used when ``auth_method`` is ``"token"``.
        username: Basic-auth user, used when ``auth_method`` is ``"basic"``.
        password: Basic-auth password.
        follow_mode: ``"pocket_boost"`` adds an ``Actions`` header with
            action buttons; other modes add none.

    Raises:
        requests.RequestException: on a network error, a timeout, or a
            non-2xx response (via ``raise_for_status``).
    """
    payload = event.payload or {}
    # Only fall back to the bill id when no label is supplied, and tolerate a
    # missing bill id instead of raising on ``None.upper()``.
    bill_label = payload.get("bill_label") or (event.bill_id or "").upper()
    bill_title = payload.get("bill_title", "")
    bill_url = payload.get("bill_url")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    lines = [bill_title] if bill_title else []
    if payload.get("brief_summary"):
        if lines:
            # Blank separator only when a title precedes the summary.
            lines.append("")
        lines.append(payload["brief_summary"][:300])
    message = "\n".join(lines) or bill_label

    headers = {
        "Title": f"{event_label}: {bill_label}",
        "Priority": _EVENT_PRIORITY.get(event.event_type, "default"),
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if bill_url:
        headers["Click"] = bill_url

    if follow_mode == "pocket_boost":
        actions = []
        if bill_url:
            # A "view" action without a URL is malformed, so skip it when the
            # payload carries no bill URL.
            actions.append(f"view, View Bill, {bill_url}")
        actions.append(
            "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
        )
        headers["Actions"] = "; ".join(actions)

    if auth_method == "token" and token:
        headers["Authorization"] = f"Bearer {token}"
    elif auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # str header values are encoded as Latin-1 by http.client and raise
    # UnicodeEncodeError on anything outside that range; send UTF-8 bytes.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        topic_url,
        data=message.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()
def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
    """POST one digest notification summarising ``events`` to an ntfy topic.

    The message has one line per bill (at most 10 bills) listing the distinct
    event titles seen for that bill, plus a trailing "…and N more bills" line
    when more than 10 bills are present.

    Args:
        events: Notification events to summarise. Reads ``bill_id``,
            ``event_type`` and ``payload`` of each.
        ntfy_url: Full URL of the ntfy topic to POST to.
        prefs: The user's notification preferences. Reads the
            ``ntfy_auth_method``, ``ntfy_token``, ``ntfy_username`` and
            ``ntfy_password`` keys.

    Raises:
        requests.RequestException: on a network error, a timeout, or a
            non-2xx response (via ``raise_for_status``).
    """
    auth_method = prefs.get("ntfy_auth_method") or "none"
    # ``or ""`` tolerates keys stored with a None value.
    ntfy_token = (prefs.get("ntfy_token") or "").strip()
    ntfy_username = (prefs.get("ntfy_username") or "").strip()
    ntfy_password = (prefs.get("ntfy_password") or "").strip()

    count = len(events)
    headers = {
        "Title": f"PocketVeto Digest — {count} update{'s' if count != 1 else ''}",
        "Priority": "default",
        "Tags": "newspaper",
    }
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # Group by bill, show up to 10
    by_bill: dict = defaultdict(list)
    for event in events:
        by_bill[event.bill_id].append(event)

    lines = []
    for bill_id, bill_events in list(by_bill.items())[:10]:
        payload = bill_events[0].payload or {}
        bill_label = payload.get("bill_label") or (bill_id or "").upper()
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # line reads the same on every run (a set has no stable order).
        event_labels = list(
            dict.fromkeys(_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events)
        )
        lines.append(f"{bill_label}: {', '.join(event_labels)}")
    if len(by_bill) > 10:
        lines.append(f"  …and {len(by_bill) - 10} more bills")
    message = "\n".join(lines)

    # The title contains an em dash, which is outside Latin-1: passed as str,
    # http.client raises UnicodeEncodeError before the request is sent.
    # Send every header value as UTF-8 bytes instead.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        ntfy_url,
        data=message.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()