diff --git a/PocketVeto — Feature Roadmap.md b/PocketVeto — Feature Roadmap.md index faf07ed..7f4e5ec 100644 --- a/PocketVeto — Feature Roadmap.md +++ b/PocketVeto — Feature Roadmap.md @@ -76,3 +76,89 @@ - [ ] **Source Viewer Option B** — in-app bill text viewer with cited passage highlighted and scroll-to-anchor. Deferred pending UX review of Option A (GovInfo link). - [ ] **Raw Diff Panel** — Python `difflib` diff between stored document versions, shown as collapsible "Raw Changes" below amendment brief. Zero API calls. Deferred — AI amendment brief is the primary "what changed" story. - [ ] **Shareable Collection Subscriptions** — "Follow this collection" mechanic so other users can subscribe to a public collection and get its bills added to their feed. +- [ ] Pocket Veto mode (follow stance) — toggle on a bill to treat it as “I don’t want this to pass”; adds to watchlist and triggers milestone alerts (committee report-out, calendared, vote scheduled, passed chamber, etc.) +- [ ] Pocket Veto notification rules — alert only on advancement milestones + failure outcomes (failed committee / failed floor / stalled) +- [ ] Follow modes — support Neutral (normal follow) + Pocket Veto now; optional Pocket Boost later +- [ ] UI: FollowButton becomes FollowMode selector (Neutral / Pocket Veto) with explanation tooltip + + +### PocketVeto function + +#### How it should work (so it’s useful and not cringey) + +Instead of “follow/unfollow,” each bill gets a **Follow Mode**: + +- **Follow** (neutral): “Keep me posted on meaningful changes.” + +- **Pocket Veto** (oppose): “Alert me if this bill is advancing toward passage.” + +- (Optional later) **Pocket Boost** (support): “Alert me when action is needed / when it’s in trouble.” also suggest an action the user can take to let their representatives know that you support this bill. + + +For Pocket Veto specifically, the key is **threshold alerts**, not spam: + +- **Committee referral** + +- **Committee hearing scheduled** + +- **Markup scheduled** + +- **Reported out of committee** + +- **Placed on calendar** + +- **Floor vote scheduled** + +- **Passed chamber** + +- **Conference / reconciliation activity** + +- **Sent to President** + +- **Signed / Vetoed** + + +And the “failed” side: + +- **Failed in committee** + +- **Failed floor vote** + +- **Stalled** (no action for X days while similar bills move) + + +#### Why it’s valuable for “normal people” + +Most people don’t want to follow politics continuously. They want: + +- “Tell me if the bad thing is about to happen.” + That’s exactly what Pocket Veto mode does. + + +#### Guardrail to keep it non-partisan / non-toxic + +Make it explicit in UI copy: + +- It’s a **personal alert preference**, not a moral label. + +- It doesn’t publish your stance unless you share it. + + +#### Data model addition (simple) + +Add fields to `follows` (or a new table): + +- `follow_mode`: `neutral | pocket_veto | pocket_boost` + +- `alert_sensitivity`: `low | medium | high` (optional) + + +Then alert rules can be: + +- neutral: material changes + +- pocket_veto: only “advancing toward passage” milestones + +- pocket_boost: “action points” + milestones + + diff --git a/backend/alembic/versions/0010_backfill_bill_congress_urls.py b/backend/alembic/versions/0010_backfill_bill_congress_urls.py new file mode 100644 index 0000000..45c20cc --- /dev/null +++ b/backend/alembic/versions/0010_backfill_bill_congress_urls.py @@ -0,0 +1,56 @@ +"""backfill bill congress_urls with proper public URLs + +Bills stored before this fix have congress_url set to the API endpoint +(https://api.congress.gov/v3/bill/...) instead of the public page +(https://www.congress.gov/bill/...). This migration rebuilds all URLs +from the congress_number, bill_type, and bill_number columns which are +already stored correctly. + +Revision ID: 0010 +Revises: 0009 +Create Date: 2026-03-01 +""" +import sqlalchemy as sa +from alembic import op + +revision = "0010" +down_revision = "0009" +branch_labels = None +depends_on = None + +_BILL_TYPE_SLUG = { + "hr": "house-bill", + "s": "senate-bill", + "hjres": "house-joint-resolution", + "sjres": "senate-joint-resolution", + "hres": "house-resolution", + "sres": "senate-resolution", + "hconres": "house-concurrent-resolution", + "sconres": "senate-concurrent-resolution", +} + + +def _ordinal(n: int) -> str: + if 11 <= n % 100 <= 13: + return f"{n}th" + suffixes = {1: "st", 2: "nd", 3: "rd"} + return f"{n}{suffixes.get(n % 10, 'th')}" + + +def upgrade(): + conn = op.get_bind() + bills = conn.execute( + sa.text("SELECT bill_id, congress_number, bill_type, bill_number FROM bills") + ).fetchall() + for bill in bills: + slug = _BILL_TYPE_SLUG.get(bill.bill_type, bill.bill_type) + url = f"https://www.congress.gov/bill/{_ordinal(bill.congress_number)}-congress/{slug}/{bill.bill_number}" + conn.execute( + sa.text("UPDATE bills SET congress_url = :url WHERE bill_id = :bill_id"), + {"url": url, "bill_id": bill.bill_id}, + ) + + +def downgrade(): + # Original API URLs cannot be recovered — no-op + pass diff --git a/backend/alembic/versions/0011_add_notifications.py b/backend/alembic/versions/0011_add_notifications.py new file mode 100644 index 0000000..99eb4dc --- /dev/null +++ b/backend/alembic/versions/0011_add_notifications.py @@ -0,0 +1,39 @@ +"""add notifications: rss_token on users, notification_events table + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-03-01 +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision = "0011" +down_revision = "0010" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("users", sa.Column("rss_token", sa.String(), nullable=True)) + op.create_index("ix_users_rss_token", "users", ["rss_token"], unique=True) + + op.create_table( + "notification_events", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("user_id", sa.Integer(), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False), + sa.Column("bill_id", sa.String(), sa.ForeignKey("bills.bill_id", ondelete="CASCADE"), nullable=False), + sa.Column("event_type", sa.String(50), nullable=False), + sa.Column("payload", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()")), + sa.Column("dispatched_at", sa.DateTime(timezone=True), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_notification_events_user_id", "notification_events", ["user_id"]) + op.create_index("ix_notification_events_dispatched_at", "notification_events", ["dispatched_at"]) + + +def downgrade(): + op.drop_table("notification_events") + op.drop_index("ix_users_rss_token", table_name="users") + op.drop_column("users", "rss_token") diff --git a/backend/alembic/versions/0012_dedupe_bill_actions_unique.py b/backend/alembic/versions/0012_dedupe_bill_actions_unique.py new file mode 100644 index 0000000..5b62d3a --- /dev/null +++ b/backend/alembic/versions/0012_dedupe_bill_actions_unique.py @@ -0,0 +1,32 @@ +"""Deduplicate bill_actions and add unique constraint on (bill_id, action_date, action_text) + +Revision ID: 0012 +Revises: 0011 +""" +from alembic import op + +revision = "0012" +down_revision = "0011" +branch_labels = None +depends_on = None + + +def upgrade(): + # Remove duplicate rows keeping the lowest id for each (bill_id, action_date, action_text) + op.execute(""" + DELETE FROM bill_actions a + USING bill_actions b + WHERE a.id > b.id + AND a.bill_id = b.bill_id + AND a.action_date IS NOT DISTINCT FROM b.action_date + AND a.action_text IS NOT DISTINCT FROM b.action_text + """) + op.create_unique_constraint( + "uq_bill_actions_bill_date_text", + "bill_actions", + ["bill_id", "action_date", "action_text"], + ) + + +def downgrade(): + op.drop_constraint("uq_bill_actions_bill_date_text", "bill_actions", type_="unique") diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index b2e1a17..2ce10ee 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -130,6 +130,10 @@ async def get_stats( WHERE bb.id IS NULL AND bd.raw_text IS NOT NULL """) )).scalar() + # Bills that have never had their action history fetched + bills_missing_actions = (await db.execute( + text("SELECT COUNT(*) FROM bills WHERE actions_fetched_at IS NULL") + )).scalar() return { "total_bills": total_bills, "docs_fetched": docs_fetched, @@ -141,6 +145,7 @@ async def get_stats( "pending_llm": pending_llm, "bills_missing_sponsor": bills_missing_sponsor, "bills_missing_metadata": bills_missing_metadata, + "bills_missing_actions": bills_missing_actions, "remaining": total_bills - total_briefs, } @@ -183,6 +188,14 @@ async def trigger_fetch_actions(current_user: User = Depends(get_current_admin)) return {"task_id": task.id, "status": "queued"} +@router.post("/backfill-all-actions") +async def backfill_all_actions(current_user: User = Depends(get_current_admin)): + """Queue action fetches for every bill that has never had actions fetched.""" + from app.workers.congress_poller import backfill_all_bill_actions + task = backfill_all_bill_actions.delay() + return {"task_id": task.id, "status": "queued"} + + @router.post("/backfill-metadata") async def backfill_metadata(current_user: User = Depends(get_current_admin)): """Fill in null introduced_date, congress_url, chamber for existing bills.""" diff --git a/backend/app/api/members.py b/backend/app/api/members.py index d92cf76..ca75aa8 100644 --- a/backend/app/api/members.py +++ b/backend/app/api/members.py @@ -69,12 +69,11 @@ async def get_member(bioguide_id: str, db: AsyncSession = Depends(get_db)): if not member: raise HTTPException(status_code=404, detail="Member not found") - # Kick off member interest scoring on first view (non-blocking) + # Kick off member interest on first view — single combined task avoids duplicate API calls if member.detail_fetched is None: try: - from app.workers.member_interest import fetch_member_news, calculate_member_trend_score - fetch_member_news.delay(bioguide_id) - calculate_member_trend_score.delay(bioguide_id) + from app.workers.member_interest import sync_member_interest + sync_member_interest.delay(bioguide_id) except Exception: pass diff --git a/backend/app/api/notifications.py b/backend/app/api/notifications.py new file mode 100644 index 0000000..25e6916 --- /dev/null +++ b/backend/app/api/notifications.py @@ -0,0 +1,138 @@ +""" +Notifications API — user notification settings and per-user RSS feed. +""" +import secrets +from xml.etree.ElementTree import Element, SubElement, tostring + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import Response +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.dependencies import get_current_user +from app.database import get_db +from app.models.notification import NotificationEvent +from app.models.user import User +from app.schemas.schemas import NotificationSettingsResponse, NotificationSettingsUpdate + +router = APIRouter() + +_EVENT_LABELS = { + "new_document": "New Bill Text", + "new_amendment": "Amendment Filed", + "bill_updated": "Bill Updated", +} + + +def _prefs_to_response(prefs: dict, rss_token: str | None) -> NotificationSettingsResponse: + return NotificationSettingsResponse( + ntfy_topic_url=prefs.get("ntfy_topic_url", ""), + ntfy_auth_method=prefs.get("ntfy_auth_method", "none"), + ntfy_token=prefs.get("ntfy_token", ""), + ntfy_username=prefs.get("ntfy_username", ""), + ntfy_password=prefs.get("ntfy_password", ""), + ntfy_enabled=prefs.get("ntfy_enabled", False), + rss_enabled=prefs.get("rss_enabled", False), + rss_token=rss_token, + ) + + +@router.get("/settings", response_model=NotificationSettingsResponse) +async def get_notification_settings( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + user = await db.get(User, current_user.id) + # Auto-generate RSS token on first visit so the feed URL is always available + if not user.rss_token: + user.rss_token = secrets.token_urlsafe(32) + await db.commit() + await db.refresh(user) + return _prefs_to_response(user.notification_prefs or {}, user.rss_token) + + +@router.put("/settings", response_model=NotificationSettingsResponse) +async def update_notification_settings( + body: NotificationSettingsUpdate, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + user = await db.get(User, current_user.id) + prefs = dict(user.notification_prefs or {}) + + if body.ntfy_topic_url is not None: + prefs["ntfy_topic_url"] = body.ntfy_topic_url.strip() + if body.ntfy_auth_method is not None: + prefs["ntfy_auth_method"] = body.ntfy_auth_method + if body.ntfy_token is not None: + prefs["ntfy_token"] = body.ntfy_token.strip() + if body.ntfy_username is not None: + prefs["ntfy_username"] = body.ntfy_username.strip() + if body.ntfy_password is not None: + prefs["ntfy_password"] = body.ntfy_password.strip() + if body.ntfy_enabled is not None: + prefs["ntfy_enabled"] = body.ntfy_enabled + if body.rss_enabled is not None: + prefs["rss_enabled"] = body.rss_enabled + + user.notification_prefs = prefs + + if not user.rss_token: + user.rss_token = secrets.token_urlsafe(32) + + await db.commit() + await db.refresh(user) + return _prefs_to_response(user.notification_prefs or {}, user.rss_token) + + +@router.post("/settings/rss-reset", response_model=NotificationSettingsResponse) +async def reset_rss_token( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Regenerate the RSS token, invalidating the old feed URL.""" + user = await db.get(User, current_user.id) + user.rss_token = secrets.token_urlsafe(32) + await db.commit() + await db.refresh(user) + return _prefs_to_response(user.notification_prefs or {}, user.rss_token) + + +@router.get("/feed/{rss_token}.xml", include_in_schema=False) +async def rss_feed(rss_token: str, db: AsyncSession = Depends(get_db)): + """Public tokenized RSS feed — no auth required.""" + result = await db.execute(select(User).where(User.rss_token == rss_token)) + user = result.scalar_one_or_none() + if not user: + raise HTTPException(status_code=404, detail="Feed not found") + + events_result = await db.execute( + select(NotificationEvent) + .where(NotificationEvent.user_id == user.id) + .order_by(NotificationEvent.created_at.desc()) + .limit(50) + ) + events = events_result.scalars().all() + return Response(content=_build_rss(events), media_type="application/rss+xml") + + +def _build_rss(events: list) -> bytes: + rss = Element("rss", version="2.0") + channel = SubElement(rss, "channel") + SubElement(channel, "title").text = "PocketVeto — Bill Alerts" + SubElement(channel, "description").text = "Updates on your followed bills" + SubElement(channel, "language").text = "en-us" + + for event in events: + payload = event.payload or {} + item = SubElement(channel, "item") + label = _EVENT_LABELS.get(event.event_type, "Update") + bill_label = payload.get("bill_label", event.bill_id.upper()) + SubElement(item, "title").text = f"{label}: {bill_label} — {payload.get('bill_title', '')}" + SubElement(item, "description").text = payload.get("brief_summary", "") + if payload.get("bill_url"): + SubElement(item, "link").text = payload["bill_url"] + SubElement(item, "pubDate").text = event.created_at.strftime("%a, %d %b %Y %H:%M:%S +0000") + SubElement(item, "guid").text = str(event.id) + + return tostring(rss, encoding="unicode").encode("utf-8") diff --git a/backend/app/main.py b/backend/app/main.py index fba258e..28602ba 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,7 +1,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from app.api import bills, members, follows, dashboard, search, settings, admin, health, auth +from app.api import bills, members, follows, dashboard, search, settings, admin, health, auth, notifications from app.config import settings as config app = FastAPI( @@ -27,3 +27,4 @@ app.include_router(search.router, prefix="/api/search", tags=["search"]) app.include_router(settings.router, prefix="/api/settings", tags=["settings"]) app.include_router(admin.router, prefix="/api/admin", tags=["admin"]) app.include_router(health.router, prefix="/api/health", tags=["health"]) +app.include_router(notifications.router, prefix="/api/notifications", tags=["notifications"]) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 00de67f..1860f62 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -4,6 +4,7 @@ from app.models.follow import Follow from app.models.member import Member from app.models.member_interest import MemberTrendScore, MemberNewsArticle from app.models.news import NewsArticle +from app.models.notification import NotificationEvent from app.models.setting import AppSetting from app.models.trend import TrendScore from app.models.committee import Committee, CommitteeBill @@ -19,6 +20,7 @@ __all__ = [ "MemberTrendScore", "MemberNewsArticle", "NewsArticle", + "NotificationEvent", "AppSetting", "TrendScore", "Committee", diff --git a/backend/app/models/notification.py b/backend/app/models/notification.py new file mode 100644 index 0000000..5b3d991 --- /dev/null +++ b/backend/app/models/notification.py @@ -0,0 +1,27 @@ +from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func + +from app.database import Base + + +class NotificationEvent(Base): + __tablename__ = "notification_events" + + id = Column(Integer, primary_key=True, autoincrement=True) + user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + bill_id = Column(String, ForeignKey("bills.bill_id", ondelete="CASCADE"), nullable=False) + # new_document | new_amendment | bill_updated + event_type = Column(String(50), nullable=False) + # {bill_title, bill_label, brief_summary, bill_url} + payload = Column(JSONB) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + dispatched_at = Column(DateTime(timezone=True), nullable=True) + + user = relationship("User", back_populates="notification_events") + + __table_args__ = ( + Index("ix_notification_events_user_id", "user_id"), + Index("ix_notification_events_dispatched_at", "dispatched_at"), + ) diff --git a/backend/app/models/user.py b/backend/app/models/user.py index 7f1bb18..dd6ef28 100644 --- a/backend/app/models/user.py +++ b/backend/app/models/user.py @@ -14,6 +14,8 @@ class User(Base): hashed_password = Column(String, nullable=False) is_admin = Column(Boolean, nullable=False, default=False) notification_prefs = Column(JSONB, nullable=False, default=dict) + rss_token = Column(String, unique=True, nullable=True, index=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) follows = relationship("Follow", back_populates="user", cascade="all, delete-orphan") + notification_events = relationship("NotificationEvent", back_populates="user", cascade="all, delete-orphan") diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index 0743c9d..4c9c9e4 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -8,8 +8,12 @@ from pydantic import BaseModel class NotificationSettingsResponse(BaseModel): ntfy_topic_url: str = "" + ntfy_auth_method: str = "none" # none | token | basic ntfy_token: str = "" + ntfy_username: str = "" + ntfy_password: str = "" ntfy_enabled: bool = False + rss_enabled: bool = False rss_token: Optional[str] = None model_config = {"from_attributes": True} @@ -17,8 +21,12 @@ class NotificationSettingsResponse(BaseModel): class NotificationSettingsUpdate(BaseModel): ntfy_topic_url: Optional[str] = None + ntfy_auth_method: Optional[str] = None ntfy_token: Optional[str] = None + ntfy_username: Optional[str] = None + ntfy_password: Optional[str] = None ntfy_enabled: Optional[bool] = None + rss_enabled: Optional[bool] = None T = TypeVar("T") diff --git a/backend/app/services/congress_api.py b/backend/app/services/congress_api.py index cbd860f..57b02f3 100644 --- a/backend/app/services/congress_api.py +++ b/backend/app/services/congress_api.py @@ -15,6 +15,30 @@ from app.config import settings BASE_URL = "https://api.congress.gov/v3" +_BILL_TYPE_SLUG = { + "hr": "house-bill", + "s": "senate-bill", + "hjres": "house-joint-resolution", + "sjres": "senate-joint-resolution", + "hres": "house-resolution", + "sres": "senate-resolution", + "hconres": "house-concurrent-resolution", + "sconres": "senate-concurrent-resolution", +} + + +def _congress_ordinal(n: int) -> str: + if 11 <= n % 100 <= 13: + return f"{n}th" + suffixes = {1: "st", 2: "nd", 3: "rd"} + return f"{n}{suffixes.get(n % 10, 'th')}" + + +def build_bill_public_url(congress: int, bill_type: str, bill_number: int) -> str: + """Return the public congress.gov page URL for a bill (not the API endpoint).""" + slug = _BILL_TYPE_SLUG.get(bill_type.lower(), bill_type.lower()) + return f"https://www.congress.gov/bill/{_congress_ordinal(congress)}-congress/{slug}/{bill_number}" + def _get_current_congress() -> int: """Calculate the current Congress number. 119th started Jan 3, 2025.""" @@ -98,7 +122,7 @@ def parse_bill_from_api(data: dict, congress: int) -> dict: "latest_action_text": latest_action.get("text"), "status": latest_action.get("text", "")[:100] if latest_action.get("text") else None, "chamber": "House" if bill_type.startswith("h") else "Senate", - "congress_url": data.get("url"), + "congress_url": build_bill_public_url(congress, bill_type, bill_number), } diff --git a/backend/app/services/news_service.py b/backend/app/services/news_service.py index 2f7f67b..1ca143e 100644 --- a/backend/app/services/news_service.py +++ b/backend/app/services/news_service.py @@ -7,10 +7,11 @@ News correlation service. import logging import time import urllib.parse -from datetime import datetime, timedelta, timezone +from datetime import date, datetime, timedelta, timezone from typing import Optional import feedparser +import redis import requests from tenacity import retry, stop_after_attempt, wait_exponential @@ -22,6 +23,34 @@ NEWSAPI_BASE = "https://newsapi.org/v2" GOOGLE_NEWS_RSS = "https://news.google.com/rss/search" NEWSAPI_DAILY_LIMIT = 95 # Leave 5 as buffer +_NEWSAPI_REDIS_PREFIX = "newsapi:daily_calls:" + + +def _newsapi_redis(): + return redis.from_url(settings.REDIS_URL, decode_responses=True) + + +def _newsapi_quota_ok() -> bool: + """Return True if we have quota remaining for today.""" + try: + key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}" + used = int(_newsapi_redis().get(key) or 0) + return used < NEWSAPI_DAILY_LIMIT + except Exception: + return True # Don't block on Redis errors + + +def _newsapi_record_call(): + try: + r = _newsapi_redis() + key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}" + pipe = r.pipeline() + pipe.incr(key) + pipe.expire(key, 90000) # 25 hours — expires safely after midnight + pipe.execute() + except Exception: + pass + @retry(stop=stop_after_attempt(2), wait=wait_exponential(min=1, max=5)) def _newsapi_get(endpoint: str, params: dict) -> dict: @@ -51,6 +80,9 @@ def fetch_newsapi_articles(query: str, days: int = 30) -> list[dict]: """Fetch articles from NewsAPI.org. Returns empty list if quota is exhausted or key not set.""" if not settings.NEWSAPI_KEY: return [] + if not _newsapi_quota_ok(): + logger.warning("NewsAPI daily quota exhausted — skipping fetch") + return [] try: from_date = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d") data = _newsapi_get("everything", { @@ -60,6 +92,7 @@ def fetch_newsapi_articles(query: str, days: int = 30) -> list[dict]: "pageSize": 10, "from": from_date, }) + _newsapi_record_call() articles = data.get("articles", []) return [ { diff --git a/backend/app/workers/congress_poller.py b/backend/app/workers/congress_poller.py index 151c199..4ba7a79 100644 --- a/backend/app/workers/congress_poller.py +++ b/backend/app/workers/congress_poller.py @@ -10,6 +10,7 @@ import time from datetime import datetime, timedelta, timezone from sqlalchemy import or_ +from sqlalchemy.dialects.postgresql import insert as pg_insert from app.database import get_sync_db from app.models import Bill, BillAction, Member, AppSetting @@ -227,30 +228,15 @@ def fetch_bill_actions(self, bill_id: str): break for action in actions_data: - action_date_str = action.get("actionDate") - action_text = action.get("text", "") - action_type = action.get("type") - chamber = action.get("chamber") - - # Idempotency check: skip if (bill_id, action_date, action_text) exists - exists = ( - db.query(BillAction) - .filter( - BillAction.bill_id == bill_id, - BillAction.action_date == action_date_str, - BillAction.action_text == action_text, - ) - .first() - ) - if not exists: - db.add(BillAction( - bill_id=bill_id, - action_date=action_date_str, - action_text=action_text, - action_type=action_type, - chamber=chamber, - )) - inserted += 1 + stmt = pg_insert(BillAction.__table__).values( + bill_id=bill_id, + action_date=action.get("actionDate"), + action_text=action.get("text", ""), + action_type=action.get("type"), + chamber=action.get("chamber"), + ).on_conflict_do_nothing(constraint="uq_bill_actions_bill_date_text") + result = db.execute(stmt) + inserted += result.rowcount db.commit() offset += 250 @@ -297,6 +283,28 @@ def fetch_actions_for_active_bills(self): db.close() +@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_all_bill_actions") +def backfill_all_bill_actions(self): + """One-time backfill: enqueue action fetches for every bill that has never had actions fetched.""" + db = get_sync_db() + try: + bills = ( + db.query(Bill) + .filter(Bill.actions_fetched_at.is_(None)) + .order_by(Bill.latest_action_date.desc()) + .all() + ) + queued = 0 + for bill in bills: + fetch_bill_actions.delay(bill.bill_id) + queued += 1 + time.sleep(0.05) # ~20 tasks/sec — workers will self-throttle against Congress.gov + logger.info(f"backfill_all_bill_actions: queued {queued} bills") + return {"queued": queued} + finally: + db.close() + + def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool: """Update bill fields if anything has changed. Returns True if updated.""" changed = False diff --git a/backend/app/workers/member_interest.py b/backend/app/workers/member_interest.py index 288108c..be1c621 100644 --- a/backend/app/workers/member_interest.py +++ b/backend/app/workers/member_interest.py @@ -26,6 +26,81 @@ def _parse_pub_at(raw: str | None) -> datetime | None: return None +@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.sync_member_interest") +def sync_member_interest(self, bioguide_id: str): + """ + Fetch news and score a member in a single API pass. + Called on first profile view — avoids the 2x NewsAPI + GNews calls that + result from queuing fetch_member_news and calculate_member_trend_score separately. + """ + db = get_sync_db() + try: + member = db.get(Member, bioguide_id) + if not member or not member.first_name or not member.last_name: + return {"status": "skipped"} + + query = news_service.build_member_query( + first_name=member.first_name, + last_name=member.last_name, + chamber=member.chamber, + ) + + # Single fetch — results reused for both article storage and scoring + newsapi_articles = news_service.fetch_newsapi_articles(query, days=30) + gnews_articles = news_service.fetch_gnews_articles(query, days=30) + all_articles = newsapi_articles + gnews_articles + + saved = 0 + for article in all_articles: + url = article.get("url") + if not url: + continue + existing = ( + db.query(MemberNewsArticle) + .filter_by(member_id=bioguide_id, url=url) + .first() + ) + if existing: + continue + db.add(MemberNewsArticle( + member_id=bioguide_id, + source=article.get("source", "")[:200], + headline=article.get("headline", ""), + url=url, + published_at=_parse_pub_at(article.get("published_at")), + relevance_score=1.0, + )) + saved += 1 + + # Score using counts already in hand — no second API round-trip + today = date.today() + if not db.query(MemberTrendScore).filter_by(member_id=bioguide_id, score_date=today).first(): + keywords = trends_service.keywords_for_member(member.first_name, member.last_name) + gtrends_score = trends_service.get_trends_score(keywords) + composite = calculate_composite_score( + len(newsapi_articles), len(gnews_articles), gtrends_score + ) + db.add(MemberTrendScore( + member_id=bioguide_id, + score_date=today, + newsapi_count=len(newsapi_articles), + gnews_count=len(gnews_articles), + gtrends_score=gtrends_score, + composite_score=composite, + )) + + db.commit() + logger.info(f"Synced member interest for {bioguide_id}: {saved} articles saved") + return {"status": "ok", "saved": saved} + + except Exception as exc: + db.rollback() + logger.error(f"Member interest sync failed for {bioguide_id}: {exc}") + raise self.retry(exc=exc, countdown=300) + finally: + db.close() + + @celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.fetch_member_news") def fetch_member_news(self, bioguide_id: str): """Fetch and store recent news articles for a specific member.""" diff --git a/backend/app/workers/notification_dispatcher.py b/backend/app/workers/notification_dispatcher.py new file mode 100644 index 0000000..d158f1d --- /dev/null +++ b/backend/app/workers/notification_dispatcher.py @@ -0,0 +1,115 @@ +""" +Notification dispatcher — sends pending notification events via ntfy. + +RSS is pull-based so no dispatch is needed for it; events are simply +marked dispatched once ntfy is sent (or immediately if the user has no +ntfy configured but has an RSS token, so the feed can clean up old items). + +Runs every 5 minutes on Celery Beat. +""" +import logging +from datetime import datetime, timezone + +import requests + +from app.database import get_sync_db +from app.models.notification import NotificationEvent +from app.models.user import User +from app.workers.celery_app import celery_app + +logger = logging.getLogger(__name__) + +NTFY_TIMEOUT = 10 + +_EVENT_TITLES = { + "new_document": "New Bill Text Published", + "new_amendment": "Amendment Filed", + "bill_updated": "Bill Updated", +} + + +@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications") +def dispatch_notifications(self): + """Fan out pending notification events to ntfy and mark dispatched.""" + db = get_sync_db() + try: + pending = ( + db.query(NotificationEvent) + .filter(NotificationEvent.dispatched_at.is_(None)) + .order_by(NotificationEvent.created_at) + .limit(200) + .all() + ) + + sent = 0 + failed = 0 + now = datetime.now(timezone.utc) + + for event in pending: + user = db.get(User, event.user_id) + if not user: + event.dispatched_at = now + db.commit() + continue + + prefs = user.notification_prefs or {} + ntfy_url = prefs.get("ntfy_topic_url", "").strip() + ntfy_auth_method = prefs.get("ntfy_auth_method", "none") + ntfy_token = prefs.get("ntfy_token", "").strip() + ntfy_username = prefs.get("ntfy_username", "").strip() + ntfy_password = prefs.get("ntfy_password", "").strip() + ntfy_enabled = prefs.get("ntfy_enabled", False) + rss_enabled = prefs.get("rss_enabled", False) + + if ntfy_enabled and ntfy_url: + try: + _send_ntfy(event, ntfy_url, ntfy_auth_method, ntfy_token, ntfy_username, ntfy_password) + sent += 1 + except Exception as e: + logger.warning(f"ntfy dispatch failed for event {event.id}: {e}") + failed += 1 + + # Mark dispatched once handled by at least one enabled channel. + # RSS is pull-based — no action needed beyond creating the event record. + if (ntfy_enabled and ntfy_url) or rss_enabled: + event.dispatched_at = now + db.commit() + + logger.info(f"dispatch_notifications: {sent} sent, {failed} failed, {len(pending)} pending") + return {"sent": sent, "failed": failed, "total": len(pending)} + finally: + db.close() + + +def _send_ntfy( + event: NotificationEvent, + topic_url: str, + auth_method: str = "none", + token: str = "", + username: str = "", + password: str = "", +) -> None: + import base64 + payload = event.payload or {} + bill_label = payload.get("bill_label", event.bill_id.upper()) + bill_title = payload.get("bill_title", "") + message = f"{bill_label}: {bill_title}" + if payload.get("brief_summary"): + message += f"\n\n{payload['brief_summary'][:280]}" + + headers = { + "Title": _EVENT_TITLES.get(event.event_type, "Bill Update"), + "Priority": "default", + "Tags": "scroll", + } + if payload.get("bill_url"): + headers["Click"] = payload["bill_url"] + + if auth_method == "token" and token: + headers["Authorization"] = f"Bearer {token}" + elif auth_method == "basic" and username: + creds = base64.b64encode(f"{username}:{password}".encode()).decode() + headers["Authorization"] = f"Basic {creds}" + + resp = requests.post(topic_url, data=message.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT) + resp.raise_for_status() diff --git a/frontend/app/notifications/page.tsx b/frontend/app/notifications/page.tsx new file mode 100644 index 0000000..3440032 --- /dev/null +++ b/frontend/app/notifications/page.tsx @@ -0,0 +1,282 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { useQuery, useMutation } from "@tanstack/react-query"; +import { Bell, Rss, CheckCircle, Copy, RefreshCw } from "lucide-react"; +import { notificationsAPI } from "@/lib/api"; + +const AUTH_METHODS = [ + { value: "none", label: "No authentication", hint: "Public ntfy.sh topics or open self-hosted servers" }, + { value: "token", label: "Access token", hint: "ntfy token (tk_...)" }, + { value: "basic", label: "Username & password", hint: "For servers behind HTTP basic auth or nginx ACL" }, +]; + +export default function NotificationsPage() { + const { data: settings, refetch } = useQuery({ + queryKey: ["notification-settings"], + queryFn: () => notificationsAPI.getSettings(), + }); + + const update = useMutation({ + mutationFn: (data: Parameters[0]) => + notificationsAPI.updateSettings(data), + onSuccess: () => refetch(), + }); + + const resetRss = useMutation({ + mutationFn: () => notificationsAPI.resetRssToken(), + onSuccess: () => refetch(), + }); + + // ntfy form state + const [topicUrl, setTopicUrl] = useState(""); + const [authMethod, setAuthMethod] = useState("none"); + const [token, setToken] = useState(""); + const [username, setUsername] = useState(""); + const [password, setPassword] = useState(""); + const [ntfySaved, setNtfySaved] = useState(false); + + // RSS state + const [rssSaved, setRssSaved] = useState(false); + const [copied, setCopied] = useState(false); + + // Populate from loaded settings + useEffect(() => { + if (!settings) return; + setTopicUrl(settings.ntfy_topic_url ?? ""); + setAuthMethod(settings.ntfy_auth_method ?? "none"); + setToken(settings.ntfy_token ?? ""); + setUsername(settings.ntfy_username ?? ""); + setPassword(settings.ntfy_password ?? ""); + }, [settings]); + + const saveNtfy = (enabled: boolean) => { + update.mutate( + { + ntfy_topic_url: topicUrl, + ntfy_auth_method: authMethod, + ntfy_token: authMethod === "token" ? token : "", + ntfy_username: authMethod === "basic" ? username : "", + ntfy_password: authMethod === "basic" ? password : "", + ntfy_enabled: enabled, + }, + { onSuccess: () => { setNtfySaved(true); setTimeout(() => setNtfySaved(false), 2000); } } + ); + }; + + const toggleRss = (enabled: boolean) => { + update.mutate( + { rss_enabled: enabled }, + { onSuccess: () => { setRssSaved(true); setTimeout(() => setRssSaved(false), 2000); } } + ); + }; + + const rssUrl = settings?.rss_token + ? `${typeof window !== "undefined" ? window.location.origin : ""}/api/notifications/feed/${settings.rss_token}.xml` + : null; + + return ( +
+
+

+ Notifications +

+

+ Get alerted when bills you follow are updated, new text is published, or amendments are filed. +

+
+ + {/* ntfy */} +
+
+
+

+ Push Notifications (ntfy) +

+

+ Uses ntfy — a free, open-source push notification service. + Use the public ntfy.sh server or your own self-hosted instance. +

+
+ {settings?.ntfy_enabled && ( + + Active + + )} +
+ + {/* Topic URL */} +
+ +

+ The full URL to your ntfy topic, e.g.{" "} + https://ntfy.sh/my-pocketveto-alerts +

+ setTopicUrl(e.target.value)} + className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" + /> +
+ + {/* Auth method */} +
+ +
+ {AUTH_METHODS.map(({ value, label, hint }) => ( + + ))} +
+
+ + {/* Token input */} + {authMethod === "token" && ( +
+ + setToken(e.target.value)} + className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" + /> +
+ )} + + {/* Basic auth inputs */} + {authMethod === "basic" && ( +
+
+ + setUsername(e.target.value)} + className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" + /> +
+
+ + setPassword(e.target.value)} + className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" + /> +
+
+ )} + + {/* Actions */} +
+ + {settings?.ntfy_enabled && ( + + )} +
+
+ + {/* RSS */} +
+
+
+

+ RSS Feed +

+

+ A private, tokenized RSS feed of your bill alerts — subscribe in any RSS reader. + Independent of ntfy; enable either or both. +

+
+ {settings?.rss_enabled && ( + + Active + + )} +
+ + {rssUrl && ( +
+ +
+ {rssUrl} + +
+
+ )} + +
+ {!settings?.rss_enabled ? ( + + ) : ( + + )} + {rssUrl && ( + + )} +
+
+
+ ); +} diff --git a/frontend/app/settings/page.tsx b/frontend/app/settings/page.tsx index b680b6b..33b3011 100644 --- a/frontend/app/settings/page.tsx +++ b/frontend/app/settings/page.tsx @@ -13,14 +13,11 @@ import { Trash2, ShieldCheck, ShieldOff, - FileText, - Brain, BarChart3, Bell, - Copy, - Rss, } from "lucide-react"; -import { settingsAPI, adminAPI, notificationsAPI, type AdminUser, type LLMModel, type ApiHealthResult } from "@/lib/api"; +import Link from "next/link"; +import { settingsAPI, adminAPI, type AdminUser, type LLMModel, type ApiHealthResult } from "@/lib/api"; import { useAuthStore } from "@/stores/authStore"; const LLM_PROVIDERS = [ @@ -80,27 +77,6 @@ export default function SettingsPage() { onSuccess: () => qc.invalidateQueries({ queryKey: ["admin-users"] }), }); - const { data: notifSettings, refetch: refetchNotif } = useQuery({ - queryKey: ["notification-settings"], - queryFn: () => notificationsAPI.getSettings(), - }); - - const updateNotif = useMutation({ - mutationFn: (data: Parameters[0]) => - notificationsAPI.updateSettings(data), - onSuccess: () => refetchNotif(), - }); - - const resetRss = useMutation({ - mutationFn: () => notificationsAPI.resetRssToken(), - onSuccess: () => refetchNotif(), - }); - - const [ntfyUrl, setNtfyUrl] = useState(""); - const [ntfyToken, setNtfyToken] = useState(""); - const [notifSaved, setNotifSaved] = useState(false); - const [copied, setCopied] = useState(false); - // Live model list from provider API const { data: modelsData, isFetching: modelsFetching, refetch: refetchModels } = useQuery({ queryKey: ["llm-models", settings?.llm_provider], @@ -194,6 +170,21 @@ export default function SettingsPage() {

Manage users, LLM provider, and system settings

+ {/* Notifications link */} + +
+ +
+
Notification Settings
+
Configure ntfy push alerts and RSS feed per user
+
+
+ + + {/* Analysis Status */}

@@ -488,113 +479,6 @@ export default function SettingsPage() {

- {/* Notifications */} -
-

- Notifications -

- - {/* ntfy */} -
-
- -

- Your ntfy topic — use ntfy.sh (public) or your self-hosted server. - e.g. https://ntfy.sh/your-topic -

- setNtfyUrl(e.target.value)} - className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" - /> -
-
- -

Required only for private/self-hosted topics with access control.

- setNtfyToken(e.target.value)} - className="w-full px-3 py-2 text-sm bg-background border border-border rounded-md focus:outline-none focus:ring-1 focus:ring-primary" - /> -
-
- - {notifSettings?.ntfy_enabled && ( - - )} - {notifSettings?.ntfy_enabled && ( - - ntfy active - - )} -
-
- - {/* RSS */} -
-
- - RSS Feed -
- {notifSettings?.rss_token ? ( -
-
- - {`${window.location.origin}/api/notifications/feed/${notifSettings.rss_token}.xml`} - - -
- -
- ) : ( -

- Save your ntfy settings above to generate your personal RSS feed URL. -

- )} -
-
- {/* API Health */}
@@ -683,6 +567,15 @@ export default function SettingsPage() { fn: adminAPI.triggerFetchActions, status: "on-demand", }, + { + key: "backfill-actions", + name: "Backfill All Action Histories", + description: "One-time catch-up: fetch action histories for all bills that were imported before this feature existed. Run once to populate timelines across your full bill archive.", + fn: adminAPI.backfillAllActions, + status: stats ? (stats.bills_missing_actions > 0 ? "needed" : "ok") : "on-demand", + count: stats?.bills_missing_actions, + countLabel: "bills missing action history", + }, { key: "sponsors", name: "Backfill Sponsors", diff --git a/frontend/components/shared/Sidebar.tsx b/frontend/components/shared/Sidebar.tsx index f3f92ed..5cc20d6 100644 --- a/frontend/components/shared/Sidebar.tsx +++ b/frontend/components/shared/Sidebar.tsx @@ -8,6 +8,7 @@ import { Users, Tags, Heart, + Bell, Settings, Landmark, LogOut, @@ -24,6 +25,7 @@ const NAV = [ { href: "/members", label: "Members", icon: Users, adminOnly: false }, { href: "/topics", label: "Topics", icon: Tags, adminOnly: false }, { href: "/following", label: "Following", icon: Heart, adminOnly: false }, + { href: "/notifications", label: "Notifications", icon: Bell, adminOnly: false }, { href: "/settings", label: "Admin", icon: Settings, adminOnly: true }, ]; diff --git a/frontend/lib/api.ts b/frontend/lib/api.ts index 364e3a0..f8d5ec2 100644 --- a/frontend/lib/api.ts +++ b/frontend/lib/api.ts @@ -161,6 +161,7 @@ export interface AnalysisStats { pending_llm: number; bills_missing_sponsor: number; bills_missing_metadata: number; + bills_missing_actions: number; remaining: number; } @@ -199,6 +200,8 @@ export const adminAPI = { apiClient.post("/api/admin/backfill-citations").then((r) => r.data), triggerFetchActions: () => apiClient.post("/api/admin/trigger-fetch-actions").then((r) => r.data), + backfillAllActions: () => + apiClient.post("/api/admin/backfill-all-actions").then((r) => r.data), backfillMetadata: () => apiClient.post("/api/admin/backfill-metadata").then((r) => r.data), resumeAnalysis: () => diff --git a/frontend/lib/types.ts b/frontend/lib/types.ts index 840d0ad..1605b3a 100644 --- a/frontend/lib/types.ts +++ b/frontend/lib/types.ts @@ -157,7 +157,11 @@ export interface SettingsData { export interface NotificationSettings { ntfy_topic_url: string; + ntfy_auth_method: string; // "none" | "token" | "basic" ntfy_token: string; + ntfy_username: string; + ntfy_password: string; ntfy_enabled: boolean; + rss_enabled: boolean; rss_token: string | null; }