96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
"""
|
|
GovInfo API client for fetching actual bill text.
|
|
|
|
Priority order for text formats: htm > txt > pdf
|
|
"""
|
|
import logging
|
|
import re
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
|
from app.config import settings
|
|
|
|
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Root URL that the GovInfo endpoint paths in this module are appended to.
GOVINFO_BASE = "https://api.govinfo.gov"

# URL file extensions, listed from most to least preferred text format.
FORMAT_PRIORITY = [
    "htm",
    "html",
    "txt",
    "pdf",
]
|
|
|
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=15))
def _get(url: str, params: Optional[dict] = None) -> requests.Response:
    """Send an authenticated GET request and return the successful response.

    The key from ``settings.DATA_GOV_API_KEY`` is sent as the ``api_key``
    query parameter. Caller-supplied ``params`` are merged in after it, so
    an ``api_key`` entry in ``params`` takes precedence.

    The ``retry`` decorator allows up to 3 attempts, waiting with an
    exponential backoff bounded to 2-15 seconds between them.
    NOTE(review): with tenacity's defaults the final failure presumably
    surfaces as ``tenacity.RetryError`` rather than the underlying
    ``requests`` exception -- confirm before catching errors in callers.

    Args:
        url: Absolute URL to request.
        params: Optional extra query-string parameters.

    Returns:
        The response, whose status has already been checked.
    """
    query = {"api_key": settings.DATA_GOV_API_KEY, **(params or {})}
    response = requests.get(url, params=query, timeout=60)
    # Convert 4xx/5xx statuses into exceptions so a failed call is retried.
    response.raise_for_status()
    return response
|
|
|
|
|
|
def get_package_summary(package_id: str) -> dict:
    """Return the decoded JSON body of the package's ``/summary`` endpoint."""
    summary_url = f"{GOVINFO_BASE}/packages/{package_id}/summary"
    return _get(summary_url).json()
|
|
|
|
|
|
def get_package_content_detail(package_id: str) -> dict:
    """Return the decoded JSON body of the package's ``/content-detail`` endpoint."""
    detail_url = f"{GOVINFO_BASE}/packages/{package_id}/content-detail"
    return _get(detail_url).json()
|
|
|
|
|
|
def find_best_text_url(
    text_versions: list[dict],
    format_priority: Optional[list[str]] = None,
) -> tuple[Optional[str], Optional[str]]:
    """Pick the most preferred text-format URL from a list of text versions.

    Each text version object (from the Congress.gov API) is expected to
    carry a ``"formats"`` list whose dict entries have a ``"url"`` key.
    Formats are matched by URL file extension, case-insensitively, since
    Congress.gov type strings are "Formatted Text", "PDF", etc. A query
    string or fragment after the extension is ignored when matching.
    Malformed entries (non-dict items, missing or non-string URLs, a
    missing ``"formats"`` list) are skipped instead of raising.

    Args:
        text_versions: Text version objects to search.
        format_priority: Extensions (without the dot) ordered from most to
            least preferred. Defaults to the module-level FORMAT_PRIORITY.

    Returns:
        ``(url, fmt)`` for the first URL matching the most preferred
        extension, with ``url`` returned unmodified; ``(None, None)`` when
        nothing matches, so the result can always be unpacked in two.
    """
    priority = FORMAT_PRIORITY if format_priority is None else format_priority

    # Collect the usable URLs once, in document order, so the priority loop
    # below does not have to re-validate the nested structure per format.
    candidates = []
    for version in text_versions or []:
        if not isinstance(version, dict):
            continue
        for fmt_info in version.get("formats") or []:
            if not isinstance(fmt_info, dict):
                continue
            url = fmt_info.get("url")
            if not isinstance(url, str) or not url:
                continue
            # Only the part before any "?" or "#" can hold the extension.
            path = url.split("#", 1)[0].split("?", 1)[0].lower()
            candidates.append((url, path))

    for fmt in priority:
        suffix = f".{fmt}".lower()
        for url, path in candidates:
            if path.endswith(suffix):
                return url, fmt
    return None, None
|
|
|
|
|
|
def fetch_text_from_url(url: str, fmt: str) -> Optional[str]:
    """Download a GovInfo document and return its contents as plain text.

    Args:
        url: Location of the document to download.
        fmt: Document format: ``"htm"``/``"html"``, ``"txt"`` or ``"pdf"``
            (matched case-insensitively).

    Returns:
        The extracted text, or ``None`` when the format is unsupported or
        when downloading or extracting raises. Such exceptions are logged
        and swallowed rather than propagated.
    """
    kind = fmt.lower() if isinstance(fmt, str) else fmt
    if kind not in ("htm", "html", "txt", "pdf"):
        # Reject unknown formats up front: downloading a document that no
        # branch below can process would only waste a slow request.
        logger.warning("Unsupported text format %r for %s", fmt, url)
        return None

    try:
        response = requests.get(url, timeout=120)
        response.raise_for_status()

        if kind == "pdf":
            # PDF is binary, so hand over the raw bytes, not decoded text.
            return _extract_from_pdf(response.content)
        if kind == "txt":
            return response.text
        return _extract_from_html(response.text)
    except Exception as e:
        # Deliberate best-effort boundary: any failure yields None.
        logger.error("Failed to fetch text from %s: %s", url, e)
        return None
|
|
|
|
|
|
def _extract_from_html(html: str) -> str:
    """Return the text content of an HTML document with whitespace tidied."""
    document = BeautifulSoup(html, "lxml")

    # Discard elements holding code, styling or page chrome, not content.
    for element in document.find_all(["script", "style", "nav", "header", "footer"]):
        element.decompose()

    # Join text nodes with newlines, then squeeze runs of 3+ newlines down
    # to one blank line and runs of spaces down to a single space.
    raw_text = document.get_text(separator="\n")
    squeezed = re.sub(r" {2,}", " ", re.sub(r"\n{3,}", "\n\n", raw_text))
    return squeezed.strip()
|
|
|
|
|
|
def _extract_from_pdf(content: bytes) -> Optional[str]:
    """Run pdfminer over raw PDF bytes and return the extracted text.

    Returns ``None``, after logging the error, if pdfminer cannot be
    imported or the extraction raises.
    """
    try:
        # Imported inside the guard so an import failure is logged and
        # reported as None instead of propagating to the caller.
        import io
        from pdfminer.high_level import extract_text

        pdf_stream = io.BytesIO(content)
        extracted = extract_text(pdf_stream)
    except Exception as e:
        logger.error(f"PDF extraction failed: {e}")
        return None
    return extracted
|