Coverage for waveqc/celery.py: 100%

11 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-26 15:42 +0000

1import os 

2 

3from celery import Celery 

4from celery.schedules import crontab 

5from django.conf import settings 

6 

# Select the Django settings module for the 'celery' program unless the
# environment already names one. This has to happen before `settings` is
# first read below.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "waveqc.settings")

# Hour specification for the main check launch: the first schedule entry.
CHECKS_LAUNCH = settings.WAVEQC_SCHEDULE[0]
# Hour specification(s) for the retries: every entry from index 2 onward.
# NOTE(review): index 1 of WAVEQC_SCHEDULE is not read in this module --
# confirm that skipping it is intentional.
RETRY_LAUNCHES = settings.WAVEQC_SCHEDULE[2:]


def _beat_entry(task_path, hours):
    """Build one beat-schedule entry running *task_path* at minute 15 of *hours*.

    The task is scheduled without positional arguments.
    """
    return {
        "task": task_path,
        "schedule": crontab(hour=hours, minute="15"),
        "args": (),
    }


app = Celery("waveqc")
# Pull Celery configuration from the Django settings object, using the
# "CELERY" namespace.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Periodic tasks: one launch of the checks, then retries of the failing ones.
_launch_entry = _beat_entry("quality.tasks.launch_checks", CHECKS_LAUNCH)
_retry_entry = _beat_entry("quality.tasks.retry_failing_checks", RETRY_LAUNCHES)
app.conf.beat_schedule = {
    "launch-checks-daily": _launch_entry,
    "retry-failing-checks-daily": _retry_entry,
}

app.autodiscover_tasks()