""" Congress.gov poller — incremental bill and member sync. Runs on Celery Beat schedule (every 30 min by default). Uses fromDateTime to fetch only recently updated bills. All operations are idempotent. """ import logging import time from datetime import datetime, timedelta, timezone from sqlalchemy import or_ from app.database import get_sync_db from app.models import Bill, BillAction, Member, AppSetting from app.services import congress_api from app.workers.celery_app import celery_app logger = logging.getLogger(__name__) def _get_setting(db, key: str, default=None) -> str | None: row = db.get(AppSetting, key) return row.value if row else default def _set_setting(db, key: str, value: str) -> None: row = db.get(AppSetting, key) if row: row.value = value else: db.add(AppSetting(key=key, value=value)) db.commit() # Only track legislation that can become law. Simple/concurrent resolutions # (hres, sres, hconres, sconres) are procedural and not worth analyzing. TRACKED_BILL_TYPES = {"hr", "s", "hjres", "sjres"} @celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.poll_congress_bills") def poll_congress_bills(self): """Fetch recently updated bills from Congress.gov and enqueue document + LLM processing.""" db = get_sync_db() try: last_polled = _get_setting(db, "congress_last_polled_at") # On first run, seed from 2 months back rather than the full congress history if not last_polled: two_months_ago = datetime.now(timezone.utc) - timedelta(days=60) last_polled = two_months_ago.strftime("%Y-%m-%dT%H:%M:%SZ") current_congress = congress_api.get_current_congress() logger.info(f"Polling Congress {current_congress} (since {last_polled})") new_count = 0 updated_count = 0 offset = 0 while True: response = congress_api.get_bills( congress=current_congress, offset=offset, limit=250, from_date_time=last_polled, ) bills_data = response.get("bills", []) if not bills_data: break for bill_data in bills_data: parsed = congress_api.parse_bill_from_api(bill_data, current_congress) if parsed.get("bill_type") not in TRACKED_BILL_TYPES: continue bill_id = parsed["bill_id"] existing = db.get(Bill, bill_id) if existing is None: # Bill list endpoint has no sponsor data — fetch detail to get it try: detail = congress_api.get_bill_detail( current_congress, parsed["bill_type"], parsed["bill_number"] ) sponsor_id = _sync_sponsor(db, detail.get("bill", {})) except Exception: sponsor_id = None parsed["sponsor_id"] = sponsor_id parsed["last_checked_at"] = datetime.now(timezone.utc) db.add(Bill(**parsed)) db.commit() new_count += 1 # Enqueue document and action fetches from app.workers.document_fetcher import fetch_bill_documents fetch_bill_documents.delay(bill_id) fetch_bill_actions.delay(bill_id) else: _update_bill_if_changed(db, existing, parsed) updated_count += 1 db.commit() offset += 250 if len(bills_data) < 250: break # Update last polled timestamp _set_setting(db, "congress_last_polled_at", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")) logger.info(f"Poll complete: {new_count} new, {updated_count} updated") return {"new": new_count, "updated": updated_count} except Exception as exc: db.rollback() logger.error(f"Poll failed: {exc}") raise self.retry(exc=exc, countdown=60) finally: db.close() @celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.sync_members") def sync_members(self): """Sync current Congress members.""" db = get_sync_db() try: offset = 0 synced = 0 while True: response = congress_api.get_members(offset=offset, limit=250, current_member=True) members_data = response.get("members", []) if not members_data: break for member_data in members_data: parsed = congress_api.parse_member_from_api(member_data) if not parsed.get("bioguide_id"): continue existing = db.get(Member, parsed["bioguide_id"]) if existing is None: db.add(Member(**parsed)) else: for k, v in parsed.items(): setattr(existing, k, v) synced += 1 db.commit() offset += 250 if len(members_data) < 250: break logger.info(f"Synced {synced} members") return {"synced": synced} except Exception as exc: db.rollback() raise self.retry(exc=exc, countdown=120) finally: db.close() def _sync_sponsor(db, bill_data: dict) -> str | None: """Ensure the bill sponsor exists in the members table. Returns bioguide_id or None.""" sponsors = bill_data.get("sponsors", []) if not sponsors: return None sponsor_raw = sponsors[0] bioguide_id = sponsor_raw.get("bioguideId") if not bioguide_id: return None existing = db.get(Member, bioguide_id) if existing is None: db.add(Member( bioguide_id=bioguide_id, name=sponsor_raw.get("fullName", ""), first_name=sponsor_raw.get("firstName"), last_name=sponsor_raw.get("lastName"), party=sponsor_raw.get("party", "")[:10] if sponsor_raw.get("party") else None, state=sponsor_raw.get("state"), )) db.commit() return bioguide_id @celery_app.task(bind=True, name="app.workers.congress_poller.backfill_sponsor_ids") def backfill_sponsor_ids(self): """Backfill sponsor_id for all bills where it is NULL by fetching bill detail from Congress.gov.""" import time db = get_sync_db() try: bills = db.query(Bill).filter(Bill.sponsor_id.is_(None)).all() total = len(bills) updated = 0 logger.info(f"Backfilling sponsors for {total} bills") for bill in bills: try: detail = congress_api.get_bill_detail(bill.congress_number, bill.bill_type, bill.bill_number) sponsor_id = _sync_sponsor(db, detail.get("bill", {})) if sponsor_id: bill.sponsor_id = sponsor_id db.commit() updated += 1 except Exception as e: logger.warning(f"Could not backfill sponsor for {bill.bill_id}: {e}") time.sleep(0.1) # ~10 req/sec, well under Congress.gov 5000/hr limit logger.info(f"Sponsor backfill complete: {updated}/{total} updated") return {"total": total, "updated": updated} finally: db.close() @celery_app.task(bind=True, max_retries=3, name="app.workers.congress_poller.fetch_bill_actions") def fetch_bill_actions(self, bill_id: str): """Fetch and sync all actions for a bill from Congress.gov. Idempotent.""" db = get_sync_db() try: bill = db.get(Bill, bill_id) if not bill: logger.warning(f"fetch_bill_actions: bill {bill_id} not found") return offset = 0 inserted = 0 while True: try: response = congress_api.get_bill_actions( bill.congress_number, bill.bill_type, bill.bill_number, offset=offset ) except Exception as exc: raise self.retry(exc=exc, countdown=60) actions_data = response.get("actions", []) if not actions_data: break for action in actions_data: action_date_str = action.get("actionDate") action_text = action.get("text", "") action_type = action.get("type") chamber = action.get("chamber") # Idempotency check: skip if (bill_id, action_date, action_text) exists exists = ( db.query(BillAction) .filter( BillAction.bill_id == bill_id, BillAction.action_date == action_date_str, BillAction.action_text == action_text, ) .first() ) if not exists: db.add(BillAction( bill_id=bill_id, action_date=action_date_str, action_text=action_text, action_type=action_type, chamber=chamber, )) inserted += 1 db.commit() offset += 250 if len(actions_data) < 250: break bill.actions_fetched_at = datetime.now(timezone.utc) db.commit() logger.info(f"fetch_bill_actions: {bill_id} — inserted {inserted} new actions") return {"bill_id": bill_id, "inserted": inserted} except Exception as exc: db.rollback() raise finally: db.close() @celery_app.task(bind=True, name="app.workers.congress_poller.fetch_actions_for_active_bills") def fetch_actions_for_active_bills(self): """Nightly batch: enqueue action fetches for recently active bills missing action data.""" db = get_sync_db() try: cutoff = datetime.now(timezone.utc).date() - timedelta(days=30) bills = ( db.query(Bill) .filter( Bill.latest_action_date >= cutoff, or_( Bill.actions_fetched_at.is_(None), Bill.latest_action_date > Bill.actions_fetched_at, ), ) .limit(200) .all() ) queued = 0 for bill in bills: fetch_bill_actions.delay(bill.bill_id) queued += 1 time.sleep(0.2) # ~5 tasks/sec to avoid Redis burst logger.info(f"fetch_actions_for_active_bills: queued {queued} bills") return {"queued": queued} finally: db.close() def _update_bill_if_changed(db, existing: Bill, parsed: dict) -> bool: """Update bill fields if anything has changed. Returns True if updated.""" changed = False track_fields = ["title", "short_title", "latest_action_date", "latest_action_text", "status"] for field in track_fields: new_val = parsed.get(field) if new_val and getattr(existing, field) != new_val: setattr(existing, field, new_val) changed = True if changed: existing.last_checked_at = datetime.now(timezone.utc) db.commit() # Check for new text versions and sync actions now that the bill has changed from app.workers.document_fetcher import fetch_bill_documents fetch_bill_documents.delay(existing.bill_id) fetch_bill_actions.delay(existing.bill_id) return changed