Follow Modes (neutral / pocket_veto / pocket_boost):
- Alembic migration 0013 adds follow_mode column to follows table
- FollowButton rewritten as mode-aware dropdown for bills; simple toggle for members/topics
- PATCH /api/follows/{id}/mode endpoint with validation
- Dispatcher filters pocket_veto follows (suppress new_document/new_amendment events)
- Dispatcher adds ntfy Actions header for pocket_boost follows
Change-driven (milestone) Alerts:
- New notification_utils.py with shared emit helpers and 30-min dedup
- congress_poller emits bill_updated events on milestone action text
- llm_processor switched to the shared emit util (also notifies member/topic followers)
Notification Enhancements:
- ntfy priority levels (high for bill_updated, default for others)
- Quiet hours (UTC): dispatcher holds events outside allowed window
- Digest mode (daily/weekly): send_notification_digest Celery beat task
- Notification history endpoint + Recent Alerts UI section
- Enriched following page (bill titles, member photos/details via sub-components)
- Follow mode test buttons in admin settings panel
Infrastructure:
- nginx: switch upstream blocks to set $variable proxy_pass so Docker DNS
re-resolves upstream IPs after container rebuilds (valid=10s)
- TROUBLESHOOTING.md documenting common Docker/nginx/postgres gotchas
Authored-By: Jack Levy
282 lines
9.8 KiB
Python
282 lines
9.8 KiB
Python
"""
|
|
Notification dispatcher — sends pending notification events via ntfy.
|
|
|
|
RSS is pull-based so no dispatch is needed for it; events are simply
|
|
marked dispatched once ntfy is sent (or immediately if the user has no
|
|
ntfy configured but has an RSS token, so the feed can clean up old items).
|
|
|
|
Runs every 5 minutes on Celery Beat.
|
|
"""
|
|
import base64
|
|
import logging
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import requests
|
|
|
|
from app.database import get_sync_db
|
|
from app.models.follow import Follow
|
|
from app.models.notification import NotificationEvent
|
|
from app.models.user import User
|
|
from app.workers.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)

# Timeout passed to every requests.post() call made to ntfy.
NTFY_TIMEOUT = 10

# Human-readable label per event type; used as the prefix of the ntfy title
# and as the per-bill label list in digests.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
}

# Value sent in the ntfy "Tags" header for each event type.
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}

# Value sent in the ntfy "Priority" header for each event type.
# Milestone (bill_updated) events outrank the LLM-brief events.
_EVENT_PRIORITY = {
    "bill_updated": "high",
    "new_document": "default",
    "new_amendment": "default",
}
|
|
|
|
|
|
def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
    """Return True if ``now`` falls within the user's quiet-hours window (UTC).

    Args:
        prefs: The user's notification preferences. ``quiet_hours_start`` and
            ``quiet_hours_end`` are read as hours; the window is
            start-inclusive and end-exclusive.
        now: The moment to test. Timezone-aware values are normalised to UTC
            before the hour is read; naive values are used as-is.

    Returns:
        False when either bound is unset. Otherwise, whether the hour of
        ``now`` lies inside the window. Equal bounds describe an empty window,
        and ``start > end`` is treated as a window that wraps midnight.
    """
    start = prefs.get("quiet_hours_start")
    end = prefs.get("quiet_hours_end")
    if start is None or end is None:
        return False

    # The window is compared against the UTC hour, so an aware datetime that
    # is expressed in another zone must be converted first.
    if now.utcoffset() is not None:
        now = now.astimezone(timezone.utc)
    hour = now.hour

    if start <= end:
        return start <= hour < end
    # Wraps midnight (e.g. 22 → 8)
    return hour >= start or hour < end
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark dispatched.

    Processes up to 200 undispatched events, oldest first. For each event:

    * if the user no longer exists, the event is marked dispatched and skipped;
    * ``new_document`` / ``new_amendment`` events on a bill followed in
      ``pocket_veto`` mode are marked dispatched without being sent;
    * if ntfy is configured and the user is in quiet hours or has digest mode
      enabled, the event is left undispatched (held);
    * otherwise ntfy is attempted when configured, and the event is marked
      dispatched whether or not the send succeeded.

    Returns:
        dict with ``sent``, ``failed``, ``held`` and ``total`` counts.
    """
    db = get_sync_db()
    try:
        # NOTE(review): held events stay undispatched, so they are selected
        # again on every run and count toward this 200-row limit. A large
        # enough backlog of held events could crowd out newer ones — confirm
        # whether that is acceptable.
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )

        sent = 0
        failed = 0
        held = 0
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                event.dispatched_at = now
                db.commit()
                continue

            # Look up follow mode for this (user, bill) pair
            follow = db.query(Follow).filter_by(
                user_id=event.user_id, follow_type="bill", follow_value=event.bill_id
            ).first()
            follow_mode = follow.follow_mode if follow else "neutral"

            # Pocket Veto: only milestone (bill_updated) events; skip LLM brief events
            if follow_mode == "pocket_veto" and event.event_type in ("new_document", "new_amendment"):
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            # Use `or ""` instead of a .get() default: a key that is present
            # but null would otherwise reach .strip() as None and raise, and
            # because the event would stay pending the task would fail on the
            # same event on every subsequent run.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            ntfy_auth_method = prefs.get("ntfy_auth_method") or "none"
            ntfy_token = (prefs.get("ntfy_token") or "").strip()
            ntfy_username = (prefs.get("ntfy_username") or "").strip()
            ntfy_password = (prefs.get("ntfy_password") or "").strip()
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            digest_enabled = prefs.get("digest_enabled", False)

            ntfy_configured = bool(ntfy_enabled and ntfy_url)

            # Hold events when ntfy is configured but delivery should be deferred
            in_quiet = _in_quiet_hours(prefs, now) if ntfy_configured else False
            hold = ntfy_configured and (in_quiet or digest_enabled)

            if hold:
                held += 1
                continue  # Leave undispatched — digest task or next run after quiet hours

            if ntfy_configured:
                try:
                    _send_ntfy(
                        event, ntfy_url, ntfy_auth_method, ntfy_token,
                        ntfy_username, ntfy_password, follow_mode=follow_mode,
                    )
                    sent += 1
                except Exception as e:
                    # Best-effort delivery: a failed send is logged and counted,
                    # and the event is still marked dispatched below.
                    logger.warning("ntfy dispatch failed for event %s: %s", event.id, e)
                    failed += 1

            # Mark dispatched: ntfy was attempted, or user has no ntfy (RSS-only or neither)
            event.dispatched_at = now
            db.commit()

        logger.info(
            "dispatch_notifications: %s sent, %s failed, "
            "%s held (quiet hours/digest), %s total pending",
            sent, failed, held, len(pending),
        )
        return {"sent": sent, "failed": failed, "held": held, "total": len(pending)}
    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest")
def send_notification_digest(self):
    """
    Send a bundled ntfy digest for users with digest mode enabled.

    Runs daily; weekly-frequency users only receive on Mondays.

    A user is included when ``digest_enabled`` and ``ntfy_enabled`` are truthy
    and ``ntfy_topic_url`` is non-blank. For each such user, the undispatched
    events created within the lookback window (24 h for daily, 168 h for
    weekly) are sent as one ntfy message and then marked dispatched. A failure
    for one user is logged and does not stop the remaining users.

    Returns:
        dict with ``sent``: the number of users a digest was delivered to.
    """
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)

        digest_users = []
        for candidate in db.query(User).all():
            candidate_prefs = candidate.notification_prefs or {}
            # `or ""` guards against a key stored as null, which would
            # otherwise make .strip() raise and abort the digest for everyone.
            if (
                candidate_prefs.get("digest_enabled", False)
                and candidate_prefs.get("ntfy_enabled", False)
                and (candidate_prefs.get("ntfy_topic_url") or "").strip()
            ):
                digest_users.append(candidate)

        sent = 0
        for user in digest_users:
            # Capture the id up front: after a rollback the instance is
            # expired and reading an attribute would hit the database again.
            user_id = user.id
            prefs = user.notification_prefs or {}
            frequency = prefs.get("digest_frequency", "daily")

            # Weekly digests only fire on Mondays (weekday 0)
            if frequency == "weekly" and now.weekday() != 0:
                continue

            lookback_hours = 168 if frequency == "weekly" else 24
            cutoff = now - timedelta(hours=lookback_hours)

            # NOTE(review): undispatched events older than the cutoff are never
            # picked up here, while dispatch_notifications keeps holding them
            # for digest users — they appear to stay pending indefinitely
            # (e.g. after a failed digest run). Confirm whether that is intended.
            events = (
                db.query(NotificationEvent)
                .filter_by(user_id=user_id)
                .filter(
                    NotificationEvent.dispatched_at.is_(None),
                    NotificationEvent.created_at > cutoff,
                )
                .order_by(NotificationEvent.created_at.desc())
                .all()
            )

            if not events:
                continue

            try:
                ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
                _send_digest_ntfy(events, ntfy_url, prefs)
                for event in events:
                    event.dispatched_at = now
                db.commit()
                sent += 1
            except Exception as e:
                # A failed commit leaves the session unusable until it is
                # rolled back; without this the remaining users would fail too.
                db.rollback()
                logger.warning("Digest send failed for user %s: %s", user_id, e)

        logger.info("send_notification_digest: digests sent to %s users", sent)
        return {"sent": sent}
    finally:
        db.close()
|
|
|
|
|
|
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
    follow_mode: str = "neutral",
) -> None:
    """Publish a single notification event to an ntfy topic.

    The title is ``"<event label>: <bill label>"``. The body is the bill title
    followed by up to 300 characters of the brief summary, falling back to the
    bill label when neither is present.

    Args:
        event: The event to send; ``payload``, ``event_type`` and ``bill_id``
            are read.
        topic_url: Full URL of the ntfy topic to POST to.
        auth_method: ``"token"`` sends a Bearer header (when ``token`` is set),
            ``"basic"`` sends a Basic header (when ``username`` is set); any
            other value sends no Authorization header.
        token: Access token for ``auth_method="token"``.
        username: User name for ``auth_method="basic"``.
        password: Password for ``auth_method="basic"``.
        follow_mode: ``"pocket_boost"`` adds an ntfy ``Actions`` header with
            action buttons; other modes add none.

    Raises:
        requests.HTTPError: ntfy answered with an error status.
        requests.RequestException: the request could not be completed.
    """
    payload = event.payload or {}
    # Resolve the fallback lazily so a missing bill_id cannot raise when the
    # payload already carries a label.
    bill_label = payload.get("bill_label") or (event.bill_id or "").upper()
    bill_title = payload.get("bill_title", "")
    bill_url = payload.get("bill_url")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    title = f"{event_label}: {bill_label}"

    lines = [bill_title] if bill_title else []
    if payload.get("brief_summary"):
        lines.append("")
        lines.append(payload["brief_summary"][:300])
    message = "\n".join(lines) or bill_label

    headers = {
        "Title": title,
        "Priority": _EVENT_PRIORITY.get(event.event_type, "default"),
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if bill_url:
        headers["Click"] = bill_url

    if follow_mode == "pocket_boost":
        actions = []
        # Only offer "View Bill" when there is a URL to open; an action with
        # an empty URL is not a usable button.
        if bill_url:
            actions.append(f"view, View Bill, {bill_url}")
        actions.append(
            "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
        )
        headers["Actions"] = "; ".join(actions)

    if auth_method == "token" and token:
        headers["Authorization"] = f"Bearer {token}"
    elif auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # str header values are encoded as latin-1 by http.client, which raises
    # UnicodeEncodeError for anything outside that range (e.g. typographic
    # punctuation in a bill label). Sending UTF-8 bytes avoids that.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        topic_url, data=message.encode("utf-8"), headers=encoded_headers, timeout=NTFY_TIMEOUT
    )
    resp.raise_for_status()
|
|
|
|
|
|
def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
    """Send one ntfy message summarising ``events``, grouped by bill.

    The body has one bullet per bill (at most 10) listing the distinct event
    labels seen for that bill, plus a trailing "…and N more bills" line when
    more than 10 bills are present.

    Args:
        events: NotificationEvent rows to summarise; ``bill_id``,
            ``event_type`` and ``payload`` are read.
        ntfy_url: Full URL of the ntfy topic to POST to.
        prefs: The user's notification preferences; the ``ntfy_auth_method``,
            ``ntfy_token``, ``ntfy_username`` and ``ntfy_password`` keys are
            read to build the Authorization header.

    Raises:
        requests.HTTPError: ntfy answered with an error status.
        requests.RequestException: the request could not be completed.
    """
    max_bills = 10

    # `or ""` guards against keys stored as null, which would make .strip() raise.
    auth_method = prefs.get("ntfy_auth_method") or "none"
    ntfy_token = (prefs.get("ntfy_token") or "").strip()
    ntfy_username = (prefs.get("ntfy_username") or "").strip()
    ntfy_password = (prefs.get("ntfy_password") or "").strip()

    headers = {
        "Title": f"PocketVeto Digest — {len(events)} update{'s' if len(events) != 1 else ''}",
        "Priority": "default",
        "Tags": "newspaper",
    }

    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # Group by bill, show up to `max_bills`
    by_bill: dict = defaultdict(list)
    for event in events:
        by_bill[event.bill_id].append(event)

    lines = []
    for bill_id, bill_events in list(by_bill.items())[:max_bills]:
        payload = bill_events[0].payload or {}
        # Resolve the fallback lazily so a missing bill_id cannot raise.
        bill_label = payload.get("bill_label") or (bill_id or "").upper()
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # same events always render the same line (a set does not guarantee that).
        event_labels = list(
            dict.fromkeys(_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events)
        )
        lines.append(f"• {bill_label}: {', '.join(event_labels)}")

    if len(by_bill) > max_bills:
        lines.append(f"  …and {len(by_bill) - max_bills} more bills")

    message = "\n".join(lines)

    # The title contains an em dash, which is outside latin-1; http.client
    # encodes str header values as latin-1 and would raise UnicodeEncodeError.
    # Sending UTF-8 bytes avoids that.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        ntfy_url, data=message.encode("utf-8"), headers=encoded_headers, timeout=NTFY_TIMEOUT
    )
    resp.raise_for_status()
|