from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from sqlalchemy import select from typing import Optional from datetime import datetime import logging from database import get_session from database.models.api_key import SwaggerUser from security.auth import verify_password logger = logging.getLogger(__name__) security = HTTPBasic() async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: username = credentials.username password = credentials.password try: async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) swagger_user = result.scalar_one_or_none() if swagger_user and swagger_user.is_active: if verify_password(password, swagger_user.hashed_password): swagger_user.last_login = datetime.now() await session.commit() logger.info(f" Accès Swagger autorisé (DB): {username}") return True logger.warning(f" Tentative d'accès Swagger refusée: {username}") return False except Exception as e: logger.error(f" Erreur vérification Swagger credentials: {e}") return False class SwaggerAuthMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return request = Request(scope, receive=receive) path = request.url.path protected_paths = ["/docs", "/redoc", "/openapi.json"] if any(path.startswith(protected_path) for protected_path in protected_paths): auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "Authentification requise pour accéder à la documentation" }, headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, ) await response(scope, receive, send) return try: import base64 encoded_credentials = auth_header.split(" ")[1] decoded_credentials = base64.b64decode(encoded_credentials).decode( "utf-8" ) username, password = decoded_credentials.split(":", 1) credentials = HTTPBasicCredentials(username=username, password=password) if not await verify_swagger_credentials(credentials): response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Identifiants invalides"}, headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, ) await response(scope, receive, send) return except Exception as e: logger.error(f" Erreur parsing auth header: {e}") response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Format d'authentification invalide"}, headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, ) await response(scope, receive, send) return await self.app(scope, receive, send) class ApiKeyMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return request = Request(scope, receive=receive) path = request.url.path excluded_paths = [ "/docs", "/redoc", "/openapi.json", "/health", "/", "/auth/login", "/auth/register", "/auth/verify-email", "/auth/reset-password", "/auth/request-reset", "/auth/refresh", ] if any(path.startswith(excluded_path) for excluded_path in excluded_paths): await self.app(scope, receive, send) return auth_header = request.headers.get("Authorization") has_jwt = auth_header and auth_header.startswith("Bearer ") api_key = request.headers.get("X-API-Key") has_api_key = api_key is not None if has_jwt: logger.debug(f" JWT détecté pour {path}") await self.app(scope, receive, send) return elif has_api_key: logger.debug(f" API Key détectée pour {path}") from services.api_key import ApiKeyService try: async for session in get_session(): api_key_service = ApiKeyService(session) api_key_obj = await api_key_service.verify_api_key(api_key) if not api_key_obj: response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "Clé API invalide ou expirée", "hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer ", }, ) await response(scope, receive, send) return is_allowed, rate_info = await api_key_service.check_rate_limit( api_key_obj ) if not is_allowed: response = JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": "Rate limit dépassé"}, headers={ "X-RateLimit-Limit": str(rate_info["limit"]), "X-RateLimit-Remaining": "0", }, ) await response(scope, receive, send) return has_access = await api_key_service.check_endpoint_access( api_key_obj, path ) if not has_access: response = JSONResponse( status_code=status.HTTP_403_FORBIDDEN, content={ "detail": "Accès non autorisé à cet endpoint", "endpoint": path, "api_key": api_key_obj.key_prefix + "...", }, ) await response(scope, receive, send) return request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" logger.info(f" API Key valide: {api_key_obj.name} → {path}") await self.app(scope, receive, send) return except Exception as e: logger.error(f" Erreur validation API Key: {e}", exc_info=True) response = JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ "detail": "Erreur interne lors de la validation de la clé" }, ) await response(scope, receive, send) return else: response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ "detail": "Authentification requise", "hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer '", }, headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'}, ) await response(scope, receive, send) return def get_api_key_from_request(request: Request) -> Optional: """Récupère l'objet ApiKey depuis la requête si présent""" return getattr(request.state, "api_key", None) def get_auth_method(request: Request) -> str: return getattr(request.state, "authenticated_via", "none")