"""
Bill classifier and Member Effectiveness Score workers.

Tasks:
  classify_bill_category         — lightweight LLM call; triggered after brief generation
  fetch_bill_cosponsors          — Congress.gov cosponsor fetch; triggered on new bill
  calculate_effectiveness_scores — nightly beat task
  backfill_bill_categories       — one-time backfill for existing bills
  backfill_all_bill_cosponsors   — one-time backfill for existing bills
"""
import json
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone

from sqlalchemy import text

from app.config import settings
from app.database import get_sync_db
from app.models import Bill, BillCosponsor, BillDocument, Member
from app.models.setting import AppSetting
from app.services import congress_api
from app.services.llm_service import RateLimitError, get_llm_provider
from app.workers.celery_app import celery_app

logger = logging.getLogger(__name__)


# ── Classification ─────────────────────────────────────────────────────────────

_CLASSIFICATION_PROMPT = """\
Classify this bill into exactly one category.

Categories:
- substantive: Creates, modifies, or repeals policy, programs, regulations, funding, or rights. Real legislative work.
- commemorative: Names buildings/post offices, recognizes awareness days/weeks, honors individuals or events with no policy effect.
- administrative: Technical corrections, routine reauthorizations, housekeeping changes with no new policy substance.

Respond with ONLY valid JSON:
{{"category": "substantive" | "commemorative" | "administrative"}}

BILL TITLE: {title}

BILL TEXT (excerpt):
{excerpt}

Classify now:"""

_VALID_CATEGORIES = {"substantive", "commemorative", "administrative"}

# Only the head of the document is sent to the model.
_EXCERPT_CHARS = 1200


def _strip_code_fences(raw: str) -> str:
    """Return *raw* without a surrounding Markdown code fence, if it has one.

    A leading ``json`` language tag inside the fence is removed as a prefix
    (case-insensitively) rather than as a character set.
    """
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        # Text between the opening fence and the next fence (or end of string).
        cleaned = cleaned.split("```")[1].strip()
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
    return cleaned.strip("`").strip()


def _extract_category(raw: str) -> str:
    """Parse the model reply and return the lower-cased category.

    Returns ``""`` when the reply is valid JSON but is not an object or has no
    string ``category``; the caller treats that like any other invalid
    category. Malformed JSON still raises ``json.JSONDecodeError``.
    """
    payload = json.loads(_strip_code_fences(raw))
    value = payload.get("category") if isinstance(payload, dict) else None
    return value.strip().lower() if isinstance(value, str) else ""


@celery_app.task(
    bind=True,
    max_retries=3,
    rate_limit=f"{settings.LLM_RATE_LIMIT_RPM}/m",
    name="app.workers.bill_classifier.classify_bill_category",
)
def classify_bill_category(self, bill_id: str, document_id: int):
    """Set bill_category via a cheap one-shot LLM call. Idempotent.

    Skips when the bill is missing or already has a category. An unrecognised
    category in the reply is logged and stored as ``"substantive"``.

    Returns:
        ``{"status": "skipped"}`` or
        ``{"status": "ok", "bill_id": ..., "category": ...}``.

    Raises:
        celery Retry: after ``exc.retry_after`` seconds on ``RateLimitError``,
        or after 120 seconds on any other failure (including malformed JSON).
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill or bill.bill_category:
            return {"status": "skipped"}

        doc = db.get(BillDocument, document_id)
        excerpt = (doc.raw_text[:_EXCERPT_CHARS] if doc and doc.raw_text else "").strip()

        # Provider/model overrides are stored as AppSetting rows; a missing row
        # passes None through to get_llm_provider.
        prov_row = db.get(AppSetting, "llm_provider")
        model_row = db.get(AppSetting, "llm_model")
        provider = get_llm_provider(
            prov_row.value if prov_row else None,
            model_row.value if model_row else None,
        )

        prompt = _CLASSIFICATION_PROMPT.format(
            title=bill.title or "Unknown",
            excerpt=excerpt or "(no text available)",
        )

        category = _extract_category(provider.generate_text(prompt))
        if category not in _VALID_CATEGORIES:
            logger.warning(
                "classify_bill_category: invalid category '%s' for %s, defaulting to substantive",
                category,
                bill_id,
            )
            category = "substantive"

        bill.bill_category = category
        db.commit()
        logger.info("Bill %s classified as '%s'", bill_id, category)
        return {"status": "ok", "bill_id": bill_id, "category": category}
    except RateLimitError as exc:
        db.rollback()
        raise self.retry(exc=exc, countdown=exc.retry_after)
    except Exception as exc:
        db.rollback()
        logger.exception("classify_bill_category failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=120)
    finally:
        db.close()


@celery_app.task(bind=True, max_retries=3, name="app.workers.bill_classifier.backfill_bill_categories")
def backfill_bill_categories(self):
    """Queue classification for all bills with text but no category.

    One task is queued per (bill, document) row returned by the query.

    Returns:
        ``{"queued": <number of tasks queued>}``.
    """
    db = get_sync_db()
    try:
        rows = db.execute(text("""
            SELECT bd.bill_id, bd.id AS document_id
            FROM bill_documents bd
            JOIN bills b ON b.bill_id = bd.bill_id
            WHERE b.bill_category IS NULL AND bd.raw_text IS NOT NULL
        """)).fetchall()

        queued = 0
        for row in rows:
            classify_bill_category.delay(row.bill_id, row.document_id)
            queued += 1
            # Short pause between enqueues.
            time.sleep(0.05)

        logger.info("backfill_bill_categories: queued %s classification tasks", queued)
        return {"queued": queued}
    finally:
        db.close()


# ── Co-sponsor fetching ────────────────────────────────────────────────────────

# Offset step between requests; a shorter response is treated as the last page.
_COSPONSOR_PAGE_SIZE = 250


def _parse_sponsorship_date(value):
    """Return a ``date`` parsed from a ``YYYY-MM-DD`` string, or None if absent/unparseable."""
    if not value:
        return None
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except (ValueError, TypeError):
        return None


@celery_app.task(bind=True, max_retries=3, name="app.workers.bill_classifier.fetch_bill_cosponsors")
def fetch_bill_cosponsors(self, bill_id: str):
    """Fetch and store cosponsor list from Congress.gov. Idempotent.

    Skips when the bill is missing or ``cosponsors_fetched_at`` is already
    set. All pages and the ``cosponsors_fetched_at`` stamp are committed in a
    single transaction, so a failure part-way through leaves nothing behind
    and a retry cannot duplicate cosponsors that have no linked member.

    Returns:
        ``{"status": "skipped"}`` or ``{"bill_id": ..., "inserted": <count>}``.

    Raises:
        celery Retry: after 60 seconds on any failure.
    """
    db = get_sync_db()
    try:
        bill = db.get(Bill, bill_id)
        if not bill or bill.cosponsors_fetched_at:
            return {"status": "skipped"}

        known_bioguides = {
            row[0] for row in db.execute(text("SELECT bioguide_id FROM members")).fetchall()
        }
        # Members already linked to this bill, loaded once instead of one
        # existence query per cosponsor.
        already_linked = {
            row[0]
            for row in db.query(BillCosponsor.bioguide_id)
            .filter(BillCosponsor.bill_id == bill_id)
            .all()
            if row[0]
        }

        inserted = 0
        offset = 0
        while True:
            data = congress_api.get_bill_cosponsors(
                bill.congress_number, bill.bill_type, bill.bill_number, offset=offset
            )
            cosponsors = data.get("cosponsors", [])
            if not cosponsors:
                break

            for cs in cosponsors:
                bioguide_id = cs.get("bioguideId") or None
                # Only link to members we've already ingested
                if bioguide_id not in known_bioguides:
                    bioguide_id = None

                # Skip if we already have this (bioguide_id, bill_id) pair
                if bioguide_id:
                    if bioguide_id in already_linked:
                        continue
                    already_linked.add(bioguide_id)

                db.add(BillCosponsor(
                    bill_id=bill_id,
                    bioguide_id=bioguide_id,
                    name=cs.get("fullName") or cs.get("name"),
                    party=cs.get("party"),
                    state=cs.get("state"),
                    sponsored_date=_parse_sponsorship_date(cs.get("sponsorshipDate")),
                ))
                inserted += 1

            if len(cosponsors) < _COSPONSOR_PAGE_SIZE:
                break
            offset += _COSPONSOR_PAGE_SIZE
            # Pause before requesting the next page.
            time.sleep(0.25)

        bill.cosponsors_fetched_at = datetime.now(timezone.utc)
        db.commit()
        return {"bill_id": bill_id, "inserted": inserted}

    except Exception as exc:
        db.rollback()
        logger.exception("fetch_bill_cosponsors failed for %s: %s", bill_id, exc)
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()


@celery_app.task(bind=True, name="app.workers.bill_classifier.backfill_all_bill_cosponsors")
def backfill_all_bill_cosponsors(self):
    """Queue cosponsor fetches for all bills that haven't been fetched yet.

    Returns:
        ``{"queued": <number of tasks queued>}``.
    """
    db = get_sync_db()
    try:
        rows = db.execute(text(
            "SELECT bill_id FROM bills WHERE cosponsors_fetched_at IS NULL"
        )).fetchall()

        queued = 0
        for row in rows:
            fetch_bill_cosponsors.delay(row.bill_id)
            queued += 1
            # Short pause between enqueues.
            time.sleep(0.05)

        logger.info("backfill_all_bill_cosponsors: queued %s tasks", queued)
        return {"queued": queued}
    finally:
        db.close()


# ── Effectiveness scoring ──────────────────────────────────────────────────────

# Minimum share of opposing-party cosponsors that earns the bipartisan bonus.
_BIPARTISAN_THRESHOLD = 0.20


def _distance_points(latest_action_text: str | None) -> int:
    """Map latest action text to a distance-traveled score.

    Checks run from the furthest stage to the earliest, so the first match is
    the furthest stage mentioned; a bill with no recognised stage scores 1.
    """
    action = (latest_action_text or "").lower()
    if "became public law" in action or "signed by president" in action or "enacted" in action:
        return 50
    if "passed house" in action or "passed senate" in action or "agreed to in" in action:
        return 20
    if "placed on" in action and "calendar" in action:
        return 10
    if "reported by" in action or "ordered to be reported" in action or "discharged" in action:
        return 5
    return 1


def _bipartisan_from_parties(cosponsor_parties, sponsor_party: str | None) -> float:
    """Return 1.5 if ≥20% of *cosponsor_parties* differ from *sponsor_party*, else 1.0.

    Cosponsors with no recorded party count toward the total but never as
    opposing. Returns 1.0 when the sponsor party is unknown or there are no
    cosponsors.
    """
    if not sponsor_party or not cosponsor_parties:
        return 1.0
    opposing = sum(1 for party in cosponsor_parties if party and party != sponsor_party)
    if opposing / len(cosponsor_parties) >= _BIPARTISAN_THRESHOLD:
        return 1.5
    return 1.0


def _bipartisan_multiplier(db, bill_id: str, sponsor_party: str | None) -> float:
    """1.5x if ≥20% of cosponsors are from the opposing party."""
    if not sponsor_party:
        return 1.0
    cosponsors = db.query(BillCosponsor).filter_by(bill_id=bill_id).all()
    return _bipartisan_from_parties([c.party for c in cosponsors], sponsor_party)


def _substance_multiplier(bill_category: str | None) -> float:
    """0.1x for commemorative bills; 1.0x for every other (or unset) category."""
    return 0.1 if bill_category == "commemorative" else 1.0


def _leadership_multiplier(member: Member, congress_number: int) -> float:
    """1.2x if member chaired a committee during this Congress."""
    if not member.leadership_json:
        return 1.0
    for role in member.leadership_json:
        if (role.get("congress") == congress_number and
                "chair" in (role.get("type") or "").lower()):
            return 1.2
    return 1.0


def _seniority_tier(terms_json: list | None) -> str:
    """Return 'junior' | 'mid' | 'senior' based on number of terms served."""
    if not terms_json:
        return "junior"
    count = len(terms_json)
    if count <= 2:
        return "junior"
    if count <= 5:
        return "mid"
    return "senior"


def _percentile_ranks(scored: list[tuple[str, float]]) -> dict[str, float]:
    """Map each id in *scored* to its 0–100 percentile within the group.

    Ids are ranked by ascending score. Ids with equal scores share the rank of
    the first of them, so the result does not depend on input order. A group
    of one gets 0.0.
    """
    ordered = sorted(scored, key=lambda pair: pair[1])
    denominator = max(len(ordered) - 1, 1)
    ranks: dict[str, float] = {}
    tie_rank = 0
    previous_score = None
    for rank, (key, score) in enumerate(ordered):
        if rank == 0 or score != previous_score:
            tie_rank = rank
            previous_score = score
        ranks[key] = round((tie_rank / denominator) * 100, 1)
    return ranks


@celery_app.task(bind=True, name="app.workers.bill_classifier.calculate_effectiveness_scores")
def calculate_effectiveness_scores(self):
    """Nightly: compute effectiveness score and within-tier percentile for all members.

    Each current-Congress bill adds ``distance × bipartisan × substance ×
    leadership`` to its sponsor's raw score. Percentiles are computed within
    (seniority tier, party) groups.

    Returns:
        ``{"status": "no_members"}`` or
        ``{"status": "ok", "updated": <count>, "congress": <number>}``.

    Raises:
        Exception: any failure is logged, rolled back and re-raised.
    """
    db = get_sync_db()
    try:
        members = db.query(Member).all()
        if not members:
            return {"status": "no_members"}

        # Map bioguide_id → Member for quick lookup
        member_map = {m.bioguide_id: m for m in members}

        # Load all bills sponsored by current members (current congress only)
        current_congress = congress_api.get_current_congress()
        bills = db.query(Bill).filter_by(congress_number=current_congress).all()

        # Load cosponsor parties for the whole Congress in one query instead
        # of one query per bill.
        parties_by_bill: dict[str, list] = defaultdict(list)
        party_rows = (
            db.query(BillCosponsor.bill_id, BillCosponsor.party)
            .join(Bill, Bill.bill_id == BillCosponsor.bill_id)
            .filter(Bill.congress_number == current_congress)
            .all()
        )
        for cosponsored_bill_id, cosponsor_party in party_rows:
            parties_by_bill[cosponsored_bill_id].append(cosponsor_party)

        # Compute raw score per member
        raw_scores: dict[str, float] = {m.bioguide_id: 0.0 for m in members}
        for bill in bills:
            if not bill.sponsor_id or bill.sponsor_id not in member_map:
                continue
            sponsor = member_map[bill.sponsor_id]

            pts = _distance_points(bill.latest_action_text)
            bipartisan = _bipartisan_from_parties(
                parties_by_bill.get(bill.bill_id, []), sponsor.party
            )
            substance = _substance_multiplier(bill.bill_category)
            leadership = _leadership_multiplier(sponsor, current_congress)

            raw_scores[bill.sponsor_id] += pts * bipartisan * substance * leadership

        # Group members by (tier, party) for percentile normalisation
        # We treat party as a proxy for majority/minority — grouped separately so
        # a minority-party junior isn't unfairly compared to a majority-party senior.
        tier_by_member = {m.bioguide_id: _seniority_tier(m.terms_json) for m in members}
        buckets: dict[tuple, list[str]] = defaultdict(list)
        for m in members:
            bucket_key = (tier_by_member[m.bioguide_id], m.party or "Unknown")
            buckets[bucket_key].append(m.bioguide_id)

        # Compute percentile within each bucket
        percentiles: dict[str, float] = {}
        for ids in buckets.values():
            percentiles.update(_percentile_ranks([(bid, raw_scores[bid]) for bid in ids]))

        # Bulk update members
        for m in members:
            m.effectiveness_score = round(raw_scores[m.bioguide_id], 2)
            m.effectiveness_percentile = percentiles[m.bioguide_id]
            m.effectiveness_tier = tier_by_member[m.bioguide_id]
        updated = len(members)

        db.commit()
        logger.info(
            "calculate_effectiveness_scores: updated %s members for Congress %s",
            updated,
            current_congress,
        )
        return {"status": "ok", "updated": updated, "congress": current_congress}

    except Exception as exc:
        db.rollback()
        logger.exception("calculate_effectiveness_scores failed: %s", exc)
        raise
    finally:
        db.close()