113 lines
3.5 KiB
Python
113 lines
3.5 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from database import get_session, User
|
|
from security.auth import decode_token
|
|
from typing import Optional
|
|
from datetime import datetime # AJOUT MANQUANT - C'ÉTAIT LE BUG !
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> User:
|
|
token = credentials.credentials
|
|
|
|
# Décoder le token
|
|
payload = decode_token(token)
|
|
if not payload:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token invalide ou expiré",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Vérifier le type
|
|
if payload.get("type") != "access":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Type de token incorrect",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Extraire user_id
|
|
user_id: str = payload.get("sub")
|
|
if not user_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token malformé",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Charger l'utilisateur
|
|
result = await session.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Utilisateur introuvable",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Vérifications de sécurité
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
|
|
)
|
|
|
|
if not user.is_verified:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Email non vérifié. Consultez votre boîte de réception.",
|
|
)
|
|
|
|
# FIX: Vérifier si le compte est verrouillé (maintenant datetime est importé!)
|
|
if user.locked_until and user.locked_until > datetime.now():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
async def get_current_user_optional(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> Optional[User]:
|
|
"""
|
|
Version optionnelle - ne lève pas d'erreur si pas de token
|
|
Utile pour des endpoints publics avec contenu enrichi si authentifié
|
|
"""
|
|
if not credentials:
|
|
return None
|
|
|
|
try:
|
|
return await get_current_user(credentials, session)
|
|
except HTTPException:
|
|
return None
|
|
|
|
|
|
def require_role(*allowed_roles: str):
|
|
"""
|
|
Décorateur pour restreindre l'accès par rôle
|
|
|
|
Usage:
|
|
@app.get("/admin/users")
|
|
async def list_users(user: User = Depends(require_role("admin"))):
|
|
...
|
|
"""
|
|
|
|
async def role_checker(user: User = Depends(get_current_user)) -> User:
|
|
if user.role not in allowed_roles:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
|
|
)
|
|
return user
|
|
|
|
return role_checker
|