"""
Notification dispatcher — sends pending notification events via ntfy.

RSS is pull-based so no dispatch is needed for it; events are simply
marked dispatched once ntfy is sent (or immediately if the user has no
ntfy configured but has an RSS token, so the feed can clean up old items).

Runs every 5 minutes on Celery Beat.
"""
import base64
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import requests

from app.database import get_sync_db
from app.models.follow import Follow
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)

# Seconds to wait for the ntfy server before giving up on a single POST.
NTFY_TIMEOUT = 10

# Human-readable notification title prefix per event type.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
}

# ntfy tag (emoji short code) per event type.
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}

# Milestone events are more urgent than LLM brief events
_EVENT_PRIORITY = {
    "bill_updated": "high",
    "new_document": "default",
    "new_amendment": "default",
}

# Event types that a "pocket_veto" follow suppresses (LLM brief events).
_BRIEF_EVENT_TYPES = ("new_document", "new_amendment")

_FIND_REP_ACTION = (
    "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
)


def _pref_str(prefs: dict, key: str) -> str:
    """Return ``prefs[key]`` as a stripped string; missing or ``None`` becomes ``""``.

    Preference values may be stored as explicit nulls, in which case a plain
    ``prefs.get(key, "").strip()`` would raise ``AttributeError``.
    """
    value = prefs.get(key)
    if value is None:
        return ""
    return str(value).strip()


def _build_auth_headers(auth_method: str, token: str, username: str, password: str) -> dict:
    """Return the ``Authorization`` header dict for the configured auth method.

    Returns an empty dict when no usable credentials are configured
    (method ``"none"``, or the token/username required by the method is empty).
    """
    if auth_method == "token" and token:
        return {"Authorization": f"Bearer {token}"}
    if auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        return {"Authorization": f"Basic {creds}"}
    return {}


def _post_to_ntfy(url: str, message: str, headers: dict) -> None:
    """POST ``message`` to the ntfy topic ``url`` and raise on an HTTP error status.

    Header values are sent as UTF-8 bytes. ``str`` header values are encoded as
    latin-1 by the HTTP client, so any character outside latin-1 (for example
    the em dash in the digest title) would raise ``UnicodeEncodeError`` before
    the request is sent. ntfy accepts UTF-8 in headers.

    Raises:
        requests.RequestException: on connection errors, timeouts, or a
            non-2xx response (via ``raise_for_status``).
    """
    encoded_headers = {
        name: value.encode("utf-8") if isinstance(value, str) else value
        for name, value in headers.items()
    }
    resp = requests.post(
        url,
        data=message.encode("utf-8"),
        headers=encoded_headers,
        timeout=NTFY_TIMEOUT,
    )
    resp.raise_for_status()


def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
    """Return True if the current UTC hour falls within the user's quiet window.

    Args:
        prefs: The user's notification preferences; reads ``quiet_hours_start``
            and ``quiet_hours_end``, which are compared against ``now.hour``.
        now: The current time (callers in this module pass an aware UTC datetime).

    Returns:
        ``False`` when either bound is unset. Otherwise the window is
        half-open, ``[start, end)``, and may wrap past midnight.
    """
    start = prefs.get("quiet_hours_start")
    end = prefs.get("quiet_hours_end")
    if start is None or end is None:
        return False
    h = now.hour
    if start <= end:
        return start <= h < end
    # Wraps midnight (e.g. 22 → 8)
    return h >= start or h < end


@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy and mark dispatched.

    Processes up to 200 of the oldest undispatched events. For each event:

    * events whose user no longer exists are marked dispatched and skipped;
    * events filtered out by the follow mode are marked dispatched without sending;
    * events for users in quiet hours or with digest mode on are left
      undispatched (counted as ``held``);
    * otherwise ntfy is attempted (if configured) and the event is marked
      dispatched whether or not the send succeeded.

    Returns:
        A dict with ``sent``, ``failed``, ``held`` and ``total`` counts.

    NOTE(review): held events stay undispatched and sort first (oldest
    ``created_at``), so they keep occupying slots in the 200-row window on
    every run until the digest task or the end of quiet hours clears them.
    """
    db = get_sync_db()
    try:
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )

        sent = 0
        failed = 0
        held = 0
        now = datetime.now(timezone.utc)

        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Orphaned event: nothing to deliver, just clear it from the queue.
                event.dispatched_at = now
                db.commit()
                continue

            # Look up follow mode for this (user, bill) pair
            follow = db.query(Follow).filter_by(
                user_id=event.user_id, follow_type="bill", follow_value=event.bill_id
            ).first()
            follow_mode = follow.follow_mode if follow else "neutral"

            # Pocket Veto: only milestone (bill_updated) events; skip LLM brief events
            if follow_mode == "pocket_veto" and event.event_type in _BRIEF_EVENT_TYPES:
                event.dispatched_at = now
                db.commit()
                continue

            # Referral-tier events (committee referrals) are noisy for neutral follows;
            # pocket_veto and pocket_boost users want them as early warnings
            if (
                follow_mode == "neutral"
                and (event.payload or {}).get("milestone_tier") == "referral"
            ):
                event.dispatched_at = now
                db.commit()
                continue

            prefs = user.notification_prefs or {}
            # _pref_str tolerates explicit None values, which would otherwise
            # raise here (outside any try) and abort the whole run.
            ntfy_url = _pref_str(prefs, "ntfy_topic_url")
            ntfy_auth_method = prefs.get("ntfy_auth_method") or "none"
            ntfy_token = _pref_str(prefs, "ntfy_token")
            ntfy_username = _pref_str(prefs, "ntfy_username")
            ntfy_password = _pref_str(prefs, "ntfy_password")
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            digest_enabled = prefs.get("digest_enabled", False)

            ntfy_configured = bool(ntfy_enabled) and bool(ntfy_url)

            # Hold events when ntfy is configured but delivery should be deferred
            if ntfy_configured and (digest_enabled or _in_quiet_hours(prefs, now)):
                held += 1
                continue  # Leave undispatched — digest task or next run after quiet hours

            if ntfy_configured:
                try:
                    _send_ntfy(
                        event,
                        ntfy_url,
                        ntfy_auth_method,
                        ntfy_token,
                        ntfy_username,
                        ntfy_password,
                        follow_mode=follow_mode,
                    )
                    sent += 1
                except Exception as e:
                    # Best-effort delivery: a failed send is logged and counted,
                    # and the event is still marked dispatched below (no retry).
                    logger.warning("ntfy dispatch failed for event %s: %s", event.id, e)
                    failed += 1

            # Mark dispatched: ntfy was attempted, or user has no ntfy (RSS-only or neither)
            event.dispatched_at = now
            db.commit()

        logger.info(
            "dispatch_notifications: %s sent, %s failed, "
            "%s held (quiet hours/digest), %s total pending",
            sent,
            failed,
            held,
            len(pending),
        )
        return {"sent": sent, "failed": failed, "held": held, "total": len(pending)}
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest")
def send_notification_digest(self):
    """
    Send a bundled ntfy digest for users with digest mode enabled.
    Runs daily; weekly-frequency users only receive on Mondays.

    Only users with digest mode on, ntfy enabled and a non-empty topic URL are
    considered. For each, undispatched events newer than the lookback window
    (24h daily, 168h weekly) are bundled into one message and marked dispatched
    only if the send succeeds.

    Returns:
        A dict ``{"sent": <number of users who received a digest>}``.
    """
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)

        digest_users = []
        for candidate in db.query(User).all():
            candidate_prefs = candidate.notification_prefs or {}
            if (
                candidate_prefs.get("digest_enabled", False)
                and candidate_prefs.get("ntfy_enabled", False)
                and _pref_str(candidate_prefs, "ntfy_topic_url")
            ):
                digest_users.append(candidate)

        sent = 0
        for user in digest_users:
            prefs = user.notification_prefs or {}
            frequency = prefs.get("digest_frequency", "daily")

            # Weekly digests only fire on Mondays (weekday 0)
            if frequency == "weekly" and now.weekday() != 0:
                continue

            lookback_hours = 168 if frequency == "weekly" else 24
            cutoff = now - timedelta(hours=lookback_hours)

            events = (
                db.query(NotificationEvent)
                .filter_by(user_id=user.id)
                .filter(
                    NotificationEvent.dispatched_at.is_(None),
                    NotificationEvent.created_at > cutoff,
                )
                .order_by(NotificationEvent.created_at.desc())
                .all()
            )

            if not events:
                continue

            try:
                ntfy_url = _pref_str(prefs, "ntfy_topic_url")
                _send_digest_ntfy(events, ntfy_url, prefs)
                for event in events:
                    event.dispatched_at = now
                db.commit()
                sent += 1
            except Exception as e:
                # One user's failure must not stop digests for the others.
                logger.warning("Digest send failed for user %s: %s", user.id, e)

        logger.info("send_notification_digest: digests sent to %s users", sent)
        return {"sent": sent}
    finally:
        db.close()


def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
    follow_mode: str = "neutral",
) -> None:
    """Send a single event to an ntfy topic.

    Args:
        event: The notification event; reads ``payload``, ``bill_id`` and
            ``event_type``.
        topic_url: Full URL of the ntfy topic to POST to.
        auth_method: ``"token"``, ``"basic"``, or anything else for no auth.
        token: Bearer token, used when ``auth_method == "token"``.
        username: Basic-auth user, used when ``auth_method == "basic"``.
        password: Basic-auth password.
        follow_mode: When ``"pocket_boost"``, action buttons are attached.

    Raises:
        requests.RequestException: if the POST fails or returns an error status.
    """
    payload = event.payload or {}
    bill_label = payload.get("bill_label")
    if bill_label is None:
        # Fall back to the raw bill id only when no label was provided.
        bill_label = (event.bill_id or "").upper()
    bill_title = payload.get("bill_title", "")
    bill_url = payload.get("bill_url")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    title = f"{event_label}: {bill_label}"

    lines = [bill_title] if bill_title else []
    if payload.get("brief_summary"):
        lines.append("")
        lines.append(payload["brief_summary"][:300])
    message = "\n".join(lines) or bill_label

    headers = {
        "Title": title,
        "Priority": _EVENT_PRIORITY.get(event.event_type, "default"),
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if bill_url:
        headers["Click"] = bill_url

    if follow_mode == "pocket_boost":
        actions = []
        if bill_url:
            # Skip the "View Bill" button when there is no URL to open.
            actions.append(f"view, View Bill, {bill_url}")
        actions.append(_FIND_REP_ACTION)
        headers["Actions"] = "; ".join(actions)

    headers.update(_build_auth_headers(auth_method, token, username, password))

    _post_to_ntfy(topic_url, message, headers)


def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
    """Send one bundled digest message summarising ``events`` to ``ntfy_url``.

    Events are grouped by bill; at most 10 bills are listed, followed by a
    "…and N more bills" line when there are more.

    Args:
        events: Notification events to summarise; reads ``bill_id``,
            ``payload`` and ``event_type`` of each.
        ntfy_url: Full URL of the ntfy topic to POST to.
        prefs: The user's notification preferences (auth settings are read here).

    Raises:
        requests.RequestException: if the POST fails or returns an error status.
    """
    count = len(events)
    headers = {
        "Title": f"PocketVeto Digest — {count} update{'s' if count != 1 else ''}",
        "Priority": "default",
        "Tags": "newspaper",
    }
    headers.update(
        _build_auth_headers(
            prefs.get("ntfy_auth_method") or "none",
            _pref_str(prefs, "ntfy_token"),
            _pref_str(prefs, "ntfy_username"),
            _pref_str(prefs, "ntfy_password"),
        )
    )

    # Group by bill, show up to 10
    by_bill: dict = defaultdict(list)
    for event in events:
        by_bill[event.bill_id].append(event)

    lines = []
    for bill_id, bill_events in list(by_bill.items())[:10]:
        payload = bill_events[0].payload or {}
        bill_label = payload.get("bill_label")
        if bill_label is None:
            bill_label = (bill_id or "").upper()
        # dict.fromkeys de-duplicates while keeping a stable, first-seen order.
        event_labels = list(
            dict.fromkeys(_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events)
        )
        lines.append(f"• {bill_label}: {', '.join(event_labels)}")

    if len(by_bill) > 10:
        lines.append(f"  …and {len(by_bill) - 10} more bills")

    message = "\n".join(lines)
    _post_to_ntfy(ntfy_url, message, headers)