Files
PocketVeto/backend/app/workers/congress_poller.py
Jack Levy 13e1577968 fix(members): link sponsors to bills and fix member search
- Poller now fetches bill detail on insert to get sponsor (list endpoint
  has no sponsor data)
- Add backfill_sponsor_ids task + admin endpoint + UI button to fix the
  1,616 existing bills with NULL sponsor_id
- Member name search now matches both "Last, First" and "First Last"
  using split_part() on the stored name column; same fix applied to
  global search
- Load Bill.sponsor relationship eagerly in get_member_bills to prevent
  MissingGreenlet error during Pydantic serialization
- Remove .trim() on search onChange so spaces can be typed

Authored-By: Jack Levy
2026-02-28 23:29:58 -05:00

217 lines
8.0 KiB
Python

"""
Congress.gov poller — incremental bill and member sync.
Runs on Celery Beat schedule (every 30 min by default).
Uses fromDateTime to fetch only recently updated bills.
All operations are idempotent.
"""
import logging
from datetime import datetime, timedelta, timezone
from app.database import get_sync_db
from app.models import Bill, BillAction, Member, AppSetting
from app.services import congress_api
from app.workers.celery_app import celery_app
# Module-level logger, named after this module so records can be filtered per worker.
logger = logging.getLogger(__name__)
def _get_setting(db, key: str, default=None) -> str | None:
    """Look up an ``AppSetting`` by primary key and return its stored value.

    Args:
        db: Session-like object exposing ``get(model, pk)``.
        key: Primary key of the setting row.
        default: Value handed back when no row is found.

    Returns:
        The row's ``value`` attribute, or ``default`` when the lookup yields
        nothing.
    """
    setting = db.get(AppSetting, key)
    if not setting:
        return default
    return setting.value
def _set_setting(db, key: str, value: str) -> None:
    """Create or overwrite an ``AppSetting`` row and commit immediately.

    Args:
        db: Session-like object exposing ``get``, ``add`` and ``commit``.
        key: Primary key of the setting row.
        value: New value to store under ``key``.
    """
    setting = db.get(AppSetting, key)
    if not setting:
        # No row yet for this key: insert a fresh one.
        db.add(AppSetting(key=key, value=value))
    else:
        setting.value = value
    db.commit()
# Only track legislation that can become law. Simple/concurrent resolutions
# (hres, sres, hconres, sconres) are procedural and not worth analyzing.
# The bill poller skips any parsed bill whose "bill_type" is not in this set.
TRACKED_BILL_TYPES = {"hr", "s", "hjres", "sjres"}
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.poll_congress_bills")
def poll_congress_bills(self):
    """Fetch recently updated bills from Congress.gov and enqueue document fetching.

    Pages through the bill list for the current congress, starting from the
    ``congress_last_polled_at`` setting (or 60 days ago on the first run).
    Bills whose type is not in ``TRACKED_BILL_TYPES`` are skipped. New bills
    are inserted (with a best-effort sponsor lookup) and a document fetch is
    enqueued for them; existing bills are passed to ``_update_bill_if_changed``.

    Returns:
        ``{"new": <inserted bills>, "updated": <existing bills that changed>}``.

    Raises:
        Whatever ``self.retry`` raises: any failure rolls back the session and
        schedules a retry after 60 seconds (up to ``max_retries``).
    """
    page_size = 250
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    db = get_sync_db()
    try:
        # Imported inside the function, as elsewhere in this module.
        from app.workers.document_fetcher import fetch_bill_documents

        last_polled = _get_setting(db, "congress_last_polled_at")
        # On first run, seed from 2 months back rather than the full congress history
        if not last_polled:
            two_months_ago = datetime.now(timezone.utc) - timedelta(days=60)
            last_polled = two_months_ago.strftime(timestamp_format)

        # Capture the next watermark BEFORE fetching. Stamping it after the run
        # would skip bills that were updated upstream while this poll was in
        # progress; re-processing a small overlap is safe because each bill is
        # looked up by id before insert/update.
        poll_started_at = datetime.now(timezone.utc).strftime(timestamp_format)

        current_congress = congress_api.get_current_congress()
        logger.info(f"Polling Congress {current_congress} (since {last_polled})")

        new_count = 0
        updated_count = 0
        offset = 0

        while True:
            response = congress_api.get_bills(
                congress=current_congress,
                offset=offset,
                limit=page_size,
                from_date_time=last_polled,
            )
            bills_data = response.get("bills", [])
            if not bills_data:
                break

            for bill_data in bills_data:
                parsed = congress_api.parse_bill_from_api(bill_data, current_congress)
                if parsed.get("bill_type") not in TRACKED_BILL_TYPES:
                    continue

                bill_id = parsed["bill_id"]
                existing = db.get(Bill, bill_id)

                if existing is None:
                    # Bill list endpoint has no sponsor data — fetch detail to get it
                    try:
                        detail = congress_api.get_bill_detail(
                            current_congress, parsed["bill_type"], parsed["bill_number"]
                        )
                        sponsor_id = _sync_sponsor(db, detail.get("bill", {}))
                    except Exception as sponsor_exc:
                        # Best-effort: the bill is still inserted without a sponsor.
                        # Roll back so a failed member insert cannot leave the
                        # session unusable for the bill insert that follows.
                        db.rollback()
                        logger.warning(f"Could not resolve sponsor for {bill_id}: {sponsor_exc}")
                        sponsor_id = None

                    parsed["sponsor_id"] = sponsor_id
                    parsed["last_checked_at"] = datetime.now(timezone.utc)
                    db.add(Bill(**parsed))
                    db.commit()
                    new_count += 1
                    # Enqueue document fetch
                    fetch_bill_documents.delay(bill_id)
                elif _update_bill_if_changed(db, existing, parsed):
                    # Count only bills whose tracked fields actually changed.
                    updated_count += 1

            db.commit()
            offset += page_size
            # A short page means this was the last one.
            if len(bills_data) < page_size:
                break

        # Update last polled timestamp
        _set_setting(db, "congress_last_polled_at", poll_started_at)
        logger.info(f"Poll complete: {new_count} new, {updated_count} updated")
        return {"new": new_count, "updated": updated_count}
    except Exception as exc:
        db.rollback()
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Poll failed: {exc}")
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()
@celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.sync_members")
def sync_members(self):
    """Sync current Congress members.

    Pages through ``congress_api.get_members(current_member=True)``, inserting
    members that are not yet stored and overwriting every parsed field on
    those that are. Entries without a ``bioguide_id`` are skipped. Changes are
    committed once per page.

    Returns:
        ``{"synced": <number of members inserted or updated>}``.

    Raises:
        Whatever ``self.retry`` raises: any failure rolls back the session and
        schedules a retry after 120 seconds (up to ``max_retries``).
    """
    page_size = 250

    db = get_sync_db()
    try:
        offset = 0
        synced = 0

        while True:
            response = congress_api.get_members(offset=offset, limit=page_size, current_member=True)
            members_data = response.get("members", [])
            if not members_data:
                break

            for member_data in members_data:
                parsed = congress_api.parse_member_from_api(member_data)
                bioguide_id = parsed.get("bioguide_id")
                if not bioguide_id:
                    continue

                existing = db.get(Member, bioguide_id)
                if existing is None:
                    db.add(Member(**parsed))
                else:
                    for attr, value in parsed.items():
                        setattr(existing, attr, value)
                synced += 1

            db.commit()
            offset += page_size
            # A short page means this was the last one.
            if len(members_data) < page_size:
                break

        logger.info(f"Synced {synced} members")
        return {"synced": synced}
    except Exception as exc:
        db.rollback()
        # Log before retrying so failures are visible, matching the bill poller.
        logger.exception(f"Member sync failed: {exc}")
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()
def _sync_sponsor(db, bill_data: dict) -> str | None:
"""Ensure the bill sponsor exists in the members table. Returns bioguide_id or None."""
sponsors = bill_data.get("sponsors", [])
if not sponsors:
return None
sponsor_raw = sponsors[0]
bioguide_id = sponsor_raw.get("bioguideId")
if not bioguide_id:
return None
existing = db.get(Member, bioguide_id)
if existing is None:
db.add(Member(
bioguide_id=bioguide_id,
name=sponsor_raw.get("fullName", ""),
first_name=sponsor_raw.get("firstName"),
last_name=sponsor_raw.get("lastName"),
party=sponsor_raw.get("party", "")[:10] if sponsor_raw.get("party") else None,
state=sponsor_raw.get("state"),
))
db.commit()
return bioguide_id
@celery_app.task(bind=True, name="app.workers.congress_poller.backfill_sponsor_ids")
def backfill_sponsor_ids(self):
    """Backfill sponsor_id for all bills where it is NULL by fetching bill detail from Congress.gov.

    Each bill is handled independently: a failure for one bill is logged as a
    warning and the loop moves on to the next.

    Returns:
        ``{"total": <bills with NULL sponsor_id>, "updated": <bills fixed>}``.
    """
    import time

    db = get_sync_db()
    try:
        bills = db.query(Bill).filter(Bill.sponsor_id.is_(None)).all()
        total = len(bills)
        updated = 0
        logger.info(f"Backfilling sponsors for {total} bills")

        for bill in bills:
            try:
                detail = congress_api.get_bill_detail(bill.congress_number, bill.bill_type, bill.bill_number)
                sponsor_id = _sync_sponsor(db, detail.get("bill", {}))
                if sponsor_id:
                    bill.sponsor_id = sponsor_id
                    db.commit()
                    updated += 1
            except Exception as e:
                # Without a rollback, one failed commit would leave the session
                # in a failed state and every remaining bill would error too.
                db.rollback()
                logger.warning(f"Could not backfill sponsor for {bill.bill_id}: {e}")
            # Short pause between requests. NOTE(review): 0.1s only caps the rate
            # at ~10 req/sec (~36,000/hr), which is ABOVE the Congress.gov
            # 5,000/hr limit; staying under it relies on request latency and a
            # small backlog. Raise the delay if the backlog approaches 5,000.
            time.sleep(0.1)

        logger.info(f"Sponsor backfill complete: {updated}/{total} updated")
        return {"total": total, "updated": updated}
    finally:
        db.close()
def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool:
    """Copy changed tracked fields from ``parsed`` onto ``existing``.

    Tracked fields are title, short_title, latest_action_date,
    latest_action_text and status. A falsy incoming value never overwrites
    the stored one. When at least one field differs, ``last_checked_at`` is
    refreshed, the session is committed and a document fetch is enqueued for
    the bill.

    Returns:
        True if any tracked field was updated, otherwise False.
    """
    dirty = False
    for attr in ("title", "short_title", "latest_action_date", "latest_action_text", "status"):
        incoming = parsed.get(attr)
        if not incoming:
            continue
        if getattr(existing, attr) != incoming:
            setattr(existing, attr, incoming)
            dirty = True

    if not dirty:
        return dirty

    existing.last_checked_at = datetime.now(timezone.utc)
    db.commit()
    # The bill changed, so look for new text versions as well.
    from app.workers.document_fetcher import fetch_bill_documents
    fetch_bill_documents.delay(existing.bill_id)
    return dirty