Files
PocketVeto/backend/app/services/news_service.py
Jack Levy 7e5c5b473e feat: API optimizations — quota batching, ETags, caching, async sponsor (v0.9.7)
Nine efficiency improvements across the data pipeline:

1. NewsAPI OR batching (news_service.py + news_fetcher.py)
   - Combine up to 4 bills per NewsAPI call using OR query syntax
   - NEWSAPI_BATCH_SIZE=4 means ~4× effective daily quota (100→400 bill-fetches)
   - fetch_news_for_bill_batch task; fetch_news_for_active_bills queues batches

2. Google News RSS cache (news_service.py)
   - 2-hour Redis cache shared between news_fetcher and trend_scorer
   - Eliminates duplicate RSS hits when both workers run against same bill
   - clear_gnews_cache() admin helper + admin endpoint

3. pytrends keyword batching (trends_service.py + trend_scorer.py)
   - Compare up to 5 bills per pytrends call instead of 1
   - get_trends_scores_batch() returns scores in original order
   - Reduces pytrends calls by ~5× and associated rate-limit risk

4. GovInfo ETags (govinfo_api.py + document_fetcher.py)
   - If-None-Match conditional GET; DocumentUnchangedError on HTTP 304
   - ETags stored in Redis (30-day TTL) keyed by MD5(url)
   - document_fetcher catches DocumentUnchangedError → {"status": "unchanged"}

5. Anthropic prompt caching (llm_service.py)
   - cache_control: {type: ephemeral} on system messages in AnthropicProvider
   - Caches the ~700-token system prompt server-side; ~50% cost reduction on
     repeated calls within the 5-minute cache window

6. Async sponsor fetch (congress_poller.py)
   - New fetch_sponsor_for_bill Celery task replaces blocking get_bill_detail()
     inline in poll loop
   - Bills saved immediately with sponsor_id=None; sponsor linked async
   - Removes 0.25s sleep per new bill from poll hot path

7. Skip doc fetch for procedural actions (congress_poller.py)
   - _DOC_PRODUCING_CATEGORIES = {vote, committee_report, presidential, ...}
   - fetch_bill_documents only enqueued when action is likely to produce
     new GovInfo text (saves ~60–70% of unnecessary document fetch attempts)

8. Adaptive poll frequency (congress_poller.py)
   - _is_congress_off_hours(): weekends + before 9AM / after 9PM EST
   - Skips poll if off-hours AND last poll < 1 hour ago
   - Prevents wasteful polling when Congress is not in session

9. Admin panel additions (admin.py + settings/page.tsx + api.ts)
   - GET /api/admin/newsapi-quota → remaining calls today
   - POST /api/admin/clear-gnews-cache → flush RSS cache
   - Settings page shows NewsAPI quota remaining (amber if < 10)
   - "Clear Google News Cache" button in Manual Controls

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 16:50:51 -04:00

309 lines
10 KiB
Python

"""
News correlation service.
- NewsAPI.org: structured news articles per bill (100 req/day limit)
- Google News RSS: article headlines plus a volume signal for zeitgeist scoring (no limit; results cached in Redis for 2 hours)
"""
import hashlib
import json
import logging
import time
import urllib.parse
from datetime import date, datetime, timedelta, timezone
from typing import Optional
import feedparser
import redis
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
from app.config import settings
logger = logging.getLogger(__name__)
NEWSAPI_BASE = "https://newsapi.org/v2"
GOOGLE_NEWS_RSS = "https://news.google.com/rss/search"
NEWSAPI_DAILY_LIMIT = 95 # Leave 5 as buffer
NEWSAPI_BATCH_SIZE = 4 # Bills per OR-combined API call
_NEWSAPI_REDIS_PREFIX = "newsapi:daily_calls:"
_GNEWS_CACHE_TTL = 7200 # 2 hours — both trend_scorer and news_fetcher share cache
def _redis():
    """Build a Redis client for ``settings.REDIS_URL`` with decoded (``str``) responses."""
    client = redis.from_url(settings.REDIS_URL, decode_responses=True)
    return client
def _newsapi_quota_ok() -> bool:
    """Report whether today's NewsAPI call counter is still below the daily limit.

    Fails open: if the counter cannot be read or parsed, True is returned so
    that a Redis problem never blocks fetching.
    """
    try:
        today_key = _NEWSAPI_REDIS_PREFIX + date.today().isoformat()
        stored = _redis().get(today_key)
        # A missing (or empty) counter means no calls were recorded today.
        calls_made = int(stored) if stored else 0
    except Exception:
        # Don't block on Redis errors
        return True
    else:
        return calls_made < NEWSAPI_DAILY_LIMIT
def _newsapi_record_call():
    """Increment today's NewsAPI call counter in Redis (best effort).

    The counter key is the daily prefix plus today's ISO date. INCR and EXPIRE
    are queued on one pipeline and executed together; the 25-hour TTL lets the
    key outlive its own day and then disappear without manual cleanup.

    Failures never propagate to the caller, but they are logged: the quota
    check reads this same counter, so a counter that silently stops updating
    would let fetches continue past the daily limit unnoticed.
    """
    try:
        key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}"
        pipe = _redis().pipeline()
        pipe.incr(key)
        pipe.expire(key, 90000)  # 25 hours — expires safely after midnight
        pipe.execute()
    except Exception as exc:
        logger.warning("Could not record NewsAPI call in Redis: %s", exc)
def get_newsapi_quota_remaining() -> int:
    """Compute how many NewsAPI calls are left in today's budget.

    The result is never negative. If the counter cannot be read or parsed,
    the full daily limit is reported.
    """
    try:
        counter_key = _NEWSAPI_REDIS_PREFIX + date.today().isoformat()
        stored = _redis().get(counter_key)
        calls_made = int(stored or 0)
        remaining = NEWSAPI_DAILY_LIMIT - calls_made
        return remaining if remaining > 0 else 0
    except Exception:
        return NEWSAPI_DAILY_LIMIT
def clear_gnews_cache() -> int:
    """Delete all cached Google News RSS results. Returns number of keys deleted.

    Keys matching ``gnews:*`` are discovered with an incremental SCAN
    (``scan_iter``) instead of ``KEYS``, which walks the whole keyspace in a
    single blocking command. Matches are deleted in chunks so that no single
    DEL carries an unbounded argument list.

    Best effort: a Redis error is logged rather than raised, and the count of
    keys deleted before the error is returned.
    """
    chunk_size = 500
    deleted = 0
    try:
        r = _redis()
        pending = []
        for key in r.scan_iter(match="gnews:*", count=chunk_size):
            pending.append(key)
            if len(pending) >= chunk_size:
                deleted += r.delete(*pending)
                pending = []
        if pending:
            deleted += r.delete(*pending)
    except Exception as exc:
        logger.warning("Clearing Google News cache failed: %s", exc)
    return deleted
@retry(stop=stop_after_attempt(2), wait=wait_exponential(min=1, max=5))
def _newsapi_get(endpoint: str, params: dict) -> dict:
    """GET ``{NEWSAPI_BASE}/{endpoint}`` and return the decoded JSON body.

    Args:
        endpoint: Path segment under the NewsAPI base URL (e.g. "everything").
        params: Query-string parameters. The dict is not modified.

    The API key is sent in the ``X-Api-Key`` header rather than as an
    ``apiKey`` query parameter. This keeps the key out of the request URL,
    which appears in the text of the HTTP errors raised by
    ``raise_for_status()`` and would otherwise end up in callers' error logs.
    It also means the caller's ``params`` dict is no longer mutated.

    The request uses a 30-second timeout. The tenacity decorator allows two
    attempts in total, with exponential backoff bounded to 1–5 seconds.
    """
    response = requests.get(
        f"{NEWSAPI_BASE}/{endpoint}",
        params=params,
        headers={"X-Api-Key": settings.NEWSAPI_KEY},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def build_news_query(bill_title: str, short_title: Optional[str], sponsor_name: Optional[str],
                     bill_type: str, bill_number: int) -> str:
    """Build a NewsAPI search query for a bill.

    The query is at most two quoted phrases joined with ``OR``:

    1. The short title if there is one; otherwise the first six words of the
       full title, used only when at least three words are available.
    2. The bill designator, e.g. ``"HR 42"``, always included as a fallback.

    Double quotes inside a title are dropped before the phrase is wrapped in
    quotes; an embedded quote would otherwise end the phrase early and produce
    an unbalanced query. Whitespace inside the phrase is collapsed to single
    spaces as part of that cleanup.

    Args:
        bill_title: Full official title; may be empty.
        short_title: Preferred short title, if any.
        sponsor_name: Accepted for interface compatibility; not used in the query.
        bill_type: Bill type code; upper-cased in the output.
        bill_number: Bill number.

    Returns:
        The query string.
    """
    def _words(text):
        # Replace quotes with spaces so words separated only by a quote stay separate.
        return text.replace('"', " ").split()

    terms = []
    short_words = _words(short_title) if short_title else []
    if short_words:
        terms.append(f'"{" ".join(short_words)}"')
    elif bill_title:
        # Use first 6 words of title as phrase
        words = _words(bill_title)[:6]
        if len(words) >= 3:
            terms.append(f'"{" ".join(words)}"')
    # Add bill number as fallback
    terms.append(f'"{bill_type.upper()} {bill_number}"')
    # Never more than two terms, which keeps queries short for relevance.
    return " OR ".join(terms)
def fetch_newsapi_articles(query: str, days: int = 30) -> list[dict]:
    """Query NewsAPI's ``everything`` endpoint and return simplified article dicts.

    Returns an empty list when no API key is configured, when today's quota is
    used up, or when the request fails for any reason. Each returned dict has
    the keys ``source``, ``headline``, ``url`` and ``published_at``; articles
    lacking a URL or a title are skipped.

    Args:
        query: NewsAPI search expression.
        days: How far back to search, counted from the current UTC time.
    """
    if not settings.NEWSAPI_KEY:
        return []
    if not _newsapi_quota_ok():
        logger.warning("NewsAPI daily quota exhausted — skipping fetch")
        return []
    try:
        window_start = datetime.now(timezone.utc) - timedelta(days=days)
        request_params = {
            "q": query,
            "language": "en",
            "sortBy": "relevancy",
            "pageSize": 10,
            "from": window_start.strftime("%Y-%m-%d"),
        }
        payload = _newsapi_get("everything", request_params)
        # Count the call only once the request has succeeded.
        _newsapi_record_call()
        results = []
        for item in payload.get("articles", []):
            if not (item.get("url") and item.get("title")):
                continue
            results.append({
                "source": item.get("source", {}).get("name", ""),
                "headline": item.get("title", ""),
                "url": item.get("url", ""),
                "published_at": item.get("publishedAt"),
            })
        return results
    except Exception as e:
        logger.error(f"NewsAPI fetch failed: {e}")
        return []
def fetch_newsapi_articles_batch(
    bill_queries: list[tuple[str, str]],
    days: int = 30,
) -> dict[str, list[dict]]:
    """
    Fetch NewsAPI articles for up to NEWSAPI_BATCH_SIZE bills in ONE API call
    using OR syntax. Returns {bill_id: [articles]} — each article attributed
    to the bill whose query terms appear in the headline/description.

    Args:
        bill_queries: ``(bill_id, query)`` pairs; each query is one or more
            quoted phrases joined by ``" OR "``.
        days: How far back to search, counted from the current UTC time.

    Every bill_id is present in the result, mapped to an empty list when
    nothing matched. All lists are empty when no API key is configured, the
    batch is empty, today's quota is exhausted, or the request fails.

    An article is attributed to a bill when any of that bill's query terms
    longer than three characters occurs (case-insensitively) in the article's
    title or description. One article may be attributed to several bills.

    Articles without a URL or title are skipped, as in the single-query fetch.
    A null ``title``, ``description`` or ``source`` in the response is
    tolerated, so one malformed article does not discard the whole batch
    after the quota has been spent.
    """
    empty = {bill_id: [] for bill_id, _ in bill_queries}
    if not settings.NEWSAPI_KEY or not bill_queries:
        return empty
    if not _newsapi_quota_ok():
        logger.warning("NewsAPI daily quota exhausted — skipping batch fetch")
        return empty
    combined_q = " OR ".join(q for _, q in bill_queries)
    # Parse each bill's match terms once, not once per article.
    bill_terms = []
    for bill_id, query in bill_queries:
        candidates = (part.strip('" ').lower() for part in query.split(" OR "))
        bill_terms.append((bill_id, [t for t in candidates if len(t) > 3]))
    try:
        from_date = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
        data = _newsapi_get("everything", {
            "q": combined_q,
            "language": "en",
            "sortBy": "relevancy",
            "pageSize": 20,
            "from": from_date,
        })
        _newsapi_record_call()
        result: dict[str, list[dict]] = {bill_id: [] for bill_id, _ in bill_queries}
        for article in data.get("articles") or []:
            title = article.get("title") or ""
            url = article.get("url") or ""
            if not (title and url):
                continue
            description = article.get("description") or ""
            content = f"{title} {description}".lower()
            record = {
                "source": (article.get("source") or {}).get("name", ""),
                "headline": title,
                "url": url,
                "published_at": article.get("publishedAt"),
            }
            for bill_id, terms in bill_terms:
                # Match if any meaningful term from this bill's query appears in the article
                if any(t in content for t in terms):
                    # Separate copy per bill so the result lists do not share dicts.
                    result[bill_id].append(dict(record))
        return result
    except Exception as e:
        logger.error(f"NewsAPI batch fetch failed: {e}")
        return empty
# ── Google News RSS ─────────────────────────────────────────────────────────────
def _gnews_cache_key(query: str, kind: str, days: int) -> str:
h = hashlib.md5(f"{query}:{days}".encode()).hexdigest()[:12]
return f"gnews:{kind}:{h}"
def fetch_gnews_count(query: str, days: int = 30) -> int:
    """Return the Google News RSS article count for a query, using the Redis cache.

    A cached value is returned when one exists. Otherwise the count is fetched
    directly and stored with the shared Google News TTL (2 hours). Cache read
    and write errors are ignored, so a Redis problem only costs a fresh fetch.
    """
    key = _gnews_cache_key(query, "count", days)

    # Cache read (best effort).
    try:
        hit = _redis().get(key)
        if hit is not None:
            return int(hit)
    except Exception:
        pass

    fresh_count = _fetch_gnews_count_raw(query, days)

    # Cache write (best effort).
    try:
        _redis().setex(key, _GNEWS_CACHE_TTL, fresh_count)
    except Exception:
        pass

    return fresh_count
def _fetch_gnews_count_raw(query: str, days: int) -> int:
    """Count the entries Google News RSS returns for a query, bypassing the cache.

    The query is restricted to the last ``days`` days with a ``when:<days>d``
    suffix. Sleeps one second before each request. Returns 0 if anything
    raises.
    """
    try:
        search_expr = f"{query} when:{days}d"
        encoded = urllib.parse.quote(search_expr)
        url = f"{GOOGLE_NEWS_RSS}?q={encoded}&hl=en-US&gl=US&ceid=US:en"
        time.sleep(1)  # Polite delay
        parsed_feed = feedparser.parse(url)
        entry_total = len(parsed_feed.entries)
        return entry_total
    except Exception as e:
        logger.error(f"Google News RSS fetch failed: {e}")
        return 0
def _gnews_entry_url(entry) -> str:
"""Extract the article URL from a feedparser Google News RSS entry."""
link = getattr(entry, "link", None) or entry.get("link", "")
if link:
return link
for lnk in getattr(entry, "links", []):
href = lnk.get("href", "")
if href:
return href
return ""
def fetch_gnews_articles(query: str, days: int = 30) -> list[dict]:
    """Fetch articles from Google News RSS. Results cached in Redis for 2 hours.

    A cached JSON list is returned when one exists and decodes cleanly.
    Otherwise the articles are fetched directly and stored as JSON with the
    shared Google News TTL. Cache read and write errors are ignored, so a
    Redis problem only costs a fresh fetch.

    Args:
        query: Google News search expression.
        days: Look-back window in days.

    Returns:
        A list of article dicts as produced by the uncached fetch.
    """
    cache_key = _gnews_cache_key(query, "articles", days)
    try:
        cached = _redis().get(cache_key)
        if cached is not None:
            return json.loads(cached)
    except Exception:
        # Unreadable or corrupt cache entry: fall through to a live fetch.
        pass
    articles = _fetch_gnews_articles_raw(query, days)
    try:
        _redis().setex(cache_key, _GNEWS_CACHE_TTL, json.dumps(articles))
    except Exception:
        # Caching is an optimisation only; the fetched result is still returned.
        pass
    return articles
def _fetch_gnews_articles_raw(query: str, days: int) -> list[dict]:
    """Fetch gnews articles directly (no cache).

    The query is restricted to the last ``days`` days with a ``when:<days>d``
    suffix, and at most the first 20 feed entries are converted. Sleeps one
    second before each request.

    Returns:
        A list of dicts with keys ``source`` (defaults to "Google News"),
        ``headline``, ``url`` and ``published_at`` (ISO-8601 UTC string, or
        None when the entry has no usable date). Entries without a URL or
        headline are skipped. Returns an empty list if anything raises.
    """
    try:
        encoded = urllib.parse.quote(f"{query} when:{days}d")
        url = f"{GOOGLE_NEWS_RSS}?q={encoded}&hl=en-US&gl=US&ceid=US:en"
        time.sleep(1)  # Polite delay
        feed = feedparser.parse(url)
        articles = []
        for entry in feed.entries[:20]:
            pub_at = None
            parsed = getattr(entry, "published_parsed", None)
            if parsed:
                try:
                    # feedparser's *_parsed dates are struct_time values in UTC.
                    # Build the datetime from the fields directly; time.mktime()
                    # would read them as local time and shift the timestamp by
                    # the host's UTC offset.
                    pub_at = datetime(*parsed[:6], tzinfo=timezone.utc).isoformat()
                except (TypeError, ValueError):
                    # Malformed date tuple: leave published_at unset.
                    pass
            source = ""
            src = getattr(entry, "source", None)
            if src:
                source = getattr(src, "title", "") or src.get("title", "")
            headline = entry.get("title", "") or getattr(entry, "title", "")
            article_url = _gnews_entry_url(entry)
            if article_url and headline:
                articles.append({
                    "source": source or "Google News",
                    "headline": headline,
                    "url": article_url,
                    "published_at": pub_at,
                })
        return articles
    except Exception as e:
        logger.error(f"Google News RSS article fetch failed: {e}")
        return []
def build_member_query(first_name: str, last_name: str, chamber: Optional[str] = None) -> str:
    """Compose a news search query for a member of Congress.

    Without a chamber, the query is just the quoted full name. With one, a
    second quoted phrase is OR-ed in: "Senator <last>" when the chamber text
    contains "senate" (case-insensitive), otherwise "Rep. <last>".
    """
    full_name = f"{first_name} {last_name}".strip()
    name_phrase = f'"{full_name}"'
    if not chamber:
        return name_phrase
    honorific = "Senator" if "senate" in chamber.lower() else "Rep."
    return f'{name_phrase} OR "{honorific} {last_name}"'