"""FastAPI dependencies that resolve and authorize the current user from a JWT."""

from datetime import datetime  # required by the lockout check in get_current_user
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_session, User
from security.auth import decode_token

# Strict scheme: FastAPI rejects the request itself when the bearer
# credentials are missing or malformed.
security = HTTPBearer()

# Lenient scheme for optional authentication: with ``auto_error=False`` the
# dependency yields ``None`` instead of raising when no bearer token is sent.
_optional_security = HTTPBearer(auto_error=False)


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response used for every token/identity failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    session: AsyncSession = Depends(get_session),
) -> User:
    """
    FastAPI dependency that resolves the authenticated user from the JWT.

    Usage in an endpoint:
        @app.get("/protected")
        async def protected_route(user: User = Depends(get_current_user)):
            return {"user_id": user.id}

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        session: Async database session used to load the user row.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when ``decode_token`` returns a falsy payload, the
            ``type`` claim is not ``"access"``, the ``sub`` claim is missing,
            or no user matches; 403 when the account is inactive, unverified,
            or currently locked.
    """
    token = credentials.credentials

    # Decode the token; a falsy payload is treated as invalid or expired.
    payload = decode_token(token)
    if not payload:
        raise _unauthorized("Token invalide ou expiré")

    # Only tokens whose "type" claim is "access" are accepted here.
    if payload.get("type") != "access":
        raise _unauthorized("Type de token incorrect")

    # The "sub" claim carries the user id.
    user_id: str = payload.get("sub")
    if not user_id:
        raise _unauthorized("Token malformé")

    # Load the user row.
    result = await session.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise _unauthorized("Utilisateur introuvable")

    # Account-state checks.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
        )

    if not user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email non vérifié. Consultez votre boîte de réception.",
        )

    # Compare against "now" with the same timezone-awareness as the stored
    # value: ordering a naive datetime against an aware one raises TypeError.
    # For a naive ``locked_until`` this is local time, as before.
    # NOTE(review): confirm which timezone naive ``locked_until`` values are
    # written in; local time is assumed here.
    locked_until = user.locked_until
    if locked_until is not None and locked_until > datetime.now(
        tz=locked_until.tzinfo
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
        )

    return user


async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_optional_security),
    session: AsyncSession = Depends(get_session),
) -> Optional[User]:
    """
    Optional variant of ``get_current_user`` that never raises for auth failures.

    Useful for public endpoints that return richer content to authenticated
    callers.

    Args:
        credentials: Bearer credentials, or ``None`` when the request carries
            no usable ``Authorization`` header.
        session: Async database session used to load the user row.

    Returns:
        The authenticated ``User``, or ``None`` when no credentials were sent
        or ``get_current_user`` rejected them with an ``HTTPException``.
    """
    if not credentials:
        return None

    try:
        return await get_current_user(credentials, session)
    except HTTPException:
        # Best-effort by design: any rejection degrades to anonymous access.
        return None


def require_role(*allowed_roles: str):
    """
    Build a dependency that restricts access to users holding an allowed role.

    Usage:
        @app.get("/admin/users")
        async def list_users(user: User = Depends(require_role("admin"))):
            ...

    Args:
        *allowed_roles: Role names that are granted access.

    Returns:
        An async dependency that returns the current user, or raises a 403
        ``HTTPException`` when ``user.role`` is not in ``allowed_roles``.
    """

    async def role_checker(user: User = Depends(get_current_user)) -> User:
        if user.role not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
            )
        return user

    return role_checker