Coverage for app / tasks / scheduler.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 17:54 +0000

1from datetime import datetime, timezone 

2 

3from celery_worker import celery_app 

4from app.db.session import SyncSessionLocal 

5from app.services.monitor_service import MonitorService 

6from app.tasks.ping import ping_url 

7 

8def _align_to_beat(now: datetime, beat_interval: int = 60) -> datetime: 

9 """ 

10 Round a datetime down to the nearest beat boundary. 

11 This ensures next_check_at always aligns to when the beat fires, 

12 preventing drift from Celery task queue lag. 

13 """ 

14 timestamp = now.timestamp() 

15 aligned_timestamp = (timestamp // beat_interval) * beat_interval 

16 return datetime.fromtimestamp(aligned_timestamp, tz=timezone.utc) 

17 

@celery_app.task
def dispatch_checks():
    """
    Dispatch a ping task for every monitor that is currently due.

    Opens a synchronous DB session, asks MonitorService for the monitors
    due at the beat-aligned current time, enqueues ``ping_url`` for each
    one and updates its next check through the service. The session is
    committed only when at least one monitor was returned.

    Returns:
        A dict with the number of monitors handled ("dispatched"), their
        ids as strings ("monitors"), and the aligned timestamp in ISO
        format ("aligned_time").
    """
    with SyncSessionLocal() as session:
        monitor_service = MonitorService(session)

        # Snap the clock to the beat grid so that next_check_at lands on a
        # beat tick instead of drifting with queue lag.
        beat_time = _align_to_beat(datetime.now(timezone.utc))
        due_monitors = monitor_service.get_due_monitors_sync(session, beat_time)

        queued_ids = []
        for due in due_monitors:
            # Enqueue the actual check first ...
            ping_url.delay(str(due.id), str(due.url))
            # ... then schedule the next one relative to the aligned time,
            # not the moment this task happened to execute.
            monitor_service.update_next_check_sync(due, beat_time)
            queued_ids.append(str(due.id))

        # Nothing was touched when no monitor was due, so skip the commit.
        if due_monitors:
            session.commit()

        summary = {
            "dispatched": len(due_monitors),
            "monitors": queued_ids,
            "aligned_time": beat_time.isoformat(),
        }
        return summary