from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from typing import Optional from jwt.exceptions import InvalidTokenError from database import get_session, User from security.auth import decode_token security = HTTPBearer(auto_error=False) async def get_current_user_hybrid( request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: api_key_obj = getattr(request.state, "api_key", None) if api_key_obj: if api_key_obj.user_id: result = await session.execute( select(User).where(User.id == api_key_obj.user_id) ) user = result.scalar_one_or_none() if user: user._is_api_key_user = True user._api_key_obj = api_key_obj return user virtual_user = User( id=f"api_key_{api_key_obj.id}", email=f"api_key_{api_key_obj.id}@virtual.local", nom=api_key_obj.name, prenom="API", hashed_password="", role="api_client", is_active=True, is_verified=True, ) virtual_user._is_api_key_user = True virtual_user._api_key_obj = api_key_obj return virtual_user if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise (JWT ou API Key)", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials try: payload = decode_token(token) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide: user_id manquant", headers={"WWW-Authenticate": "Bearer"}, ) result = await session.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Utilisateur introuvable", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Utilisateur inactif", ) return user except InvalidTokenError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Token invalide: {str(e)}", headers={"WWW-Authenticate": "Bearer"}, ) def require_role_hybrid(*allowed_roles: str): async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User: if user.role not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}", ) return user return role_checker def is_api_key_user(user: User) -> bool: """Vérifie si l'utilisateur est authentifié via API Key""" return getattr(user, "_is_api_key_user", False) def get_api_key_from_user(user: User): """Récupère l'objet ApiKey depuis un utilisateur (si applicable)""" return getattr(user, "_api_key_obj", None) get_current_user = get_current_user_hybrid require_role = require_role_hybrid