from celery import Celery from celery.schedules import crontab from kombu import Queue from app.config import settings celery_app = Celery( "pocketveto", broker=settings.REDIS_URL, backend=settings.REDIS_URL, include=[ "app.workers.congress_poller", "app.workers.document_fetcher", "app.workers.llm_processor", "app.workers.news_fetcher", "app.workers.trend_scorer", "app.workers.member_interest", ], ) celery_app.conf.update( task_serializer="json", result_serializer="json", accept_content=["json"], timezone="UTC", enable_utc=True, # Late ack: task is only removed from queue after completion, not on pickup. # Combined with idempotent tasks, this ensures no work is lost if a worker crashes. task_acks_late=True, # Prevent workers from prefetching LLM tasks and blocking other workers. worker_prefetch_multiplier=1, # Route tasks to named queues task_routes={ "app.workers.congress_poller.*": {"queue": "polling"}, "app.workers.document_fetcher.*": {"queue": "documents"}, "app.workers.llm_processor.*": {"queue": "llm"}, "app.workers.news_fetcher.*": {"queue": "news"}, "app.workers.trend_scorer.*": {"queue": "news"}, "app.workers.member_interest.*": {"queue": "news"}, }, task_queues=[ Queue("polling"), Queue("documents"), Queue("llm"), Queue("news"), ], # RedBeat stores schedule in Redis — restart-safe and dynamically updatable redbeat_redis_url=settings.REDIS_URL, beat_scheduler="redbeat.RedBeatScheduler", beat_schedule={ "poll-congress-bills": { "task": "app.workers.congress_poller.poll_congress_bills", "schedule": crontab(minute=f"*/{settings.CONGRESS_POLL_INTERVAL_MINUTES}"), }, "fetch-news-active-bills": { "task": "app.workers.news_fetcher.fetch_news_for_active_bills", "schedule": crontab(hour="*/6", minute=0), }, "calculate-trend-scores": { "task": "app.workers.trend_scorer.calculate_all_trend_scores", "schedule": crontab(hour=2, minute=0), }, "fetch-news-active-members": { "task": "app.workers.member_interest.fetch_news_for_active_members", "schedule": crontab(hour="*/12", minute=30), }, "calculate-member-trend-scores": { "task": "app.workers.member_interest.calculate_all_member_trend_scores", "schedule": crontab(hour=3, minute=0), }, }, )