"""
Vote fetcher — fetches roll-call vote data for bills.

Roll-call votes are referenced in bill actions as recordedVotes objects.
Each recordedVote contains a direct URL to the source XML:
  - House:  https://clerk.house.gov/evs/{year}/roll{NNN}.xml
  - Senate: https://www.senate.gov/legislative/LIS/roll_call_votes/...

We fetch and parse that XML directly rather than going through a
Congress.gov API endpoint (which doesn't expose vote detail).

Triggered on-demand from GET /api/bills/{bill_id}/votes when no votes
are stored yet.
"""
import logging
import xml.etree.ElementTree as ET
from datetime import date, datetime, timezone

import requests

from app.database import get_sync_db
from app.models.bill import Bill
from app.models.member import Member
from app.models.vote import BillVote, MemberVotePosition
from app.services.congress_api import get_bill_actions as _api_get_bill_actions
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)

# Timeout (seconds) handed to requests.get for every XML download.
_FETCH_TIMEOUT = 15


def _parse_date(s) -> date | None:
    """Return the leading ``YYYY-MM-DD`` of *s* as a date, or None.

    Only the first ten characters of ``str(s)`` are considered, so full
    ISO timestamps are accepted. Falsy or unparseable input yields None.
    """
    if not s:
        return None
    try:
        return date.fromisoformat(str(s)[:10])
    except ValueError:
        # fromisoformat raises ValueError for anything that is not a valid
        # ISO calendar date; treat that as "no date" rather than failing.
        return None


def _fetch_xml(url: str) -> ET.Element:
    """Download *url* and return the parsed XML root element.

    Raises:
        requests.RequestException: on network errors or a non-2xx status.
        xml.etree.ElementTree.ParseError: if the body is not well-formed XML.
    """
    # NOTE(review): the stdlib ElementTree parser is not hardened against
    # maliciously constructed XML; the URL comes from upstream action data.
    resp = requests.get(url, timeout=_FETCH_TIMEOUT)
    resp.raise_for_status()
    return ET.fromstring(resp.content)


def _int_text(parent: ET.Element | None, tag: str) -> int:
    """Return the integer text of child *tag* under *parent*.

    A missing parent, missing child, or empty/whitespace-only text counts
    as 0. Non-numeric text still raises ValueError so the caller can treat
    the document as unparseable.
    """
    if parent is None:
        return 0
    # Strip before applying the default so "   " is handled like "".
    text = (parent.findtext(tag) or "").strip()
    return int(text) if text else 0


def _parse_house_xml(root: ET.Element) -> dict:
    """Parse House Clerk roll-call XML (clerk.house.gov/evs/...).

    Returns a dict with keys ``question``, ``result``, ``yeas``, ``nays``,
    ``not_voting`` and ``members`` (one dict per ``recorded-vote`` element
    that has a ``legislator`` child).
    """
    meta = root.find("vote-metadata")
    if meta is not None:
        question = (meta.findtext("vote-question") or "").strip()
        result = (meta.findtext("vote-result") or "").strip()
    else:
        question = result = ""

    totals = root.find(".//totals-by-vote")

    members = []
    for rv in root.findall(".//recorded-vote"):
        leg = rv.find("legislator")
        if leg is None:
            continue
        members.append({
            "bioguide_id": leg.get("name-id"),
            "member_name": (leg.text or "").strip(),
            "party": leg.get("party"),
            "state": leg.get("state"),
            "position": (rv.findtext("vote") or "Not Voting").strip(),
        })

    return {
        "question": question,
        "result": result,
        "yeas": _int_text(totals, "yea-total"),
        "nays": _int_text(totals, "nay-total"),
        "not_voting": _int_text(totals, "not-voting-total"),
        "members": members,
    }


def _parse_senate_xml(root: ET.Element) -> dict:
    """Parse Senate LIS roll-call XML (senate.gov/legislative/LIS/...).

    Returns the same dict shape as ``_parse_house_xml``. ``not_voting`` is
    taken from the ``absent`` count. A member's ``bioguide_id`` is None when
    the element is missing or blank.
    """
    question = (
        root.findtext("vote_question_text") or root.findtext("question") or ""
    ).strip()
    result = (root.findtext("vote_result_text") or "").strip()

    counts = root.find("vote_counts")
    if counts is None:
        # NOTE(review): published Senate LIS files appear to wrap the totals
        # in <count> rather than <vote_counts>; accept either — confirm
        # against a live feed. An explicit None test is used because an
        # Element without children is falsy.
        counts = root.find("count")

    members = []
    for m in root.findall(".//member"):
        first = (m.findtext("first_name") or "").strip()
        last = (m.findtext("last_name") or "").strip()
        members.append({
            "bioguide_id": (m.findtext("bioguide_id") or "").strip() or None,
            "member_name": f"{first} {last}".strip(),
            "party": m.findtext("party"),
            "state": m.findtext("state"),
            "position": (m.findtext("vote_cast") or "Not Voting").strip(),
        })

    return {
        "question": question,
        "result": result,
        "yeas": _int_text(counts, "yeas"),
        "nays": _int_text(counts, "nays"),
        "not_voting": _int_text(counts, "absent"),
        "members": members,
    }


def _parse_vote_xml(url: str, chamber: str) -> dict:
    """Fetch *url* and parse it with the parser matching *chamber*.

    ``"house"`` (case-insensitive) selects the House parser; every other
    value is parsed as Senate XML.
    """
    root = _fetch_xml(url)
    if chamber.lower() == "house":
        return _parse_house_xml(root)
    return _parse_senate_xml(root)


def _collect_recorded_votes(congress: int, bill_type: str, bill_number: int) -> list[dict]:
    """Page through all bill actions and collect unique recordedVotes entries.

    Entries are de-duplicated on ``(chamber, session, roll_number)`` and
    returned in first-seen order as dicts with keys ``chamber``,
    ``session``, ``roll_number``, ``date`` (the action's ``actionDate``)
    and ``url``. Entries without a roll number are ignored.
    """
    seen: set[tuple] = set()
    recorded: list[dict] = []
    offset = 0

    while True:
        data = _api_get_bill_actions(congress, bill_type, bill_number, offset=offset)
        # ``or`` defaults also cover keys that are present but null.
        actions = data.get("actions") or []
        pagination = data.get("pagination") or {}

        for action in actions:
            for rv in action.get("recordedVotes") or []:
                chamber = rv.get("chamber") or ""
                session = int(rv.get("sessionNumber") or rv.get("session") or 1)
                roll_number = rv.get("rollNumber")
                if not roll_number:
                    continue
                roll_number = int(roll_number)

                key = (chamber, session, roll_number)
                if key in seen:
                    continue
                seen.add(key)
                recorded.append({
                    "chamber": chamber,
                    "session": session,
                    "roll_number": roll_number,
                    "date": action.get("actionDate"),
                    "url": rv.get("url"),
                })

        total = pagination.get("count") or 0
        offset += len(actions)
        # An empty page also stops the loop, so a wrong ``count`` cannot
        # cause endless paging.
        if offset >= total or not actions:
            break

    return recorded


@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_bill_votes")
def fetch_bill_votes(self, bill_id: str) -> dict:
    """Fetch and store roll-call votes for a single bill.

    Args:
        bill_id: Primary key of the ``Bill`` row to fetch votes for.

    Returns:
        ``{"error": ...}`` when the bill does not exist, otherwise
        ``{"bill_id", "stored", "skipped"}``. ``stored`` counts votes newly
        written; ``skipped`` counts votes that already had a ``BillVote``
        row. Votes with no source URL, or whose XML could not be fetched
        or parsed, are logged and counted in neither.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill:
            return {"error": f"Bill {bill_id} not found"}

        recorded = _collect_recorded_votes(
            bill.congress_number, bill.bill_type, bill.bill_number
        )
        if not recorded:
            logger.info(
                "fetch_bill_votes(%s): no recorded votes in actions", bill_id
            )
            return {"bill_id": bill_id, "stored": 0, "skipped": 0}

        now = datetime.now(timezone.utc)
        stored = 0
        skipped = 0

        # Cache known bioguide IDs to avoid N+1 member lookups
        known_bioguides: set[str] = {
            row[0] for row in db.query(Member.bioguide_id).all()
        }

        for rv in recorded:
            chamber = rv["chamber"]
            session = rv["session"]
            roll_number = rv["roll_number"]
            source_url = rv.get("url")

            existing = (
                db.query(BillVote)
                .filter_by(
                    congress=bill.congress_number,
                    chamber=chamber,
                    session=session,
                    roll_number=roll_number,
                )
                .first()
            )
            if existing:
                skipped += 1
                continue

            if not source_url:
                logger.warning(
                    "No URL for %s roll %s — skipping", chamber, roll_number
                )
                continue

            try:
                parsed = _parse_vote_xml(source_url, chamber)
            except Exception as exc:
                # Deliberate best-effort: one unreachable or malformed
                # document must not abort the remaining votes.
                logger.warning(
                    "Could not parse vote XML %s: %s", source_url, exc
                )
                continue

            bill_vote = BillVote(
                bill_id=bill_id,
                congress=bill.congress_number,
                chamber=chamber,
                session=session,
                roll_number=roll_number,
                question=parsed["question"],
                description=None,
                vote_date=_parse_date(rv.get("date")),
                yeas=parsed["yeas"],
                nays=parsed["nays"],
                not_voting=parsed["not_voting"],
                result=parsed["result"],
                source_url=source_url,
                fetched_at=now,
            )
            db.add(bill_vote)
            # Flush so bill_vote.id is available for the position rows.
            db.flush()

            for pos in parsed["members"]:
                bioguide_id = pos.get("bioguide_id")
                # IDs with no Member row are stored as NULL (presumably to
                # satisfy a foreign key — verify against the model).
                if bioguide_id and bioguide_id not in known_bioguides:
                    bioguide_id = None
                db.add(MemberVotePosition(
                    vote_id=bill_vote.id,
                    bioguide_id=bioguide_id,
                    member_name=pos.get("member_name"),
                    party=pos.get("party"),
                    state=pos.get("state"),
                    position=pos.get("position") or "Not Voting",
                ))

            # Commit per vote so earlier votes persist if a later one fails.
            db.commit()
            stored += 1

        logger.info(
            "fetch_bill_votes(%s): %s stored, %s skipped",
            bill_id, stored, skipped,
        )
        return {"bill_id": bill_id, "stored": stored, "skipped": skipped}
    finally:
        db.close()