LIBRE / src /interface /api /error_handlers.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
"""
interface/api/error_handlers.py
────────────────────────────────
Centralised FastAPI exception handlers.
Every domain exception is mapped to a specific HTTP status code and a
consistent JSON error response format. Registering all handlers in one place
means route handlers stay clean — they simply raise domain exceptions and
let this module translate them to HTTP responses.
Error response format (all errors):
{
"error": "<snake_case_error_code>",
"message": "<human-readable description>",
"context": { ... }, // domain context dict (may be empty)
"timestamp": "2026-05-31T..."
}
HTTP Status Code Mapping:
400 — InvalidSignalError (bad input data)
404 — EntityNotFoundError (resource not found)
409 — ConflictError (duplicate / unique violation)
422 — PredictionOutOfRangeError (model output unprocessable)
422 — PreprocessingError (signal processing failed)
500 — unhandled Exception (unknown / programming error)
503 — DatabaseError (DB unreachable)
503 — BrokerError (message broker unreachable)
503 — ModelInferenceError (AI model failed)
"""
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from src.domain.exceptions.domain_exceptions import (
ConflictError,
DatabaseError,
DomainException,
EntityNotFoundError,
InvalidSignalError,
PredictionOutOfRangeError,
)
from src.domain.exceptions.pipeline_exceptions import (
BrokerError,
ModelInferenceError,
PreprocessingError,
)
from src.shared.logger import get_logger
logger = get_logger(__name__)
# ── Shared response builder ───────────────────────────────────────────────────
def _error_response(
    status_code: int,
    error_code: str,
    message: str,
    context: dict | None = None,
) -> JSONResponse:
    """
    Assemble the JSON error envelope shared by every handler in this module.

    Args:
        status_code: HTTP status code to set on the response.
        error_code: Value emitted under the ``"error"`` key.
        message: Value emitted under the ``"message"`` key.
        context: Optional dict emitted under the ``"context"`` key; ``None``
            (or any other falsy value) is emitted as ``{}``.

    Returns:
        A ``JSONResponse`` whose body additionally carries a ``"timestamp"``
        key holding the current UTC time as an ISO-8601 string.
    """
    issued_at = datetime.now(timezone.utc).isoformat()
    payload = {
        "error": error_code,
        "message": message,
        "context": context if context else {},
        "timestamp": issued_at,
    }
    return JSONResponse(status_code=status_code, content=payload)
# ── Individual exception handlers ─────────────────────────────────────────────
async def handle_invalid_signal(request: Request, exc: InvalidSignalError) -> JSONResponse:
    """
    Translate an ``InvalidSignalError`` into an HTTP 400 response.

    The failure is logged at WARNING level with the request method and path;
    the exception's message and context are returned to the client under the
    ``invalid_signal`` error code.
    """
    logger.warning(
        "InvalidSignalError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    return _error_response(
        status_code=400,
        error_code="invalid_signal",
        message=exc.message,
        context=exc.context,
    )
async def handle_entity_not_found(request: Request, exc: EntityNotFoundError) -> JSONResponse:
    """
    Translate an ``EntityNotFoundError`` into an HTTP 404 response.

    Logged at INFO level (unlike the other handlers in this module, which use
    WARNING or ERROR); the exception's message and context are returned to the
    client under the ``not_found`` error code.
    """
    logger.info(
        "EntityNotFoundError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    return _error_response(
        status_code=404,
        error_code="not_found",
        message=exc.message,
        context=exc.context,
    )
async def handle_conflict(request: Request, exc: ConflictError) -> JSONResponse:
    """
    Translate a ``ConflictError`` into an HTTP 409 response.

    Logged at WARNING level with the request method and path; the exception's
    message and context are returned to the client under the ``conflict``
    error code.
    """
    logger.warning(
        "ConflictError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    return _error_response(
        status_code=409,
        error_code="conflict",
        message=exc.message,
        context=exc.context,
    )
async def handle_prediction_out_of_range(
    request: Request,
    exc: PredictionOutOfRangeError,
) -> JSONResponse:
    """
    Translate a ``PredictionOutOfRangeError`` into an HTTP 422 response.

    Logged at WARNING level with the request method and path; the exception's
    message and context are returned to the client under the
    ``prediction_out_of_range`` error code.
    """
    http_method = request.method
    url_path = request.url.path
    logger.warning("PredictionOutOfRangeError [%s %s]: %s", http_method, url_path, exc.message)
    return _error_response(
        status_code=422,
        error_code="prediction_out_of_range",
        message=exc.message,
        context=exc.context,
    )
async def handle_preprocessing_error(request: Request, exc: PreprocessingError) -> JSONResponse:
    """
    Translate a ``PreprocessingError`` into an HTTP 422 response.

    Logged at WARNING level with the request method and path; the exception's
    message and context are returned to the client under the
    ``preprocessing_failed`` error code.
    """
    http_method = request.method
    url_path = request.url.path
    logger.warning("PreprocessingError [%s %s]: %s", http_method, url_path, exc.message)
    return _error_response(
        status_code=422,
        error_code="preprocessing_failed",
        message=exc.message,
        context=exc.context,
    )
async def handle_database_error(request: Request, exc: DatabaseError) -> JSONResponse:
    """
    Translate a ``DatabaseError`` into an HTTP 503 response.

    The exception's own message is written to the log at ERROR level only;
    the client receives a fixed, generic message under the
    ``database_unavailable`` error code, together with the exception's context.
    """
    logger.error(
        "DatabaseError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    # Fixed wording: the exception's own message is logged, not sent to the client.
    client_message = "The database is temporarily unavailable. Please try again later."
    return _error_response(503, "database_unavailable", client_message, exc.context)
async def handle_broker_error(request: Request, exc: BrokerError) -> JSONResponse:
    """
    Translate a ``BrokerError`` into an HTTP 503 response.

    The exception's own message is written to the log at ERROR level only;
    the client receives a fixed message under the ``broker_unavailable`` error
    code, together with the exception's context.
    """
    logger.error(
        "BrokerError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    # Fixed wording: the exception's own message is logged, not sent to the client.
    client_message = (
        "The message broker is temporarily unavailable. Your data was saved but not queued."
    )
    return _error_response(503, "broker_unavailable", client_message, exc.context)
async def handle_model_inference_error(request: Request, exc: ModelInferenceError) -> JSONResponse:
    """
    Translate a ``ModelInferenceError`` into an HTTP 503 response.

    The exception's own message is written to the log at ERROR level only;
    the client receives a message that names ``exc.model_name`` under the
    ``model_inference_failed`` error code, together with the exception's context.
    """
    logger.error(
        "ModelInferenceError [%s %s]: %s",
        request.method,
        request.url.path,
        exc.message,
    )
    # Only the model name is interpolated; the logged message is not sent to the client.
    client_message = (
        f"The AI model '{exc.model_name}' failed to process the request. Please try again later."
    )
    return _error_response(503, "model_inference_failed", client_message, exc.context)
async def handle_domain_exception(request: Request, exc: DomainException) -> JSONResponse:
    """
    Translate a ``DomainException`` into an HTTP 400 response.

    Logged at WARNING level with the request method and path; the exception's
    message and context are returned to the client under the generic
    ``domain_error`` error code.
    """
    http_method = request.method
    url_path = request.url.path
    logger.warning("DomainException [%s %s]: %s", http_method, url_path, exc.message)
    return _error_response(
        status_code=400,
        error_code="domain_error",
        message=exc.message,
        context=exc.context,
    )
async def handle_unhandled_exception(request: Request, exc: Exception) -> JSONResponse:
    """
    Translate any other exception into an HTTP 500 response.

    The exception is logged at ERROR level with ``exc_info=True`` so the
    traceback is captured; the client receives only a fixed, generic message
    under the ``internal_error`` error code and an empty context.
    """
    http_method = request.method
    url_path = request.url.path
    logger.error(
        "Unhandled exception [%s %s]: %s", http_method, url_path, exc, exc_info=True
    )
    # Nothing from the exception itself is sent back to the client.
    client_message = "An unexpected internal error occurred. Please contact support."
    return _error_response(500, "internal_error", client_message)
# ── Registration helper ───────────────────────────────────────────────────────
def register_exception_handlers(app: FastAPI) -> None:
    """
    Register all exception handlers from this module on a FastAPI application.

    Call this once inside ``create_app()`` after the app is instantiated.

    Handlers are registered from most specific to most general, ending with
    the ``DomainException`` and ``Exception`` catch-alls.

    NOTE(review): Starlette appears to pick a handler by walking the raised
    exception's class MRO rather than by registration order, so this ordering
    is likely a readability convention only — confirm against the installed
    Starlette version before relying on either behaviour.

    Args:
        app: The application instance to register the handlers on.
    """
    registrations = (
        # ── 400 ──────────────────────────────────────────────────────────────
        (InvalidSignalError, handle_invalid_signal),
        # ── 404 ──────────────────────────────────────────────────────────────
        (EntityNotFoundError, handle_entity_not_found),
        # ── 409 ──────────────────────────────────────────────────────────────
        (ConflictError, handle_conflict),
        # ── 422 ──────────────────────────────────────────────────────────────
        (PredictionOutOfRangeError, handle_prediction_out_of_range),
        (PreprocessingError, handle_preprocessing_error),
        # ── 503 ──────────────────────────────────────────────────────────────
        (DatabaseError, handle_database_error),
        (BrokerError, handle_broker_error),
        (ModelInferenceError, handle_model_inference_error),
        # ── 400 catch-all (listed after every specific handler) ──────────────
        (DomainException, handle_domain_exception),
        # ── 500 catch-all ────────────────────────────────────────────────────
        (Exception, handle_unhandled_exception),
    )

    for exception_type, handler in registrations:
        app.add_exception_handler(exception_type, handler)  # type: ignore

    # Derive the count from the table so the log line cannot drift from the
    # number of handlers actually registered.
    logger.info("Exception handlers registered (%d handlers).", len(registrations))