feat: weekly digest + local-time quiet hours
Weekly Digest (send_weekly_digest Celery task): - Runs every Monday 8:30 AM UTC via beat schedule - Queries all followed bills updated in the past 7 days per user - Sends low-priority ntfy push (Priority: low, Tags: newspaper,calendar) - Creates a NotificationEvent (weekly_digest type) for RSS feed visibility - Admin can trigger immediately via POST /api/admin/trigger-weekly-digest - Manual Controls panel now includes "Send Weekly Digest" button Local-time quiet hours: - Browser auto-detects IANA timezone via Intl.DateTimeFormat().resolvedOptions().timeZone - Timezone saved to notification_prefs alongside quiet_hours_start/end on Save - Dispatcher converts UTC → user's local time (zoneinfo stdlib) before hour comparison - Falls back to UTC if timezone absent or unrecognised - Quiet hours UI: 12-hour AM/PM selectors, shows detected timezone as hint - Clearing quiet hours also clears stored timezone Co-Authored-By: Jack Levy
This commit is contained in:
@@ -28,6 +28,7 @@ _EVENT_TITLES = {
|
||||
"new_document": "New Bill Text",
|
||||
"new_amendment": "Amendment Filed",
|
||||
"bill_updated": "Bill Updated",
|
||||
"weekly_digest": "Weekly Digest",
|
||||
}
|
||||
|
||||
_EVENT_TAGS = {
|
||||
@@ -45,12 +46,27 @@ _EVENT_PRIORITY = {
|
||||
|
||||
|
||||
def _in_quiet_hours(prefs: dict, now: datetime) -> bool:
|
||||
"""Return True if the current UTC hour falls within the user's quiet window."""
|
||||
"""Return True if the current local time falls within the user's quiet window.
|
||||
|
||||
Quiet hours are stored as local-time hour integers. If the user has a stored
|
||||
IANA timezone name we convert ``now`` (UTC) to that zone before comparing.
|
||||
Falls back to UTC if the timezone is absent or unrecognised.
|
||||
"""
|
||||
start = prefs.get("quiet_hours_start")
|
||||
end = prefs.get("quiet_hours_end")
|
||||
if start is None or end is None:
|
||||
return False
|
||||
h = now.hour
|
||||
|
||||
tz_name = prefs.get("timezone")
|
||||
if tz_name:
|
||||
try:
|
||||
from zoneinfo import ZoneInfo
|
||||
h = now.astimezone(ZoneInfo(tz_name)).hour
|
||||
except Exception:
|
||||
h = now.hour # unrecognised timezone — degrade gracefully to UTC
|
||||
else:
|
||||
h = now.hour
|
||||
|
||||
if start <= end:
|
||||
return start <= h < end
|
||||
# Wraps midnight (e.g. 22 → 8)
|
||||
@@ -250,6 +266,137 @@ def _send_ntfy(
|
||||
resp.raise_for_status()
|
||||
|
||||
|
||||
@celery_app.task(bind=True, name="app.workers.notification_dispatcher.send_weekly_digest")
|
||||
def send_weekly_digest(self):
|
||||
"""
|
||||
Proactive week-in-review summary for followed bills.
|
||||
|
||||
Runs every Monday at 8:30 AM UTC. Queries bills followed by each user
|
||||
for any activity in the past 7 days and dispatches a low-noise summary
|
||||
via ntfy and/or creates a NotificationEvent for the RSS feed.
|
||||
|
||||
Unlike send_notification_digest (which bundles queued events), this task
|
||||
generates a fresh summary independent of the notification event queue.
|
||||
"""
|
||||
from app.config import settings as app_settings
|
||||
from app.models.bill import Bill
|
||||
|
||||
db = get_sync_db()
|
||||
try:
|
||||
now = datetime.now(timezone.utc)
|
||||
cutoff = now - timedelta(days=7)
|
||||
base_url = (app_settings.PUBLIC_URL or app_settings.LOCAL_URL).rstrip("/")
|
||||
|
||||
users = db.query(User).all()
|
||||
ntfy_sent = 0
|
||||
rss_created = 0
|
||||
|
||||
for user in users:
|
||||
prefs = user.notification_prefs or {}
|
||||
ntfy_enabled = prefs.get("ntfy_enabled", False)
|
||||
ntfy_url = prefs.get("ntfy_topic_url", "").strip()
|
||||
rss_enabled = prefs.get("rss_enabled", False)
|
||||
ntfy_configured = ntfy_enabled and bool(ntfy_url)
|
||||
|
||||
if not ntfy_configured and not rss_enabled:
|
||||
continue
|
||||
|
||||
bill_follows = db.query(Follow).filter_by(
|
||||
user_id=user.id, follow_type="bill"
|
||||
).all()
|
||||
if not bill_follows:
|
||||
continue
|
||||
|
||||
bill_ids = [f.follow_value for f in bill_follows]
|
||||
|
||||
active_bills = (
|
||||
db.query(Bill)
|
||||
.filter(
|
||||
Bill.bill_id.in_(bill_ids),
|
||||
Bill.updated_at >= cutoff,
|
||||
)
|
||||
.order_by(Bill.updated_at.desc())
|
||||
.limit(20)
|
||||
.all()
|
||||
)
|
||||
|
||||
if not active_bills:
|
||||
continue
|
||||
|
||||
count = len(active_bills)
|
||||
anchor = active_bills[0]
|
||||
|
||||
summary_lines = []
|
||||
for bill in active_bills[:10]:
|
||||
lbl = _format_bill_label(bill)
|
||||
action = (bill.latest_action_text or "")[:80]
|
||||
summary_lines.append(f"• {lbl}: {action}" if action else f"• {lbl}")
|
||||
if count > 10:
|
||||
summary_lines.append(f" …and {count - 10} more")
|
||||
summary = "\n".join(summary_lines)
|
||||
|
||||
# Mark dispatched_at immediately so dispatch_notifications skips this event;
|
||||
# it still appears in the RSS feed since that endpoint reads all events.
|
||||
event = NotificationEvent(
|
||||
user_id=user.id,
|
||||
bill_id=anchor.bill_id,
|
||||
event_type="weekly_digest",
|
||||
dispatched_at=now,
|
||||
payload={
|
||||
"bill_label": "Weekly Digest",
|
||||
"bill_title": f"{count} followed bill{'s' if count != 1 else ''} had activity this week",
|
||||
"brief_summary": summary,
|
||||
"bill_count": count,
|
||||
"bill_url": f"{base_url}/bills/{anchor.bill_id}",
|
||||
},
|
||||
)
|
||||
db.add(event)
|
||||
rss_created += 1
|
||||
|
||||
if ntfy_configured:
|
||||
try:
|
||||
_send_weekly_digest_ntfy(count, summary, ntfy_url, prefs)
|
||||
ntfy_sent += 1
|
||||
except Exception as e:
|
||||
logger.warning(f"Weekly digest ntfy failed for user {user.id}: {e}")
|
||||
|
||||
db.commit()
|
||||
logger.info(f"send_weekly_digest: {ntfy_sent} ntfy sent, {rss_created} events created")
|
||||
return {"ntfy_sent": ntfy_sent, "rss_created": rss_created}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _format_bill_label(bill) -> str:
|
||||
_TYPE_MAP = {
|
||||
"hr": "H.R.", "s": "S.", "hjres": "H.J.Res.", "sjres": "S.J.Res.",
|
||||
"hconres": "H.Con.Res.", "sconres": "S.Con.Res.", "hres": "H.Res.", "sres": "S.Res.",
|
||||
}
|
||||
prefix = _TYPE_MAP.get(bill.bill_type.lower(), bill.bill_type.upper())
|
||||
return f"{prefix} {bill.bill_number}"
|
||||
|
||||
|
||||
def _send_weekly_digest_ntfy(count: int, summary: str, ntfy_url: str, prefs: dict) -> None:
|
||||
auth_method = prefs.get("ntfy_auth_method", "none")
|
||||
ntfy_token = prefs.get("ntfy_token", "").strip()
|
||||
ntfy_username = prefs.get("ntfy_username", "").strip()
|
||||
ntfy_password = prefs.get("ntfy_password", "").strip()
|
||||
|
||||
headers = {
|
||||
"Title": f"PocketVeto Weekly — {count} bill{'s' if count != 1 else ''} updated",
|
||||
"Priority": "low",
|
||||
"Tags": "newspaper,calendar",
|
||||
}
|
||||
if auth_method == "token" and ntfy_token:
|
||||
headers["Authorization"] = f"Bearer {ntfy_token}"
|
||||
elif auth_method == "basic" and ntfy_username:
|
||||
creds = base64.b64encode(f"{ntfy_username}:{ntfy_password}".encode()).decode()
|
||||
headers["Authorization"] = f"Basic {creds}"
|
||||
|
||||
resp = requests.post(ntfy_url, data=summary.encode("utf-8"), headers=headers, timeout=NTFY_TIMEOUT)
|
||||
resp.raise_for_status()
|
||||
|
||||
|
||||
def _send_digest_ntfy(events: list, ntfy_url: str, prefs: dict) -> None:
|
||||
auth_method = prefs.get("ntfy_auth_method", "none")
|
||||
ntfy_token = prefs.get("ntfy_token", "").strip()
|
||||
|
||||
Reference in New Issue
Block a user