Nine efficiency improvements across the data pipeline:
1. NewsAPI OR batching (news_service.py + news_fetcher.py)
- Combine up to 4 bills per NewsAPI call using OR query syntax
- NEWSAPI_BATCH_SIZE=4 means ~4× effective daily quota (100→400 bill-fetches)
- fetch_news_for_bill_batch task; fetch_news_for_active_bills queues batches
2. Google News RSS cache (news_service.py)
- 2-hour Redis cache shared between news_fetcher and trend_scorer
- Eliminates duplicate RSS hits when both workers run against same bill
- clear_gnews_cache() admin helper + admin endpoint
3. pytrends keyword batching (trends_service.py + trend_scorer.py)
- Compare up to 5 bills per pytrends call instead of 1
- get_trends_scores_batch() returns scores in original order
- Reduces pytrends calls by ~5× and associated rate-limit risk
4. GovInfo ETags (govinfo_api.py + document_fetcher.py)
- If-None-Match conditional GET; DocumentUnchangedError on HTTP 304
- ETags stored in Redis (30-day TTL) keyed by MD5(url)
- document_fetcher catches DocumentUnchangedError → {"status": "unchanged"}
5. Anthropic prompt caching (llm_service.py)
- cache_control: {type: ephemeral} on system messages in AnthropicProvider
- Caches the ~700-token system prompt server-side; ~50% cost reduction on
repeated calls within the 5-minute cache window
6. Async sponsor fetch (congress_poller.py)
- New fetch_sponsor_for_bill Celery task replaces blocking get_bill_detail()
inline in poll loop
- Bills saved immediately with sponsor_id=None; sponsor linked async
- Removes 0.25s sleep per new bill from poll hot path
7. Skip doc fetch for procedural actions (congress_poller.py)
- _DOC_PRODUCING_CATEGORIES = {vote, committee_report, presidential, ...}
- fetch_bill_documents only enqueued when action is likely to produce
new GovInfo text (saves ~60–70% of unnecessary document fetch attempts)
8. Adaptive poll frequency (congress_poller.py)
- _is_congress_off_hours(): weekends + before 9AM / after 9PM EST
- Skips poll if off-hours AND last poll < 1 hour ago
- Prevents wasteful polling when Congress is not in session
9. Admin panel additions (admin.py + settings/page.tsx + api.ts)
- GET /api/admin/newsapi-quota → remaining calls today
- POST /api/admin/clear-gnews-cache → flush RSS cache
- Settings page shows NewsAPI quota remaining (amber if < 10)
- "Clear Google News Cache" button in Manual Controls
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
124 lines
4.4 KiB
Python
124 lines
4.4 KiB
Python
"""
|
||
Trend scorer — calculates the daily zeitgeist score for bills.
|
||
Runs nightly via Celery Beat.
|
||
"""
|
||
import logging
|
||
from datetime import date, timedelta
|
||
|
||
from sqlalchemy import and_
|
||
|
||
from app.database import get_sync_db
|
||
from app.models import Bill, BillBrief, TrendScore
|
||
from app.services import news_service, trends_service
|
||
from app.workers.celery_app import celery_app
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
_PYTRENDS_BATCH = 5 # max keywords pytrends accepts per call
|
||
|
||
|
||
def calculate_composite_score(newsapi_count: int, gnews_count: int, gtrends_score: float) -> float:
|
||
"""
|
||
Weighted composite score (0–100):
|
||
NewsAPI article count → 0–40 pts (saturates at 20 articles)
|
||
Google News RSS count → 0–30 pts (saturates at 50 articles)
|
||
Google Trends score → 0–30 pts (0–100 input)
|
||
"""
|
||
newsapi_pts = min(newsapi_count / 20, 1.0) * 40
|
||
gnews_pts = min(gnews_count / 50, 1.0) * 30
|
||
gtrends_pts = (gtrends_score / 100) * 30
|
||
return round(newsapi_pts + gnews_pts + gtrends_pts, 2)
|
||
|
||
|
||
@celery_app.task(bind=True, name="app.workers.trend_scorer.calculate_all_trend_scores")
|
||
def calculate_all_trend_scores(self):
|
||
"""Nightly task: calculate trend scores for bills active in the last 90 days."""
|
||
db = get_sync_db()
|
||
try:
|
||
cutoff = date.today() - timedelta(days=90)
|
||
active_bills = (
|
||
db.query(Bill)
|
||
.filter(Bill.latest_action_date >= cutoff)
|
||
.all()
|
||
)
|
||
|
||
today = date.today()
|
||
|
||
# Filter to bills not yet scored today
|
||
bills_to_score = []
|
||
for bill in active_bills:
|
||
existing = (
|
||
db.query(TrendScore)
|
||
.filter_by(bill_id=bill.bill_id, score_date=today)
|
||
.first()
|
||
)
|
||
if not existing:
|
||
bills_to_score.append(bill)
|
||
|
||
scored = 0
|
||
|
||
# Process in batches of _PYTRENDS_BATCH so one pytrends call covers multiple bills
|
||
for batch_start in range(0, len(bills_to_score), _PYTRENDS_BATCH):
|
||
batch = bills_to_score[batch_start: batch_start + _PYTRENDS_BATCH]
|
||
|
||
# Collect keyword groups for pytrends batch call
|
||
keyword_groups = []
|
||
bill_queries = []
|
||
for bill in batch:
|
||
latest_brief = (
|
||
db.query(BillBrief)
|
||
.filter_by(bill_id=bill.bill_id)
|
||
.order_by(BillBrief.created_at.desc())
|
||
.first()
|
||
)
|
||
topic_tags = latest_brief.topic_tags if latest_brief else []
|
||
query = news_service.build_news_query(
|
||
bill_title=bill.title,
|
||
short_title=bill.short_title,
|
||
sponsor_name=None,
|
||
bill_type=bill.bill_type,
|
||
bill_number=bill.bill_number,
|
||
)
|
||
keywords = trends_service.keywords_for_bill(
|
||
title=bill.title or "",
|
||
short_title=bill.short_title or "",
|
||
topic_tags=topic_tags,
|
||
)
|
||
keyword_groups.append(keywords)
|
||
bill_queries.append(query)
|
||
|
||
# One pytrends call for the whole batch
|
||
gtrends_scores = trends_service.get_trends_scores_batch(keyword_groups)
|
||
|
||
for i, bill in enumerate(batch):
|
||
query = bill_queries[i]
|
||
# NewsAPI + Google News counts (gnews served from 2-hour cache)
|
||
newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
|
||
newsapi_count = len(newsapi_articles)
|
||
gnews_count = news_service.fetch_gnews_count(query, days=30)
|
||
gtrends_score = gtrends_scores[i]
|
||
|
||
composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score)
|
||
|
||
db.add(TrendScore(
|
||
bill_id=bill.bill_id,
|
||
score_date=today,
|
||
newsapi_count=newsapi_count,
|
||
gnews_count=gnews_count,
|
||
gtrends_score=gtrends_score,
|
||
composite_score=composite,
|
||
))
|
||
scored += 1
|
||
|
||
db.commit()
|
||
|
||
logger.info(f"Scored {scored} bills")
|
||
return {"scored": scored}
|
||
|
||
except Exception as exc:
|
||
db.rollback()
|
||
logger.error(f"Trend scoring failed: {exc}")
|
||
raise
|
||
finally:
|
||
db.close()
|