Coverage for app/api/v1/pings.py: 100%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
1from uuid import UUID
2from fastapi import APIRouter, Depends, HTTPException, Request, Query
3from sqlalchemy import select, func
4from sqlalchemy.ext.asyncio import AsyncSession
5from pydantic import BaseModel
7from app.api.deps import get_db, get_current_user
8from app.core.rate_limiter import limiter
9from app.models.ping_log import PingLog
10from app.models.monitor import Monitor
11from app.schemas.ping_log import PingLogRead
13router = APIRouter()
16def _coerce_monitor_id(monitor_id: str) -> UUID:
17 try:
18 return UUID(monitor_id)
19 except ValueError:
20 raise HTTPException(status_code=400, detail="Invalid monitor ID format")
# NOTE: the class docstring and the field order are kept exactly as they were;
# pydantic includes both in the schema it generates for this model.
class PingListResponse(BaseModel):
    """Response model for paginated pings."""

    # The ping records on the requested page, already validated as PingLogRead.
    items: list[PingLogRead]

    # Count reported alongside the page; get_monitor_pings fills it with the
    # number of pings recorded for the monitor, ignoring skip/limit.
    total: int
@router.get("/monitor/{monitor_id}", response_model=PingListResponse)
@limiter.limit("60/minute")
async def get_monitor_pings(
    request: Request,
    monitor_id: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # Return one page of ping logs, newest first, for a monitor that belongs
    # to the authenticated user, together with the monitor's total ping count.
    #
    # Documented with comments rather than a docstring so the endpoint's
    # generated API description stays exactly as it was.
    #
    # - monitor_id: path value; must parse as a UUID, otherwise a 400 is raised.
    # - skip / limit: pagination window (skip >= 0, 1 <= limit <= 100).
    # - request: not used in the body; presumably required by the rate-limit
    #   decorator -- confirm against app.core.rate_limiter.
    #
    # Raises HTTPException 404 when no monitor with this id is owned by the
    # current user.
    monitor_uuid = _coerce_monitor_id(monitor_id)

    # Ownership check: the monitor must exist AND belong to the caller.
    ownership_query = select(Monitor).where(
        Monitor.id == monitor_uuid,
        Monitor.user_id == current_user["id"],
    )
    owned_monitor = (await db.execute(ownership_query)).scalar_one_or_none()
    if not owned_monitor:
        raise HTTPException(status_code=404, detail="Monitor not found")

    # Same filter for the count and the page so both describe the same rows.
    for_this_monitor = PingLog.monitor_id == monitor_uuid

    # Total number of pings, independent of the pagination window.
    count_query = select(func.count()).where(for_this_monitor)
    total = await db.scalar(count_query)

    # The requested page, most recent ping first.
    page_query = (
        select(PingLog)
        .where(for_this_monitor)
        .order_by(PingLog.timestamp.desc())
        .offset(skip)
        .limit(limit)
    )
    rows = (await db.execute(page_query)).scalars().all()

    # Turn the ORM rows into response schemas.
    items = list(map(PingLogRead.model_validate, rows))

    return PingListResponse(items=items, total=total or 0)
def _uptime_percent(up_count, check_count):
    """Return ``up_count / check_count`` as a percentage rounded to 2 places.

    Either argument may be ``None`` or ``0``. With no checks the result is
    ``0``; a missing ``up_count`` is treated as zero successful checks.
    """
    if not check_count:
        return 0
    return round((up_count or 0) / check_count * 100, 2)


@router.get("/monitor/{monitor_id}/stats")
@limiter.limit("60/minute")
async def get_monitor_stats(
    request: Request,
    monitor_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user),
):
    # Return aggregate ping statistics for a monitor owned by the caller:
    # total checks, successful checks, uptime percentage, average response
    # time, and the same check count / uptime percentage for the last 24 hours.
    #
    # Documented with comments rather than a docstring so the endpoint's
    # generated API description stays exactly as it was.
    #
    # - monitor_id: path value; must parse as a UUID, otherwise a 400 is raised.
    # - request: not used in the body; presumably required by the rate-limit
    #   decorator -- confirm against app.core.rate_limiter.
    #
    # Raises HTTPException 404 when no monitor with this id is owned by the
    # current user.
    from datetime import datetime, timedelta, timezone

    monitor_uuid = _coerce_monitor_id(monitor_id)

    # Ownership check: the monitor must exist AND belong to the caller.
    result = await db.execute(
        select(Monitor).where(
            Monitor.id == monitor_uuid,
            Monitor.user_id == current_user["id"],
        )
    )
    monitor = result.scalar_one_or_none()
    if monitor is None:
        raise HTTPException(status_code=404, detail="Monitor not found")

    # All-time totals.
    total = await db.scalar(
        select(func.count()).where(PingLog.monitor_id == monitor_uuid)
    )

    uptime = await db.scalar(
        select(func.count()).where(
            PingLog.monitor_id == monitor_uuid,
            # `==` (not `is`) is required: it builds the SQL comparison.
            PingLog.is_up == True,  # noqa: E712
        )
    )

    # Average over pings that actually recorded a response time.
    avg_response = await db.scalar(
        select(func.avg(PingLog.response_ms)).where(
            PingLog.monitor_id == monitor_uuid,
            PingLog.response_ms.is_not(None),
        )
    )

    # Rolling 24-hour window, computed in UTC.
    day_ago = datetime.now(timezone.utc) - timedelta(hours=24)

    day_total = await db.scalar(
        select(func.count()).where(
            PingLog.monitor_id == monitor_uuid,
            PingLog.timestamp >= day_ago,
        )
    )

    day_uptime = await db.scalar(
        select(func.count()).where(
            PingLog.monitor_id == monitor_uuid,
            PingLog.timestamp >= day_ago,
            PingLog.is_up == True,  # noqa: E712
        )
    )

    return {
        "total_checks": total or 0,
        "uptime_count": uptime or 0,
        "uptime_percent": _uptime_percent(uptime, total),
        # Compare against None explicitly: an average of exactly 0 is a real
        # measurement and must not be reported as "no data".
        "avg_response_ms": (
            round(avg_response, 2) if avg_response is not None else None
        ),
        "last_24h": {
            "checks": day_total or 0,
            "uptime_percent": _uptime_percent(day_uptime, day_total),
        },
    }