PocketVeto/backend/app/workers/document_fetcher.py
Jack Levy 7e5c5b473e feat: API optimizations — quota batching, ETags, caching, async sponsor (v0.9.7)
Nine efficiency improvements across the data pipeline:

1. NewsAPI OR batching (news_service.py + news_fetcher.py)
   - Combine up to 4 bills per NewsAPI call using OR query syntax
   - NEWSAPI_BATCH_SIZE=4 means ~4× effective daily quota (100→400 bill-fetches)
   - fetch_news_for_bill_batch task; fetch_news_for_active_bills queues batches
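
The OR batching in item 1 could be sketched roughly as follows. This is an illustration, not the code in news_service.py: `chunk_terms` and `build_or_query` are hypothetical helper names, and quoting each search term is an assumption about how the query string is assembled.

```python
from typing import Iterator, List

NEWSAPI_BATCH_SIZE = 4  # value taken from the commit message


def chunk_terms(terms: List[str], size: int = NEWSAPI_BATCH_SIZE) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `size` search terms."""
    for start in range(0, len(terms), size):
        yield terms[start:start + size]


def build_or_query(terms: List[str]) -> str:
    """Join quoted terms with OR so a single request covers several bills."""
    return " OR ".join(f'"{term}"' for term in terms)


if __name__ == "__main__":
    bills = ["H.R. 1", "S. 23", "H.R. 400", "S. 7", "H.R. 55"]
    for batch in chunk_terms(bills):
        print(build_or_query(batch))
```

With a batch size of 4, five bills cost two requests instead of five, which is where the roughly 4× quota figure comes from.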

2. Google News RSS cache (news_service.py)
   - 2-hour Redis cache shared between news_fetcher and trend_scorer
   - Eliminates duplicate RSS hits when both workers run against same bill
   - clear_gnews_cache() admin helper + admin endpoint
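
A shared cache with expiry, as described in item 2, might look like the sketch below. It uses an in-memory dictionary as a stand-in for Redis so that it runs on its own. `TTLCache`, `get_or_fetch`, and the key format are hypothetical names, not the ones in news_service.py.

```python
import time
from typing import Callable, Dict, Tuple

GNEWS_CACHE_TTL = 2 * 60 * 60  # two hours, as stated in the commit message


class TTLCache:
    """Tiny in-memory stand-in for a Redis cache with per-key expiry."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, object]] = {}

    def get_or_fetch(self, key: str, fetch: Callable[[], object]) -> object:
        """Return the cached value, calling `fetch` only on a miss or after expiry."""
        entry = self._entries.get(key)
        now = self._clock()
        if entry is not None and entry[0] > now:
            return entry[1]
        value = fetch()
        self._entries[key] = (now + self._ttl, value)
        return value

    def clear(self) -> int:
        """Drop every entry and report how many were removed."""
        removed = len(self._entries)
        self._entries.clear()
        return removed
```

If both workers go through the same `get_or_fetch` call with the same key, the second one is served from the cache, and `clear` plays the role of the admin helper.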

3. pytrends keyword batching (trends_service.py + trend_scorer.py)
   - Compare up to 5 bills per pytrends call instead of 1
   - get_trends_scores_batch() returns scores in original order
   - Reduces pytrends calls by ~5× and associated rate-limit risk
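
The order-preserving batching in item 3 can be illustrated without calling pytrends at all. In this sketch `fetch_batch` is a hypothetical callable standing in for one trends request, and defaulting missing keywords to 0.0 is an assumption, not necessarily what get_trends_scores_batch() does.

```python
from typing import Callable, Dict, List

TRENDS_BATCH_SIZE = 5  # bills compared per call, per the commit message


def get_scores_in_order(
    keywords: List[str],
    fetch_batch: Callable[[List[str]], Dict[str, float]],
    batch_size: int = TRENDS_BATCH_SIZE,
) -> List[float]:
    """Fetch scores in batches and return them aligned with `keywords`."""
    scores: Dict[str, float] = {}
    for start in range(0, len(keywords), batch_size):
        batch = keywords[start:start + batch_size]
        scores.update(fetch_batch(batch))
    # Keywords missing from the responses get 0.0 so output lines up with input.
    return [scores.get(keyword, 0.0) for keyword in keywords]
```

Returning a list aligned with the input lets the caller zip scores back onto bills without tracking which batch each keyword went into.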

4. GovInfo ETags (govinfo_api.py + document_fetcher.py)
   - If-None-Match conditional GET; DocumentUnchangedError on HTTP 304
   - ETags stored in Redis (30-day TTL) keyed by MD5(url)
   - document_fetcher catches DocumentUnchangedError → {"status": "unchanged"}
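
The conditional GET flow in item 4 might be sketched as below. To stay self-contained, the HTTP call is an injected function and a plain dictionary stands in for Redis, so the 30-day TTL is not modelled. `fetch_if_changed`, `etag_key`, and the `govinfo:etag:` key prefix are hypothetical; only `DocumentUnchangedError`, the If-None-Match header, and the MD5(url) key come from the commit message.

```python
import hashlib
from typing import Callable, Dict, MutableMapping, Tuple

# (status_code, response_headers, body)
Response = Tuple[int, Dict[str, str], str]


class DocumentUnchangedError(Exception):
    """Raised when the server answers 304 Not Modified."""


def etag_key(url: str) -> str:
    """Build a cache key from the MD5 of the URL (key prefix is an assumption)."""
    return "govinfo:etag:" + hashlib.md5(url.encode("utf-8")).hexdigest()


def fetch_if_changed(
    url: str,
    http_get: Callable[[str, Dict[str, str]], Response],
    etag_store: MutableMapping[str, str],
) -> str:
    """Send If-None-Match when an ETag is cached; raise on 304, else return the body."""
    key = etag_key(url)
    headers: Dict[str, str] = {}
    cached = etag_store.get(key)
    if cached:
        headers["If-None-Match"] = cached
    status, response_headers, body = http_get(url, headers)
    if status == 304:
        raise DocumentUnchangedError(url)
    new_etag = response_headers.get("ETag")
    if new_etag:
        etag_store[key] = new_etag  # a Redis-backed store would also set an expiry here
    return body
```

A caller such as fetch_bill_documents can then treat the exception as a normal outcome and return an "unchanged" status instead of retrying.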

5. Anthropic prompt caching (llm_service.py)
   - cache_control: {type: ephemeral} on system messages in AnthropicProvider
   - Caches the ~700-token system prompt server-side; ~50% cost reduction on
     repeated calls within the 5-minute cache window
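
For item 5, the Anthropic Messages API accepts the system prompt as a list of content blocks, and a block may carry a `cache_control` field. The sketch below only builds that structure; `build_system_blocks` is a hypothetical helper name and does not reflect how AnthropicProvider is actually organised.

```python
from typing import Any, Dict, List


def build_system_blocks(system_prompt: str) -> List[Dict[str, Any]]:
    """Wrap the system prompt in one text block marked for ephemeral caching."""
    return [
        {
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"},
        }
    ]


if __name__ == "__main__":
    blocks = build_system_blocks("You summarize congressional bills.")
    print(blocks[0]["cache_control"])
```

The resulting list would be passed as the `system` argument of a messages request in place of a plain string.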

6. Async sponsor fetch (congress_poller.py)
   - New fetch_sponsor_for_bill Celery task replaces blocking get_bill_detail()
     inline in poll loop
   - Bills saved immediately with sponsor_id=None; sponsor linked async
   - Removes 0.25s sleep per new bill from poll hot path

7. Skip doc fetch for procedural actions (congress_poller.py)
   - _DOC_PRODUCING_CATEGORIES = {vote, committee_report, presidential, ...}
   - fetch_bill_documents only enqueued when action is likely to produce
     new GovInfo text (saves ~60–70% of unnecessary document fetch attempts)
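
The gating in item 7 amounts to a set membership test. The sketch below lists only the three categories named above; the original set is longer and its remaining members are not shown here. Representing categories as lowercase strings and the helper name `should_fetch_documents` are assumptions.

```python
from typing import Optional

# Only the categories named in the commit message; the real set has more entries.
_DOC_PRODUCING_CATEGORIES = {"vote", "committee_report", "presidential"}


def should_fetch_documents(action_category: Optional[str]) -> bool:
    """Return True when an action of this category is likely to produce new text."""
    return action_category in _DOC_PRODUCING_CATEGORIES
```

The poller would enqueue fetch_bill_documents only when this check passes, so purely procedural actions never reach the document fetcher.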

8. Adaptive poll frequency (congress_poller.py)
   - _is_congress_off_hours(): weekends + before 9AM / after 9PM EST
   - Skips poll if off-hours AND last poll < 1 hour ago
   - Prevents wasteful polling when Congress is not in session
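
The skip rule in item 8 could be written along these lines. This sketch uses a fixed UTC-5 offset for EST and therefore ignores daylight saving time. Treating exactly 9 PM as off-hours is an assumption, and `should_skip_poll` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EST = timezone(timedelta(hours=-5), "EST")  # fixed offset; ignores daylight saving


def is_congress_off_hours(now: datetime) -> bool:
    """True on weekends, before 9 AM, or from 9 PM onward (EST)."""
    local = now.astimezone(EST)
    if local.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return True
    return local.hour < 9 or local.hour >= 21


def should_skip_poll(now: datetime, last_poll: Optional[datetime]) -> bool:
    """Skip only when it is off-hours AND the last poll was under an hour ago."""
    if last_poll is None:
        return False
    return is_congress_off_hours(now) and (now - last_poll) < timedelta(hours=1)
```

Because both conditions must hold, off-hours polling still happens about once an hour, while daytime polling keeps its normal cadence.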

9. Admin panel additions (admin.py + settings/page.tsx + api.ts)
   - GET /api/admin/newsapi-quota → remaining calls today
   - POST /api/admin/clear-gnews-cache → flush RSS cache
   - Settings page shows NewsAPI quota remaining (amber if < 10)
   - "Clear Google News Cache" button in Manual Controls

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 16:50:51 -04:00

"""
Document fetcher — retrieves bill text from GovInfo and stores it.
Triggered by congress_poller when a new bill is detected.
"""
import logging
from datetime import datetime, timezone

from app.database import get_sync_db
from app.models import Bill, BillDocument
from app.services import congress_api, govinfo_api
from app.services.govinfo_api import DocumentUnchangedError
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, max_retries=3, name="app.workers.document_fetcher.fetch_bill_documents")
def fetch_bill_documents(self, bill_id: str):
    """Fetch bill text from GovInfo and store it. Then enqueue LLM processing."""
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning(f"Bill {bill_id} not found in DB")
            return {"status": "not_found"}

        # Get text versions from Congress.gov
        try:
            text_response = congress_api.get_bill_text_versions(
                bill.congress_number, bill.bill_type, bill.bill_number
            )
        except Exception as e:
            logger.warning(f"No text versions for {bill_id}: {e}")
            return {"status": "no_text_versions"}

        text_versions = text_response.get("textVersions", [])
        if not text_versions:
            return {"status": "no_text_versions"}

        url, fmt = govinfo_api.find_best_text_url(text_versions)
        if not url:
            return {"status": "no_suitable_format"}

        # Idempotency: skip if we already have this exact document version
        existing = (
            db.query(BillDocument)
            .filter_by(bill_id=bill_id, govinfo_url=url)
            .filter(BillDocument.raw_text.isnot(None))
            .first()
        )
        if existing:
            return {"status": "already_fetched", "bill_id": bill_id}

        logger.info(f"Fetching {bill_id} document ({fmt}) from {url}")
        try:
            raw_text = govinfo_api.fetch_text_from_url(url, fmt)
        except DocumentUnchangedError:
            # HTTP 304: the stored ETag still matches, so there is nothing new to save
            logger.info(f"Document unchanged for {bill_id} (ETag match); skipping")
            return {"status": "unchanged", "bill_id": bill_id}

        if not raw_text:
            # Raising here routes an empty response through the retry handler below
            raise ValueError(f"Empty text returned for {bill_id}")

        # Get version label from first text version
        type_obj = text_versions[0].get("type", {}) if text_versions else {}
        doc_version = type_obj.get("name") if isinstance(type_obj, dict) else type_obj

        doc = BillDocument(
            bill_id=bill_id,
            doc_type="bill_text",
            doc_version=doc_version,
            govinfo_url=url,
            raw_text=raw_text,
            fetched_at=datetime.now(timezone.utc),
        )
        db.add(doc)
        db.commit()
        db.refresh(doc)
        logger.info(f"Stored document {doc.id} for bill {bill_id} ({len(raw_text):,} chars)")

        # Enqueue LLM processing (imported inside the function, as in the original)
        from app.workers.llm_processor import process_document_with_llm

        process_document_with_llm.delay(doc.id)
        return {"status": "ok", "document_id": doc.id, "chars": len(raw_text)}
    except Exception as exc:
        # Undo any partial write, then retry in 120 seconds (at most 3 retries)
        db.rollback()
        logger.error(f"Document fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()