HTTP headers are ASCII-only; the em dash in "PocketVeto — Test Notification" caused a UnicodeEncodeError on every test attempt. Replaced with colon. Frontend catch blocks now extract the real server error detail from the axios response body instead of showing a generic fallback message. Authored-By: Jack Levy
206 lines
7.8 KiB
Python
206 lines
7.8 KiB
Python
"""
|
|
Notifications API — user notification settings and per-user RSS feed.
|
|
"""
|
|
import base64
|
|
import secrets
|
|
from xml.etree.ElementTree import Element, SubElement, tostring
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.responses import Response
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.dependencies import get_current_user
|
|
from app.database import get_db
|
|
from app.models.notification import NotificationEvent
|
|
from app.models.user import User
|
|
from app.schemas.schemas import (
|
|
NotificationSettingsResponse,
|
|
NotificationSettingsUpdate,
|
|
NotificationTestResult,
|
|
NtfyTestRequest,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
_EVENT_LABELS = {
|
|
"new_document": "New Bill Text",
|
|
"new_amendment": "Amendment Filed",
|
|
"bill_updated": "Bill Updated",
|
|
}
|
|
|
|
|
|
def _prefs_to_response(prefs: dict, rss_token: str | None) -> NotificationSettingsResponse:
|
|
return NotificationSettingsResponse(
|
|
ntfy_topic_url=prefs.get("ntfy_topic_url", ""),
|
|
ntfy_auth_method=prefs.get("ntfy_auth_method", "none"),
|
|
ntfy_token=prefs.get("ntfy_token", ""),
|
|
ntfy_username=prefs.get("ntfy_username", ""),
|
|
ntfy_password=prefs.get("ntfy_password", ""),
|
|
ntfy_enabled=prefs.get("ntfy_enabled", False),
|
|
rss_enabled=prefs.get("rss_enabled", False),
|
|
rss_token=rss_token,
|
|
)
|
|
|
|
|
|
@router.get("/settings", response_model=NotificationSettingsResponse)
|
|
async def get_notification_settings(
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await db.get(User, current_user.id)
|
|
# Auto-generate RSS token on first visit so the feed URL is always available
|
|
if not user.rss_token:
|
|
user.rss_token = secrets.token_urlsafe(32)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
|
|
|
|
|
|
@router.put("/settings", response_model=NotificationSettingsResponse)
|
|
async def update_notification_settings(
|
|
body: NotificationSettingsUpdate,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await db.get(User, current_user.id)
|
|
prefs = dict(user.notification_prefs or {})
|
|
|
|
if body.ntfy_topic_url is not None:
|
|
prefs["ntfy_topic_url"] = body.ntfy_topic_url.strip()
|
|
if body.ntfy_auth_method is not None:
|
|
prefs["ntfy_auth_method"] = body.ntfy_auth_method
|
|
if body.ntfy_token is not None:
|
|
prefs["ntfy_token"] = body.ntfy_token.strip()
|
|
if body.ntfy_username is not None:
|
|
prefs["ntfy_username"] = body.ntfy_username.strip()
|
|
if body.ntfy_password is not None:
|
|
prefs["ntfy_password"] = body.ntfy_password.strip()
|
|
if body.ntfy_enabled is not None:
|
|
prefs["ntfy_enabled"] = body.ntfy_enabled
|
|
if body.rss_enabled is not None:
|
|
prefs["rss_enabled"] = body.rss_enabled
|
|
|
|
user.notification_prefs = prefs
|
|
|
|
if not user.rss_token:
|
|
user.rss_token = secrets.token_urlsafe(32)
|
|
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
|
|
|
|
|
|
@router.post("/settings/rss-reset", response_model=NotificationSettingsResponse)
|
|
async def reset_rss_token(
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Regenerate the RSS token, invalidating the old feed URL."""
|
|
user = await db.get(User, current_user.id)
|
|
user.rss_token = secrets.token_urlsafe(32)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return _prefs_to_response(user.notification_prefs or {}, user.rss_token)
|
|
|
|
|
|
@router.post("/test/ntfy", response_model=NotificationTestResult)
|
|
async def test_ntfy(
|
|
body: NtfyTestRequest,
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Send a test push notification to verify ntfy settings."""
|
|
url = body.ntfy_topic_url.strip()
|
|
if not url:
|
|
return NotificationTestResult(status="error", detail="Topic URL is required")
|
|
|
|
headers: dict[str, str] = {
|
|
"Title": "PocketVeto: Test Notification",
|
|
"Priority": "default",
|
|
"Tags": "white_check_mark",
|
|
}
|
|
if body.ntfy_auth_method == "token" and body.ntfy_token.strip():
|
|
headers["Authorization"] = f"Bearer {body.ntfy_token.strip()}"
|
|
elif body.ntfy_auth_method == "basic" and body.ntfy_username.strip():
|
|
creds = base64.b64encode(
|
|
f"{body.ntfy_username.strip()}:{body.ntfy_password}".encode()
|
|
).decode()
|
|
headers["Authorization"] = f"Basic {creds}"
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
resp = await client.post(
|
|
url,
|
|
content="Your PocketVeto notification settings are working correctly.".encode("utf-8"),
|
|
headers=headers,
|
|
)
|
|
resp.raise_for_status()
|
|
return NotificationTestResult(status="ok", detail=f"Test notification sent (HTTP {resp.status_code})")
|
|
except httpx.HTTPStatusError as e:
|
|
return NotificationTestResult(status="error", detail=f"HTTP {e.response.status_code}: {e.response.text[:200]}")
|
|
except httpx.RequestError as e:
|
|
return NotificationTestResult(status="error", detail=f"Connection error: {e}")
|
|
|
|
|
|
@router.post("/test/rss", response_model=NotificationTestResult)
|
|
async def test_rss(
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Verify the user's RSS feed is reachable and return its event count."""
|
|
user = await db.get(User, current_user.id)
|
|
if not user.rss_token:
|
|
return NotificationTestResult(status="error", detail="RSS token not generated — save settings first")
|
|
|
|
count_result = await db.execute(
|
|
select(NotificationEvent).where(NotificationEvent.user_id == user.id)
|
|
)
|
|
event_count = len(count_result.scalars().all())
|
|
|
|
return NotificationTestResult(
|
|
status="ok",
|
|
detail=f"RSS feed is active with {event_count} event{'s' if event_count != 1 else ''}. Subscribe to the URL shown above.",
|
|
event_count=event_count,
|
|
)
|
|
|
|
|
|
@router.get("/feed/{rss_token}.xml", include_in_schema=False)
|
|
async def rss_feed(rss_token: str, db: AsyncSession = Depends(get_db)):
|
|
"""Public tokenized RSS feed — no auth required."""
|
|
result = await db.execute(select(User).where(User.rss_token == rss_token))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="Feed not found")
|
|
|
|
events_result = await db.execute(
|
|
select(NotificationEvent)
|
|
.where(NotificationEvent.user_id == user.id)
|
|
.order_by(NotificationEvent.created_at.desc())
|
|
.limit(50)
|
|
)
|
|
events = events_result.scalars().all()
|
|
return Response(content=_build_rss(events), media_type="application/rss+xml")
|
|
|
|
|
|
def _build_rss(events: list) -> bytes:
|
|
rss = Element("rss", version="2.0")
|
|
channel = SubElement(rss, "channel")
|
|
SubElement(channel, "title").text = "PocketVeto — Bill Alerts"
|
|
SubElement(channel, "description").text = "Updates on your followed bills"
|
|
SubElement(channel, "language").text = "en-us"
|
|
|
|
for event in events:
|
|
payload = event.payload or {}
|
|
item = SubElement(channel, "item")
|
|
label = _EVENT_LABELS.get(event.event_type, "Update")
|
|
bill_label = payload.get("bill_label", event.bill_id.upper())
|
|
SubElement(item, "title").text = f"{label}: {bill_label} — {payload.get('bill_title', '')}"
|
|
SubElement(item, "description").text = payload.get("brief_summary", "")
|
|
if payload.get("bill_url"):
|
|
SubElement(item, "link").text = payload["bill_url"]
|
|
SubElement(item, "pubDate").text = event.created_at.strftime("%a, %d %b %Y %H:%M:%S +0000")
|
|
SubElement(item, "guid").text = str(event.id)
|
|
|
|
return tostring(rss, encoding="unicode").encode("utf-8")
|