"""
Document fetcher — retrieves bill text from GovInfo and stores it.

Triggered by congress_poller when a new bill is detected.
"""
import logging
from datetime import datetime, timezone

from app.database import get_sync_db
from app.models import Bill, BillDocument
from app.services import congress_api, govinfo_api
from app.services.govinfo_api import DocumentUnchangedError
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)


def _version_label(text_versions, url):
    """Return the version label of the text version that provided ``url``.

    Prefers the entry whose ``formats`` list contains ``url`` so the stored
    label matches the document actually fetched; falls back to the first
    entry when no entry matches (or the entries carry no ``formats``).
    The entry's ``type`` may be a plain string or a dict with a ``name`` key.
    Returns None when no label is available.
    """
    fallback = text_versions[0] if text_versions else {}
    chosen = next(
        (
            version
            for version in text_versions
            if any(
                isinstance(fmt_entry, dict) and fmt_entry.get("url") == url
                for fmt_entry in (version.get("formats") or [])
            )
        ),
        fallback,
    )
    type_obj = chosen.get("type", {})
    return type_obj.get("name") if isinstance(type_obj, dict) else type_obj


@celery_app.task(bind=True, max_retries=3, name="app.workers.document_fetcher.fetch_bill_documents")
def fetch_bill_documents(self, bill_id: str):
    """Fetch bill text from GovInfo and store it. Then enqueue LLM processing.

    Looks up the ``Bill``, asks Congress.gov for its text versions, picks a
    URL/format via ``govinfo_api.find_best_text_url``, downloads the text and
    stores it as a new ``BillDocument``. Only after that row is committed is
    the document id handed to ``process_document_with_llm``.

    Args:
        bill_id: Primary key of the ``Bill`` row to fetch text for.

    Returns:
        A status dict whose ``status`` is one of ``"not_found"``,
        ``"no_text_versions"``, ``"no_suitable_format"``,
        ``"already_fetched"``, ``"unchanged"`` or ``"ok"`` (the last also
        carries ``document_id`` and ``chars``).

    Raises:
        Whatever ``self.retry`` raises when fetching or storing fails
        (including an empty download), retried after 120 s up to
        ``max_retries``. If the LLM job cannot be enqueued after the document
        was committed, that error is logged and re-raised without a retry.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning(f"Bill {bill_id} not found in DB")
            return {"status": "not_found"}

        # Get text versions from Congress.gov
        try:
            text_response = congress_api.get_bill_text_versions(
                bill.congress_number, bill.bill_type, bill.bill_number
            )
        except Exception as e:
            # NOTE(review): every error here, including transient network
            # failures, is reported as "no text versions" and is NOT retried —
            # confirm this best-effort behaviour is intended.
            logger.warning(f"No text versions for {bill_id}: {e}")
            return {"status": "no_text_versions"}

        text_versions = text_response.get("textVersions", [])
        if not text_versions:
            return {"status": "no_text_versions"}

        url, fmt = govinfo_api.find_best_text_url(text_versions)
        if not url:
            return {"status": "no_suitable_format"}

        # Idempotency: skip if we already have this exact document version
        existing = (
            db.query(BillDocument)
            .filter_by(bill_id=bill_id, govinfo_url=url)
            .filter(BillDocument.raw_text.isnot(None))
            .first()
        )
        if existing:
            return {"status": "already_fetched", "bill_id": bill_id}

        logger.info(f"Fetching {bill_id} document ({fmt}) from {url}")
        try:
            raw_text = govinfo_api.fetch_text_from_url(url, fmt)
        except DocumentUnchangedError:
            logger.info(f"Document unchanged for {bill_id} (ETag match) — skipping")
            return {"status": "unchanged", "bill_id": bill_id}

        if not raw_text:
            # Routed to the outer handler below, so an empty body is retried.
            raise ValueError(f"Empty text returned for {bill_id}")

        doc = BillDocument(
            bill_id=bill_id,
            doc_type="bill_text",
            doc_version=_version_label(text_versions, url),
            govinfo_url=url,
            raw_text=raw_text,
            fetched_at=datetime.now(timezone.utc),
        )
        db.add(doc)
        db.commit()
        db.refresh(doc)
        # Capture plain values before the session is closed in `finally`.
        doc_id = doc.id
        chars = len(raw_text)
        logger.info(f"Stored document {doc_id} for bill {bill_id} ({chars:,} chars)")
    except Exception as exc:
        db.rollback()
        logger.exception(f"Document fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()

    # Enqueue LLM processing OUTSIDE the retrying block: the document is
    # already committed, so a retry would hit the idempotency check above,
    # return "already_fetched", and the document would never be processed.
    # Surface the failure (with the document id) instead.
    from app.workers.llm_processor import process_document_with_llm

    try:
        process_document_with_llm.delay(doc_id)
    except Exception:
        logger.exception(
            f"Stored document {doc_id} for bill {bill_id} but failed to enqueue LLM processing"
        )
        raise

    return {"status": "ok", "document_id": doc_id, "chars": chars}