Coverage for app / api / v1 / monitors.py: 100%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
1from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
2from sqlalchemy.ext.asyncio import AsyncSession
4from app.api.deps import get_db, get_current_user
5from app.core.rate_limiter import limiter
6from app.schemas.monitor import (
7 MonitorCreate,
8 MonitorRead,
9 MonitorUpdate,
10 MonitorListResponse,
11)
12from app.services.monitor_service import MonitorService
14router = APIRouter()
@router.post("/", response_model=MonitorRead, status_code=201)
@limiter.limit("10/minute")
async def create_monitor(
    request: Request,
    monitor_in: MonitorCreate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Create a monitor on behalf of the authenticated user.

    A ``MonitorService`` is built on the request's database session and
    handed the caller's ``id`` together with the submitted payload; whatever
    the service returns is sent back, serialised as ``MonitorRead`` with a
    201 status.

    ``request`` is not read in the body; presumably the rate-limit decorator
    requires it in the signature -- confirm against the limiter in use.
    """
    return await MonitorService(db).create(current_user["id"], monitor_in)
@router.get("/", response_model=MonitorListResponse)
@limiter.limit("60/minute")
async def list_monitors(
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=500),
):
    """Return one page of the authenticated user's monitors plus a total.

    Query parameters:
        skip: Number of records to skip; defaults to 0 and must be >= 0.
        limit: Page size; defaults to 100 and must be within 1..500.

    Both values are validated by FastAPI before this function runs, so an
    out-of-range value is rejected as a request validation error instead of
    reaching the service layer.

    ``request`` is not read in the body; presumably the rate-limit decorator
    requires it in the signature -- confirm against the limiter in use.
    """
    service = MonitorService(db)
    user_id = current_user["id"]
    items = await service.list_by_user(user_id, skip, limit)
    # The count call receives no skip/limit, so `total` is not tied to the
    # page that was requested.
    total = await service.count_by_user(user_id)
    return MonitorListResponse(items=items, total=total)
@router.get("/{monitor_id}", response_model=MonitorRead)
@limiter.limit("60/minute")
async def get_monitor(
    request: Request,
    monitor_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Fetch a single monitor by id for the authenticated user.

    The lookup is given both ``monitor_id`` and the caller's ``id``. A falsy
    result from the service is reported as HTTP 404 "Monitor not found";
    otherwise the result is returned and serialised as ``MonitorRead``.

    ``request`` is not read in the body; presumably the rate-limit decorator
    requires it in the signature -- confirm against the limiter in use.
    """
    found = await MonitorService(db).get_by_id(monitor_id, current_user["id"])
    if found:
        return found
    raise HTTPException(status_code=404, detail="Monitor not found")
@router.patch("/{monitor_id}", response_model=MonitorRead)
@limiter.limit("20/minute")
async def update_monitor(
    request: Request,
    monitor_id: str,
    monitor_in: MonitorUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Apply an update payload to one of the authenticated user's monitors.

    The monitor is first looked up with ``monitor_id`` and the caller's
    ``id``. A falsy lookup result yields HTTP 404 "Monitor not found";
    otherwise the found object and the payload are passed to the service's
    ``update`` and its result is returned.

    ``request`` is not read in the body; presumably the rate-limit decorator
    requires it in the signature -- confirm against the limiter in use.
    """
    monitors = MonitorService(db)
    existing = await monitors.get_by_id(monitor_id, current_user["id"])
    if existing:
        return await monitors.update(existing, monitor_in)
    raise HTTPException(status_code=404, detail="Monitor not found")
@router.delete("/{monitor_id}", status_code=204)
@limiter.limit("20/minute")
async def delete_monitor(
    request: Request,
    monitor_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    """Delete one of the authenticated user's monitors.

    The monitor is looked up with ``monitor_id`` and the caller's ``id``.
    A falsy lookup result yields HTTP 404 "Monitor not found"; otherwise the
    found object is passed to the service's ``delete``. Nothing is returned,
    matching the 204 status declared on the route.

    ``request`` is not read in the body; presumably the rate-limit decorator
    requires it in the signature -- confirm against the limiter in use.
    """
    monitors = MonitorService(db)
    target = await monitors.get_by_id(monitor_id, current_user["id"])
    if not target:
        raise HTTPException(status_code=404, detail="Monitor not found")
    await monitors.delete(target)