""" Notification dispatcher — sends pending notification events via ntfy. RSS is pull-based so no dispatch is needed for it; events are simply marked dispatched once ntfy is sent (or immediately if the user has no ntfy configured but has an RSS token, so the feed can clean up old items). Runs every 5 minutes on Celery Beat. """ import base64 import logging from collections import defaultdict from datetime import datetime, timedelta, timezone import requests from app.database import get_sync_db from app.models.follow import Follow from app.models.notification import NotificationEvent from app.models.user import User from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) NTFY_TIMEOUT = 10 _EVENT_TITLES = { "new_document": "New Bill Text", "new_amendment": "Amendment Filed", "bill_updated": "Bill Updated", "weekly_digest": "Weekly Digest", } _EVENT_TAGS = { "new_document": "page_facing_up", "new_amendment": "memo", "bill_updated": "rotating_light", } # Milestone events are more urgent than LLM brief events _EVENT_PRIORITY = { "bill_updated": "high", "new_document": "default", "new_amendment": "default", } _FILTER_DEFAULTS = { "new_document": False, "new_amendment": False, "vote": False, "presidential": False, "committee_report": False, "calendar": False, "procedural": False, "referral": False, } def _should_dispatch(event, prefs: dict, follow_mode: str = "neutral") -> bool: payload = event.payload or {} source = payload.get("source", "bill_follow") # Map event type directly for document events if event.event_type == "new_document": key = "new_document" elif event.event_type == "new_amendment": key = "new_amendment" else: # Use action_category if present (new events), fall back from milestone_tier (old events) key = payload.get("action_category") if not key: key = "referral" if payload.get("milestone_tier") == "referral" else "vote" all_filters = prefs.get("alert_filters") if all_filters is None: return True # user hasn't configured filters yet — send everything if source in ("member_follow", "topic_follow"): source_filters = all_filters.get(source) if source_filters is None: return True # section not configured — send everything if not source_filters.get("enabled", True): return False # master toggle off # Per-entity mute checks if source == "member_follow": muted_ids = source_filters.get("muted_ids") or [] if payload.get("matched_member_id") in muted_ids: return False if source == "topic_follow": muted_tags = source_filters.get("muted_tags") or [] if payload.get("matched_topic") in muted_tags: return False return bool(source_filters.get(key, _FILTER_DEFAULTS.get(key, True))) # Bill follow — use follow mode filters (existing behaviour) mode_filters = all_filters.get(follow_mode) or {} return bool(mode_filters.get(key, _FILTER_DEFAULTS.get(key, True))) def _in_quiet_hours(prefs: dict, now: datetime) -> bool: """Return True if the current local time falls within the user's quiet window. Quiet hours are stored as local-time hour integers. If the user has a stored IANA timezone name we convert ``now`` (UTC) to that zone before comparing. Falls back to UTC if the timezone is absent or unrecognised. """ start = prefs.get("quiet_hours_start") end = prefs.get("quiet_hours_end") if start is None or end is None: return False tz_name = prefs.get("timezone") if tz_name: try: from zoneinfo import ZoneInfo h = now.astimezone(ZoneInfo(tz_name)).hour except Exception: h = now.hour # unrecognised timezone — degrade gracefully to UTC else: h = now.hour if start <= end: return start <= h < end # Wraps midnight (e.g. 22 → 8) return h >= start or h < end @celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications") def dispatch_notifications(self): """Fan out pending notification events to ntfy and mark dispatched.""" db = get_sync_db() try: pending = ( db.query(NotificationEvent) .filter(NotificationEvent.dispatched_at.is_(None)) .order_by(NotificationEvent.created_at) .limit(200) .all() ) sent = 0 failed = 0 held = 0 now = datetime.now(timezone.utc) for event in pending: user = db.get(User, event.user_id) if not user: event.dispatched_at = now db.commit() continue # Look up follow mode for this (user, bill) pair follow = db.query(Follow).filter_by( user_id=event.user_id, follow_type="bill", follow_value=event.bill_id ).first() follow_mode = follow.follow_mode if follow else "neutral" prefs = user.notification_prefs or {} if not _should_dispatch(event, prefs, follow_mode): event.dispatched_at = now db.commit() continue ntfy_url = prefs.get("ntfy_topic_url", "").strip() ntfy_auth_method = prefs.get("ntfy_auth_method", "none") ntfy_token = prefs.get("ntfy_token", "").strip() ntfy_username = prefs.get("ntfy_username", "").strip() ntfy_password = prefs.get("ntfy_password", "").strip() ntfy_enabled = prefs.get("ntfy_enabled", False) rss_enabled = prefs.get("rss_enabled", False) digest_enabled = prefs.get("digest_enabled", False) ntfy_configured = ntfy_enabled and bool(ntfy_url) # Hold events when ntfy is configured but delivery should be deferred in_quiet = _in_quiet_hours(prefs, now) if ntfy_configured else False hold = ntfy_configured and (in_quiet or digest_enabled) if hold: held += 1 continue # Leave undispatched — digest task or next run after quiet hours if ntfy_configured: try: _send_ntfy( event, ntfy_url, ntfy_auth_method, ntfy_token, ntfy_username, ntfy_password, follow_mode=follow_mode, ) sent += 1 except Exception as e: logger.warning(f"ntfy dispatch failed for event {event.id}: {e}") failed += 1 email_enabled = prefs.get("email_enabled", False) email_address = prefs.get("email_address", "").strip() if email_enabled and email_address: try: _send_email(event, email_address, unsubscribe_token=user.email_unsubscribe_token) sent += 1 except Exception as e: logger.warning(f"email dispatch failed for event {event.id}: {e}") failed += 1 # Mark dispatched: channels were attempted, or user has no channels configured (RSS-only) event.dispatched_at = now db.commit() logger.info( f"dispatch_notifications: {sent} sent, {failed} failed, " f"{held} held (quiet hours/digest), {len(pending)} total pending" ) return {"sent": sent, "failed": failed, "held": held, "total": len(pending)} finally: db.close() @celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest") def send_notification_digest(self): """ Send a bundled ntfy digest for users with digest mode enabled. Runs daily; weekly-frequency users only receive on Mondays. """ db = get_sync_db() try: now = datetime.now(timezone.utc) users = db.query(User).all() digest_users = [ u for u in users if (u.notification_prefs or {}).get("digest_enabled", False) and (u.notification_prefs or {}).get("ntfy_enabled", False) and (u.notification_prefs or {}).get("ntfy_topic_url", "").strip() ] sent = 0 for user in digest_users: prefs = user.notification_prefs or {} frequency = prefs.get("digest_frequency", "daily") # Weekly digests only fire on Mondays (weekday 0) if frequency == "weekly" and now.weekday() != 0: continue lookback_hours = 168 if frequency == "weekly" else 24 cutoff = now - timedelta(hours=lookback_hours) events = ( db.query(NotificationEvent) .filter_by(user_id=user.id) .filter( NotificationEvent.dispatched_at.is_(None), NotificationEvent.created_at > cutoff, ) .order_by(NotificationEvent.created_at.desc()) .all() ) if not events: continue try: ntfy_url = prefs.get("ntfy_topic_url", "").strip() _send_digest_ntfy(events, ntfy_url, prefs) for event in events: event.dispatched_at = now db.commit() sent += 1 except Exception as e: logger.warning(f"Digest send failed for user {user.id}: {e}") logger.info(f"send_notification_digest: digests sent to {sent} users") return {"sent": sent} finally: db.close() def _build_reason(payload: dict) -> str | None: source = payload.get("source", "bill_follow") mode_labels = {"pocket_veto": "Pocket Veto", "pocket_boost": "Pocket Boost", "neutral": "Following"} if source == "bill_follow": mode = payload.get("follow_mode", "neutral") return f"\U0001f4cc {mode_labels.get(mode, 'Following')} this bill" if source == "member_follow": name = payload.get("matched_member_name") return f"\U0001f464 You follow {name}" if name else "\U0001f464 Member you follow" if source == "topic_follow": topic = payload.get("matched_topic") return f"\U0001f3f7 You follow \"{topic}\"" if topic else "\U0001f3f7 Topic you follow" return None def _send_email( event: NotificationEvent, email_address: str, unsubscribe_token: str | None = None, ) -> None: """Send a plain-text email notification via SMTP.""" import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from app.config import settings as app_settings if not app_settings.SMTP_HOST or not email_address: return payload = event.payload or {} bill_label = payload.get("bill_label", event.bill_id.upper()) bill_title = payload.get("bill_title", "") event_label = _EVENT_TITLES.get(event.event_type, "Bill Update") base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/") subject = f"PocketVeto: {event_label} — {bill_label}" lines = [f"{event_label}: {bill_label}"] if bill_title: lines.append(bill_title) if payload.get("brief_summary"): lines.append("") lines.append(payload["brief_summary"][:500]) reason = _build_reason(payload) if reason: lines.append("") lines.append(reason) if payload.get("bill_url"): lines.append("") lines.append(f"View bill: {payload['bill_url']}") unsubscribe_url = f"{base_url}/api/notifications/unsubscribe/{unsubscribe_token}" if unsubscribe_token else None if unsubscribe_url: lines.append("") lines.append(f"Unsubscribe from email alerts: {unsubscribe_url}") body = "\n".join(lines) from_addr = app_settings.SMTP_FROM or app_settings.SMTP_USER msg = MIMEMultipart() msg["Subject"] = subject msg["From"] = from_addr msg["To"] = email_address if unsubscribe_url: msg["List-Unsubscribe"] = f"<{unsubscribe_url}>" msg["List-Unsubscribe-Post"] = "List-Unsubscribe=One-Click" msg.attach(MIMEText(body, "plain", "utf-8")) with smtplib.SMTP(app_settings.SMTP_HOST, app_settings.SMTP_PORT, timeout=10) as s: if app_settings.SMTP_STARTTLS: s.starttls() if app_settings.SMTP_USER: s.login(app_settings.SMTP_USER, app_settings.SMTP_PASSWORD) s.sendmail(from_addr, [email_address], msg.as_string()) def _send_ntfy( event: NotificationEvent, topic_url: str, auth_method: str = "none", token: str = "", username: str = "", password: str = "", follow_mode: str = "neutral", ) -> None: payload = event.payload or {} bill_label = payload.get("bill_label", event.bill_id.upper()) bill_title = payload.get("bill_title", "") event_label = _EVENT_TITLES.get(event.event_type, "Bill Update") title = f"{event_label}: {bill_label}" lines = [bill_title] if bill_title else [] if payload.get("brief_summary"): lines.append("") lines.append(payload["brief_summary"][:300]) reason = _build_reason(payload) if reason: lines.append("") lines.append(reason) message = "\n".join(lines) or bill_label headers = { "Title": title, "Priority": _EVENT_PRIORITY.get(event.event_type, "default"), "Tags": _EVENT_TAGS.get(event.event_type, "bell"), } if payload.get("bill_url"): headers["Click"] = payload["bill_url"] if follow_mode == "pocket_boost": headers["Actions"] = ( f"view, View Bill, {payload.get('bill_url', '')}; " "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative" ) if auth_method == "token" and token: headers["Authorization"] = f"Bearer {token}" elif auth_method == "basic" and username: creds = base64.b64encode(f"{username}:{password}".encode()).decode() headers["Authorization"] = f"Basic {creds}" resp = requests.post(topic_url, data=message.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT) resp.raise_for_status() @celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_weekly_digest") def send_weekly_digest(self): """ Proactive week-in-review summary for followed bills. Runs every Monday at 8:30 AM UTC. Queries bills followed by each user for any activity in the past 7 days and dispatches a low-noise summary via ntfy and/or creates a NotificationEvent for the RSS feed. Unlike send_notification_digest (which bundles queued events), this task generates a fresh summary independent of the notification event queue. """ from app.config import settings as app_settings from app.models.bill import Bill db = get_sync_db() try: now = datetime.now(timezone.utc) cutoff = now - timedelta(days=7) base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/") users = db.query(User).all() ntfy_sent = 0 rss_created = 0 for user in users: prefs = user.notification_prefs or {} ntfy_enabled = prefs.get("ntfy_enabled", False) ntfy_url = prefs.get("ntfy_topic_url", "").strip() rss_enabled = prefs.get("rss_enabled", False) ntfy_configured = ntfy_enabled and bool(ntfy_url) if not ntfy_configured and not rss_enabled: continue bill_follows = db.query(Follow).filter_by( user_id=user.id, follow_type="bill" ).all() if not bill_follows: continue bill_ids = [f.follow_value for f in bill_follows] active_bills = ( db.query(Bill) .filter( Bill.bill_id.in_(bill_ids), Bill.updated_at >= cutoff, ) .order_by(Bill.updated_at.desc()) .limit(20) .all() ) if not active_bills: continue count = len(active_bills) anchor = active_bills[0] summary_lines = [] for bill in active_bills[:10]: lbl = _format_bill_label(bill) action = (bill.latest_action_text or "")[:80] summary_lines.append(f"• {lbl}: {action}" if action else f"• {lbl}") if count > 10: summary_lines.append(f" …and {count - 10} more") summary = "\n".join(summary_lines) # Mark dispatched_at immediately so dispatch_notifications skips this event; # it still appears in the RSS feed since that endpoint reads all events. event = NotificationEvent( user_id=user.id, bill_id=anchor.bill_id, event_type="weekly_digest", dispatched_at=now, payload={ "bill_label": "Weekly Digest", "bill_title": f"{count} followed bill{'s' if count != 1 else ''} had activity this week", "brief_summary": summary, "bill_count": count, "bill_url": f"{base_url}/bills/{anchor.bill_id}", }, ) db.add(event) rss_created += 1 if ntfy_configured: try: _send_weekly_digest_ntfy(count, summary, ntfy_url, prefs) ntfy_sent += 1 except Exception as e: logger.warning(f"Weekly digest ntfy failed for user {user.id}: {e}") db.commit() logger.info(f"send_weekly_digest: {ntfy_sent} ntfy sent, {rss_created} events created") return {"ntfy_sent": ntfy_sent, "rss_created": rss_created} finally: db.close() def _format_bill_label(bill) -> str: _TYPE_MAP = { "hr": "H.R.", "s": "S.", "hjres": "H.J.Res.", "sjres": "S.J.Res.", "hconres": "H.Con.Res.", "sconres": "S.Con.Res.", "hres": "H.Res.", "sres": "S.Res.", } prefix = _TYPE_MAP.get(bill.bill_type.lower(), bill.bill_type.upper()) return f"{prefix} {bill.bill_number}" def _send_weekly_digest_ntfy(count: int, summary: str, ntfy_url: str, prefs: dict) -> None: auth_method = prefs.get("ntfy_auth_method", "none") ntfy_token = prefs.get("ntfy_token", "").strip() ntfy_username = prefs.get("ntfy_username", "").strip() ntfy_password = prefs.get("ntfy_password", "").strip() headers = { "Title": f"PocketVeto Weekly — {count} bill{'s' if count != 1 else ''} updated", "Priority": "low", "Tags": "newspaper,calendar", } if auth_method == "token" and ntfy_token: headers["Authorization"] = f"Bearer {ntfy_token}" elif auth_method == "basic" and ntfy_username: creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode() headers["Authorization"] = f"Basic {creds}" resp = requests.post(ntfy_url, data=summary.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT) resp.raise_for_status() def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None: auth_method = prefs.get("ntfy_auth_method", "none") ntfy_token = prefs.get("ntfy_token", "").strip() ntfy_username = prefs.get("ntfy_username", "").strip() ntfy_password = prefs.get("ntfy_password", "").strip() headers = { "Title": f"PocketVeto Digest — {len(events)} update{'s' if len(events) != 1 else ''}", "Priority": "default", "Tags": "newspaper", } if auth_method == "token" and ntfy_token: headers["Authorization"] = f"Bearer {ntfy_token}" elif auth_method == "basic" and ntfy_username: creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode() headers["Authorization"] = f"Basic {creds}" # Group by bill, show up to 10 by_bill: dict = defaultdict(list) for event in events: by_bill[event.bill_id].append(event) lines = [] for bill_id, bill_events in list(by_bill.items())[:10]: payload = bill_events[0].payload or {} bill_label = payload.get("bill_label", bill_id.upper()) event_labels = list({_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events}) lines.append(f"• {bill_label}: {', '.join(event_labels)}") if len(by_bill) > 10: lines.append(f" …and {len(by_bill) - 10} more bills") message = "\n".join(lines) resp = requests.post(ntfy_url, data=message.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT) resp.raise_for_status()