LIBRE / src /interface /api /app.py
RyZ
fix: resolve env path and respect dependency overrides in lifespan
77f2d58
"""
interface/api/app.py
─────────────────────
FastAPI application factory.

Usage:
    # Direct (dev)
    uvicorn src.interface.api.app:create_app --factory --port 7860 --reload

    # Docker / HF Spaces
    CMD ["uvicorn", "src.interface.api.app:create_app", "--factory", ...]

Design decisions:
    • Factory function (not module-level app) so tests can create isolated instances.
    • Lifespan context manager handles startup/shutdown cleanly.
    • Global exception handlers registered via error_handlers.register_exception_handlers().
    • CORS is wide-open by default — restrict in production via settings.
"""
from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Annotated, AsyncGenerator
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.infrastructure.database.connection import (
create_all_tables,
dispose_engine,
ping_database,
)
from src.interface.api.dependencies import get_broker, get_model_service
from src.interface.api.error_handlers import register_exception_handlers
from src.interface.api.routes import ppg_routes, prediction_routes
from src.shared.config import get_settings
from src.shared.constants import API_V1_PREFIX
from src.shared.logger import get_logger
# Module-level logger shared by the lifespan handler and the app factory below.
logger = get_logger(__name__)
def _strip_credentials(url: str) -> str:
    """Return the part of *url* after the last ``@`` (the whole URL if none).

    Used so that the ``user:password@`` portion of a connection URL is not
    written to the startup log.
    """
    return url.rpartition("@")[2]


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """
    Application lifespan handler.

    Startup:
        1. Ping the database; a failed ping is logged as a warning and
           startup continues.
        2. Create DB tables when ``settings.debug`` or ``settings.is_sqlite``
           is set.
        3. Connect the message broker (best-effort: failures are logged).
        4. Pre-load the model service (best-effort: failures are logged).

    Shutdown (always runs once startup has completed, even if an exception
    or cancellation is delivered at the yield point):
        1. Disconnect the broker; failures are logged, not raised.
        2. Dispose the DB engine.

    The broker and model service are resolved through
    ``app.dependency_overrides`` first, so instances substituted via
    dependency overrides are the ones that get connected / loaded here.

    Args:
        app: The FastAPI application whose lifespan is being managed.

    Yields:
        None. Control returns to the server while the app serves requests.
    """
    settings = get_settings()
    banner = "=" * 60

    logger.info(banner)
    logger.info("BP Monitoring Pipeline — Starting up")
    logger.info("Database : %s", _strip_credentials(settings.database_url))
    logger.info("Supabase : %s", "yes" if settings.is_supabase else "no")
    logger.info("Pooler : %s", "yes (port 6543)" if settings.uses_pooler else "no")
    logger.info("Broker : %s", _strip_credentials(settings.rabbitmq_url))
    logger.info("Mock Model: %s", settings.use_mock_model)
    logger.info(banner)

    # ── 1. Validate database connection ───────────────────────────────────────
    if await ping_database():
        logger.info("✓ Database connection verified.")
    else:
        logger.warning(
            "✗ Database ping failed — check DATABASE_URL. "
            "The API will start but DB operations will fail."
        )

    # ── 2. Create tables (dev/SQLite only) ────────────────────────────────────
    if settings.debug or settings.is_sqlite:
        logger.info("Auto-creating DB tables (dev/SQLite mode)…")
        await create_all_tables()

    # ── 3. Connect broker (best-effort on startup) ────────────────────────────
    # Honour dependency overrides so an overridden broker is the one connected.
    broker_provider = app.dependency_overrides.get(get_broker, get_broker)
    broker = broker_provider()
    try:
        await broker.connect()
    except Exception as exc:  # best-effort: the API must still start
        logger.warning(
            "✗ Could not connect to RabbitMQ on startup (will retry on first publish): %s",
            exc,
        )
    else:
        logger.info("✓ RabbitMQ broker connected.")

    # ── 4. Pre-load AI model (best-effort) ────────────────────────────────────
    model_provider = app.dependency_overrides.get(get_model_service, get_model_service)
    model_service = model_provider()
    try:
        await model_service.load_model()
        logger.info("✓ Model service ready: %s", model_service.model_version)
    except Exception as exc:  # best-effort: the API must still start
        logger.warning("✗ Could not pre-load model on startup: %s", exc)

    logger.info("Startup complete. API is ready to serve requests.")

    try:
        yield
    finally:
        # ── Shutdown ──────────────────────────────────────────────────────────
        # Runs in ``finally`` so resources are released even when the server
        # propagates an exception or cancellation into the generator.
        logger.info("Shutting down…")
        try:
            await broker.disconnect()
        except Exception as exc:
            # Still best-effort, but no longer silent: record why it failed.
            logger.warning("Broker disconnect failed during shutdown: %s", exc)
        await dispose_engine()
        logger.info("Shutdown complete.")
def create_app() -> FastAPI:
    """
    FastAPI application factory.

    Returns a fully configured FastAPI instance with:
        • Lifespan hooks (startup/shutdown)
        • Global domain exception handlers
        • CORS middleware
        • API v1 routes
        • Health check endpoint (``/health``) and a root info endpoint (``/``)
        • Swagger UI / ReDoc

    Returns:
        A new ``FastAPI`` application; every call builds an independent
        instance.
    """
    # The value is not used in this factory; the call is kept so settings are
    # still resolved at app-creation time exactly as before.
    get_settings()

    app = FastAPI(
        title="BP Monitoring Pipeline API",
        description=(
            "REST API for the **Blood Pressure Monitoring Pipeline**.\n\n"
            "Receives PPG signals from IoT devices (ETL #1) and serves "
            "AI-predicted blood pressure results to the frontend.\n\n"
            "**Error Response Format** (all errors):\n"
            "```json\n"
            '{"error": "not_found", "message": "...", "context": {}, "timestamp": "..."}\n'
            "```"
        ),
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        lifespan=lifespan,
    )

    # ── Global Exception Handlers ─────────────────────────────────────────────
    # Registered BEFORE routes so handlers are available for all endpoints.
    register_exception_handlers(app)

    # ── CORS ──────────────────────────────────────────────────────────────────
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # the permissive dev default; FastAPI's CORS documentation says explicit
    # origins should be listed when credentials are allowed. Restrict before
    # exposing this to production traffic.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # ── Routes ────────────────────────────────────────────────────────────────
    app.include_router(ppg_routes.router, prefix=API_V1_PREFIX)
    app.include_router(prediction_routes.router, prefix=API_V1_PREFIX)

    # ── Health Check ──────────────────────────────────────────────────────────
    @app.get(
        "/health",
        tags=["Health"],
        summary="Health Check",
        responses={
            200: {"description": "Service is healthy."},
            503: {"description": "One or more dependencies are unavailable."},
        },
    )
    async def health(
        broker=Depends(get_broker),
        model_service=Depends(get_model_service),
    ) -> dict:
        """
        Returns service health status including database and broker connectivity.
        Used by Docker HEALTHCHECK and HF Spaces monitoring.

        Both the broker and the model service are injected with ``Depends`` so
        that ``app.dependency_overrides`` applies to this endpoint the same
        way it does in the lifespan handler.
        """
        # NOTE(review): a 503 response is documented above, but this handler
        # always answers 200 and reports degradation only in the body.
        db_healthy = await ping_database()
        broker_connected = await broker.is_connected()
        all_healthy = db_healthy and broker_connected
        return {
            "status": "ok" if all_healthy else "degraded",
            "database": "ok" if db_healthy else "unavailable",
            "broker_connected": broker_connected,
            "model_loaded": model_service.is_loaded(),
            "model_version": model_service.model_version,
        }

    @app.get("/", tags=["Health"], include_in_schema=False)
    async def root() -> dict:
        """Return basic service identification and the docs location."""
        return {
            "service": "BP Monitoring Pipeline API",
            "version": "1.0.0",
            "docs": "/docs",
        }

    logger.info("FastAPI application created.")
    return app