Files
PocketVeto/backend/app/api/admin.py
Jack Levy d5711312b8 feat: bill action pipeline, What Changed UI, citation backfill, admin panel
Backend:
- Add fetch_bill_actions task with pagination and idempotent upsert
- Add fetch_actions_for_active_bills nightly batch (4 AM UTC beat schedule)
- Wire fetch_bill_actions into new-bill creation and _update_bill_if_changed
- Add backfill_brief_citations task: detects pre-citation briefs by JSONB
  type check, deletes them, re-queues LLM processing against stored text
  (LLM calls only — zero Congress.gov or GovInfo calls)
- Add admin endpoints: POST /bills/{id}/reprocess, /backfill-citations,
  /trigger-fetch-actions; add uncited_briefs count to /stats

Frontend:
- New BriefPanel component: wraps AIBriefCard, adds amber "What Changed"
  badge for amendment briefs and collapsible version history with
  inline brief expansion
- Swap AIBriefCard for BriefPanel on bill detail page
- Admin panel: Backfill Citations + Fetch Bill Actions buttons; amber
  warning in stats when uncited briefs remain
- Add feature roadmap document with phased plan through Phase 5

Co-Authored-By: Jack Levy
2026-03-01 03:03:29 -05:00

184 lines
7.0 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.dependencies import get_current_admin
from app.database import get_db
from app.models import Bill, BillBrief, BillDocument, Follow
from app.models.user import User
from app.schemas.schemas import UserResponse
# Router object that the endpoint decorators in this module register on.
# Each handler defined here declares Depends(get_current_admin).
router = APIRouter()
# ── User Management ───────────────────────────────────────────────────────────
# Response schema used by the /users listing: all UserResponse fields plus
# the number of Follow rows counted for the user.
class UserWithStats(UserResponse):
    follow_count: int
# Returns every User ordered by created_at, each paired with the number of
# Follow rows grouped under its id. Declares the get_current_admin dependency.
@router.get("/users", response_model=list[UserWithStats])
async def list_users(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """List all users with their follow counts."""
    user_query = select(User).order_by(User.created_at)
    accounts = (await db.execute(user_query)).scalars().all()

    # One grouped query yields (user_id, count) rows for all users at once,
    # so no per-user query is issued.
    follow_query = select(
        Follow.user_id, func.count(Follow.id).label("cnt")
    ).group_by(Follow.user_id)
    follow_rows = await db.execute(follow_query)
    follows_by_user = {owner_id: total for owner_id, total in follow_rows}

    return [
        UserWithStats(
            id=account.id,
            email=account.email,
            is_admin=account.is_admin,
            # A falsy prefs value (e.g. None) is reported as an empty dict.
            notification_prefs=account.notification_prefs or {},
            created_at=account.created_at,
            # Users absent from the grouped result have zero follows.
            follow_count=follows_by_user.get(account.id, 0),
        )
        for account in accounts
    ]
# Removes the User row with the given id and commits.
# Raises HTTPException 400 when the caller targets their own id and 404 when
# no User with that id exists. Responds 204 with no body on success.
@router.delete("/users/{user_id}", status_code=204)
async def delete_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Delete a user account (cascades to their follows). Cannot delete yourself."""
    # Self-deletion is rejected before the database is touched.
    targets_self = current_user.id == user_id
    if targets_self:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    target = await db.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    await db.delete(target)
    await db.commit()
# Flips the is_admin flag of the User with the given id, commits, refreshes
# the instance and returns it (serialised through UserResponse).
# Raises HTTPException 400 when the caller targets their own id and 404 when
# no User with that id exists.
@router.patch("/users/{user_id}/toggle-admin", response_model=UserResponse)
async def toggle_admin(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Promote or demote a user's admin status."""
    # Callers may not change their own flag.
    if current_user.id == user_id:
        raise HTTPException(status_code=400, detail="Cannot change your own admin status")

    target = await db.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    target.is_admin = False if target.is_admin else True
    await db.commit()
    # Reload the row after the commit so the returned object is current.
    await db.refresh(target)
    return target
# ── Analysis Stats ────────────────────────────────────────────────────────────
@router.get("/stats")
async def get_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Return analysis pipeline progress counters.

    The response is a dict with these integer counters:

    - ``total_bills``: number of Bill rows.
    - ``docs_fetched``: BillDocument rows whose ``raw_text`` is not NULL.
    - ``briefs_generated``: number of BillBrief rows (all types).
    - ``full_briefs`` / ``amendment_briefs``: BillBrief rows by ``brief_type``.
    - ``uncited_briefs``: briefs whose ``key_points`` is a non-empty JSON
      array with a plain string as its first element.
    - ``remaining``: bills that do not have any brief yet.
    """

    async def _count(statement):
        # Execute a statement that yields a single COUNT value and return it.
        return (await db.execute(statement)).scalar()

    total_bills = await _count(select(func.count()).select_from(Bill))
    docs_fetched = await _count(
        select(func.count())
        .select_from(BillDocument)
        .where(BillDocument.raw_text.isnot(None))
    )
    total_briefs = await _count(select(func.count()).select_from(BillBrief))
    full_briefs = await _count(
        select(func.count()).select_from(BillBrief).where(BillBrief.brief_type == "full")
    )
    amendment_briefs = await _count(
        select(func.count()).select_from(BillBrief).where(BillBrief.brief_type == "amendment")
    )

    # A bill can own several brief rows (this endpoint itself counts both
    # "full" and "amendment" briefs), so subtracting the raw brief count from
    # the bill count under-reports the backlog and can even go negative.
    # Count each briefed bill once instead.
    # NOTE(review): assumes BillBrief.bill_id is the column linking a brief to
    # its bill -- confirm against the model before merging.
    briefed_bills = await _count(select(func.count(BillBrief.bill_id.distinct())))

    uncited_briefs = await _count(
        text("""
            SELECT COUNT(*) FROM bill_briefs
            WHERE key_points IS NOT NULL
              AND jsonb_array_length(key_points) > 0
              AND jsonb_typeof(key_points->0) = 'string'
        """)
    )

    return {
        "total_bills": total_bills,
        "docs_fetched": docs_fetched,
        "briefs_generated": total_briefs,
        "full_briefs": full_briefs,
        "amendment_briefs": amendment_briefs,
        "uncited_briefs": uncited_briefs,
        "remaining": total_bills - briefed_bills,
    }
# ── Celery Tasks ──────────────────────────────────────────────────────────────
# Enqueues backfill_brief_citations with .delay() and returns the queued
# task's id together with a fixed "queued" status. The worker module is
# imported inside the handler rather than at module import time.
@router.post("/backfill-citations")
async def backfill_citations(current_user: User = Depends(get_current_admin)):
    """Delete pre-citation briefs and re-queue LLM processing using stored document text."""
    from app.workers.llm_processor import backfill_brief_citations

    queued = backfill_brief_citations.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues backfill_sponsor_ids with .delay() and returns the queued task's
# id together with a fixed "queued" status. The worker module is imported
# inside the handler rather than at module import time.
@router.post("/backfill-sponsors")
async def backfill_sponsors(current_user: User = Depends(get_current_admin)):
    from app.workers.congress_poller import backfill_sponsor_ids

    queued = backfill_sponsor_ids.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues poll_congress_bills with .delay() and returns the queued task's
# id together with a fixed "queued" status. The worker module is imported
# inside the handler rather than at module import time.
@router.post("/trigger-poll")
async def trigger_poll(current_user: User = Depends(get_current_admin)):
    from app.workers.congress_poller import poll_congress_bills

    queued = poll_congress_bills.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues sync_members with .delay() and returns the queued task's id
# together with a fixed "queued" status. The worker module is imported
# inside the handler rather than at module import time.
@router.post("/trigger-member-sync")
async def trigger_member_sync(current_user: User = Depends(get_current_admin)):
    from app.workers.congress_poller import sync_members

    queued = sync_members.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues fetch_actions_for_active_bills with .delay() and returns the
# queued task's id together with a fixed "queued" status. The worker module
# is imported inside the handler rather than at module import time.
@router.post("/trigger-fetch-actions")
async def trigger_fetch_actions(current_user: User = Depends(get_current_admin)):
    from app.workers.congress_poller import fetch_actions_for_active_bills

    queued = fetch_actions_for_active_bills.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues calculate_all_trend_scores with .delay() and returns the queued
# task's id together with a fixed "queued" status. The worker module is
# imported inside the handler rather than at module import time.
@router.post("/trigger-trend-scores")
async def trigger_trend_scores(current_user: User = Depends(get_current_admin)):
    from app.workers.trend_scorer import calculate_all_trend_scores

    queued = calculate_all_trend_scores.delay()
    response = {"task_id": queued.id, "status": "queued"}
    return response
# Enqueues two tasks for one bill id -- fetch_bill_documents, then
# fetch_bill_actions -- and returns both task ids under "task_ids".
# NOTE(review): bill_id is forwarded to the workers as-is; this handler does
# not check that such a bill exists.
@router.post("/bills/{bill_id}/reprocess")
async def reprocess_bill(bill_id: str, current_user: User = Depends(get_current_admin)):
    """Queue document and action fetches for a specific bill. Useful for debugging."""
    from app.workers.document_fetcher import fetch_bill_documents
    from app.workers.congress_poller import fetch_bill_actions

    queued_documents = fetch_bill_documents.delay(bill_id)
    queued_actions = fetch_bill_actions.delay(bill_id)

    task_ids = {"documents": queued_documents.id, "actions": queued_actions.id}
    return {"task_ids": task_ids}
@router.get("/task-status/{task_id}")
async def get_task_status(task_id: str, current_user: User = Depends(get_current_admin)):
    """Report the current state of a queued task.

    Returns a dict with ``task_id`` (echoed back), ``status`` (the state
    string reported by the task's AsyncResult) and ``result``. ``result`` is
    ``None`` until the task is ready. Once ready it is the task's return
    value, or a ``"ExceptionType: message"`` string if the task raised.
    """
    from app.workers.celery_app import celery_app

    result = celery_app.AsyncResult(task_id)

    outcome = None
    if result.ready():
        outcome = result.result
        # For a failed task the stored result is the raised exception
        # instance. It does not serialise to meaningful JSON, so describe it
        # as text to keep the failure reason visible to the caller.
        if isinstance(outcome, BaseException):
            outcome = f"{type(outcome).__name__}: {outcome}"

    return {
        "task_id": task_id,
        "status": result.status,
        "result": outcome,
    }