""" Shared notification utilities — used by llm_processor, congress_poller, etc. Centralised here to avoid circular imports. """ from datetime import datetime, timedelta, timezone _VOTE_KW = ["passed", "failed", "agreed to", "roll call"] _PRES_KW = ["signed", "vetoed", "enacted", "presented to the president"] _COMMITTEE_KW = ["markup", "ordered to be reported", "ordered reported", "reported by", "discharged"] _CALENDAR_KW = ["placed on"] _PROCEDURAL_KW = ["cloture", "conference"] _REFERRAL_KW = ["referred to"] # Events created within this window for the same (user, bill, event_type) are suppressed _DEDUP_MINUTES = 30 def categorize_action(action_text: str) -> str | None: """Return the action category string, or None if not notification-worthy.""" t = (action_text or "").lower() if any(kw in t for kw in _VOTE_KW): return "vote" if any(kw in t for kw in _PRES_KW): return "presidential" if any(kw in t for kw in _COMMITTEE_KW): return "committee_report" if any(kw in t for kw in _CALENDAR_KW): return "calendar" if any(kw in t for kw in _PROCEDURAL_KW): return "procedural" if any(kw in t for kw in _REFERRAL_KW): return "referral" return None def _build_payload( bill, action_summary: str, action_category: str, source: str = "bill_follow" ) -> dict: from app.config import settings base_url = (settings.PUBLIC_URL or settings.LOCAL_URL).rstrip("/") return { "bill_title": bill.short_title or bill.title or "", "bill_label": f"{bill.bill_type.upper()} {bill.bill_number}", "brief_summary": (action_summary or "")[:300], "bill_url": f"{base_url}/bills/{bill.bill_id}", "action_category": action_category, # kept for RSS/history backwards compat "milestone_tier": "referral" if action_category == "referral" else "progress", "source": source, } def _is_duplicate(db, user_id: int, bill_id: str, event_type: str) -> bool: """True if an identical event was already created within the dedup window.""" from app.models.notification import NotificationEvent cutoff = datetime.now(timezone.utc) - timedelta(minutes=_DEDUP_MINUTES) return db.query(NotificationEvent).filter_by( user_id=user_id, bill_id=bill_id, event_type=event_type, ).filter(NotificationEvent.created_at > cutoff).first() is not None def emit_bill_notification( db, bill, event_type: str, action_summary: str, action_category: str = "vote" ) -> int: """Create NotificationEvent rows for every user following this bill. Returns count.""" from app.models.follow import Follow from app.models.notification import NotificationEvent followers = db.query(Follow).filter_by(follow_type="bill", follow_value=bill.bill_id).all() if not followers: return 0 payload = _build_payload(bill, action_summary, action_category, source="bill_follow") count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count def emit_member_follow_notifications( db, bill, event_type: str, action_summary: str, action_category: str = "vote" ) -> int: """Notify users following the bill's sponsor (dedup prevents double-alerts for bill+member followers).""" if not bill.sponsor_id: return 0 from app.models.follow import Follow from app.models.notification import NotificationEvent followers = db.query(Follow).filter_by(follow_type="member", follow_value=bill.sponsor_id).all() if not followers: return 0 payload = _build_payload(bill, action_summary, action_category, source="member_follow") count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count def emit_topic_follow_notifications( db, bill, event_type: str, action_summary: str, topic_tags: list, action_category: str = "vote", ) -> int: """Notify users following any of the bill's topic tags.""" if not topic_tags: return 0 from app.models.follow import Follow from app.models.notification import NotificationEvent # Collect unique followers across all matching tags seen_user_ids: set[int] = set() followers = [] for tag in topic_tags: for follow in db.query(Follow).filter_by(follow_type="topic", follow_value=tag).all(): if follow.user_id not in seen_user_ids: seen_user_ids.add(follow.user_id) followers.append(follow) if not followers: return 0 payload = _build_payload(bill, action_summary, action_category, source="topic_follow") count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count