feat: PocketVeto v1.0.0 — initial public release

Self-hosted US Congress monitoring platform with AI policy briefs,
bill/member/topic follows, ntfy + RSS + email notifications,
alignment scoring, collections, and draft-letter generator.

Authored by: Jack Levy
This commit is contained in:
Jack Levy
2026-03-15 01:35:01 -04:00
commit 4c86a5b9ca
150 changed files with 19859 additions and 0 deletions

View File

View File

@@ -0,0 +1,361 @@
"""
Bill classifier and Member Effectiveness Score workers.
Tasks:
classify_bill_category — lightweight LLM call; triggered after brief generation
fetch_bill_cosponsors — Congress.gov cosponsor fetch; triggered on new bill
calculate_effectiveness_scores — nightly beat task
backfill_bill_categories — one-time backfill for existing bills
backfill_all_bill_cosponsors — one-time backfill for existing bills
"""
import json
import logging
import time
from datetime import datetime, timezone
from sqlalchemy import text
from app.config import settings
from app.database import get_sync_db
from app.models import Bill, BillCosponsor, BillDocument, Member
from app.models.setting import AppSetting
from app.services import congress_api
from app.services.llm_service import RateLimitError, get_llm_provider
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
# ── Classification ─────────────────────────────────────────────────────────────
# One-shot prompt for the cheap classification call. The doubled braces in the
# JSON example survive str.format() as literal braces; only {title} and
# {excerpt} are substituted.
_CLASSIFICATION_PROMPT = """\
Classify this bill into exactly one category.
Categories:
- substantive: Creates, modifies, or repeals policy, programs, regulations, funding, or rights. Real legislative work.
- commemorative: Names buildings/post offices, recognizes awareness days/weeks, honors individuals or events with no policy effect.
- administrative: Technical corrections, routine reauthorizations, housekeeping changes with no new policy substance.
Respond with ONLY valid JSON: {{"category": "substantive" | "commemorative" | "administrative"}}
BILL TITLE: {title}
BILL TEXT (excerpt):
{excerpt}
Classify now:"""
# Allowed values for Bill.bill_category; classify_bill_category falls back to
# "substantive" when the model returns anything else.
_VALID_CATEGORIES = {"substantive", "commemorative", "administrative"}
@celery_app.task(
    bind=True,
    max_retries=3,
    rate_limit=f"{settings.LLM_RATE_LIMIT_RPM}/m",
    name="app.workers.bill_classifier.classify_bill_category",
)
def classify_bill_category(self, bill_id: str, document_id: int):
    """Set bill_category via a cheap one-shot LLM call. Idempotent.

    Args:
        bill_id: Primary key of the Bill row to classify.
        document_id: BillDocument whose raw_text supplies the excerpt.

    Skips silently when the bill is missing or already categorized.
    Retries on provider rate limits (honouring retry_after) and on any
    other failure with a fixed 120s backoff.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill or bill.bill_category:
            return {"status": "skipped"}
        doc = db.get(BillDocument, document_id)
        excerpt = (doc.raw_text[:1200] if doc and doc.raw_text else "").strip()
        # Provider/model are runtime-configurable via AppSetting rows.
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider = get_llm_provider(
            prov_row.value if prov_row else None,
            model_row.value if model_row else None,
        )
        prompt = _CLASSIFICATION_PROMPT.format(
            title=bill.title or "Unknown",
            excerpt=excerpt or "(no text available)",
        )
        raw = provider.generate_text(prompt).strip()
        # Strip a markdown code fence if present. Use removeprefix, not
        # lstrip("json") — lstrip strips a *character class*, not a prefix,
        # and could silently eat leading payload characters.
        if raw.startswith("```"):
            body = raw[3:]
            # Keep only the content before the closing fence, if any.
            body = body.split("```", 1)[0]
            raw = body.removeprefix("json").strip()
        data = json.loads(raw)
        # Coerce defensively: the model may return null or a non-string.
        category = str(data.get("category") or "").lower()
        if category not in _VALID_CATEGORIES:
            logger.warning(
                "classify_bill_category: invalid category '%s' for %s, defaulting to substantive",
                category, bill_id,
            )
            category = "substantive"
        bill.bill_category = category
        db.commit()
        logger.info("Bill %s classified as '%s'", bill_id, category)
        return {"status": "ok", "bill_id": bill_id, "category": category}
    except RateLimitError as exc:
        db.rollback()
        # Defer exactly as long as the provider asked us to.
        raise self.retry(exc=exc, countdown=exc.retry_after)
    except Exception as exc:
        db.rollback()
        logger.error("classify_bill_category failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=3, name="app.workers.bill_classifier.backfill_bill_categories")
def backfill_bill_categories(self):
    """Queue classification for all bills with text but no category."""
    db = get_sync_db()
    try:
        pending = db.execute(text("""
            SELECT bd.bill_id, bd.id AS document_id
            FROM bill_documents bd
            JOIN bills b ON b.bill_id = bd.bill_id
            WHERE b.bill_category IS NULL AND bd.raw_text IS NOT NULL
        """)).fetchall()
        queued = 0
        for bill_id, document_id in pending:
            classify_bill_category.delay(bill_id, document_id)
            queued += 1
            # Small pause so a large backfill doesn't burst the broker.
            time.sleep(0.05)
        logger.info(f"backfill_bill_categories: queued {queued} classification tasks")
        return {"queued": queued}
    finally:
        db.close()
# ── Co-sponsor fetching ────────────────────────────────────────────────────────
@celery_app.task(bind=True, max_retries=3, name="app.workers.bill_classifier.fetch_bill_cosponsors")
def fetch_bill_cosponsors(self, bill_id: str):
    """Fetch and store cosponsor list from Congress.gov. Idempotent.

    Pages through the cosponsors endpoint 250 at a time, inserting one
    BillCosponsor row per cosponsor. The whole task is skipped once
    bill.cosponsors_fetched_at is set, so it runs at most once per bill
    (plus retries).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill or bill.cosponsors_fetched_at:
            return {"status": "skipped"}
        # All bioguide IDs we have ingested; cosponsors outside this set are
        # stored unlinked (bioguide_id=None) rather than dropped.
        known_bioguides = {row[0] for row in db.execute(text("SELECT bioguide_id FROM members")).fetchall()}
        # Track bioguide_ids already inserted this run to handle within-page dupes
        # (Congress.gov sometimes lists the same member twice with different dates)
        inserted_this_run: set[str] = set()
        inserted = 0
        offset = 0
        while True:
            data = congress_api.get_bill_cosponsors(
                bill.congress_number, bill.bill_type, bill.bill_number, offset=offset
            )
            cosponsors = data.get("cosponsors", [])
            if not cosponsors:
                break
            for cs in cosponsors:
                bioguide_id = cs.get("bioguideId")
                # Only link to members we've already ingested
                if bioguide_id and bioguide_id not in known_bioguides:
                    bioguide_id = None
                # Skip dupes — both across runs (DB check) and within this page
                # NOTE(review): unlinked cosponsors (bioguide_id=None) bypass
                # both dedup checks and could be inserted twice if pages
                # overlap — confirm intended.
                if bioguide_id:
                    if bioguide_id in inserted_this_run:
                        continue
                    exists = db.query(BillCosponsor).filter_by(
                        bill_id=bill_id, bioguide_id=bioguide_id
                    ).first()
                    if exists:
                        inserted_this_run.add(bioguide_id)
                        continue
                date_str = cs.get("sponsorshipDate")
                try:
                    # Malformed dates degrade to NULL rather than failing the task.
                    sponsored_date = datetime.strptime(date_str, "%Y-%m-%d").date() if date_str else None
                except ValueError:
                    sponsored_date = None
                db.add(BillCosponsor(
                    bill_id=bill_id,
                    bioguide_id=bioguide_id,
                    name=cs.get("fullName") or cs.get("name"),
                    party=cs.get("party"),
                    state=cs.get("state"),
                    sponsored_date=sponsored_date,
                ))
                if bioguide_id:
                    inserted_this_run.add(bioguide_id)
                inserted += 1
            # Commit per page so a failure only loses the in-flight page.
            db.commit()
            offset += 250
            if len(cosponsors) < 250:
                break
            time.sleep(0.25)
        # Mark completion last — if anything above failed we retry from scratch.
        bill.cosponsors_fetched_at = datetime.now(timezone.utc)
        db.commit()
        return {"bill_id": bill_id, "inserted": inserted}
    except Exception as exc:
        db.rollback()
        logger.error(f"fetch_bill_cosponsors failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.bill_classifier.backfill_all_bill_cosponsors")
def backfill_all_bill_cosponsors(self):
    """Queue cosponsor fetches for all bills that haven't been fetched yet."""
    db = get_sync_db()
    try:
        pending = db.execute(text(
            "SELECT bill_id FROM bills WHERE cosponsors_fetched_at IS NULL"
        )).fetchall()
        queued = 0
        for (pending_bill_id,) in pending:
            fetch_bill_cosponsors.delay(pending_bill_id)
            queued += 1
            # Gentle pacing so the backfill doesn't burst the broker.
            time.sleep(0.05)
        logger.info(f"backfill_all_bill_cosponsors: queued {queued} tasks")
        return {"queued": queued}
    finally:
        db.close()
# ── Effectiveness scoring ──────────────────────────────────────────────────────
def _distance_points(latest_action_text: str | None) -> int:
"""Map latest action text to a distance-traveled score."""
text = (latest_action_text or "").lower()
if "became public law" in text or "signed by president" in text or "enacted" in text:
return 50
if "passed house" in text or "passed senate" in text or "agreed to in" in text:
return 20
if "placed on" in text and "calendar" in text:
return 10
if "reported by" in text or "ordered to be reported" in text or "discharged" in text:
return 5
return 1
def _bipartisan_multiplier(db, bill_id: str, sponsor_party: str | None) -> float:
    """Return 1.5 if ≥20% of a bill's cosponsors are from the opposing party, else 1.0.

    Cosponsors with no recorded party count toward the denominator but never
    toward the opposing tally. Returns 1.0 when the sponsor's party is
    unknown or the bill has no cosponsors.
    """
    if not sponsor_party:
        return 1.0
    cosponsors = db.query(BillCosponsor).filter_by(bill_id=bill_id).all()
    if not cosponsors:
        return 1.0
    # Count directly instead of materializing a list; the earlier empty-guard
    # already makes a redundant len() > 0 check unnecessary.
    opposing = sum(1 for c in cosponsors if c.party and c.party != sponsor_party)
    return 1.5 if opposing / len(cosponsors) >= 0.20 else 1.0
def _substance_multiplier(bill_category: str | None) -> float:
return 0.1 if bill_category == "commemorative" else 1.0
def _leadership_multiplier(member: Member, congress_number: int) -> float:
    """Return 1.2 if the member held a chair-type role in this Congress, else 1.0."""
    roles = member.leadership_json or []
    chaired = any(
        role.get("congress") == congress_number
        and "chair" in (role.get("type") or "").lower()
        for role in roles
    )
    return 1.2 if chaired else 1.0
def _seniority_tier(terms_json: list | None) -> str:
"""Return 'junior' | 'mid' | 'senior' based on number of terms served."""
if not terms_json:
return "junior"
count = len(terms_json)
if count <= 2:
return "junior"
if count <= 5:
return "mid"
return "senior"
@celery_app.task(bind=True, name="app.workers.bill_classifier.calculate_effectiveness_scores")
def calculate_effectiveness_scores(self):
    """Nightly: compute effectiveness score and within-tier percentile for all members.

    Per sponsored bill in the current Congress:
        points = distance × bipartisan × substance × leadership multipliers
    (see the helpers above); summed per sponsor. Percentiles are then ranked
    within (seniority tier, party) buckets and written back to the Member rows.
    """
    db = get_sync_db()
    try:
        members = db.query(Member).all()
        if not members:
            return {"status": "no_members"}
        # Map bioguide_id → Member for quick lookup
        member_map = {m.bioguide_id: m for m in members}
        # Load all bills sponsored by current members (current congress only)
        current_congress = congress_api.get_current_congress()
        bills = db.query(Bill).filter_by(congress_number=current_congress).all()
        # Compute raw score per member
        raw_scores: dict[str, float] = {m.bioguide_id: 0.0 for m in members}
        for bill in bills:
            # Bills with no (or unknown) sponsor contribute nothing.
            if not bill.sponsor_id or bill.sponsor_id not in member_map:
                continue
            sponsor = member_map[bill.sponsor_id]
            pts = _distance_points(bill.latest_action_text)
            # NOTE: one DB query per bill here — fine nightly, but a candidate
            # for batching if the bill count grows.
            bipartisan = _bipartisan_multiplier(db, bill.bill_id, sponsor.party)
            substance = _substance_multiplier(bill.bill_category)
            leadership = _leadership_multiplier(sponsor, current_congress)
            raw_scores[bill.sponsor_id] = raw_scores.get(bill.sponsor_id, 0.0) + (
                pts * bipartisan * substance * leadership
            )
        # Group members by (tier, party) for percentile normalisation
        # We treat party as a proxy for majority/minority — grouped separately so
        # a minority-party junior isn't unfairly compared to a majority-party senior.
        from collections import defaultdict
        buckets: dict[tuple, list[str]] = defaultdict(list)
        for m in members:
            tier = _seniority_tier(m.terms_json)
            party_bucket = m.party or "Unknown"
            buckets[(tier, party_bucket)].append(m.bioguide_id)
        # Compute percentile within each bucket
        percentiles: dict[str, float] = {}
        tiers: dict[str, str] = {}
        for (tier, _), ids in buckets.items():
            scores = [(bid, raw_scores.get(bid, 0.0)) for bid in ids]
            scores.sort(key=lambda x: x[1])
            n = len(scores)
            for rank, (bid, _) in enumerate(scores):
                # rank/(n-1) maps lowest → 0.0 and highest → 100.0;
                # max(n-1, 1) guards the single-member bucket (yields 0.0).
                percentiles[bid] = round((rank / max(n - 1, 1)) * 100, 1)
                tiers[bid] = tier
        # Bulk update members
        updated = 0
        for m in members:
            score = raw_scores.get(m.bioguide_id, 0.0)
            pct = percentiles.get(m.bioguide_id)
            tier = tiers.get(m.bioguide_id, _seniority_tier(m.terms_json))
            m.effectiveness_score = round(score, 2)
            m.effectiveness_percentile = pct
            m.effectiveness_tier = tier
            updated += 1
        # Single commit keeps the whole nightly refresh atomic.
        db.commit()
        logger.info(f"calculate_effectiveness_scores: updated {updated} members for Congress {current_congress}")
        return {"status": "ok", "updated": updated, "congress": current_congress}
    except Exception as exc:
        db.rollback()
        logger.error(f"calculate_effectiveness_scores failed: {exc}")
        raise
    finally:
        db.close()

View File

@@ -0,0 +1,112 @@
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
from app.config import settings
celery_app = Celery(
    "pocketveto",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    # Every worker module must be listed here so its @celery_app.task
    # definitions are registered when workers start.
    include=[
        "app.workers.congress_poller",
        "app.workers.document_fetcher",
        "app.workers.llm_processor",
        "app.workers.news_fetcher",
        "app.workers.trend_scorer",
        "app.workers.member_interest",
        "app.workers.notification_dispatcher",
        "app.workers.llm_batch_processor",
        "app.workers.bill_classifier",
        "app.workers.vote_fetcher",
    ],
)
celery_app.conf.update(
    # JSON-only serialization on both task and result paths.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    # Late ack: task is only removed from queue after completion, not on pickup.
    # Combined with idempotent tasks, this ensures no work is lost if a worker crashes.
    task_acks_late=True,
    # Prevent workers from prefetching LLM tasks and blocking other workers.
    worker_prefetch_multiplier=1,
    # Route tasks to named queues
    task_routes={
        "app.workers.congress_poller.*": {"queue": "polling"},
        "app.workers.document_fetcher.*": {"queue": "documents"},
        "app.workers.llm_processor.*": {"queue": "llm"},
        "app.workers.llm_batch_processor.*": {"queue": "llm"},
        "app.workers.bill_classifier.*": {"queue": "llm"},
        "app.workers.news_fetcher.*": {"queue": "news"},
        "app.workers.trend_scorer.*": {"queue": "news"},
        "app.workers.member_interest.*": {"queue": "news"},
        "app.workers.notification_dispatcher.*": {"queue": "polling"},
        "app.workers.vote_fetcher.*": {"queue": "polling"},
    },
    task_queues=[
        Queue("polling"),
        Queue("documents"),
        Queue("llm"),
        Queue("news"),
    ],
    # RedBeat stores schedule in Redis — restart-safe and dynamically updatable
    redbeat_redis_url=settings.REDIS_URL,
    beat_scheduler="redbeat.RedBeatScheduler",
    # All schedule times below are UTC (enable_utc=True above).
    beat_schedule={
        "poll-congress-bills": {
            "task": "app.workers.congress_poller.poll_congress_bills",
            "schedule": crontab(minute=f"*/{settings.CONGRESS_POLL_INTERVAL_MINUTES}"),
        },
        "fetch-news-active-bills": {
            "task": "app.workers.news_fetcher.fetch_news_for_active_bills",
            "schedule": crontab(hour="*/6", minute=0),
        },
        "calculate-trend-scores": {
            "task": "app.workers.trend_scorer.calculate_all_trend_scores",
            "schedule": crontab(hour=2, minute=0),
        },
        "fetch-news-active-members": {
            "task": "app.workers.member_interest.fetch_news_for_active_members",
            "schedule": crontab(hour="*/12", minute=30),
        },
        "calculate-member-trend-scores": {
            "task": "app.workers.member_interest.calculate_all_member_trend_scores",
            "schedule": crontab(hour=3, minute=0),
        },
        "sync-members": {
            "task": "app.workers.congress_poller.sync_members",
            "schedule": crontab(hour=1, minute=0),  # 1 AM UTC daily — refreshes chamber/district/contact info
        },
        "fetch-actions-active-bills": {
            "task": "app.workers.congress_poller.fetch_actions_for_active_bills",
            "schedule": crontab(hour=4, minute=0),  # 4 AM UTC, after trend + member scoring
        },
        "fetch-votes-for-stanced-bills": {
            "task": "app.workers.vote_fetcher.fetch_votes_for_stanced_bills",
            "schedule": crontab(hour=4, minute=30),  # 4:30 AM UTC daily
        },
        "dispatch-notifications": {
            "task": "app.workers.notification_dispatcher.dispatch_notifications",
            "schedule": crontab(minute="*/5"),  # Every 5 minutes
        },
        "send-notification-digest": {
            "task": "app.workers.notification_dispatcher.send_notification_digest",
            "schedule": crontab(hour=8, minute=0),  # 8 AM UTC daily
        },
        "send-weekly-digest": {
            "task": "app.workers.notification_dispatcher.send_weekly_digest",
            "schedule": crontab(hour=8, minute=30, day_of_week=1),  # Monday 8:30 AM UTC
        },
        "poll-llm-batch-results": {
            "task": "app.workers.llm_batch_processor.poll_llm_batch_results",
            "schedule": crontab(minute="*/30"),
        },
        "calculate-effectiveness-scores": {
            "task": "app.workers.bill_classifier.calculate_effectiveness_scores",
            "schedule": crontab(hour=5, minute=0),  # 5 AM UTC, after all other nightly tasks
        },
    },
)

View File

@@ -0,0 +1,480 @@
"""
Congress.gov poller — incremental bill and member sync.
Runs on Celery Beat schedule (every 30 min by default).
Uses fromDateTime to fetch only recently updated bills.
All operations are idempotent.
"""
import logging
import time
from datetime import datetime, timedelta, timezone
from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.database import get_sync_db
from app.models import Bill, BillAction, Member, AppSetting
from app.services import congress_api
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
def _get_setting(db, key: str, default=None) -> str | None:
    """Look up an AppSetting row and return its value, or `default` when unset."""
    row = db.get(AppSetting, key)
    if not row:
        return default
    return row.value
def _set_setting(db, key: str, value: str) -> None:
    """Upsert an AppSetting key/value pair and commit immediately."""
    existing = db.get(AppSetting, key)
    if not existing:
        db.add(AppSetting(key=key, value=value))
    else:
        existing.value = value
    db.commit()
# Only track legislation that can become law. Simple/concurrent resolutions
# (hres, sres, hconres, sconres) are procedural and not worth analyzing.
TRACKED_BILL_TYPES = {"hr", "s", "hjres", "sjres"}
# Action categories that produce new bill text versions on GovInfo.
# Procedural/administrative actions (referral to committee, calendar placement)
# rarely produce a new text version, so we skip document fetching for them.
# Values must match the categories returned by
# notification_utils.categorize_action (checked in _update_bill_if_changed).
_DOC_PRODUCING_CATEGORIES = {"vote", "committee_report", "presidential", "new_document", "new_amendment"}
def _is_congress_off_hours() -> bool:
    """Return True during periods when Congress.gov is unlikely to publish new content."""
    try:
        from zoneinfo import ZoneInfo
        now_eastern = datetime.now(ZoneInfo("America/New_York"))
    except Exception:
        # Timezone data unavailable — err on the side of polling.
        return False
    # Off-hours = weekends, or nights before 9 AM / after 9 PM Eastern.
    on_weekend = now_eastern.weekday() >= 5
    at_night = now_eastern.hour < 9 or now_eastern.hour >= 21
    return on_weekend or at_night
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.poll_congress_bills")
def poll_congress_bills(self):
    """Fetch recently updated bills from Congress.gov and enqueue document + LLM processing.

    Incremental: the persisted `congress_last_polled_at` setting is used as
    the fromDateTime cursor. New bills trigger document/action/sponsor/
    cosponsor fetch tasks; already-known bills are diffed via
    _update_bill_if_changed. Retries up to 3 times with a 60s backoff.
    """
    db = get_sync_db()
    try:
        last_polled = _get_setting(db, "congress_last_polled_at")
        # Adaptive: skip off-hours polls if last poll was recent (< 1 hour ago)
        if _is_congress_off_hours() and last_polled:
            try:
                last_dt = datetime.fromisoformat(last_polled.replace("Z", "+00:00"))
                if (datetime.now(timezone.utc) - last_dt) < timedelta(hours=1):
                    logger.info("Skipping poll — off-hours and last poll < 1 hour ago")
                    return {"new": 0, "updated": 0, "skipped": "off_hours"}
            except Exception:
                # Unparseable timestamp — fall through and poll normally.
                pass
        # On first run, seed from 2 months back rather than the full congress history
        if not last_polled:
            two_months_ago = datetime.now(timezone.utc) - timedelta(days=60)
            last_polled = two_months_ago.strftime("%Y-%m-%dT%H:%M:%SZ")
        current_congress = congress_api.get_current_congress()
        logger.info(f"Polling Congress {current_congress} (since {last_polled})")
        new_count = 0
        updated_count = 0
        offset = 0
        while True:
            response = congress_api.get_bills(
                congress=current_congress,
                offset=offset,
                limit=250,
                from_date_time=last_polled,
            )
            bills_data = response.get("bills", [])
            if not bills_data:
                break
            for bill_data in bills_data:
                parsed = congress_api.parse_bill_from_api(bill_data, current_congress)
                # Ignore resolution types we don't analyze.
                if parsed.get("bill_type") not in TRACKED_BILL_TYPES:
                    continue
                bill_id = parsed["bill_id"]
                existing = db.get(Bill, bill_id)
                if existing is None:
                    # Save bill immediately; fetch sponsor detail asynchronously
                    parsed["sponsor_id"] = None
                    parsed["last_checked_at"] = datetime.now(timezone.utc)
                    db.add(Bill(**parsed))
                    # Commit before enqueueing so the workers can see the row.
                    db.commit()
                    new_count += 1
                    # Enqueue document, action, sponsor, and cosponsor fetches.
                    # Local imports presumably avoid circular imports between
                    # worker modules — TODO confirm.
                    from app.workers.document_fetcher import fetch_bill_documents
                    fetch_bill_documents.delay(bill_id)
                    fetch_bill_actions.delay(bill_id)
                    fetch_sponsor_for_bill.delay(
                        bill_id, current_congress, parsed["bill_type"], parsed["bill_number"]
                    )
                    from app.workers.bill_classifier import fetch_bill_cosponsors
                    fetch_bill_cosponsors.delay(bill_id)
                else:
                    _update_bill_if_changed(db, existing, parsed)
                    updated_count += 1
            db.commit()
            offset += 250
            if len(bills_data) < 250:
                break
        # Update last polled timestamp
        _set_setting(db, "congress_last_polled_at", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
        logger.info(f"Poll complete: {new_count} new, {updated_count} updated")
        return {"new": new_count, "updated": updated_count}
    except Exception as exc:
        db.rollback()
        logger.error(f"Poll failed: {exc}")
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.sync_members")
def sync_members(self):
    """Sync current Congress members."""
    db = get_sync_db()
    try:
        synced = 0
        offset = 0
        while True:
            # Page through the member list 250 at a time.
            response = congress_api.get_members(offset=offset, limit=250, current_member=True)
            page = response.get("members", [])
            if not page:
                break
            for raw in page:
                parsed = congress_api.parse_member_from_api(raw)
                if not parsed.get("bioguide_id"):
                    continue
                member = db.get(Member, parsed["bioguide_id"])
                if member is None:
                    db.add(Member(**parsed))
                else:
                    # Refresh every parsed field on the existing row.
                    for field, value in parsed.items():
                        setattr(member, field, value)
                synced += 1
            db.commit()
            if len(page) < 250:
                break
            offset += 250
        logger.info(f"Synced {synced} members")
        return {"synced": synced}
    except Exception as exc:
        db.rollback()
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()
def _sync_sponsor(db, bill_data: dict) -> str | None:
    """Ensure the bill sponsor exists in the members table. Returns bioguide_id or None."""
    sponsors = bill_data.get("sponsors", [])
    if not sponsors:
        return None
    primary = sponsors[0]
    bioguide_id = primary.get("bioguideId")
    if not bioguide_id:
        return None
    # Create a minimal member record only when we don't already have one.
    if db.get(Member, bioguide_id) is None:
        party = primary.get("party")
        db.add(Member(
            bioguide_id=bioguide_id,
            name=primary.get("fullName", ""),
            first_name=primary.get("firstName"),
            last_name=primary.get("lastName"),
            party=party[:10] if party else None,
            state=primary.get("state"),
        ))
        db.commit()
    return bioguide_id
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.fetch_sponsor_for_bill")
def fetch_sponsor_for_bill(self, bill_id: str, congress: int, bill_type: str, bill_number: str):
    """Async sponsor fetch: get bill detail from Congress.gov and link the sponsor. Idempotent."""
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"status": "not_found"}
        # Already linked — nothing to do.
        if bill.sponsor_id:
            return {"status": "already_set", "sponsor_id": bill.sponsor_id}
        detail = congress_api.get_bill_detail(congress, bill_type, bill_number)
        sponsor_id = _sync_sponsor(db, detail.get("bill", {}))
        if sponsor_id:
            bill.sponsor_id = sponsor_id
            db.commit()
        return {"status": "ok", "sponsor_id": sponsor_id}
    except Exception as exc:
        db.rollback()
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_sponsor_ids")
def backfill_sponsor_ids(self):
    """Backfill sponsor_id for all bills where it is NULL by fetching bill detail from Congress.gov.

    One-off maintenance task. Failures on individual bills are logged and
    skipped so a single bad record cannot abort the whole backfill.
    Uses the module-level `time` import — the previous function-local
    `import time` was redundant.
    """
    db = get_sync_db()
    try:
        bills = db.query(Bill).filter(Bill.sponsor_id.is_(None)).all()
        total = len(bills)
        updated = 0
        logger.info(f"Backfilling sponsors for {total} bills")
        for bill in bills:
            try:
                detail = congress_api.get_bill_detail(bill.congress_number, bill.bill_type, bill.bill_number)
                sponsor_id = _sync_sponsor(db, detail.get("bill", {}))
                if sponsor_id:
                    bill.sponsor_id = sponsor_id
                    db.commit()
                    updated += 1
            except Exception as e:
                logger.warning(f"Could not backfill sponsor for {bill.bill_id}: {e}")
            time.sleep(0.1)  # ~10 req/sec, well under Congress.gov 5000/hr limit
        logger.info(f"Sponsor backfill complete: {updated}/{total} updated")
        return {"total": total, "updated": updated}
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.fetch_bill_actions")
def fetch_bill_actions(self, bill_id: str):
    """Fetch and sync all actions for a bill from Congress.gov. Idempotent.

    Inserts use PostgreSQL INSERT ... ON CONFLICT DO NOTHING against the
    uq_bill_actions_bill_date_text constraint, so re-running a partially
    completed fetch never duplicates actions.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning(f"fetch_bill_actions: bill {bill_id} not found")
            return
        offset = 0
        inserted = 0
        while True:
            try:
                response = congress_api.get_bill_actions(
                    bill.congress_number, bill.bill_type, bill.bill_number, offset=offset
                )
            except Exception as exc:
                # Retry the whole task on API failure; already-committed pages
                # are safe thanks to on_conflict_do_nothing.
                raise self.retry(exc=exc, countdown=60)
            actions_data = response.get("actions", [])
            if not actions_data:
                break
            for action in actions_data:
                stmt = pg_insert(BillAction.__table__).values(
                    bill_id=bill_id,
                    action_date=action.get("actionDate"),
                    action_text=action.get("text", ""),
                    action_type=action.get("type"),
                    chamber=action.get("chamber"),
                ).on_conflict_do_nothing(constraint="uq_bill_actions_bill_date_text")
                result = db.execute(stmt)
                # rowcount is 0 when the conflict target suppressed the insert.
                inserted += result.rowcount
            # Commit per page so a retry only re-walks uncommitted pages.
            db.commit()
            offset += 250
            if len(actions_data) < 250:
                break
        bill.actions_fetched_at = datetime.now(timezone.utc)
        db.commit()
        logger.info(f"fetch_bill_actions: {bill_id} — inserted {inserted} new actions")
        return {"bill_id": bill_id, "inserted": inserted}
    except Exception as exc:
        db.rollback()
        # NOTE(review): re-raised without self.retry — only the inner API call
        # retries; DB failures fail the task outright. Confirm intended.
        raise
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.congress_poller.fetch_actions_for_active_bills")
def fetch_actions_for_active_bills(self):
    """Nightly batch: enqueue action fetches for recently active bills missing action data."""
    db = get_sync_db()
    try:
        cutoff = datetime.now(timezone.utc).date() - timedelta(days=30)
        # "Stale" = never fetched, or the bill moved since the last fetch.
        stale = or_(
            Bill.actions_fetched_at.is_(None),
            Bill.latest_action_date > Bill.actions_fetched_at,
        )
        candidates = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= cutoff, stale)
            .limit(200)
            .all()
        )
        queued = 0
        for candidate in candidates:
            fetch_bill_actions.delay(candidate.bill_id)
            queued += 1
            time.sleep(0.2)  # ~5 tasks/sec to avoid Redis burst
        logger.info(f"fetch_actions_for_active_bills: queued {queued} bills")
        return {"queued": queued}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_all_bill_actions")
def backfill_all_bill_actions(self):
    """One-time backfill: enqueue action fetches for every bill that has never had actions fetched."""
    db = get_sync_db()
    try:
        # Most recently active bills first, so fresh data lands soonest.
        never_fetched = (
            db.query(Bill)
            .filter(Bill.actions_fetched_at.is_(None))
            .order_by(Bill.latest_action_date.desc())
            .all()
        )
        queued = 0
        for stale_bill in never_fetched:
            fetch_bill_actions.delay(stale_bill.bill_id)
            queued += 1
            time.sleep(0.05)  # ~20 tasks/sec — workers will self-throttle against Congress.gov
        logger.info(f"backfill_all_bill_actions: queued {queued} bills")
        return {"queued": queued}
    finally:
        db.close()
def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool:
    """Update bill fields if anything has changed. Returns True if updated.

    Two dirtiness levels:
      changed — a tracked (meaningful) field changed: triggers document/action
                fetches and follower notifications.
      dirty   — anything at all was written (including null back-fills):
                triggers a commit but no notifications.
    """
    changed = False
    dirty = False
    # Meaningful change fields — trigger document + action fetch when updated
    track_fields = ["title", "short_title", "latest_action_date", "latest_action_text", "status"]
    for field in track_fields:
        new_val = parsed.get(field)
        if new_val and getattr(existing, field) != new_val:
            setattr(existing, field, new_val)
            changed = True
            dirty = True
    # Static fields — only fill in if currently null; no change trigger needed
    fill_null_fields = ["introduced_date", "congress_url", "chamber"]
    for field in fill_null_fields:
        new_val = parsed.get(field)
        if new_val and getattr(existing, field) is None:
            setattr(existing, field, new_val)
            dirty = True
    if changed:
        existing.last_checked_at = datetime.now(timezone.utc)
    if dirty:
        db.commit()
    if changed:
        # Local import presumably avoids a circular import with the worker
        # modules — TODO confirm.
        from app.workers.notification_utils import (
            emit_bill_notification,
            emit_member_follow_notifications,
            emit_topic_follow_notifications,
            categorize_action,
        )
        action_text = parsed.get("latest_action_text", "")
        action_category = categorize_action(action_text)
        # Only fetch new documents for actions that produce new text versions on GovInfo.
        # Skip procedural/administrative actions (referral, calendar) to avoid unnecessary calls.
        if not action_category or action_category in _DOC_PRODUCING_CATEGORIES:
            from app.workers.document_fetcher import fetch_bill_documents
            fetch_bill_documents.delay(existing.bill_id)
        # Actions are always refreshed on any meaningful change.
        fetch_bill_actions.delay(existing.bill_id)
        # Uncategorized actions produce no notifications at all.
        if action_category:
            emit_bill_notification(db, existing, "bill_updated", action_text, action_category=action_category)
            emit_member_follow_notifications(db, existing, "bill_updated", action_text, action_category=action_category)
            # Topic followers — pull tags from the bill's latest brief
            from app.models.brief import BillBrief
            latest_brief = (
                db.query(BillBrief)
                .filter_by(bill_id=existing.bill_id)
                .order_by(BillBrief.created_at.desc())
                .first()
            )
            topic_tags = latest_brief.topic_tags or [] if latest_brief else []
            emit_topic_follow_notifications(
                db, existing, "bill_updated", action_text, topic_tags, action_category=action_category
            )
    return changed
@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_bill_metadata")
def backfill_bill_metadata(self):
    """
    Find bills with null introduced_date (or other static fields) and
    re-fetch their detail from Congress.gov to fill in the missing values.
    No document or LLM calls — metadata only.

    Fill-only semantics: an existing non-null value is never overwritten.
    Per-bill failures are logged and counted as skipped.
    """
    db = get_sync_db()
    try:
        # Aliased import keeps the local name distinct from other uses of
        # `text` in this module.
        from sqlalchemy import text as sa_text
        rows = db.execute(sa_text("""
            SELECT bill_id, congress_number, bill_type, bill_number
            FROM bills
            WHERE introduced_date IS NULL
            OR congress_url IS NULL
            OR chamber IS NULL
        """)).fetchall()
        updated = 0
        skipped = 0
        for row in rows:
            try:
                detail = congress_api.get_bill_detail(
                    row.congress_number, row.bill_type, row.bill_number
                )
                bill_data = detail.get("bill", {})
                # Re-shape the detail payload into the list-endpoint shape
                # parse_bill_from_api expects.
                parsed = congress_api.parse_bill_from_api(
                    {
                        "type": row.bill_type,
                        "number": row.bill_number,
                        "introducedDate": bill_data.get("introducedDate"),
                        "title": bill_data.get("title"),
                        "shortTitle": bill_data.get("shortTitle"),
                        "latestAction": bill_data.get("latestAction") or {},
                    },
                    row.congress_number,
                )
                bill = db.get(Bill, row.bill_id)
                if not bill:
                    skipped += 1
                    continue
                fill_null_fields = ["introduced_date", "congress_url", "chamber", "title", "short_title"]
                dirty = False
                for field in fill_null_fields:
                    new_val = parsed.get(field)
                    # Only fill values that are currently null.
                    if new_val and getattr(bill, field) is None:
                        setattr(bill, field, new_val)
                        dirty = True
                if dirty:
                    db.commit()
                    updated += 1
                else:
                    skipped += 1
                time.sleep(0.2)  # ~300 req/min — well under the 5k/hr limit
            except Exception as exc:
                logger.warning(f"backfill_bill_metadata: failed for {row.bill_id}: {exc}")
                skipped += 1
        logger.info(f"backfill_bill_metadata: {updated} updated, {skipped} skipped")
        return {"updated": updated, "skipped": skipped}
    finally:
        db.close()

View File

@@ -0,0 +1,92 @@
"""
Document fetcher — retrieves bill text from GovInfo and stores it.
Triggered by congress_poller when a new bill is detected.
"""
import logging
from datetime import datetime, timezone
from app.database import get_sync_db
from app.models import Bill, BillDocument
from app.services import congress_api, govinfo_api
from app.services.govinfo_api import DocumentUnchangedError
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3, name="app.workers.document_fetcher.fetch_bill_documents")
def fetch_bill_documents(self, bill_id: str):
    """Fetch bill text from GovInfo and store it. Then enqueue LLM processing.

    Steps: list text versions on Congress.gov → pick the best URL/format →
    skip if that exact version is already stored → download → persist a
    BillDocument row → enqueue process_document_with_llm.
    Retries up to 3 times with a 120s backoff on unexpected failures.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            logger.warning(f"Bill {bill_id} not found in DB")
            return {"status": "not_found"}
        # Get text versions from Congress.gov
        try:
            text_response = congress_api.get_bill_text_versions(
                bill.congress_number, bill.bill_type, bill.bill_number
            )
        except Exception as e:
            # Best-effort: treated the same as "no text yet" rather than retried.
            logger.warning(f"No text versions for {bill_id}: {e}")
            return {"status": "no_text_versions"}
        text_versions = text_response.get("textVersions", [])
        if not text_versions:
            return {"status": "no_text_versions"}
        url, fmt = govinfo_api.find_best_text_url(text_versions)
        if not url:
            return {"status": "no_suitable_format"}
        # Idempotency: skip if we already have this exact document version
        existing = (
            db.query(BillDocument)
            .filter_by(bill_id=bill_id, govinfo_url=url)
            .filter(BillDocument.raw_text.isnot(None))
            .first()
        )
        if existing:
            return {"status": "already_fetched", "bill_id": bill_id}
        logger.info(f"Fetching {bill_id} document ({fmt}) from {url}")
        try:
            raw_text = govinfo_api.fetch_text_from_url(url, fmt)
        except DocumentUnchangedError:
            logger.info(f"Document unchanged for {bill_id} (ETag match) — skipping")
            return {"status": "unchanged", "bill_id": bill_id}
        if not raw_text:
            # Empty body — raise so the task retries via the outer handler.
            raise ValueError(f"Empty text returned for {bill_id}")
        # Get version label from first text version
        # (the API's `type` field may be a {"name": ...} dict or a bare string)
        type_obj = text_versions[0].get("type", {}) if text_versions else {}
        doc_version = type_obj.get("name") if isinstance(type_obj, dict) else type_obj
        doc = BillDocument(
            bill_id=bill_id,
            doc_type="bill_text",
            doc_version=doc_version,
            govinfo_url=url,
            raw_text=raw_text,
            fetched_at=datetime.now(timezone.utc),
        )
        db.add(doc)
        db.commit()
        # Refresh to populate the generated doc.id used below.
        db.refresh(doc)
        logger.info(f"Stored document {doc.id} for bill {bill_id} ({len(raw_text):,} chars)")
        # Enqueue LLM processing
        from app.workers.llm_processor import process_document_with_llm
        process_document_with_llm.delay(doc.id)
        return {"status": "ok", "document_id": doc.id, "chars": len(raw_text)}
    except Exception as exc:
        db.rollback()
        logger.error(f"Document fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()

View File

@@ -0,0 +1,401 @@
"""
LLM Batch processor — submits and polls OpenAI/Anthropic Batch API jobs.
50% cheaper than synchronous calls; 24-hour processing window.
New bills still use the synchronous llm_processor task.
"""
import io
import json
import logging
from datetime import datetime
from sqlalchemy import text
from app.config import settings
from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.models.setting import AppSetting
from app.services.llm_service import (
AMENDMENT_SYSTEM_PROMPT,
MAX_TOKENS_DEFAULT,
SYSTEM_PROMPT,
build_amendment_prompt,
build_prompt,
parse_brief_json,
)
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
_BATCH_SETTING_KEY = "llm_active_batch"
# ── State helpers ──────────────────────────────────────────────────────────────
def _save_batch_state(db, state: dict):
    """Upsert the active-batch JSON blob under the well-known settings key."""
    payload = json.dumps(state)
    existing = db.get(AppSetting, _BATCH_SETTING_KEY)
    if existing is None:
        db.add(AppSetting(key=_BATCH_SETTING_KEY, value=payload))
    else:
        existing.value = payload
    db.commit()
def _clear_batch_state(db):
    """Delete the active-batch settings row, committing only if one existed."""
    existing = db.get(AppSetting, _BATCH_SETTING_KEY)
    if existing:
        db.delete(existing)
        db.commit()
# ── Request builder ────────────────────────────────────────────────────────────
def _build_request_data(db, doc_id: int, bill_id: str) -> tuple[str, str, str]:
    """Returns (custom_id, system_prompt, user_prompt) for a document.

    Chooses an amendment prompt when a previous full brief with retrievable
    prior text exists for the bill; otherwise builds a full-brief prompt.
    The custom_id encodes "doc-{doc_id}-{brief_type}" so batch results can be
    routed back (see _parse_custom_id).

    Raises:
        ValueError: if the document or bill is missing, or the document has
            no stored raw text.
    """
    doc = db.get(BillDocument, doc_id)
    if not doc or not doc.raw_text:
        raise ValueError(f"Document {doc_id} missing or has no text")
    bill = db.get(Bill, bill_id)
    if not bill:
        raise ValueError(f"Bill {bill_id} not found")
    sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
    # "Unknown"/"None" placeholders keep the prompt template well-formed
    # when metadata is missing.
    bill_metadata = {
        "title": bill.title or "Unknown Title",
        "sponsor_name": sponsor.name if sponsor else "Unknown",
        "party": sponsor.party if sponsor else "Unknown",
        "state": sponsor.state if sponsor else "Unknown",
        "chamber": bill.chamber or "Unknown",
        "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
        "latest_action_text": bill.latest_action_text or "None",
        "latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
    }
    previous_full_brief = (
        db.query(BillBrief)
        .filter_by(bill_id=bill_id, brief_type="full")
        .order_by(BillBrief.created_at.desc())
        .first()
    )
    # Fix: the original duplicated the full-brief branch twice; resolve the
    # previous document first, then make a single amendment-vs-full decision.
    previous_doc = None
    if previous_full_brief and previous_full_brief.document_id:
        previous_doc = db.get(BillDocument, previous_full_brief.document_id)
    if previous_doc and previous_doc.raw_text:
        brief_type = "amendment"
        prompt = build_amendment_prompt(doc.raw_text, previous_doc.raw_text, bill_metadata, MAX_TOKENS_DEFAULT)
        system_prompt = AMENDMENT_SYSTEM_PROMPT + "\n\nIMPORTANT: Respond with ONLY valid JSON. No other text."
    else:
        brief_type = "full"
        prompt = build_prompt(doc.raw_text, bill_metadata, MAX_TOKENS_DEFAULT)
        system_prompt = SYSTEM_PROMPT
    custom_id = f"doc-{doc_id}-{brief_type}"
    return custom_id, system_prompt, prompt
# ── Submit task ────────────────────────────────────────────────────────────────
@celery_app.task(bind=True, name="app.workers.llm_batch_processor.submit_llm_batch")
def submit_llm_batch(self):
    """Submit all unbriefed documents to the OpenAI or Anthropic Batch API.

    No-ops when the configured provider has no batch support, a batch is
    already in flight, or nothing needs processing. On success, batch
    metadata is persisted via _save_batch_state so the periodic poll task
    can import results later.
    """
    # Local import: this module only imports `datetime` at the top level.
    from datetime import timezone

    db = get_sync_db()
    try:
        # Admin-configurable provider/model overrides stored in app settings.
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider_name = ((prov_row.value if prov_row else None) or settings.LLM_PROVIDER).lower()
        if provider_name not in ("openai", "anthropic"):
            return {"status": "unsupported", "provider": provider_name}
        # Check for already-active batch
        active_row = db.get(AppSetting, _BATCH_SETTING_KEY)
        if active_row:
            try:
                active = json.loads(active_row.value)
                if active.get("status") == "processing":
                    return {"status": "already_active", "batch_id": active.get("batch_id")}
            except Exception:
                # Corrupt state blob — fall through and submit a fresh batch.
                pass
        # Find docs with text but no brief
        rows = db.execute(text("""
            SELECT bd.id AS doc_id, bd.bill_id, bd.govinfo_url
            FROM bill_documents bd
            LEFT JOIN bill_briefs bb ON bb.document_id = bd.id
            WHERE bd.raw_text IS NOT NULL AND bb.id IS NULL
            LIMIT 1000
        """)).fetchall()
        if not rows:
            return {"status": "nothing_to_process"}
        doc_ids = [r.doc_id for r in rows]
        if provider_name == "openai":
            model = (model_row.value if model_row else None) or settings.OPENAI_MODEL
            batch_id = _submit_openai_batch(db, rows, model)
        else:
            model = (model_row.value if model_row else None) or settings.ANTHROPIC_MODEL
            batch_id = _submit_anthropic_batch(db, rows, model)
        state = {
            "batch_id": batch_id,
            "provider": provider_name,
            "model": model,
            "doc_ids": doc_ids,
            "doc_count": len(doc_ids),
            # Fix: datetime.utcnow() is naive and deprecated since Python 3.12;
            # use an explicit timezone-aware UTC timestamp instead.
            "submitted_at": datetime.now(timezone.utc).isoformat(),
            "status": "processing",
        }
        _save_batch_state(db, state)
        logger.info(f"Submitted {len(doc_ids)}-doc batch to {provider_name}: {batch_id}")
        return {"status": "submitted", "batch_id": batch_id, "doc_count": len(doc_ids)}
    finally:
        db.close()
def _submit_openai_batch(db, rows, model: str) -> str:
    """Build a JSONL request file and submit it to the OpenAI Batch API.

    Rows whose request construction fails (missing doc/bill/text) are skipped
    with a warning. Returns the provider batch id.

    Raises:
        ValueError: if no row yields a valid request — guards against
            uploading an empty batch file (the original would do so).
    """
    from openai import OpenAI
    client = OpenAI(api_key=settings.OPENAI_API_KEY)
    lines = []
    for row in rows:
        try:
            custom_id, system_prompt, prompt = _build_request_data(db, row.doc_id, row.bill_id)
        except Exception as exc:
            logger.warning(f"Skipping doc {row.doc_id}: {exc}")
            continue
        lines.append(json.dumps({
            "custom_id": custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt},
                ],
                # Force JSON output; low temperature for reproducible briefs.
                "response_format": {"type": "json_object"},
                "temperature": 0.1,
                "max_tokens": MAX_TOKENS_DEFAULT,
            },
        }))
    if not lines:
        # Every candidate row was skipped — do not upload an empty JSONL file.
        raise ValueError("No valid batch requests could be built")
    jsonl_bytes = "\n".join(lines).encode()
    file_obj = client.files.create(
        file=("batch.jsonl", io.BytesIO(jsonl_bytes), "application/jsonl"),
        purpose="batch",
    )
    batch = client.batches.create(
        input_file_id=file_obj.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id
def _submit_anthropic_batch(db, rows, model: str) -> str:
    """Submit requests to the Anthropic Message Batches API.

    Rows whose request construction fails are skipped with a warning.
    Returns the provider batch id.

    Raises:
        ValueError: if no row yields a valid request — avoids submitting an
            empty batch (the original would call the API with requests=[]).
    """
    import anthropic
    client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
    requests = []
    for row in rows:
        try:
            custom_id, system_prompt, prompt = _build_request_data(db, row.doc_id, row.bill_id)
        except Exception as exc:
            logger.warning(f"Skipping doc {row.doc_id}: {exc}")
            continue
        requests.append({
            "custom_id": custom_id,
            "params": {
                "model": model,
                "max_tokens": 4096,
                # cache_control lets Anthropic cache the shared system prompt.
                "system": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}],
                "messages": [{"role": "user", "content": prompt}],
            },
        })
    if not requests:
        # Every candidate row was skipped — nothing to submit.
        raise ValueError("No valid batch requests could be built")
    batch = client.messages.batches.create(requests=requests)
    return batch.id
# ── Poll task ──────────────────────────────────────────────────────────────────
@celery_app.task(bind=True, name="app.workers.llm_batch_processor.poll_llm_batch_results")
def poll_llm_batch_results(self):
    """Check active batch status and import completed results (runs every 30 min via beat)."""
    db = get_sync_db()
    try:
        setting = db.get(AppSetting, _BATCH_SETTING_KEY)
        if setting is None:
            return {"status": "no_active_batch"}
        try:
            state = json.loads(setting.value)
        except Exception:
            # Unreadable state blob — drop it so a new batch can be submitted.
            _clear_batch_state(db)
            return {"status": "invalid_state"}
        batch_id = state["batch_id"]
        model = state["model"]
        # Dispatch on provider; unknown providers clear state and bail out.
        pollers = {"openai": _poll_openai, "anthropic": _poll_anthropic}
        poller = pollers.get(state["provider"])
        if poller is None:
            _clear_batch_state(db)
            return {"status": "unknown_provider"}
        return poller(db, state, batch_id, model)
    finally:
        db.close()
# ── Result processing helpers ──────────────────────────────────────────────────
def _save_brief(db, doc_id: int, bill_id: str, brief, brief_type: str, govinfo_url) -> bool:
    """Idempotency check + save. Returns True if saved, False if already exists."""
    duplicate = db.query(BillBrief).filter_by(document_id=doc_id).first()
    if duplicate:
        return False
    record = BillBrief(
        bill_id=bill_id,
        document_id=doc_id,
        brief_type=brief_type,
        summary=brief.summary,
        key_points=brief.key_points,
        risks=brief.risks,
        deadlines=brief.deadlines,
        topic_tags=brief.topic_tags,
        llm_provider=brief.llm_provider,
        llm_model=brief.llm_model,
        govinfo_url=govinfo_url,
    )
    db.add(record)
    db.commit()
    db.refresh(record)
    return True
def _emit_notifications_and_news(db, bill_id: str, brief, brief_type: str):
    """Fan out follower notifications for a new brief and queue a news fetch."""
    bill = db.get(Bill, bill_id)
    if bill is None:
        return
    # Local imports avoid circular dependencies between worker modules.
    from app.workers.notification_utils import (
        emit_bill_notification,
        emit_member_follow_notifications,
        emit_topic_follow_notifications,
    )
    from app.workers.news_fetcher import fetch_news_for_bill
    event = "new_amendment" if brief_type == "amendment" else "new_document"
    emit_bill_notification(db, bill, event, brief.summary)
    emit_member_follow_notifications(db, bill, event, brief.summary)
    emit_topic_follow_notifications(db, bill, event, brief.summary, brief.topic_tags or [])
    fetch_news_for_bill.delay(bill_id)
def _parse_custom_id(custom_id: str) -> tuple[int, str]:
"""Parse 'doc-{doc_id}-{brief_type}' → (doc_id, brief_type)."""
parts = custom_id.split("-")
return int(parts[1]), parts[2]
def _poll_openai(db, state: dict, batch_id: str, model: str) -> dict:
    """Poll an OpenAI batch; on completion, import each JSONL result line as a brief.

    Terminal failure states clear the saved batch state so a new batch can be
    submitted; an in-flight batch returns early without touching state.
    """
    from openai import OpenAI
    client = OpenAI(api_key=settings.OPENAI_API_KEY)
    batch = client.batches.retrieve(batch_id)
    logger.info(f"OpenAI batch {batch_id} status: {batch.status}")
    if batch.status in ("failed", "cancelled", "expired"):
        _clear_batch_state(db)
        return {"status": batch.status}
    if batch.status != "completed":
        return {"status": "processing", "batch_status": batch.status}
    # Completed: output file is JSONL, one result object per submitted request.
    content = client.files.content(batch.output_file_id).read().decode()
    saved = failed = 0
    for line in content.strip().split("\n"):
        if not line.strip():
            continue
        try:
            item = json.loads(line)
            custom_id = item["custom_id"]
            doc_id, brief_type = _parse_custom_id(custom_id)
            if item.get("error"):
                logger.warning(f"Batch result error for {custom_id}: {item['error']}")
                failed += 1
                continue
            # Chat Completions shape: choices[0].message.content carries the JSON brief.
            raw = item["response"]["body"]["choices"][0]["message"]["content"]
            brief = parse_brief_json(raw, "openai", model)
            doc = db.get(BillDocument, doc_id)
            if not doc:
                failed += 1
                continue
            # _save_brief is idempotent; only notify when a new row was written.
            if _save_brief(db, doc_id, doc.bill_id, brief, brief_type, doc.govinfo_url):
                _emit_notifications_and_news(db, doc.bill_id, brief, brief_type)
                saved += 1
        except Exception as exc:
            # One malformed line must not abort the whole import.
            logger.warning(f"Failed to process OpenAI batch result line: {exc}")
            failed += 1
    _clear_batch_state(db)
    logger.info(f"OpenAI batch {batch_id} complete: {saved} saved, {failed} failed")
    return {"status": "completed", "saved": saved, "failed": failed}
def _poll_anthropic(db, state: dict, batch_id: str, model: str) -> dict:
    """Poll an Anthropic message batch; import results once processing has ended.

    Anthropic exposes a single terminal processing_status ("ended") and
    per-request result types, so success/failure is decided per item below.
    """
    import anthropic
    client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
    batch = client.messages.batches.retrieve(batch_id)
    logger.info(f"Anthropic batch {batch_id} processing_status: {batch.processing_status}")
    if batch.processing_status != "ended":
        return {"status": "processing", "batch_status": batch.processing_status}
    saved = failed = 0
    # results() yields per-request outcomes for the batch.
    for result in client.messages.batches.results(batch_id):
        try:
            custom_id = result.custom_id
            doc_id, brief_type = _parse_custom_id(custom_id)
            if result.result.type != "succeeded":
                logger.warning(f"Batch result {custom_id} type: {result.result.type}")
                failed += 1
                continue
            # First content block carries the JSON brief text.
            raw = result.result.message.content[0].text
            brief = parse_brief_json(raw, "anthropic", model)
            doc = db.get(BillDocument, doc_id)
            if not doc:
                failed += 1
                continue
            # _save_brief is idempotent; only notify when a new row was written.
            if _save_brief(db, doc_id, doc.bill_id, brief, brief_type, doc.govinfo_url):
                _emit_notifications_and_news(db, doc.bill_id, brief, brief_type)
                saved += 1
        except Exception as exc:
            # Keep importing remaining results even if one item is malformed.
            logger.warning(f"Failed to process Anthropic batch result: {exc}")
            failed += 1
    _clear_batch_state(db)
    logger.info(f"Anthropic batch {batch_id} complete: {saved} saved, {failed} failed")
    return {"status": "completed", "saved": saved, "failed": failed}

View File

@@ -0,0 +1,380 @@
"""
LLM processor — generates AI briefs for fetched bill documents.
Triggered by document_fetcher after successful text retrieval.
"""
import logging
import time
from sqlalchemy import text
from app.config import settings
from app.database import get_sync_db
from app.models import Bill, BillBrief, BillDocument, Member
from app.services.llm_service import RateLimitError, get_llm_provider
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
@celery_app.task(
    bind=True,
    max_retries=8,
    rate_limit=f"{settings.LLM_RATE_LIMIT_RPM}/m",
    name="app.workers.llm_processor.process_document_with_llm",
)
def process_document_with_llm(self, document_id: int):
    """Generate an AI brief for a bill document. Full brief for first version, amendment brief for subsequent versions.

    Celery rate-limits this task to LLM_RATE_LIMIT_RPM calls per minute.
    Retries up to 8 times: rate-limit errors wait the provider-supplied
    retry_after; all other failures use a fixed 5-minute countdown.
    """
    db = get_sync_db()
    try:
        # Idempotency: skip if brief already exists for this document
        existing = db.query(BillBrief).filter_by(document_id=document_id).first()
        if existing:
            return {"status": "already_processed", "brief_id": existing.id}
        doc = db.get(BillDocument, document_id)
        if not doc or not doc.raw_text:
            logger.warning(f"Document {document_id} not found or has no text")
            return {"status": "no_document"}
        bill = db.get(Bill, doc.bill_id)
        if not bill:
            return {"status": "no_bill"}
        sponsor = db.get(Member, bill.sponsor_id) if bill.sponsor_id else None
        # Prompt context; "Unknown"/"None" placeholders keep the template well-formed.
        bill_metadata = {
            "title": bill.title or "Unknown Title",
            "sponsor_name": sponsor.name if sponsor else "Unknown",
            "party": sponsor.party if sponsor else "Unknown",
            "state": sponsor.state if sponsor else "Unknown",
            "chamber": bill.chamber or "Unknown",
            "introduced_date": str(bill.introduced_date) if bill.introduced_date else "Unknown",
            "latest_action_text": bill.latest_action_text or "None",
            "latest_action_date": str(bill.latest_action_date) if bill.latest_action_date else "Unknown",
        }
        # Check if a full brief already exists for this bill (from an earlier document version)
        previous_full_brief = (
            db.query(BillBrief)
            .filter_by(bill_id=doc.bill_id, brief_type="full")
            .order_by(BillBrief.created_at.desc())
            .first()
        )
        # Admin-configurable provider/model overrides stored in app settings;
        # get_llm_provider falls back to env defaults when rows are absent.
        from app.models.setting import AppSetting
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider = get_llm_provider(
            prov_row.value if prov_row else None,
            model_row.value if model_row else None,
        )
        if previous_full_brief and previous_full_brief.document_id:
            # New version of a bill we've already analyzed — generate amendment brief
            previous_doc = db.get(BillDocument, previous_full_brief.document_id)
            if previous_doc and previous_doc.raw_text:
                logger.info(f"Generating amendment brief for document {document_id} (bill {doc.bill_id})")
                brief = provider.generate_amendment_brief(doc.raw_text, previous_doc.raw_text, bill_metadata)
                brief_type = "amendment"
            else:
                logger.info(f"Previous document unavailable, generating full brief for document {document_id}")
                brief = provider.generate_brief(doc.raw_text, bill_metadata)
                brief_type = "full"
        else:
            logger.info(f"Generating full brief for document {document_id} (bill {doc.bill_id})")
            brief = provider.generate_brief(doc.raw_text, bill_metadata)
            brief_type = "full"
        db_brief = BillBrief(
            bill_id=doc.bill_id,
            document_id=document_id,
            brief_type=brief_type,
            summary=brief.summary,
            key_points=brief.key_points,
            risks=brief.risks,
            deadlines=brief.deadlines,
            topic_tags=brief.topic_tags,
            llm_provider=brief.llm_provider,
            llm_model=brief.llm_model,
            govinfo_url=doc.govinfo_url,
        )
        db.add(db_brief)
        db.commit()
        db.refresh(db_brief)
        logger.info(f"{brief_type.capitalize()} brief {db_brief.id} created for bill {doc.bill_id} using {brief.llm_provider}/{brief.llm_model}")
        # Emit notification events for bill followers, sponsor followers, and topic followers
        from app.workers.notification_utils import (
            emit_bill_notification,
            emit_member_follow_notifications,
            emit_topic_follow_notifications,
        )
        event_type = "new_amendment" if brief_type == "amendment" else "new_document"
        emit_bill_notification(db, bill, event_type, brief.summary)
        emit_member_follow_notifications(db, bill, event_type, brief.summary)
        emit_topic_follow_notifications(db, bill, event_type, brief.summary, brief.topic_tags or [])
        # Trigger news fetch now that we have topic tags
        from app.workers.news_fetcher import fetch_news_for_bill
        fetch_news_for_bill.delay(doc.bill_id)
        # Classify bill as substantive / commemorative / administrative
        from app.workers.bill_classifier import classify_bill_category
        classify_bill_category.delay(doc.bill_id, document_id)
        return {"status": "ok", "brief_id": db_brief.id, "brief_type": brief_type}
    except RateLimitError as exc:
        db.rollback()
        logger.warning(f"LLM rate limit hit ({exc.provider}); retrying in {exc.retry_after}s")
        raise self.retry(exc=exc, countdown=exc.retry_after)
    except Exception as exc:
        db.rollback()
        logger.error(f"LLM processing failed for document {document_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)  # 5 min backoff for other failures
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_citations")
def backfill_brief_citations(self):
    """
    Find briefs generated before citation support was added (key_points contains plain
    strings instead of {text, citation, quote} objects), delete them, and re-queue
    LLM processing against the already-stored document text.
    No Congress.gov or GovInfo calls — only LLM calls.
    """
    db = get_sync_db()
    try:
        # Pre-citation briefs are detectable by the first key_points element
        # being a JSON string rather than an object.
        uncited = db.execute(text("""
            SELECT id, document_id, bill_id
            FROM bill_briefs
            WHERE key_points IS NOT NULL
            AND jsonb_array_length(key_points) > 0
            AND jsonb_typeof(key_points->0) = 'string'
        """)).fetchall()
        total = len(uncited)
        queued = 0
        skipped = 0
        for row in uncited:
            if not row.document_id:
                skipped += 1
                continue
            # Confirm the document still has text before deleting the brief
            doc = db.get(BillDocument, row.document_id)
            if not doc or not doc.raw_text:
                skipped += 1
                continue
            # Delete-then-requeue: the fresh LLM pass will recreate the brief
            # with structured, cited points.
            brief = db.get(BillBrief, row.id)
            if brief:
                db.delete(brief)
                db.commit()
            process_document_with_llm.delay(row.document_id)
            queued += 1
            time.sleep(0.1)  # Avoid burst-queuing all LLM tasks at once
        logger.info(
            f"backfill_brief_citations: {total} uncited briefs found, "
            f"{queued} re-queued, {skipped} skipped (no document text)"
        )
        return {"total": total, "queued": queued, "skipped": skipped}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.llm_processor.backfill_brief_labels")
def backfill_brief_labels(self):
    """
    Add fact/inference labels to existing cited brief points without re-generating them.
    Sends one compact classification call per brief (all unlabeled points batched).
    Skips briefs already fully labeled and plain-string points (no quote to classify).
    """
    import json
    from sqlalchemy.orm.attributes import flag_modified
    from app.models.setting import AppSetting
    db = get_sync_db()
    try:
        # Step 1: Bulk auto-label quoteless unlabeled points as "inference" via raw SQL.
        # This runs before any ORM objects are loaded so the session identity map cannot
        # interfere with the commit (the classic "ORM flush overwrites raw UPDATE" trap).
        # Doubled braces survive .format(col=...) as literal JSON braces.
        _BULK_AUTO_LABEL = """
        UPDATE bill_briefs SET {col} = (
            SELECT jsonb_agg(
                CASE
                    WHEN jsonb_typeof(p) = 'object'
                    AND (p->>'label') IS NULL
                    AND (p->>'quote') IS NULL
                    THEN p || '{{"label":"inference"}}'
                    ELSE p
                END
            )
            FROM jsonb_array_elements({col}) AS p
        )
        WHERE {col} IS NOT NULL AND EXISTS (
            SELECT 1 FROM jsonb_array_elements({col}) AS p
            WHERE jsonb_typeof(p) = 'object'
            AND (p->>'label') IS NULL
            AND (p->>'quote') IS NULL
        )
        """
        auto_rows = 0
        for col in ("key_points", "risks"):
            result = db.execute(text(_BULK_AUTO_LABEL.format(col=col)))
            auto_rows += result.rowcount
        db.commit()
        logger.info(f"backfill_brief_labels: bulk auto-labeled {auto_rows} rows (quoteless → inference)")
        # Step 2: Find briefs that still have unlabeled points (must have quotes → need LLM).
        unlabeled_ids = db.execute(text("""
            SELECT id FROM bill_briefs
            WHERE (
                key_points IS NOT NULL AND EXISTS (
                    SELECT 1 FROM jsonb_array_elements(key_points) AS p
                    WHERE jsonb_typeof(p) = 'object' AND (p->>'label') IS NULL
                )
            ) OR (
                risks IS NOT NULL AND EXISTS (
                    SELECT 1 FROM jsonb_array_elements(risks) AS r
                    WHERE jsonb_typeof(r) = 'object' AND (r->>'label') IS NULL
                )
            )
        """)).fetchall()
        total = len(unlabeled_ids)
        updated = 0
        skipped = 0
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider = get_llm_provider(
            prov_row.value if prov_row else None,
            model_row.value if model_row else None,
        )
        for row in unlabeled_ids:
            brief = db.get(BillBrief, row.id)
            if not brief:
                skipped += 1
                continue
            # Only points with a quote can be LLM-classified as cited_fact vs inference
            to_classify: list[tuple[str, int, dict]] = []
            for field_name in ("key_points", "risks"):
                for i, p in enumerate(getattr(brief, field_name) or []):
                    if isinstance(p, dict) and p.get("label") is None and p.get("quote"):
                        to_classify.append((field_name, i, p))
            if not to_classify:
                skipped += 1
                continue
            # NOTE(review): assumes every unlabeled point dict has a "text" key;
            # a missing key would raise KeyError out of the task — confirm schema.
            lines = [
                f'{i + 1}. TEXT: "{p["text"]}" | QUOTE: "{p.get("quote", "")}"'
                for i, (_, __, p) in enumerate(to_classify)
            ]
            prompt = (
                "Classify each item as 'cited_fact' or 'inference'.\n"
                "cited_fact = the claim is explicitly and directly stated in the quoted text.\n"
                "inference = analytical interpretation, projection, or implication not literally stated.\n\n"
                "Return ONLY a JSON array of strings, one per item, in order. No explanation.\n\n"
                "Items:\n" + "\n".join(lines)
            )
            try:
                # Strip optional markdown code fences before JSON parsing.
                raw = provider.generate_text(prompt).strip()
                if raw.startswith("```"):
                    raw = raw.split("```")[1]
                    if raw.startswith("json"):
                        raw = raw[4:]
                labels = json.loads(raw.strip())
                if not isinstance(labels, list) or len(labels) != len(to_classify):
                    logger.warning(f"Brief {brief.id}: label count mismatch, skipping")
                    skipped += 1
                    continue
            except Exception as exc:
                logger.warning(f"Brief {brief.id}: classification failed: {exc}")
                skipped += 1
                time.sleep(0.5)
                continue
            fields_modified: set[str] = set()
            for (field_name, point_idx, _), label in zip(to_classify, labels):
                if label in ("cited_fact", "inference"):
                    getattr(brief, field_name)[point_idx]["label"] = label
                    fields_modified.add(field_name)
            # JSONB columns were mutated in place — mark them dirty so
            # SQLAlchemy actually persists the change on commit.
            for field_name in fields_modified:
                flag_modified(brief, field_name)
            db.commit()
            updated += 1
            time.sleep(0.2)
        logger.info(
            f"backfill_brief_labels: {total} briefs needing LLM, "
            f"{updated} updated, {skipped} skipped"
        )
        return {"auto_labeled_rows": auto_rows, "total_llm": total, "updated": updated, "skipped": skipped}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.llm_processor.resume_pending_analysis")
def resume_pending_analysis(self):
    """Re-queue stalled bill analysis in two passes.

    Pass 1 — documents with stored text but no brief (LLM task failed or
    timed out): re-queue LLM processing.
    Pass 2 — bills with no document at all: re-queue the document fetch,
    which chains into LLM processing when GovInfo text is available.
    """
    from app.workers.document_fetcher import fetch_bill_documents

    db = get_sync_db()
    try:
        # Pass 1: docs with raw_text but no brief
        docs_no_brief = db.execute(text("""
            SELECT bd.id
            FROM bill_documents bd
            LEFT JOIN bill_briefs bb ON bb.document_id = bd.id
            WHERE bb.id IS NULL AND bd.raw_text IS NOT NULL
        """)).fetchall()
        queued_llm = 0
        for doc_row in docs_no_brief:
            process_document_with_llm.delay(doc_row.id)
            queued_llm += 1
            time.sleep(0.1)  # stagger enqueues instead of bursting the queue
        # Pass 2: bills with no document at all
        bills_no_doc = db.execute(text("""
            SELECT b.bill_id
            FROM bills b
            LEFT JOIN bill_documents bd ON bd.bill_id = b.bill_id
            WHERE bd.id IS NULL
        """)).fetchall()
        queued_fetch = 0
        for bill_row in bills_no_doc:
            fetch_bill_documents.delay(bill_row.bill_id)
            queued_fetch += 1
            time.sleep(0.1)
        logger.info(
            f"resume_pending_analysis: {queued_llm} LLM tasks queued, "
            f"{queued_fetch} document fetch tasks queued"
        )
        return {"queued_llm": queued_llm, "queued_fetch": queued_fetch}
    finally:
        db.close()

View File

@@ -0,0 +1,252 @@
"""
Member interest worker — tracks public interest in members of Congress.
Fetches news articles and calculates trend scores for members using the
same composite scoring model as bills (NewsAPI + Google News RSS + pytrends).
Runs on a schedule and can also be triggered per-member.
"""
import logging
from datetime import date, datetime, timedelta, timezone
from app.database import get_sync_db
from app.models import Member, MemberNewsArticle, MemberTrendScore
from app.services import news_service, trends_service
from app.workers.celery_app import celery_app
from app.workers.trend_scorer import calculate_composite_score
logger = logging.getLogger(__name__)
def _parse_pub_at(raw: str | None) -> datetime | None:
if not raw:
return None
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
except Exception:
return None
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.sync_member_interest")
def sync_member_interest(self, bioguide_id: str):
    """
    Fetch news and score a member in a single API pass.
    Called on first profile view — avoids the 2x NewsAPI + GNews calls that
    result from queuing fetch_member_news and calculate_member_trend_score separately.
    """
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        # Both names are required to build a meaningful news query.
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}
        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )
        # Single fetch — results reused for both article storage and scoring
        newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
        gnews_articles = news_service.fetch_gnews_articles(query, days=30)
        all_articles = newsapi_articles + gnews_articles
        saved = 0
        for article in all_articles:
            url = article.get("url")
            if not url:
                continue
            # Dedupe on (member_id, url) so re-runs don't store duplicates.
            existing = (
                db.query(MemberNewsArticle)
                .filter_by(member_id=bioguide_id, url=url)
                .first()
            )
            if existing:
                continue
            db.add(MemberNewsArticle(
                member_id=bioguide_id,
                source=article.get("source", "")[:200],  # column-width guard
                headline=article.get("headline", ""),
                url=url,
                published_at=_parse_pub_at(article.get("published_at")),
                relevance_score=1.0,
            ))
            saved += 1
        # Score using counts already in hand — no second API round-trip
        today = date.today()
        if not db.query(MemberTrendScore).filter_by(member_id=bioguide_id, score_date=today).first():
            keywords = trends_service.keywords_for_member(member.first_name, member.last_name)
            gtrends_score = trends_service.get_trends_score(keywords)
            composite = calculate_composite_score(
                len(newsapi_articles), len(gnews_articles), gtrends_score
            )
            db.add(MemberTrendScore(
                member_id=bioguide_id,
                score_date=today,
                newsapi_count=len(newsapi_articles),
                gnews_count=len(gnews_articles),
                gtrends_score=gtrends_score,
                composite_score=composite,
            ))
        # One commit covers both the article rows and the optional score row.
        db.commit()
        logger.info(f"Synced member interest for {bioguide_id}: {saved} articles saved")
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"Member interest sync failed for {bioguide_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=2, name="app.workers.member_interest.fetch_member_news")
def fetch_member_news(self, bioguide_id: str):
    """Fetch and store recent news articles for a specific member."""
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}
        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )
        # Merge both news sources into one candidate list.
        candidates = (
            news_service.fetch_newsapi_articles(query, days=30)
            + news_service.fetch_gnews_articles(query, days=30)
        )
        saved = 0
        for item in candidates:
            link = item.get("url")
            if not link:
                continue
            # Skip articles already stored for this member (dedupe on URL).
            duplicate = (
                db.query(MemberNewsArticle)
                .filter_by(member_id=bioguide_id, url=link)
                .first()
            )
            if duplicate is not None:
                continue
            record = MemberNewsArticle(
                member_id=bioguide_id,
                source=item.get("source", "")[:200],
                headline=item.get("headline", ""),
                url=link,
                published_at=_parse_pub_at(item.get("published_at")),
                relevance_score=1.0,
            )
            db.add(record)
            saved += 1
        db.commit()
        logger.info(f"Saved {saved} news articles for member {bioguide_id}")
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"Member news fetch failed for {bioguide_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.member_interest.calculate_member_trend_score")
def calculate_member_trend_score(self, bioguide_id: str):
    """Calculate and store today's public interest score for a member."""
    db = get_sync_db()
    try:
        member = db.get(Member, bioguide_id)
        if not member or not member.first_name or not member.last_name:
            return {"status": "skipped"}
        today = date.today()
        # One score row per member per day — bail out if already computed.
        already_scored = (
            db.query(MemberTrendScore)
            .filter_by(member_id=bioguide_id, score_date=today)
            .first()
        )
        if already_scored is not None:
            return {"status": "already_scored"}
        query = news_service.build_member_query(
            first_name=member.first_name,
            last_name=member.last_name,
            chamber=member.chamber,
        )
        keywords = trends_service.keywords_for_member(member.first_name, member.last_name)
        # Gather the three raw signals feeding the composite score.
        newsapi_count = len(news_service.fetch_newsapi_articles(query, days=30))
        gnews_count = news_service.fetch_gnews_count(query, days=30)
        gtrends_score = trends_service.get_trends_score(keywords)
        composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score)
        db.add(MemberTrendScore(
            member_id=bioguide_id,
            score_date=today,
            newsapi_count=newsapi_count,
            gnews_count=gnews_count,
            gtrends_score=gtrends_score,
            composite_score=composite,
        ))
        db.commit()
        logger.info(f"Scored member {bioguide_id}: composite={composite:.1f}")
        return {"status": "ok", "composite": composite}
    except Exception as exc:
        db.rollback()
        logger.error(f"Member trend scoring failed for {bioguide_id}: {exc}")
        raise
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.member_interest.fetch_news_for_active_members")
def fetch_news_for_active_members(self):
    """
    Scheduled task: fetch news for members who have been viewed or followed.
    Prioritises members with detail_fetched set (profile has been viewed).
    """
    db = get_sync_db()
    try:
        # Only members whose profile has been loaded (and who have a name).
        active = (
            db.query(Member)
            .filter(Member.detail_fetched.isnot(None), Member.first_name.isnot(None))
            .all()
        )
        for m in active:
            fetch_member_news.delay(m.bioguide_id)
        logger.info(f"Queued news fetch for {len(active)} members")
        return {"queued": len(active)}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.member_interest.calculate_all_member_trend_scores")
def calculate_all_member_trend_scores(self):
    """
    Scheduled nightly task: score all members that have been viewed.
    Members are scored only after their profile has been loaded at least once.
    """
    db = get_sync_db()
    try:
        # Same eligibility rule as the news fetch: viewed profile + known name.
        eligible = (
            db.query(Member)
            .filter(Member.detail_fetched.isnot(None), Member.first_name.isnot(None))
            .all()
        )
        for m in eligible:
            calculate_member_trend_score.delay(m.bioguide_id)
        logger.info(f"Queued trend scoring for {len(eligible)} members")
        return {"queued": len(eligible)}
    finally:
        db.close()

View File

@@ -0,0 +1,159 @@
"""
News fetcher — correlates bills with news articles.
Triggered after LLM brief creation and on a 6-hour schedule for active bills.
"""
import logging
from datetime import date, datetime, timedelta, timezone
from sqlalchemy import and_
from app.database import get_sync_db
from app.models import Bill, BillBrief, NewsArticle
from app.services import news_service
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
def _save_articles(db, bill_id: str, articles: list[dict]) -> int:
    """Persist article dicts for *bill_id*, skipping entries without a URL or
    whose URL is already stored for this bill. Returns the number added."""
    added = 0
    for item in articles:
        link = item.get("url")
        if not link:
            continue
        already_stored = db.query(NewsArticle).filter_by(bill_id=bill_id, url=link).first()
        if already_stored is not None:
            continue
        published = None
        raw_ts = item.get("published_at")
        if raw_ts:
            try:
                published = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
            except Exception:
                pass  # unparseable timestamp — store the article without a date
        db.add(NewsArticle(
            bill_id=bill_id,
            source=item.get("source", "")[:200],
            headline=item.get("headline", ""),
            url=link,
            published_at=published,
            relevance_score=1.0,
        ))
        added += 1
    return added
@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill")
def fetch_news_for_bill(self, bill_id: str):
    """Fetch and persist news articles for one bill.

    Retries (max 2, 5-minute countdown) on any failure; returns a small
    status dict for Celery result inspection.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if bill is None:
            return {"status": "not_found"}
        search_query = news_service.build_news_query(
            bill_title=bill.title,
            short_title=bill.short_title,
            sponsor_name=None,
            bill_type=bill.bill_type,
            bill_number=bill.bill_number,
        )
        combined = (
            news_service.fetch_newsapi_articles(search_query)
            + news_service.fetch_gnews_articles(search_query)
        )
        saved = _save_articles(db, bill_id, combined)
        db.commit()
        logger.info(f"Saved {saved} news articles for bill {bill_id}")
        return {"status": "ok", "saved": saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"News fetch failed for {bill_id}: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=2, name="app.workers.news_fetcher.fetch_news_for_bill_batch")
def fetch_news_for_bill_batch(self, bill_ids: list):
    """
    Fetch news for a batch of bills in ONE NewsAPI call using OR query syntax
    (up to NEWSAPI_BATCH_SIZE bills per call). Google News is fetched per-bill
    but served from the 2-hour Redis cache so the RSS is only hit once per query.

    Unknown bill IDs are silently dropped. A single commit covers the whole
    batch; any failure rolls back and retries (max 2, 5-minute countdown).
    """
    db = get_sync_db()
    try:
        # Resolve IDs to Bill rows, dropping any that no longer exist.
        bills = [db.get(Bill, bid) for bid in bill_ids]
        bills = [b for b in bills if b]
        if not bills:
            return {"status": "no_bills"}
        # Build (bill_id, query) pairs for the batch NewsAPI call
        bill_queries = [
            (
                bill.bill_id,
                news_service.build_news_query(
                    bill_title=bill.title,
                    short_title=bill.short_title,
                    sponsor_name=None,
                    bill_type=bill.bill_type,
                    bill_number=bill.bill_number,
                ),
            )
            for bill in bills
        ]
        # One NewsAPI call for the whole batch
        newsapi_batch = news_service.fetch_newsapi_articles_batch(bill_queries)
        total_saved = 0
        for bill in bills:
            # Recover this bill's query from the pairs built above.
            query = next(q for bid, q in bill_queries if bid == bill.bill_id)
            newsapi_articles = newsapi_batch.get(bill.bill_id, [])
            # Google News is cached — fine to call per-bill (cache hit after first)
            gnews_articles = news_service.fetch_gnews_articles(query)
            total_saved += _save_articles(db, bill.bill_id, newsapi_articles + gnews_articles)
        db.commit()
        logger.info(f"Batch saved {total_saved} articles for {len(bills)} bills")
        return {"status": "ok", "bills": len(bills), "saved": total_saved}
    except Exception as exc:
        db.rollback()
        logger.error(f"Batch news fetch failed: {exc}")
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.news_fetcher.fetch_news_for_active_bills")
def fetch_news_for_active_bills(self):
    """
    Scheduled task: queue batched news fetches for bills whose latest action
    falls within the last 7 days (capped at 80 bills, most recent first).
    Batching multiplies effective NewsAPI quota.
    """
    db = get_sync_db()
    try:
        since = date.today() - timedelta(days=7)
        recent = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= since)
            .order_by(Bill.latest_action_date.desc())
            .limit(80)
            .all()
        )
        ids = [b.bill_id for b in recent]
        size = news_service.NEWSAPI_BATCH_SIZE
        chunks = [ids[start:start + size] for start in range(0, len(ids), size)]
        for chunk in chunks:
            fetch_news_for_bill_batch.delay(chunk)
        logger.info(
            f"Queued {len(chunks)} news batches for {len(recent)} active bills "
            f"({size} bills/batch)"
        )
        return {"queued_batches": len(chunks), "total_bills": len(recent)}
    finally:
        db.close()

View File

@@ -0,0 +1,572 @@
"""
Notification dispatcher — sends pending notification events via ntfy.
RSS is pull-based so no dispatch is needed for it; events are simply
marked dispatched once ntfy is sent (or immediately if the user has no
ntfy configured but has an RSS token, so the feed can clean up old items).
Runs every 5 minutes on Celery Beat.
"""
import base64
import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import requests
from app.core.crypto import decrypt_secret
from app.database import get_sync_db
from app.models.follow import Follow
from app.models.notification import NotificationEvent
from app.models.user import User
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
# Seconds allowed for each ntfy HTTP POST before giving up.
NTFY_TIMEOUT = 10
# Notification title per event type; call sites fall back to "Bill Update"
# for unknown types.
_EVENT_TITLES = {
    "new_document": "New Bill Text",
    "new_amendment": "Amendment Filed",
    "bill_updated": "Bill Updated",
    "weekly_digest": "Weekly Digest",
}
# ntfy emoji tag name per event type; call sites fall back to "bell".
_EVENT_TAGS = {
    "new_document": "page_facing_up",
    "new_amendment": "memo",
    "bill_updated": "rotating_light",
}
# Milestone events are more urgent than LLM brief events
_EVENT_PRIORITY = {
    "bill_updated": "high",
    "new_document": "default",
    "new_amendment": "default",
}
# Default on/off state for each alert-filter key when the user HAS configured
# filters but a specific key is absent (conservative: everything off).
_FILTER_DEFAULTS = {
    "new_document": False, "new_amendment": False, "vote": False,
    "presidential": False, "committee_report": False, "calendar": False,
    "procedural": False, "referral": False,
}
def _should_dispatch(event, prefs: dict, follow_mode: str = "neutral") -> bool:
payload = event.payload or {}
source = payload.get("source", "bill_follow")
# Map event type directly for document events
if event.event_type == "new_document":
key = "new_document"
elif event.event_type == "new_amendment":
key = "new_amendment"
else:
# Use action_category if present (new events), fall back from milestone_tier (old events)
key = payload.get("action_category")
if not key:
key = "referral" if payload.get("milestone_tier") == "referral" else "vote"
all_filters = prefs.get("alert_filters")
if all_filters is None:
return True # user hasn't configured filters yet — send everything
if source in ("member_follow", "topic_follow"):
source_filters = all_filters.get(source)
if source_filters is None:
return True # section not configured — send everything
if not source_filters.get("enabled", True):
return False # master toggle off
# Per-entity mute checks
if source == "member_follow":
muted_ids = source_filters.get("muted_ids") or []
if payload.get("matched_member_id") in muted_ids:
return False
if source == "topic_follow":
muted_tags = source_filters.get("muted_tags") or []
if payload.get("matched_topic") in muted_tags:
return False
return bool(source_filters.get(key, _FILTER_DEFAULTS.get(key, True)))
# Bill follow — use follow mode filters (existing behaviour)
mode_filters = all_filters.get(follow_mode) or {}
return bool(mode_filters.get(key, _FILTER_DEFAULTS.get(key, True)))
def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
"""Return True if the current local time falls within the user's quiet window.
Quiet hours are stored as local-time hour integers. If the user has a stored
IANA timezone name we convert ``now`` (UTC) to that zone before comparing.
Falls back to UTC if the timezone is absent or unrecognised.
"""
start = prefs.get("quiet_hours_start")
end = prefs.get("quiet_hours_end")
if start is None or end is None:
return False
tz_name = prefs.get("timezone")
if tz_name:
try:
from zoneinfo import ZoneInfo
h = now.astimezone(ZoneInfo(tz_name)).hour
except Exception:
h = now.hour # unrecognised timezone — degrade gracefully to UTC
else:
h = now.hour
if start <= end:
return start <= h < end
# Wraps midnight (e.g. 22 → 8)
return h >= start or h < end
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.dispatch_notifications")
def dispatch_notifications(self):
    """Fan out pending notification events to ntfy/email and mark dispatched.

    Processes up to 200 undispatched events per run, oldest first. Each event
    is committed individually so a crash mid-run cannot re-send notifications
    already delivered. Events for users currently in quiet hours or with
    digest mode enabled are left undispatched (held) for a later run or for
    the digest task. Returns counters: sent / failed / held / total.
    """
    db = get_sync_db()
    try:
        pending = (
            db.query(NotificationEvent)
            .filter(NotificationEvent.dispatched_at.is_(None))
            .order_by(NotificationEvent.created_at)
            .limit(200)
            .all()
        )
        sent = 0
        failed = 0
        held = 0
        now = datetime.now(timezone.utc)
        for event in pending:
            user = db.get(User, event.user_id)
            if not user:
                # Orphaned event (user deleted) — mark handled and move on.
                event.dispatched_at = now
                db.commit()
                continue
            # Look up follow mode for this (user, bill) pair
            follow = db.query(Follow).filter_by(
                user_id=event.user_id, follow_type="bill", follow_value=event.bill_id
            ).first()
            follow_mode = follow.follow_mode if follow else "neutral"
            prefs = user.notification_prefs or {}
            if not _should_dispatch(event, prefs, follow_mode):
                # Filtered out by the user's alert preferences — mark handled.
                event.dispatched_at = now
                db.commit()
                continue
            ntfy_url = prefs.get("ntfy_topic_url", "").strip()
            ntfy_auth_method = prefs.get("ntfy_auth_method", "none")
            ntfy_token = prefs.get("ntfy_token", "").strip()
            ntfy_username = prefs.get("ntfy_username", "").strip()
            # Password is stored encrypted; decrypt before building basic auth.
            ntfy_password = decrypt_secret(prefs.get("ntfy_password", "").strip())
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            # NOTE(review): rss_enabled is read but never used below — presumably
            # because RSS is pull-based (see module docstring); confirm.
            rss_enabled = prefs.get("rss_enabled", False)
            digest_enabled = prefs.get("digest_enabled", False)
            ntfy_configured = ntfy_enabled and bool(ntfy_url)
            # Hold events when ntfy is configured but delivery should be deferred
            in_quiet = _in_quiet_hours(prefs, now) if ntfy_configured else False
            hold = ntfy_configured and (in_quiet or digest_enabled)
            if hold:
                held += 1
                continue  # Leave undispatched — digest task or next run after quiet hours
            if ntfy_configured:
                try:
                    _send_ntfy(
                        event, ntfy_url, ntfy_auth_method, ntfy_token,
                        ntfy_username, ntfy_password, follow_mode=follow_mode,
                    )
                    sent += 1
                except Exception as e:
                    # Failure still marks the event dispatched below — no retry.
                    logger.warning(f"ntfy dispatch failed for event {event.id}: {e}")
                    failed += 1
            email_enabled = prefs.get("email_enabled", False)
            email_address = prefs.get("email_address", "").strip()
            if email_enabled and email_address:
                try:
                    _send_email(event, email_address, unsubscribe_token=user.email_unsubscribe_token)
                    sent += 1
                except Exception as e:
                    logger.warning(f"email dispatch failed for event {event.id}: {e}")
                    failed += 1
            # Mark dispatched: channels were attempted, or user has no channels configured (RSS-only)
            event.dispatched_at = now
            db.commit()
        logger.info(
            f"dispatch_notifications: {sent} sent, {failed} failed, "
            f"{held} held (quiet hours/digest), {len(pending)} total pending"
        )
        return {"sent": sent, "failed": failed, "held": held, "total": len(pending)}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_notification_digest")
def send_notification_digest(self):
    """
    Send a bundled ntfy digest for users with digest mode enabled.
    Runs daily; weekly-frequency users only receive on Mondays.

    Only undispatched events created within the lookback window (24h daily /
    168h weekly) are bundled; after a successful send all bundled events are
    marked dispatched in a single commit per user.
    """
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)
        users = db.query(User).all()
        # Digest requires digest mode on AND a configured ntfy topic.
        digest_users = [
            u for u in users
            if (u.notification_prefs or {}).get("digest_enabled", False)
            and (u.notification_prefs or {}).get("ntfy_enabled", False)
            and (u.notification_prefs or {}).get("ntfy_topic_url", "").strip()
        ]
        sent = 0
        for user in digest_users:
            prefs = user.notification_prefs or {}
            frequency = prefs.get("digest_frequency", "daily")
            # Weekly digests only fire on Mondays (weekday 0)
            if frequency == "weekly" and now.weekday() != 0:
                continue
            lookback_hours = 168 if frequency == "weekly" else 24
            cutoff = now - timedelta(hours=lookback_hours)
            # NOTE(review): events older than the lookback window stay
            # undispatched and are never bundled here — confirm intended.
            events = (
                db.query(NotificationEvent)
                .filter_by(user_id=user.id)
                .filter(
                    NotificationEvent.dispatched_at.is_(None),
                    NotificationEvent.created_at > cutoff,
                )
                .order_by(NotificationEvent.created_at.desc())
                .all()
            )
            if not events:
                continue
            try:
                ntfy_url = prefs.get("ntfy_topic_url", "").strip()
                _send_digest_ntfy(events, ntfy_url, prefs)
                # Mark everything bundled as delivered in one commit.
                for event in events:
                    event.dispatched_at = now
                db.commit()
                sent += 1
            except Exception as e:
                # Leave events undispatched so the next digest run retries them.
                logger.warning(f"Digest send failed for user {user.id}: {e}")
        logger.info(f"send_notification_digest: digests sent to {sent} users")
        return {"sent": sent}
    finally:
        db.close()
def _build_reason(payload: dict) -> str | None:
source = payload.get("source", "bill_follow")
mode_labels = {"pocket_veto": "Pocket Veto", "pocket_boost": "Pocket Boost", "neutral": "Following"}
if source == "bill_follow":
mode = payload.get("follow_mode", "neutral")
return f"\U0001f4cc {mode_labels.get(mode, 'Following')} this bill"
if source == "member_follow":
name = payload.get("matched_member_name")
return f"\U0001f464 You follow {name}" if name else "\U0001f464 Member you follow"
if source == "topic_follow":
topic = payload.get("matched_topic")
return f"\U0001f3f7 You follow \"{topic}\"" if topic else "\U0001f3f7 Topic you follow"
return None
def _send_email(
    event: NotificationEvent,
    email_address: str,
    unsubscribe_token: str | None = None,
) -> None:
    """Send a plain-text email notification via SMTP.

    Silently no-ops when SMTP_HOST is unset or no address is given. Uses
    implicit TLS when SMTP_PORT is 465, otherwise plain SMTP with optional
    STARTTLS. Adds one-click List-Unsubscribe headers when a token is
    supplied. Raises on SMTP errors so the caller can count the failure.
    """
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from app.config import settings as app_settings
    if not app_settings.SMTP_HOST or not email_address:
        return
    payload = event.payload or {}
    bill_label = payload.get("bill_label", event.bill_id.upper())
    bill_title = payload.get("bill_title", "")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")
    base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
    # NOTE(review): no separator between event label and bill label here,
    # unlike _send_ntfy's "label: bill" title — possibly a lost character;
    # confirm intended subject format.
    subject = f"PocketVeto: {event_label}{bill_label}"
    lines = [f"{event_label}: {bill_label}"]
    if bill_title:
        lines.append(bill_title)
    if payload.get("brief_summary"):
        lines.append("")
        lines.append(payload["brief_summary"][:500])
    reason = _build_reason(payload)
    if reason:
        lines.append("")
        lines.append(reason)
    if payload.get("bill_url"):
        lines.append("")
        lines.append(f"View bill: {payload['bill_url']}")
    unsubscribe_url = f"{base_url}/api/notifications/unsubscribe/{unsubscribe_token}" if unsubscribe_token else None
    if unsubscribe_url:
        lines.append("")
        lines.append(f"Unsubscribe from email alerts: {unsubscribe_url}")
    body = "\n".join(lines)
    from_addr = app_settings.SMTP_FROM or app_settings.SMTP_USER
    msg = MIMEMultipart()
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = email_address
    if unsubscribe_url:
        # RFC 8058 one-click unsubscribe support for mail clients.
        msg["List-Unsubscribe"] = f"<{unsubscribe_url}>"
        msg["List-Unsubscribe-Post"] = "List-Unsubscribe=One-Click"
    msg.attach(MIMEText(body, "plain", "utf-8"))
    # Port 465 means implicit TLS (SMTPS); anything else is plain SMTP,
    # optionally upgraded with STARTTLS below.
    use_ssl = app_settings.SMTP_PORT == 465
    if use_ssl:
        smtp_ctx = smtplib.SMTP_SSL(app_settings.SMTP_HOST, app_settings.SMTP_PORT, timeout=10)
    else:
        smtp_ctx = smtplib.SMTP(app_settings.SMTP_HOST, app_settings.SMTP_PORT, timeout=10)
    with smtp_ctx as s:
        if not use_ssl and app_settings.SMTP_STARTTLS:
            s.starttls()
        if app_settings.SMTP_USER:
            s.login(app_settings.SMTP_USER, app_settings.SMTP_PASSWORD)
        s.sendmail(from_addr, [email_address], msg.as_string())
def _send_ntfy(
    event: NotificationEvent,
    topic_url: str,
    auth_method: str = "none",
    token: str = "",
    username: str = "",
    password: str = "",
    follow_mode: str = "neutral",
) -> None:
    """POST one notification event to the user's ntfy topic.

    Raises on HTTP errors so the caller can count the failure.
    """
    payload = event.payload or {}
    bill_label = payload.get("bill_label", event.bill_id.upper())
    bill_title = payload.get("bill_title", "")
    event_label = _EVENT_TITLES.get(event.event_type, "Bill Update")

    # Message body: title line, optional brief excerpt, optional follow reason.
    body_lines = []
    if bill_title:
        body_lines.append(bill_title)
    if payload.get("brief_summary"):
        body_lines.extend(["", payload["brief_summary"][:300]])
    reason = _build_reason(payload)
    if reason:
        body_lines.extend(["", reason])
    message = "\n".join(body_lines) or bill_label

    headers = {
        "Title": f"{event_label}: {bill_label}",
        "Priority": _EVENT_PRIORITY.get(event.event_type, "default"),
        "Tags": _EVENT_TAGS.get(event.event_type, "bell"),
    }
    if payload.get("bill_url"):
        headers["Click"] = payload["bill_url"]
    # Boost followers get action buttons for contacting their representative.
    if follow_mode == "pocket_boost":
        headers["Actions"] = (
            f"view, View Bill, {payload.get('bill_url', '')}; "
            "view, Find Your Rep, https://www.house.gov/representatives/find-your-representative"
        )
    if auth_method == "token" and token:
        headers["Authorization"] = f"Bearer {token}"
    elif auth_method == "basic" and username:
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"

    resp = requests.post(topic_url, data=message.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT)
    resp.raise_for_status()
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_weekly_digest")
def send_weekly_digest(self):
    """
    Proactive week-in-review summary for followed bills.
    Runs every Monday at 8:30 AM UTC. Queries bills followed by each user
    for any activity in the past 7 days and dispatches a low-noise summary
    via ntfy and/or creates a NotificationEvent for the RSS feed.
    Unlike send_notification_digest (which bundles queued events), this task
    generates a fresh summary independent of the notification event queue.

    Returns counters for ntfy messages sent and RSS events created.
    """
    from app.config import settings as app_settings
    from app.models.bill import Bill
    db = get_sync_db()
    try:
        now = datetime.now(timezone.utc)
        cutoff = now - timedelta(days=7)
        base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
        users = db.query(User).all()
        ntfy_sent = 0
        rss_created = 0
        for user in users:
            prefs = user.notification_prefs or {}
            ntfy_enabled = prefs.get("ntfy_enabled", False)
            ntfy_url = prefs.get("ntfy_topic_url", "").strip()
            rss_enabled = prefs.get("rss_enabled", False)
            ntfy_configured = ntfy_enabled and bool(ntfy_url)
            # Skip users with no delivery channel at all.
            if not ntfy_configured and not rss_enabled:
                continue
            bill_follows = db.query(Follow).filter_by(
                user_id=user.id, follow_type="bill"
            ).all()
            if not bill_follows:
                continue
            bill_ids = [f.follow_value for f in bill_follows]
            # Followed bills with any activity in the last 7 days, newest first.
            active_bills = (
                db.query(Bill)
                .filter(
                    Bill.bill_id.in_(bill_ids),
                    Bill.updated_at >= cutoff,
                )
                .order_by(Bill.updated_at.desc())
                .limit(20)
                .all()
            )
            if not active_bills:
                continue
            count = len(active_bills)
            # The most recently updated bill anchors the event's bill_id and URL.
            anchor = active_bills[0]
            summary_lines = []
            for bill in active_bills[:10]:
                lbl = _format_bill_label(bill)
                action = (bill.latest_action_text or "")[:80]
                summary_lines.append(f"{lbl}: {action}" if action else f"{lbl}")
            if count > 10:
                summary_lines.append(f" …and {count - 10} more")
            summary = "\n".join(summary_lines)
            # Mark dispatched_at immediately so dispatch_notifications skips this event;
            # it still appears in the RSS feed since that endpoint reads all events.
            event = NotificationEvent(
                user_id=user.id,
                bill_id=anchor.bill_id,
                event_type="weekly_digest",
                dispatched_at=now,
                payload={
                    "bill_label": "Weekly Digest",
                    "bill_title": f"{count} followed bill{'s' if count != 1 else ''} had activity this week",
                    "brief_summary": summary,
                    "bill_count": count,
                    "bill_url": f"{base_url}/bills/{anchor.bill_id}",
                },
            )
            db.add(event)
            rss_created += 1
            if ntfy_configured:
                try:
                    _send_weekly_digest_ntfy(count, summary, ntfy_url, prefs)
                    ntfy_sent += 1
                except Exception as e:
                    logger.warning(f"Weekly digest ntfy failed for user {user.id}: {e}")
        # One commit persists all users' digest events.
        db.commit()
        logger.info(f"send_weekly_digest: {ntfy_sent} ntfy sent, {rss_created} events created")
        return {"ntfy_sent": ntfy_sent, "rss_created": rss_created}
    finally:
        db.close()
def _format_bill_label(bill) -> str:
_TYPE_MAP = {
"hr": "H.R.", "s": "S.", "hjres": "H.J.Res.", "sjres": "S.J.Res.",
"hconres": "H.Con.Res.", "sconres": "S.Con.Res.", "hres": "H.Res.", "sres": "S.Res.",
}
prefix = _TYPE_MAP.get(bill.bill_type.lower(), bill.bill_type.upper())
return f"{prefix} {bill.bill_number}"
def _send_weekly_digest_ntfy(count: int, summary: str, ntfy_url: str, prefs: dict) -> None:
    """Send the proactive weekly-summary message to the user's ntfy topic.

    Args:
        count: number of followed bills with activity this week.
        summary: pre-built multi-line body text.
        ntfy_url: the user's ntfy topic URL.
        prefs: the user's notification_prefs dict (auth settings read here).

    Raises on HTTP errors so the caller can log the failure.
    """
    auth_method = prefs.get("ntfy_auth_method", "none")
    ntfy_token = prefs.get("ntfy_token", "").strip()
    ntfy_username = prefs.get("ntfy_username", "").strip()
    # Fix: the stored ntfy password is encrypted at rest — decrypt before
    # building basic auth, matching dispatch_notifications (previously this
    # sent the ciphertext as the password).
    ntfy_password = decrypt_secret(prefs.get("ntfy_password", "").strip())
    headers = {
        "Title": f"PocketVeto Weekly — {count} bill{'s' if count != 1 else ''} updated",
        "Priority": "low",
        "Tags": "newspaper,calendar",
    }
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"
    resp = requests.post(ntfy_url, data=summary.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT)
    resp.raise_for_status()
def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
    """Send one bundled ntfy message summarising *events*, grouped by bill.

    Shows up to 10 bills (one line each, listing the distinct event types),
    plus an "…and N more" line. Raises on HTTP errors so the caller can log
    the failure and leave the events undispatched for the next run.
    """
    auth_method = prefs.get("ntfy_auth_method", "none")
    ntfy_token = prefs.get("ntfy_token", "").strip()
    ntfy_username = prefs.get("ntfy_username", "").strip()
    # Fix: the stored ntfy password is encrypted at rest — decrypt before
    # building basic auth, matching dispatch_notifications (previously this
    # sent the ciphertext as the password).
    ntfy_password = decrypt_secret(prefs.get("ntfy_password", "").strip())
    headers = {
        "Title": f"PocketVeto Digest — {len(events)} update{'s' if len(events) != 1 else ''}",
        "Priority": "default",
        "Tags": "newspaper",
    }
    if auth_method == "token" and ntfy_token:
        headers["Authorization"] = f"Bearer {ntfy_token}"
    elif auth_method == "basic" and ntfy_username:
        creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"
    # Group by bill, show up to 10
    by_bill: dict = defaultdict(list)
    for event in events:
        by_bill[event.bill_id].append(event)
    lines = []
    for bill_id, bill_events in list(by_bill.items())[:10]:
        payload = bill_events[0].payload or {}
        bill_label = payload.get("bill_label", bill_id.upper())
        event_labels = list({_EVENT_TITLES.get(e.event_type, "Update") for e in bill_events})
        lines.append(f"{bill_label}: {', '.join(event_labels)}")
    if len(by_bill) > 10:
        lines.append(f" …and {len(by_bill) - 10} more bills")
    message = "\n".join(lines)
    resp = requests.post(ntfy_url, data=message.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT)
    resp.raise_for_status()

View File

@@ -0,0 +1,164 @@
"""
Shared notification utilities — used by llm_processor, congress_poller, etc.
Centralised here to avoid circular imports.
"""
from datetime import datetime, timedelta, timezone
_VOTE_KW = ["passed", "failed", "agreed to", "roll call"]
_PRES_KW = ["signed", "vetoed", "enacted", "presented to the president"]
_COMMITTEE_KW = ["markup", "ordered to be reported", "ordered reported", "reported by", "discharged"]
_CALENDAR_KW = ["placed on"]
_PROCEDURAL_KW = ["cloture", "conference"]
_REFERRAL_KW = ["referred to"]
# Events created within this window for the same (user, bill, event_type) are suppressed
_DEDUP_MINUTES = 30
def categorize_action(action_text: str) -> str | None:
"""Return the action category string, or None if not notification-worthy."""
t = (action_text or "").lower()
if any(kw in t for kw in _VOTE_KW): return "vote"
if any(kw in t for kw in _PRES_KW): return "presidential"
if any(kw in t for kw in _COMMITTEE_KW): return "committee_report"
if any(kw in t for kw in _CALENDAR_KW): return "calendar"
if any(kw in t for kw in _PROCEDURAL_KW): return "procedural"
if any(kw in t for kw in _REFERRAL_KW): return "referral"
return None
def _build_payload(
    bill, action_summary: str, action_category: str, source: str = "bill_follow"
) -> dict:
    """Assemble the common NotificationEvent payload dict for *bill*."""
    from app.config import settings

    base_url = (settings.PUBLIC_URL or settings.LOCAL_URL).rstrip("/")
    # kept for RSS/history backwards compat
    legacy_tier = "referral" if action_category == "referral" else "progress"
    return {
        "bill_title": bill.short_title or bill.title or "",
        "bill_label": f"{bill.bill_type.upper()} {bill.bill_number}",
        "brief_summary": (action_summary or "")[:300],
        "bill_url": f"{base_url}/bills/{bill.bill_id}",
        "action_category": action_category,
        "milestone_tier": legacy_tier,
        "source": source,
    }
def _is_duplicate(db, user_id: int, bill_id: str, event_type: str) -> bool:
    """True if the same (user, bill, event_type) event exists within the dedup window."""
    from app.models.notification import NotificationEvent

    window_start = datetime.now(timezone.utc) - timedelta(minutes=_DEDUP_MINUTES)
    match = (
        db.query(NotificationEvent)
        .filter_by(user_id=user_id, bill_id=bill_id, event_type=event_type)
        .filter(NotificationEvent.created_at > window_start)
        .first()
    )
    return match is not None
def emit_bill_notification(
    db, bill, event_type: str, action_summary: str, action_category: str = "vote"
) -> int:
    """Create a NotificationEvent for each follower of *bill*.

    Recent duplicates (same user/bill/event_type inside the dedup window)
    are skipped; commits once at the end when anything was added.
    Returns the number of events created.
    """
    from app.models.follow import Follow
    from app.models.notification import NotificationEvent

    followers = db.query(Follow).filter_by(follow_type="bill", follow_value=bill.bill_id).all()
    if not followers:
        return 0

    base_payload = _build_payload(bill, action_summary, action_category, source="bill_follow")
    created = 0
    for f in followers:
        if _is_duplicate(db, f.user_id, bill.bill_id, event_type):
            continue
        db.add(NotificationEvent(
            user_id=f.user_id,
            bill_id=bill.bill_id,
            event_type=event_type,
            payload={**base_payload, "follow_mode": f.follow_mode},
        ))
        created += 1
    if created:
        db.commit()
    return created
def emit_member_follow_notifications(
    db, bill, event_type: str, action_summary: str, action_category: str = "vote"
) -> int:
    """Notify users who follow the bill's sponsor.

    The shared dedup window prevents double-alerting users who follow both
    the bill and its sponsor. No-op when the bill has no sponsor on record.
    Returns the number of events created.
    """
    if not bill.sponsor_id:
        return 0
    from app.models.follow import Follow
    from app.models.notification import NotificationEvent

    sponsor_followers = db.query(Follow).filter_by(
        follow_type="member", follow_value=bill.sponsor_id
    ).all()
    if not sponsor_followers:
        return 0

    from app.models.member import Member
    sponsor = db.get(Member, bill.sponsor_id)

    payload = _build_payload(bill, action_summary, action_category, source="member_follow")
    payload["matched_member_name"] = sponsor.name if sponsor else None
    payload["matched_member_id"] = bill.sponsor_id

    created = 0
    for f in sponsor_followers:
        if _is_duplicate(db, f.user_id, bill.bill_id, event_type):
            continue
        db.add(NotificationEvent(
            user_id=f.user_id,
            bill_id=bill.bill_id,
            event_type=event_type,
            payload=payload,
        ))
        created += 1
    if created:
        db.commit()
    return created
def emit_topic_follow_notifications(
    db, bill, event_type: str, action_summary: str, topic_tags: list,
    action_category: str = "vote",
) -> int:
    """Notify users following any of the bill's topic tags.

    When a user follows several matching topics, only the first matching
    follow (query order) supplies the notification's ``matched_topic``.
    Returns the number of events created.
    """
    if not topic_tags:
        return 0
    from app.models.follow import Follow
    from app.models.notification import NotificationEvent

    # One query for every matching topic follow; dedupe per user below.
    matching = db.query(Follow).filter(
        Follow.follow_type == "topic",
        Follow.follow_value.in_(topic_tags),
    ).all()

    # Keep each user's first matching follow (dict preserves insertion order).
    first_follow_by_user: dict = {}
    for f in matching:
        first_follow_by_user.setdefault(f.user_id, f)
    if not first_follow_by_user:
        return 0

    payload = _build_payload(bill, action_summary, action_category, source="topic_follow")
    created = 0
    for user_id, f in first_follow_by_user.items():
        if _is_duplicate(db, user_id, bill.bill_id, event_type):
            continue
        db.add(NotificationEvent(
            user_id=user_id,
            bill_id=bill.bill_id,
            event_type=event_type,
            payload={**payload, "matched_topic": f.follow_value},
        ))
        created += 1
    if created:
        db.commit()
    return created

View File

@@ -0,0 +1,126 @@
"""
Trend scorer — calculates the daily zeitgeist score for bills.
Runs nightly via Celery Beat.
"""
import logging
from datetime import date, timedelta
from sqlalchemy import and_
from app.database import get_sync_db
from app.models import Bill, BillBrief, TrendScore
from app.services import news_service, trends_service
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
_PYTRENDS_BATCH = 5 # max keywords pytrends accepts per call
def calculate_composite_score(newsapi_count: int, gnews_count: int, gtrends_score: float) -> float:
    """
    Weighted composite score (0–100), rounded to 2 decimal places:
      NewsAPI article count → 0–40 pts (saturates at 20 articles)
      Google News RSS count → 0–30 pts (saturates at 50 articles)
      Google Trends score   → 0–30 pts (0–100 input)
    """
    components = (
        min(newsapi_count / 20, 1.0) * 40,
        min(gnews_count / 50, 1.0) * 30,
        (gtrends_score / 100) * 30,
    )
    return round(sum(components), 2)
@celery_app.task(bind=True, name="app.workers.trend_scorer.calculate_all_trend_scores")
def calculate_all_trend_scores(self):
    """Nightly task: calculate trend scores for bills active in the last 90 days.

    Bills already scored today are skipped. The remainder are processed in
    groups of _PYTRENDS_BATCH so a single pytrends request covers several
    bills. Per-bill scoring failures are logged and skipped; one commit at
    the end persists every TrendScore row created.
    """
    db = get_sync_db()
    try:
        cutoff = date.today() - timedelta(days=90)
        active_bills = (
            db.query(Bill)
            .filter(Bill.latest_action_date >= cutoff)
            .all()
        )
        today = date.today()
        # Filter to bills not yet scored today
        bills_to_score = []
        for bill in active_bills:
            existing = (
                db.query(TrendScore)
                .filter_by(bill_id=bill.bill_id, score_date=today)
                .first()
            )
            if not existing:
                bills_to_score.append(bill)
        scored = 0
        # Process in batches of _PYTRENDS_BATCH so one pytrends call covers multiple bills
        for batch_start in range(0, len(bills_to_score), _PYTRENDS_BATCH):
            batch = bills_to_score[batch_start: batch_start + _PYTRENDS_BATCH]
            # Collect keyword groups for pytrends batch call
            keyword_groups = []
            bill_queries = []
            for bill in batch:
                # Topic tags from the most recent LLM brief refine the trend keywords.
                latest_brief = (
                    db.query(BillBrief)
                    .filter_by(bill_id=bill.bill_id)
                    .order_by(BillBrief.created_at.desc())
                    .first()
                )
                topic_tags = latest_brief.topic_tags if latest_brief else []
                query = news_service.build_news_query(
                    bill_title=bill.title,
                    short_title=bill.short_title,
                    sponsor_name=None,
                    bill_type=bill.bill_type,
                    bill_number=bill.bill_number,
                )
                keywords = trends_service.keywords_for_bill(
                    title=bill.title or "",
                    short_title=bill.short_title or "",
                    topic_tags=topic_tags,
                )
                keyword_groups.append(keywords)
                bill_queries.append(query)
            # One pytrends call for the whole batch
            gtrends_scores = trends_service.get_trends_scores_batch(keyword_groups)
            for i, bill in enumerate(batch):
                try:
                    query = bill_queries[i]
                    # NewsAPI + Google News counts (gnews served from 2-hour cache)
                    newsapi_articles = news_service.fetch_newsapi_articles(query, days=30)
                    newsapi_count = len(newsapi_articles)
                    gnews_count = news_service.fetch_gnews_count(query, days=30)
                    gtrends_score = gtrends_scores[i]
                    composite = calculate_composite_score(newsapi_count, gnews_count, gtrends_score)
                    db.add(TrendScore(
                        bill_id=bill.bill_id,
                        score_date=today,
                        newsapi_count=newsapi_count,
                        gnews_count=gnews_count,
                        gtrends_score=gtrends_score,
                        composite_score=composite,
                    ))
                    scored += 1
                except Exception as exc:
                    # Best-effort: one failing bill must not sink the nightly run.
                    logger.warning(f"Trend scoring skipped for {bill.bill_id}: {exc}")
        db.commit()
        logger.info(f"Scored {scored} bills")
        return {"scored": scored}
    except Exception as exc:
        db.rollback()
        logger.error(f"Trend scoring failed: {exc}")
        raise
    finally:
        db.close()

View File

@@ -0,0 +1,271 @@
"""
Vote fetcher — fetches roll-call vote data for bills.
Roll-call votes are referenced in bill actions as recordedVotes objects.
Each recordedVote contains a direct URL to the source XML:
- House: https://clerk.house.gov/evs/{year}/roll{NNN}.xml
- Senate: https://www.senate.gov/legislative/LIS/roll_call_votes/...
We fetch and parse that XML directly rather than going through a
Congress.gov API endpoint (which doesn't expose vote detail).
Triggered on-demand from GET /api/bills/{bill_id}/votes when no votes
are stored yet.
"""
import logging
import xml.etree.ElementTree as ET
from datetime import date, datetime, timezone
import requests
from app.database import get_sync_db
from app.models.bill import Bill
from app.models.member import Member
from app.models.vote import BillVote, MemberVotePosition
from app.services.congress_api import get_bill_actions as _api_get_bill_actions
from app.workers.celery_app import celery_app
logger = logging.getLogger(__name__)
_FETCH_TIMEOUT = 15
def _parse_date(s) -> date | None:
if not s:
return None
try:
return date.fromisoformat(str(s)[:10])
except Exception:
return None
def _fetch_xml(url: str) -> ET.Element:
    """GET *url* and parse the response body as XML; raises on HTTP errors."""
    response = requests.get(url, timeout=_FETCH_TIMEOUT)
    response.raise_for_status()
    return ET.fromstring(response.content)
def _parse_house_xml(root: ET.Element) -> dict:
"""Parse House Clerk roll-call XML (clerk.house.gov/evs/...)."""
meta = root.find("vote-metadata")
question = (meta.findtext("vote-question") or "").strip() if meta is not None else ""
result = (meta.findtext("vote-result") or "").strip() if meta is not None else ""
totals = root.find(".//totals-by-vote")
yeas = int((totals.findtext("yea-total") or "0").strip()) if totals is not None else 0
nays = int((totals.findtext("nay-total") or "0").strip()) if totals is not None else 0
not_voting = int((totals.findtext("not-voting-total") or "0").strip()) if totals is not None else 0
members = []
for rv in root.findall(".//recorded-vote"):
leg = rv.find("legislator")
if leg is None:
continue
members.append({
"bioguide_id": leg.get("name-id"),
"member_name": (leg.text or "").strip(),
"party": leg.get("party"),
"state": leg.get("state"),
"position": (rv.findtext("vote") or "Not Voting").strip(),
})
return {"question": question, "result": result, "yeas": yeas, "nays": nays,
"not_voting": not_voting, "members": members}
def _parse_senate_xml(root: ET.Element) -> dict:
"""Parse Senate LIS roll-call XML (senate.gov/legislative/LIS/...)."""
question = (root.findtext("vote_question_text") or root.findtext("question") or "").strip()
result = (root.findtext("vote_result_text") or "").strip()
counts = root.find("vote_counts")
yeas = int((counts.findtext("yeas") or "0").strip()) if counts is not None else 0
nays = int((counts.findtext("nays") or "0").strip()) if counts is not None else 0
not_voting = int((counts.findtext("absent") or "0").strip()) if counts is not None else 0
members = []
for m in root.findall(".//member"):
first = (m.findtext("first_name") or "").strip()
last = (m.findtext("last_name") or "").strip()
members.append({
"bioguide_id": (m.findtext("bioguide_id") or "").strip() or None,
"member_name": f"{first} {last}".strip(),
"party": m.findtext("party"),
"state": m.findtext("state"),
"position": (m.findtext("vote_cast") or "Not Voting").strip(),
})
return {"question": question, "result": result, "yeas": yeas, "nays": nays,
"not_voting": not_voting, "members": members}
def _parse_vote_xml(url: str, chamber: str) -> dict:
    """Fetch the roll-call XML at *url* and parse it for *chamber*.

    Any chamber other than "house" (case-insensitive) falls through to
    the Senate layout.
    """
    root = _fetch_xml(url)
    parser = _parse_house_xml if chamber.lower() == "house" else _parse_senate_xml
    return parser(root)
def _collect_recorded_votes(congress: int, bill_type: str, bill_number: int) -> list[dict]:
    """Page through all bill actions and collect unique recordedVotes entries.

    Dedupes on (chamber, session, roll_number); the first action seen
    for a roll call supplies its date and source URL.
    """
    seen: set[tuple] = set()
    collected: list[dict] = []
    offset = 0
    while True:
        page = _api_get_bill_actions(congress, bill_type, bill_number, offset=offset)
        actions = page.get("actions", [])
        for action in actions:
            for vote_ref in action.get("recordedVotes", []):
                raw_roll = vote_ref.get("rollNumber")
                # A recordedVote without a roll number can't be keyed — skip.
                if not raw_roll:
                    continue
                roll_number = int(raw_roll)
                chamber = vote_ref.get("chamber", "")
                # Field name varies across API payloads; default session 1.
                session = int(vote_ref.get("sessionNumber") or vote_ref.get("session") or 1)
                key = (chamber, session, roll_number)
                if key in seen:
                    continue
                seen.add(key)
                collected.append({
                    "chamber": chamber,
                    "session": session,
                    "roll_number": roll_number,
                    "date": action.get("actionDate"),
                    "url": vote_ref.get("url"),
                })
        offset += len(actions)
        # Stop when we've consumed the reported total or the API returns
        # an empty page (guards against a missing/zero pagination count).
        if offset >= page.get("pagination", {}).get("count", 0) or not actions:
            break
    return collected
@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_bill_votes")
def fetch_bill_votes(self, bill_id: str) -> dict:
    """Fetch and store roll-call votes for a single bill.

    Collects recordedVotes references from the bill's Congress.gov
    actions, fetches/parses each referenced roll-call XML, and persists
    one BillVote (plus per-member positions) per unique roll call.
    Each vote is committed individually, so a mid-run failure keeps the
    votes already stored.

    Returns {"bill_id", "stored", "skipped"}, or {"error": ...} when
    the bill is unknown.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"error": f"Bill {bill_id} not found"}
        recorded = _collect_recorded_votes(bill.congress_number, bill.bill_type, bill.bill_number)
        if not recorded:
            logger.info(f"fetch_bill_votes({bill_id}): no recorded votes in actions")
            return {"bill_id": bill_id, "stored": 0, "skipped": 0}
        now = datetime.now(timezone.utc)
        stored = 0
        skipped = 0
        # Cache known bioguide IDs to avoid N+1 member lookups
        known_bioguides: set[str] = {
            row[0] for row in db.query(Member.bioguide_id).all()
        }
        for rv in recorded:
            chamber = rv["chamber"]
            session = rv["session"]
            roll_number = rv["roll_number"]
            source_url = rv.get("url")
            # Idempotency: a vote already stored for this roll call is skipped,
            # making re-runs cheap.
            existing = (
                db.query(BillVote)
                .filter_by(
                    congress=bill.congress_number,
                    chamber=chamber,
                    session=session,
                    roll_number=roll_number,
                )
                .first()
            )
            if existing:
                skipped += 1
                continue
            if not source_url:
                logger.warning(f"No URL for {chamber} roll {roll_number} — skipping")
                continue
            try:
                parsed = _parse_vote_xml(source_url, chamber)
            except Exception as exc:
                # Best-effort: a single bad/unreachable XML shouldn't abort
                # the rest of the bill's votes.
                logger.warning(f"Could not parse vote XML {source_url}: {exc}")
                continue
            bill_vote = BillVote(
                bill_id=bill_id,
                congress=bill.congress_number,
                chamber=chamber,
                session=session,
                roll_number=roll_number,
                question=parsed["question"],
                description=None,
                vote_date=_parse_date(rv.get("date")),
                yeas=parsed["yeas"],
                nays=parsed["nays"],
                not_voting=parsed["not_voting"],
                result=parsed["result"],
                source_url=source_url,
                fetched_at=now,
            )
            db.add(bill_vote)
            # Flush so bill_vote.id is assigned before the position rows
            # reference it.
            db.flush()
            for pos in parsed["members"]:
                bioguide_id = pos.get("bioguide_id")
                # Null out IDs we don't know about so the FK can't fail;
                # the name/party/state columns still identify the member.
                if bioguide_id and bioguide_id not in known_bioguides:
                    bioguide_id = None
                db.add(MemberVotePosition(
                    vote_id=bill_vote.id,
                    bioguide_id=bioguide_id,
                    member_name=pos.get("member_name"),
                    party=pos.get("party"),
                    state=pos.get("state"),
                    position=pos.get("position") or "Not Voting",
                ))
            # Commit per vote so earlier successes survive a later failure.
            db.commit()
            stored += 1
        logger.info(f"fetch_bill_votes({bill_id}): {stored} stored, {skipped} skipped")
        return {"bill_id": bill_id, "stored": stored, "skipped": skipped}
    except Exception as exc:
        # Consistent with the other worker tasks: discard uncommitted rows
        # so the session is clean before close, then let Celery record the
        # failure (previously the session was closed without rollback).
        db.rollback()
        logger.error(f"fetch_bill_votes({bill_id}) failed: {exc}")
        raise
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_votes_for_stanced_bills")
def fetch_votes_for_stanced_bills(self) -> dict:
    """
    Nightly task: queue vote fetches for every bill any user has a stance on
    (pocket_veto or pocket_boost). Only queues bills that don't already have
    a vote stored, so re-runs are cheap after the first pass.

    Returns {"queued": <number of bills dispatched>}.
    """
    # NOTE: the previous version imported app.models.follow.Follow here but
    # never used it (the query is raw SQL) — the dead import is removed.
    from sqlalchemy import text as sa_text

    db = get_sync_db()
    try:
        # Anti-join: stanced bills that have no BillVote row yet.
        rows = db.execute(sa_text("""
            SELECT DISTINCT f.follow_value AS bill_id
            FROM follows f
            LEFT JOIN bill_votes bv ON bv.bill_id = f.follow_value
            WHERE f.follow_type = 'bill'
              AND f.follow_mode IN ('pocket_veto', 'pocket_boost')
              AND bv.id IS NULL
        """)).fetchall()
        queued = 0
        for row in rows:
            # Fan out one Celery task per bill rather than fetching inline,
            # so a slow/failed fetch can't stall the nightly sweep.
            fetch_bill_votes.delay(row.bill_id)
            queued += 1
        logger.info(f"fetch_votes_for_stanced_bills: queued {queued} bills")
        return {"queued": queued}
    finally:
        db.close()