Coverage for app / main.py: 100%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 17:54 +0000
1from fastapi import FastAPI, Request, Depends
2from fastapi.middleware.cors import CORSMiddleware
3from fastapi.responses import JSONResponse
4from sqlalchemy import select, text
5import redis.asyncio as aioredis
7from slowapi.errors import RateLimitExceeded
8from slowapi import _rate_limit_exceeded_handler
10from app.api.v1.monitors import router as monitors_router
11from app.api.v1.users import router as users_router
12from app.api.v1.pings import router as pings_router
13from app.core.rate_limiter import limiter
14from app.core.config import settings
15from app.api.deps import get_db
17from app.core.logging import setup_logging
19setup_logging()
21app = FastAPI(
22 title="Uptime Monitor API",
23 description="URL uptime monitoring and alerting system",
24 version="0.1.0",
25)
26app.state.limiter = limiter
27app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
29app.add_middleware(
30 CORSMiddleware,
31 allow_origins=["http://localhost:5173"],
32 allow_credentials=True,
33 allow_methods=["*"],
34 allow_headers=["*"],
35 expose_headers=["*"],
36)
38app.include_router(monitors_router, prefix="/api/v1/monitors", tags=["Monitors"])
39app.include_router(pings_router, prefix="/api/v1/pings", tags=["Pings"])
40app.include_router(users_router, prefix="/api/v1/users", tags=["Users"])
43@app.get("/health", tags=["Health"])
44@limiter.limit("30/minute")
45async def health_check(request: Request, db=Depends(get_db)):
46 health = {"status": "ok", "services": {}}
48 try:
49 await db.execute(text("SELECT 1"))
50 health["services"]["database"] = "ok"
51 except Exception as e:
52 health["services"]["database"] = f"error: {e}"
53 health["status"] = "degraded"
55 r = None
56 try:
57 r = aioredis.from_url(settings.redis_url)
58 await r.ping()
59 health["services"]["redis"] = "ok"
60 except Exception as e:
61 health["services"]["redis"] = f"error: {e}"
62 health["status"] = "degraded"
63 finally:
64 if r:
65 await r.close()
67 status_code = 200 if health["status"] == "ok" else 503
68 return JSONResponse(content=health, status_code=status_code)
71@app.get("/", tags=["Root"])
72@limiter.limit("60/minute")
73async def root(request: Request):
74 return {"message": "Uptime Monitor API", "docs": "/docs"}