Nine efficiency improvements across the data pipeline:
1. NewsAPI OR batching (news_service.py + news_fetcher.py)
- Combine up to 4 bills per NewsAPI call using OR query syntax
- NEWSAPI_BATCH_SIZE=4 means ~4× effective daily quota (100→400 bill-fetches)
- fetch_news_for_bill_batch task; fetch_news_for_active_bills queues batches
2. Google News RSS cache (news_service.py)
- 2-hour Redis cache shared between news_fetcher and trend_scorer
- Eliminates duplicate RSS hits when both workers run against same bill
- clear_gnews_cache() admin helper + admin endpoint
3. pytrends keyword batching (trends_service.py + trend_scorer.py)
- Compare up to 5 bills per pytrends call instead of 1
- get_trends_scores_batch() returns scores in original order
- Reduces pytrends calls by ~5× and associated rate-limit risk
4. GovInfo ETags (govinfo_api.py + document_fetcher.py)
- If-None-Match conditional GET; DocumentUnchangedError on HTTP 304
- ETags stored in Redis (30-day TTL) keyed by MD5(url)
- document_fetcher catches DocumentUnchangedError → {"status": "unchanged"}
5. Anthropic prompt caching (llm_service.py)
- cache_control: {type: ephemeral} on system messages in AnthropicProvider
- Caches the ~700-token system prompt server-side; ~50% cost reduction on
repeated calls within the 5-minute cache window
6. Async sponsor fetch (congress_poller.py)
- New fetch_sponsor_for_bill Celery task replaces blocking get_bill_detail()
inline in poll loop
- Bills saved immediately with sponsor_id=None; sponsor linked async
- Removes 0.25s sleep per new bill from poll hot path
7. Skip doc fetch for procedural actions (congress_poller.py)
- _DOC_PRODUCING_CATEGORIES = {vote, committee_report, presidential, ...}
- fetch_bill_documents only enqueued when action is likely to produce
new GovInfo text (saves ~60–70% of unnecessary document fetch attempts)
8. Adaptive poll frequency (congress_poller.py)
- _is_congress_off_hours(): weekends + before 9AM / after 9PM EST
- Skips poll if off-hours AND last poll < 1 hour ago
- Prevents wasteful polling when Congress is not in session
9. Admin panel additions (admin.py + settings/page.tsx + api.ts)
- GET /api/admin/newsapi-quota → remaining calls today
- POST /api/admin/clear-gnews-cache → flush RSS cache
- Settings page shows NewsAPI quota remaining (amber if < 10)
- "Clear Google News Cache" button in Manual Controls
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
93 lines
3.3 KiB
Python
"""
|
|
Document fetcher — retrieves bill text from GovInfo and stores it.
|
|
Triggered by congress_poller when a new bill is detected.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from app.database import get_sync_db
|
|
from app.models import Bill, BillDocument
|
|
from app.services import congress_api, govinfo_api
|
|
from app.services.govinfo_api import DocumentUnchangedError
|
|
from app.workers.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(bind=True, max_retries=3, name="app.workers.document_fetcher.fetch_bill_documents")
def fetch_bill_documents(self, bill_id: str):
    """Fetch bill text from GovInfo, store it, then enqueue LLM processing.

    Triggered by congress_poller when a new bill (or a document-producing
    action on an existing bill) is detected.

    Args:
        bill_id: Primary key of the Bill row to fetch text for.

    Returns:
        dict with a "status" key describing the outcome — one of
        "not_found", "no_text_versions", "no_suitable_format",
        "already_fetched", "unchanged", or "ok" (the last also carrying
        "document_id" and "chars").

    Any unexpected exception rolls back the session and retries the task
    (up to max_retries=3, 120s apart).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning("Bill %s not found in DB", bill_id)
            return {"status": "not_found"}

        # Ask Congress.gov for the bill's published text versions. A bill
        # with no text yet is a normal condition, not a task failure.
        try:
            text_response = congress_api.get_bill_text_versions(
                bill.congress_number, bill.bill_type, bill.bill_number
            )
        except Exception as e:
            logger.warning("No text versions for %s: %s", bill_id, e)
            return {"status": "no_text_versions"}

        text_versions = text_response.get("textVersions", [])
        if not text_versions:
            return {"status": "no_text_versions"}

        url, fmt = govinfo_api.find_best_text_url(text_versions)
        if not url:
            return {"status": "no_suitable_format"}

        # Idempotency: skip if we already hold text for this exact
        # document URL (i.e. this exact version of the bill text).
        existing = (
            db.query(BillDocument)
            .filter_by(bill_id=bill_id, govinfo_url=url)
            .filter(BillDocument.raw_text.isnot(None))
            .first()
        )
        if existing:
            return {"status": "already_fetched", "bill_id": bill_id}

        logger.info("Fetching %s document (%s) from %s", bill_id, fmt, url)
        try:
            raw_text = govinfo_api.fetch_text_from_url(url, fmt)
        except DocumentUnchangedError:
            # Conditional GET hit (HTTP 304 via stored ETag): the document
            # on GovInfo has not changed since our last fetch.
            logger.info("Document unchanged for %s (ETag match) — skipping", bill_id)
            return {"status": "unchanged", "bill_id": bill_id}
        if not raw_text:
            # Treat an empty body as transient so the outer handler retries.
            raise ValueError(f"Empty text returned for {bill_id}")

        # Version label from the first text version. The empty-list case
        # already returned above, so text_versions[0] is safe here (the
        # original code re-checked `if text_versions` redundantly). The
        # API sometimes returns "type" as a plain string rather than an
        # object — presumably legacy payloads; handle both.
        type_obj = text_versions[0].get("type", {})
        doc_version = type_obj.get("name") if isinstance(type_obj, dict) else type_obj

        doc = BillDocument(
            bill_id=bill_id,
            doc_type="bill_text",
            doc_version=doc_version,
            govinfo_url=url,
            raw_text=raw_text,
            fetched_at=datetime.now(timezone.utc),
        )
        db.add(doc)
        db.commit()
        db.refresh(doc)

        logger.info(
            "Stored document %s for bill %s (%s chars)",
            doc.id, bill_id, f"{len(raw_text):,}",
        )

        # Imported here rather than at module top — assumed to avoid a
        # circular import with llm_processor; TODO confirm.
        from app.workers.llm_processor import process_document_with_llm
        process_document_with_llm.delay(doc.id)

        return {"status": "ok", "document_id": doc.id, "chars": len(raw_text)}

    except Exception as exc:
        db.rollback()
        # logger.exception records the traceback; the previous
        # logger.error(f"...") call silently dropped it.
        logger.exception("Document fetch failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=120)
    finally:
        # Always return the session to the pool, success or failure.
        db.close()