diff --git a/api.py b/api.py index 53a2273..c35e280 100644 --- a/api.py +++ b/api.py @@ -181,7 +181,6 @@ def custom_openapi(): openapi_schema = app.openapi() - # Définir deux schémas de sécurité openapi_schema["components"]["securitySchemes"] = { "HTTPBearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}, "ApiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-API-Key"}, diff --git a/core/dependencies.py b/core/dependencies.py index 8bb30ab..76c85be 100644 --- a/core/dependencies.py +++ b/core/dependencies.py @@ -2,13 +2,11 @@ from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select +from typing import Optional +from jwt.exceptions import InvalidTokenError + from database import get_session, User from security.auth import decode_token -from typing import Optional -from datetime import datetime -import logging - -logger = logging.getLogger(__name__) security = HTTPBearer(auto_error=False) @@ -18,62 +16,6 @@ async def get_current_user_hybrid( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: - if credentials and credentials.credentials: - token = credentials.credentials - - payload = decode_token(token) - if not payload: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Token invalide ou expiré", - headers={"WWW-Authenticate": "Bearer"}, - ) - - if payload.get("type") != "access": - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Type de token incorrect", - headers={"WWW-Authenticate": "Bearer"}, - ) - - user_id: str = payload.get("sub") - if not user_id: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Token malformé", - headers={"WWW-Authenticate": "Bearer"}, - ) - - result = await session.execute(select(User).where(User.id == user_id)) - user = result.scalar_one_or_none() - - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Utilisateur introuvable", - headers={"WWW-Authenticate": "Bearer"}, - ) - - if not user.is_active: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé" - ) - - if not user.is_verified: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Email non vérifié. Consultez votre boîte de réception.", - ) - - if user.locked_until and user.locked_until > datetime.now(): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Compte temporairement verrouillé suite à trop de tentatives échouées", - ) - - logger.debug(f" Authentifié via JWT: {user.email}") - return user - api_key_obj = getattr(request.state, "api_key", None) if api_key_obj: @@ -84,69 +26,76 @@ async def get_current_user_hybrid( user = result.scalar_one_or_none() if user: - logger.debug( - f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" - ) + user._is_api_key_user = True + user._api_key_obj = api_key_obj return user - from database import User as UserModel - - virtual_user = UserModel( + virtual_user = User( id=f"api_key_{api_key_obj.id}", - email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local", - nom="API Key", - prenom=api_key_obj.name, + email=f"api_key_{api_key_obj.id}@virtual.local", + username=api_key_obj.name, role="api_client", - is_verified=True, is_active=True, ) virtual_user._is_api_key_user = True virtual_user._api_key_obj = api_key_obj - logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)") return virtual_user - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Authentification requise (JWT ou API Key)", - headers={"WWW-Authenticate": "Bearer"}, - ) + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Authentification requise (JWT ou API Key)", + headers={"WWW-Authenticate": "Bearer"}, + ) + token = credentials.credentials -async def get_current_user_optional_hybrid( - request: Request, - credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), - session: AsyncSession = Depends(get_session), -) -> Optional[User]: - """Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)""" try: - return await get_current_user_hybrid(request, credentials, session) - except HTTPException: - return None + payload = decode_token(token) + user_id: str = payload.get("sub") - -def require_role(*allowed_roles: str): - async def role_checker( - request: Request, user: User = Depends(get_current_user_hybrid) - ) -> User: - is_api_key_user = getattr(user, "_is_api_key_user", False) - - if is_api_key_user: - if "api_client" not in allowed_roles and "*" not in allowed_roles: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.", - ) - logger.debug(" API Key autorisée pour cette route") - return user - - if user.role not in allowed_roles and "*" not in allowed_roles: + if user_id is None: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}", + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Token invalide: user_id manquant", + headers={"WWW-Authenticate": "Bearer"}, ) + result = await session.execute(select(User).where(User.id == user_id)) + user = result.scalar_one_or_none() + + if user is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Utilisateur introuvable", + headers={"WWW-Authenticate": "Bearer"}, + ) + + if not user.is_active: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Utilisateur inactif", + ) + + return user + + except InvalidTokenError as e: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=f"Token invalide: {str(e)}", + headers={"WWW-Authenticate": "Bearer"}, + ) + + +def require_role_hybrid(*allowed_roles: str): + async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User: + if user.role not in allowed_roles: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}", + ) return user return role_checker @@ -158,9 +107,9 @@ def is_api_key_user(user: User) -> bool: def get_api_key_from_user(user: User): - """Récupère l'objet API Key depuis un user virtuel""" + """Récupère l'objet ApiKey depuis un utilisateur (si applicable)""" return getattr(user, "_api_key_obj", None) get_current_user = get_current_user_hybrid -get_current_user_optional = get_current_user_optional_hybrid +require_role = require_role_hybrid