Files
PocketVeto/backend/app/api/notifications.py
Jack Levy 50399adf44 feat(notifications): add Test button for ntfy and RSS with inline result
- POST /api/notifications/test/ntfy — sends a real push using current form
  values (not saved settings) so auth can be verified before saving; returns
  status + HTTP detail on success or error message on failure
- POST /api/notifications/test/rss — confirms the feed token exists and
  returns event count; no bill FK required
- NtfyTestRequest + NotificationTestResult schemas added
- Frontend: Test button next to Save on both ntfy and RSS sections; result
  shown inline as a green/red pill; uses current form state for ntfy so
  the user can test before committing

All future notification types should follow the same test-before-save pattern.

Authored-By: Jack Levy
2026-03-01 12:10:10 -05:00

206 lines
7.8 KiB
Python

"""
Notifications API — user notification settings and per-user RSS feed.
"""
import base64
import secrets
from xml.etree.ElementTree import Element, SubElement, tostring
import httpx
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.dependencies import get_current_user
from app.database import get_db
from app.models.notification import NotificationEvent
from app.models.user import User
from app.schemas.schemas import (
NotificationSettingsResponse,
NotificationSettingsUpdate,
NotificationTestResult,
NtfyTestRequest,
)
router = APIRouter()
_EVENT_LABELS = {
"new_document": "New Bill Text",
"new_amendment": "Amendment Filed",
"bill_updated": "Bill Updated",
}
def _prefs_to_response(prefs: dict, rss_token: str | None) -> NotificationSettingsResponse:
return NotificationSettingsResponse(
ntfy_topic_url=prefs.get("ntfy_topic_url", ""),
ntfy_auth_method=prefs.get("ntfy_auth_method", "none"),
ntfy_token=prefs.get("ntfy_token", ""),
ntfy_username=prefs.get("ntfy_username", ""),
ntfy_password=prefs.get("ntfy_password", ""),
ntfy_enabled=prefs.get("ntfy_enabled", False),
rss_enabled=prefs.get("rss_enabled", False),
rss_token=rss_token,
)
@router.get("/settings", response_model=NotificationSettingsResponse)
async def get_notification_settings(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
user = await db.get(User, current_user.id)
# Auto-generate RSS token on first visit so the feed URL is always available
if not user.rss_token:
user.rss_token = secrets.token_urlsafe(32)
await db.commit()
await db.refresh(user)
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
@router.put("/settings", response_model=NotificationSettingsResponse)
async def update_notification_settings(
body: NotificationSettingsUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
user = await db.get(User, current_user.id)
prefs = dict(user.notification_prefs or {})
if body.ntfy_topic_url is not None:
prefs["ntfy_topic_url"] = body.ntfy_topic_url.strip()
if body.ntfy_auth_method is not None:
prefs["ntfy_auth_method"] = body.ntfy_auth_method
if body.ntfy_token is not None:
prefs["ntfy_token"] = body.ntfy_token.strip()
if body.ntfy_username is not None:
prefs["ntfy_username"] = body.ntfy_username.strip()
if body.ntfy_password is not None:
prefs["ntfy_password"] = body.ntfy_password.strip()
if body.ntfy_enabled is not None:
prefs["ntfy_enabled"] = body.ntfy_enabled
if body.rss_enabled is not None:
prefs["rss_enabled"] = body.rss_enabled
user.notification_prefs = prefs
if not user.rss_token:
user.rss_token = secrets.token_urlsafe(32)
await db.commit()
await db.refresh(user)
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
@router.post("/settings/rss-reset", response_model=NotificationSettingsResponse)
async def reset_rss_token(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Regenerate the RSS token, invalidating the old feed URL."""
user = await db.get(User, current_user.id)
user.rss_token = secrets.token_urlsafe(32)
await db.commit()
await db.refresh(user)
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
@router.post("/test/ntfy", response_model=NotificationTestResult)
async def test_ntfy(
body: NtfyTestRequest,
current_user: User = Depends(get_current_user),
):
"""Send a test push notification to verify ntfy settings."""
url = body.ntfy_topic_url.strip()
if not url:
return NotificationTestResult(status="error", detail="Topic URL is required")
headers: dict[str, str] = {
"Title": "PocketVeto — Test Notification",
"Priority": "default",
"Tags": "white_check_mark",
}
if body.ntfy_auth_method == "token" and body.ntfy_token.strip():
headers["Authorization"] = f"Bearer {body.ntfy_token.strip()}"
elif body.ntfy_auth_method == "basic" and body.ntfy_username.strip():
creds = base64.b64encode(
f"{body.ntfy_username.strip()}:{body.ntfy_password}".encode()
).decode()
headers["Authorization"] = f"Basic {creds}"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
url,
content="Your PocketVeto notification settings are working correctly.".encode("utf-8"),
headers=headers,
)
resp.raise_for_status()
return NotificationTestResult(status="ok", detail=f"Test notification sent (HTTP {resp.status_code})")
except httpx.HTTPStatusError as e:
return NotificationTestResult(status="error", detail=f"HTTP {e.response.status_code}: {e.response.text[:200]}")
except httpx.RequestError as e:
return NotificationTestResult(status="error", detail=f"Connection error: {e}")
@router.post("/test/rss", response_model=NotificationTestResult)
async def test_rss(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Verify the user's RSS feed is reachable and return its event count."""
user = await db.get(User, current_user.id)
if not user.rss_token:
return NotificationTestResult(status="error", detail="RSS token not generated — save settings first")
count_result = await db.execute(
select(NotificationEvent).where(NotificationEvent.user_id == user.id)
)
event_count = len(count_result.scalars().all())
return NotificationTestResult(
status="ok",
detail=f"RSS feed is active with {event_count} event{'s' if event_count != 1 else ''}. Subscribe to the URL shown above.",
event_count=event_count,
)
@router.get("/feed/{rss_token}.xml", include_in_schema=False)
async def rss_feed(rss_token: str, db: AsyncSession = Depends(get_db)):
"""Public tokenized RSS feed — no auth required."""
result = await db.execute(select(User).where(User.rss_token == rss_token))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="Feed not found")
events_result = await db.execute(
select(NotificationEvent)
.where(NotificationEvent.user_id == user.id)
.order_by(NotificationEvent.created_at.desc())
.limit(50)
)
events = events_result.scalars().all()
return Response(content=_build_rss(events), media_type="application/rss+xml")
def _build_rss(events: list) -> bytes:
rss = Element("rss", version="2.0")
channel = SubElement(rss, "channel")
SubElement(channel, "title").text = "PocketVeto — Bill Alerts"
SubElement(channel, "description").text = "Updates on your followed bills"
SubElement(channel, "language").text = "en-us"
for event in events:
payload = event.payload or {}
item = SubElement(channel, "item")
label = _EVENT_LABELS.get(event.event_type, "Update")
bill_label = payload.get("bill_label", event.bill_id.upper())
SubElement(item, "title").text = f"{label}: {bill_label}{payload.get('bill_title', '')}"
SubElement(item, "description").text = payload.get("brief_summary", "")
if payload.get("bill_url"):
SubElement(item, "link").text = payload["bill_url"]
SubElement(item, "pubDate").text = event.created_at.strftime("%a, %d %b %Y %H:%M:%S +0000")
SubElement(item, "guid").text = str(event.id)
return tostring(rss, encoding="unicode").encode("utf-8")