| """ |
| interface/api/app.py |
| ───────────────────── |
| FastAPI application factory. |
| |
| Usage: |
| # Direct (dev) |
| uvicorn src.interface.api.app:create_app --factory --port 7860 --reload |
| |
| # Docker / HF Spaces |
| CMD ["uvicorn", "src.interface.api.app:create_app", "--factory", ...] |
| |
| Design decisions: |
| • Factory function (not module-level app) so tests can create isolated instances. |
| • Lifespan context manager handles startup/shutdown cleanly. |
| • Global exception handlers registered via error_handlers.register_exception_handlers(). |
| • CORS is wide-open by default — restrict in production via settings. |
| """ |
| from __future__ import annotations |
|
|
| from contextlib import asynccontextmanager |
| from typing import Annotated, AsyncGenerator |
|
|
| from fastapi import Depends, FastAPI |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| from src.infrastructure.database.connection import ( |
| create_all_tables, |
| dispose_engine, |
| ping_database, |
| ) |
| from src.interface.api.dependencies import get_broker, get_model_service |
| from src.interface.api.error_handlers import register_exception_handlers |
| from src.interface.api.routes import ppg_routes, prediction_routes |
| from src.shared.config import get_settings |
| from src.shared.constants import API_V1_PREFIX |
| from src.shared.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
def _strip_credentials(url: str) -> str:
    """Return the part of *url* after the last ``@`` (the whole URL if it has none).

    Used so that connection URLs can be logged without their
    ``user:password@`` prefix.
    """
    return url.rsplit("@", 1)[-1]


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """
    Application lifespan handler.

    Startup:
        1. Validate database connectivity (ping). A failed ping is only
           logged; startup continues.
        2. Create DB tables when ``settings.debug`` or ``settings.is_sqlite``
           is set (dev/SQLite only — Supabase uses Alembic).
        3. Connect to RabbitMQ. A connection failure is logged, not raised.
        4. Pre-load AI model. A load failure is logged, not raised.

    Shutdown (always runs, even if the context exits with an error):
        1. Disconnect from RabbitMQ (failures are logged, not raised).
        2. Dispose the DB engine (returns all pooled connections).

    The broker and model service are resolved through
    ``app.dependency_overrides`` first, so overrides installed on ``app``
    are honoured here as well as in request handlers.

    Args:
        app: The FastAPI instance whose lifespan is being managed.

    Yields:
        None. Control returns to the server while the app handles requests.
    """
    settings = get_settings()
    logger.info("=" * 60)
    logger.info("BP Monitoring Pipeline β Starting up")
    logger.info("Database : %s", _strip_credentials(settings.database_url))
    logger.info("Supabase : %s", "yes" if settings.is_supabase else "no")
    logger.info("Pooler : %s", "yes (port 6543)" if settings.uses_pooler else "no")
    logger.info("Broker : %s", _strip_credentials(settings.rabbitmq_url))
    logger.info("Mock Model: %s", settings.use_mock_model)
    logger.info("=" * 60)

    # 1. Database connectivity check (non-fatal).
    db_ok = await ping_database()
    if db_ok:
        logger.info("β Database connection verified.")
    else:
        logger.warning(
            "β Database ping failed β check DATABASE_URL. "
            "The API will start but DB operations will fail."
        )

    # 2. Table auto-creation for dev/SQLite runs.
    if settings.debug or settings.is_sqlite:
        logger.info("Auto-creating DB tables (dev/SQLite mode)β¦")
        await create_all_tables()

    # 3. Message broker (non-fatal). Look up an override before falling back
    #    to the real provider.
    broker_provider = app.dependency_overrides.get(get_broker, get_broker)
    broker = broker_provider()
    try:
        await broker.connect()
        logger.info("β RabbitMQ broker connected.")
    except Exception as exc:
        logger.warning(
            "β Could not connect to RabbitMQ on startup (will retry on first publish): %s", exc
        )

    # 4. Model pre-load (non-fatal), resolved the same way as the broker.
    model_provider = app.dependency_overrides.get(get_model_service, get_model_service)
    model_service = model_provider()
    try:
        await model_service.load_model()
        logger.info("β Model service ready: %s", model_service.model_version)
    except Exception as exc:
        logger.warning("β Could not pre-load model on startup: %s", exc)

    logger.info("Startup complete. API is ready to serve requests.")
    try:
        yield
    finally:
        # Cleanup lives in ``finally`` so the engine is disposed even when the
        # context is exited by an exception or task cancellation.
        logger.info("Shutting downβ¦")
        try:
            await broker.disconnect()
        except Exception as exc:
            # Best-effort: a failed disconnect must not block engine disposal,
            # but it should be visible in the logs.
            logger.warning("Could not disconnect from RabbitMQ cleanly: %s", exc)
        finally:
            await dispose_engine()
        logger.info("Shutdown complete.")
|
|
|
|
def create_app() -> FastAPI:
    """
    FastAPI application factory.

    Returns a fully configured FastAPI instance with:
        • Lifespan hooks (startup/shutdown)
        • Global domain exception handlers
        • CORS middleware
        • API v1 routes
        • Health check endpoint
        • Swagger UI / ReDoc

    Returns:
        A new ``FastAPI`` application; each call builds an independent instance.
    """
    # Imported locally: only the /health handler needs these.
    from fastapi.encoders import jsonable_encoder
    from fastapi.responses import JSONResponse

    app = FastAPI(
        title="BP Monitoring Pipeline API",
        description=(
            "REST API for the **Blood Pressure Monitoring Pipeline**.\n\n"
            "Receives PPG signals from IoT devices (ETL #1) and serves "
            "AI-predicted blood pressure results to the frontend.\n\n"
            "**Error Response Format** (all errors):\n"
            "```json\n"
            '{"error": "not_found", "message": "...", "context": {}, "timestamp": "..."}\n'
            "```"
        ),
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        lifespan=lifespan,
    )

    # Global exception handlers.
    register_exception_handlers(app)

    # CORS.
    # NOTE(review): wildcard origins are combined with allow_credentials=True.
    # The FastAPI CORS docs state that origins must be listed explicitly when
    # credentials are allowed — tighten allow_origins before production use.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # API v1 routers.
    app.include_router(ppg_routes.router, prefix=API_V1_PREFIX)
    app.include_router(prediction_routes.router, prefix=API_V1_PREFIX)

    @app.get(
        "/health",
        tags=["Health"],
        summary="Health Check",
        responses={
            200: {"description": "Service is healthy."},
            503: {"description": "One or more dependencies are unavailable."},
        },
    )
    async def health(
        broker=Depends(get_broker),
        # Injected (not called directly) so app.dependency_overrides applies
        # to the model service just as it does to the broker.
        model_service=Depends(get_model_service),
    ) -> dict:
        """
        Returns service health status including database and broker connectivity.
        Responds with 200 when the database and broker are both reachable and
        with 503 (same JSON body, status "degraded") otherwise.
        Used by Docker HEALTHCHECK and HF Spaces monitoring.
        """
        db_healthy = await ping_database()
        broker_connected = await broker.is_connected()

        all_healthy = db_healthy and broker_connected

        payload = {
            "status": "ok" if all_healthy else "degraded",
            "database": "ok" if db_healthy else "unavailable",
            "broker_connected": broker_connected,
            "model_loaded": model_service.is_loaded(),
            "model_version": model_service.model_version,
        }
        if all_healthy:
            return payload

        # Honour the documented 503 so status-code-based health checks can
        # detect a degraded service. A Response object is passed through by
        # FastAPI as-is, so the payload is encoded explicitly here.
        return JSONResponse(status_code=503, content=jsonable_encoder(payload))

    @app.get("/", tags=["Health"], include_in_schema=False)
    async def root() -> dict:
        """Return basic service metadata and the location of the API docs."""
        # Read from the app so these values cannot drift from the FastAPI()
        # arguments above.
        return {
            "service": app.title,
            "version": app.version,
            "docs": app.docs_url,
        }

    logger.info("FastAPI application created.")
    return app
|
|