"""
LLM processor — generates AI briefs for fetched bill documents.

Triggered by document_fetcher after successful text retrieval.
"""
import json
import logging
import time

from sqlalchemy import text
from sqlalchemy.orm.attributes import flag_modified

from app.config import settings
from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.services.llm_service import RateLimitError, get_llm_provider
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)

# Labels backfill_brief_labels accepts from the classification response.
_VALID_POINT_LABELS = ("cited_fact", "inference")


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _get_configured_provider(db):
    """Return the LLM provider selected by the ``llm_provider`` / ``llm_model`` app settings.

    A missing settings row is passed to ``get_llm_provider`` as ``None``.
    """
    # Function-level import. NOTE(review): possibly avoids an import cycle —
    # verify before hoisting to module level.
    from app.models.setting import AppSetting

    prov_row = db.get(AppSetting, "llm_provider")
    model_row = db.get(AppSetting, "llm_model")
    return get_llm_provider(
        prov_row.value if prov_row else None,
        model_row.value if model_row else None,
    )


def _build_bill_metadata(db, bill) -> dict:
    """Collect the bill/sponsor fields sent to the LLM, with placeholders for missing values."""
    sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
    return {
        "title": bill.title or "Unknown Title",
        "sponsor_name": sponsor.name if sponsor else "Unknown",
        "party": sponsor.party if sponsor else "Unknown",
        "state": sponsor.state if sponsor else "Unknown",
        "chamber": bill.chamber or "Unknown",
        "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
        "latest_action_text": bill.latest_action_text or "None",
        "latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
    }


def _generate_brief_for_document(db, provider, doc, document_id: int, bill_metadata: dict):
    """Generate a brief for ``doc`` and return ``(brief, brief_type)``.

    If the bill already has a full brief (most recent by ``created_at``) whose source
    document still has text, an amendment brief comparing the two texts is generated
    (``"amendment"``). Otherwise a full brief is generated (``"full"``).
    """
    previous_full_brief = (
        db.query(BillBrief)
        .filter_by(bill_id=doc.bill_id, brief_type="full")
        .order_by(BillBrief.created_at.desc())
        .first()
    )

    if previous_full_brief and previous_full_brief.document_id:
        # New version of a bill we've already analyzed — try an amendment brief.
        previous_doc = db.get(BillDocument, previous_full_brief.document_id)
        if previous_doc and previous_doc.raw_text:
            logger.info("Generating amendment brief for document %s (bill %s)", document_id, doc.bill_id)
            brief = provider.generate_amendment_brief(doc.raw_text, previous_doc.raw_text, bill_metadata)
            return brief, "amendment"
        logger.info("Previous document unavailable, generating full brief for document %s", document_id)
    else:
        logger.info("Generating full brief for document %s (bill %s)", document_id, doc.bill_id)

    return provider.generate_brief(doc.raw_text, bill_metadata), "full"


def _emit_post_brief_side_effects(db, bill, bill_id, brief, brief_type: str) -> None:
    """Best-effort follow-up once a brief is committed.

    Notifies bill, sponsor and topic followers, then queues a news fetch for the
    bill. The brief is already persisted, so failures are logged rather than raised.
    Raising would retry process_document_with_llm, whose idempotency check returns
    "already_processed" without re-running these steps.
    """
    from app.workers.news_fetcher import fetch_news_for_bill
    from app.workers.notification_utils import (
        emit_bill_notification,
        emit_member_follow_notifications,
        emit_topic_follow_notifications,
    )

    event_type = "new_amendment" if brief_type == "amendment" else "new_document"
    try:
        emit_bill_notification(db, bill, event_type, brief.summary)
        emit_member_follow_notifications(db, bill, event_type, brief.summary)
        emit_topic_follow_notifications(db, bill, event_type, brief.summary, brief.topic_tags or [])
    except Exception:
        # Discard any half-written notification state so the session stays usable.
        db.rollback()
        logger.exception("Failed to emit notifications for bill %s", bill_id)

    # Trigger news fetch now that we have topic tags.
    try:
        fetch_news_for_bill.delay(bill_id)
    except Exception:
        logger.exception("Failed to queue news fetch for bill %s", bill_id)


def _build_label_prompt(points: list[dict]) -> str:
    """Build the fact/inference classification prompt for the given cited points (1-based list)."""
    lines = [
        f'{i}. TEXT: "{p.get("text") or ""}" | QUOTE: "{p.get("quote") or ""}"'
        for i, p in enumerate(points, start=1)
    ]
    return (
        "Classify each item as 'cited_fact' or 'inference'.\n"
        "cited_fact = the claim is explicitly and directly stated in the quoted text.\n"
        "inference = analytical interpretation, projection, or implication not literally stated.\n\n"
        "Return ONLY a JSON array of strings, one per item, in order. No explanation.\n\n"
        "Items:\n" + "\n".join(lines)
    )


def _parse_label_response(raw: str):
    """Decode the model's JSON reply, tolerating a surrounding Markdown code fence.

    Raises ``json.JSONDecodeError`` (or ``AttributeError`` for a non-string reply)
    when the reply cannot be parsed.
    """
    raw = raw.strip()
    if raw.startswith("```"):
        raw = raw.split("```")[1]
        if raw.startswith("json"):
            raw = raw[4:]
    return json.loads(raw.strip())


# ---------------------------------------------------------------------------
# Celery tasks
# ---------------------------------------------------------------------------

@celery_app.task(
    bind=True,
    max_retries=8,
    rate_limit=f"{settings.LLM_RATE_LIMIT_RPM}/m",
    name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
    """Generate and store an AI brief for one bill document.

    A full brief is produced for a bill's first analyzed document. When the bill
    already has a full brief whose source document still has text, an amendment
    brief comparing the two versions is produced instead.

    Returns a status dict:
    - ``already_processed``, with ``brief_id``, when a brief already exists;
    - ``no_document`` when the document is missing or has no text;
    - ``no_bill`` when the bill is missing;
    - ``ok``, with ``brief_id`` and ``brief_type``, on success.

    ``RateLimitError`` is retried after the error's ``retry_after`` countdown. Any
    other failure is retried after 5 minutes, up to ``max_retries``.
    """
    db = get_sync_db()
    try:
        # Idempotency: skip if a brief already exists for this document.
        existing = db.query(BillBrief).filter_by(document_id=document_id).first()
        if existing:
            return {"status": "already_processed", "brief_id": existing.id}

        doc = db.get(BillDocument, document_id)
        if not doc or not doc.raw_text:
            logger.warning("Document %s not found or has no text", document_id)
            return {"status": "no_document"}

        bill = db.get(Bill, doc.bill_id)
        if not bill:
            return {"status": "no_bill"}

        bill_metadata = _build_bill_metadata(db, bill)
        provider = _get_configured_provider(db)
        brief, brief_type = _generate_brief_for_document(db, provider, doc, document_id, bill_metadata)

        db_brief = BillBrief(
            bill_id=doc.bill_id,
            document_id=document_id,
            brief_type=brief_type,
            summary=brief.summary,
            key_points=brief.key_points,
            risks=brief.risks,
            deadlines=brief.deadlines,
            topic_tags=brief.topic_tags,
            llm_provider=brief.llm_provider,
            llm_model=brief.llm_model,
            govinfo_url=doc.govinfo_url,
        )
        db.add(db_brief)
        db.commit()
        db.refresh(db_brief)

        # Capture ids now: a rollback in the side-effect helper expires ORM instances.
        brief_id = db_brief.id
        bill_id = doc.bill_id
        logger.info(
            "%s brief %s created for bill %s using %s/%s",
            brief_type.capitalize(), brief_id, bill_id, brief.llm_provider, brief.llm_model,
        )

        _emit_post_brief_side_effects(db, bill, bill_id, brief, brief_type)

        return {"status": "ok", "brief_id": brief_id, "brief_type": brief_type}

    except RateLimitError as exc:
        db.rollback()
        logger.warning("LLM rate limit hit (%s); retrying in %ss", exc.provider, exc.retry_after)
        raise self.retry(exc=exc, countdown=exc.retry_after)
    except Exception as exc:
        db.rollback()
        logger.exception("LLM processing failed for document %s: %s", document_id, exc)
        raise self.retry(exc=exc, countdown=300)  # 5 min backoff for other failures
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_citations")
def backfill_brief_citations(self):
    """
    Re-generate briefs created before citation support was added.

    Such briefs have key_points containing plain strings instead of
    {text, citation, quote} objects. Each one is deleted, and LLM processing is
    re-queued against the already-stored document text. No Congress.gov or
    GovInfo calls are made — only LLM calls.
    Returns ``{"total", "queued", "skipped"}`` counts.
    """
    db = get_sync_db()
    try:
        # ``key_points->0`` is NULL (never an error) for SQL NULL, empty arrays and
        # non-array JSON values. This single predicate therefore selects exactly the
        # briefs whose first key point is a plain string, without the error that
        # jsonb_array_length() raises on a non-array value.
        uncited = db.execute(text("""
            SELECT id, document_id, bill_id FROM bill_briefs
            WHERE jsonb_typeof(key_points->0) = 'string'
        """)).fetchall()

        total = len(uncited)
        queued = 0
        skipped = 0

        for row in uncited:
            if not row.document_id:
                skipped += 1
                continue

            # Confirm the document still has text before deleting the brief.
            doc = db.get(BillDocument, row.document_id)
            if not doc or not doc.raw_text:
                skipped += 1
                continue

            brief = db.get(BillBrief, row.id)
            if brief:
                db.delete(brief)
                db.commit()

            process_document_with_llm.delay(row.document_id)
            queued += 1
            time.sleep(0.1)  # Avoid burst-queuing all LLM tasks at once

        logger.info(
            "backfill_brief_citations: %s uncited briefs found, "
            "%s re-queued, %s skipped (no document text)",
            total, queued, skipped,
        )
        return {"total": total, "queued": queued, "skipped": skipped}
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_labels")
def backfill_brief_labels(self):
    """
    Add fact/inference labels to existing cited brief points without re-generating them.

    Sends one compact classification call per brief, batching all of its unlabeled
    points. Skips briefs that are already fully labeled, and plain-string points
    (there is no quote to classify).
    Returns ``{"total", "updated", "skipped"}`` counts.
    """
    db = get_sync_db()
    try:
        # The CASE guards keep jsonb_array_elements() from running on non-array
        # values (e.g. a stored JSON null), which would abort the whole query.
        unlabeled_ids = db.execute(text("""
            SELECT id FROM bill_briefs
            WHERE (
                CASE WHEN jsonb_typeof(key_points) = 'array' THEN EXISTS (
                    SELECT 1 FROM jsonb_array_elements(key_points) AS p
                    WHERE jsonb_typeof(p) = 'object' AND (p->>'label') IS NULL
                ) ELSE false END
            ) OR (
                CASE WHEN jsonb_typeof(risks) = 'array' THEN EXISTS (
                    SELECT 1 FROM jsonb_array_elements(risks) AS r
                    WHERE jsonb_typeof(r) = 'object' AND (r->>'label') IS NULL
                ) ELSE false END
            )
        """)).fetchall()

        total = len(unlabeled_ids)
        updated = 0
        skipped = 0

        provider = _get_configured_provider(db)

        for row in unlabeled_ids:
            brief = db.get(BillBrief, row.id)
            if not brief:
                skipped += 1
                continue

            # Collect all unlabeled cited points across both fields.
            to_classify: list[tuple[str, int, dict]] = [
                (field_name, idx, point)
                for field_name in ("key_points", "risks")
                for idx, point in enumerate(getattr(brief, field_name) or [])
                if isinstance(point, dict) and point.get("label") is None
            ]
            if not to_classify:
                skipped += 1
                continue

            prompt = _build_label_prompt([point for _, _, point in to_classify])
            try:
                labels = _parse_label_response(provider.generate_text(prompt))
            except Exception as exc:
                logger.warning("Brief %s: classification failed: %s", brief.id, exc)
                skipped += 1
                time.sleep(0.5)
                continue

            if not isinstance(labels, list) or len(labels) != len(to_classify):
                logger.warning("Brief %s: label count mismatch, skipping", brief.id)
                skipped += 1
                continue

            fields_modified: set[str] = set()
            for (field_name, point_idx, _), label in zip(to_classify, labels):
                if label in _VALID_POINT_LABELS:
                    getattr(brief, field_name)[point_idx]["label"] = label
                    fields_modified.add(field_name)

            if not fields_modified:
                # The model answered, but with no recognised labels — nothing to save.
                logger.warning("Brief %s: no valid labels returned, skipping", brief.id)
                skipped += 1
                time.sleep(0.2)
                continue

            # In-place mutation of JSON values is not detected automatically.
            for field_name in fields_modified:
                flag_modified(brief, field_name)
            db.commit()
            updated += 1
            time.sleep(0.2)

        logger.info(
            "backfill_brief_labels: %s briefs found, %s updated, %s skipped",
            total, updated, skipped,
        )
        return {"total": total, "updated": updated, "skipped": skipped}
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.llm_processor.resume_pending_analysis")
def resume_pending_analysis(self):
    """
    Two-pass backfill for bills missing analysis.

    Pass 1 — documents with no brief (LLM tasks failed or timed out):
      Find BillDocuments that have non-empty raw_text but no BillBrief, and
      re-queue LLM processing.
    Pass 2 — bills with no document at all:
      Find Bills with no BillDocument, and re-queue the document fetch (which
      will then chain into LLM processing if text is available on GovInfo).
    Returns ``{"queued_llm", "queued_fetch"}`` counts.
    """
    db = get_sync_db()
    try:
        # Pass 1: docs with usable raw_text but no brief. Empty text is excluded
        # because process_document_with_llm rejects it as "no_document".
        docs_no_brief = db.execute(text("""
            SELECT bd.id
            FROM bill_documents bd
            LEFT JOIN bill_briefs bb ON bb.document_id = bd.id
            WHERE bb.id IS NULL
              AND bd.raw_text IS NOT NULL
              AND bd.raw_text <> ''
        """)).fetchall()

        queued_llm = 0
        for row in docs_no_brief:
            process_document_with_llm.delay(row.id)
            queued_llm += 1
            time.sleep(0.1)

        # Pass 2: bills with no document at all.
        bills_no_doc = db.execute(text("""
            SELECT b.bill_id
            FROM bills b
            LEFT JOIN bill_documents bd ON bd.bill_id = b.bill_id
            WHERE bd.id IS NULL
        """)).fetchall()

        from app.workers.document_fetcher import fetch_bill_documents

        queued_fetch = 0
        for row in bills_no_doc:
            fetch_bill_documents.delay(row.bill_id)
            queued_fetch += 1
            time.sleep(0.1)

        logger.info(
            "resume_pending_analysis: %s LLM tasks queued, %s document fetch tasks queued",
            queued_llm, queued_fetch,
        )
        return {"queued_llm": queued_llm, "queued_fetch": queued_fetch}
    finally:
        db.close()