diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index 3a5dadd..9247988 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -263,6 +263,24 @@ async def reprocess_bill(bill_id: str, current_user: User = Depends(get_current_ return {"task_ids": {"documents": doc_task.id, "actions": actions_task.id}} +@router.get("/newsapi-quota") +async def get_newsapi_quota(current_user: User = Depends(get_current_admin)): + """Return today's remaining NewsAPI daily quota (calls used vs. 100/day limit).""" + from app.services.news_service import get_newsapi_quota_remaining + import asyncio + remaining = await asyncio.to_thread(get_newsapi_quota_remaining) + return {"remaining": remaining, "limit": 100} + + +@router.post("/clear-gnews-cache") +async def clear_gnews_cache_endpoint(current_user: User = Depends(get_current_admin)): + """Flush the Google News RSS Redis cache so fresh data is fetched on next run.""" + from app.services.news_service import clear_gnews_cache + import asyncio + cleared = await asyncio.to_thread(clear_gnews_cache) + return {"cleared": cleared} + + @router.get("/api-health") async def api_health(current_user: User = Depends(get_current_admin)): """Test each external API and return status + latency for each.""" diff --git a/backend/app/api/settings.py b/backend/app/api/settings.py index 687d032..e3de447 100644 --- a/backend/app/api/settings.py +++ b/backend/app/api/settings.py @@ -30,6 +30,12 @@ async def get_settings( congress_poll_interval_minutes=int(overrides.get("congress_poll_interval_minutes", settings.CONGRESS_POLL_INTERVAL_MINUTES)), newsapi_enabled=bool(settings.NEWSAPI_KEY), pytrends_enabled=settings.PYTRENDS_ENABLED, + api_keys_configured={ + "openai": bool(settings.OPENAI_API_KEY), + "anthropic": bool(settings.ANTHROPIC_API_KEY), + "gemini": bool(settings.GEMINI_API_KEY), + "ollama": True, # no API key required + }, ) diff --git a/backend/app/config.py b/backend/app/config.py index 9996e14..f94d3ba 100644 --- 
a/backend/app/config.py +++ b/backend/app/config.py @@ -28,10 +28,10 @@ class Settings(BaseSettings): LLM_PROVIDER: str = "openai" # openai | anthropic | gemini | ollama OPENAI_API_KEY: str = "" - OPENAI_MODEL: str = "gpt-4o" + OPENAI_MODEL: str = "gpt-4o-mini" # gpt-4o-mini: excellent JSON quality at ~10x lower cost than gpt-4o ANTHROPIC_API_KEY: str = "" - ANTHROPIC_MODEL: str = "claude-opus-4-6" + ANTHROPIC_MODEL: str = "claude-sonnet-4-6" # Sonnet matches Opus for structured tasks at ~5x lower cost GEMINI_API_KEY: str = "" GEMINI_MODEL: str = "gemini-2.0-flash" @@ -39,6 +39,11 @@ class Settings(BaseSettings): OLLAMA_BASE_URL: str = "http://host.docker.internal:11434" OLLAMA_MODEL: str = "llama3.1" + # Max LLM requests per minute — Celery applies this limit per worker instance, not globally, so the effective ceiling is this value times the number of workers running the task. + # Safe defaults: free Gemini=15 RPM, Anthropic paid=50 RPM, OpenAI paid=500 RPM. + # Raise this in .env once you confirm your API tier. + LLM_RATE_LIMIT_RPM: int = 10 + # Google Civic Information API (zip → representative lookup) # Free key: https://console.cloud.google.com/apis/library/civicinfo.googleapis.com CIVIC_API_KEY: str = "" diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index 25ab10b..940ef6b 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -298,6 +298,7 @@ class SettingsResponse(BaseModel): congress_poll_interval_minutes: int newsapi_enabled: bool pytrends_enabled: bool + api_keys_configured: dict[str, bool] # ── Collections ──────────────────────────────────────────────────────────────── diff --git a/backend/app/services/govinfo_api.py b/backend/app/services/govinfo_api.py index 15911b1..f65c0e9 100644 --- a/backend/app/services/govinfo_api.py +++ b/backend/app/services/govinfo_api.py @@ -2,7 +2,9 @@ GovInfo API client for fetching actual bill text. Priority order for text formats: htm > txt > pdf +ETag support: stores ETags in Redis so repeat fetches skip unchanged documents. 
""" +import hashlib import logging import re from typing import Optional @@ -17,6 +19,21 @@ logger = logging.getLogger(__name__) GOVINFO_BASE = "https://api.govinfo.gov" FORMAT_PRIORITY = ["htm", "html", "txt", "pdf"] +_ETAG_CACHE_TTL = 86400 * 30 # 30 days + + +class DocumentUnchangedError(Exception): + """Raised when GovInfo confirms the document is unchanged via ETag (HTTP 304).""" + pass + + +def _etag_redis(): + import redis + return redis.from_url(settings.REDIS_URL, decode_responses=True) + + +def _etag_key(url: str) -> str: + return f"govinfo:etag:{hashlib.md5(url.encode()).hexdigest()}" @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=15)) @@ -41,7 +58,6 @@ def find_best_text_url(text_versions: list[dict]) -> Optional[tuple[str, str]]: """ From a list of text version objects (from Congress.gov API), find the best available text format. Returns (url, format) or None. - Matches by URL extension since Congress.gov type strings are "Formatted Text", "PDF", etc. """ for fmt in FORMAT_PRIORITY: for version in text_versions: @@ -55,17 +71,46 @@ def find_best_text_url(text_versions: list[dict]) -> Optional[tuple[str, str]]: def fetch_text_from_url(url: str, fmt: str) -> Optional[str]: - """Download and extract plain text from a GovInfo document URL.""" + """ + Download and extract plain text from a GovInfo document URL. + + Uses ETag conditional GET: if GovInfo returns 304 Not Modified, + raises DocumentUnchangedError so the caller can skip reprocessing. + On a successful 200 response, stores the new ETag in Redis for next time. 
def fetch_text_from_url(url: str, fmt: str) -> Optional[str]:
    """
    Download and extract plain text from a GovInfo document URL.

    Uses an ETag conditional GET: when a previously stored ETag exists it is
    sent as ``If-None-Match``; if GovInfo answers 304 Not Modified,
    DocumentUnchangedError is raised so the caller can skip reprocessing.

    The new ETag is persisted only AFTER text extraction produced non-empty
    text. Persisting it earlier would let a failed/empty extraction poison the
    cache: the caller's retry would then receive a 304 and report the document
    as "unchanged" even though it was never stored.

    Args:
        url: GovInfo document URL.
        fmt: Document format: "htm", "html", "txt" or "pdf".

    Returns:
        The extracted text, or None if the download/extraction failed or the
        format is not supported.

    Raises:
        DocumentUnchangedError: GovInfo returned HTTP 304 for the stored ETag.
    """
    cache_key = _etag_key(url)

    # Redis is an optimisation only; never let a cache outage block the fetch.
    headers = {}
    try:
        stored_etag = _etag_redis().get(cache_key)
    except Exception as exc:
        logger.debug(f"ETag lookup skipped for {url}: {exc}")
        stored_etag = None
    if stored_etag:
        headers["If-None-Match"] = stored_etag

    try:
        response = requests.get(url, headers=headers, timeout=120)

        if response.status_code == 304:
            raise DocumentUnchangedError(f"Document unchanged (ETag match): {url}")

        response.raise_for_status()

        if fmt in ("htm", "html"):
            text = _extract_from_html(response.text)
        elif fmt == "txt":
            text = response.text
        elif fmt == "pdf":
            text = _extract_from_pdf(response.content)
        else:
            # Previously this fell through and returned None without a trace.
            logger.error(f"Unsupported document format {fmt!r} for {url}")
            return None

    except DocumentUnchangedError:
        raise
    except Exception as e:
        logger.error(f"Failed to fetch text from {url}: {e}")
        return None

    # Persist the ETag for future conditional requests, but only once we hold
    # usable text (see docstring).
    etag = response.headers.get("ETag")
    if text and etag:
        try:
            _etag_redis().setex(cache_key, _ETAG_CACHE_TTL, etag)
        except Exception as exc:
            logger.debug(f"ETag store skipped for {url}: {exc}")

    return text
{retry_after}s") + + +def _detect_rate_limit(exc: Exception) -> bool: + """Return True if exc represents a provider rate-limit / quota error.""" + exc_type = type(exc).__name__.lower() + exc_str = str(exc).lower() + # OpenAI / Anthropic SDK raise a class named *RateLimitError + if "ratelimit" in exc_type or "rate_limit" in exc_type: + return True + # Google Gemini SDK raises ResourceExhausted + if "resourceexhausted" in exc_type: + return True + # Generic HTTP 429 or quota messages (e.g. Ollama, raw requests) + if "429" in exc_str or "rate limit" in exc_str or "quota" in exc_str: + return True + return False + + SYSTEM_PROMPT = """You are a nonpartisan legislative analyst specializing in translating complex \ legislation into clear, accurate summaries for informed citizens. You analyze bills objectively \ without political bias. @@ -182,6 +208,19 @@ def parse_brief_json(raw: str | dict, provider: str, model: str) -> ReverseBrief class LLMProvider(ABC): + _provider_name: str = "unknown" + + def _call(self, fn): + """Invoke fn(), translating provider-specific rate-limit errors to RateLimitError.""" + try: + return fn() + except RateLimitError: + raise + except Exception as exc: + if _detect_rate_limit(exc): + raise RateLimitError(self._provider_name) from exc + raise + @abstractmethod def generate_brief(self, doc_text: str, bill_metadata: dict) -> ReverseBrief: pass @@ -196,6 +235,8 @@ class LLMProvider(ABC): class OpenAIProvider(LLMProvider): + _provider_name = "openai" + def __init__(self, model: str | None = None): from openai import OpenAI self.client = OpenAI(api_key=settings.OPENAI_API_KEY) @@ -203,7 +244,7 @@ class OpenAIProvider(LLMProvider): def generate_brief(self, doc_text: str, bill_metadata: dict) -> ReverseBrief: prompt = build_prompt(doc_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self.client.chat.completions.create( + response = self._call(lambda: self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": 
SYSTEM_PROMPT}, @@ -211,13 +252,13 @@ class OpenAIProvider(LLMProvider): ], response_format={"type": "json_object"}, temperature=0.1, - ) + )) raw = response.choices[0].message.content return parse_brief_json(raw, "openai", self.model) def generate_amendment_brief(self, new_text: str, previous_text: str, bill_metadata: dict) -> ReverseBrief: prompt = build_amendment_prompt(new_text, previous_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self.client.chat.completions.create( + response = self._call(lambda: self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": AMENDMENT_SYSTEM_PROMPT}, @@ -225,20 +266,22 @@ class OpenAIProvider(LLMProvider): ], response_format={"type": "json_object"}, temperature=0.1, - ) + )) raw = response.choices[0].message.content return parse_brief_json(raw, "openai", self.model) def generate_text(self, prompt: str) -> str: - response = self.client.chat.completions.create( + response = self._call(lambda: self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": prompt}], temperature=0.3, - ) + )) return response.choices[0].message.content or "" class AnthropicProvider(LLMProvider): + _provider_name = "anthropic" + def __init__(self, model: str | None = None): import anthropic self.client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY) @@ -246,36 +289,46 @@ class AnthropicProvider(LLMProvider): def generate_brief(self, doc_text: str, bill_metadata: dict) -> ReverseBrief: prompt = build_prompt(doc_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self.client.messages.create( + response = self._call(lambda: self.client.messages.create( model=self.model, max_tokens=4096, - system=SYSTEM_PROMPT + "\n\nIMPORTANT: Respond with ONLY valid JSON. No other text.", + system=[{ + "type": "text", + "text": SYSTEM_PROMPT + "\n\nIMPORTANT: Respond with ONLY valid JSON. 
No other text.", + "cache_control": {"type": "ephemeral"}, + }], messages=[{"role": "user", "content": prompt}], - ) + )) raw = response.content[0].text return parse_brief_json(raw, "anthropic", self.model) def generate_amendment_brief(self, new_text: str, previous_text: str, bill_metadata: dict) -> ReverseBrief: prompt = build_amendment_prompt(new_text, previous_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self.client.messages.create( + response = self._call(lambda: self.client.messages.create( model=self.model, max_tokens=4096, - system=AMENDMENT_SYSTEM_PROMPT + "\n\nIMPORTANT: Respond with ONLY valid JSON. No other text.", + system=[{ + "type": "text", + "text": AMENDMENT_SYSTEM_PROMPT + "\n\nIMPORTANT: Respond with ONLY valid JSON. No other text.", + "cache_control": {"type": "ephemeral"}, + }], messages=[{"role": "user", "content": prompt}], - ) + )) raw = response.content[0].text return parse_brief_json(raw, "anthropic", self.model) def generate_text(self, prompt: str) -> str: - response = self.client.messages.create( + response = self._call(lambda: self.client.messages.create( model=self.model, max_tokens=1024, messages=[{"role": "user", "content": prompt}], - ) + )) return response.content[0].text class GeminiProvider(LLMProvider): + _provider_name = "gemini" + def __init__(self, model: str | None = None): import google.generativeai as genai genai.configure(api_key=settings.GEMINI_API_KEY) @@ -291,12 +344,12 @@ class GeminiProvider(LLMProvider): def generate_brief(self, doc_text: str, bill_metadata: dict) -> ReverseBrief: prompt = build_prompt(doc_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self._make_model(SYSTEM_PROMPT).generate_content(prompt) + response = self._call(lambda: self._make_model(SYSTEM_PROMPT).generate_content(prompt)) return parse_brief_json(response.text, "gemini", self.model_name) def generate_amendment_brief(self, new_text: str, previous_text: str, bill_metadata: dict) -> ReverseBrief: prompt = 
build_amendment_prompt(new_text, previous_text, bill_metadata, MAX_TOKENS_DEFAULT) - response = self._make_model(AMENDMENT_SYSTEM_PROMPT).generate_content(prompt) + response = self._call(lambda: self._make_model(AMENDMENT_SYSTEM_PROMPT).generate_content(prompt)) return parse_brief_json(response.text, "gemini", self.model_name) def generate_text(self, prompt: str) -> str: @@ -304,11 +357,13 @@ class GeminiProvider(LLMProvider): model_name=self.model_name, generation_config={"temperature": 0.3}, ) - response = model.generate_content(prompt) + response = self._call(lambda: model.generate_content(prompt)) return response.text class OllamaProvider(LLMProvider): + _provider_name = "ollama" + def __init__(self, model: str | None = None): self.base_url = settings.OLLAMA_BASE_URL.rstrip("/") self.model = model or settings.OLLAMA_MODEL diff --git a/backend/app/services/news_service.py b/backend/app/services/news_service.py index 1ca143e..7527e86 100644 --- a/backend/app/services/news_service.py +++ b/backend/app/services/news_service.py @@ -4,6 +4,8 @@ News correlation service. 
- NewsAPI.org: structured news articles per bill (100 req/day limit) - Google News RSS: volume signal for zeitgeist scoring (no limit) """ +import hashlib +import json import logging import time import urllib.parse @@ -22,11 +24,13 @@ logger = logging.getLogger(__name__) NEWSAPI_BASE = "https://newsapi.org/v2" GOOGLE_NEWS_RSS = "https://news.google.com/rss/search" NEWSAPI_DAILY_LIMIT = 95 # Leave 5 as buffer +NEWSAPI_BATCH_SIZE = 4 # Bills per OR-combined API call _NEWSAPI_REDIS_PREFIX = "newsapi:daily_calls:" +_GNEWS_CACHE_TTL = 7200 # 2 hours — both trend_scorer and news_fetcher share cache -def _newsapi_redis(): +def _redis(): return redis.from_url(settings.REDIS_URL, decode_responses=True) @@ -34,7 +38,7 @@ def _newsapi_quota_ok() -> bool: """Return True if we have quota remaining for today.""" try: key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}" - used = int(_newsapi_redis().get(key) or 0) + used = int(_redis().get(key) or 0) return used < NEWSAPI_DAILY_LIMIT except Exception: return True # Don't block on Redis errors @@ -42,7 +46,7 @@ def _newsapi_quota_ok() -> bool: def _newsapi_record_call(): try: - r = _newsapi_redis() + r = _redis() key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}" pipe = r.pipeline() pipe.incr(key) @@ -52,6 +56,28 @@ def _newsapi_record_call(): pass +def get_newsapi_quota_remaining() -> int: + """Return the number of NewsAPI calls still available today.""" + try: + key = f"{_NEWSAPI_REDIS_PREFIX}{date.today().isoformat()}" + used = int(_redis().get(key) or 0) + return max(0, NEWSAPI_DAILY_LIMIT - used) + except Exception: + return NEWSAPI_DAILY_LIMIT + + +def clear_gnews_cache() -> int: + """Delete all cached Google News RSS results. 
def fetch_newsapi_articles_batch(
    bill_queries: list[tuple[str, str]],
    days: int = 30,
) -> dict[str, list[dict]]:
    """
    Fetch NewsAPI articles for a group of bills in ONE API call using OR syntax.

    Each returned article is attributed to every bill for which at least one
    of that bill's query terms (longer than 3 characters) appears in the
    article's title or description.

    Args:
        bill_queries: (bill_id, query) pairs; each query is a list of terms
            joined by " OR ".
        days: How far back to search.

    Returns:
        {bill_id: [articles]} with an entry for every bill_id passed in. All
        lists are empty when there is no API key, no quota, or the call fails.
    """
    def _empty() -> dict[str, list[dict]]:
        # Fresh dict/lists on every call so callers can never share state.
        return {bill_id: [] for bill_id, _ in bill_queries}

    if not bill_queries or not settings.NEWSAPI_KEY:
        return _empty()
    if not _newsapi_quota_ok():
        logger.warning("NewsAPI daily quota exhausted — skipping batch fetch")
        return _empty()

    # Parse each bill's match terms once instead of once per article.
    bill_terms = [
        (
            bill_id,
            [
                term
                for term in (raw.strip('" ').lower() for raw in query.split(" OR "))
                if len(term) > 3
            ],
        )
        for bill_id, query in bill_queries
    ]

    combined_q = " OR ".join(q for _, q in bill_queries)
    try:
        from_date = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")
        data = _newsapi_get("everything", {
            "q": combined_q,
            "language": "en",
            "sortBy": "relevancy",
            "pageSize": 20,
            "from": from_date,
        })
        _newsapi_record_call()

        result = _empty()
        for article in data.get("articles") or []:
            # NewsAPI may send explicit nulls; a single null title or source
            # must not abort the whole batch.
            title = article.get("title") or ""
            description = article.get("description") or ""
            content = f"{title} {description}".lower()
            record = {
                "source": (article.get("source") or {}).get("name") or "",
                "headline": title,
                "url": article.get("url", ""),
                "published_at": article.get("publishedAt"),
            }
            for bill_id, terms in bill_terms:
                if any(term in content for term in terms):
                    result[bill_id].append(dict(record))
        return result
    except Exception as e:
        logger.error(f"NewsAPI batch fetch failed: {e}")
        return _empty()
def get_trends_scores_batch(keyword_groups: list[list[str]]) -> list[float]:
    """
    Get pytrends scores for up to 5 keyword groups in a SINGLE pytrends call.

    Takes the first (most relevant) keyword from each non-empty group and
    compares them relative to each other. Only the first 5 non-empty groups
    are scored; any further groups keep a score of 0.0.

    There is NO per-group fallback: if the batch request fails, every score is
    0.0 (individual calls would just multiply the failures).

    Args:
        keyword_groups: One keyword list per bill/member, most relevant first.

    Returns:
        A list of scores (0–100) in the same order as keyword_groups.
    """
    import math  # local import: only needed for the NaN guard below

    zeros = [0.0] * len(keyword_groups)

    # Primary (first) keyword of each group, remembering the group's position.
    primaries = [(i, kws[0]) for i, kws in enumerate(keyword_groups) if kws]
    if not primaries or not settings.PYTRENDS_ENABLED:
        return zeros

    batch = primaries[:5]
    if len(primaries) > len(batch):
        logger.warning(
            f"pytrends batch received {len(primaries)} keyword groups; "
            f"only the first {len(batch)} are scored"
        )

    try:
        from pytrends.request import TrendReq

        time.sleep(random.uniform(2.0, 5.0))
        pytrends = TrendReq(hl="en-US", tz=0, timeout=(10, 25))
        # De-duplicate while keeping order: a repeated keyword would yield
        # duplicate columns and break the float() conversion below.
        kw_list = list(dict.fromkeys(kw for _, kw in batch))

        pytrends.build_payload(kw_list, timeframe="today 3-m", geo="US")
        data = pytrends.interest_over_time()

        scores = list(zeros)
        if data is not None and not data.empty:
            for idx, kw in batch:
                if kw in data.columns:
                    mean = float(data[kw].tail(14).mean())
                    if not math.isnan(mean):  # all-NaN window -> keep 0.0
                        scores[idx] = mean
        return scores

    except Exception as e:
        logger.debug(f"pytrends batch failed (non-critical): {e}")
        return zeros
+_DOC_PRODUCING_CATEGORIES = {"vote", "committee_report", "presidential", "new_document", "new_amendment"} + + +def _is_congress_off_hours() -> bool: + """Return True during periods when Congress.gov is unlikely to publish new content.""" + try: + from zoneinfo import ZoneInfo + now_est = datetime.now(ZoneInfo("America/New_York")) + except Exception: + return False + # Weekends + if now_est.weekday() >= 5: + return True + # Nights: before 9 AM or after 9 PM EST + if now_est.hour < 9 or now_est.hour >= 21: + return True + return False + @celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.poll_congress_bills") def poll_congress_bills(self): @@ -45,6 +66,16 @@ def poll_congress_bills(self): db = get_sync_db() try: last_polled = _get_setting(db, "congress_last_polled_at") + + # Adaptive: skip off-hours polls if last poll was recent (< 1 hour ago) + if _is_congress_off_hours() and last_polled: + try: + last_dt = datetime.fromisoformat(last_polled.replace("Z", "+00:00")) + if (datetime.now(timezone.utc) - last_dt) < timedelta(hours=1): + logger.info("Skipping poll — off-hours and last poll < 1 hour ago") + return {"new": 0, "updated": 0, "skipped": "off_hours"} + except Exception: + pass # On first run, seed from 2 months back rather than the full congress history if not last_polled: two_months_ago = datetime.now(timezone.utc) - timedelta(days=60) @@ -75,23 +106,19 @@ def poll_congress_bills(self): existing = db.get(Bill, bill_id) if existing is None: - # Bill list endpoint has no sponsor data — fetch detail to get it - try: - detail = congress_api.get_bill_detail( - current_congress, parsed["bill_type"], parsed["bill_number"] - ) - sponsor_id = _sync_sponsor(db, detail.get("bill", {})) - except Exception: - sponsor_id = None - parsed["sponsor_id"] = sponsor_id + # Save bill immediately; fetch sponsor detail asynchronously + parsed["sponsor_id"] = None parsed["last_checked_at"] = datetime.now(timezone.utc) db.add(Bill(**parsed)) db.commit() 
new_count += 1 - # Enqueue document and action fetches + # Enqueue document, action, and sponsor fetches from app.workers.document_fetcher import fetch_bill_documents fetch_bill_documents.delay(bill_id) fetch_bill_actions.delay(bill_id) + fetch_sponsor_for_bill.delay( + bill_id, current_congress, parsed["bill_type"], parsed["bill_number"] + ) else: _update_bill_if_changed(db, existing, parsed) updated_count += 1 @@ -176,6 +203,29 @@ def _sync_sponsor(db, bill_data: dict) -> str | None: return bioguide_id +@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.fetch_sponsor_for_bill") +def fetch_sponsor_for_bill(self, bill_id: str, congress: int, bill_type: str, bill_number: str): + """Async sponsor fetch: get bill detail from Congress.gov and link the sponsor. Idempotent.""" + db = get_sync_db() + try: + bill = db.get(Bill, bill_id) + if not bill: + return {"status": "not_found"} + if bill.sponsor_id: + return {"status": "already_set", "sponsor_id": bill.sponsor_id} + detail = congress_api.get_bill_detail(congress, bill_type, bill_number) + sponsor_id = _sync_sponsor(db, detail.get("bill", {})) + if sponsor_id: + bill.sponsor_id = sponsor_id + db.commit() + return {"status": "ok", "sponsor_id": sponsor_id} + except Exception as exc: + db.rollback() + raise self.retry(exc=exc, countdown=60) + finally: + db.close() + + @celery_app.task(bind=True, name="app.workers.congress_poller.backfill_sponsor_ids") def backfill_sponsor_ids(self): """Backfill sponsor_id for all bills where it is NULL by fetching bill detail from Congress.gov.""" @@ -332,9 +382,6 @@ def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool: if dirty: db.commit() if changed: - from app.workers.document_fetcher import fetch_bill_documents - fetch_bill_documents.delay(existing.bill_id) - fetch_bill_actions.delay(existing.bill_id) from app.workers.notification_utils import ( emit_bill_notification, emit_member_follow_notifications, @@ -343,6 +390,12 @@ def 
_update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool: ) action_text = parsed.get("latest_action_text", "") action_category = categorize_action(action_text) + # Only fetch new documents for actions that produce new text versions on GovInfo. + # Skip procedural/administrative actions (referral, calendar) to avoid unnecessary calls. + if not action_category or action_category in _DOC_PRODUCING_CATEGORIES: + from app.workers.document_fetcher import fetch_bill_documents + fetch_bill_documents.delay(existing.bill_id) + fetch_bill_actions.delay(existing.bill_id) if action_category: emit_bill_notification(db, existing, "bill_updated", action_text, action_category=action_category) emit_member_follow_notifications(db, existing, "bill_updated", action_text, action_category=action_category) diff --git a/backend/app/workers/document_fetcher.py b/backend/app/workers/document_fetcher.py index b745e6d..b70ef77 100644 --- a/backend/app/workers/document_fetcher.py +++ b/backend/app/workers/document_fetcher.py @@ -8,6 +8,7 @@ from datetime import datetime, timezone from app.database import get_sync_db from app.models import Bill, BillDocument from app.services import congress_api, govinfo_api +from app.services.govinfo_api import DocumentUnchangedError from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) @@ -51,7 +52,11 @@ def fetch_bill_documents(self, bill_id: str): return {"status": "already_fetched", "bill_id": bill_id} logger.info(f"Fetching {bill_id} document ({fmt}) from {url}") - raw_text = govinfo_api.fetch_text_from_url(url, fmt) + try: + raw_text = govinfo_api.fetch_text_from_url(url, fmt) + except DocumentUnchangedError: + logger.info(f"Document unchanged for {bill_id} (ETag match) — skipping") + return {"status": "unchanged", "bill_id": bill_id} if not raw_text: raise ValueError(f"Empty text returned for {bill_id}") diff --git a/backend/app/workers/llm_processor.py b/backend/app/workers/llm_processor.py index 4e0280e..e9cc351 
100644 --- a/backend/app/workers/llm_processor.py +++ b/backend/app/workers/llm_processor.py @@ -7,9 +7,10 @@ import time from sqlalchemy import text +from app.config import settings from app.database import get_sync_db from app.models import Bill, BillBrief, BillDocument, Member -from app.services.llm_service import get_llm_provider +from app.services.llm_service import RateLimitError, get_llm_provider from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) @@ -17,8 +18,8 @@ logger = logging.getLogger(__name__) @celery_app.task( bind=True, - max_retries=2, - rate_limit="10/m", # Respect LLM provider rate limits + max_retries=8, + rate_limit=f"{settings.LLM_RATE_LIMIT_RPM}/m", name="app.workers.llm_processor.process_document_with_llm", ) def process_document_with_llm(self, document_id: int): @@ -120,10 +121,14 @@ def process_document_with_llm(self, document_id: int): return {"status": "ok", "brief_id": db_brief.id, "brief_type": brief_type} + except RateLimitError as exc: + db.rollback() + logger.warning(f"LLM rate limit hit ({exc.provider}); retrying in {exc.retry_after}s") + raise self.retry(exc=exc, countdown=exc.retry_after) except Exception as exc: db.rollback() logger.error(f"LLM processing failed for document {document_id}: {exc}") - raise self.retry(exc=exc, countdown=300) # 5 min backoff for LLM failures + raise self.retry(exc=exc, countdown=300) # 5 min backoff for other failures finally: db.close() diff --git a/backend/app/workers/news_fetcher.py b/backend/app/workers/news_fetcher.py index 9b208d3..51d7d03 100644 --- a/backend/app/workers/news_fetcher.py +++ b/backend/app/workers/news_fetcher.py @@ -15,6 +15,34 @@ from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) +def _save_articles(db, bill_id: str, articles: list[dict]) -> int: + """Persist a list of article dicts for a bill, skipping duplicates. 
Returns saved count.""" + saved = 0 + for article in articles: + url = article.get("url") + if not url: + continue + existing = db.query(NewsArticle).filter_by(bill_id=bill_id, url=url).first() + if existing: + continue + pub_at = None + if article.get("published_at"): + try: + pub_at = datetime.fromisoformat(article["published_at"].replace("Z", "+00:00")) + except Exception: + pass + db.add(NewsArticle( + bill_id=bill_id, + source=article.get("source", "")[:200], + headline=article.get("headline", ""), + url=url, + published_at=pub_at, + relevance_score=1.0, + )) + saved += 1 + return saved + + @celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill") def fetch_news_for_bill(self, bill_id: str): """Fetch news articles for a specific bill.""" @@ -24,15 +52,6 @@ def fetch_news_for_bill(self, bill_id: str): if not bill: return {"status": "not_found"} - # Get topic tags from latest brief - latest_brief = ( - db.query(BillBrief) - .filter_by(bill_id=bill_id) - .order_by(BillBrief.created_at.desc()) - .first() - ) - topic_tags = latest_brief.topic_tags if latest_brief else [] - query = news_service.build_news_query( bill_title=bill.title, short_title=bill.short_title, @@ -43,33 +62,7 @@ def fetch_news_for_bill(self, bill_id: str): newsapi_articles = news_service.fetch_newsapi_articles(query) gnews_articles = news_service.fetch_gnews_articles(query) - all_articles = newsapi_articles + gnews_articles - - saved = 0 - for article in all_articles: - url = article.get("url") - if not url: - continue - # Idempotency: skip duplicates per bill (same article can appear for multiple bills) - existing = db.query(NewsArticle).filter_by(bill_id=bill_id, url=url).first() - if existing: - continue - pub_at = None - if article.get("published_at"): - try: - pub_at = datetime.fromisoformat(article["published_at"].replace("Z", "+00:00")) - except Exception: - pass - db.add(NewsArticle( - bill_id=bill_id, - source=article.get("source", "")[:200], - 
headline=article.get("headline", ""), - url=url, - published_at=pub_at, - relevance_score=1.0, - )) - saved += 1 - + saved = _save_articles(db, bill_id, newsapi_articles + gnews_articles) db.commit() logger.info(f"Saved {saved} news articles for bill {bill_id}") return {"status": "ok", "saved": saved} @@ -82,11 +75,63 @@ def fetch_news_for_bill(self, bill_id: str): db.close() +@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill_batch") +def fetch_news_for_bill_batch(self, bill_ids: list): + """ + Fetch news for a batch of bills in ONE NewsAPI call using OR query syntax + (up to NEWSAPI_BATCH_SIZE bills per call). Google News is fetched per-bill + but served from the 2-hour Redis cache so the RSS is only hit once per query. + """ + db = get_sync_db() + try: + bills = [db.get(Bill, bid) for bid in bill_ids] + bills = [b for b in bills if b] + if not bills: + return {"status": "no_bills"} + + # Build (bill_id, query) pairs for the batch NewsAPI call + bill_queries = [ + ( + bill.bill_id, + news_service.build_news_query( + bill_title=bill.title, + short_title=bill.short_title, + sponsor_name=None, + bill_type=bill.bill_type, + bill_number=bill.bill_number, + ), + ) + for bill in bills + ] + + # One NewsAPI call for the whole batch + newsapi_batch = news_service.fetch_newsapi_articles_batch(bill_queries) + + total_saved = 0 + for bill in bills: + query = next(q for bid, q in bill_queries if bid == bill.bill_id) + newsapi_articles = newsapi_batch.get(bill.bill_id, []) + # Google News is cached — fine to call per-bill (cache hit after first) + gnews_articles = news_service.fetch_gnews_articles(query) + total_saved += _save_articles(db, bill.bill_id, newsapi_articles + gnews_articles) + + db.commit() + logger.info(f"Batch saved {total_saved} articles for {len(bills)} bills") + return {"status": "ok", "bills": len(bills), "saved": total_saved} + + except Exception as exc: + db.rollback() + logger.error(f"Batch news fetch failed: 
{exc}") + raise self.retry(exc=exc, countdown=300) + finally: + db.close() + + @celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills") def fetch_news_for_active_bills(self): """ Scheduled task: fetch news for bills with recent actions (last 7 days). - Respects the 100/day NewsAPI limit by processing at most 80 bills per run. + Groups bills into batches of NEWSAPI_BATCH_SIZE to multiply effective quota. """ db = get_sync_db() try: @@ -98,10 +143,17 @@ def fetch_news_for_active_bills(self): .limit(80) .all() ) - for bill in active_bills: - fetch_news_for_bill.delay(bill.bill_id) - logger.info(f"Queued news fetch for {len(active_bills)} active bills") - return {"queued": len(active_bills)} + bill_ids = [b.bill_id for b in active_bills] + batch_size = news_service.NEWSAPI_BATCH_SIZE + batches = [bill_ids[i:i + batch_size] for i in range(0, len(bill_ids), batch_size)] + for batch in batches: + fetch_news_for_bill_batch.delay(batch) + + logger.info( + f"Queued {len(batches)} news batches for {len(active_bills)} active bills " + f"({batch_size} bills/batch)" + ) + return {"queued_batches": len(batches), "total_bills": len(active_bills)} finally: db.close() diff --git a/backend/app/workers/trend_scorer.py b/backend/app/workers/trend_scorer.py index 8b57193..0abfdc4 100644 --- a/backend/app/workers/trend_scorer.py +++ b/backend/app/workers/trend_scorer.py @@ -14,6 +14,8 @@ from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) +_PYTRENDS_BATCH = 5 # max keywords pytrends accepts per call + def calculate_composite_score(newsapi_count: int, gnews_count: int, gtrends_score: float) -> float: """ @@ -40,66 +42,76 @@ def calculate_all_trend_scores(self): .all() ) - scored = 0 today = date.today() + # Filter to bills not yet scored today + bills_to_score = [] for bill in active_bills: - # Skip if already scored today existing = ( db.query(TrendScore) .filter_by(bill_id=bill.bill_id, score_date=today) .first() ) - if 
existing: - continue + if not existing: + bills_to_score.append(bill) - # Get latest brief for topic tags - latest_brief = ( - db.query(BillBrief) - .filter_by(bill_id=bill.bill_id) - .order_by(BillBrief.created_at.desc()) - .first() - ) - topic_tags = latest_brief.topic_tags if latest_brief else [] + scored = 0 - # Build search query - query = news_service.build_news_query( - bill_title=bill.title, - short_title=bill.short_title, - sponsor_name=None, - bill_type=bill.bill_type, - bill_number=bill.bill_number, - ) + # Process in batches of _PYTRENDS_BATCH so one pytrends call covers multiple bills + for batch_start in range(0, len(bills_to_score), _PYTRENDS_BATCH): + batch = bills_to_score[batch_start: batch_start + _PYTRENDS_BATCH] - # Fetch counts - newsapi_articles = news_service.fetch_newsapi_articles(query, days=30) - newsapi_count = len(newsapi_articles) - gnews_count = news_service.fetch_gnews_count(query, days=30) + # Collect keyword groups for pytrends batch call + keyword_groups = [] + bill_queries = [] + for bill in batch: + latest_brief = ( + db.query(BillBrief) + .filter_by(bill_id=bill.bill_id) + .order_by(BillBrief.created_at.desc()) + .first() + ) + topic_tags = latest_brief.topic_tags if latest_brief else [] + query = news_service.build_news_query( + bill_title=bill.title, + short_title=bill.short_title, + sponsor_name=None, + bill_type=bill.bill_type, + bill_number=bill.bill_number, + ) + keywords = trends_service.keywords_for_bill( + title=bill.title or "", + short_title=bill.short_title or "", + topic_tags=topic_tags, + ) + keyword_groups.append(keywords) + bill_queries.append(query) - # Google Trends - keywords = trends_service.keywords_for_bill( - title=bill.title or "", - short_title=bill.short_title or "", - topic_tags=topic_tags, - ) - gtrends_score = trends_service.get_trends_score(keywords) + # One pytrends call for the whole batch + gtrends_scores = trends_service.get_trends_scores_batch(keyword_groups) - composite = 
calculate_composite_score(newsapi_count, gnews_count, gtrends_score) + for i, bill in enumerate(batch): + query = bill_queries[i] + # NewsAPI + Google News counts (gnews served from 2-hour cache) + newsapi_articles = news_service.fetch_newsapi_articles(query, days=30) + newsapi_count = len(newsapi_articles) + gnews_count = news_service.fetch_gnews_count(query, days=30) + gtrends_score = gtrends_scores[i] - db.add(TrendScore( - bill_id=bill.bill_id, - score_date=today, - newsapi_count=newsapi_count, - gnews_count=gnews_count, - gtrends_score=gtrends_score, - composite_score=composite, - )) - scored += 1 + composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score) - if scored % 20 == 0: - db.commit() + db.add(TrendScore( + bill_id=bill.bill_id, + score_date=today, + newsapi_count=newsapi_count, + gnews_count=gnews_count, + gtrends_score=gtrends_score, + composite_score=composite, + )) + scored += 1 + + db.commit() - db.commit() logger.info(f"Scored {scored} bills") return {"scored": scored} diff --git a/frontend/app/settings/page.tsx b/frontend/app/settings/page.tsx index c157411..eb8af19 100644 --- a/frontend/app/settings/page.tsx +++ b/frontend/app/settings/page.tsx @@ -26,10 +26,34 @@ import { settingsAPI, adminAPI, notificationsAPI, type AdminUser, type LLMModel, import { useAuthStore } from "@/stores/authStore"; const LLM_PROVIDERS = [ - { value: "openai", label: "OpenAI", hint: "Requires OPENAI_API_KEY in .env" }, - { value: "anthropic", label: "Anthropic (Claude)", hint: "Requires ANTHROPIC_API_KEY in .env" }, - { value: "gemini", label: "Google Gemini", hint: "Requires GEMINI_API_KEY in .env" }, - { value: "ollama", label: "Ollama (Local)", hint: "Requires Ollama running on host" }, + { + value: "openai", + label: "OpenAI", + hint: "Requires OPENAI_API_KEY in .env", + rateNote: "Free: 3 RPM · Paid tier 1: 500 RPM", + modelNote: "Recommended: gpt-4o-mini — excellent JSON quality at ~10× lower cost than gpt-4o", + }, + { + value: 
"anthropic", + label: "Anthropic (Claude)", + hint: "Requires ANTHROPIC_API_KEY in .env", + rateNote: "Tier 1: 50 RPM · Tier 2: 1,000 RPM", + modelNote: "Recommended: claude-sonnet-4-6 — matches Opus quality at ~5× lower cost", + }, + { + value: "gemini", + label: "Google Gemini", + hint: "Requires GEMINI_API_KEY in .env", + rateNote: "Free: 15 RPM · Paid: 2,000 RPM", + modelNote: "Recommended: gemini-2.0-flash — best value, generous free tier", + }, + { + value: "ollama", + label: "Ollama (Local)", + hint: "Requires Ollama running on host", + rateNote: "No API rate limits", + modelNote: "Recommended: llama3.1 or mistral for reliable structured JSON output", + }, ]; @@ -139,6 +163,27 @@ export default function SettingsPage() { const [confirmDelete, setConfirmDelete] = useState(null); const [showMaintenance, setShowMaintenance] = useState(false); + const { data: newsApiQuota, refetch: refetchQuota } = useQuery({ + queryKey: ["newsapi-quota"], + queryFn: () => adminAPI.getNewsApiQuota(), + enabled: !!currentUser?.is_admin && !!settings?.newsapi_enabled, + staleTime: 60_000, + }); + const [clearingCache, setClearingCache] = useState(false); + const [cacheClearResult, setCacheClearResult] = useState(null); + const clearGnewsCache = async () => { + setClearingCache(true); + setCacheClearResult(null); + try { + const result = await adminAPI.clearGnewsCache(); + setCacheClearResult(`Cleared ${result.cleared} cached entries`); + } catch (e: unknown) { + setCacheClearResult(e instanceof Error ? e.message : "Failed"); + } finally { + setClearingCache(false); + } + }; + const testLLM = async () => { setTesting(true); setTestResult(null); @@ -421,26 +466,42 @@ export default function SettingsPage() { LLM Provider
- {LLM_PROVIDERS.map(({ value, label, hint }) => ( - - ))} + {LLM_PROVIDERS.map(({ value, label, hint, rateNote, modelNote }) => { + const hasKey = settings?.api_keys_configured?.[value] ?? true; + return ( + + ); + })}
{/* Model picker — live from provider API */} @@ -568,9 +629,16 @@ export default function SettingsPage() {
NewsAPI.org
100 requests/day free tier
- - {settings?.newsapi_enabled ? "Configured" : "Not configured"} - +
+ {newsApiQuota && ( + + {newsApiQuota.remaining}/{newsApiQuota.limit} remaining today + + )} + + {settings?.newsapi_enabled ? "Configured" : "Not configured"} + +
@@ -697,6 +765,31 @@ export default function SettingsPage() {
); + // Clear RSS cache — inline action (returns count, not task_id) + const ClearCacheRow = ( +
+
+
+
+ Clear Google News Cache + {cacheClearResult && ( + ✓ {cacheClearResult} + )} +
+

+ Flush the 2-hour Google News RSS cache so fresh articles are fetched on the next trend scoring or news run. +

+
+ +
+ ); + const recurring: ControlItem[] = [ { key: "poll", @@ -798,6 +891,7 @@ export default function SettingsPage() { <>
{recurring.map(renderRow)} + {ClearCacheRow}
{/* Maintenance subsection */} diff --git a/frontend/lib/api.ts b/frontend/lib/api.ts index 6a81fdf..d6dc159 100644 --- a/frontend/lib/api.ts +++ b/frontend/lib/api.ts @@ -293,4 +293,8 @@ export const adminAPI = { apiClient.get("/api/admin/api-health").then((r) => r.data), getTaskStatus: (taskId: string) => apiClient.get(`/api/admin/task-status/${taskId}`).then((r) => r.data), + getNewsApiQuota: () => + apiClient.get<{ remaining: number; limit: number }>("/api/admin/newsapi-quota").then((r) => r.data), + clearGnewsCache: () => + apiClient.post<{ cleared: number }>("/api/admin/clear-gnews-cache").then((r) => r.data), }; diff --git a/frontend/lib/types.ts b/frontend/lib/types.ts index d171c62..dca60fd 100644 --- a/frontend/lib/types.ts +++ b/frontend/lib/types.ts @@ -157,6 +157,7 @@ export interface SettingsData { congress_poll_interval_minutes: number; newsapi_enabled: boolean; pytrends_enabled: boolean; + api_keys_configured: Record; } export interface BillNote {