refactor(auth): simplify authentication logic and improve error handling

This commit is contained in:
Fanilo-Nantenaina 2026-01-20 14:25:06 +03:00
parent 41ca202d4b
commit c84e4ddc20
2 changed files with 56 additions and 108 deletions

1
api.py
View file

@ -181,7 +181,6 @@ def custom_openapi():
openapi_schema = app.openapi()
# Définir deux schémas de sécurité
openapi_schema["components"]["securitySchemes"] = {
"HTTPBearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"},
"ApiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-API-Key"},

View file

@ -2,13 +2,11 @@ from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from jwt.exceptions import InvalidTokenError
from database import get_session, User
from security.auth import decode_token
from typing import Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False)
@ -18,62 +16,6 @@ async def get_current_user_hybrid(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> User:
if credentials and credentials.credentials:
token = credentials.credentials
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
if payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception.",
)
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
)
logger.debug(f" Authentifié via JWT: {user.email}")
return user
api_key_obj = getattr(request.state, "api_key", None)
if api_key_obj:
@ -84,69 +26,76 @@ async def get_current_user_hybrid(
user = result.scalar_one_or_none()
if user:
logger.debug(
f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}"
)
user._is_api_key_user = True
user._api_key_obj = api_key_obj
return user
from database import User as UserModel
virtual_user = UserModel(
virtual_user = User(
id=f"api_key_{api_key_obj.id}",
email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local",
nom="API Key",
prenom=api_key_obj.name,
email=f"api_key_{api_key_obj.id}@virtual.local",
username=api_key_obj.name,
role="api_client",
is_verified=True,
is_active=True,
)
virtual_user._is_api_key_user = True
virtual_user._api_key_obj = api_key_obj
logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)")
return virtual_user
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (JWT ou API Key)",
headers={"WWW-Authenticate": "Bearer"},
)
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (JWT ou API Key)",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
async def get_current_user_optional_hybrid(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> Optional[User]:
"""Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)"""
try:
return await get_current_user_hybrid(request, credentials, session)
except HTTPException:
return None
payload = decode_token(token)
user_id: str = payload.get("sub")
def require_role(*allowed_roles: str):
async def role_checker(
request: Request, user: User = Depends(get_current_user_hybrid)
) -> User:
is_api_key_user = getattr(user, "_is_api_key_user", False)
if is_api_key_user:
if "api_client" not in allowed_roles and "*" not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.",
)
logger.debug(" API Key autorisée pour cette route")
return user
if user.role not in allowed_roles and "*" not in allowed_roles:
if user_id is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide: user_id manquant",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Utilisateur inactif",
)
return user
except InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token invalide: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
def require_role_hybrid(*allowed_roles: str):
async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User:
if user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}",
)
return user
return role_checker
@ -158,9 +107,9 @@ def is_api_key_user(user: User) -> bool:
def get_api_key_from_user(user: User):
"""Récupère l'objet API Key depuis un user virtuel"""
"""Récupère l'objet ApiKey depuis un utilisateur (si applicable)"""
return getattr(user, "_api_key_obj", None)
get_current_user = get_current_user_hybrid
get_current_user_optional = get_current_user_optional_hybrid
require_role = require_role_hybrid