Coverage for waveqc/celery.py: 100%
11 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-26 15:42 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-26 15:42 +0000
1import os
3from celery import Celery
4from celery.schedules import crontab
5from django.conf import settings
7# Set the default Django settings module for the 'celery' program.
8os.environ.setdefault("DJANGO_SETTINGS_MODULE", "waveqc.settings")
9CHECKS_LAUNCH = settings.WAVEQC_SCHEDULE[0]
10RETRY_LAUNCHES = settings.WAVEQC_SCHEDULE[2:]
13app = Celery("waveqc")
14app.config_from_object("django.conf:settings", namespace="CELERY")
15app.conf.beat_schedule = {
16 "launch-checks-daily": {
17 "task": "quality.tasks.launch_checks",
18 "schedule": crontab(hour=CHECKS_LAUNCH, minute="15"),
19 "args": (),
20 },
21 "retry-failing-checks-daily": {
22 "task": "quality.tasks.retry_failing_checks",
23 "schedule": crontab(hour=RETRY_LAUNCHES, minute="15"),
24 "args": (),
25 },
26}
27app.autodiscover_tasks()