Coverage for app / api / v1 / monitors.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 17:54 +0000

1from fastapi import APIRouter, Depends, HTTPException, status, Request, Query 

2from sqlalchemy.ext.asyncio import AsyncSession 

3 

4from app.api.deps import get_db, get_current_user 

5from app.core.rate_limiter import limiter 

6from app.schemas.monitor import ( 

7 MonitorCreate, 

8 MonitorRead, 

9 MonitorUpdate, 

10 MonitorListResponse, 

11) 

12from app.services.monitor_service import MonitorService 

13 

# Router on which this module's monitor endpoints are registered through the @router.<method>(...) decorators.
14router = APIRouter() 

15 

16 

@router.post(
    "/",
    response_model=MonitorRead,
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("10/minute")
async def create_monitor(
    request: Request,
    monitor_in: MonitorCreate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # POST / -- create a monitor from `monitor_in` on behalf of the current
    # user and respond with whatever MonitorService.create returns
    # (serialized as MonitorRead, HTTP 201). Limited to 10 requests/minute.
    #
    # `request` is not read here; presumably the rate-limit decorator needs
    # it in the signature -- TODO confirm against app.core.rate_limiter.
    #
    # NOTE: documented with comments instead of a docstring because FastAPI
    # publishes endpoint docstrings as the OpenAPI operation description.
    return await MonitorService(db).create(current_user["id"], monitor_in)

28 

29 

@router.get("/", response_model=MonitorListResponse)
@limiter.limit("60/minute")
async def list_monitors(
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
):
    # GET / -- return one page of the current user's monitors plus the
    # user's total monitor count, wrapped in MonitorListResponse.
    # Limited to 60 requests/minute.
    #
    # Query parameters:
    #   skip  -- number of items to skip; must be >= 0. A negative value is
    #            now rejected by FastAPI's validation instead of being
    #            forwarded to the service as an offset.
    #   limit -- page size, 1..500 (default 100).
    #
    # `request` is not read here; presumably the rate-limit decorator needs
    # it in the signature -- TODO confirm against app.core.rate_limiter.
    #
    # NOTE: documented with comments instead of a docstring because FastAPI
    # publishes endpoint docstrings as the OpenAPI operation description.
    service = MonitorService(db)
    user_id = current_user["id"]

    # The two calls are awaited one after the other on purpose: they share
    # the single `db` session handed to the service, and an AsyncSession is
    # not meant to be used by concurrent tasks (presumably both service
    # methods query through it -- verify before parallelizing).
    items = await service.list_by_user(user_id, skip, limit)
    total = await service.count_by_user(user_id)
    return MonitorListResponse(items=items, total=total)

43 

44 

@router.get("/{monitor_id}", response_model=MonitorRead)
@limiter.limit("60/minute")
async def get_monitor(
    request: Request,
    monitor_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # GET /{monitor_id} -- look the monitor up by its id together with the
    # current user's id and return it (serialized as MonitorRead).
    # Raises HTTPException(404, "Monitor not found") when the lookup result
    # is falsy. Limited to 60 requests/minute.
    #
    # `request` is not read here; presumably the rate-limit decorator needs
    # it in the signature -- TODO confirm against app.core.rate_limiter.
    #
    # NOTE: documented with comments instead of a docstring because FastAPI
    # publishes endpoint docstrings as the OpenAPI operation description.
    found = await MonitorService(db).get_by_id(monitor_id, current_user["id"])
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Monitor not found",
    )

58 

59 

@router.patch("/{monitor_id}", response_model=MonitorRead)
@limiter.limit("20/minute")
async def update_monitor(
    request: Request,
    monitor_id: str,
    monitor_in: MonitorUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # PATCH /{monitor_id} -- fetch the monitor by its id together with the
    # current user's id, then hand it and `monitor_in` to
    # MonitorService.update and return that result (serialized as
    # MonitorRead).
    # Raises HTTPException(404, "Monitor not found") when the lookup result
    # is falsy; update is not called in that case.
    # Limited to 20 requests/minute.
    #
    # `request` is not read here; presumably the rate-limit decorator needs
    # it in the signature -- TODO confirm against app.core.rate_limiter.
    #
    # NOTE: documented with comments instead of a docstring because FastAPI
    # publishes endpoint docstrings as the OpenAPI operation description.
    svc = MonitorService(db)
    existing = await svc.get_by_id(monitor_id, current_user["id"])
    if existing:
        return await svc.update(existing, monitor_in)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Monitor not found",
    )

74 

75 

@router.delete("/{monitor_id}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("20/minute")
async def delete_monitor(
    request: Request,
    monitor_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # DELETE /{monitor_id} -- fetch the monitor by its id together with the
    # current user's id and pass it to MonitorService.delete. Responds with
    # HTTP 204 and no body (the handler returns None).
    # Raises HTTPException(404, "Monitor not found") when the lookup result
    # is falsy; delete is not called in that case.
    # Limited to 20 requests/minute.
    #
    # `request` is not read here; presumably the rate-limit decorator needs
    # it in the signature -- TODO confirm against app.core.rate_limiter.
    #
    # NOTE: documented with comments instead of a docstring because FastAPI
    # publishes endpoint docstrings as the OpenAPI operation description.
    svc = MonitorService(db)
    target = await svc.get_by_id(monitor_id, current_user["id"])
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Monitor not found",
        )
    await svc.delete(target)