118 lines
3.6 KiB
Python
118 lines
3.6 KiB
Python
from fastapi import Depends, HTTPException, status, Request
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from typing import Optional
|
|
from jwt.exceptions import InvalidTokenError
|
|
|
|
from database import get_session, User
|
|
from security.auth import decode_token
|
|
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def get_current_user_hybrid(
|
|
request: Request,
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> User:
|
|
api_key_obj = getattr(request.state, "api_key", None)
|
|
|
|
if api_key_obj:
|
|
if api_key_obj.user_id:
|
|
result = await session.execute(
|
|
select(User).where(User.id == api_key_obj.user_id)
|
|
)
|
|
user = result.scalar_one_or_none()
|
|
|
|
if user:
|
|
user._is_api_key_user = True
|
|
user._api_key_obj = api_key_obj
|
|
return user
|
|
|
|
virtual_user = User(
|
|
id=f"api_key_{api_key_obj.id}",
|
|
email=f"api_key_{api_key_obj.id}@virtual.local",
|
|
nom=api_key_obj.name,
|
|
prenom="API",
|
|
hashed_password="",
|
|
role="api_client",
|
|
is_active=True,
|
|
is_verified=True,
|
|
)
|
|
|
|
virtual_user._is_api_key_user = True
|
|
virtual_user._api_key_obj = api_key_obj
|
|
|
|
return virtual_user
|
|
|
|
if not credentials:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentification requise (JWT ou API Key)",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
token = credentials.credentials
|
|
|
|
try:
|
|
payload = decode_token(token)
|
|
user_id: str = payload.get("sub")
|
|
|
|
if user_id is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token invalide: user_id manquant",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
result = await session.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if user is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Utilisateur introuvable",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Utilisateur inactif",
|
|
)
|
|
|
|
return user
|
|
|
|
except InvalidTokenError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=f"Token invalide: {str(e)}",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
|
|
def require_role_hybrid(*allowed_roles: str):
|
|
async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User:
|
|
if user.role not in allowed_roles:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}",
|
|
)
|
|
return user
|
|
|
|
return role_checker
|
|
|
|
|
|
def is_api_key_user(user: User) -> bool:
|
|
"""Vérifie si l'utilisateur est authentifié via API Key"""
|
|
return getattr(user, "_is_api_key_user", False)
|
|
|
|
|
|
def get_api_key_from_user(user: User):
|
|
"""Récupère l'objet ApiKey depuis un utilisateur (si applicable)"""
|
|
return getattr(user, "_api_key_obj", None)
|
|
|
|
|
|
get_current_user = get_current_user_hybrid
|
|
require_role = require_role_hybrid
|