Nine efficiency improvements across the data pipeline:
1. NewsAPI OR batching (news_service.py + news_fetcher.py)
- Combine up to 4 bills per NewsAPI call using OR query syntax
- NEWSAPI_BATCH_SIZE=4 means ~4× effective daily quota (100→400 bill-fetches)
- fetch_news_for_bill_batch task; fetch_news_for_active_bills queues batches
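A minimal sketch of the OR-batching idea (the helper shape and parameter names are illustrative, not the actual news_fetcher code):

    import requests

    NEWSAPI_BATCH_SIZE = 4

    def fetch_news_for_bill_batch(bill_titles: list[str], api_key: str) -> list[dict]:
        # One /v2/everything call covers up to NEWSAPI_BATCH_SIZE bills.
        query = " OR ".join(f'"{t}"' for t in bill_titles[:NEWSAPI_BATCH_SIZE])
        resp = requests.get(
            "https://newsapi.org/v2/everything",
            params={"q": query, "language": "en", "apiKey": api_key},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("articles", [])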
2. Google News RSS cache (news_service.py)
- 2-hour Redis cache shared between news_fetcher and trend_scorer
- Eliminates duplicate RSS hits when both workers run against the same bill
- clear_gnews_cache() admin helper + admin endpoint
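Roughly how the shared cache works (the key scheme and Redis URL are assumptions):

    import hashlib
    import redis
    import requests

    _r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
    _GNEWS_TTL = 2 * 3600  # 2 hours

    def _gnews_key(query: str) -> str:
        return f"gnews:rss:{hashlib.md5(query.encode()).hexdigest()}"

    def fetch_gnews_rss(query: str) -> str:
        cached = _r.get(_gnews_key(query))
        if cached is not None:
            return cached  # news_fetcher and trend_scorer share this hit
        xml = requests.get(
            "https://news.google.com/rss/search",
            params={"q": query, "hl": "en-US"},
            timeout=30,
        ).text
        _r.setex(_gnews_key(query), _GNEWS_TTL, xml)
        return xml

    def clear_gnews_cache() -> int:
        keys = list(_r.scan_iter("gnews:rss:*"))
        return _r.delete(*keys) if keys else 0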
3. pytrends keyword batching (trends_service.py + trend_scorer.py)
- Compare up to 5 bills per pytrends call instead of 1
- get_trends_scores_batch() returns scores in original order
- Reduces pytrends calls by ~5× and associated rate-limit risk
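Sketch of the batched call (pytrends caps one payload at 5 keywords; the mean-interest scoring rule here is an assumption):

    from pytrends.request import TrendReq

    def get_trends_scores_batch(keywords: list[str]) -> list[float]:
        # One build_payload/interest_over_time round trip scores up to 5 bills.
        pytrends = TrendReq(hl="en-US")
        pytrends.build_payload(keywords[:5], timeframe="now 7-d")
        df = pytrends.interest_over_time()
        # Preserve the caller's order; 0.0 when Google returns no data.
        return [float(df[kw].mean()) if kw in df.columns else 0.0 for kw in keywords[:5]]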
4. GovInfo ETags (govinfo_api.py + document_fetcher.py)
- If-None-Match conditional GET; DocumentUnchangedError on HTTP 304
- ETags stored in Redis (30-day TTL) keyed by MD5(url)
- document_fetcher catches DocumentUnchangedError → {"status": "unchanged"}
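The caller side, roughly (the import path and wrapper shape are illustrative):

    from govinfo_api import DocumentUnchangedError, fetch_text_from_url

    def fetch_document(url: str, fmt: str) -> dict:
        try:
            text = fetch_text_from_url(url, fmt)
        except DocumentUnchangedError:
            return {"status": "unchanged"}  # ETag matched; skip reprocessing
        if text is None:
            return {"status": "failed"}
        return {"status": "fetched", "text": text}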
5. Anthropic prompt caching (llm_service.py)
- cache_control: {type: ephemeral} on system messages in AnthropicProvider
- Caches the ~700-token system prompt server-side; ~50% cost reduction on
repeated calls within the 5-minute cache window
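Minimal sketch using the Anthropic SDK's cache_control syntax (model id and prompt are placeholders):

    import anthropic

    client = anthropic.Anthropic()

    def analyze(system_prompt: str, user_text: str) -> str:
        response = client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            system=[
                {
                    "type": "text",
                    "text": system_prompt,
                    # Server caches this block for ~5 minutes; repeat calls
                    # inside the window bill at the cached-input rate.
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            messages=[{"role": "user", "content": user_text}],
        )
        return response.content[0].text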
6. Async sponsor fetch (congress_poller.py)
- New fetch_sponsor_for_bill Celery task replaces the blocking get_bill_detail()
  call inline in the poll loop
- Bills saved immediately with sponsor_id=None; sponsor linked async
- Removes 0.25s sleep per new bill from poll hot path
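The decoupling, sketched (link_sponsor and save_bill are hypothetical persistence helpers, not real names):

    from celery import shared_task

    @shared_task(rate_limit="4/s")  # rate limit replaces the inline 0.25s sleep
    def fetch_sponsor_for_bill(bill_id: int) -> None:
        detail = get_bill_detail(bill_id)         # blocking call, now off the poll path
        link_sponsor(bill_id, detail["sponsor"])  # assumed persistence helper

    # Poll loop: save immediately with sponsor_id=None, then hand off.
    #   bill = save_bill(item, sponsor_id=None)
    #   fetch_sponsor_for_bill.delay(bill.id)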
7. Skip doc fetch for procedural actions (congress_poller.py)
- _DOC_PRODUCING_CATEGORIES = {vote, committee_report, presidential, ...}
- fetch_bill_documents only enqueued when action is likely to produce
new GovInfo text (saves ~60–70% of unnecessary document fetch attempts)
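Sketch of the gate (maybe_fetch_documents is a hypothetical wrapper; the full category set is elided above, only the three named members are shown):

    _DOC_PRODUCING_CATEGORIES = {"vote", "committee_report", "presidential"}  # plus others

    def maybe_fetch_documents(bill_id: int, action_category: str) -> None:
        if action_category not in _DOC_PRODUCING_CATEGORIES:
            return  # procedural action: no new GovInfo text expected
        fetch_bill_documents.delay(bill_id)  # existing document-fetch task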
8. Adaptive poll frequency (congress_poller.py)
- _is_congress_off_hours(): weekends + before 9AM / after 9PM EST
- Skips poll if off-hours AND last poll < 1 hour ago
- Prevents wasteful polling when Congress is not in session
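The off-hours check, roughly (zoneinfo stands in for whatever tz handling the poller actually uses):

    from datetime import datetime, timedelta
    from typing import Optional
    from zoneinfo import ZoneInfo

    _EASTERN = ZoneInfo("America/New_York")

    def _is_congress_off_hours(now: Optional[datetime] = None) -> bool:
        now = now or datetime.now(_EASTERN)
        return now.weekday() >= 5 or now.hour < 9 or now.hour >= 21

    def should_poll(last_poll: datetime) -> bool:
        now = datetime.now(_EASTERN)
        if _is_congress_off_hours(now) and now - last_poll < timedelta(hours=1):
            return False  # off-hours and polled within the last hour: skip
        return True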
9. Admin panel additions (admin.py + settings/page.tsx + api.ts)
- GET /api/admin/newsapi-quota → remaining calls today
- POST /api/admin/clear-gnews-cache → flush RSS cache
- Settings page shows NewsAPI quota remaining (amber if < 10)
- "Clear Google News Cache" button in Manual Controls
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
"""
|
|
GovInfo API client for fetching actual bill text.
|
|
|
|
Priority order for text formats: htm > txt > pdf
|
|
ETag support: stores ETags in Redis so repeat fetches skip unchanged documents.
|
|
"""
|
|
import hashlib
|
|
import logging
|
|
import re
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
GOVINFO_BASE = "https://api.govinfo.gov"
|
|
FORMAT_PRIORITY = ["htm", "html", "txt", "pdf"]
|
|
_ETAG_CACHE_TTL = 86400 * 30 # 30 days
|
|
|
|
|
|
class DocumentUnchangedError(Exception):
|
|
"""Raised when GovInfo confirms the document is unchanged via ETag (HTTP 304)."""
|
|
pass
|
|
|
|
|
|
def _etag_redis():
|
|
import redis
|
|
return redis.from_url(settings.REDIS_URL, decode_responses=True)
|
|
|
|
|
|
def _etag_key(url: str) -> str:
|
|
return f"govinfo:etag:{hashlib.md5(url.encode()).hexdigest()}"
|
|
|
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=15))
|
|
def _get(url: str, params: dict = None) -> requests.Response:
|
|
p = {"api_key": settings.DATA_GOV_API_KEY, **(params or {})}
|
|
response = requests.get(url, params=p, timeout=60)
|
|
response.raise_for_status()
|
|
return response
|
|
|
|
|
|
def get_package_summary(package_id: str) -> dict:
|
|
response = _get(f"{GOVINFO_BASE}/packages/{package_id}/summary")
|
|
return response.json()
|
|
|
|
|
|
def get_package_content_detail(package_id: str) -> dict:
|
|
response = _get(f"{GOVINFO_BASE}/packages/{package_id}/content-detail")
|
|
return response.json()
|
|
|
|
|
|
def find_best_text_url(text_versions: list[dict]) -> Optional[tuple[str, str]]:
|
|
"""
|
|
From a list of text version objects (from Congress.gov API), find the best
|
|
available text format. Returns (url, format) or None.
|
|
"""
|
|
for fmt in FORMAT_PRIORITY:
|
|
for version in text_versions:
|
|
for fmt_info in version.get("formats", []):
|
|
if not isinstance(fmt_info, dict):
|
|
continue
|
|
url = fmt_info.get("url", "")
|
|
if url.lower().endswith(f".{fmt}"):
|
|
return url, fmt
|
|
return None, None
|
|
|
|
|
|
def fetch_text_from_url(url: str, fmt: str) -> Optional[str]:
|
|
"""
|
|
Download and extract plain text from a GovInfo document URL.
|
|
|
|
Uses ETag conditional GET: if GovInfo returns 304 Not Modified,
|
|
raises DocumentUnchangedError so the caller can skip reprocessing.
|
|
On a successful 200 response, stores the new ETag in Redis for next time.
|
|
"""
|
|
headers = {}
|
|
try:
|
|
stored_etag = _etag_redis().get(_etag_key(url))
|
|
if stored_etag:
|
|
headers["If-None-Match"] = stored_etag
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
response = requests.get(url, headers=headers, timeout=120)
|
|
|
|
if response.status_code == 304:
|
|
raise DocumentUnchangedError(f"Document unchanged (ETag match): {url}")
|
|
|
|
response.raise_for_status()
|
|
|
|
# Persist ETag for future conditional requests
|
|
etag = response.headers.get("ETag")
|
|
if etag:
|
|
try:
|
|
_etag_redis().setex(_etag_key(url), _ETAG_CACHE_TTL, etag)
|
|
except Exception:
|
|
pass
|
|
|
|
if fmt in ("htm", "html"):
|
|
return _extract_from_html(response.text)
|
|
elif fmt == "txt":
|
|
return response.text
|
|
elif fmt == "pdf":
|
|
return _extract_from_pdf(response.content)
|
|
|
|
except DocumentUnchangedError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch text from {url}: {e}")
|
|
return None
|
|
|
|
|
|
def _extract_from_html(html: str) -> str:
|
|
"""Strip HTML tags and clean up whitespace."""
|
|
soup = BeautifulSoup(html, "lxml")
|
|
for tag in soup(["script", "style", "nav", "header", "footer"]):
|
|
tag.decompose()
|
|
text = soup.get_text(separator="\n")
|
|
text = re.sub(r"\n{3,}", "\n\n", text)
|
|
text = re.sub(r" {2,}", " ", text)
|
|
return text.strip()
|
|
|
|
|
|
def _extract_from_pdf(content: bytes) -> Optional[str]:
|
|
"""Extract text from PDF bytes using pdfminer."""
|
|
try:
|
|
from io import BytesIO
|
|
from pdfminer.high_level import extract_text as pdf_extract
|
|
return pdf_extract(BytesIO(content))
|
|
except Exception as e:
|
|
logger.error(f"PDF extraction failed: {e}")
|
|
return None
|