diff --git a/middleware/security.py b/middleware/security.py new file mode 100644 index 0000000..17186a0 --- /dev/null +++ b/middleware/security.py @@ -0,0 +1,296 @@ +import secrets +from fastapi import Request, HTTPException, status +from fastapi.responses import JSONResponse +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from sqlalchemy import select +from typing import Optional +from datetime import datetime +import logging + +from database import get_session +from database.models.api_key import SwaggerUser +from security.auth import verify_password + +logger = logging.getLogger(__name__) + +# === Configuration Swagger === +security = HTTPBasic() + + +async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: + """ + VERSION 2: Vérification des identifiants Swagger via base de données + + ✅ Plus sécurisé + ✅ Gestion centralisée + ✅ Tracking des connexions + """ + username = credentials.username + password = credentials.password + + try: + # Utiliser get_session de manière asynchrone + async for session in get_session(): + result = await session.execute( + select(SwaggerUser).where(SwaggerUser.username == username) + ) + swagger_user = result.scalar_one_or_none() + + if swagger_user and swagger_user.is_active: + if verify_password(password, swagger_user.hashed_password): + # Mise à jour de la dernière connexion + swagger_user.last_login = datetime.now() + await session.commit() + + logger.info(f"✅ Accès Swagger autorisé (DB): {username}") + return True + + logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}") + return False + + except Exception as e: + logger.error(f"❌ Erreur vérification Swagger credentials: {e}") + return False + + +class SwaggerAuthMiddleware: + """ + Middleware pour protéger les endpoints de documentation + (/docs, /redoc, /openapi.json) + + VERSION 2: Avec vérification en base de données + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive=receive) + path = request.url.path + + # Endpoints à protéger + protected_paths = ["/docs", "/redoc", "/openapi.json"] + + if any(path.startswith(protected_path) for protected_path in protected_paths): + # Vérification de l'authentification Basic + auth_header = request.headers.get("Authorization") + + if not auth_header or not auth_header.startswith("Basic "): + # Demande d'authentification + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Authentification requise pour accéder à la documentation" + }, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + # Extraction des credentials + try: + import base64 + + encoded_credentials = auth_header.split(" ")[1] + decoded_credentials = base64.b64decode(encoded_credentials).decode( + "utf-8" + ) + username, password = decoded_credentials.split(":", 1) + + credentials = HTTPBasicCredentials(username=username, password=password) + + # Vérification via DB + if not await verify_swagger_credentials(credentials): + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"detail": "Identifiants invalides"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + except Exception as e: + logger.error(f"❌ Erreur parsing auth header: {e}") + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"detail": "Format d'authentification invalide"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + # Si tout est OK, continuer + await self.app(scope, receive, send) + + +class ApiKeyMiddleware: + """ + Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics + + ✅ Accepte X-API-Key OU Authorization: Bearer (JWT) + ✅ Les deux méthodes sont équivalentes + ✅ Les endpoints /auth/* restent accessibles sans auth + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive=receive) + path = request.url.path + + # ============================================ + # ENDPOINTS EXCLUS (accessibles sans auth) + # ============================================ + excluded_paths = [ + "/docs", + "/redoc", + "/openapi.json", + "/health", + "/", + # === ROUTES AUTH (toujours publiques) === + "/api/v1/auth/login", + "/api/v1/auth/register", + "/api/v1/auth/verify-email", + "/api/v1/auth/reset-password", + "/api/v1/auth/request-reset", + "/api/v1/auth/refresh", + ] + + if any(path.startswith(excluded_path) for excluded_path in excluded_paths): + await self.app(scope, receive, send) + return + + # ============================================ + # VÉRIFICATION HYBRIDE: API Key OU JWT + # ============================================ + + # Option 1: Vérifier si JWT présent + auth_header = request.headers.get("Authorization") + has_jwt = auth_header and auth_header.startswith("Bearer ") + + # Option 2: Vérifier si API Key présente + api_key = request.headers.get("X-API-Key") + has_api_key = api_key is not None + + # ============================================ + # LOGIQUE HYBRIDE + # ============================================ + + if has_jwt: + # JWT présent → laisser passer, sera validé par les dependencies FastAPI + logger.debug(f"🔑 JWT détecté pour {path}") + await self.app(scope, receive, send) + return + + elif has_api_key: + # API Key présente → valider la clé + logger.debug(f"🔑 API Key détectée pour {path}") + + from services.api_key import ApiKeyService + + try: + async for session in get_session(): + api_key_service = ApiKeyService(session) + api_key_obj = await api_key_service.verify_api_key(api_key) + + if not api_key_obj: + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Clé API invalide ou expirée", + "hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer ", + }, + ) + await response(scope, receive, send) + return + + # Vérification du rate limit + is_allowed, rate_info = await api_key_service.check_rate_limit( + api_key_obj + ) + if not is_allowed: + response = JSONResponse( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + content={"detail": "Rate limit dépassé"}, + headers={ + "X-RateLimit-Limit": str(rate_info["limit"]), + "X-RateLimit-Remaining": "0", + }, + ) + await response(scope, receive, send) + return + + # Vérification de l'accès à l'endpoint + has_access = await api_key_service.check_endpoint_access( + api_key_obj, path + ) + if not has_access: + response = JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "detail": "Accès non autorisé à cet endpoint", + "endpoint": path, + "api_key": api_key_obj.key_prefix + "...", + }, + ) + await response(scope, receive, send) + return + + # ✅ Clé valide → ajouter les infos à la requête + request.state.api_key = api_key_obj + request.state.authenticated_via = "api_key" + + logger.info(f"✅ API Key valide: {api_key_obj.name} → {path}") + + # Continuer la requête + await self.app(scope, receive, send) + return + + except Exception as e: + logger.error(f"❌ Erreur validation API Key: {e}", exc_info=True) + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "detail": "Erreur interne lors de la validation de la clé" + }, + ) + await response(scope, receive, send) + return + + else: + # ❌ Ni JWT ni API Key + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Authentification requise", + "hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer '", + }, + headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'}, + ) + await response(scope, receive, send) + return + + +# === Fonction helper pour récupérer l'API Key depuis la requête === +def get_api_key_from_request(request: Request) -> Optional: + """Récupère l'objet ApiKey depuis la requête si présent""" + return getattr(request.state, "api_key", None) + + +def get_auth_method(request: Request) -> str: + """ + Retourne la méthode d'authentification utilisée + + Returns: + "jwt" | "api_key" | "none" + """ + return getattr(request.state, "authenticated_via", "none") diff --git a/routes/api_keys.py b/routes/api_keys.py new file mode 100644 index 0000000..8c8f6db --- /dev/null +++ b/routes/api_keys.py @@ -0,0 +1,180 @@ +from fastapi import APIRouter, Depends, HTTPException, status, Query +from sqlalchemy.ext.asyncio import AsyncSession +import logging + +from database import get_session, User +from core.dependencies import get_current_user, require_role +from services.api_key import ApiKeyService, api_key_to_response +from schemas.api_key import ( + ApiKeyCreate, + ApiKeyCreatedResponse, + ApiKeyResponse, + ApiKeyList, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api-keys", tags=["API Keys Management"]) + + +@router.post( + "", + response_model=ApiKeyCreatedResponse, + status_code=status.HTTP_201_CREATED, + dependencies=[Depends(require_role("admin", "super_admin"))], +) +async def create_api_key( + data: ApiKeyCreate, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 🔑 Créer une nouvelle clé API + + **Réservé aux admins** + + ⚠️ La clé en clair ne sera affichée qu'une seule fois ! + """ + service = ApiKeyService(session) + + api_key_obj, api_key_plain = await service.create_api_key( + name=data.name, + description=data.description, + created_by=user.email, + user_id=user.id, + expires_in_days=data.expires_in_days, + rate_limit_per_minute=data.rate_limit_per_minute, + allowed_endpoints=data.allowed_endpoints, + ) + + logger.info(f"✅ Clé API créée par {user.email}: {data.name}") + + response_data = api_key_to_response(api_key_obj) + response_data["api_key"] = api_key_plain + + return ApiKeyCreatedResponse(**response_data) + + +@router.get("", response_model=ApiKeyList) +async def list_api_keys( + include_revoked: bool = Query(False, description="Inclure les clés révoquées"), + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 📋 Lister toutes les clés API + + **Réservé aux admins** - liste toutes les clés + **Utilisateurs standards** - liste uniquement leurs clés + """ + service = ApiKeyService(session) + + # Si admin, voir toutes les clés, sinon seulement les siennes + user_id = None if user.role in ["admin", "super_admin"] else user.id + + keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id) + + items = [ApiKeyResponse(**api_key_to_response(k)) for k in keys] + + return ApiKeyList(total=len(items), items=items) + + +@router.get("/{key_id}", response_model=ApiKeyResponse) +async def get_api_key( + key_id: str, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """🔍 Récupérer une clé API par son ID""" + service = ApiKeyService(session) + + api_key_obj = await service.get_by_id(key_id) + + if not api_key_obj: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Clé API {key_id} introuvable", + ) + + # Vérification des permissions + if user.role not in ["admin", "super_admin"]: + if api_key_obj.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Accès refusé à cette clé", + ) + + return ApiKeyResponse(**api_key_to_response(api_key_obj)) + + +@router.delete("/{key_id}", status_code=status.HTTP_200_OK) +async def revoke_api_key( + key_id: str, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 🚫 Révoquer une clé API + + **Action irréversible** - la clé sera désactivée définitivement + """ + service = ApiKeyService(session) + + api_key_obj = await service.get_by_id(key_id) + + if not api_key_obj: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Clé API {key_id} introuvable", + ) + + # Vérification des permissions + if user.role not in ["admin", "super_admin"]: + if api_key_obj.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Accès refusé à cette clé", + ) + + success = await service.revoke_api_key(key_id) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Erreur lors de la révocation", + ) + + logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}") + + return { + "success": True, + "message": f"Clé API '{api_key_obj.name}' révoquée avec succès", + } + + +@router.post("/verify", status_code=status.HTTP_200_OK) +async def verify_api_key_endpoint( + api_key: str = Query(..., description="Clé API à vérifier"), + session: AsyncSession = Depends(get_session), +): + """ + ✅ Vérifier la validité d'une clé API + + **Endpoint public** - permet de tester une clé + """ + service = ApiKeyService(session) + + api_key_obj = await service.verify_api_key(api_key) + + if not api_key_obj: + return { + "valid": False, + "message": "Clé API invalide, expirée ou révoquée", + } + + return { + "valid": True, + "message": "Clé API valide", + "key_name": api_key_obj.name, + "rate_limit": api_key_obj.rate_limit_per_minute, + "expires_at": api_key_obj.expires_at, + } diff --git a/schemas/api_key.py b/schemas/api_key.py new file mode 100644 index 0000000..6a2d659 --- /dev/null +++ b/schemas/api_key.py @@ -0,0 +1,77 @@ +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime + + +class ApiKeyCreate(BaseModel): + """Schema pour créer une clé API""" + + name: str = Field(..., min_length=3, max_length=255, description="Nom de la clé") + description: Optional[str] = Field(None, description="Description de l'usage") + expires_in_days: Optional[int] = Field( + None, ge=1, le=3650, description="Expiration en jours (max 10 ans)" + ) + rate_limit_per_minute: int = Field( + 60, ge=1, le=1000, description="Limite de requêtes par minute" + ) + allowed_endpoints: Optional[List[str]] = Field( + None, description="Endpoints autorisés ([] = tous, ['/clients*'] = wildcard)" + ) + + +class ApiKeyResponse(BaseModel): + """Schema de réponse pour une clé API""" + + id: str + name: str + description: Optional[str] + key_prefix: str + is_active: bool + is_expired: bool + rate_limit_per_minute: int + allowed_endpoints: Optional[List[str]] + total_requests: int + last_used_at: Optional[datetime] + created_at: datetime + expires_at: Optional[datetime] + revoked_at: Optional[datetime] + created_by: str + + +class ApiKeyCreatedResponse(ApiKeyResponse): + """Schema de réponse après création (inclut la clé en clair)""" + + api_key: str = Field( + ..., description="⚠️ Clé API en clair - à sauvegarder immédiatement" + ) + + +class ApiKeyList(BaseModel): + """Liste de clés API""" + + total: int + items: List[ApiKeyResponse] + + +class SwaggerUserCreate(BaseModel): + """Schema pour créer un utilisateur Swagger""" + + username: str = Field(..., min_length=3, max_length=100) + password: str = Field(..., min_length=8) + full_name: Optional[str] = None + email: Optional[str] = None + + +class SwaggerUserResponse(BaseModel): + """Schema de réponse pour un utilisateur Swagger""" + + id: str + username: str + full_name: Optional[str] + email: Optional[str] + is_active: bool + created_at: datetime + last_login: Optional[datetime] + + class Config: + from_attributes = True