"""
LLM processor — generates AI briefs for fetched bill documents.

Triggered by document_fetcher after successful text retrieval.
"""
import logging

from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.services.llm_service import get_llm_provider
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)


def _build_bill_metadata(bill, sponsor) -> dict:
    """Return the bill/sponsor context passed to the LLM provider.

    Missing values are replaced with readable placeholders ("Unknown", "None",
    "Unknown Title") so the provider never receives a bare ``None``.

    Args:
        bill: The ``Bill`` row being briefed.
        sponsor: The sponsoring ``Member`` row, or ``None`` if unknown.
    """
    return {
        "title": bill.title or "Unknown Title",
        "sponsor_name": sponsor.name if sponsor else "Unknown",
        "party": sponsor.party if sponsor else "Unknown",
        "state": sponsor.state if sponsor else "Unknown",
        "chamber": bill.chamber or "Unknown",
        "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
        "latest_action_text": bill.latest_action_text or "None",
        "latest_action_date": (
            str(bill.latest_action_date) if bill.latest_action_date else "Unknown"
        ),
    }


def _enqueue_news_fetch(bill_id) -> None:
    """Queue a news fetch for ``bill_id`` now that the bill has topic tags.

    Best-effort: failures are logged, not raised. By the time this runs the
    brief is already committed. Propagating the error would send the task
    through ``self.retry``, and the retried run would hit the idempotency
    short-circuit. The news fetch would then never be queued, and the task
    would be reported as failed even though its brief exists.
    """
    try:
        # Kept as a local import, as in the original; presumably to avoid a
        # circular import between worker modules — confirm.
        from app.workers.news_fetcher import fetch_news_for_bill

        fetch_news_for_bill.delay(bill_id)
    except Exception:
        logger.exception("Failed to enqueue news fetch for bill %s", bill_id)


@celery_app.task(
    bind=True,
    max_retries=2,
    rate_limit="10/m",  # Respect LLM provider rate limits
    name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
    """Generate an AI brief for a bill document.

    The first analyzed version of a bill gets a "full" brief. Later versions get
    an "amendment" brief comparing this document's text against the document
    behind the bill's most recent full brief. A full brief is generated instead
    when that earlier document's text is unavailable.

    Args:
        document_id: Primary key of the ``BillDocument`` to process.

    Returns:
        A status dict, one of:
        - ``{"status": "already_processed", "brief_id": ...}`` if a brief
          already exists for this document.
        - ``{"status": "no_document"}`` if the document is missing or has no
          text.
        - ``{"status": "no_bill"}`` if the owning bill is missing.
        - ``{"status": "ok", "brief_id": ..., "brief_type": ...}``, where
          ``brief_type`` is ``"full"`` or ``"amendment"``.

    Any exception during generation or persistence rolls the session back and
    is handed to ``self.retry`` with a 5-minute countdown.
    """
    db = get_sync_db()
    try:
        # Idempotency: skip if a brief already exists for this document.
        existing = db.query(BillBrief).filter_by(document_id=document_id).first()
        if existing:
            return {"status": "already_processed", "brief_id": existing.id}

        doc = db.get(BillDocument, document_id)
        if not doc or not doc.raw_text:
            logger.warning("Document %s not found or has no text", document_id)
            return {"status": "no_document"}

        bill = db.get(Bill, doc.bill_id)
        if not bill:
            return {"status": "no_bill"}

        sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
        bill_metadata = _build_bill_metadata(bill, sponsor)

        # Most recent full brief for this bill (from an earlier document version).
        # NOTE(review): amendments are diffed against this full brief's document,
        # not the most recently briefed version — confirm this is intended.
        previous_full_brief = (
            db.query(BillBrief)
            .filter_by(bill_id=doc.bill_id, brief_type="full")
            .order_by(BillBrief.created_at.desc())
            .first()
        )

        provider = get_llm_provider()

        if previous_full_brief and previous_full_brief.document_id:
            # New version of a bill we've already analyzed — generate amendment brief.
            previous_doc = db.get(BillDocument, previous_full_brief.document_id)
            if previous_doc and previous_doc.raw_text:
                logger.info(
                    "Generating amendment brief for document %s (bill %s)",
                    document_id,
                    doc.bill_id,
                )
                brief = provider.generate_amendment_brief(
                    doc.raw_text, previous_doc.raw_text, bill_metadata
                )
                brief_type = "amendment"
            else:
                logger.info(
                    "Previous document unavailable, generating full brief for document %s",
                    document_id,
                )
                brief = provider.generate_brief(doc.raw_text, bill_metadata)
                brief_type = "full"
        else:
            logger.info(
                "Generating full brief for document %s (bill %s)", document_id, doc.bill_id
            )
            brief = provider.generate_brief(doc.raw_text, bill_metadata)
            brief_type = "full"

        db_brief = BillBrief(
            bill_id=doc.bill_id,
            document_id=document_id,
            brief_type=brief_type,
            summary=brief.summary,
            key_points=brief.key_points,
            risks=brief.risks,
            deadlines=brief.deadlines,
            topic_tags=brief.topic_tags,
            llm_provider=brief.llm_provider,
            llm_model=brief.llm_model,
        )
        db.add(db_brief)
        db.commit()
        db.refresh(db_brief)
        logger.info(
            "%s brief %s created for bill %s using %s/%s",
            brief_type.capitalize(),
            db_brief.id,
            doc.bill_id,
            brief.llm_provider,
            brief.llm_model,
        )

        # Trigger news fetch now that we have topic tags. This must not raise:
        # the brief is committed, so a retry here would be a silent no-op.
        _enqueue_news_fetch(doc.bill_id)

        return {"status": "ok", "brief_id": db_brief.id, "brief_type": brief_type}

    except Exception as exc:
        db.rollback()
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("LLM processing failed for document %s: %s", document_id, exc)
        raise self.retry(exc=exc, countdown=300)  # 5 min backoff for LLM failures
    finally:
        db.close()