Initial commit

This commit is contained in:
Jack Levy
2026-02-28 21:08:19 -05:00
commit e418dd9ae0
85 changed files with 5261 additions and 0 deletions

View File

View File

@@ -0,0 +1,62 @@
"""Celery application and beat-schedule configuration for the pocketveto workers."""
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
from app.config import settings

# Single Celery app shared by every worker module; Redis acts as both the
# message broker and the result backend.
celery_app = Celery(
    "pocketveto",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "app.workers.congress_poller",
        "app.workers.document_fetcher",
        "app.workers.llm_processor",
        "app.workers.news_fetcher",
        "app.workers.trend_scorer",
    ],
)

celery_app.conf.update(
    # JSON-only serialization keeps payloads portable and auditable.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    # Late ack: task is only removed from queue after completion, not on pickup.
    # Combined with idempotent tasks, this ensures no work is lost if a worker crashes.
    task_acks_late=True,
    # Prevent workers from prefetching LLM tasks and blocking other workers.
    worker_prefetch_multiplier=1,
    # Route tasks to named queues
    task_routes={
        "app.workers.congress_poller.*": {"queue": "polling"},
        "app.workers.document_fetcher.*": {"queue": "documents"},
        "app.workers.llm_processor.*": {"queue": "llm"},
        # news_fetcher and trend_scorer deliberately share the "news" queue.
        "app.workers.news_fetcher.*": {"queue": "news"},
        "app.workers.trend_scorer.*": {"queue": "news"},
    },
    task_queues=[
        Queue("polling"),
        Queue("documents"),
        Queue("llm"),
        Queue("news"),
    ],
    # RedBeat stores schedule in Redis — restart-safe and dynamically updatable
    redbeat_redis_url=settings.REDIS_URL,
    beat_scheduler="redbeat.RedBeatScheduler",
    beat_schedule={
        # Incremental bill sync; interval is configurable via app settings.
        "poll-congress-bills": {
            "task": "app.workers.congress_poller.poll_congress_bills",
            "schedule": crontab(minute=f"*/{settings.CONGRESS_POLL_INTERVAL_MINUTES}"),
        },
        # News correlation for recently active bills, every 6 hours.
        "fetch-news-active-bills": {
            "task": "app.workers.news_fetcher.fetch_news_for_active_bills",
            "schedule": crontab(hour="*/6", minute=0),
        },
        # Nightly zeitgeist scoring at 02:00 UTC.
        "calculate-trend-scores": {
            "task": "app.workers.trend_scorer.calculate_all_trend_scores",
            "schedule": crontab(hour=2, minute=0),
        },
    },
)

View File

@@ -0,0 +1,172 @@
"""
Congress.gov poller — incremental bill and member sync.
Runs on Celery Beat schedule (every 30 min by default).
Uses fromDateTime to fetch only recently updated bills.
All operations are idempotent.
"""
import logging
from datetime import datetime, timezone
from app.database import get_sync_db
from app.models import Bill, BillAction, Member, AppSetting
from app.services import congress_api
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
def _get_setting(db, key: str, default=None) -> str | None:
    """Look up an AppSetting value by key, falling back to *default* when missing."""
    setting = db.get(AppSetting, key)
    if setting is None:
        return default
    return setting.value
def _set_setting(db, key: str, value: str) -> None:
    """Insert or update an AppSetting row and commit immediately."""
    setting = db.get(AppSetting, key)
    if setting is None:
        db.add(AppSetting(key=key, value=value))
    else:
        setting.value = value
    db.commit()
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.poll_congress_bills")
def poll_congress_bills(self):
    """Fetch recently updated bills from Congress.gov and enqueue document + LLM processing.

    Idempotent: existing bills are updated in place; new bills are inserted and
    fan out a document-fetch task. The "congress_last_polled_at" watermark is
    captured *before* paging begins, so bills updated while a long poll is in
    flight are re-fetched on the next run instead of being silently missed.

    Returns:
        dict with "new" and "updated" counts ("updated" counts only bills whose
        tracked fields actually changed).
    """
    db = get_sync_db()
    try:
        last_polled = _get_setting(db, "congress_last_polled_at")
        current_congress = congress_api.get_current_congress()
        logger.info(f"Polling Congress {current_congress} (since {last_polled})")
        # Capture the next watermark up front — see docstring for why.
        poll_started_at = datetime.now(timezone.utc)
        new_count = 0
        updated_count = 0
        offset = 0
        while True:
            response = congress_api.get_bills(
                congress=current_congress,
                offset=offset,
                limit=250,
                from_date_time=last_polled,
            )
            bills_data = response.get("bills", [])
            if not bills_data:
                break
            for bill_data in bills_data:
                parsed = congress_api.parse_bill_from_api(bill_data, current_congress)
                bill_id = parsed["bill_id"]
                existing = db.get(Bill, bill_id)
                if existing is None:
                    # Upsert sponsor member if referenced
                    sponsor_id = _sync_sponsor(db, bill_data)
                    parsed["sponsor_id"] = sponsor_id
                    parsed["last_checked_at"] = datetime.now(timezone.utc)
                    db.add(Bill(**parsed))
                    db.commit()
                    new_count += 1
                    # Enqueue document fetch (local import avoids circular module deps)
                    from app.workers.document_fetcher import fetch_bill_documents
                    fetch_bill_documents.delay(bill_id)
                else:
                    # FIX: only count bills that actually changed; previously
                    # every existing bill inflated updated_count.
                    if _update_bill_if_changed(db, existing, parsed):
                        updated_count += 1
            db.commit()
            offset += 250
            if len(bills_data) < 250:
                # Short page means the listing is exhausted.
                break
        # Advance the watermark to the poll *start* time, not completion time.
        _set_setting(db, "congress_last_polled_at", poll_started_at.strftime("%Y-%m-%dT%H:%M:%SZ"))
        logger.info(f"Poll complete: {new_count} new, {updated_count} updated")
        return {"new": new_count, "updated": updated_count}
    except Exception as exc:
        db.rollback()
        logger.error(f"Poll failed: {exc}")
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.sync_members")
def sync_members(self):
    """Sync current Congress members."""
    db = get_sync_db()
    try:
        page_offset = 0
        synced = 0
        while True:
            page = congress_api.get_members(offset=page_offset, limit=250, current_member=True)
            batch = page.get("members", [])
            if not batch:
                break
            for raw_member in batch:
                fields = congress_api.parse_member_from_api(raw_member)
                bioguide = fields.get("bioguide_id")
                if not bioguide:
                    # Can't key the row without a bioguide ID — skip this record.
                    continue
                record = db.get(Member, bioguide)
                if record is None:
                    db.add(Member(**fields))
                else:
                    # Overwrite every parsed field on the existing row.
                    for attr, value in fields.items():
                        setattr(record, attr, value)
                synced += 1
            db.commit()
            page_offset += 250
            if len(batch) < 250:
                # A short page signals the end of the member listing.
                break
        logger.info(f"Synced {synced} members")
        return {"synced": synced}
    except Exception as exc:
        db.rollback()
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()
def _sync_sponsor(db, bill_data: dict) -> str | None:
    """Ensure the bill sponsor exists in the members table. Returns bioguide_id or None."""
    sponsor_list = bill_data.get("sponsors", [])
    if not sponsor_list:
        return None
    primary = sponsor_list[0]
    bioguide_id = primary.get("bioguideId")
    if not bioguide_id:
        return None
    # Insert a minimal member record only if one isn't already present.
    if db.get(Member, bioguide_id) is None:
        party_raw = primary.get("party")
        db.add(Member(
            bioguide_id=bioguide_id,
            name=primary.get("fullName", ""),
            first_name=primary.get("firstName"),
            last_name=primary.get("lastName"),
            # Truncate to the column's 10-char limit.
            party=party_raw[:10] if party_raw else None,
            state=primary.get("state"),
        ))
        db.commit()
    return bioguide_id
def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool:
    """Update bill fields if anything has changed. Returns True if updated."""
    tracked = ("title", "short_title", "latest_action_date", "latest_action_text", "status")
    dirty = False
    for name in tracked:
        incoming = parsed.get(name)
        # Empty/None incoming values never overwrite stored data.
        if incoming and incoming != getattr(existing, name):
            setattr(existing, name, incoming)
            dirty = True
    if not dirty:
        return False
    existing.last_checked_at = datetime.now(timezone.utc)
    db.commit()
    # A changed bill may carry a new text version — re-check its documents.
    from app.workers.document_fetcher import fetch_bill_documents
    fetch_bill_documents.delay(existing.bill_id)
    return True

View File

@@ -0,0 +1,87 @@
"""
Document fetcher — retrieves bill text from GovInfo and stores it.
Triggered by congress_poller when a new bill is detected.
"""
import logging
from datetime import datetime, timezone
from app.database import get_sync_db
from app.models import Bill, BillDocument
from app.services import congress_api, govinfo_api
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3, name="app.workers.document_fetcher.fetch_bill_documents")
def fetch_bill_documents(self, bill_id: str):
    """Fetch bill text from GovInfo and store it. Then enqueue LLM processing.

    Idempotent: if a document with the same GovInfo URL and non-null text is
    already stored for this bill, nothing is fetched again.

    Args:
        bill_id: Primary key of the Bill whose text should be retrieved.

    Returns:
        dict with a "status" key describing the outcome; successful runs also
        include "document_id" and "chars".
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning(f"Bill {bill_id} not found in DB")
            return {"status": "not_found"}
        # Get text versions from Congress.gov
        try:
            text_response = congress_api.get_bill_text_versions(
                bill.congress_number, bill.bill_type, bill.bill_number
            )
        except Exception as e:
            # Soft failure: many young bills simply have no published text yet,
            # so this is logged and returned rather than retried.
            logger.warning(f"No text versions for {bill_id}: {e}")
            return {"status": "no_text_versions"}
        text_versions = text_response.get("textVersions", [])
        if not text_versions:
            return {"status": "no_text_versions"}
        # Pick the preferred URL/format among the available versions.
        url, fmt = govinfo_api.find_best_text_url(text_versions)
        if not url:
            return {"status": "no_suitable_format"}
        # Idempotency: skip if we already have this exact document version
        existing = (
            db.query(BillDocument)
            .filter_by(bill_id=bill_id, govinfo_url=url)
            .filter(BillDocument.raw_text.isnot(None))
            .first()
        )
        if existing:
            return {"status": "already_fetched", "bill_id": bill_id}
        logger.info(f"Fetching {bill_id} document ({fmt}) from {url}")
        raw_text = govinfo_api.fetch_text_from_url(url, fmt)
        if not raw_text:
            # Raise (rather than return) so the task-level retry kicks in.
            raise ValueError(f"Empty text returned for {bill_id}")
        # Get version label from first text version
        # NOTE(review): "type" appears to be either a dict with a "name" key or
        # a bare string depending on the API response shape — confirm upstream.
        type_obj = text_versions[0].get("type", {}) if text_versions else {}
        doc_version = type_obj.get("name") if isinstance(type_obj, dict) else type_obj
        doc = BillDocument(
            bill_id=bill_id,
            doc_type="bill_text",
            doc_version=doc_version,
            govinfo_url=url,
            raw_text=raw_text,
            fetched_at=datetime.now(timezone.utc),
        )
        db.add(doc)
        db.commit()
        # Refresh to populate the autogenerated primary key before using it.
        db.refresh(doc)
        logger.info(f"Stored document {doc.id} for bill {bill_id} ({len(raw_text):,} chars)")
        # Enqueue LLM processing
        from app.workers.llm_processor import process_document_with_llm
        process_document_with_llm.delay(doc.id)
        return {"status": "ok", "document_id": doc.id, "chars": len(raw_text)}
    except Exception as exc:
        db.rollback()
        logger.error(f"Document fetch failed for {bill_id}: {exc}")
        # Retry up to max_retries with a 2-minute countdown.
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()

View File

@@ -0,0 +1,107 @@
"""
LLM processor — generates AI briefs for fetched bill documents.
Triggered by document_fetcher after successful text retrieval.
"""
import logging
from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.services.llm_service import get_llm_provider
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(
    bind=True,
    max_retries=2,
    rate_limit="10/m",  # Respect LLM provider rate limits
    name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
    """Generate an AI brief for a bill document. Full brief for first version, amendment brief for subsequent versions.

    Idempotent: returns early if a brief already exists for this document.
    On success, fans out a news-fetch task for the bill.

    Args:
        document_id: Primary key of the BillDocument to summarize.

    Returns:
        dict with a "status" key; successful runs also include "brief_id" and
        "brief_type" ("full" or "amendment").
    """
    db = get_sync_db()
    try:
        # Idempotency: skip if brief already exists for this document
        existing = db.query(BillBrief).filter_by(document_id=document_id).first()
        if existing:
            return {"status": "already_processed", "brief_id": existing.id}
        doc = db.get(BillDocument, document_id)
        if not doc or not doc.raw_text:
            logger.warning(f"Document {document_id} not found or has no text")
            return {"status": "no_document"}
        bill = db.get(Bill, doc.bill_id)
        if not bill:
            return {"status": "no_bill"}
        sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
        # Prompt context: every value carries a string fallback so the LLM
        # never sees a raw None.
        bill_metadata = {
            "title": bill.title or "Unknown Title",
            "sponsor_name": sponsor.name if sponsor else "Unknown",
            "party": sponsor.party if sponsor else "Unknown",
            "state": sponsor.state if sponsor else "Unknown",
            "chamber": bill.chamber or "Unknown",
            "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
            "latest_action_text": bill.latest_action_text or "None",
            "latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
        }
        # Check if a full brief already exists for this bill (from an earlier document version)
        previous_full_brief = (
            db.query(BillBrief)
            .filter_by(bill_id=doc.bill_id, brief_type="full")
            .order_by(BillBrief.created_at.desc())
            .first()
        )
        provider = get_llm_provider()
        if previous_full_brief and previous_full_brief.document_id:
            # New version of a bill we've already analyzed — generate amendment brief
            previous_doc = db.get(BillDocument, previous_full_brief.document_id)
            if previous_doc and previous_doc.raw_text:
                logger.info(f"Generating amendment brief for document {document_id} (bill {doc.bill_id})")
                brief = provider.generate_amendment_brief(doc.raw_text, previous_doc.raw_text, bill_metadata)
                brief_type = "amendment"
            else:
                # Prior text is gone — fall back to a fresh full brief.
                logger.info(f"Previous document unavailable, generating full brief for document {document_id}")
                brief = provider.generate_brief(doc.raw_text, bill_metadata)
                brief_type = "full"
        else:
            logger.info(f"Generating full brief for document {document_id} (bill {doc.bill_id})")
            brief = provider.generate_brief(doc.raw_text, bill_metadata)
            brief_type = "full"
        db_brief = BillBrief(
            bill_id=doc.bill_id,
            document_id=document_id,
            brief_type=brief_type,
            summary=brief.summary,
            key_points=brief.key_points,
            risks=brief.risks,
            deadlines=brief.deadlines,
            topic_tags=brief.topic_tags,
            llm_provider=brief.llm_provider,
            llm_model=brief.llm_model,
        )
        db.add(db_brief)
        db.commit()
        # Refresh to obtain the autogenerated brief id for logging/return.
        db.refresh(db_brief)
        logger.info(f"{brief_type.capitalize()} brief {db_brief.id} created for bill {doc.bill_id} using {brief.llm_provider}/{brief.llm_model}")
        # Trigger news fetch now that we have topic tags
        from app.workers.news_fetcher import fetch_news_for_bill
        fetch_news_for_bill.delay(doc.bill_id)
        return {"status": "ok", "brief_id": db_brief.id, "brief_type": brief_type}
    except Exception as exc:
        db.rollback()
        logger.error(f"LLM processing failed for document {document_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)  # 5 min backoff for LLM failures
    finally:
        db.close()

View File

@@ -0,0 +1,104 @@
"""
News fetcher — correlates bills with news articles.
Triggered after LLM brief creation and on a 6-hour schedule for active bills.
"""
import logging
from datetime import date, datetime, timedelta, timezone
from sqlalchemy import and_
from app.database import get_sync_db
from app.models import Bill, BillBrief, NewsArticle
from app.services import news_service
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill")
def fetch_news_for_bill(self, bill_id: str):
    """Fetch news articles for a specific bill and store any new ones.

    Idempotent on article URL: previously saved URLs are skipped. On failure,
    the whole task retries (max 2) with a 5-minute countdown.

    Args:
        bill_id: Primary key of the Bill to correlate news for.

    Returns:
        dict with a "status" key; "ok" results also include a "saved" count.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "not_found"}
        # NOTE(review): an earlier revision also loaded the latest brief's
        # topic_tags here, but build_news_query never consumed them — the dead
        # DB query has been removed.
        query = news_service.build_news_query(
            bill_title=bill.title,
            short_title=bill.short_title,
            sponsor_name=None,
            bill_type=bill.bill_type,
            bill_number=bill.bill_number,
        )
        articles = news_service.fetch_newsapi_articles(query)
        saved = 0
        for article in articles:
            url = article.get("url")
            if not url:
                continue
            # Idempotency: skip duplicate URLs
            existing = db.query(NewsArticle).filter_by(url=url).first()
            if existing:
                continue
            pub_at = None
            if article.get("published_at"):
                try:
                    # Normalize a trailing "Z" so fromisoformat accepts it.
                    pub_at = datetime.fromisoformat(article["published_at"].replace("Z", "+00:00"))
                except (ValueError, TypeError, AttributeError):
                    # Unparseable timestamp — store the article without one.
                    pass
            db.add(NewsArticle(
                bill_id=bill_id,
                source=article.get("source", "")[:200],
                headline=article.get("headline", ""),
                url=url,
                published_at=pub_at,
                relevance_score=1.0,
            ))
            saved += 1
        db.commit()
        logger.info(f"Saved {saved} news articles for bill {bill_id}")
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"News fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills")
def fetch_news_for_active_bills(self):
    """
    Scheduled task: fetch news for bills with recent actions (last 7 days).
    Respects the 100/day NewsAPI limit by processing at most 80 bills per run.
    """
    db = get_sync_db()
    try:
        window_start = date.today() - timedelta(days=7)
        recent = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= window_start)
            .order_by(Bill.latest_action_date.desc())
            .limit(80)
            .all()
        )
        # Fan out one fetch task per bill rather than fetching inline.
        for candidate in recent:
            fetch_news_for_bill.delay(candidate.bill_id)
        logger.info(f"Queued news fetch for {len(recent)} active bills")
        return {"queued": len(recent)}
    finally:
        db.close()

View File

@@ -0,0 +1,111 @@
"""
Trend scorer — calculates the daily zeitgeist score for bills.
Runs nightly via Celery Beat.
"""
import logging
from datetime import date, timedelta
from sqlalchemy import and_
from app.database import get_sync_db
from app.models import Bill, BillBrief, TrendScore
from app.services import news_service, trends_service
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
def calculate_composite_score(newsapi_count: int, gnews_count: int, gtrends_score: float) -> float:
    """
    Weighted composite score (0-100):
        NewsAPI article count -> 0-40 pts (saturates at 20 articles)
        Google News RSS count -> 0-30 pts (saturates at 50 articles)
        Google Trends score   -> 0-30 pts (expects a 0-100 input)

    Inputs are clamped to their valid ranges so malformed negative counts or
    an out-of-range trends score can never push the total outside 0-100.
    (The original docstring's ranges were mojibake — "0100" etc.)

    Returns:
        Composite score rounded to 2 decimal places.
    """
    # Each component saturates at its cap rather than growing unbounded.
    newsapi_pts = min(max(newsapi_count, 0) / 20, 1.0) * 40
    gnews_pts = min(max(gnews_count, 0) / 50, 1.0) * 30
    gtrends_pts = (min(max(gtrends_score, 0.0), 100.0) / 100) * 30
    return round(newsapi_pts + gnews_pts + gtrends_pts, 2)
@celery_app.task(bind=True, name="app.workers.trend_scorer.calculate_all_trend_scores")
def calculate_all_trend_scores(self):
    """Nightly task: calculate trend scores for bills active in the last 90 days.

    Idempotent per day: a bill that already has a TrendScore row dated today is
    skipped, so re-running the task is safe. Commits in batches of 20 to bound
    transaction size during long runs.

    Returns:
        dict with the number of bills scored this run.
    """
    db = get_sync_db()
    try:
        cutoff = date.today() - timedelta(days=90)
        active_bills = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= cutoff)
            .all()
        )
        scored = 0
        today = date.today()
        for bill in active_bills:
            # Skip if already scored today
            existing = (
                db.query(TrendScore)
                .filter_by(bill_id=bill.bill_id, score_date=today)
                .first()
            )
            if existing:
                continue
            # Get latest brief for topic tags
            latest_brief = (
                db.query(BillBrief)
                .filter_by(bill_id=bill.bill_id)
                .order_by(BillBrief.created_at.desc())
                .first()
            )
            topic_tags = latest_brief.topic_tags if latest_brief else []
            # Build search query
            query = news_service.build_news_query(
                bill_title=bill.title,
                short_title=bill.short_title,
                sponsor_name=None,
                bill_type=bill.bill_type,
                bill_number=bill.bill_number,
            )
            # Fetch counts
            newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
            newsapi_count = len(newsapi_articles)
            gnews_count = news_service.fetch_gnews_count(query, days=30)
            # Google Trends
            keywords = trends_service.keywords_for_bill(
                title=bill.title or "",
                short_title=bill.short_title or "",
                topic_tags=topic_tags,
            )
            gtrends_score = trends_service.get_trends_score(keywords)
            composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score)
            db.add(TrendScore(
                bill_id=bill.bill_id,
                score_date=today,
                newsapi_count=newsapi_count,
                gnews_count=gnews_count,
                gtrends_score=gtrends_score,
                composite_score=composite,
            ))
            scored += 1
            # Periodic commit keeps the pending-flush set small.
            if scored % 20 == 0:
                db.commit()
        db.commit()
        logger.info(f"Scored {scored} bills")
        return {"scored": scored}
    except Exception as exc:
        db.rollback()
        logger.error(f"Trend scoring failed: {exc}")
        # No self.retry here — Beat will run this again on the next schedule.
        raise
    finally:
        db.close()