Coverage for app / services / monitor_service.py: 100%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
1from datetime import datetime, timezone, timedelta
2from typing import List, Optional
3from uuid import UUID
5from sqlalchemy import select, func
6from sqlalchemy.ext.asyncio import AsyncSession
8from app.models.monitor import Monitor
9from app.schemas.monitor import MonitorCreate, MonitorUpdate
12def _coerce_uuid(value) -> UUID:
13 """Coerce string UUID (from SQLite) to UUID object."""
14 if isinstance(value, str):
15 return UUID(value)
16 return value
19class MonitorService:
20 """Business logic for monitor management."""
22 def __init__(self, db: AsyncSession):
23 self.db = db
25 async def create(self, user_id: UUID, data: MonitorCreate) -> Monitor:
26 # Set next_check_at to now so the scheduler picks it up immediately
27 monitor = Monitor(
28 user_id=user_id,
29 url=str(data.url),
30 interval_seconds=data.interval_seconds,
31 is_active=data.is_active,
32 next_check_at=datetime.now(timezone.utc), # ← Run immediately
33 )
34 self.db.add(monitor)
35 await self.db.commit()
36 await self.db.refresh(monitor)
37 return monitor
39 async def list_by_user(
40 self,
41 user_id: UUID,
42 skip: int = 0,
43 limit: int = 100,
44 ) -> List[Monitor]:
45 result = await self.db.execute(
46 select(Monitor)
47 .where(Monitor.user_id == user_id)
48 .offset(skip)
49 .limit(limit)
50 )
51 return result.scalars().all()
53 async def count_by_user(self, user_id: UUID) -> int:
54 result = await self.db.execute(
55 select(func.count()).where(Monitor.user_id == user_id)
56 )
57 return result.scalar() or 0
59 async def get_by_id(self, monitor_id: str, user_id: UUID) -> Optional[Monitor]:
60 """Get monitor by ID, coercing UUID types for SQLite compatibility."""
61 monitor_uuid = _coerce_uuid(monitor_id)
62 result = await self.db.execute(
63 select(Monitor).where(
64 Monitor.id == monitor_uuid,
65 Monitor.user_id == user_id,
66 )
67 )
68 return result.scalar_one_or_none()
70 async def update(
71 self,
72 monitor: Monitor,
73 data: MonitorUpdate,
74 ) -> Monitor:
75 update_data = data.model_dump(exclude_unset=True, mode="json")
76 for field, value in update_data.items():
77 setattr(monitor, field, value)
78 await self.db.commit()
79 await self.db.refresh(monitor)
80 return monitor
82 async def delete(self, monitor: Monitor) -> None:
83 await self.db.delete(monitor)
84 await self.db.commit()
86 async def toggle_active(self, monitor_id: str, user_id: UUID) -> Optional[Monitor]:
87 monitor = await self.get_by_id(monitor_id, user_id)
88 if not monitor:
89 return None
90 monitor.is_active = not monitor.is_active
91 await self.db.commit()
92 await self.db.refresh(monitor)
93 return monitor
95 def get_due_monitors_sync(self, db_session, now: Optional[datetime] = None) -> List[Monitor]:
96 """
97 Synchronous version for Celery scheduler.
98 Returns monitors that are active and due for a check.
99 Uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions
100 where concurrent dispatchers see the same due monitors.
101 """
102 if now is None:
103 now = datetime.now(timezone.utc)
105 result = db_session.execute(
106 select(Monitor)
107 .where(
108 Monitor.is_active == True,
109 (Monitor.next_check_at <= now) | (Monitor.next_check_at.is_(None)),
110 )
111 .with_for_update(skip_locked=True) # ← Lock and skip if another dispatcher has it
112 )
113 return result.scalars().all()
115 def update_next_check_sync(self, monitor: Monitor, now: Optional[datetime] = None) -> None:
116 """Update next_check_at after dispatching a check."""
117 if now is None:
118 now = datetime.now(timezone.utc)
119 monitor.next_check_at = now + timedelta(seconds=monitor.interval_seconds)