feat(api-keys): implement api key management system

This commit is contained in:
Fanilo-Nantenaina 2026-01-19 20:35:03 +03:00
parent 09eae50952
commit 9f6c1de8ef
3 changed files with 553 additions and 0 deletions

296
middleware/security.py Normal file
View file

@ -0,0 +1,296 @@
import secrets
from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from sqlalchemy import select
from typing import Optional
from datetime import datetime
import logging
from database import get_session
from database.models.api_key import SwaggerUser
from security.auth import verify_password
logger = logging.getLogger(__name__)
# === Configuration Swagger ===
security = HTTPBasic()
async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
"""
VERSION 2: Vérification des identifiants Swagger via base de données
Plus sécurisé
Gestion centralisée
Tracking des connexions
"""
username = credentials.username
password = credentials.password
try:
# Utiliser get_session de manière asynchrone
async for session in get_session():
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
swagger_user = result.scalar_one_or_none()
if swagger_user and swagger_user.is_active:
if verify_password(password, swagger_user.hashed_password):
# Mise à jour de la dernière connexion
swagger_user.last_login = datetime.now()
await session.commit()
logger.info(f"✅ Accès Swagger autorisé (DB): {username}")
return True
logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}")
return False
except Exception as e:
logger.error(f"❌ Erreur vérification Swagger credentials: {e}")
return False
class SwaggerAuthMiddleware:
"""
Middleware pour protéger les endpoints de documentation
(/docs, /redoc, /openapi.json)
VERSION 2: Avec vérification en base de données
"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive=receive)
path = request.url.path
# Endpoints à protéger
protected_paths = ["/docs", "/redoc", "/openapi.json"]
if any(path.startswith(protected_path) for protected_path in protected_paths):
# Vérification de l'authentification Basic
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
# Demande d'authentification
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Authentification requise pour accéder à la documentation"
},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
await response(scope, receive, send)
return
# Extraction des credentials
try:
import base64
encoded_credentials = auth_header.split(" ")[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode(
"utf-8"
)
username, password = decoded_credentials.split(":", 1)
credentials = HTTPBasicCredentials(username=username, password=password)
# Vérification via DB
if not await verify_swagger_credentials(credentials):
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Identifiants invalides"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
await response(scope, receive, send)
return
except Exception as e:
logger.error(f"❌ Erreur parsing auth header: {e}")
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Format d'authentification invalide"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
await response(scope, receive, send)
return
# Si tout est OK, continuer
await self.app(scope, receive, send)
class ApiKeyMiddleware:
"""
Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics
Accepte X-API-Key OU Authorization: Bearer (JWT)
Les deux méthodes sont équivalentes
Les endpoints /auth/* restent accessibles sans auth
"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive=receive)
path = request.url.path
# ============================================
# ENDPOINTS EXCLUS (accessibles sans auth)
# ============================================
excluded_paths = [
"/docs",
"/redoc",
"/openapi.json",
"/health",
"/",
# === ROUTES AUTH (toujours publiques) ===
"/api/v1/auth/login",
"/api/v1/auth/register",
"/api/v1/auth/verify-email",
"/api/v1/auth/reset-password",
"/api/v1/auth/request-reset",
"/api/v1/auth/refresh",
]
if any(path.startswith(excluded_path) for excluded_path in excluded_paths):
await self.app(scope, receive, send)
return
# ============================================
# VÉRIFICATION HYBRIDE: API Key OU JWT
# ============================================
# Option 1: Vérifier si JWT présent
auth_header = request.headers.get("Authorization")
has_jwt = auth_header and auth_header.startswith("Bearer ")
# Option 2: Vérifier si API Key présente
api_key = request.headers.get("X-API-Key")
has_api_key = api_key is not None
# ============================================
# LOGIQUE HYBRIDE
# ============================================
if has_jwt:
# JWT présent → laisser passer, sera validé par les dependencies FastAPI
logger.debug(f"🔑 JWT détecté pour {path}")
await self.app(scope, receive, send)
return
elif has_api_key:
# API Key présente → valider la clé
logger.debug(f"🔑 API Key détectée pour {path}")
from services.api_key import ApiKeyService
try:
async for session in get_session():
api_key_service = ApiKeyService(session)
api_key_obj = await api_key_service.verify_api_key(api_key)
if not api_key_obj:
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Clé API invalide ou expirée",
"hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer <jwt>",
},
)
await response(scope, receive, send)
return
# Vérification du rate limit
is_allowed, rate_info = await api_key_service.check_rate_limit(
api_key_obj
)
if not is_allowed:
response = JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"detail": "Rate limit dépassé"},
headers={
"X-RateLimit-Limit": str(rate_info["limit"]),
"X-RateLimit-Remaining": "0",
},
)
await response(scope, receive, send)
return
# Vérification de l'accès à l'endpoint
has_access = await api_key_service.check_endpoint_access(
api_key_obj, path
)
if not has_access:
response = JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
"detail": "Accès non autorisé à cet endpoint",
"endpoint": path,
"api_key": api_key_obj.key_prefix + "...",
},
)
await response(scope, receive, send)
return
# ✅ Clé valide → ajouter les infos à la requête
request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key"
logger.info(f"✅ API Key valide: {api_key_obj.name}{path}")
# Continuer la requête
await self.app(scope, receive, send)
return
except Exception as e:
logger.error(f"❌ Erreur validation API Key: {e}", exc_info=True)
response = JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"detail": "Erreur interne lors de la validation de la clé"
},
)
await response(scope, receive, send)
return
else:
# ❌ Ni JWT ni API Key
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Authentification requise",
"hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer <jwt>'",
},
headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'},
)
await response(scope, receive, send)
return
# === Fonction helper pour récupérer l'API Key depuis la requête ===
def get_api_key_from_request(request: Request) -> Optional:
"""Récupère l'objet ApiKey depuis la requête si présent"""
return getattr(request.state, "api_key", None)
def get_auth_method(request: Request) -> str:
"""
Retourne la méthode d'authentification utilisée
Returns:
"jwt" | "api_key" | "none"
"""
return getattr(request.state, "authenticated_via", "none")

180
routes/api_keys.py Normal file
View file

@ -0,0 +1,180 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
import logging
from database import get_session, User
from core.dependencies import get_current_user, require_role
from services.api_key import ApiKeyService, api_key_to_response
from schemas.api_key import (
ApiKeyCreate,
ApiKeyCreatedResponse,
ApiKeyResponse,
ApiKeyList,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api-keys", tags=["API Keys Management"])
@router.post(
"",
response_model=ApiKeyCreatedResponse,
status_code=status.HTTP_201_CREATED,
dependencies=[Depends(require_role("admin", "super_admin"))],
)
async def create_api_key(
data: ApiKeyCreate,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""
🔑 Créer une nouvelle clé API
**Réservé aux admins**
La clé en clair ne sera affichée qu'une seule fois !
"""
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
name=data.name,
description=data.description,
created_by=user.email,
user_id=user.id,
expires_in_days=data.expires_in_days,
rate_limit_per_minute=data.rate_limit_per_minute,
allowed_endpoints=data.allowed_endpoints,
)
logger.info(f"✅ Clé API créée par {user.email}: {data.name}")
response_data = api_key_to_response(api_key_obj)
response_data["api_key"] = api_key_plain
return ApiKeyCreatedResponse(**response_data)
@router.get("", response_model=ApiKeyList)
async def list_api_keys(
include_revoked: bool = Query(False, description="Inclure les clés révoquées"),
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""
📋 Lister toutes les clés API
**Réservé aux admins** - liste toutes les clés
**Utilisateurs standards** - liste uniquement leurs clés
"""
service = ApiKeyService(session)
# Si admin, voir toutes les clés, sinon seulement les siennes
user_id = None if user.role in ["admin", "super_admin"] else user.id
keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id)
items = [ApiKeyResponse(**api_key_to_response(k)) for k in keys]
return ApiKeyList(total=len(items), items=items)
@router.get("/{key_id}", response_model=ApiKeyResponse)
async def get_api_key(
key_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""🔍 Récupérer une clé API par son ID"""
service = ApiKeyService(session)
api_key_obj = await service.get_by_id(key_id)
if not api_key_obj:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Clé API {key_id} introuvable",
)
# Vérification des permissions
if user.role not in ["admin", "super_admin"]:
if api_key_obj.user_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Accès refusé à cette clé",
)
return ApiKeyResponse(**api_key_to_response(api_key_obj))
@router.delete("/{key_id}", status_code=status.HTTP_200_OK)
async def revoke_api_key(
key_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
"""
🚫 Révoquer une clé API
**Action irréversible** - la clé sera désactivée définitivement
"""
service = ApiKeyService(session)
api_key_obj = await service.get_by_id(key_id)
if not api_key_obj:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Clé API {key_id} introuvable",
)
# Vérification des permissions
if user.role not in ["admin", "super_admin"]:
if api_key_obj.user_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Accès refusé à cette clé",
)
success = await service.revoke_api_key(key_id)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Erreur lors de la révocation",
)
logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}")
return {
"success": True,
"message": f"Clé API '{api_key_obj.name}' révoquée avec succès",
}
@router.post("/verify", status_code=status.HTTP_200_OK)
async def verify_api_key_endpoint(
api_key: str = Query(..., description="Clé API à vérifier"),
session: AsyncSession = Depends(get_session),
):
"""
Vérifier la validité d'une clé API
**Endpoint public** - permet de tester une clé
"""
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
if not api_key_obj:
return {
"valid": False,
"message": "Clé API invalide, expirée ou révoquée",
}
return {
"valid": True,
"message": "Clé API valide",
"key_name": api_key_obj.name,
"rate_limit": api_key_obj.rate_limit_per_minute,
"expires_at": api_key_obj.expires_at,
}

77
schemas/api_key.py Normal file
View file

@ -0,0 +1,77 @@
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class ApiKeyCreate(BaseModel):
"""Schema pour créer une clé API"""
name: str = Field(..., min_length=3, max_length=255, description="Nom de la clé")
description: Optional[str] = Field(None, description="Description de l'usage")
expires_in_days: Optional[int] = Field(
None, ge=1, le=3650, description="Expiration en jours (max 10 ans)"
)
rate_limit_per_minute: int = Field(
60, ge=1, le=1000, description="Limite de requêtes par minute"
)
allowed_endpoints: Optional[List[str]] = Field(
None, description="Endpoints autorisés ([] = tous, ['/clients*'] = wildcard)"
)
class ApiKeyResponse(BaseModel):
"""Schema de réponse pour une clé API"""
id: str
name: str
description: Optional[str]
key_prefix: str
is_active: bool
is_expired: bool
rate_limit_per_minute: int
allowed_endpoints: Optional[List[str]]
total_requests: int
last_used_at: Optional[datetime]
created_at: datetime
expires_at: Optional[datetime]
revoked_at: Optional[datetime]
created_by: str
class ApiKeyCreatedResponse(ApiKeyResponse):
"""Schema de réponse après création (inclut la clé en clair)"""
api_key: str = Field(
..., description="⚠️ Clé API en clair - à sauvegarder immédiatement"
)
class ApiKeyList(BaseModel):
"""Liste de clés API"""
total: int
items: List[ApiKeyResponse]
class SwaggerUserCreate(BaseModel):
"""Schema pour créer un utilisateur Swagger"""
username: str = Field(..., min_length=3, max_length=100)
password: str = Field(..., min_length=8)
full_name: Optional[str] = None
email: Optional[str] = None
class SwaggerUserResponse(BaseModel):
"""Schema de réponse pour un utilisateur Swagger"""
id: str
username: str
full_name: Optional[str]
email: Optional[str]
is_active: bool
created_at: datetime
last_login: Optional[datetime]
class Config:
from_attributes = True