fix: proactively fetch votes for stanced bills + register vote_fetcher with Celery
vote_fetcher was missing from Celery's include list (task not registered with workers) and had no beat schedule — votes only fetched on-demand when a user visited a bill's votes page. Stanced bills (pocket_veto/pocket_boost) never had votes fetched, leaving the alignment page blank. Add fetch_votes_for_stanced_bills nightly task (4:30 AM UTC) that queues fetch_bill_votes for every bill any user has stanced but has no stored votes. Register vote_fetcher in the include list and add it to the polling queue route. Authored by: Jack Levy
This commit is contained in:
@@ -18,6 +18,7 @@ celery_app = Celery(
|
|||||||
"app.workers.notification_dispatcher",
|
"app.workers.notification_dispatcher",
|
||||||
"app.workers.llm_batch_processor",
|
"app.workers.llm_batch_processor",
|
||||||
"app.workers.bill_classifier",
|
"app.workers.bill_classifier",
|
||||||
|
"app.workers.vote_fetcher",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -43,6 +44,7 @@ celery_app.conf.update(
|
|||||||
"app.workers.trend_scorer.*": {"queue": "news"},
|
"app.workers.trend_scorer.*": {"queue": "news"},
|
||||||
"app.workers.member_interest.*": {"queue": "news"},
|
"app.workers.member_interest.*": {"queue": "news"},
|
||||||
"app.workers.notification_dispatcher.*": {"queue": "polling"},
|
"app.workers.notification_dispatcher.*": {"queue": "polling"},
|
||||||
|
"app.workers.vote_fetcher.*": {"queue": "polling"},
|
||||||
},
|
},
|
||||||
task_queues=[
|
task_queues=[
|
||||||
Queue("polling"),
|
Queue("polling"),
|
||||||
@@ -82,6 +84,10 @@ celery_app.conf.update(
|
|||||||
"task": "app.workers.congress_poller.fetch_actions_for_active_bills",
|
"task": "app.workers.congress_poller.fetch_actions_for_active_bills",
|
||||||
"schedule": crontab(hour=4, minute=0), # 4 AM UTC, after trend + member scoring
|
"schedule": crontab(hour=4, minute=0), # 4 AM UTC, after trend + member scoring
|
||||||
},
|
},
|
||||||
|
"fetch-votes-for-stanced-bills": {
|
||||||
|
"task": "app.workers.vote_fetcher.fetch_votes_for_stanced_bills",
|
||||||
|
"schedule": crontab(hour=4, minute=30), # 4:30 AM UTC daily
|
||||||
|
},
|
||||||
"dispatch-notifications": {
|
"dispatch-notifications": {
|
||||||
"task": "app.workers.notification_dispatcher.dispatch_notifications",
|
"task": "app.workers.notification_dispatcher.dispatch_notifications",
|
||||||
"schedule": crontab(minute="*/5"), # Every 5 minutes
|
"schedule": crontab(minute="*/5"), # Every 5 minutes
|
||||||
|
|||||||
@@ -237,3 +237,35 @@ def fetch_bill_votes(self, bill_id: str) -> dict:
|
|||||||
return {"bill_id": bill_id, "stored": stored, "skipped": skipped}
|
return {"bill_id": bill_id, "stored": stored, "skipped": skipped}
|
||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task(bind=True, name="app.workers.vote_fetcher.fetch_votes_for_stanced_bills")
|
||||||
|
def fetch_votes_for_stanced_bills(self) -> dict:
|
||||||
|
"""
|
||||||
|
Nightly task: queue vote fetches for every bill any user has a stance on
|
||||||
|
(pocket_veto or pocket_boost). Only queues bills that don't already have
|
||||||
|
a vote stored, so re-runs are cheap after the first pass.
|
||||||
|
"""
|
||||||
|
from app.models.follow import Follow
|
||||||
|
|
||||||
|
db = get_sync_db()
|
||||||
|
try:
|
||||||
|
from sqlalchemy import text as sa_text
|
||||||
|
rows = db.execute(sa_text("""
|
||||||
|
SELECT DISTINCT f.follow_value AS bill_id
|
||||||
|
FROM follows f
|
||||||
|
LEFT JOIN bill_votes bv ON bv.bill_id = f.follow_value
|
||||||
|
WHERE f.follow_type = 'bill'
|
||||||
|
AND f.follow_mode IN ('pocket_veto', 'pocket_boost')
|
||||||
|
AND bv.id IS NULL
|
||||||
|
""")).fetchall()
|
||||||
|
|
||||||
|
queued = 0
|
||||||
|
for row in rows:
|
||||||
|
fetch_bill_votes.delay(row.bill_id)
|
||||||
|
queued += 1
|
||||||
|
|
||||||
|
logger.info(f"fetch_votes_for_stanced_bills: queued {queued} bills")
|
||||||
|
return {"queued": queued}
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user