Coverage for app / tasks / scheduler.py: 100%
23 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
1from datetime import datetime, timezone
3from celery_worker import celery_app
4from app.db.session import SyncSessionLocal
5from app.services.monitor_service import MonitorService
6from app.tasks.ping import ping_url
8def _align_to_beat(now: datetime, beat_interval: int = 60) -> datetime:
9 """
10 Round a datetime down to the nearest beat boundary.
11 This ensures next_check_at always aligns to when the beat fires,
12 preventing drift from Celery task queue lag.
13 """
14 timestamp = now.timestamp()
15 aligned_timestamp = (timestamp // beat_interval) * beat_interval
16 return datetime.fromtimestamp(aligned_timestamp, tz=timezone.utc)
@celery_app.task
def dispatch_checks():
    """
    Queue a ping task for every monitor that is due and reschedule it.

    The reference time is the current UTC time snapped down to a beat
    boundary, so rescheduling is based on the beat tick rather than on when
    this task actually ran (which may lag behind in the queue).

    Returns:
        A dict with ``"dispatched"`` (number of due monitors),
        ``"monitors"`` (their ids as strings, in dispatch order) and
        ``"aligned_time"`` (the beat-aligned reference time, ISO 8601).
    """
    with SyncSessionLocal() as session:
        monitor_service = MonitorService(session)

        # Aligned time keeps next_check_at on a beat tick.
        beat_time = _align_to_beat(datetime.now(timezone.utc))
        due_monitors = monitor_service.get_due_monitors_sync(session, beat_time)

        queued_ids = []
        for due in due_monitors:
            due_id = str(due.id)

            # Hand the actual check off to the ping task.
            ping_url.delay(due_id, str(due.url))

            # Reschedule from the aligned time, not the real execution time,
            # so queue lag cannot push the schedule later and later.
            monitor_service.update_next_check_sync(due, beat_time)
            queued_ids.append(due_id)

        # Only touch the database when something was actually rescheduled.
        if due_monitors:
            session.commit()

        return {
            "dispatched": len(due_monitors),
            "monitors": queued_ids,
            "aligned_time": beat_time.isoformat(),
        }