Files
PocketVeto/backend/app/workers/vote_fetcher.py
Jack Levy f6770b16be fix: proactively fetch votes for stanced bills + register vote_fetcher with Celery
vote_fetcher was missing from Celery's include list (task not registered with
workers) and had no beat schedule — votes only fetched on-demand when a user
visited a bill's votes page. Stanced bills (pocket_veto/pocket_boost) never had
votes fetched, leaving the alignment page blank.

Add fetch_votes_for_stanced_bills nightly task (4:30 AM UTC) that queues
fetch_bill_votes for every bill any user has stanced but has no stored votes.
Register vote_fetcher in the include list and add it to the polling queue route.

Authored by: Jack Levy
2026-03-14 19:38:06 -04:00

272 lines
9.6 KiB
Python

"""
Vote fetcher — fetches roll-call vote data for bills.
Roll-call votes are referenced in bill actions as recordedVotes objects.
Each recordedVote contains a direct URL to the source XML:
- House: https://clerk.house.gov/evs/{year}/roll{NNN}.xml
- Senate: https://www.senate.gov/legislative/LIS/roll_call_votes/...
We fetch and parse that XML directly rather than going through a
Congress.gov API endpoint (which doesn't expose vote detail).
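Triggered on-demand from GET /api/bills/{bill_id}/votes when no votes
are stored yet, and queued by the nightly fetch_votes_for_stanced_bills
task for stanced bills that have no stored votes.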
"""
import logging
import xml.etree.ElementTree as ET
from datetime import date, datetime, timezone
import requests
from app.database import get_sync_db
from app.models.bill import Bill
from app.models.member import Member
from app.models.vote import BillVote, MemberVotePosition
from app.services.congress_api import get_bill_actions as _api_get_bill_actions
from app.workers.celery_app import celery_app
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Timeout (seconds) passed to requests.get() when downloading vote XML.
_FETCH_TIMEOUT = 15
def _parse_date(s) -> date | None:
if not s:
return None
try:
return date.fromisoformat(str(s)[:10])
except Exception:
return None
def _fetch_xml(url: str) -> ET.Element:
    """Download ``url`` and return the root element of its XML body.

    The request uses the module-wide ``_FETCH_TIMEOUT``. HTTP error statuses
    are turned into exceptions by ``raise_for_status()`` before any parsing
    is attempted.

    Args:
        url: Address of the roll-call XML document.

    Returns:
        Root element parsed from the raw response bytes.
    """
    response = requests.get(url, timeout=_FETCH_TIMEOUT)
    response.raise_for_status()
    # Parse the raw bytes so the XML declaration decides the encoding.
    payload = response.content
    return ET.fromstring(payload)
def _parse_house_xml(root: ET.Element) -> dict:
    """Extract question, result, tallies and member positions from House XML.

    Args:
        root: Root element of a House Clerk roll-call document
            (clerk.house.gov/evs/...).

    Returns:
        Dict with ``question``, ``result``, ``yeas``, ``nays``,
        ``not_voting`` and ``members`` (one dict per ``recorded-vote`` that
        has a ``legislator`` child). Missing metadata yields empty strings
        and missing totals yield zeros.

    Raises:
        ValueError: If a total is present but is not an integer.
    """
    metadata = root.find("vote-metadata")
    if metadata is None:
        question = result = ""
    else:
        question = (metadata.findtext("vote-question") or "").strip()
        result = (metadata.findtext("vote-result") or "").strip()

    tally = root.find(".//totals-by-vote")
    if tally is None:
        yeas = nays = not_voting = 0
    else:
        yeas, nays, not_voting = (
            int((tally.findtext(tag) or "0").strip())
            for tag in ("yea-total", "nay-total", "not-voting-total")
        )

    # Pair every recorded vote with its legislator element; entries without
    # a legislator are dropped.
    vote_pairs = (
        (record, record.find("legislator"))
        for record in root.findall(".//recorded-vote")
    )
    members = [
        {
            "bioguide_id": legislator.get("name-id"),
            "member_name": (legislator.text or "").strip(),
            "party": legislator.get("party"),
            "state": legislator.get("state"),
            "position": (record.findtext("vote") or "Not Voting").strip(),
        }
        for record, legislator in vote_pairs
        if legislator is not None
    ]

    return {
        "question": question,
        "result": result,
        "yeas": yeas,
        "nays": nays,
        "not_voting": not_voting,
        "members": members,
    }
def _parse_senate_xml(root: ET.Element) -> dict:
    """Parse Senate LIS roll-call XML (senate.gov/legislative/LIS/...).

    Args:
        root: Root element of a Senate roll-call document.

    Returns:
        Dict with ``question``, ``result``, ``yeas``, ``nays``,
        ``not_voting`` and ``members`` (one dict per ``member`` element).
        Missing text yields empty strings and missing or empty tallies
        yield zeros.

    Raises:
        ValueError: If a tally is present but is not an integer.
    """
    question = (
        root.findtext("vote_question_text") or root.findtext("question") or ""
    ).strip()
    result = (root.findtext("vote_result_text") or "").strip()

    # ``vote_counts`` is tried first so existing behaviour is unchanged.
    # NOTE(review): published Senate LIS files appear to carry the tallies
    # under <count>; confirm against a live file. Without this fallback the
    # totals silently stay at 0 for such documents.
    counts = root.find("vote_counts")
    if counts is None:
        counts = root.find("count")

    def _tally(tag: str) -> int:
        """Return the integer under ``tag``; absent or blank counts as 0."""
        if counts is None:
            return 0
        raw = (counts.findtext(tag) or "").strip()
        return int(raw) if raw else 0

    yeas = _tally("yeas")
    nays = _tally("nays")
    not_voting = _tally("absent")

    members = []
    for m in root.findall(".//member"):
        first = (m.findtext("first_name") or "").strip()
        last = (m.findtext("last_name") or "").strip()
        members.append({
            # A blank or missing bioguide_id is normalised to None.
            "bioguide_id": (m.findtext("bioguide_id") or "").strip() or None,
            "member_name": f"{first} {last}".strip(),
            "party": m.findtext("party"),
            "state": m.findtext("state"),
            "position": (m.findtext("vote_cast") or "Not Voting").strip(),
        })

    return {
        "question": question,
        "result": result,
        "yeas": yeas,
        "nays": nays,
        "not_voting": not_voting,
        "members": members,
    }
def _parse_vote_xml(url: str, chamber: str) -> dict:
    """Fetch a roll-call document and parse it with the chamber's parser.

    Args:
        url: Location of the roll-call XML.
        chamber: Chamber name; ``"house"`` (any casing) selects the House
            parser and every other value falls through to the Senate parser.

    Returns:
        The parsed vote dict produced by the selected parser.
    """
    # Download first, then pick the parser.
    document = _fetch_xml(url)
    is_house = chamber.lower() == "house"
    parser = _parse_house_xml if is_house else _parse_senate_xml
    return parser(document)
def _collect_recorded_votes(congress: int, bill_type: str, bill_number: int) -> list[dict]:
    """Walk every page of a bill's actions and gather its distinct roll calls.

    Args:
        congress: Congress number of the bill.
        bill_type: Bill type code passed through to the actions API helper.
        bill_number: Bill number within that congress and type.

    Returns:
        One dict per unique ``(chamber, session, roll_number)`` in first-seen
        order, each holding ``chamber``, ``session``, ``roll_number``,
        ``date`` (the action's ``actionDate``) and ``url``.
    """
    # Insertion-ordered dict: the key de-duplicates, the value is the entry.
    unique_votes: dict[tuple, dict] = {}
    cursor = 0
    while True:
        page = _api_get_bill_actions(congress, bill_type, bill_number, offset=cursor)
        page_actions = page.get("actions", [])
        page_info = page.get("pagination", {})

        for entry in page_actions:
            for vote_ref in entry.get("recordedVotes", []):
                chamber = vote_ref.get("chamber", "")
                # Session falls back to 1 when neither field is supplied.
                session = int(vote_ref.get("sessionNumber") or vote_ref.get("session") or 1)
                raw_roll = vote_ref.get("rollNumber")
                if raw_roll:
                    roll_number = int(raw_roll)
                    # setdefault keeps the first occurrence of each roll call.
                    unique_votes.setdefault(
                        (chamber, session, roll_number),
                        {
                            "chamber": chamber,
                            "session": session,
                            "roll_number": roll_number,
                            "date": entry.get("actionDate"),
                            "url": vote_ref.get("url"),
                        },
                    )

        reported_total = page_info.get("count", 0)
        cursor += len(page_actions)
        # Stop once the reported total is reached or a page comes back empty.
        if cursor >= reported_total or not page_actions:
            return list(unique_votes.values())
@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_bill_votes")
def fetch_bill_votes(self, bill_id: str) -> dict:
    """Download and persist every roll-call vote recorded against one bill.

    Roll calls already present in ``BillVote`` (matched on congress, chamber,
    session and roll number) are counted as skipped. Roll calls without a
    source URL, or whose XML cannot be fetched or parsed, are logged and left
    out of both counters. Each stored roll call is committed on its own.

    Args:
        bill_id: Identifier of the ``Bill`` row to process.

    Returns:
        ``{"bill_id", "stored", "skipped"}`` on success, or ``{"error": ...}``
        when no bill with that id exists.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"error": f"Bill {bill_id} not found"}

        recorded_votes = _collect_recorded_votes(
            bill.congress_number, bill.bill_type, bill.bill_number
        )
        if not recorded_votes:
            logger.info(f"fetch_bill_votes({bill_id}): no recorded votes in actions")
            return {"bill_id": bill_id, "stored": 0, "skipped": 0}

        fetched_at = datetime.now(timezone.utc)
        stored = skipped = 0

        # Load all member IDs once so each position is checked in memory
        # instead of with a query per member.
        known_bioguides: set[str] = {
            entry[0] for entry in db.query(Member.bioguide_id).all()
        }

        for vote_ref in recorded_votes:
            chamber = vote_ref["chamber"]
            session = vote_ref["session"]
            roll_number = vote_ref["roll_number"]
            source_url = vote_ref.get("url")

            duplicate_lookup = db.query(BillVote).filter_by(
                congress=bill.congress_number,
                chamber=chamber,
                session=session,
                roll_number=roll_number,
            )
            if duplicate_lookup.first():
                skipped += 1
                continue

            if not source_url:
                logger.warning(f"No URL for {chamber} roll {roll_number} — skipping")
                continue

            try:
                tally = _parse_vote_xml(source_url, chamber)
            except Exception as exc:
                # Best effort: one unreachable or malformed document must not
                # abort the remaining roll calls.
                logger.warning(f"Could not parse vote XML {source_url}: {exc}")
                continue

            vote_row = BillVote(
                bill_id=bill_id,
                congress=bill.congress_number,
                chamber=chamber,
                session=session,
                roll_number=roll_number,
                question=tally["question"],
                description=None,
                vote_date=_parse_date(vote_ref.get("date")),
                yeas=tally["yeas"],
                nays=tally["nays"],
                not_voting=tally["not_voting"],
                result=tally["result"],
                source_url=source_url,
                fetched_at=fetched_at,
            )
            db.add(vote_row)
            # Flushed before vote_row.id is read below (presumably this is
            # what assigns the primary key).
            db.flush()

            for member in tally["members"]:
                member_id = member.get("bioguide_id")
                # IDs not found in the members table are stored as NULL;
                # falsy values pass through untouched.
                if not member_id or member_id in known_bioguides:
                    linked_id = member_id
                else:
                    linked_id = None
                db.add(MemberVotePosition(
                    vote_id=vote_row.id,
                    bioguide_id=linked_id,
                    member_name=member.get("member_name"),
                    party=member.get("party"),
                    state=member.get("state"),
                    position=member.get("position") or "Not Voting",
                ))

            db.commit()
            stored += 1

        logger.info(f"fetch_bill_votes({bill_id}): {stored} stored, {skipped} skipped")
        return {"bill_id": bill_id, "stored": stored, "skipped": skipped}
    finally:
        db.close()
@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_votes_for_stanced_bills")
def fetch_votes_for_stanced_bills(self) -> dict:
    """
    Queue a fetch_bill_votes job for each stanced bill that still lacks votes.

    A bill qualifies when some follow row of type 'bill' has mode
    pocket_veto or pocket_boost and no bill_votes row references the bill.
    Bills that already have a stored vote are not selected, which keeps
    repeat runs cheap. Intended to run nightly; the schedule itself is
    configured outside this module.

    Returns:
        ``{"queued": <number of fetch_bill_votes jobs enqueued>}``.
    """
    # NOTE(review): Follow is not referenced below — presumably imported so
    # the model is loaded; confirm before removing.
    from app.models.follow import Follow

    db = get_sync_db()
    try:
        from sqlalchemy import text as sa_text

        # SQL text is kept exactly as written in the original statement.
        missing_votes_query = sa_text("""
SELECT DISTINCT f.follow_value AS bill_id
FROM follows f
LEFT JOIN bill_votes bv ON bv.bill_id = f.follow_value
WHERE f.follow_type = 'bill'
AND f.follow_mode IN ('pocket_veto', 'pocket_boost')
AND bv.id IS NULL
""")
        rows = db.execute(missing_votes_query).fetchall()

        # enumerate leaves `queued` at the last 1-based index, i.e. the
        # number of jobs dispatched; it stays 0 when nothing is pending.
        queued = 0
        for queued, pending in enumerate(rows, start=1):
            fetch_bill_votes.delay(pending.bill_id)

        logger.info(f"fetch_votes_for_stanced_bills: queued {queued} bills")
        return {"queued": queued}
    finally:
        db.close()