Same bug as members.py: health check used old CD\d+FP$ regex (no match for CD119) and skipped GEOID. Now mirrors members.py logic: GEOID primary, STATE+CD\d+ fallback, Congressional layer filter. Authored by: Jack Levy
507 lines
20 KiB
Python
507 lines
20 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import func, select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.dependencies import get_current_admin
|
|
from app.database import get_db
|
|
from app.models import Bill, BillBrief, BillDocument, Follow
|
|
from app.models.user import User
|
|
from app.schemas.schemas import UserResponse
|
|
|
|
# Router collecting the admin endpoints defined in this module; every handler
# registered on it below declares a get_current_admin dependency.
router = APIRouter()
|
|
|
|
|
|
# ── User Management ───────────────────────────────────────────────────────────
|
|
|
|
class UserWithStats(UserResponse):
    """``UserResponse`` extended with a per-user follow counter.

    Used as the response model of the ``/users`` listing endpoint.
    """

    # Number of Follow rows owned by the user (list_users supplies 0 when none exist).
    follow_count: int
|
|
|
|
|
|
@router.get("/users", response_model=list[UserWithStats])
async def list_users(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Return every user, ordered by ``created_at``, with a follow count each.

    Two queries are issued: one loading the users and one aggregating
    ``Follow`` rows per ``user_id``. Users without any follow get a count of 0.
    """
    user_rows = (
        await db.execute(select(User).order_by(User.created_at))
    ).scalars().all()

    follow_totals_query = select(
        Follow.user_id, func.count(Follow.id).label("cnt")
    ).group_by(Follow.user_id)

    # Map user_id -> number of follows for constant-time lookup below.
    follows_by_user = {}
    for row in await db.execute(follow_totals_query):
        follows_by_user[row.user_id] = row.cnt

    listing = []
    for account in user_rows:
        listing.append(
            UserWithStats(
                id=account.id,
                email=account.email,
                is_admin=account.is_admin,
                # A falsy stored value (e.g. NULL) is reported as an empty dict.
                notification_prefs=account.notification_prefs or {},
                created_at=account.created_at,
                follow_count=follows_by_user.get(account.id, 0),
            )
        )
    return listing
|
|
|
|
|
|
@router.delete("/users/{user_id}", status_code=204)
async def delete_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Remove a user account; an admin may not remove their own.

    Raises:
        HTTPException: 400 when ``user_id`` is the caller's own id,
            404 when no such user exists.

    NOTE(review): the original description says the delete cascades to the
    user's follows — that is configured outside this function; confirm on
    the model.
    """
    # Self-deletion is rejected before touching the database.
    if current_user.id == user_id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    target = await db.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    await db.delete(target)
    await db.commit()
|
|
|
|
|
|
@router.patch("/users/{user_id}/toggle-admin", response_model=UserResponse)
async def toggle_admin(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Flip the ``is_admin`` flag of another user and return the updated row.

    Raises:
        HTTPException: 400 when the caller targets their own account,
            404 when no such user exists.
    """
    # An admin cannot change their own flag through this endpoint.
    if current_user.id == user_id:
        raise HTTPException(status_code=400, detail="Cannot change your own admin status")

    target = await db.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    target.is_admin = not target.is_admin
    await db.commit()
    # Reload the instance after the commit so the response reflects stored state.
    await db.refresh(target)
    return target
|
|
|
|
|
|
# ── Analysis Stats ────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/stats")
async def get_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Collect the counters describing analysis-pipeline progress.

    Each counter is a single ``COUNT`` query executed sequentially on ``db``.
    The response is a flat dict of integers; ``remaining`` is derived as
    ``total_bills - briefs_generated``.
    """

    async def _scalar(statement):
        """Execute *statement* and return the first column of its first row."""
        outcome = await db.execute(statement)
        return outcome.scalar()

    total_bills = await _scalar(select(func.count()).select_from(Bill))

    # Documents whose raw text has been stored.
    docs_fetched = await _scalar(
        select(func.count()).select_from(BillDocument).where(BillDocument.raw_text.isnot(None))
    )

    total_briefs = await _scalar(select(func.count()).select_from(BillBrief))
    full_briefs = await _scalar(
        select(func.count()).select_from(BillBrief).where(BillBrief.brief_type == "full")
    )
    amendment_briefs = await _scalar(
        select(func.count()).select_from(BillBrief).where(BillBrief.brief_type == "amendment")
    )

    # Briefs whose first key point is a plain string rather than an object.
    uncited_briefs = await _scalar(
        text("""
            SELECT COUNT(*) FROM bill_briefs
            WHERE key_points IS NOT NULL
              AND jsonb_array_length(key_points) > 0
              AND jsonb_typeof(key_points->0) = 'string'
        """)
    )

    # Bills with null sponsor
    bills_missing_sponsor = await _scalar(
        text("SELECT COUNT(*) FROM bills WHERE sponsor_id IS NULL")
    )

    # Bills with null metadata (introduced_date / chamber / congress_url)
    bills_missing_metadata = await _scalar(
        text("SELECT COUNT(*) FROM bills WHERE introduced_date IS NULL OR chamber IS NULL OR congress_url IS NULL")
    )

    # Bills with no document record at all (text not yet published on GovInfo)
    no_text_bills = await _scalar(
        text("""
            SELECT COUNT(*) FROM bills b
            LEFT JOIN bill_documents bd ON bd.bill_id = b.bill_id
            WHERE bd.id IS NULL
        """)
    )

    # Documents that have text but no brief (LLM not yet run / failed)
    pending_llm = await _scalar(
        text("""
            SELECT COUNT(*) FROM bill_documents bd
            LEFT JOIN bill_briefs bb ON bb.document_id = bd.id
            WHERE bb.id IS NULL AND bd.raw_text IS NOT NULL
        """)
    )

    # Bills that have never had their action history fetched
    bills_missing_actions = await _scalar(
        text("SELECT COUNT(*) FROM bills WHERE actions_fetched_at IS NULL")
    )

    # Cited brief points (objects) that have no label yet
    unlabeled_briefs = await _scalar(
        text("""
            SELECT COUNT(*) FROM bill_briefs
            WHERE (
                key_points IS NOT NULL AND EXISTS (
                    SELECT 1 FROM jsonb_array_elements(key_points) AS p
                    WHERE jsonb_typeof(p) = 'object' AND (p->>'label') IS NULL
                )
            ) OR (
                risks IS NOT NULL AND EXISTS (
                    SELECT 1 FROM jsonb_array_elements(risks) AS r
                    WHERE jsonb_typeof(r) = 'object' AND (r->>'label') IS NULL
                )
            )
        """)
    )

    summary = {
        "total_bills": total_bills,
        "docs_fetched": docs_fetched,
        "briefs_generated": total_briefs,
        "full_briefs": full_briefs,
        "amendment_briefs": amendment_briefs,
        "uncited_briefs": uncited_briefs,
        "no_text_bills": no_text_bills,
        "pending_llm": pending_llm,
        "bills_missing_sponsor": bills_missing_sponsor,
        "bills_missing_metadata": bills_missing_metadata,
        "bills_missing_actions": bills_missing_actions,
        "unlabeled_briefs": unlabeled_briefs,
        "remaining": total_bills - total_briefs,
    }
    return summary
|
|
|
|
|
|
# ── Celery Tasks ──────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/backfill-citations")
async def backfill_citations(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_brief_citations`` worker task.

    Per the endpoint's original description, that task deletes pre-citation
    briefs and re-queues LLM processing from stored document text; the task
    body itself lives in ``app.workers.llm_processor``.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.llm_processor import backfill_brief_citations

    queued = backfill_brief_citations.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-sponsors")
async def backfill_sponsors(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_sponsor_ids`` worker task and report its id."""
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import backfill_sponsor_ids

    queued = backfill_sponsor_ids.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/trigger-poll")
async def trigger_poll(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``poll_congress_bills`` worker task and report its id."""
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import poll_congress_bills

    queued = poll_congress_bills.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/trigger-member-sync")
async def trigger_member_sync(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``sync_members`` worker task and report its id."""
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import sync_members

    queued = sync_members.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/trigger-fetch-actions")
async def trigger_fetch_actions(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``fetch_actions_for_active_bills`` worker task and report its id."""
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import fetch_actions_for_active_bills

    queued = fetch_actions_for_active_bills.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-all-actions")
async def backfill_all_actions(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_all_bill_actions`` worker task.

    Per the endpoint's original description, the task queues action fetches
    for every bill whose actions have never been fetched.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import backfill_all_bill_actions

    queued = backfill_all_bill_actions.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-metadata")
async def backfill_metadata(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_bill_metadata`` worker task.

    Per the endpoint's original description, the task fills in null
    ``introduced_date``, ``congress_url`` and ``chamber`` on existing bills.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.congress_poller import backfill_bill_metadata

    queued = backfill_bill_metadata.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-labels")
async def backfill_labels(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_brief_labels`` worker task.

    Per the endpoint's original description, the task classifies existing
    cited brief points as fact or inference without regenerating the briefs.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.llm_processor import backfill_brief_labels

    queued = backfill_brief_labels.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-cosponsors")
async def backfill_cosponsors(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_all_bill_cosponsors`` worker task.

    Per the endpoint's original description, the task fetches co-sponsor data
    from Congress.gov for all bills that have not been fetched yet.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.bill_classifier import backfill_all_bill_cosponsors

    queued = backfill_all_bill_cosponsors.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/backfill-categories")
async def backfill_categories(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``backfill_bill_categories`` worker task.

    Per the endpoint's original description, the task classifies all bills
    that have text but no category as substantive, commemorative or
    administrative.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.bill_classifier import backfill_bill_categories

    queued = backfill_bill_categories.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/calculate-effectiveness")
async def calculate_effectiveness(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``calculate_effectiveness_scores`` worker task.

    Per the endpoint's original description, the task recalculates member
    effectiveness scores and percentiles immediately.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.bill_classifier import calculate_effectiveness_scores

    queued = calculate_effectiveness_scores.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/resume-analysis")
async def resume_analysis(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``resume_pending_analysis`` worker task.

    Per the endpoint's original description, the task re-queues LLM
    processing for documents with no brief and document fetching for bills
    with no document.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.llm_processor import resume_pending_analysis

    queued = resume_pending_analysis.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/trigger-weekly-digest")
async def trigger_weekly_digest(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``send_weekly_digest`` worker task.

    Per the endpoint's original description, the task sends the weekly bill
    activity summary to all eligible users right away.

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.notification_dispatcher import send_weekly_digest

    queued = send_weekly_digest.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/trigger-trend-scores")
async def trigger_trend_scores(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``calculate_all_trend_scores`` worker task and report its id."""
    # Worker module is imported lazily, at call time.
    from app.workers.trend_scorer import calculate_all_trend_scores

    queued = calculate_all_trend_scores.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.post("/bills/{bill_id}/reprocess")
async def reprocess_bill(bill_id: str, current_user: User = Depends(get_current_admin)):
    """Enqueue a document fetch and an action fetch for one bill.

    Two independent worker tasks are queued with ``bill_id`` as their only
    argument, documents first. The response carries both task ids under
    ``task_ids``. Intended as a debugging aid.
    """
    # Worker modules are imported lazily, at call time.
    from app.workers.congress_poller import fetch_bill_actions
    from app.workers.document_fetcher import fetch_bill_documents

    documents_job = fetch_bill_documents.delay(bill_id)
    actions_job = fetch_bill_actions.delay(bill_id)

    queued_ids = {"documents": documents_job.id, "actions": actions_job.id}
    return {"task_ids": queued_ids}
|
|
|
|
|
|
@router.get("/newsapi-quota")
async def get_newsapi_quota(current_user: User = Depends(get_current_admin)):
    """Report the remaining NewsAPI quota alongside the fixed limit of 100.

    The quota lookup is a synchronous helper, so it is run in a worker
    thread instead of directly on the event loop.
    """
    import asyncio

    from app.services.news_service import get_newsapi_quota_remaining

    quota_left = await asyncio.to_thread(get_newsapi_quota_remaining)
    return {"remaining": quota_left, "limit": 100}
|
|
|
|
|
|
@router.post("/clear-gnews-cache")
async def clear_gnews_cache_endpoint(current_user: User = Depends(get_current_admin)):
    """Invoke ``clear_gnews_cache`` and return whatever it reports as ``cleared``.

    Per the endpoint's original description, this flushes the Google News RSS
    Redis cache so that fresh data is fetched on the next run. The helper is
    synchronous and therefore executed in a worker thread.
    """
    import asyncio

    from app.services.news_service import clear_gnews_cache

    flushed = await asyncio.to_thread(clear_gnews_cache)
    return {"cleared": flushed}
|
|
|
|
|
|
@router.post("/submit-llm-batch")
async def submit_llm_batch_endpoint(current_user: User = Depends(get_current_admin)):
    """Enqueue the ``submit_llm_batch`` worker task.

    Per the endpoint's original description, the task submits all unbriefed
    documents to the Batch API (OpenAI/Anthropic only).

    Returns the id of the queued task and a ``"queued"`` status marker.
    """
    # Worker module is imported lazily, at call time.
    from app.workers.llm_batch_processor import submit_llm_batch

    queued = submit_llm_batch.delay()
    return {"task_id": queued.id, "status": "queued"}
|
|
|
|
|
|
@router.get("/llm-batch-status")
async def get_llm_batch_status(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_admin),
):
    """Return the stored state of the active LLM batch job.

    The state is read from the ``AppSetting`` row keyed ``"llm_active_batch"``
    and its ``value`` is decoded as JSON. Fallbacks:

    * no such row -> ``{"status": "no_active_batch"}``
    * value cannot be decoded -> ``{"status": "unknown"}``
    """
    import json

    from app.models.setting import AppSetting

    setting = await db.get(AppSetting, "llm_active_batch")
    if not setting:
        return {"status": "no_active_batch"}

    try:
        batch_state = json.loads(setting.value)
    except Exception:
        # Deliberately best-effort: any decoding problem maps to "unknown".
        return {"status": "unknown"}
    return batch_state
|
|
|
|
|
|
@router.get("/api-health")
async def api_health(current_user: User = Depends(get_current_admin)):
    """Run every external-API probe concurrently and report one entry per API.

    Each probe is a synchronous function executed in its own worker thread.
    A probe that returns a dict is passed through unchanged; anything else
    (in practice an exception captured by ``gather``) is reported as
    ``{"status": "error", "detail": str(...)}``.
    """
    import asyncio

    # Response key -> probe function; insertion order fixes the response order.
    probes = {
        "congress_gov": _test_congress,
        "govinfo": _test_govinfo,
        "newsapi": _test_newsapi,
        "google_news": _test_gnews,
        "rep_lookup": _test_rep_lookup,
    }

    outcomes = await asyncio.gather(
        *(asyncio.to_thread(probe) for probe in probes.values()),
        return_exceptions=True,
    )

    report = {}
    for api_name, outcome in zip(probes, outcomes):
        if isinstance(outcome, dict):
            report[api_name] = outcome
        else:
            report[api_name] = {"status": "error", "detail": str(outcome)}
    return report
|
|
|
|
|
|
def _timed(fn):
|
|
"""Run fn(), return its dict merged with latency_ms."""
|
|
import time as _time
|
|
t0 = _time.perf_counter()
|
|
result = fn()
|
|
result["latency_ms"] = round((_time.perf_counter() - t0) * 1000)
|
|
return result
|
|
|
|
|
|
def _test_congress() -> dict:
    """Probe the Congress API by requesting a single bill of the 119th Congress.

    Returns a dict with ``status`` and ``detail``; a successful probe also
    carries ``latency_ms`` (added by ``_timed``). A missing
    ``DATA_GOV_API_KEY`` or any exception raised by the call is reported as
    ``status == "error"``.
    """
    from app.config import settings
    from app.services import congress_api

    if not settings.DATA_GOV_API_KEY:
        return {"status": "error", "detail": "DATA_GOV_API_KEY not configured"}

    def _probe():
        payload = congress_api.get_bills(119, limit=1)
        # Prefer the reported pagination count; fall back to the number of
        # bills actually present in the payload when it is missing or zero.
        total = payload.get("pagination", {}).get("count")
        if not total:
            total = len(payload.get("bills", []))
        return {"status": "ok", "detail": f"{total:,} bills available in 119th Congress"}

    try:
        outcome = _timed(_probe)
    except Exception as exc:
        outcome = {"status": "error", "detail": str(exc)}
    return outcome
|
|
|
|
|
|
def _test_govinfo() -> dict:
    """Probe GovInfo by listing its collections and looking for ``BILLS``.

    Returns a dict with ``status`` and ``detail``; a successful probe also
    carries ``latency_ms`` (added by ``_timed``). A missing
    ``DATA_GOV_API_KEY``, a non-2xx response or any other exception is
    reported as ``status == "error"``.
    """
    from app.config import settings
    import requests as req

    if not settings.DATA_GOV_API_KEY:
        return {"status": "error", "detail": "DATA_GOV_API_KEY not configured"}

    def _probe():
        # /collections lists all available collections — simple health check endpoint
        response = req.get(
            "https://api.govinfo.gov/collections",
            params={"api_key": settings.DATA_GOV_API_KEY},
            timeout=15,
        )
        response.raise_for_status()
        collections = response.json().get("collections", [])

        # First entry whose collectionCode is BILLS, if any.
        bills_entry = None
        for entry in collections:
            if entry.get("collectionCode") == "BILLS":
                bills_entry = entry
                break

        if not bills_entry:
            return {"status": "ok", "detail": f"GovInfo reachable — {len(collections)} collections available"}

        package_count = bills_entry.get("packageCount", "?")
        if isinstance(package_count, int):
            detail = f"BILLS collection: {package_count:,} packages"
        else:
            detail = "GovInfo reachable, BILLS collection found"
        return {"status": "ok", "detail": detail}

    try:
        outcome = _timed(_probe)
    except Exception as exc:
        outcome = {"status": "error", "detail": str(exc)}
    return outcome
|
|
|
|
|
|
def _test_newsapi() -> dict:
    """Probe NewsAPI by requesting one US top headline.

    Returns a dict with ``status`` and ``detail``, plus ``latency_ms``
    whenever the HTTP call completed. Outcomes:

    * ``skipped`` when ``NEWSAPI_KEY`` is not configured
    * ``error`` when the JSON body's ``status`` is not ``"ok"`` (its
      ``message`` becomes the detail) or when the call raises
    * ``ok`` otherwise, with the reported ``totalResults``
    """
    from app.config import settings
    import requests as req

    if not settings.NEWSAPI_KEY:
        return {"status": "skipped", "detail": "NEWSAPI_KEY not configured"}

    def _probe():
        response = req.get(
            "https://newsapi.org/v2/top-headlines",
            params={"country": "us", "pageSize": 1, "apiKey": settings.NEWSAPI_KEY},
            timeout=10,
        )
        # The HTTP status is not checked; the JSON body's own "status" field
        # decides the outcome.
        body = response.json()
        if body.get("status") == "ok":
            return {"status": "ok", "detail": f"{body.get('totalResults', 0):,} headlines available"}
        return {"status": "error", "detail": body.get("message", "Unknown error")}

    try:
        outcome = _timed(_probe)
    except Exception as exc:
        outcome = {"status": "error", "detail": str(exc)}
    return outcome
|
|
|
|
|
|
def _test_gnews() -> dict:
    """Probe the Google News RSS search feed with the query ``congress``.

    Returns a dict with ``status`` and ``detail``; a successful probe also
    carries ``latency_ms`` (added by ``_timed``). The detail reports how many
    ``<item>`` tags the feed body contains. A non-2xx response or any other
    exception is reported as ``status == "error"``.
    """
    import requests as req

    def _probe():
        feed = req.get(
            "https://news.google.com/rss/search",
            params={"q": "congress", "hl": "en-US", "gl": "US", "ceid": "US:en"},
            timeout=10,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        feed.raise_for_status()
        # Plain substring count; the feed is not parsed as XML.
        items_seen = feed.text.count("<item>")
        return {"status": "ok", "detail": f"{items_seen} items in test RSS feed"}

    try:
        outcome = _timed(_probe)
    except Exception as exc:
        outcome = {"status": "error", "detail": str(exc)}
    return outcome
|
|
|
|
|
|
def _test_rep_lookup() -> dict:
    """Probe the ZIP -> congressional district lookup chain.

    Step 1 asks Nominatim for the coordinates of the test ZIP 20001; step 2
    asks the TIGERweb Legislative ``identify`` operation which features lie
    at that point. Only results whose layer name contains ``"Congressional"``
    are inspected. For each of them the district is read from ``GEOID``
    (4 digits: 2 state + 2 district) and, failing that, from ``STATE`` plus a
    ``CD<digits>`` attribute.

    Returns a dict with ``status`` and ``detail``, plus ``latency_ms``
    whenever the calls completed without raising. Any exception (HTTP error,
    timeout, malformed payload) is reported as ``status == "error"``.
    """
    import re as _re
    import requests as req

    # Matches attribute names such as "CD119"; compiled once for the whole scan.
    cd_field_pattern = _re.compile(r"CD\d+$")

    def _district_label(code: str):
        """Map a district code to a display label, or None when it is not numeric.

        An all-zero (or empty) code is labelled "At-large"; otherwise leading
        zeros are dropped ("07" -> "7").
        """
        if not code.strip("0"):
            return "At-large"
        try:
            return str(int(code))
        except ValueError:
            # Non-numeric code (e.g. a null attribute rendered as "None"):
            # signal "unusable" so the caller can keep scanning instead of
            # aborting the whole probe.
            return None

    def _found(district: str) -> dict:
        """Build the success payload for *district*."""
        return {"status": "ok", "detail": f"Nominatim + TIGERweb reachable — district {district} found for ZIP 20001"}

    def _call():
        # Step 1: Nominatim ZIP → lat/lng
        r1 = req.get(
            "https://nominatim.openstreetmap.org/search",
            params={"postalcode": "20001", "country": "US", "format": "json", "limit": "1"},
            headers={"User-Agent": "PocketVeto/1.0"},
            timeout=10,
        )
        r1.raise_for_status()
        places = r1.json()
        if not places:
            return {"status": "error", "detail": "Nominatim: no result for test ZIP 20001"}
        lat, lng = places[0]["lat"], places[0]["lon"]

        # Half-width, in degrees, of the map extent sent along with the point.
        half = 0.5

        # Step 2: TIGERweb identify → congressional district
        r2 = req.get(
            "https://tigerweb.geo.census.gov/arcgis/rest/services/TIGERweb/Legislative/MapServer/identify",
            params={
                "f": "json",
                "geometry": f"{lng},{lat}",
                "geometryType": "esriGeometryPoint",
                "sr": "4326",
                "layers": "all",
                "tolerance": "2",
                "mapExtent": f"{float(lng)-half},{float(lat)-half},{float(lng)+half},{float(lat)+half}",
                "imageDisplay": "100,100,96",
            },
            timeout=15,
        )
        r2.raise_for_status()
        results = r2.json().get("results", [])

        for item in results:
            if "Congressional" not in (item.get("layerName") or ""):
                continue
            attrs = item.get("attributes", {})

            # Primary: GEOID = state FIPS (2) + district (2)
            geoid = str(attrs.get("GEOID") or "").strip()
            if len(geoid) == 4 and geoid.isdigit():
                district = _district_label(geoid[2:])
                if district is not None:
                    return _found(district)

            # Fallback: STATE + CD{session} (e.g. STATE="06", CD119="30")
            state_val = attrs.get("STATE")
            cd_field = next((k for k in attrs if cd_field_pattern.match(k)), None)
            if state_val and cd_field:
                district = _district_label(str(attrs[cd_field]))
                if district is not None:
                    return _found(district)
                # Unusable district code on this result: try the next one.

        layers = [r.get("layerName") for r in results]
        return {"status": "error", "detail": f"Reachable but no CD field found. Layers: {layers}"}

    try:
        return _timed(_call)
    except Exception as exc:
        return {"status": "error", "detail": str(exc)}
|
|
|
|
|
|
@router.get("/task-status/{task_id}")
async def get_task_status(task_id: str, current_user: User = Depends(get_current_admin)):
    """Report the state of a queued worker task.

    Args:
        task_id: id previously returned by one of the queueing endpoints.

    Returns:
        A dict with ``task_id``, the task's ``status`` and its ``result``.
        ``result`` is ``None`` until the task is ready. For a task that ended
        with an exception it is a ``"<ExceptionType>: <message>"`` string.
    """
    from app.workers.celery_app import celery_app

    result = celery_app.AsyncResult(task_id)
    state = result.status
    outcome = result.result if result.ready() else None

    # A failed task exposes the raised exception instance as its result.
    # Returning that object directly loses the failure reason (or breaks
    # response encoding), so it is rendered as text instead.
    if isinstance(outcome, BaseException):
        outcome = f"{type(outcome).__name__}: {outcome}"

    return {
        "task_id": task_id,
        "status": state,
        "result": outcome,
    }
|