- get_llm_provider() now accepts provider + model args so DB overrides propagate through to all provider constructors (was always reading env vars, ignoring the admin UI selection) - /test-llm replaced with lightweight ping (max_tokens=20) instead of running a full bill analysis; shows model name + reply, no truncation - /api/settings/llm-models endpoint fetches available models live from each provider's API (OpenAI, Anthropic REST, Gemini, Ollama) - Admin UI model picker dynamically populated from provider API; falls back to manual text input on error; Custom model name option kept - Default Gemini model updated: gemini-1.5-pro → gemini-2.0-flash Co-Authored-By: Jack Levy
202 lines
7.6 KiB
Python
202 lines
7.6 KiB
Python
"""
|
|
LLM processor — generates AI briefs for fetched bill documents.
|
|
Triggered by document_fetcher after successful text retrieval.
|
|
"""
|
|
import logging
|
|
import time
|
|
|
|
from sqlalchemy import text
|
|
|
|
from app.database import get_sync_db
|
|
from app.models import Bill, BillBrief, BillDocument, Member
|
|
from app.services.llm_service import get_llm_provider
|
|
from app.workers.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(
    bind=True,
    max_retries=2,
    rate_limit="10/m",  # Respect LLM provider rate limits
    name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
    """Generate and persist an AI brief for one bill document.

    A "full" brief is produced when the bill has no earlier full brief. When
    one exists, an "amendment" brief is produced by comparing this document's
    text with the text of the document behind the most recent full brief; if
    that earlier text is unavailable, a full brief is generated instead.

    The LLM provider and model are read from the ``llm_provider`` and
    ``llm_model`` AppSetting rows and handed to ``get_llm_provider``; ``None``
    is passed for any row that does not exist.

    Args:
        document_id: Primary key of the BillDocument to analyze.

    Returns:
        A status dict, one of:
        ``{"status": "already_processed", "brief_id": ...}`` when a brief for
        this document already exists,
        ``{"status": "no_document"}`` when the document is missing or has no text,
        ``{"status": "no_bill"}`` when the owning bill is missing, or
        ``{"status": "ok", "brief_id": ..., "brief_type": ...}`` on success.

    Raises:
        Whatever ``self.retry`` raises. Any exception raised up to and
        including the brief commit rolls the session back and re-schedules the
        task with a 300 second countdown (``max_retries=2``).
    """
    db = get_sync_db()
    try:
        # Idempotency: skip if brief already exists for this document
        existing = db.query(BillBrief).filter_by(document_id=document_id).first()
        if existing:
            return {"status": "already_processed", "brief_id": existing.id}

        doc = db.get(BillDocument, document_id)
        if not doc or not doc.raw_text:
            logger.warning(f"Document {document_id} not found or has no text")
            return {"status": "no_document"}

        # Keep plain copies of values needed after commit/rollback so later
        # logging never depends on reloading an expired ORM instance.
        bill_id = doc.bill_id

        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "no_bill"}

        sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None

        bill_metadata = {
            "title": bill.title or "Unknown Title",
            "sponsor_name": sponsor.name if sponsor else "Unknown",
            "party": sponsor.party if sponsor else "Unknown",
            "state": sponsor.state if sponsor else "Unknown",
            "chamber": bill.chamber or "Unknown",
            "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
            "latest_action_text": bill.latest_action_text or "None",
            "latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
        }

        # Check if a full brief already exists for this bill (from an earlier document version)
        previous_full_brief = (
            db.query(BillBrief)
            .filter_by(bill_id=bill_id, brief_type="full")
            .order_by(BillBrief.created_at.desc())
            .first()
        )

        # Provider/model overrides stored in the database take part in
        # provider construction; a missing row is passed through as None.
        from app.models.setting import AppSetting
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider = get_llm_provider(
            prov_row.value if prov_row else None,
            model_row.value if model_row else None,
        )

        if previous_full_brief and previous_full_brief.document_id:
            # New version of a bill we've already analyzed — generate amendment brief
            previous_doc = db.get(BillDocument, previous_full_brief.document_id)
            if previous_doc and previous_doc.raw_text:
                logger.info(f"Generating amendment brief for document {document_id} (bill {bill_id})")
                brief = provider.generate_amendment_brief(doc.raw_text, previous_doc.raw_text, bill_metadata)
                brief_type = "amendment"
            else:
                logger.info(f"Previous document unavailable, generating full brief for document {document_id}")
                brief = provider.generate_brief(doc.raw_text, bill_metadata)
                brief_type = "full"
        else:
            logger.info(f"Generating full brief for document {document_id} (bill {bill_id})")
            brief = provider.generate_brief(doc.raw_text, bill_metadata)
            brief_type = "full"

        db_brief = BillBrief(
            bill_id=bill_id,
            document_id=document_id,
            brief_type=brief_type,
            summary=brief.summary,
            key_points=brief.key_points,
            risks=brief.risks,
            deadlines=brief.deadlines,
            topic_tags=brief.topic_tags,
            llm_provider=brief.llm_provider,
            llm_model=brief.llm_model,
            govinfo_url=doc.govinfo_url,
        )
        db.add(db_brief)
        db.commit()
        db.refresh(db_brief)
        brief_id = db_brief.id

        logger.info(f"{brief_type.capitalize()} brief {brief_id} created for bill {bill_id} using {brief.llm_provider}/{brief.llm_model}")

        # From here on the brief is durable. A task retry would stop at the
        # idempotency check above and never re-run the steps below, so each
        # follow-up is attempted independently and a failure is logged rather
        # than escalated into a retry that cannot repair it.

        # Emit notification events for users who follow this bill
        try:
            _emit_notification_events(db, bill, bill_id, brief_type, brief.summary)
        except Exception:
            db.rollback()  # discard any half-added notification rows
            logger.exception(f"Failed to emit notification events for bill {bill_id} (brief {brief_id})")

        # Trigger news fetch now that we have topic tags
        from app.workers.news_fetcher import fetch_news_for_bill
        try:
            fetch_news_for_bill.delay(bill_id)
        except Exception:
            logger.exception(f"Failed to queue news fetch for bill {bill_id} (brief {brief_id})")

        return {"status": "ok", "brief_id": brief_id, "brief_type": brief_type}

    except Exception as exc:
        db.rollback()
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"LLM processing failed for document {document_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)  # 5 min backoff for LLM failures
    finally:
        db.close()
|
|
|
|
|
|
def _emit_notification_events(db, bill, bill_id: str, brief_type: str, summary: str | None) -> None:
    """Queue one NotificationEvent for each user who follows the given bill.

    Looks up Follow rows with ``follow_type="bill"`` and ``follow_value=bill_id``.
    When there are none the function returns without touching the session.
    Otherwise every follower receives an event carrying the same payload
    (bill title, label, a summary excerpt capped at 300 characters, and a link
    to the bill page), and the session is committed.

    Args:
        db: Open database session used for the query, inserts and commit.
        bill: Bill record supplying ``short_title``, ``title``, ``bill_type``
            and ``bill_number`` for the payload.
        bill_id: Identifier of the bill; used for the follow lookup, the event
            rows and the bill URL.
        brief_type: ``"amendment"`` yields ``new_amendment`` events; any other
            value yields ``new_document`` events.
        summary: Brief summary text, or None for an empty excerpt.
    """
    from app.models.follow import Follow
    from app.models.notification import NotificationEvent
    from app.config import settings

    bill_follows = db.query(Follow).filter_by(follow_type="bill", follow_value=bill_id).all()
    if bill_follows:
        # Prefer the public URL; fall back to the local one. Trailing slashes
        # are trimmed so the path can be appended directly.
        site_root = (settings.PUBLIC_URL or settings.LOCAL_URL).rstrip("/")

        display_title = bill.short_title or bill.title or ""
        display_label = f"{bill.bill_type.upper()} {bill.bill_number}"
        excerpt = summary[:300] if summary else ""

        # One payload object is shared by every event created below.
        event_payload = {
            "bill_title": display_title,
            "bill_label": display_label,
            "brief_summary": excerpt,
            "bill_url": f"{site_root}/bills/{bill_id}",
        }

        if brief_type == "amendment":
            kind = "new_amendment"
        else:
            kind = "new_document"

        db.add_all(
            [
                NotificationEvent(
                    user_id=entry.user_id,
                    bill_id=bill_id,
                    event_type=kind,
                    payload=event_payload,
                )
                for entry in bill_follows
            ]
        )
        db.commit()
|
|
|
|
|
|
@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_citations")
def backfill_brief_citations(self):
    """
    Regenerate briefs whose key points predate citation support.

    A brief counts as uncited when ``key_points`` is a non-empty JSON array
    whose first element is a plain string rather than an object. For each such
    brief whose document still has stored text, the brief row is deleted and
    ``process_document_with_llm`` is queued for that document. Briefs with no
    document id, a missing document, or a document without text are left
    untouched and counted as skipped.

    Only the already-stored document text is used; the task itself makes no
    Congress.gov or GovInfo calls.

    Returns:
        ``{"total": ..., "queued": ..., "skipped": ...}`` — the number of
        uncited briefs found, re-queued, and skipped.
    """
    db = get_sync_db()
    try:
        legacy_rows = db.execute(text("""
            SELECT id, document_id, bill_id
            FROM bill_briefs
            WHERE key_points IS NOT NULL
              AND jsonb_array_length(key_points) > 0
              AND jsonb_typeof(key_points->0) = 'string'
        """)).fetchall()

        requeued = 0
        for legacy in legacy_rows:
            # Confirm the document still has text before deleting the brief;
            # rows without a document id never reach the lookup.
            source_doc = db.get(BillDocument, legacy.document_id) if legacy.document_id else None
            if not source_doc or not source_doc.raw_text:
                continue

            stale_brief = db.get(BillBrief, legacy.id)
            if stale_brief:
                db.delete(stale_brief)
                db.commit()

            process_document_with_llm.delay(legacy.document_id)
            requeued += 1
            time.sleep(0.1)  # Avoid burst-queuing all LLM tasks at once

        # Every row is either re-queued or skipped, so the skip count is the remainder.
        found = len(legacy_rows)
        passed_over = found - requeued

        logger.info(
            f"backfill_brief_citations: {found} uncited briefs found, "
            f"{requeued} re-queued, {passed_over} skipped (no document text)"
        )
        return {"total": found, "queued": requeued, "skipped": passed_over}
    finally:
        db.close()
|