Files
PocketVeto/backend/app/workers/notification_dispatcher.py
Jack Levy d8c1c99b9c fix: send referral notifications to direct bill/member followers
The dispatcher was suppressing all referral-tier events (committee
referrals) for neutral-mode users, regardless of whether they
directly followed a bill or just followed a topic. This meant
directly-followed bills like HR 7711 and S 3853 showed ✓ in
Recent Alerts but no ntfy notification was ever fired.

Now only topic-follow referral events are suppressed for neutral
users (topic follows are loose and noisy). Direct bill follows and
member follows always receive referral events.

Authored-By: Jack Levy
2026-03-02 16:00:03 -05:00

439 lines
16 KiB
Python

"""
Notification dispatcher — sends pending notification events via ntfy.
RSS is pull-based so no dispatch is needed for it; events are simply
marked dispatched once ntfy is sent (or immediately if the user has no
ntfy configured but has an RSS token, so the feed can clean up old items).
Runs every 5 minutes on Celery Beat.
"""
import base64
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import requests
from app.database import get_sync_db
from app.models.follow import Follow
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app
# Module-level logger shared by every task and helper in this file.
logger = logging.getLogger(__name__)

# Value passed as ``timeout=`` to each requests.post() call made to ntfy.
NTFY_TIMEOUT = 10

# Human-readable label per event type; used for ntfy titles and digest lines.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
    "weekly_digest": "Weekly Digest",
}

# ntfy "Tags" header value per event type (callers fall back to "bell").
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}

# Milestone events are more urgent than LLM brief events
_EVENT_PRIORITY = {
    "bill_updated": "high",
    "new_document": "default",
    "new_amendment": "default",
}
def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
    """Tell whether ``now`` lies inside the user's configured quiet window.

    The window is read from ``prefs["quiet_hours_start"]`` and
    ``prefs["quiet_hours_end"]`` (hour-of-day values, start inclusive, end
    exclusive). When ``prefs["timezone"]`` names a zone that ``zoneinfo`` can
    load, ``now`` is converted to that zone first; otherwise ``now.hour`` is
    used unchanged.

    Args:
        prefs: The user's notification preferences.
        now: The current time (callers in this module pass an aware UTC value).

    Returns:
        True when the hour falls in the quiet window, False when it does not
        or when either bound is missing.
    """
    window_start = prefs.get("quiet_hours_start")
    window_end = prefs.get("quiet_hours_end")
    if window_start is None or window_end is None:
        return False

    zone_name = prefs.get("timezone")
    if not zone_name:
        local_hour = now.hour
    else:
        try:
            from zoneinfo import ZoneInfo

            local_hour = now.astimezone(ZoneInfo(zone_name)).hour
        except Exception:
            # Unknown or unusable zone: fall back to the hour of ``now`` as given.
            local_hour = now.hour

    if window_start <= window_end:
        # Same-day window, e.g. 9 -> 17 (empty when both bounds are equal).
        return window_start <= local_hour < window_end

    # Window wraps past midnight, e.g. 22 -> 8.
    return local_hour >= window_start or local_hour < window_end
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark them dispatched.

    Loads up to 200 undispatched events, oldest first, and handles each in turn:

    * events whose user no longer exists are marked dispatched and dropped;
    * ``pocket_veto`` follows skip ``new_document`` / ``new_amendment`` events;
    * referral-tier events that originated from a topic follow are skipped
      for ``neutral`` follows;
    * events for users with ntfy configured are left undispatched ("held")
      while the user is in quiet hours or has digest mode enabled;
    * everything else is pushed to ntfy when configured and then marked
      dispatched, whether or not the push succeeded.

    Returns:
        dict: ``sent``, ``failed`` and ``held`` counters plus ``total``, the
        number of pending events loaded for this run.
    """
    db = get_sync_db()
    try:
        # NOTE(review): held events stay undispatched, so they keep occupying
        # this 200-row window on later runs; a large held backlog could delay
        # newer events — confirm that is acceptable.
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )
        sent = 0
        failed = 0
        held = 0
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Nobody left to notify: clear the event from the queue.
                event.dispatched_at = now
                db.commit()
                continue

            # Look up follow mode for this (user, bill) pair
            follow = db.query(Follow).filter_by(
                user_id=event.user_id, follow_type="bill", follow_value=event.bill_id
            ).first()
            follow_mode = follow.follow_mode if follow else "neutral"

            # Pocket Veto: only milestone (bill_updated) events; skip LLM brief events
            if follow_mode == "pocket_veto" and event.event_type in ("new_document", "new_amendment"):
                event.dispatched_at = now
                db.commit()
                continue

            # Referral-tier events (committee referrals) are noisy for loose topic follows;
            # suppress them only for topic-follow events so direct bill/member followers
            # still get notified when their bill is referred to committee.
            payload = event.payload or {}
            is_topic_follow = payload.get("source") == "topic_follow"
            if follow_mode == "neutral" and payload.get("milestone_tier") == "referral" and is_topic_follow:
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            # ``prefs.get(key, "")`` only covers a *missing* key; a key stored as
            # null would make ``.strip()`` raise and abort the whole batch, so
            # coerce falsy values to "" before stripping.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            ntfy_auth_method = prefs.get("ntfy_auth_method", "none")
            ntfy_token = (prefs.get("ntfy_token") or "").strip()
            ntfy_username = (prefs.get("ntfy_username") or "").strip()
            ntfy_password = (prefs.get("ntfy_password") or "").strip()
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            digest_enabled = prefs.get("digest_enabled", False)
            ntfy_configured = ntfy_enabled and bool(ntfy_url)

            # Hold events when ntfy is configured but delivery should be deferred
            in_quiet = _in_quiet_hours(prefs, now) if ntfy_configured else False
            hold = ntfy_configured and (in_quiet or digest_enabled)
            if hold:
                held += 1
                continue  # Leave undispatched — digest task or next run after quiet hours

            if ntfy_configured:
                try:
                    _send_ntfy(
                        event, ntfy_url, ntfy_auth_method, ntfy_token,
                        ntfy_username, ntfy_password, follow_mode=follow_mode,
                    )
                    sent += 1
                except Exception as e:
                    # Best effort: a failed push is logged and counted, not retried.
                    logger.warning("ntfy dispatch failed for event %s: %s", event.id, e)
                    failed += 1

            # Mark dispatched: ntfy was attempted, or user has no ntfy (RSS-only or neither)
            event.dispatched_at = now
            db.commit()

        logger.info(
            "dispatch_notifications: %s sent, %s failed, "
            "%s held (quiet hours/digest), %s total pending",
            sent, failed, held, len(pending),
        )
        return {"sent": sent, "failed": failed, "held": held, "total": len(pending)}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest")
def send_notification_digest(self):
    """Send a bundled ntfy digest to every user who has digest mode enabled.

    A user qualifies when ``digest_enabled`` and ``ntfy_enabled`` are truthy
    and a non-blank ``ntfy_topic_url`` is stored. Users whose
    ``digest_frequency`` is ``"weekly"`` are only served when the current UTC
    day is a Monday and get a 168-hour lookback; everyone else gets 24 hours.
    All of the user's undispatched events inside the lookback window go out
    as one ntfy message and are then marked dispatched. A failure for one
    user is logged and does not stop the remaining users.

    Returns:
        dict: ``{"sent": <number of users that received a digest>}``.
    """
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)
        sent = 0
        for user in db.query(User).all():
            prefs = user.notification_prefs or {}
            # A key stored as null would make ``prefs.get(key, "").strip()``
            # raise; coerce to "" so one bad row cannot abort every digest.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            if not (
                prefs.get("digest_enabled", False)
                and prefs.get("ntfy_enabled", False)
                and ntfy_url
            ):
                continue

            frequency = prefs.get("digest_frequency", "daily")
            # Weekly digests only fire on Mondays (weekday 0)
            if frequency == "weekly" and now.weekday() != 0:
                continue

            lookback_hours = 168 if frequency == "weekly" else 24
            cutoff = now - timedelta(hours=lookback_hours)
            # NOTE(review): undispatched events older than ``cutoff`` are never
            # selected here — confirm they are cleaned up somewhere else.
            events = (
                db.query(NotificationEvent)
                .filter_by(user_id=user.id)
                .filter(
                    NotificationEvent.dispatched_at.is_(None),
                    NotificationEvent.created_at > cutoff,
                )
                .order_by(NotificationEvent.created_at.desc())
                .all()
            )
            if not events:
                continue

            try:
                # Send first; events are only marked once the push succeeded.
                _send_digest_ntfy(events, ntfy_url, prefs)
                for event in events:
                    event.dispatched_at = now
                db.commit()
                sent += 1
            except Exception as e:
                logger.warning("Digest send failed for user %s: %s", user.id, e)

        logger.info("send_notification_digest: digests sent to %s users", sent)
        return {"sent": sent}
    finally:
        db.close()
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
    follow_mode: str = "neutral",
) -> None:
    """Publish one notification event to an ntfy topic.

    The title is ``"<event label>: <bill label>"``. The body is the bill title
    followed by up to 300 characters of ``brief_summary`` when the payload has
    them, falling back to the bill label.

    Args:
        event: The event to publish; ``payload``, ``event_type`` and
            ``bill_id`` are read.
        topic_url: Full ntfy topic URL to POST to.
        auth_method: ``"token"`` for Bearer auth, ``"basic"`` for HTTP Basic
            auth; any other value sends no Authorization header.
        token: Access token, used when ``auth_method == "token"``.
        username: User name, used when ``auth_method == "basic"``.
        password: Password, used when ``auth_method == "basic"``.
        follow_mode: ``"pocket_boost"`` adds action buttons to the message.

    Raises:
        requests.RequestException: If the request fails or ntfy answers with
            an error status (via ``raise_for_status``).
    """
    payload = event.payload or {}
    # Compute the fallback lazily: ``payload.get("bill_label", event.bill_id.upper())``
    # evaluates ``.upper()`` even when a label is present and crashes on a
    # missing bill id.
    bill_label = payload.get("bill_label") or (event.bill_id or "").upper()
    bill_title = payload.get("bill_title", "")
    bill_url = payload.get("bill_url")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    lines = [bill_title] if bill_title else []
    if payload.get("brief_summary"):
        if lines:
            # Blank separator only when a title precedes the summary, so the
            # body never starts with an empty line.
            lines.append("")
        lines.append(payload["brief_summary"][:300])
    message = "\n".join(lines) or bill_label

    headers = {
        "Title": f"{event_label}: {bill_label}",
        "Priority": _EVENT_PRIORITY.get(event.event_type, "default"),
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if bill_url:
        headers["Click"] = bill_url

    if follow_mode == "pocket_boost":
        actions = []
        if bill_url:
            # Skip the "View Bill" button when there is no URL to open rather
            # than emitting an action with an empty target.
            actions.append(f"view, View Bill, {bill_url}")
        actions.append(
            "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
        )
        headers["Actions"] = "; ".join(actions)

    if auth_method == "token" and token:
        headers["Authorization"] = f"Bearer {token}"
    elif auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # str header values are encoded as Latin-1 by http.client, which raises
    # UnicodeEncodeError for characters outside that range. Sending the values
    # as UTF-8 bytes avoids that for labels or URLs containing such characters.
    encoded_headers = {name: str(value).encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        topic_url,
        data=message.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_weekly_digest")
def send_weekly_digest(self):
    """Build and deliver a week-in-review summary of each user's followed bills.

    For every user with ntfy configured and/or RSS enabled, looks up the bills
    they follow directly that were updated in the last 7 days (newest first,
    at most 20), lists up to 10 of them, and:

    * records a ``weekly_digest`` NotificationEvent that is already marked
      dispatched, so ``dispatch_notifications`` skips it;
    * pushes the same summary to ntfy when the user has ntfy configured.

    Unlike ``send_notification_digest`` (which bundles queued events), this
    task generates a fresh summary independent of the notification queue.
    The original docstring states it runs Mondays at 8:30 AM UTC; the schedule
    itself is defined outside this module.

    Returns:
        dict: ``ntfy_sent`` (successful pushes) and ``rss_created`` (events
        recorded).
    """
    from app.config import settings as app_settings
    from app.models.bill import Bill

    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)
        cutoff = now - timedelta(days=7)
        base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
        users = db.query(User).all()
        ntfy_sent = 0
        rss_created = 0

        for user in users:
            prefs = user.notification_prefs or {}
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            # A key stored as null would make ``prefs.get(key, "").strip()``
            # raise and abort the run for every remaining user.
            ntfy_url = (prefs.get("ntfy_topic_url") or "").strip()
            rss_enabled = prefs.get("rss_enabled", False)
            ntfy_configured = ntfy_enabled and bool(ntfy_url)
            if not ntfy_configured and not rss_enabled:
                continue

            bill_follows = db.query(Follow).filter_by(
                user_id=user.id, follow_type="bill"
            ).all()
            if not bill_follows:
                continue

            bill_ids = [f.follow_value for f in bill_follows]
            active_bills = (
                db.query(Bill)
                .filter(
                    Bill.bill_id.in_(bill_ids),
                    Bill.updated_at >= cutoff,
                )
                .order_by(Bill.updated_at.desc())
                .limit(20)
                .all()
            )
            if not active_bills:
                continue

            # NOTE(review): ``count`` is capped at 20 by the query limit, so the
            # headline undercounts for users with more active bills — confirm.
            count = len(active_bills)
            # The most recently updated bill supplies the event's bill_id and link.
            anchor = active_bills[0]

            summary_lines = []
            for bill in active_bills[:10]:
                lbl = _format_bill_label(bill)
                action = (bill.latest_action_text or "")[:80]
                summary_lines.append(f"{lbl}: {action}" if action else f"{lbl}")
            if count > 10:
                summary_lines.append(f" …and {count - 10} more")
            summary = "\n".join(summary_lines)

            # Mark dispatched_at immediately so dispatch_notifications skips this event;
            # it still appears in the RSS feed since that endpoint reads all events.
            event = NotificationEvent(
                user_id=user.id,
                bill_id=anchor.bill_id,
                event_type="weekly_digest",
                dispatched_at=now,
                payload={
                    "bill_label": "Weekly Digest",
                    "bill_title": f"{count} followed bill{'s' if count != 1 else ''} had activity this week",
                    "brief_summary": summary,
                    "bill_count": count,
                    "bill_url": f"{base_url}/bills/{anchor.bill_id}",
                },
            )
            db.add(event)
            rss_created += 1

            if ntfy_configured:
                try:
                    _send_weekly_digest_ntfy(count, summary, ntfy_url, prefs)
                    ntfy_sent += 1
                except Exception as e:
                    # Best effort: the recorded event is kept even if the push fails.
                    logger.warning("Weekly digest ntfy failed for user %s: %s", user.id, e)

            # Commit per user so an error while processing a later user cannot
            # discard events whose ntfy push has already gone out.
            db.commit()

        logger.info("send_weekly_digest: %s ntfy sent, %s events created", ntfy_sent, rss_created)
        return {"ntfy_sent": ntfy_sent, "rss_created": rss_created}
    finally:
        db.close()
def _format_bill_label(bill) -> str:
    """Build a short display label for a bill, e.g. ``H.R. 7711``.

    Known bill types (matched case-insensitively) are rendered with their
    dotted abbreviation; any other type is upper-cased as-is. The label is the
    prefix, a space, and ``bill.bill_number``.

    Args:
        bill: Object exposing ``bill_type`` (str) and ``bill_number``.

    Returns:
        The formatted label.
    """
    abbreviations = {
        "hr": "H.R.",
        "s": "S.",
        "hjres": "H.J.Res.",
        "sjres": "S.J.Res.",
        "hconres": "H.Con.Res.",
        "sconres": "S.Con.Res.",
        "hres": "H.Res.",
        "sres": "S.Res.",
    }
    kind = bill.bill_type.lower()
    if kind in abbreviations:
        prefix = abbreviations[kind]
    else:
        prefix = bill.bill_type.upper()
    return f"{prefix} {bill.bill_number}"
def _send_weekly_digest_ntfy(count: int, summary: str, ntfy_url: str, prefs: dict) -> None:
    """POST the weekly summary to a user's ntfy topic as a low-priority message.

    Args:
        count: Number of bills covered; used in the title.
        summary: Message body.
        ntfy_url: Full ntfy topic URL to POST to.
        prefs: The user's notification preferences; the ``ntfy_auth_method``,
            ``ntfy_token``, ``ntfy_username`` and ``ntfy_password`` keys select
            and supply the credentials.

    Raises:
        requests.RequestException: If the request fails or ntfy answers with
            an error status (via ``raise_for_status``).
    """
    auth_method = prefs.get("ntfy_auth_method", "none")
    # ``or ""`` guards against keys that are present but stored as null.
    ntfy_token = (prefs.get("ntfy_token") or "").strip()
    ntfy_username = (prefs.get("ntfy_username") or "").strip()
    ntfy_password = (prefs.get("ntfy_password") or "").strip()

    headers = {
        "Title": f"PocketVeto Weekly — {count} bill{'s' if count != 1 else ''} updated",
        "Priority": "low",
        "Tags": "newspaper,calendar",
    }
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # The title contains an em dash. str header values are encoded as Latin-1
    # by http.client, which cannot represent it and raises UnicodeEncodeError
    # before anything is sent. Passing UTF-8 bytes sidesteps that.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        ntfy_url,
        data=summary.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()
def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
    """POST one bundled digest message covering ``events`` to an ntfy topic.

    Events are grouped by ``bill_id`` in the order given. Up to 10 bills get a
    line of the form ``"<bill label>: <event label>, ..."``; any further bills
    are summarised in a trailing "…and N more bills" line.

    Args:
        events: NotificationEvent rows to summarise; ``bill_id``,
            ``event_type`` and ``payload`` are read.
        ntfy_url: Full ntfy topic URL to POST to.
        prefs: The user's notification preferences; the ``ntfy_auth_method``,
            ``ntfy_token``, ``ntfy_username`` and ``ntfy_password`` keys select
            and supply the credentials.

    Raises:
        requests.RequestException: If the request fails or ntfy answers with
            an error status (via ``raise_for_status``).
    """
    auth_method = prefs.get("ntfy_auth_method", "none")
    # ``or ""`` guards against keys that are present but stored as null.
    ntfy_token = (prefs.get("ntfy_token") or "").strip()
    ntfy_username = (prefs.get("ntfy_username") or "").strip()
    ntfy_password = (prefs.get("ntfy_password") or "").strip()

    headers = {
        "Title": f"PocketVeto Digest — {len(events)} update{'s' if len(events) != 1 else ''}",
        "Priority": "default",
        "Tags": "newspaper",
    }
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    # Group by bill, show up to 10
    by_bill: dict = defaultdict(list)
    for event in events:
        by_bill[event.bill_id].append(event)

    lines = []
    for bill_id, bill_events in list(by_bill.items())[:10]:
        payload = bill_events[0].payload or {}
        # Resolve the fallback lazily so a missing bill id cannot crash the
        # digest when a label is present.
        bill_label = payload.get("bill_label") or str(bill_id or "").upper()
        # dict.fromkeys de-duplicates while keeping first-seen order; a set
        # would make the label order vary between runs.
        event_labels = list(
            dict.fromkeys(_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events)
        )
        lines.append(f"{bill_label}: {', '.join(event_labels)}")
    if len(by_bill) > 10:
        lines.append(f" …and {len(by_bill) - 10} more bills")
    message = "\n".join(lines)

    # The title contains an em dash. str header values are encoded as Latin-1
    # by http.client, which cannot represent it and raises UnicodeEncodeError
    # before anything is sent. Passing UTF-8 bytes sidesteps that.
    encoded_headers = {name: value.encode("utf-8") for name, value in headers.items()}

    resp = requests.post(
        ntfy_url,
        data=message.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()