feat(security): implement API key authentication system

This commit is contained in:
Fanilo-Nantenaina 2026-01-19 20:38:01 +03:00
parent 9f6c1de8ef
commit a10fda072c
6 changed files with 1395 additions and 61 deletions

4
.gitignore vendored
View file

@ -45,4 +45,6 @@ tools/
.env.staging .env.staging
.env.production .env.production
.trunk .trunk
*clean*.py

321
config/cors_config.py Normal file
View file

@ -0,0 +1,321 @@
"""
CONFIGURATION CORS POUR API AVEC CLÉS API MULTIPLES
Problématique:
- Les clés API seront utilisées depuis de nombreux domaines/IPs différents
- Impossible de lister tous les origins autorisés à l'avance
- Solution: CORS permissif mais sécurisé par les clés API
Stratégies:
1. CORS ouvert avec validation par clé API (RECOMMANDÉ)
2. CORS dynamique basé sur whitelist
3. CORS avec wildcard et credentials=False
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from typing import List
import os
import logging
logger = logging.getLogger(__name__)
# ============================================
# STRATÉGIE 1: CORS OUVERT + VALIDATION API KEY
# ============================================
# ✅ RECOMMANDÉ pour les API publiques avec clés
#
# Principe:
# - Accepter toutes les origines (allow_origins=["*"])
# - La sécurité est assurée par la validation des clés API
# - Les clés API protègent l'accès, pas le CORS
def configure_cors_open(app: FastAPI):
"""
Configuration CORS ouverte (RECOMMANDÉE)
Accepte toutes les origines
Sécurité assurée par les clés API
Simplifie l'utilisation pour les clients
Attention: credentials=False obligatoire avec allow_origins=["*"]
"""
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Accepte toutes les origines
allow_credentials=False, # ⚠️ Obligatoire avec "*"
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["*"], # Accepte tous les headers (dont X-API-Key)
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600, # Cache preflight requests pendant 1h
)
logger.info("🌐 CORS configuré: Mode OUVERT (sécurisé par API Keys)")
logger.info(" - Origins: * (toutes)")
logger.info(" - Headers: * (dont X-API-Key)")
logger.info(" - Credentials: False")
# ============================================
# STRATÉGIE 2: CORS AVEC WHITELIST DYNAMIQUE
# ============================================
# 🔶 Pour environnements contrôlés
#
# Principe:
# - Lister explicitement les domaines autorisés
# - Peut inclure des patterns wildcards
# - Credentials possible (cookies, etc.)
def configure_cors_whitelist(app: FastAPI):
"""
Configuration CORS avec whitelist (MODE CONTRÔLÉ)
Meilleur contrôle des origines
Credentials possible
Nécessite maintenance de la liste
À utiliser si vous connaissez tous les domaines clients
"""
# Charger depuis .env ou config
allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "")
if allowed_origins_str:
allowed_origins = [
origin.strip()
for origin in allowed_origins_str.split(",")
if origin.strip()
]
else:
# Valeurs par défaut
allowed_origins = [
"http://localhost:3000", # Frontend dev React/Vue
"http://localhost:5173", # Vite dev
"http://localhost:8080", # Frontend dev alternatif
"https://app.votre-domaine.com",
"https://admin.votre-domaine.com",
# Ajouter vos domaines de production
]
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True, # ✅ Possible avec liste explicite
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600,
)
logger.info("🌐 CORS configuré: Mode WHITELIST")
logger.info(f" - Origins autorisées: {len(allowed_origins)}")
for origin in allowed_origins:
logger.info(f"{origin}")
# ============================================
# STRATÉGIE 3: CORS AVEC REGEX (AVANCÉ)
# ============================================
# 🔶 Pour patterns complexes
#
# Exemple: Autoriser tous les sous-domaines *.votre-domaine.com
def configure_cors_regex(app: FastAPI):
"""
Configuration CORS avec patterns regex (AVANCÉ)
Flexible pour sous-domaines
Supporte patterns complexes
Plus complexe à configurer
Utilise allow_origin_regex au lieu de allow_origins
"""
# Pattern regex pour autoriser tous les sous-domaines
origin_regex = r"https://.*\.votre-domaine\.com"
app.add_middleware(
CORSMiddleware,
allow_origin_regex=origin_regex, # ⚠️ Utilise regex au lieu de liste
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600,
)
logger.info("🌐 CORS configuré: Mode REGEX")
logger.info(f" - Pattern: {origin_regex}")
# ============================================
# STRATÉGIE 4: CORS HYBRIDE (PRODUCTION)
# ============================================
# ✅ RECOMMANDÉ pour production
#
# Principe:
# - Whitelist pour les domaines connus (credentials=True)
# - Fallback sur "*" pour le reste (credentials=False)
def configure_cors_hybrid(app: FastAPI):
"""
Configuration CORS hybride (PRODUCTION)
Meilleur des deux mondes
Whitelist pour domaines connus
Fallback ouvert pour API Keys externes
Note: Nécessite un middleware custom pour gérer les deux modes
"""
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class HybridCORSMiddleware(BaseHTTPMiddleware):
def __init__(self, app, known_origins: List[str]):
super().__init__(app)
self.known_origins = set(known_origins)
async def dispatch(self, request, call_next):
origin = request.headers.get("origin")
# Si origin connue → CORS strict avec credentials
if origin in self.known_origins:
response = await call_next(request)
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Allow-Methods"] = (
"GET, POST, PUT, DELETE, PATCH, OPTIONS"
)
response.headers["Access-Control-Allow-Headers"] = (
"Content-Type, Authorization, X-API-Key"
)
return response
# Sinon → CORS ouvert sans credentials
response = await call_next(request)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = (
"GET, POST, PUT, DELETE, PATCH, OPTIONS"
)
response.headers["Access-Control-Allow-Headers"] = "*"
return response
# Domaines connus (whitelist)
known_origins = [
"https://app.votre-domaine.com",
"https://admin.votre-domaine.com",
"http://localhost:3000",
"http://localhost:5173",
]
app.add_middleware(HybridCORSMiddleware, known_origins=known_origins)
logger.info("🌐 CORS configuré: Mode HYBRIDE")
logger.info(f" - Whitelist: {len(known_origins)} domaines")
logger.info(" - Fallback: * (ouvert)")
# ============================================
# FONCTION PRINCIPALE
# ============================================
def setup_cors(app: FastAPI, mode: str = "open"):
"""
Configure CORS selon le mode choisi
Args:
app: Instance FastAPI
mode: "open" | "whitelist" | "regex" | "hybrid"
Recommandations:
- Development: "open"
- Production (API publique): "open" ou "hybrid"
- Production (API interne): "whitelist"
"""
if mode == "open":
configure_cors_open(app)
elif mode == "whitelist":
configure_cors_whitelist(app)
elif mode == "regex":
configure_cors_regex(app)
elif mode == "hybrid":
configure_cors_hybrid(app)
else:
logger.warning(
f"⚠️ Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut."
)
configure_cors_open(app)
# ============================================
# EXEMPLE D'UTILISATION DANS api.py
# ============================================
"""
# Dans api.py
from config.cors_config import setup_cors
app = FastAPI(...)
# DÉVELOPPEMENT
setup_cors(app, mode="open")
# PRODUCTION (API publique avec clés)
setup_cors(app, mode="hybrid")
# PRODUCTION (API interne uniquement)
setup_cors(app, mode="whitelist")
"""
# ============================================
# VARIABLES D'ENVIRONNEMENT (.env)
# ============================================
"""
# Pour mode "whitelist"
CORS_ALLOWED_ORIGINS=https://app.example.com,https://admin.example.com,http://localhost:3000
# Pour mode "regex"
CORS_ORIGIN_REGEX=https://.*\.example\.com
# Choisir le mode
CORS_MODE=open
"""
# ============================================
# FAQ CORS + API KEYS
# ============================================
"""
Q: Pourquoi allow_origins=["*"] est sécurisé avec des API Keys ?
R: Les clés API protègent l'accès aux données. CORS empêche seulement les
navigateurs web de faire des requêtes cross-origin. Un attaquant peut
contourner CORS facilement (curl, postman), donc la vraie sécurité vient
de la validation des clés API, pas du CORS.
Q: Pourquoi credentials=False avec allow_origins=["*"] ?
R: C'est une restriction du navigateur (spec CORS). Vous ne pouvez pas avoir
credentials=True ET origins=["*"] en même temps.
Q: Mes clients utilisent des IPs dynamiques, que faire ?
R: Utilisez mode "open". Les clés API sont justement faites pour ça -
permettre l'accès depuis n'importe quelle origine, de manière sécurisée.
Q: Je veux quand même utiliser des cookies/sessions ?
R: Utilisez mode "whitelist" ou "hybrid" pour les domaines qui ont besoin
de credentials, et laissez les autres utiliser X-API-Key sans credentials.
Q: Comment tester CORS localement ?
R: Créez un simple fichier HTML avec fetch() et ouvrez-le en file://
Ou utilisez un serveur local (python -m http.server)
"""

View file

@ -1,4 +1,4 @@
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select from sqlalchemy import select
@ -6,89 +6,208 @@ from database import get_session, User
from security.auth import decode_token from security.auth import decode_token
from typing import Optional from typing import Optional
from datetime import datetime from datetime import datetime
import logging
security = HTTPBearer() logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False) # ⚠️ auto_error=False pour gérer manuellement
async def get_current_user( async def get_current_user_hybrid(
credentials: HTTPAuthorizationCredentials = Depends(security), request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> User: ) -> User:
token = credentials.credentials """
VERSION HYBRIDE: Accepte JWT OU API Key
payload = decode_token(token) Priorité:
if not payload: 1. JWT (Authorization: Bearer)
raise HTTPException( 2. API Key (déjà validée par middleware)
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré", Si API Key utilisée, retourne un "user virtuel" basé sur la clé
headers={"WWW-Authenticate": "Bearer"}, """
# ============================================
# OPTION 1: JWT (comportement standard)
# ============================================
if credentials and credentials.credentials:
token = credentials.credentials
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
if payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception.",
)
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
)
logger.debug(f"🔐 Authentifié via JWT: {user.email}")
return user
# ============================================
# OPTION 2: API Key (validée par middleware)
# ============================================
api_key_obj = getattr(request.state, "api_key", None)
if api_key_obj:
# Créer un "user virtuel" basé sur la clé API
# Cela permet aux routes existantes de fonctionner sans modification
# Si la clé est associée à un vrai user, le récupérer
if api_key_obj.user_id:
result = await session.execute(
select(User).where(User.id == api_key_obj.user_id)
)
user = result.scalar_one_or_none()
if user:
logger.debug(
f"🔑 Authentifié via API Key ({api_key_obj.name}) → User: {user.email}"
)
return user
# Sinon, créer un user virtuel (pour les clés API sans user associé)
from database import User as UserModel
virtual_user = UserModel(
id=f"api_key_{api_key_obj.id}",
email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local",
nom="API Key",
prenom=api_key_obj.name,
role="api_client", # Rôle spécial pour les API keys
is_verified=True,
is_active=True,
) )
if payload.get("type") != "access": # Marquer que c'est un user virtuel
raise HTTPException( virtual_user._is_api_key_user = True
status_code=status.HTTP_401_UNAUTHORIZED, virtual_user._api_key_obj = api_key_obj
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub") logger.debug(f"🔑 Authentifié via API Key: {api_key_obj.name} (user virtuel)")
if not user_id: return virtual_user
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id)) # ============================================
user = result.scalar_one_or_none() # AUCUNE AUTHENTIFICATION
# ============================================
if not user: raise HTTPException(
raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED,
status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise (JWT ou API Key)",
detail="Utilisateur introuvable", headers={"WWW-Authenticate": "Bearer"},
headers={"WWW-Authenticate": "Bearer"}, )
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception.",
)
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
)
return user
async def get_current_user_optional( async def get_current_user_optional_hybrid(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> Optional[User]: ) -> Optional[User]:
if not credentials: """Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)"""
return None
try: try:
return await get_current_user(credentials, session) return await get_current_user_hybrid(request, credentials, session)
except HTTPException: except HTTPException:
return None return None
def require_role(*allowed_roles: str): def require_role_hybrid(*allowed_roles: str):
async def role_checker(user: User = Depends(get_current_user)) -> User: """
if user.role not in allowed_roles: VERSION HYBRIDE: Vérification de rôle compatible avec API Keys
Notes:
- Les users via JWT ont leur vrai rôle
- Les users via API Key ont le rôle "api_client" par défaut
- Vous pouvez autoriser "api_client" dans allowed_roles pour permettre l'accès aux API Keys
"""
async def role_checker(
request: Request, user: User = Depends(get_current_user_hybrid)
) -> User:
# Vérifier si c'est un user API Key
is_api_key_user = getattr(user, "_is_api_key_user", False)
if is_api_key_user:
# Pour les API Keys, vérifier si "api_client" est autorisé
if "api_client" not in allowed_roles and "*" not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.",
)
logger.debug(f"✅ API Key autorisée pour cette route")
return user
# Pour les vrais users, vérification standard
if user.role not in allowed_roles and "*" not in allowed_roles:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}", detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
) )
return user return user
return role_checker return role_checker
# ============================================
# HELPERS
# ============================================
def is_api_key_user(user: User) -> bool:
"""Vérifie si l'utilisateur est authentifié via API Key"""
return getattr(user, "_is_api_key_user", False)
def get_api_key_from_user(user: User):
"""Récupère l'objet API Key depuis un user virtuel"""
return getattr(user, "_api_key_obj", None)
# ============================================
# RÉTROCOMPATIBILITÉ
# ============================================
# Alias pour garder la compatibilité avec le code existant
get_current_user = get_current_user_hybrid
get_current_user_optional = get_current_user_optional_hybrid

290
scripts/manage_security.py Normal file
View file

@ -0,0 +1,290 @@
#!/usr/bin/env python3
"""
Script CLI pour gérer les clés API et utilisateurs Swagger
Usage:
python manage_security.py swagger add <username> <password>
python manage_security.py swagger list
python manage_security.py swagger delete <username>
python manage_security.py apikey create <name> [--days 365] [--rate-limit 60]
python manage_security.py apikey list
python manage_security.py apikey revoke <key_id>
python manage_security.py apikey verify <api_key>
"""
import asyncio
import sys
from pathlib import Path
import argparse
sys.path.insert(0, str(Path(__file__).parent.parent))
from database import get_session
from database.models.api_key import SwaggerUser
from services.api_key import ApiKeyService
from security.auth import hash_password, verify_password
from sqlalchemy import select
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================
# GESTION DES UTILISATEURS SWAGGER
# ============================================
async def add_swagger_user(username: str, password: str, full_name: str = None):
"""Ajouter un utilisateur Swagger"""
async with get_session() as session:
# Vérifier si existe
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
existing = result.scalar_one_or_none()
if existing:
logger.error(f"❌ L'utilisateur {username} existe déjà")
return
# Créer
user = SwaggerUser(
username=username,
hashed_password=hash_password(password),
full_name=full_name or username,
is_active=True,
)
session.add(user)
await session.commit()
logger.info(f"✅ Utilisateur Swagger créé: {username}")
print(f"\n✅ Utilisateur créé avec succès")
print(f" Username: {username}")
print(f" Accès: https://votre-serveur/docs")
async def list_swagger_users():
"""Lister les utilisateurs Swagger"""
async with get_session() as session:
result = await session.execute(select(SwaggerUser))
users = result.scalars().all()
if not users:
print("Aucun utilisateur Swagger trouvé")
return
print(f"\n📋 {len(users)} utilisateur(s) Swagger:\n")
for user in users:
status = "✅ Actif" if user.is_active else "❌ Inactif"
print(f"{user.username:<20} {status}")
if user.full_name:
print(f" Nom: {user.full_name}")
if user.last_login:
print(f" Dernière connexion: {user.last_login}")
print()
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with get_session() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f"❌ Utilisateur {username} introuvable")
return
await session.delete(user)
await session.commit()
logger.info(f"🗑️ Utilisateur supprimé: {username}")
# ============================================
# GESTION DES CLÉS API
# ============================================
async def create_api_key(
name: str,
description: str = None,
expires_in_days: int = 365,
rate_limit: int = 60,
endpoints: list = None,
):
"""Créer une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
name=name,
description=description,
created_by="CLI",
expires_in_days=expires_in_days,
rate_limit_per_minute=rate_limit,
allowed_endpoints=endpoints,
)
print(f"\n✅ Clé API créée avec succès\n")
print(f" ID: {api_key_obj.id}")
print(f" Nom: {name}")
print(f" Clé: {api_key_plain}")
print(f" Préfixe: {api_key_obj.key_prefix}")
print(f" Rate limit: {rate_limit} req/min")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}")
print(f"\n⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n")
async def list_api_keys():
"""Lister les clés API"""
async with get_session() as session:
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
print("Aucune clé API trouvée")
return
print(f"\n📋 {len(keys)} clé(s) API:\n")
for key in keys:
status = "" if key.is_active else ""
expired = "⏰ Expirée" if key.expires_at and key.expires_at < datetime.now() else ""
print(f" {status} {key.name:<30} ({key.key_prefix}...)")
print(f" ID: {key.id}")
print(f" Requêtes: {key.total_requests}")
print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if expired:
print(f" {expired}")
print()
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key = await service.get_by_id(key_id)
if not api_key:
logger.error(f"❌ Clé {key_id} introuvable")
return
success = await service.revoke_api_key(key_id)
if success:
logger.info(f"🚫 Clé révoquée: {api_key.name}")
print(f"\n✅ Clé '{api_key.name}' révoquée avec succès")
else:
logger.error("❌ Erreur lors de la révocation")
async def verify_api_key_cmd(api_key: str):
"""Vérifier une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
if api_key_obj:
print(f"\n✅ Clé API valide\n")
print(f" Nom: {api_key_obj.name}")
print(f" ID: {api_key_obj.id}")
print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
print(f" Requêtes: {api_key_obj.total_requests}")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n")
else:
print(f"\n❌ Clé API invalide, expirée ou révoquée\n")
# ============================================
# CLI PRINCIPAL
# ============================================
async def main():
parser = argparse.ArgumentParser(
description="Gestion de la sécurité Sage Dataven API"
)
subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")
# === SWAGGER ===
swagger_parser = subparsers.add_parser("swagger", help="Gestion utilisateurs Swagger")
swagger_subparsers = swagger_parser.add_subparsers(dest="action")
# swagger add
swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
swagger_add.add_argument("username", help="Nom d'utilisateur")
swagger_add.add_argument("password", help="Mot de passe")
swagger_add.add_argument("--full-name", help="Nom complet")
# swagger list
swagger_subparsers.add_parser("list", help="Lister les utilisateurs")
# swagger delete
swagger_delete = swagger_subparsers.add_parser("delete", help="Supprimer un utilisateur")
swagger_delete.add_argument("username", help="Nom d'utilisateur")
# === API KEYS ===
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_subparsers = apikey_parser.add_subparsers(dest="action")
# apikey create
apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API")
apikey_create.add_argument("name", help="Nom de la clé")
apikey_create.add_argument("--description", help="Description")
apikey_create.add_argument("--days", type=int, default=365, help="Expiration en jours")
apikey_create.add_argument("--rate-limit", type=int, default=60, help="Limite req/min")
apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
# apikey list
apikey_subparsers.add_parser("list", help="Lister les clés")
# apikey revoke
apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
apikey_revoke.add_argument("key_id", help="ID de la clé")
# apikey verify
apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
apikey_verify.add_argument("api_key", help="Clé API à vérifier")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Exécution des commandes
if args.command == "swagger":
if args.action == "add":
await add_swagger_user(args.username, args.password, args.full_name)
elif args.action == "list":
await list_swagger_users()
elif args.action == "delete":
await delete_swagger_user(args.username)
else:
swagger_parser.print_help()
elif args.command == "apikey":
if args.action == "create":
await create_api_key(
args.name,
args.description,
args.days,
args.rate_limit,
args.endpoints,
)
elif args.action == "list":
await list_api_keys()
elif args.action == "revoke":
await revoke_api_key(args.key_id)
elif args.action == "verify":
await verify_api_key_cmd(args.api_key)
else:
apikey_parser.print_help()
if __name__ == "__main__":
from datetime import datetime
asyncio.run(main())

369
scripts/test_security.py Normal file
View file

@ -0,0 +1,369 @@
#!/usr/bin/env python3
"""
Script de test automatisé pour vérifier la sécurité de l'API
Usage:
python test_security.py --url http://votre-vps:8000 --swagger-user admin --swagger-pass password
"""
import requests
import argparse
import sys
from typing import Dict, Tuple
import json
class SecurityTester:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip("/")
self.results = {"passed": 0, "failed": 0, "tests": []}
def log_test(self, name: str, passed: bool, details: str = ""):
"""Enregistrer le résultat d'un test"""
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{status} - {name}")
if details:
print(f" {details}")
self.results["tests"].append(
{"name": name, "passed": passed, "details": details}
)
if passed:
self.results["passed"] += 1
else:
self.results["failed"] += 1
def test_swagger_without_auth(self) -> bool:
"""Test 1: Swagger UI devrait demander une authentification"""
print("\n🔍 Test 1: Protection Swagger UI")
try:
response = requests.get(f"{self.base_url}/docs", timeout=5)
if response.status_code == 401:
self.log_test(
"Swagger protégé",
True,
"Code 401 retourné sans authentification",
)
return True
else:
self.log_test(
"Swagger protégé",
False,
f"Code {response.status_code} au lieu de 401",
)
return False
except Exception as e:
self.log_test("Swagger protégé", False, f"Erreur: {str(e)}")
return False
def test_swagger_with_auth(self, username: str, password: str) -> bool:
"""Test 2: Swagger UI accessible avec credentials valides"""
print("\n🔍 Test 2: Accès Swagger avec authentification")
try:
response = requests.get(
f"{self.base_url}/docs", auth=(username, password), timeout=5
)
if response.status_code == 200:
self.log_test(
"Accès Swagger avec auth",
True,
f"Authentifié comme {username}",
)
return True
else:
self.log_test(
"Accès Swagger avec auth",
False,
f"Code {response.status_code}, credentials invalides?",
)
return False
except Exception as e:
self.log_test("Accès Swagger avec auth", False, f"Erreur: {str(e)}")
return False
def test_api_without_auth(self) -> bool:
"""Test 3: Endpoints API devraient demander une authentification"""
print("\n🔍 Test 3: Protection des endpoints API")
test_endpoints = ["/api/v1/clients", "/api/v1/documents"]
all_protected = True
for endpoint in test_endpoints:
try:
response = requests.get(f"{self.base_url}{endpoint}", timeout=5)
if response.status_code == 401:
print(f"{endpoint} protégé (401)")
else:
print(
f"{endpoint} accessible sans auth (code {response.status_code})"
)
all_protected = False
except Exception as e:
print(f" ⚠️ {endpoint} erreur: {str(e)}")
all_protected = False
self.log_test("Endpoints API protégés", all_protected)
return all_protected
def test_health_endpoint_public(self) -> bool:
"""Test 4: Endpoint /health devrait être accessible sans auth"""
print("\n🔍 Test 4: Endpoint /health public")
try:
response = requests.get(f"{self.base_url}/health", timeout=5)
if response.status_code == 200:
self.log_test("/health accessible", True, "Endpoint public fonctionne")
return True
else:
self.log_test(
"/health accessible",
False,
f"Code {response.status_code} inattendu",
)
return False
except Exception as e:
self.log_test("/health accessible", False, f"Erreur: {str(e)}")
return False
def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]:
"""Test 5: Créer une clé API via l'endpoint"""
print("\n🔍 Test 5: Création d'une clé API")
try:
# 1. Login pour obtenir un JWT
login_response = requests.post(
f"{self.base_url}/api/v1/auth/login",
json={"email": username, "password": password},
timeout=5,
)
if login_response.status_code != 200:
self.log_test(
"Création clé API",
False,
"Impossible de se connecter pour obtenir un JWT",
)
return False, ""
jwt_token = login_response.json().get("access_token")
# 2. Créer une clé API
create_response = requests.post(
f"{self.base_url}/api/v1/api-keys",
headers={"Authorization": f"Bearer {jwt_token}"},
json={
"name": "Test API Key",
"description": "Clé de test automatisé",
"rate_limit_per_minute": 60,
"expires_in_days": 30,
},
timeout=5,
)
if create_response.status_code == 201:
api_key = create_response.json().get("api_key")
self.log_test("Création clé API", True, f"Clé créée: {api_key[:20]}...")
return True, api_key
else:
self.log_test(
"Création clé API",
False,
f"Code {create_response.status_code}",
)
return False, ""
except Exception as e:
self.log_test("Création clé API", False, f"Erreur: {str(e)}")
return False, ""
def test_api_key_usage(self, api_key: str) -> bool:
"""Test 6: Utiliser une clé API pour accéder à un endpoint"""
print("\n🔍 Test 6: Utilisation d'une clé API")
if not api_key:
self.log_test("Utilisation clé API", False, "Pas de clé disponible")
return False
try:
response = requests.get(
f"{self.base_url}/api/v1/clients",
headers={"X-API-Key": api_key},
timeout=5,
)
if response.status_code == 200:
self.log_test("Utilisation clé API", True, "Clé acceptée")
return True
else:
self.log_test(
"Utilisation clé API",
False,
f"Code {response.status_code}, clé refusée?",
)
return False
except Exception as e:
self.log_test("Utilisation clé API", False, f"Erreur: {str(e)}")
return False
def test_invalid_api_key(self) -> bool:
"""Test 7: Une clé invalide devrait être refusée"""
print("\n🔍 Test 7: Rejet de clé API invalide")
invalid_key = "sdk_live_invalid_key_12345"
try:
response = requests.get(
f"{self.base_url}/api/v1/clients",
headers={"X-API-Key": invalid_key},
timeout=5,
)
if response.status_code == 401:
self.log_test("Clé invalide rejetée", True, "Code 401 comme attendu")
return True
else:
self.log_test(
"Clé invalide rejetée",
False,
f"Code {response.status_code} au lieu de 401",
)
return False
except Exception as e:
self.log_test("Clé invalide rejetée", False, f"Erreur: {str(e)}")
return False
def test_rate_limiting(self, api_key: str) -> bool:
"""Test 8: Rate limiting (optionnel, peut prendre du temps)"""
print("\n🔍 Test 8: Rate limiting (test simple)")
if not api_key:
self.log_test("Rate limiting", False, "Pas de clé disponible")
return False
# Envoyer 70 requêtes rapidement (limite = 60/min)
print(" Envoi de 70 requêtes rapides...")
rate_limited = False
for i in range(70):
try:
response = requests.get(
f"{self.base_url}/health",
headers={"X-API-Key": api_key},
timeout=1,
)
if response.status_code == 429:
rate_limited = True
print(f" ⚠️ Rate limit atteint à la requête {i + 1}")
break
except Exception:
pass
if rate_limited:
self.log_test("Rate limiting", True, "Rate limit détecté")
return True
else:
self.log_test(
"Rate limiting",
True,
"Aucun rate limit détecté (peut être normal si pas implémenté)",
)
return True
def print_summary(self):
"""Afficher le résumé des tests"""
print("\n" + "=" * 60)
print("📊 RÉSUMÉ DES TESTS")
print("=" * 60)
total = self.results["passed"] + self.results["failed"]
success_rate = (self.results["passed"] / total * 100) if total > 0 else 0
print(f"\nTotal: {total} tests")
print(f"✅ Réussis: {self.results['passed']}")
print(f"❌ Échoués: {self.results['failed']}")
print(f"📈 Taux de réussite: {success_rate:.1f}%\n")
if self.results["failed"] == 0:
print("🎉 Tous les tests sont passés ! Sécurité OK.")
return 0
else:
print("⚠️ Certains tests ont échoué. Vérifiez la configuration.")
return 1
def main():
parser = argparse.ArgumentParser(
description="Test automatisé de la sécurité de l'API"
)
parser.add_argument(
"--url",
required=True,
help="URL de base de l'API (ex: http://localhost:8000)",
)
parser.add_argument(
"--swagger-user", required=True, help="Utilisateur Swagger pour les tests"
)
parser.add_argument(
"--swagger-pass", required=True, help="Mot de passe Swagger pour les tests"
)
parser.add_argument(
"--skip-rate-limit",
action="store_true",
help="Sauter le test de rate limiting (long)",
)
args = parser.parse_args()
print("🚀 Démarrage des tests de sécurité")
print(f"🎯 URL cible: {args.url}\n")
tester = SecurityTester(args.url)
# Exécuter les tests
tester.test_swagger_without_auth()
tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass)
tester.test_api_without_auth()
tester.test_health_endpoint_public()
# Tests nécessitant une clé API
success, api_key = tester.test_api_key_creation(
args.swagger_user, args.swagger_pass
)
if success and api_key:
tester.test_api_key_usage(api_key)
tester.test_invalid_api_key()
if not args.skip_rate_limit:
tester.test_rate_limiting(api_key)
else:
print("\n⏭️ Test de rate limiting sauté")
else:
print("\n⚠️ Tests avec clé API sautés (création échouée)")
# Résumé
exit_code = tester.print_summary()
sys.exit(exit_code)
if __name__ == "__main__":
main()

233
services/api_key.py Normal file
View file

@ -0,0 +1,233 @@
import secrets
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_
import logging
from database.models.api_key import ApiKey
logger = logging.getLogger(__name__)
class ApiKeyService:
"""Service de gestion des clés API"""
def __init__(self, session: AsyncSession):
self.session = session
@staticmethod
def generate_api_key() -> str:
"""Génère une clé API unique et sécurisée"""
# Format: sdk_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
random_part = secrets.token_urlsafe(32)
return f"sdk_live_{random_part}"
@staticmethod
def hash_api_key(api_key: str) -> str:
"""Hash la clé API pour stockage sécurisé"""
return hashlib.sha256(api_key.encode()).hexdigest()
@staticmethod
def get_key_prefix(api_key: str) -> str:
"""Extrait le préfixe de la clé pour identification"""
# Retourne les 12 premiers caractères
return api_key[:12] if len(api_key) >= 12 else api_key
async def create_api_key(
self,
name: str,
description: Optional[str] = None,
created_by: str = "system",
user_id: Optional[str] = None,
expires_in_days: Optional[int] = None,
rate_limit_per_minute: int = 60,
allowed_endpoints: Optional[List[str]] = None,
) -> tuple[ApiKey, str]:
"""
Crée une nouvelle clé API
Returns:
tuple[ApiKey, str]: (objet ApiKey, clé en clair - à ne montrer qu'une fois)
"""
# Génération de la clé
api_key_plain = self.generate_api_key()
key_hash = self.hash_api_key(api_key_plain)
key_prefix = self.get_key_prefix(api_key_plain)
# Calcul de la date d'expiration
expires_at = None
if expires_in_days:
expires_at = datetime.now() + timedelta(days=expires_in_days)
# Création de l'objet
api_key_obj = ApiKey(
key_hash=key_hash,
key_prefix=key_prefix,
name=name,
description=description,
created_by=created_by,
user_id=user_id,
expires_at=expires_at,
rate_limit_per_minute=rate_limit_per_minute,
allowed_endpoints=json.dumps(allowed_endpoints)
if allowed_endpoints
else None,
)
self.session.add(api_key_obj)
await self.session.commit()
await self.session.refresh(api_key_obj)
logger.info(f"✅ Clé API créée: {name} (prefix: {key_prefix})")
return api_key_obj, api_key_plain
async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]:
"""
Vérifie une clé API et retourne l'objet si valide
Returns:
Optional[ApiKey]: L'objet ApiKey si valide, None sinon
"""
key_hash = self.hash_api_key(api_key_plain)
result = await self.session.execute(
select(ApiKey).where(
and_(
ApiKey.key_hash == key_hash,
ApiKey.is_active == True,
ApiKey.revoked_at.is_(None),
or_(
ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now()
),
)
)
)
api_key_obj = result.scalar_one_or_none()
if api_key_obj:
# Mise à jour des statistiques
api_key_obj.total_requests += 1
api_key_obj.last_used_at = datetime.now()
await self.session.commit()
logger.debug(f"🔑 Clé API validée: {api_key_obj.name}")
else:
logger.warning(f"⚠️ Clé API invalide ou expirée")
return api_key_obj
async def list_api_keys(
self,
include_revoked: bool = False,
user_id: Optional[str] = None,
) -> List[ApiKey]:
"""Liste les clés API"""
query = select(ApiKey)
if not include_revoked:
query = query.where(ApiKey.revoked_at.is_(None))
if user_id:
query = query.where(ApiKey.user_id == user_id)
query = query.order_by(ApiKey.created_at.desc())
result = await self.session.execute(query)
return list(result.scalars().all())
async def revoke_api_key(self, key_id: str) -> bool:
"""Révoque une clé API"""
result = await self.session.execute(select(ApiKey).where(ApiKey.id == key_id))
api_key_obj = result.scalar_one_or_none()
if not api_key_obj:
return False
api_key_obj.is_active = False
api_key_obj.revoked_at = datetime.now()
await self.session.commit()
logger.info(f"🚫 Clé API révoquée: {api_key_obj.name}")
return True
async def get_by_id(self, key_id: str) -> Optional[ApiKey]:
"""Récupère une clé API par son ID"""
result = await self.session.execute(select(ApiKey).where(ApiKey.id == key_id))
return result.scalar_one_or_none()
async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]:
"""
Vérifie le rate limit d'une clé API (à implémenter avec Redis/cache)
Returns:
tuple[bool, Dict]: (is_allowed, info_dict)
"""
# TODO: Implémenter avec Redis pour un vrai rate limiting
# Pour l'instant, retourne toujours True
return True, {
"allowed": True,
"limit": api_key_obj.rate_limit_per_minute,
"remaining": api_key_obj.rate_limit_per_minute,
}
async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool:
"""Vérifie si la clé a accès à un endpoint spécifique"""
if not api_key_obj.allowed_endpoints:
# Si aucune restriction, accès total
return True
try:
allowed = json.loads(api_key_obj.allowed_endpoints)
# Support des wildcards
for pattern in allowed:
if pattern == "*":
return True
if pattern.endswith("*"):
prefix = pattern[:-1]
if endpoint.startswith(prefix):
return True
if pattern == endpoint:
return True
return False
except json.JSONDecodeError:
logger.error(f"❌ Erreur parsing allowed_endpoints pour {api_key_obj.id}")
return False
def api_key_to_response(api_key_obj: ApiKey, show_key: bool = False) -> Dict:
"""Convertit un objet ApiKey en réponse API"""
allowed_endpoints = None
if api_key_obj.allowed_endpoints:
try:
allowed_endpoints = json.loads(api_key_obj.allowed_endpoints)
except json.JSONDecodeError:
pass
is_expired = False
if api_key_obj.expires_at:
is_expired = api_key_obj.expires_at < datetime.now()
return {
"id": api_key_obj.id,
"name": api_key_obj.name,
"description": api_key_obj.description,
"key_prefix": api_key_obj.key_prefix,
"is_active": api_key_obj.is_active,
"is_expired": is_expired,
"rate_limit_per_minute": api_key_obj.rate_limit_per_minute,
"allowed_endpoints": allowed_endpoints,
"total_requests": api_key_obj.total_requests,
"last_used_at": api_key_obj.last_used_at,
"created_at": api_key_obj.created_at,
"expires_at": api_key_obj.expires_at,
"revoked_at": api_key_obj.revoked_at,
"created_by": api_key_obj.created_by,
}