""" Shared notification utilities — used by llm_processor, congress_poller, etc. Centralised here to avoid circular imports. """ from datetime import datetime, timedelta, timezone _MILESTONE_KEYWORDS = [ "passed", "failed", "agreed to", "signed", "vetoed", "enacted", "presented to the president", "ordered to be reported", "ordered reported", "reported by", "discharged", "placed on", # placed on calendar "cloture", "roll call", "markup", # markup session scheduled/completed "conference", # conference committee activity ] # Committee referral — meaningful for pocket_veto/boost but noisy for neutral _REFERRAL_KEYWORDS = [ "referred to", ] # Events created within this window for the same (user, bill, event_type) are suppressed _DEDUP_MINUTES = 30 def is_milestone_action(action_text: str) -> bool: t = (action_text or "").lower() return any(kw in t for kw in _MILESTONE_KEYWORDS) def is_referral_action(action_text: str) -> bool: t = (action_text or "").lower() return any(kw in t for kw in _REFERRAL_KEYWORDS) def _build_payload(bill, action_summary: str, milestone_tier: str = "progress") -> dict: from app.config import settings base_url = (settings.PUBLIC_URL or settings.LOCAL_URL).rstrip("/") return { "bill_title": bill.short_title or bill.title or "", "bill_label": f"{bill.bill_type.upper()} {bill.bill_number}", "brief_summary": (action_summary or "")[:300], "bill_url": f"{base_url}/bills/{bill.bill_id}", "milestone_tier": milestone_tier, } def _is_duplicate(db, user_id: int, bill_id: str, event_type: str) -> bool: """True if an identical event was already created within the dedup window.""" from app.models.notification import NotificationEvent cutoff = datetime.now(timezone.utc) - timedelta(minutes=_DEDUP_MINUTES) return db.query(NotificationEvent).filter_by( user_id=user_id, bill_id=bill_id, event_type=event_type, ).filter(NotificationEvent.created_at > cutoff).first() is not None def emit_bill_notification( db, bill, event_type: str, action_summary: str, milestone_tier: str = "progress" ) -> int: """Create NotificationEvent rows for every user following this bill. Returns count.""" from app.models.follow import Follow from app.models.notification import NotificationEvent followers = db.query(Follow).filter_by(follow_type="bill", follow_value=bill.bill_id).all() if not followers: return 0 payload = _build_payload(bill, action_summary, milestone_tier) count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count def emit_member_follow_notifications( db, bill, event_type: str, action_summary: str, milestone_tier: str = "progress" ) -> int: """Notify users following the bill's sponsor (dedup prevents double-alerts for bill+member followers).""" if not bill.sponsor_id: return 0 from app.models.follow import Follow from app.models.notification import NotificationEvent followers = db.query(Follow).filter_by(follow_type="member", follow_value=bill.sponsor_id).all() if not followers: return 0 payload = _build_payload(bill, action_summary, milestone_tier) count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count def emit_topic_follow_notifications( db, bill, event_type: str, action_summary: str, topic_tags: list, milestone_tier: str = "progress", ) -> int: """Notify users following any of the bill's topic tags.""" if not topic_tags: return 0 from app.models.follow import Follow from app.models.notification import NotificationEvent # Collect unique followers across all matching tags seen_user_ids: set[int] = set() followers = [] for tag in topic_tags: for follow in db.query(Follow).filter_by(follow_type="topic", follow_value=tag).all(): if follow.user_id not in seen_user_ids: seen_user_ids.add(follow.user_id) followers.append(follow) if not followers: return 0 payload = _build_payload(bill, action_summary, milestone_tier) count = 0 for follow in followers: if _is_duplicate(db, follow.user_id, bill.bill_id, event_type): continue db.add(NotificationEvent( user_id=follow.user_id, bill_id=bill.bill_id, event_type=event_type, payload=payload, )) count += 1 if count: db.commit() return count