Files
PocketVeto/backend/app/workers/notification_dispatcher.py
Jack Levy 22b205ff39 feat(notifications): add Click link to all ntfy alerts (test + real)
Test notification:
- Click header -> {PUBLIC_URL}/notifications so tapping the test opens the app

Real bill alerts (dispatcher):
- Title reformatted: "New Bill Text: HR 1234" (event type + bill identifier)
- Body: bill full name on first line, AI summary below (300 chars)
- Tags updated per event type (page, memo, siren) instead of generic scroll
- Click header was already set from bill_url; no change needed there

Authored-By: Jack Levy
2026-03-01 12:19:05 -05:00

130 lines
4.3 KiB
Python

"""
Notification dispatcher — sends pending notification events via ntfy.
RSS is pull-based, so no dispatch is needed for it; an event is marked
dispatched once the ntfy send has been attempted (even if it failed), or
immediately if the user has no ntfy configured but has RSS enabled, so
the feed can clean up old items. Events for users with neither channel
enabled are left pending.
Runs every 5 minutes on Celery Beat.
"""
import logging
from datetime import datetime, timezone
import requests
from app.database import get_sync_db
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)

# Timeout (seconds) passed to requests.post for each ntfy publish call.
NTFY_TIMEOUT = 10

# Human-readable label per event type; forms the first half of the ntfy
# notification title. Unknown event types fall back to "Bill Update" in
# _send_ntfy.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
}

# Value sent in the ntfy "Tags" header per event type. Unknown event types
# fall back to "bell" in _send_ntfy.
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}
def _pref_str(prefs: dict, key: str) -> str:
    """Return prefs[key] stripped of whitespace, or "" if missing, null, or not a string."""
    value = prefs.get(key)
    if not isinstance(value, str):
        # dict.get's default only covers *missing* keys; a stored null (or any
        # non-string) would otherwise blow up on .strip() and abort the batch.
        return ""
    return value.strip()


@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark dispatched.

    Processes up to 200 undispatched events, oldest first. For each event:

    * if the owning user no longer exists, the event is marked dispatched;
    * if the user has ntfy enabled with a topic URL, a send is attempted;
    * the event is marked dispatched when ntfy was attempted (whether or not
      the send succeeded) or when RSS is enabled. Events for users with
      neither channel enabled are left pending.

    Returns:
        dict with keys ``sent`` (successful ntfy sends), ``failed`` (ntfy
        sends that raised) and ``total`` (events fetched in this run).
    """
    db = get_sync_db()
    try:
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )
        sent = 0
        failed = 0
        # One timestamp for the whole batch.
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Nobody to notify: retire the event so it is not refetched.
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            ntfy_url = _pref_str(prefs, "ntfy_topic_url")
            ntfy_auth_method = prefs.get("ntfy_auth_method") or "none"
            ntfy_token = _pref_str(prefs, "ntfy_token")
            ntfy_username = _pref_str(prefs, "ntfy_username")
            ntfy_password = _pref_str(prefs, "ntfy_password")
            ntfy_active = bool(prefs.get("ntfy_enabled", False) and ntfy_url)
            rss_enabled = prefs.get("rss_enabled", False)

            if ntfy_active:
                try:
                    _send_ntfy(
                        event,
                        ntfy_url,
                        ntfy_auth_method,
                        ntfy_token,
                        ntfy_username,
                        ntfy_password,
                    )
                    sent += 1
                except Exception as e:
                    # Best-effort delivery: one bad topic/config must not stop
                    # the rest of the batch.
                    logger.warning("ntfy dispatch failed for event %s: %s", event.id, e)
                    failed += 1

            # Mark dispatched once handled by at least one enabled channel.
            # RSS is pull-based — no action needed beyond creating the event record.
            # NOTE: a failed ntfy send is still marked dispatched (no retry).
            if ntfy_active or rss_enabled:
                event.dispatched_at = now
            db.commit()

        logger.info(
            "dispatch_notifications: %s sent, %s failed, %s pending",
            sent,
            failed,
            len(pending),
        )
        return {"sent": sent, "failed": failed, "total": len(pending)}
    finally:
        db.close()
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
) -> None:
    """Publish one notification event to an ntfy topic.

    Args:
        event: Event to publish; reads ``payload``, ``bill_id`` and
            ``event_type``.
        topic_url: Full URL of the ntfy topic to POST to.
        auth_method: ``"token"`` sends a Bearer header, ``"basic"`` sends a
            Basic header; any other value sends no Authorization header.
        token: Access token, used when ``auth_method == "token"``.
        username: Username, used when ``auth_method == "basic"``.
        password: Password, used when ``auth_method == "basic"``.

    Raises:
        requests.RequestException: on connection failure or timeout, and
            (via ``raise_for_status``) on a 4xx/5xx response.
    """
    import base64

    payload = event.payload or {}

    # Resolve the fallback lazily: the bill id is only touched when the payload
    # carries no label, and a null bill id no longer raises.
    bill_label = payload.get("bill_label")
    if not bill_label:
        bill_label = (event.bill_id or "").upper()
    bill_title = payload.get("bill_title", "")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    # Title line: event type + bill identifier (e.g. "New Bill Text: HR 1234")
    title = f"{event_label}: {bill_label}"

    # Body: full bill name, then AI summary if available
    lines = [bill_title] if bill_title else []
    summary = payload.get("brief_summary")
    if summary:
        if lines:
            # Blank separator only when there is a title line above it, so the
            # body never starts with an empty line.
            lines.append("")
        lines.append(summary[:300])
    message = "\n".join(lines) or bill_label

    headers = {
        "Title": title,
        "Priority": "default",
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if payload.get("bill_url"):
        # Tapping the notification opens the bill page.
        headers["Click"] = payload["bill_url"]

    if auth_method == "token" and token:
        headers["Authorization"] = f"Bearer {token}"
    elif auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    resp = requests.post(
        topic_url,
        data=message.encode("utf-8"),
        headers=headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()