Sage100-vps/core/dependencies.py
2025-12-29 11:20:55 +03:00

113 lines
3.5 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_session, User
from security.auth import decode_token
from typing import Optional
from datetime import datetime # AJOUT MANQUANT - C'ÉTAIT LE BUG !
# Shared HTTP Bearer scheme instance. It is injected with Depends() by the
# dependencies in this module to obtain the request's
# HTTPAuthorizationCredentials (the raw token is read from `.credentials`).
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    session: AsyncSession = Depends(get_session),
) -> User:
    """
    Resolve the authenticated user from the request's bearer token.

    The token is decoded with ``decode_token``; its payload must have
    ``type == "access"`` and a non-empty ``sub`` claim, which is used to look
    the user up by ``User.id``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        session: Async database session used to load the user.

    Returns:
        The matching ``User`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not an access
            token, has no ``sub`` claim, or references an unknown user;
            403 when the account is inactive, unverified, or still locked.
    """
    token = credentials.credentials

    # Decode the token; a falsy payload is treated as invalid or expired.
    payload = decode_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token invalide ou expiré",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Only access tokens are accepted here.
    if payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Type de token incorrect",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The subject claim carries the user id.
    user_id: Optional[str] = payload.get("sub")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token malformé",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Load the user referenced by the token.
    result = await session.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Utilisateur introuvable",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Account-state checks.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
        )
    if not user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email non vérifié. Consultez votre boîte de réception.",
        )

    # Reject while the lock is still in the future. "Now" is built with the
    # stored value's own tzinfo: for a naive value this is datetime.now(None),
    # i.e. the same naive local time as before; for a timezone-aware value it
    # avoids the TypeError raised when comparing naive and aware datetimes.
    # NOTE(review): whether locked_until is stored naive-local, naive-UTC or
    # aware is not visible from this module - confirm against the writer.
    locked_until = user.locked_until
    if locked_until and locked_until > datetime.now(locked_until.tzinfo):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
        )

    return user
# Dedicated bearer scheme for optional authentication. With auto_error=False
# the scheme yields None when no usable bearer token is present instead of
# rejecting the request, which is what lets the dependency below return None.
_optional_security = HTTPBearer(auto_error=False)


async def get_current_user_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_optional_security),
    session: AsyncSession = Depends(get_session),
) -> Optional[User]:
    """
    Optional variant of ``get_current_user`` that never raises for auth failures.

    Useful for public endpoints that return richer content when the caller
    is authenticated.

    Args:
        credentials: Bearer credentials, or ``None`` when the request carries
            no bearer token.
        session: Async database session forwarded to ``get_current_user``.

    Returns:
        The authenticated ``User``, or ``None`` when no token was supplied or
        ``get_current_user`` rejected it with an ``HTTPException``.
    """
    if credentials is None:
        return None

    try:
        return await get_current_user(credentials, session)
    except HTTPException:
        # Deliberate best effort: any authentication/authorization failure
        # is treated as "anonymous caller".
        return None
def require_role(*allowed_roles: str):
    """
    Build a dependency that restricts an endpoint to the given roles.

    The returned coroutine resolves the current user through
    ``get_current_user`` and hands that user back when ``user.role`` is one
    of ``allowed_roles``; otherwise it raises HTTP 403.

    Usage:
        @app.get("/admin/users")
        async def list_users(user: User = Depends(require_role("admin"))):
            ...
    """

    async def role_checker(user: User = Depends(get_current_user)) -> User:
        # Success path first: a permitted role returns the user unchanged.
        if user.role in allowed_roles:
            return user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
        )

    return role_checker