Files
PocketVeto/backend/app/workers/llm_processor.py
Jack Levy 73881b2404 feat(notifications): follow modes, milestone alerts, notification enhancements
Follow Modes (neutral / pocket_veto / pocket_boost):
- Alembic migration 0013 adds follow_mode column to follows table
- FollowButton rewritten as mode-aware dropdown for bills; simple toggle for members/topics
- PATCH /api/follows/{id}/mode endpoint with validation
- Dispatcher filters pocket_veto follows (suppress new_document/new_amendment events)
- Dispatcher adds ntfy Actions header for pocket_boost follows

Change-driven (milestone) Alerts:
- New notification_utils.py with shared emit helpers and 30-min dedup
- congress_poller emits bill_updated events on milestone action text
- llm_processor replaced with shared emit util (also notifies member/topic followers)

Notification Enhancements:
- ntfy priority levels (high for bill_updated, default for others)
- Quiet hours (UTC): dispatcher holds events outside allowed window
- Digest mode (daily/weekly): send_notification_digest Celery beat task
- Notification history endpoint + Recent Alerts UI section
- Enriched following page (bill titles, member photos/details via sub-components)
- Follow mode test buttons in admin settings panel

Infrastructure:
- nginx: switch upstream blocks to set $variable proxy_pass so Docker DNS
  re-resolves upstream IPs after container rebuilds (valid=10s)
- TROUBLESHOOTING.md documenting common Docker/nginx/postgres gotchas

Authored-By: Jack Levy
2026-03-01 15:09:13 -05:00

234 lines
8.8 KiB
Python

"""
LLM processor — generates AI briefs for fetched bill documents.
Triggered by document_fetcher after successful text retrieval.
"""
import logging
import time
from sqlalchemy import text
from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.services.llm_service import get_llm_provider
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(
bind=True,
max_retries=2,
rate_limit="10/m", # Respect LLM provider rate limits
name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
"""Generate an AI brief for a bill document. Full brief for first version, amendment brief for subsequent versions."""
db = get_sync_db()
try:
# Idempotency: skip if brief already exists for this document
existing = db.query(BillBrief).filter_by(document_id=document_id).first()
if existing:
return {"status": "already_processed", "brief_id": existing.id}
doc = db.get(BillDocument, document_id)
if not doc or not doc.raw_text:
logger.warning(f"Document {document_id} not found or has no text")
return {"status": "no_document"}
bill = db.get(Bill, doc.bill_id)
if not bill:
return {"status": "no_bill"}
sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
bill_metadata = {
"title": bill.title or "Unknown Title",
"sponsor_name": sponsor.name if sponsor else "Unknown",
"party": sponsor.party if sponsor else "Unknown",
"state": sponsor.state if sponsor else "Unknown",
"chamber": bill.chamber or "Unknown",
"introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
"latest_action_text": bill.latest_action_text or "None",
"latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
}
# Check if a full brief already exists for this bill (from an earlier document version)
previous_full_brief = (
db.query(BillBrief)
.filter_by(bill_id=doc.bill_id, brief_type="full")
.order_by(BillBrief.created_at.desc())
.first()
)
from app.models.setting import AppSetting
prov_row = db.get(AppSetting, "llm_provider")
model_row = db.get(AppSetting, "llm_model")
provider = get_llm_provider(
prov_row.value if prov_row else None,
model_row.value if model_row else None,
)
if previous_full_brief and previous_full_brief.document_id:
# New version of a bill we've already analyzed — generate amendment brief
previous_doc = db.get(BillDocument, previous_full_brief.document_id)
if previous_doc and previous_doc.raw_text:
logger.info(f"Generating amendment brief for document {document_id} (bill {doc.bill_id})")
brief = provider.generate_amendment_brief(doc.raw_text, previous_doc.raw_text, bill_metadata)
brief_type = "amendment"
else:
logger.info(f"Previous document unavailable, generating full brief for document {document_id}")
brief = provider.generate_brief(doc.raw_text, bill_metadata)
brief_type = "full"
else:
logger.info(f"Generating full brief for document {document_id} (bill {doc.bill_id})")
brief = provider.generate_brief(doc.raw_text, bill_metadata)
brief_type = "full"
db_brief = BillBrief(
bill_id=doc.bill_id,
document_id=document_id,
brief_type=brief_type,
summary=brief.summary,
key_points=brief.key_points,
risks=brief.risks,
deadlines=brief.deadlines,
topic_tags=brief.topic_tags,
llm_provider=brief.llm_provider,
llm_model=brief.llm_model,
govinfo_url=doc.govinfo_url,
)
db.add(db_brief)
db.commit()
db.refresh(db_brief)
logger.info(f"{brief_type.capitalize()} brief {db_brief.id} created for bill {doc.bill_id} using {brief.llm_provider}/{brief.llm_model}")
# Emit notification events for bill followers, sponsor followers, and topic followers
from app.workers.notification_utils import (
emit_bill_notification,
emit_member_follow_notifications,
emit_topic_follow_notifications,
)
event_type = "new_amendment" if brief_type == "amendment" else "new_document"
emit_bill_notification(db, bill, event_type, brief.summary)
emit_member_follow_notifications(db, bill, event_type, brief.summary)
emit_topic_follow_notifications(db, bill, event_type, brief.summary, brief.topic_tags or [])
# Trigger news fetch now that we have topic tags
from app.workers.news_fetcher import fetch_news_for_bill
fetch_news_for_bill.delay(doc.bill_id)
return {"status": "ok", "brief_id": db_brief.id, "brief_type": brief_type}
except Exception as exc:
db.rollback()
logger.error(f"LLM processing failed for document {document_id}: {exc}")
raise self.retry(exc=exc, countdown=300) # 5 min backoff for LLM failures
finally:
db.close()
@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_citations")
def backfill_brief_citations(self):
"""
Find briefs generated before citation support was added (key_points contains plain
strings instead of {text, citation, quote} objects), delete them, and re-queue
LLM processing against the already-stored document text.
No Congress.gov or GovInfo calls — only LLM calls.
"""
db = get_sync_db()
try:
uncited = db.execute(text("""
SELECT id, document_id, bill_id
FROM bill_briefs
WHERE key_points IS NOT NULL
AND jsonb_array_length(key_points) > 0
AND jsonb_typeof(key_points->0) = 'string'
""")).fetchall()
total = len(uncited)
queued = 0
skipped = 0
for row in uncited:
if not row.document_id:
skipped += 1
continue
# Confirm the document still has text before deleting the brief
doc = db.get(BillDocument, row.document_id)
if not doc or not doc.raw_text:
skipped += 1
continue
brief = db.get(BillBrief, row.id)
if brief:
db.delete(brief)
db.commit()
process_document_with_llm.delay(row.document_id)
queued += 1
time.sleep(0.1) # Avoid burst-queuing all LLM tasks at once
logger.info(
f"backfill_brief_citations: {total} uncited briefs found, "
f"{queued} re-queued, {skipped} skipped (no document text)"
)
return {"total": total, "queued": queued, "skipped": skipped}
finally:
db.close()
@celery_app.task(bind=True, name="app.workers.llm_processor.resume_pending_analysis")
def resume_pending_analysis(self):
"""
Two-pass backfill for bills missing analysis:
Pass 1 — Documents with no brief (LLM tasks failed/timed out):
Find BillDocuments that have raw_text but no BillBrief, re-queue LLM.
Pass 2 — Bills with no document at all:
Find Bills with no BillDocument, re-queue document fetch (which will
then chain into LLM if text is available on GovInfo).
"""
db = get_sync_db()
try:
# Pass 1: docs with raw_text but no brief
docs_no_brief = db.execute(text("""
SELECT bd.id
FROM bill_documents bd
LEFT JOIN bill_briefs bb ON bb.document_id = bd.id
WHERE bb.id IS NULL AND bd.raw_text IS NOT NULL
""")).fetchall()
queued_llm = 0
for row in docs_no_brief:
process_document_with_llm.delay(row.id)
queued_llm += 1
time.sleep(0.1)
# Pass 2: bills with no document at all
bills_no_doc = db.execute(text("""
SELECT b.bill_id
FROM bills b
LEFT JOIN bill_documents bd ON bd.bill_id = b.bill_id
WHERE bd.id IS NULL
""")).fetchall()
queued_fetch = 0
from app.workers.document_fetcher import fetch_bill_documents
for row in bills_no_doc:
fetch_bill_documents.delay(row.bill_id)
queued_fetch += 1
time.sleep(0.1)
logger.info(
f"resume_pending_analysis: {queued_llm} LLM tasks queued, "
f"{queued_fetch} document fetch tasks queued"
)
return {"queued_llm": queued_llm, "queued_fetch": queued_fetch}
finally:
db.close()