Files
PocketVeto/backend/app/api/notifications.py
Jack Levy 4308404cca fix: use SMTP_SSL for port 465, STARTTLS for 587
Auto-detect SSL vs STARTTLS based on port number instead of always
using SMTP + starttls(), which times out on port 465 (implicit SSL).

Authored by: Jack Levy
2026-03-14 23:18:32 -04:00

465 lines
19 KiB
Python

"""
Notifications API — user notification settings and per-user RSS feed.
"""
import base64
import secrets
from xml.etree.ElementTree import Element, SubElement, tostring
import httpx
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import HTMLResponse, Response
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings as app_settings
from app.core.dependencies import get_current_user
from app.database import get_db
from app.models.notification import NotificationEvent
from app.models.user import User
from app.schemas.schemas import (
FollowModeTestRequest,
NotificationEventSchema,
NotificationSettingsResponse,
NotificationSettingsUpdate,
NotificationTestResult,
NtfyTestRequest,
)
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()

# Display label for each notification event type, keyed by ``event_type``.
# The RSS builder in this module falls back to "Update" for any type not listed.
_EVENT_LABELS = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
    "weekly_digest": "Weekly Digest",
}
def _prefs_to_response(prefs: dict, rss_token: str | None) -> NotificationSettingsResponse:
    """Build the settings response from a stored prefs dict and an RSS token.

    Any preference key missing from ``prefs`` is filled with the fallback
    listed below. ``rss_token`` is passed through unchanged; it is not read
    from ``prefs``.
    """
    # Fallback used for each response field when the key is absent from prefs.
    fallbacks = {
        "ntfy_topic_url": "",
        "ntfy_auth_method": "none",
        "ntfy_token": "",
        "ntfy_username": "",
        "ntfy_password": "",
        "ntfy_enabled": False,
        "rss_enabled": False,
        "email_enabled": False,
        "email_address": "",
        "digest_enabled": False,
        "digest_frequency": "daily",
        "quiet_hours_start": None,
        "quiet_hours_end": None,
        "timezone": None,
        "alert_filters": None,
    }
    resolved = {key: prefs.get(key, fallback) for key, fallback in fallbacks.items()}
    return NotificationSettingsResponse(**resolved, rss_token=rss_token)
@router.get("/settings", response_model=NotificationSettingsResponse)
async def get_notification_settings(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the current user's notification settings.

    If the user has no RSS token yet, one is generated and committed before
    responding, so the returned settings always include a token.
    """
    account = await db.get(User, current_user.id)
    token_missing = not account.rss_token
    if token_missing:
        # First visit: mint the token now so a feed URL can be shown right away.
        account.rss_token = secrets.token_urlsafe(32)
        await db.commit()
        await db.refresh(account)
    stored_prefs = account.notification_prefs or {}
    return _prefs_to_response(stored_prefs, account.rss_token)
@router.put("/settings", response_model=NotificationSettingsResponse)
async def update_notification_settings(
    body: NotificationSettingsUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Merge the supplied settings into the user's stored preferences.

    Fields left as ``None`` in the request keep their stored value. Text
    fields are saved with surrounding whitespace removed. Sending
    ``quiet_hours_start == -1`` removes the stored quiet-hours start, end and
    timezone. Missing RSS / unsubscribe tokens are generated before the commit.
    """
    account = await db.get(User, current_user.id)
    # Work on a copy; the merged dict is assigned back onto the user below.
    merged = dict(account.notification_prefs or {})

    # (field name, strip surrounding whitespace?) in the order keys are written.
    field_plan = (
        ("ntfy_topic_url", True),
        ("ntfy_auth_method", False),
        ("ntfy_token", True),
        ("ntfy_username", True),
        ("ntfy_password", True),
        ("ntfy_enabled", False),
        ("rss_enabled", False),
        ("email_enabled", False),
        ("email_address", True),
        ("digest_enabled", False),
        ("digest_frequency", False),
        ("quiet_hours_start", False),
        ("quiet_hours_end", False),
        ("timezone", False),
        ("alert_filters", False),
    )
    for field_name, strip_value in field_plan:
        incoming = getattr(body, field_name)
        if incoming is None:
            continue
        merged[field_name] = incoming.strip() if strip_value else incoming

    # -1 is the sentinel for "clear quiet hours": drop all three related keys.
    if body.quiet_hours_start == -1:
        for stale_key in ("quiet_hours_start", "quiet_hours_end", "timezone"):
            merged.pop(stale_key, None)

    account.notification_prefs = merged
    if not account.rss_token:
        account.rss_token = secrets.token_urlsafe(32)
    # The unsubscribe token is minted the first time an email address is on file.
    has_email = merged.get("email_address")
    if has_email and not account.email_unsubscribe_token:
        account.email_unsubscribe_token = secrets.token_urlsafe(32)

    await db.commit()
    await db.refresh(account)
    return _prefs_to_response(account.notification_prefs or {}, account.rss_token)
@router.post("/settings/rss-reset", response_model=NotificationSettingsResponse)
async def reset_rss_token(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Replace the user's RSS token with a fresh one and return the settings.

    The feed route looks users up by token, so the previous feed URL stops
    resolving once the new token is committed.
    """
    account = await db.get(User, current_user.id)
    fresh_token = secrets.token_urlsafe(32)
    account.rss_token = fresh_token
    await db.commit()
    await db.refresh(account)
    stored_prefs = account.notification_prefs or {}
    return _prefs_to_response(stored_prefs, account.rss_token)
@router.post("/test/ntfy", response_model=NotificationTestResult)
async def test_ntfy(
    body: NtfyTestRequest,
    current_user: User = Depends(get_current_user),
):
    """Send a test push notification to verify ntfy settings.

    The settings under test come from the request body rather than from the
    saved preferences. Every outcome, including delivery failures, is reported
    as a ``NotificationTestResult``; nothing is raised for a failed send.

    ``current_user`` is not read in the body — presumably it is declared only
    so the route requires an authenticated caller.
    """
    url = body.ntfy_topic_url.strip()
    if not url:
        return NotificationTestResult(status="error", detail="Topic URL is required")

    base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
    headers: dict[str, str] = {
        "Title": "PocketVeto: Test Notification",
        "Priority": "default",
        "Tags": "white_check_mark",
        "Click": f"{base_url}/notifications",
    }
    # Only attach credentials when the chosen method has its required value.
    if body.ntfy_auth_method == "token" and body.ntfy_token.strip():
        headers["Authorization"] = f"Bearer {body.ntfy_token.strip()}"
    elif body.ntfy_auth_method == "basic" and body.ntfy_username.strip():
        creds = base64.b64encode(
            f"{body.ntfy_username.strip()}:{body.ntfy_password}".encode()
        ).decode()
        headers["Authorization"] = f"Basic {creds}"

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(
                url,
                content=(
                    "Your PocketVeto notification settings are working correctly. "
                    "Real alerts will link directly to the relevant bill page."
                ).encode("utf-8"),
                headers=headers,
            )
            resp.raise_for_status()
            return NotificationTestResult(status="ok", detail=f"Test notification sent (HTTP {resp.status_code})")
    except httpx.HTTPStatusError as e:
        return NotificationTestResult(status="error", detail=f"HTTP {e.response.status_code}: {e.response.text[:200]}")
    except httpx.RequestError as e:
        return NotificationTestResult(status="error", detail=f"Connection error: {e}")
    except httpx.InvalidURL as e:
        # InvalidURL is not a RequestError subclass; without this handler a
        # malformed topic URL would surface as an unhandled server error.
        return NotificationTestResult(status="error", detail=f"Invalid topic URL: {e}")
@router.post("/test/email", response_model=NotificationTestResult)
async def test_email(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Send a test email to the user's configured email address.

    The recipient is the ``email_address`` saved in the user's notification
    prefs; the server side comes from the ``SMTP_*`` application settings.
    Port 465 uses an implicit-TLS connection (``SMTP_SSL``); any other port
    uses plain ``SMTP`` and upgrades with STARTTLS when ``SMTP_STARTTLS`` is
    set. Every outcome is returned as a ``NotificationTestResult``; failures
    are reported in ``detail`` rather than raised.
    """
    import asyncio
    import smtplib
    from email.mime.text import MIMEText

    user = await db.get(User, current_user.id)
    prefs = user.notification_prefs or {}
    email_addr = prefs.get("email_address", "").strip()
    if not email_addr:
        return NotificationTestResult(status="error", detail="No email address saved. Save your address first.")
    if not app_settings.SMTP_HOST:
        return NotificationTestResult(status="error", detail="SMTP not configured on this server. Set SMTP_HOST in .env")

    try:
        from_addr = app_settings.SMTP_FROM or app_settings.SMTP_USER
        base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
        body = (
            "This is a test email from PocketVeto.\n\n"
            "Your email notification settings are working correctly. "
            "Real alerts will include bill titles, summaries, and direct links.\n\n"
            f"Visit your notifications page: {base_url}/notifications"
        )
        msg = MIMEText(body, "plain", "utf-8")
        msg["Subject"] = "PocketVeto: Test Email Notification"
        msg["From"] = from_addr
        msg["To"] = email_addr

        def _deliver() -> None:
            """Connect, optionally secure/authenticate, and send (blocking)."""
            use_ssl = app_settings.SMTP_PORT == 465
            smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
            with smtp_cls(app_settings.SMTP_HOST, app_settings.SMTP_PORT, timeout=10) as s:
                # STARTTLS only applies to a connection that began unencrypted.
                if not use_ssl and app_settings.SMTP_STARTTLS:
                    s.starttls()
                if app_settings.SMTP_USER:
                    s.login(app_settings.SMTP_USER, app_settings.SMTP_PASSWORD)
                s.sendmail(from_addr, [email_addr], msg.as_string())

        # smtplib is synchronous; run it in a worker thread so a slow or
        # unreachable mail server cannot stall the event loop for other requests.
        await asyncio.to_thread(_deliver)
        return NotificationTestResult(status="ok", detail=f"Test email sent to {email_addr}")
    except smtplib.SMTPAuthenticationError:
        return NotificationTestResult(status="error", detail="SMTP authentication failed — check SMTP_USER and SMTP_PASSWORD in .env")
    except smtplib.SMTPConnectError:
        return NotificationTestResult(status="error", detail=f"Could not connect to {app_settings.SMTP_HOST}:{app_settings.SMTP_PORT}")
    except Exception as e:
        # Deliberate catch-all: this is a diagnostics endpoint, so any other
        # failure (timeouts, DNS, TLS, config) is shown to the user as text.
        return NotificationTestResult(status="error", detail=str(e))
@router.post("/test/rss", response_model=NotificationTestResult)
async def test_rss(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Report whether the user has an RSS token and how many events exist.

    No HTTP request is made to the feed itself: the check is that a token has
    been generated, followed by a count of the user's notification events.
    Returns an error result when no token exists yet.
    """
    from sqlalchemy import func

    user = await db.get(User, current_user.id)
    if not user.rss_token:
        return NotificationTestResult(status="error", detail="RSS token not generated — save settings first")

    # Count in the database rather than loading every event row to call len().
    count_result = await db.execute(
        select(func.count())
        .select_from(NotificationEvent)
        .where(NotificationEvent.user_id == user.id)
    )
    event_count = count_result.scalar_one()
    return NotificationTestResult(
        status="ok",
        detail=f"RSS feed is active with {event_count} event{'s' if event_count != 1 else ''}. Subscribe to the URL shown above.",
        event_count=event_count,
    )
@router.get("/history", response_model=list[NotificationEventSchema])
async def get_notification_history(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List the current user's notification events, newest first, capped at 50."""
    recent_events_query = (
        select(NotificationEvent)
        .where(NotificationEvent.user_id == current_user.id)
        .order_by(NotificationEvent.created_at.desc())
        .limit(50)
    )
    rows = await db.execute(recent_events_query)
    return rows.scalars().all()
@router.post("/test/follow-mode", response_model=NotificationTestResult)
async def test_follow_mode(
    body: FollowModeTestRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Simulate dispatcher behaviour for a given follow mode + event type.

    Uses one of the user's bill follows as the subject. For ``pocket_veto``
    with a ``new_document`` or ``new_amendment`` event the result reports the
    event as suppressed and nothing is sent. Every other combination posts a
    test message to the ntfy topic saved in the user's preferences. All
    outcomes, including validation and delivery failures, are returned as a
    ``NotificationTestResult``.
    """
    from app.models.follow import Follow

    VALID_MODES = {"pocket_veto", "pocket_boost"}
    VALID_EVENTS = {"new_document", "new_amendment", "bill_updated"}

    # sorted(): set iteration order varies between processes, which made the
    # wording of these messages unstable.
    if body.mode not in VALID_MODES:
        return NotificationTestResult(status="error", detail=f"mode must be one of {sorted(VALID_MODES)}")
    if body.event_type not in VALID_EVENTS:
        return NotificationTestResult(status="error", detail=f"event_type must be one of {sorted(VALID_EVENTS)}")

    result = await db.execute(
        select(Follow).where(
            Follow.user_id == current_user.id,
            Follow.follow_type == "bill",
        ).limit(1)
    )
    follow = result.scalar_one_or_none()
    if not follow:
        return NotificationTestResult(
            status="error",
            detail="No bill follows found — follow at least one bill first",
        )

    # Pocket Veto suppression: brief events are silently dropped
    if body.mode == "pocket_veto" and body.event_type in ("new_document", "new_amendment"):
        return NotificationTestResult(
            status="ok",
            detail=(
                f"✓ Suppressed — Pocket Veto correctly blocked a '{body.event_type}' event. "
                "No ntfy was sent (this is the expected behaviour)."
            ),
        )

    # Everything else would send ntfy — check the user has it configured
    user = await db.get(User, current_user.id)
    prefs = user.notification_prefs or {}
    ntfy_url = prefs.get("ntfy_topic_url", "").strip()
    ntfy_enabled = prefs.get("ntfy_enabled", False)
    if not ntfy_enabled or not ntfy_url:
        return NotificationTestResult(
            status="error",
            detail="ntfy not configured or disabled — enable it in Notification Settings first.",
        )

    bill_url = f"{(app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip('/')}/bills/{follow.follow_value}"
    # Shared module-level label table; every VALID_EVENTS entry has a label there.
    event_title = _EVENT_LABELS[body.event_type]
    mode_label = body.mode.replace("_", " ").title()
    headers: dict[str, str] = {
        "Title": f"[{mode_label} Test] {event_title}: {follow.follow_value.upper()}",
        "Priority": "default",
        "Tags": "test_tube",
        "Click": bill_url,
    }
    if body.mode == "pocket_boost":
        headers["Actions"] = (
            f"view, View Bill, {bill_url}; "
            "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
        )

    auth_method = prefs.get("ntfy_auth_method", "none")
    ntfy_token = prefs.get("ntfy_token", "").strip()
    ntfy_username = prefs.get("ntfy_username", "").strip()
    ntfy_password = prefs.get("ntfy_password", "").strip()
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    message_lines = [
        f"This is a test of {mode_label} mode for bill {follow.follow_value.upper()}.",
        f"Event type: {event_title}",
    ]
    if body.mode == "pocket_boost":
        message_lines.append("Tap the action buttons below to view the bill or find your representative.")

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(
                ntfy_url,
                content="\n".join(message_lines).encode("utf-8"),
                headers=headers,
            )
            resp.raise_for_status()
            detail = f"✓ ntfy sent (HTTP {resp.status_code})"
            if body.mode == "pocket_boost":
                detail += " — check your phone for 'View Bill' and 'Find Your Rep' action buttons"
            return NotificationTestResult(status="ok", detail=detail)
    except httpx.HTTPStatusError as e:
        return NotificationTestResult(status="error", detail=f"HTTP {e.response.status_code}: {e.response.text[:200]}")
    except httpx.RequestError as e:
        return NotificationTestResult(status="error", detail=f"Connection error: {e}")
    except httpx.InvalidURL as e:
        # InvalidURL is not a RequestError subclass; without this handler a
        # malformed saved topic URL would surface as an unhandled server error.
        return NotificationTestResult(status="error", detail=f"Invalid topic URL: {e}")
@router.get("/unsubscribe/{token}", response_class=HTMLResponse, include_in_schema=False)
async def email_unsubscribe(token: str, db: AsyncSession = Depends(get_db)):
    """Turn off email notifications for the user owning ``token``.

    No login is needed; the unsubscribe token in the URL identifies the user.
    An unknown token yields a 404 HTML page; otherwise ``email_enabled`` is
    set to ``False`` in the user's prefs and a confirmation page is returned.
    """
    lookup = await db.execute(
        select(User).where(User.email_unsubscribe_token == token)
    )
    owner = lookup.scalar_one_or_none()
    if not owner:
        failure_html = _unsubscribe_page("Invalid or expired link", success=False)
        return HTMLResponse(failure_html, status_code=404)

    # Copy before editing, then assign the updated dict back onto the user.
    updated_prefs = dict(owner.notification_prefs or {})
    updated_prefs["email_enabled"] = False
    owner.notification_prefs = updated_prefs
    await db.commit()

    success_html = _unsubscribe_page(
        "You've been unsubscribed from PocketVeto email notifications.",
        success=True,
    )
    return HTMLResponse(success_html)
def _unsubscribe_page(message: str, success: bool) -> str:
color = "#16a34a" if success else "#dc2626"
icon = "" if success else ""
return f"""<!DOCTYPE html>
<html lang="en">
<head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>PocketVeto — Unsubscribe</title>
<style>
body{{font-family:system-ui,sans-serif;background:#f9fafb;display:flex;align-items:center;justify-content:center;min-height:100vh;margin:0}}
.card{{background:#fff;border:1px solid #e5e7eb;border-radius:12px;padding:2.5rem;max-width:420px;width:100%;text-align:center;box-shadow:0 1px 3px rgba(0,0,0,.08)}}
.icon{{font-size:2.5rem;color:{color};margin-bottom:1rem}}
h1{{font-size:1.1rem;font-weight:600;color:#111827;margin:0 0 .5rem}}
p{{font-size:.9rem;color:#6b7280;margin:0 0 1.5rem;line-height:1.5}}
a{{color:#2563eb;text-decoration:none;font-size:.875rem}}a:hover{{text-decoration:underline}}
</style></head>
<body><div class="card">
<div class="icon">{icon}</div>
<h1>Email Notifications</h1>
<p>{message}</p>
<a href="/">Return to PocketVeto</a>
</div></body></html>"""
@router.get("/feed/{rss_token}.xml", include_in_schema=False)
async def rss_feed(rss_token: str, db: AsyncSession = Depends(get_db)):
    """Serve the RSS feed belonging to ``rss_token``.

    There is no login dependency on this route; the token in the path selects
    the user. Responds 404 when no user has that token, otherwise returns the
    user's 50 newest notification events rendered as RSS.
    """
    owner_lookup = await db.execute(select(User).where(User.rss_token == rss_token))
    owner = owner_lookup.scalar_one_or_none()
    if not owner:
        raise HTTPException(status_code=404, detail="Feed not found")

    newest_first = (
        select(NotificationEvent)
        .where(NotificationEvent.user_id == owner.id)
        .order_by(NotificationEvent.created_at.desc())
        .limit(50)
    )
    rows = await db.execute(newest_first)
    feed_bytes = _build_rss(rows.scalars().all())
    return Response(content=feed_bytes, media_type="application/rss+xml")
def _build_rss(events: list) -> bytes:
    """Serialize notification events into an RSS 2.0 document.

    Args:
        events: Objects exposing ``id``, ``event_type``, ``bill_id``,
            ``payload`` (a dict or ``None``) and ``created_at``.

    Returns:
        The feed as UTF-8 encoded bytes, one ``<item>`` per event in the
        order given.
    """
    from datetime import timezone

    rss = Element("rss", version="2.0")
    channel = SubElement(rss, "channel")
    SubElement(channel, "title").text = "PocketVeto — Bill Alerts"
    SubElement(channel, "description").text = "Updates on your followed bills"
    SubElement(channel, "language").text = "en-us"

    for event in events:
        payload = event.payload or {}
        item = SubElement(channel, "item")
        label = _EVENT_LABELS.get(event.event_type, "Update")

        # Fall back to the bill id only when the payload has no label, and
        # tolerate events without a bill id so one such row cannot break the feed.
        bill_label = payload.get("bill_label") or (event.bill_id or "").upper()
        bill_title = payload.get("bill_title") or ""
        # Separate label and title; they were previously glued together.
        subject = " — ".join(part for part in (bill_label, bill_title) if part)
        SubElement(item, "title").text = f"{label}: {subject}" if subject else label

        SubElement(item, "description").text = payload.get("brief_summary", "")
        if payload.get("bill_url"):
            SubElement(item, "link").text = payload["bill_url"]

        created_at = event.created_at
        if created_at is not None:
            # The format string hard-codes +0000, so aware datetimes are
            # normalised to UTC first; naive values are stamped as-is.
            if created_at.tzinfo is not None:
                created_at = created_at.astimezone(timezone.utc)
            SubElement(item, "pubDate").text = created_at.strftime("%a, %d %b %Y %H:%M:%S +0000")

        # The guid is a database id, not a URL, so mark it as a non-permalink.
        SubElement(item, "guid", isPermaLink="false").text = str(event.id)

    return tostring(rss, encoding="unicode").encode("utf-8")