Coverage for app / services / monitor_service.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 17:54 +0000

1from datetime import datetime, timezone, timedelta 

2from typing import List, Optional 

3from uuid import UUID 

4 

5from sqlalchemy import select, func 

6from sqlalchemy.ext.asyncio import AsyncSession 

7 

8from app.models.monitor import Monitor 

9from app.schemas.monitor import MonitorCreate, MonitorUpdate 

10 

11 

def _coerce_uuid(value) -> UUID:
    """Return *value* as a ``UUID``, parsing it first when it is a string.

    String input (e.g. an ID that came back from SQLite as text) is passed
    to the ``UUID`` constructor; any other value is returned unchanged.
    """
    return UUID(value) if isinstance(value, str) else value

17 

18 

class MonitorService:
    """Business logic for monitor management.

    The coroutine methods run against the ``AsyncSession`` given to the
    constructor. The ``*_sync`` methods are meant for the Celery scheduler
    and operate on a session (or monitor) passed in explicitly; they do not
    touch ``self.db``.
    """

    def __init__(self, db: AsyncSession):
        """Keep a reference to the async session used by the coroutine methods."""
        self.db = db

    async def create(self, user_id: UUID, data: MonitorCreate) -> Monitor:
        """Persist a new monitor owned by ``user_id`` and return it refreshed.

        ``next_check_at`` is initialised to the current UTC time so the
        monitor is already due the next time the scheduler looks for work.
        """
        monitor = Monitor(
            user_id=user_id,
            url=str(data.url),
            interval_seconds=data.interval_seconds,
            is_active=data.is_active,
            next_check_at=datetime.now(timezone.utc),  # due immediately
        )
        self.db.add(monitor)
        await self.db.commit()
        await self.db.refresh(monitor)
        return monitor

    async def list_by_user(
        self,
        user_id: UUID,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Monitor]:
        """Return one page of the monitors owned by ``user_id``.

        Args:
            user_id: Owner whose monitors are listed.
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).

        Returns:
            The monitors on the requested page, in a stable order.
        """
        result = await self.db.execute(
            select(Monitor)
            .where(Monitor.user_id == user_id)
            # OFFSET/LIMIT without ORDER BY gives the database no obligation
            # to return rows in a consistent order, so pages could overlap
            # or skip rows between calls. Order on a unique column.
            # NOTE(review): id is the only unique column visible from this
            # module; switch to a creation timestamp if the model has one.
            .order_by(Monitor.id)
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def count_by_user(self, user_id: UUID) -> int:
        """Return how many monitors ``user_id`` owns (0 when there are none)."""
        result = await self.db.execute(
            select(func.count())
            .select_from(Monitor)
            .where(Monitor.user_id == user_id)
        )
        return result.scalar() or 0

    async def get_by_id(self, monitor_id: str, user_id: UUID) -> Optional[Monitor]:
        """Get a monitor by ID, coercing UUID types for SQLite compatibility.

        The lookup is scoped to ``user_id``, so a monitor that exists but
        belongs to another user is reported as ``None``.

        Raises:
            ValueError: If ``monitor_id`` is a string that is not a valid UUID.
        """
        monitor_uuid = _coerce_uuid(monitor_id)
        result = await self.db.execute(
            select(Monitor).where(
                Monitor.id == monitor_uuid,
                Monitor.user_id == user_id,
            )
        )
        return result.scalar_one_or_none()

    async def update(
        self,
        monitor: Monitor,
        data: MonitorUpdate,
    ) -> Monitor:
        """Apply the explicitly-set fields of ``data`` to ``monitor`` and save.

        Only fields the client actually supplied are written
        (``exclude_unset=True``); ``mode="json"`` dumps them in
        JSON-compatible form (e.g. URL objects as plain strings) before they
        are assigned to the model.
        """
        update_data = data.model_dump(exclude_unset=True, mode="json")
        for field, value in update_data.items():
            setattr(monitor, field, value)
        await self.db.commit()
        await self.db.refresh(monitor)
        return monitor

    async def delete(self, monitor: Monitor) -> None:
        """Delete ``monitor`` and commit the transaction."""
        await self.db.delete(monitor)
        await self.db.commit()

    async def toggle_active(self, monitor_id: str, user_id: UUID) -> Optional[Monitor]:
        """Flip ``is_active`` on the user's monitor and return it refreshed.

        Returns ``None`` when no monitor with that ID belongs to ``user_id``.
        """
        monitor = await self.get_by_id(monitor_id, user_id)
        if not monitor:
            return None
        monitor.is_active = not monitor.is_active
        await self.db.commit()
        await self.db.refresh(monitor)
        return monitor

    def get_due_monitors_sync(self, db_session, now: Optional[datetime] = None) -> List[Monitor]:
        """Synchronous version for the Celery scheduler.

        Returns monitors that are active and due for a check, i.e. whose
        ``next_check_at`` is at or before ``now`` or has never been set.
        Uses SELECT ... FOR UPDATE SKIP LOCKED to prevent race conditions
        where concurrent dispatchers see the same due monitors.

        Args:
            db_session: Synchronous session to run the query on.
            now: Reference time; defaults to the current UTC time.
        """
        if now is None:
            now = datetime.now(timezone.utc)

        result = db_session.execute(
            select(Monitor)
            .where(
                # "== True" is intentional: it builds a SQL expression.
                Monitor.is_active == True,  # noqa: E712
                (Monitor.next_check_at <= now) | (Monitor.next_check_at.is_(None)),
            )
            # Lock the selected rows; rows another dispatcher already holds
            # are skipped instead of waited on.
            .with_for_update(skip_locked=True)
        )
        return list(result.scalars().all())

    def update_next_check_sync(self, monitor: Monitor, now: Optional[datetime] = None) -> None:
        """Update ``next_check_at`` after dispatching a check.

        The new value is ``now`` plus the monitor's ``interval_seconds``;
        ``now`` defaults to the current UTC time. Nothing is committed here,
        so the caller is responsible for flushing/committing the change.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        monitor.next_check_at = now + timedelta(seconds=monitor.interval_seconds)