refactor: remove emojis and clean up code comments

This commit is contained in:
Fanilo-Nantenaina 2026-01-20 06:31:17 +03:00
parent a10fda072c
commit 1164c7975a
13 changed files with 137 additions and 548 deletions

View file

@ -1,17 +1,3 @@
"""
CONFIGURATION CORS POUR API AVEC CLÉS API MULTIPLES
Problématique:
- Les clés API seront utilisées depuis de nombreux domaines/IPs différents
- Impossible de lister tous les origins autorisés à l'avance
- Solution: CORS permissif mais sécurisé par les clés API
Stratégies:
1. CORS ouvert avec validation par clé API (RECOMMANDÉ)
2. CORS dynamique basé sur whitelist
3. CORS avec wildcard et credentials=False
"""
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from typing import List from typing import List
@ -21,66 +7,24 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ============================================
# STRATÉGIE 1: CORS OUVERT + VALIDATION API KEY
# ============================================
# ✅ RECOMMANDÉ pour les API publiques avec clés
#
# Principe:
# - Accepter toutes les origines (allow_origins=["*"])
# - La sécurité est assurée par la validation des clés API
# - Les clés API protègent l'accès, pas le CORS
def configure_cors_open(app: FastAPI): def configure_cors_open(app: FastAPI):
"""
Configuration CORS ouverte (RECOMMANDÉE)
Accepte toutes les origines
Sécurité assurée par les clés API
Simplifie l'utilisation pour les clients
Attention: credentials=False obligatoire avec allow_origins=["*"]
"""
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], # Accepte toutes les origines allow_origins=["*"],
allow_credentials=False, # ⚠️ Obligatoire avec "*" allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["*"], # Accepte tous les headers (dont X-API-Key) allow_headers=["*"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600, # Cache preflight requests pendant 1h max_age=3600,
) )
logger.info("🌐 CORS configuré: Mode OUVERT (sécurisé par API Keys)") logger.info(" CORS configuré: Mode OUVERT (sécurisé par API Keys)")
logger.info(" - Origins: * (toutes)") logger.info(" - Origins: * (toutes)")
logger.info(" - Headers: * (dont X-API-Key)") logger.info(" - Headers: * (dont X-API-Key)")
logger.info(" - Credentials: False") logger.info(" - Credentials: False")
# ============================================
# STRATÉGIE 2: CORS AVEC WHITELIST DYNAMIQUE
# ============================================
# 🔶 Pour environnements contrôlés
#
# Principe:
# - Lister explicitement les domaines autorisés
# - Peut inclure des patterns wildcards
# - Credentials possible (cookies, etc.)
def configure_cors_whitelist(app: FastAPI): def configure_cors_whitelist(app: FastAPI):
"""
Configuration CORS avec whitelist (MODE CONTRÔLÉ)
Meilleur contrôle des origines
Credentials possible
Nécessite maintenance de la liste
À utiliser si vous connaissez tous les domaines clients
"""
# Charger depuis .env ou config
allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "") allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "")
if allowed_origins_str: if allowed_origins_str:
@ -90,57 +34,11 @@ def configure_cors_whitelist(app: FastAPI):
if origin.strip() if origin.strip()
] ]
else: else:
# Valeurs par défaut allowed_origins = ["*"]
allowed_origins = [
"http://localhost:3000", # Frontend dev React/Vue
"http://localhost:5173", # Vite dev
"http://localhost:8080", # Frontend dev alternatif
"https://app.votre-domaine.com",
"https://admin.votre-domaine.com",
# Ajouter vos domaines de production
]
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=allowed_origins, allow_origins=allowed_origins,
allow_credentials=True, # ✅ Possible avec liste explicite
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600,
)
logger.info("🌐 CORS configuré: Mode WHITELIST")
logger.info(f" - Origins autorisées: {len(allowed_origins)}")
for origin in allowed_origins:
logger.info(f"{origin}")
# ============================================
# STRATÉGIE 3: CORS AVEC REGEX (AVANCÉ)
# ============================================
# 🔶 Pour patterns complexes
#
# Exemple: Autoriser tous les sous-domaines *.votre-domaine.com
def configure_cors_regex(app: FastAPI):
"""
Configuration CORS avec patterns regex (AVANCÉ)
Flexible pour sous-domaines
Supporte patterns complexes
Plus complexe à configurer
Utilise allow_origin_regex au lieu de allow_origins
"""
# Pattern regex pour autoriser tous les sous-domaines
origin_regex = r"https://.*\.votre-domaine\.com"
app.add_middleware(
CORSMiddleware,
allow_origin_regex=origin_regex, # ⚠️ Utilise regex au lieu de liste
allow_credentials=True, allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"], allow_headers=["Content-Type", "Authorization", "X-API-Key"],
@ -148,32 +46,31 @@ def configure_cors_regex(app: FastAPI):
max_age=3600, max_age=3600,
) )
logger.info("🌐 CORS configuré: Mode REGEX") logger.info(" CORS configuré: Mode WHITELIST")
logger.info(f" - Origins autorisées: {len(allowed_origins)}")
for origin in allowed_origins:
logger.info(f"{origin}")
def configure_cors_regex(app: FastAPI):
origin_regex = r"*"
app.add_middleware(
CORSMiddleware,
allow_origin_regex=origin_regex,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "X-API-Key"],
expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"],
max_age=3600,
)
logger.info(" CORS configuré: Mode REGEX")
logger.info(f" - Pattern: {origin_regex}") logger.info(f" - Pattern: {origin_regex}")
# ============================================
# STRATÉGIE 4: CORS HYBRIDE (PRODUCTION)
# ============================================
# ✅ RECOMMANDÉ pour production
#
# Principe:
# - Whitelist pour les domaines connus (credentials=True)
# - Fallback sur "*" pour le reste (credentials=False)
def configure_cors_hybrid(app: FastAPI): def configure_cors_hybrid(app: FastAPI):
"""
Configuration CORS hybride (PRODUCTION)
Meilleur des deux mondes
Whitelist pour domaines connus
Fallback ouvert pour API Keys externes
Note: Nécessite un middleware custom pour gérer les deux modes
"""
from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class HybridCORSMiddleware(BaseHTTPMiddleware): class HybridCORSMiddleware(BaseHTTPMiddleware):
def __init__(self, app, known_origins: List[str]): def __init__(self, app, known_origins: List[str]):
@ -183,7 +80,6 @@ def configure_cors_hybrid(app: FastAPI):
async def dispatch(self, request, call_next): async def dispatch(self, request, call_next):
origin = request.headers.get("origin") origin = request.headers.get("origin")
# Si origin connue → CORS strict avec credentials
if origin in self.known_origins: if origin in self.known_origins:
response = await call_next(request) response = await call_next(request)
response.headers["Access-Control-Allow-Origin"] = origin response.headers["Access-Control-Allow-Origin"] = origin
@ -196,7 +92,6 @@ def configure_cors_hybrid(app: FastAPI):
) )
return response return response
# Sinon → CORS ouvert sans credentials
response = await call_next(request) response = await call_next(request)
response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = ( response.headers["Access-Control-Allow-Methods"] = (
@ -205,40 +100,16 @@ def configure_cors_hybrid(app: FastAPI):
response.headers["Access-Control-Allow-Headers"] = "*" response.headers["Access-Control-Allow-Headers"] = "*"
return response return response
# Domaines connus (whitelist) known_origins = ["*"]
known_origins = [
"https://app.votre-domaine.com",
"https://admin.votre-domaine.com",
"http://localhost:3000",
"http://localhost:5173",
]
app.add_middleware(HybridCORSMiddleware, known_origins=known_origins) app.add_middleware(HybridCORSMiddleware, known_origins=known_origins)
logger.info("🌐 CORS configuré: Mode HYBRIDE") logger.info(" CORS configuré: Mode HYBRIDE")
logger.info(f" - Whitelist: {len(known_origins)} domaines") logger.info(f" - Whitelist: {len(known_origins)} domaines")
logger.info(" - Fallback: * (ouvert)") logger.info(" - Fallback: * (ouvert)")
# ============================================
# FONCTION PRINCIPALE
# ============================================
def setup_cors(app: FastAPI, mode: str = "open"): def setup_cors(app: FastAPI, mode: str = "open"):
"""
Configure CORS selon le mode choisi
Args:
app: Instance FastAPI
mode: "open" | "whitelist" | "regex" | "hybrid"
Recommandations:
- Development: "open"
- Production (API publique): "open" ou "hybrid"
- Production (API interne): "whitelist"
"""
if mode == "open": if mode == "open":
configure_cors_open(app) configure_cors_open(app)
elif mode == "whitelist": elif mode == "whitelist":
@ -249,73 +120,6 @@ def setup_cors(app: FastAPI, mode: str = "open"):
configure_cors_hybrid(app) configure_cors_hybrid(app)
else: else:
logger.warning( logger.warning(
f"⚠️ Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." f" Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut."
) )
configure_cors_open(app) configure_cors_open(app)
# ============================================
# EXEMPLE D'UTILISATION DANS api.py
# ============================================
"""
# Dans api.py
from config.cors_config import setup_cors
app = FastAPI(...)
# DÉVELOPPEMENT
setup_cors(app, mode="open")
# PRODUCTION (API publique avec clés)
setup_cors(app, mode="hybrid")
# PRODUCTION (API interne uniquement)
setup_cors(app, mode="whitelist")
"""
# ============================================
# VARIABLES D'ENVIRONNEMENT (.env)
# ============================================
"""
# Pour mode "whitelist"
CORS_ALLOWED_ORIGINS=https://app.example.com,https://admin.example.com,http://localhost:3000
# Pour mode "regex"
CORS_ORIGIN_REGEX=https://.*\.example\.com
# Choisir le mode
CORS_MODE=open
"""
# ============================================
# FAQ CORS + API KEYS
# ============================================
"""
Q: Pourquoi allow_origins=["*"] est sécurisé avec des API Keys ?
R: Les clés API protègent l'accès aux données. CORS empêche seulement les
navigateurs web de faire des requêtes cross-origin. Un attaquant peut
contourner CORS facilement (curl, postman), donc la vraie sécurité vient
de la validation des clés API, pas du CORS.
Q: Pourquoi credentials=False avec allow_origins=["*"] ?
R: C'est une restriction du navigateur (spec CORS). Vous ne pouvez pas avoir
credentials=True ET origins=["*"] en même temps.
Q: Mes clients utilisent des IPs dynamiques, que faire ?
R: Utilisez mode "open". Les clés API sont justement faites pour ça -
permettre l'accès depuis n'importe quelle origine, de manière sécurisée.
Q: Je veux quand même utiliser des cookies/sessions ?
R: Utilisez mode "whitelist" ou "hybrid" pour les domaines qui ont besoin
de credentials, et laissez les autres utiliser X-API-Key sans credentials.
Q: Comment tester CORS localement ?
R: Créez un simple fichier HTML avec fetch() et ouvrez-le en file://
Ou utilisez un serveur local (python -m http.server)
"""

View file

@ -10,7 +10,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False) # ⚠️ auto_error=False pour gérer manuellement security = HTTPBearer(auto_error=False)
async def get_current_user_hybrid( async def get_current_user_hybrid(
@ -18,19 +18,6 @@ async def get_current_user_hybrid(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> User: ) -> User:
"""
VERSION HYBRIDE: Accepte JWT OU API Key
Priorité:
1. JWT (Authorization: Bearer)
2. API Key (déjà validée par middleware)
Si API Key utilisée, retourne un "user virtuel" basé sur la clé
"""
# ============================================
# OPTION 1: JWT (comportement standard)
# ============================================
if credentials and credentials.credentials: if credentials and credentials.credentials:
token = credentials.credentials token = credentials.credentials
@ -84,19 +71,12 @@ async def get_current_user_hybrid(
detail="Compte temporairement verrouillé suite à trop de tentatives échouées", detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
) )
logger.debug(f"🔐 Authentifié via JWT: {user.email}") logger.debug(f" Authentifié via JWT: {user.email}")
return user return user
# ============================================
# OPTION 2: API Key (validée par middleware)
# ============================================
api_key_obj = getattr(request.state, "api_key", None) api_key_obj = getattr(request.state, "api_key", None)
if api_key_obj: if api_key_obj:
# Créer un "user virtuel" basé sur la clé API
# Cela permet aux routes existantes de fonctionner sans modification
# Si la clé est associée à un vrai user, le récupérer
if api_key_obj.user_id: if api_key_obj.user_id:
result = await session.execute( result = await session.execute(
select(User).where(User.id == api_key_obj.user_id) select(User).where(User.id == api_key_obj.user_id)
@ -105,11 +85,10 @@ async def get_current_user_hybrid(
if user: if user:
logger.debug( logger.debug(
f"🔑 Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}"
) )
return user return user
# Sinon, créer un user virtuel (pour les clés API sans user associé)
from database import User as UserModel from database import User as UserModel
virtual_user = UserModel( virtual_user = UserModel(
@ -117,21 +96,17 @@ async def get_current_user_hybrid(
email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local", email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local",
nom="API Key", nom="API Key",
prenom=api_key_obj.name, prenom=api_key_obj.name,
role="api_client", # Rôle spécial pour les API keys role="api_client",
is_verified=True, is_verified=True,
is_active=True, is_active=True,
) )
# Marquer que c'est un user virtuel
virtual_user._is_api_key_user = True virtual_user._is_api_key_user = True
virtual_user._api_key_obj = api_key_obj virtual_user._api_key_obj = api_key_obj
logger.debug(f"🔑 Authentifié via API Key: {api_key_obj.name} (user virtuel)") logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)")
return virtual_user return virtual_user
# ============================================
# AUCUNE AUTHENTIFICATION
# ============================================
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (JWT ou API Key)", detail="Authentification requise (JWT ou API Key)",
@ -152,32 +127,20 @@ async def get_current_user_optional_hybrid(
def require_role_hybrid(*allowed_roles: str): def require_role_hybrid(*allowed_roles: str):
"""
VERSION HYBRIDE: Vérification de rôle compatible avec API Keys
Notes:
- Les users via JWT ont leur vrai rôle
- Les users via API Key ont le rôle "api_client" par défaut
- Vous pouvez autoriser "api_client" dans allowed_roles pour permettre l'accès aux API Keys
"""
async def role_checker( async def role_checker(
request: Request, user: User = Depends(get_current_user_hybrid) request: Request, user: User = Depends(get_current_user_hybrid)
) -> User: ) -> User:
# Vérifier si c'est un user API Key
is_api_key_user = getattr(user, "_is_api_key_user", False) is_api_key_user = getattr(user, "_is_api_key_user", False)
if is_api_key_user: if is_api_key_user:
# Pour les API Keys, vérifier si "api_client" est autorisé
if "api_client" not in allowed_roles and "*" not in allowed_roles: if "api_client" not in allowed_roles and "*" not in allowed_roles:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.", detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.",
) )
logger.debug(f" API Key autorisée pour cette route") logger.debug(" API Key autorisée pour cette route")
return user return user
# Pour les vrais users, vérification standard
if user.role not in allowed_roles and "*" not in allowed_roles: if user.role not in allowed_roles and "*" not in allowed_roles:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
@ -189,11 +152,6 @@ def require_role_hybrid(*allowed_roles: str):
return role_checker return role_checker
# ============================================
# HELPERS
# ============================================
def is_api_key_user(user: User) -> bool: def is_api_key_user(user: User) -> bool:
"""Vérifie si l'utilisateur est authentifié via API Key""" """Vérifie si l'utilisateur est authentifié via API Key"""
return getattr(user, "_is_api_key_user", False) return getattr(user, "_is_api_key_user", False)
@ -204,10 +162,5 @@ def get_api_key_from_user(user: User):
return getattr(user, "_api_key_obj", None) return getattr(user, "_api_key_obj", None)
# ============================================
# RÉTROCOMPATIBILITÉ
# ============================================
# Alias pour garder la compatibilité avec le code existant
get_current_user = get_current_user_hybrid get_current_user = get_current_user_hybrid
get_current_user_optional = get_current_user_optional_hybrid get_current_user_optional = get_current_user_optional_hybrid

View file

@ -12,29 +12,21 @@ class ApiKey(Base):
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
key_hash = Column(String(64), unique=True, nullable=False, index=True) key_hash = Column(String(64), unique=True, nullable=False, index=True)
key_prefix = Column( key_prefix = Column(String(10), nullable=False)
String(10), nullable=False
) # Premiers caractères pour identification
name = Column(String(255), nullable=False) name = Column(String(255), nullable=False)
description = Column(Text, nullable=True) description = Column(Text, nullable=True)
# Métadonnées user_id = Column(String(36), nullable=True)
user_id = Column(String(36), nullable=True) # Optionnel si associé à un utilisateur
created_by = Column(String(255), nullable=False) created_by = Column(String(255), nullable=False)
# Contrôle d'accès
is_active = Column(Boolean, default=True, nullable=False) is_active = Column(Boolean, default=True, nullable=False)
rate_limit_per_minute = Column(Integer, default=60, nullable=False) rate_limit_per_minute = Column(Integer, default=60, nullable=False)
allowed_endpoints = Column( allowed_endpoints = Column(Text, nullable=True)
Text, nullable=True
) # JSON array des endpoints autorisés
# Statistiques
total_requests = Column(Integer, default=0, nullable=False) total_requests = Column(Integer, default=0, nullable=False)
last_used_at = Column(DateTime, nullable=True) last_used_at = Column(DateTime, nullable=True)
# Dates
created_at = Column(DateTime, default=datetime.now, nullable=False) created_at = Column(DateTime, default=datetime.now, nullable=False)
expires_at = Column(DateTime, nullable=True) expires_at = Column(DateTime, nullable=True)
revoked_at = Column(DateTime, nullable=True) revoked_at = Column(DateTime, nullable=True)

View file

@ -1,5 +1,4 @@
import secrets from fastapi import Request, status
from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.security import HTTPBasic, HTTPBasicCredentials
from sqlalchemy import select from sqlalchemy import select
@ -13,23 +12,14 @@ from security.auth import verify_password
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# === Configuration Swagger ===
security = HTTPBasic() security = HTTPBasic()
async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
"""
VERSION 2: Vérification des identifiants Swagger via base de données
Plus sécurisé
Gestion centralisée
Tracking des connexions
"""
username = credentials.username username = credentials.username
password = credentials.password password = credentials.password
try: try:
# Utiliser get_session de manière asynchrone
async for session in get_session(): async for session in get_session():
result = await session.execute( result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username) select(SwaggerUser).where(SwaggerUser.username == username)
@ -38,29 +28,21 @@ async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
if swagger_user and swagger_user.is_active: if swagger_user and swagger_user.is_active:
if verify_password(password, swagger_user.hashed_password): if verify_password(password, swagger_user.hashed_password):
# Mise à jour de la dernière connexion
swagger_user.last_login = datetime.now() swagger_user.last_login = datetime.now()
await session.commit() await session.commit()
logger.info(f" Accès Swagger autorisé (DB): {username}") logger.info(f" Accès Swagger autorisé (DB): {username}")
return True return True
logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}") logger.warning(f" Tentative d'accès Swagger refusée: {username}")
return False return False
except Exception as e: except Exception as e:
logger.error(f" Erreur vérification Swagger credentials: {e}") logger.error(f" Erreur vérification Swagger credentials: {e}")
return False return False
class SwaggerAuthMiddleware: class SwaggerAuthMiddleware:
"""
Middleware pour protéger les endpoints de documentation
(/docs, /redoc, /openapi.json)
VERSION 2: Avec vérification en base de données
"""
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
@ -72,15 +54,12 @@ class SwaggerAuthMiddleware:
request = Request(scope, receive=receive) request = Request(scope, receive=receive)
path = request.url.path path = request.url.path
# Endpoints à protéger
protected_paths = ["/docs", "/redoc", "/openapi.json"] protected_paths = ["/docs", "/redoc", "/openapi.json"]
if any(path.startswith(protected_path) for protected_path in protected_paths): if any(path.startswith(protected_path) for protected_path in protected_paths):
# Vérification de l'authentification Basic
auth_header = request.headers.get("Authorization") auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "): if not auth_header or not auth_header.startswith("Basic "):
# Demande d'authentification
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={ content={
@ -91,7 +70,6 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
# Extraction des credentials
try: try:
import base64 import base64
@ -103,7 +81,6 @@ class SwaggerAuthMiddleware:
credentials = HTTPBasicCredentials(username=username, password=password) credentials = HTTPBasicCredentials(username=username, password=password)
# Vérification via DB
if not await verify_swagger_credentials(credentials): if not await verify_swagger_credentials(credentials):
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -114,7 +91,7 @@ class SwaggerAuthMiddleware:
return return
except Exception as e: except Exception as e:
logger.error(f" Erreur parsing auth header: {e}") logger.error(f" Erreur parsing auth header: {e}")
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Format d'authentification invalide"}, content={"detail": "Format d'authentification invalide"},
@ -123,19 +100,10 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
# Si tout est OK, continuer
await self.app(scope, receive, send) await self.app(scope, receive, send)
class ApiKeyMiddleware: class ApiKeyMiddleware:
"""
Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics
Accepte X-API-Key OU Authorization: Bearer (JWT)
Les deux méthodes sont équivalentes
Les endpoints /auth/* restent accessibles sans auth
"""
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
@ -147,53 +115,37 @@ class ApiKeyMiddleware:
request = Request(scope, receive=receive) request = Request(scope, receive=receive)
path = request.url.path path = request.url.path
# ============================================
# ENDPOINTS EXCLUS (accessibles sans auth)
# ============================================
excluded_paths = [ excluded_paths = [
"/docs", "/docs",
"/redoc", "/redoc",
"/openapi.json", "/openapi.json",
"/health", "/health",
"/", "/",
# === ROUTES AUTH (toujours publiques) === "/auth/login",
"/api/v1/auth/login", "/auth/register",
"/api/v1/auth/register", "/auth/verify-email",
"/api/v1/auth/verify-email", "/auth/reset-password",
"/api/v1/auth/reset-password", "/auth/request-reset",
"/api/v1/auth/request-reset", "/auth/refresh",
"/api/v1/auth/refresh",
] ]
if any(path.startswith(excluded_path) for excluded_path in excluded_paths): if any(path.startswith(excluded_path) for excluded_path in excluded_paths):
await self.app(scope, receive, send) await self.app(scope, receive, send)
return return
# ============================================
# VÉRIFICATION HYBRIDE: API Key OU JWT
# ============================================
# Option 1: Vérifier si JWT présent
auth_header = request.headers.get("Authorization") auth_header = request.headers.get("Authorization")
has_jwt = auth_header and auth_header.startswith("Bearer ") has_jwt = auth_header and auth_header.startswith("Bearer ")
# Option 2: Vérifier si API Key présente
api_key = request.headers.get("X-API-Key") api_key = request.headers.get("X-API-Key")
has_api_key = api_key is not None has_api_key = api_key is not None
# ============================================
# LOGIQUE HYBRIDE
# ============================================
if has_jwt: if has_jwt:
# JWT présent → laisser passer, sera validé par les dependencies FastAPI logger.debug(f" JWT détecté pour {path}")
logger.debug(f"🔑 JWT détecté pour {path}")
await self.app(scope, receive, send) await self.app(scope, receive, send)
return return
elif has_api_key: elif has_api_key:
# API Key présente → valider la clé logger.debug(f" API Key détectée pour {path}")
logger.debug(f"🔑 API Key détectée pour {path}")
from services.api_key import ApiKeyService from services.api_key import ApiKeyService
@ -213,7 +165,6 @@ class ApiKeyMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
# Vérification du rate limit
is_allowed, rate_info = await api_key_service.check_rate_limit( is_allowed, rate_info = await api_key_service.check_rate_limit(
api_key_obj api_key_obj
) )
@ -229,7 +180,6 @@ class ApiKeyMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
# Vérification de l'accès à l'endpoint
has_access = await api_key_service.check_endpoint_access( has_access = await api_key_service.check_endpoint_access(
api_key_obj, path api_key_obj, path
) )
@ -245,18 +195,16 @@ class ApiKeyMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
# ✅ Clé valide → ajouter les infos à la requête
request.state.api_key = api_key_obj request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key" request.state.authenticated_via = "api_key"
logger.info(f" API Key valide: {api_key_obj.name}{path}") logger.info(f" API Key valide: {api_key_obj.name}{path}")
# Continuer la requête
await self.app(scope, receive, send) await self.app(scope, receive, send)
return return
except Exception as e: except Exception as e:
logger.error(f" Erreur validation API Key: {e}", exc_info=True) logger.error(f" Erreur validation API Key: {e}", exc_info=True)
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={ content={
@ -267,7 +215,6 @@ class ApiKeyMiddleware:
return return
else: else:
# ❌ Ni JWT ni API Key
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={ content={
@ -280,17 +227,10 @@ class ApiKeyMiddleware:
return return
# === Fonction helper pour récupérer l'API Key depuis la requête ===
def get_api_key_from_request(request: Request) -> Optional: def get_api_key_from_request(request: Request) -> Optional:
"""Récupère l'objet ApiKey depuis la requête si présent""" """Récupère l'objet ApiKey depuis la requête si présent"""
return getattr(request.state, "api_key", None) return getattr(request.state, "api_key", None)
def get_auth_method(request: Request) -> str: def get_auth_method(request: Request) -> str:
"""
Retourne la méthode d'authentification utilisée
Returns:
"jwt" | "api_key" | "none"
"""
return getattr(request.state, "authenticated_via", "none") return getattr(request.state, "authenticated_via", "none")

View file

@ -27,13 +27,6 @@ async def create_api_key(
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
): ):
"""
🔑 Créer une nouvelle clé API
**Réservé aux admins**
La clé en clair ne sera affichée qu'une seule fois !
"""
service = ApiKeyService(session) service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key( api_key_obj, api_key_plain = await service.create_api_key(
@ -46,7 +39,7 @@ async def create_api_key(
allowed_endpoints=data.allowed_endpoints, allowed_endpoints=data.allowed_endpoints,
) )
logger.info(f" Clé API créée par {user.email}: {data.name}") logger.info(f" Clé API créée par {user.email}: {data.name}")
response_data = api_key_to_response(api_key_obj) response_data = api_key_to_response(api_key_obj)
response_data["api_key"] = api_key_plain response_data["api_key"] = api_key_plain
@ -60,15 +53,8 @@ async def list_api_keys(
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
): ):
"""
📋 Lister toutes les clés API
**Réservé aux admins** - liste toutes les clés
**Utilisateurs standards** - liste uniquement leurs clés
"""
service = ApiKeyService(session) service = ApiKeyService(session)
# Si admin, voir toutes les clés, sinon seulement les siennes
user_id = None if user.role in ["admin", "super_admin"] else user.id user_id = None if user.role in ["admin", "super_admin"] else user.id
keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id) keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id)
@ -84,7 +70,7 @@ async def get_api_key(
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
): ):
"""🔍 Récupérer une clé API par son ID""" """ Récupérer une clé API par son ID"""
service = ApiKeyService(session) service = ApiKeyService(session)
api_key_obj = await service.get_by_id(key_id) api_key_obj = await service.get_by_id(key_id)
@ -95,7 +81,6 @@ async def get_api_key(
detail=f"Clé API {key_id} introuvable", detail=f"Clé API {key_id} introuvable",
) )
# Vérification des permissions
if user.role not in ["admin", "super_admin"]: if user.role not in ["admin", "super_admin"]:
if api_key_obj.user_id != user.id: if api_key_obj.user_id != user.id:
raise HTTPException( raise HTTPException(
@ -112,11 +97,6 @@ async def revoke_api_key(
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
): ):
"""
🚫 Révoquer une clé API
**Action irréversible** - la clé sera désactivée définitivement
"""
service = ApiKeyService(session) service = ApiKeyService(session)
api_key_obj = await service.get_by_id(key_id) api_key_obj = await service.get_by_id(key_id)
@ -127,7 +107,6 @@ async def revoke_api_key(
detail=f"Clé API {key_id} introuvable", detail=f"Clé API {key_id} introuvable",
) )
# Vérification des permissions
if user.role not in ["admin", "super_admin"]: if user.role not in ["admin", "super_admin"]:
if api_key_obj.user_id != user.id: if api_key_obj.user_id != user.id:
raise HTTPException( raise HTTPException(
@ -143,7 +122,7 @@ async def revoke_api_key(
detail="Erreur lors de la révocation", detail="Erreur lors de la révocation",
) )
logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}") logger.info(f" Clé API révoquée par {user.email}: {api_key_obj.name}")
return { return {
"success": True, "success": True,
@ -156,11 +135,6 @@ async def verify_api_key_endpoint(
api_key: str = Query(..., description="Clé API à vérifier"), api_key: str = Query(..., description="Clé API à vérifier"),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
): ):
"""
Vérifier la validité d'une clé API
**Endpoint public** - permet de tester une clé
"""
service = ApiKeyService(session) service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key) api_key_obj = await service.verify_api_key(api_key)

View file

@ -42,7 +42,7 @@ class ApiKeyCreatedResponse(ApiKeyResponse):
"""Schema de réponse après création (inclut la clé en clair)""" """Schema de réponse après création (inclut la clé en clair)"""
api_key: str = Field( api_key: str = Field(
..., description="⚠️ Clé API en clair - à sauvegarder immédiatement" ..., description=" Clé API en clair - à sauvegarder immédiatement"
) )

View file

@ -1,18 +1,3 @@
#!/usr/bin/env python3
"""
Script CLI pour gérer les clés API et utilisateurs Swagger
Usage:
python manage_security.py swagger add <username> <password>
python manage_security.py swagger list
python manage_security.py swagger delete <username>
python manage_security.py apikey create <name> [--days 365] [--rate-limit 60]
python manage_security.py apikey list
python manage_security.py apikey revoke <key_id>
python manage_security.py apikey verify <api_key>
"""
import asyncio import asyncio
import sys import sys
from pathlib import Path from pathlib import Path
@ -23,7 +8,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from database import get_session from database import get_session
from database.models.api_key import SwaggerUser from database.models.api_key import SwaggerUser
from services.api_key import ApiKeyService from services.api_key import ApiKeyService
from security.auth import hash_password, verify_password from security.auth import hash_password
from sqlalchemy import select from sqlalchemy import select
import logging import logging
@ -31,24 +16,18 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ============================================
# GESTION DES UTILISATEURS SWAGGER
# ============================================
async def add_swagger_user(username: str, password: str, full_name: str = None): async def add_swagger_user(username: str, password: str, full_name: str = None):
"""Ajouter un utilisateur Swagger""" """Ajouter un utilisateur Swagger"""
async with get_session() as session: async with get_session() as session:
# Vérifier si existe
result = await session.execute( result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username) select(SwaggerUser).where(SwaggerUser.username == username)
) )
existing = result.scalar_one_or_none() existing = result.scalar_one_or_none()
if existing: if existing:
logger.error(f" L'utilisateur {username} existe déjà") logger.error(f" L'utilisateur {username} existe déjà")
return return
# Créer
user = SwaggerUser( user = SwaggerUser(
username=username, username=username,
hashed_password=hash_password(password), hashed_password=hash_password(password),
@ -59,10 +38,10 @@ async def add_swagger_user(username: str, password: str, full_name: str = None):
session.add(user) session.add(user)
await session.commit() await session.commit()
logger.info(f" Utilisateur Swagger créé: {username}") logger.info(f" Utilisateur Swagger créé: {username}")
print(f"\n Utilisateur créé avec succès") print("\n Utilisateur créé avec succès")
print(f" Username: {username}") print(f" Username: {username}")
print(f" Accès: https://votre-serveur/docs") print(" Accès: https://votre-serveur/docs")
async def list_swagger_users(): async def list_swagger_users():
@ -75,9 +54,9 @@ async def list_swagger_users():
print("Aucun utilisateur Swagger trouvé") print("Aucun utilisateur Swagger trouvé")
return return
print(f"\n📋 {len(users)} utilisateur(s) Swagger:\n") print(f"\n {len(users)} utilisateur(s) Swagger:\n")
for user in users: for user in users:
status = " Actif" if user.is_active else " Inactif" status = " Actif" if user.is_active else " Inactif"
print(f"{user.username:<20} {status}") print(f"{user.username:<20} {status}")
if user.full_name: if user.full_name:
print(f" Nom: {user.full_name}") print(f" Nom: {user.full_name}")
@ -95,7 +74,7 @@ async def delete_swagger_user(username: str):
user = result.scalar_one_or_none() user = result.scalar_one_or_none()
if not user: if not user:
logger.error(f" Utilisateur {username} introuvable") logger.error(f" Utilisateur {username} introuvable")
return return
await session.delete(user) await session.delete(user)
@ -104,10 +83,6 @@ async def delete_swagger_user(username: str):
logger.info(f"🗑️ Utilisateur supprimé: {username}") logger.info(f"🗑️ Utilisateur supprimé: {username}")
# ============================================
# GESTION DES CLÉS API
# ============================================
async def create_api_key( async def create_api_key(
name: str, name: str,
description: str = None, description: str = None,
@ -128,14 +103,14 @@ async def create_api_key(
allowed_endpoints=endpoints, allowed_endpoints=endpoints,
) )
print(f"\n Clé API créée avec succès\n") print("\n Clé API créée avec succès\n")
print(f" ID: {api_key_obj.id}") print(f" ID: {api_key_obj.id}")
print(f" Nom: {name}") print(f" Nom: {name}")
print(f" Clé: {api_key_plain}") print(f" Clé: {api_key_plain}")
print(f" Préfixe: {api_key_obj.key_prefix}") print(f" Préfixe: {api_key_obj.key_prefix}")
print(f" Rate limit: {rate_limit} req/min") print(f" Rate limit: {rate_limit} req/min")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}")
print(f"\n⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n")
async def list_api_keys(): async def list_api_keys():
@ -148,11 +123,15 @@ async def list_api_keys():
print("Aucune clé API trouvée") print("Aucune clé API trouvée")
return return
print(f"\n📋 {len(keys)} clé(s) API:\n") print(f"\n {len(keys)} clé(s) API:\n")
for key in keys: for key in keys:
status = "" if key.is_active else "" status = "" if key.is_active else ""
expired = "⏰ Expirée" if key.expires_at and key.expires_at < datetime.now() else "" expired = (
"⏰ Expirée"
if key.expires_at and key.expires_at < datetime.now()
else ""
)
print(f" {status} {key.name:<30} ({key.key_prefix}...)") print(f" {status} {key.name:<30} ({key.key_prefix}...)")
print(f" ID: {key.id}") print(f" ID: {key.id}")
print(f" Requêtes: {key.total_requests}") print(f" Requêtes: {key.total_requests}")
@ -166,19 +145,19 @@ async def revoke_api_key(key_id: str):
"""Révoquer une clé API""" """Révoquer une clé API"""
async with get_session() as session: async with get_session() as session:
service = ApiKeyService(session) service = ApiKeyService(session)
api_key = await service.get_by_id(key_id) api_key = await service.get_by_id(key_id)
if not api_key: if not api_key:
logger.error(f" Clé {key_id} introuvable") logger.error(f" Clé {key_id} introuvable")
return return
success = await service.revoke_api_key(key_id) success = await service.revoke_api_key(key_id)
if success: if success:
logger.info(f"🚫 Clé révoquée: {api_key.name}") logger.info(f" Clé révoquée: {api_key.name}")
print(f"\n Clé '{api_key.name}' révoquée avec succès") print(f"\n Clé '{api_key.name}' révoquée avec succès")
else: else:
logger.error(" Erreur lors de la révocation") logger.error(" Erreur lors de la révocation")
async def verify_api_key_cmd(api_key: str): async def verify_api_key_cmd(api_key: str):
@ -188,64 +167,59 @@ async def verify_api_key_cmd(api_key: str):
api_key_obj = await service.verify_api_key(api_key) api_key_obj = await service.verify_api_key(api_key)
if api_key_obj: if api_key_obj:
print(f"\n Clé API valide\n") print("\n Clé API valide\n")
print(f" Nom: {api_key_obj.name}") print(f" Nom: {api_key_obj.name}")
print(f" ID: {api_key_obj.id}") print(f" ID: {api_key_obj.id}")
print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
print(f" Requêtes: {api_key_obj.total_requests}") print(f" Requêtes: {api_key_obj.total_requests}")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n")
else: else:
print(f"\n Clé API invalide, expirée ou révoquée\n") print("\n Clé API invalide, expirée ou révoquée\n")
# ============================================
# CLI PRINCIPAL
# ============================================
async def main(): async def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Gestion de la sécurité Sage Dataven API" description="Gestion de la sécurité Sage Dataven API"
) )
subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")
# === SWAGGER === swagger_parser = subparsers.add_parser(
swagger_parser = subparsers.add_parser("swagger", help="Gestion utilisateurs Swagger") "swagger", help="Gestion utilisateurs Swagger"
)
swagger_subparsers = swagger_parser.add_subparsers(dest="action") swagger_subparsers = swagger_parser.add_subparsers(dest="action")
# swagger add
swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
swagger_add.add_argument("username", help="Nom d'utilisateur") swagger_add.add_argument("username", help="Nom d'utilisateur")
swagger_add.add_argument("password", help="Mot de passe") swagger_add.add_argument("password", help="Mot de passe")
swagger_add.add_argument("--full-name", help="Nom complet") swagger_add.add_argument("--full-name", help="Nom complet")
# swagger list
swagger_subparsers.add_parser("list", help="Lister les utilisateurs") swagger_subparsers.add_parser("list", help="Lister les utilisateurs")
# swagger delete swagger_delete = swagger_subparsers.add_parser(
swagger_delete = swagger_subparsers.add_parser("delete", help="Supprimer un utilisateur") "delete", help="Supprimer un utilisateur"
)
swagger_delete.add_argument("username", help="Nom d'utilisateur") swagger_delete.add_argument("username", help="Nom d'utilisateur")
# === API KEYS ===
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_subparsers = apikey_parser.add_subparsers(dest="action") apikey_subparsers = apikey_parser.add_subparsers(dest="action")
# apikey create
apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API") apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API")
apikey_create.add_argument("name", help="Nom de la clé") apikey_create.add_argument("name", help="Nom de la clé")
apikey_create.add_argument("--description", help="Description") apikey_create.add_argument("--description", help="Description")
apikey_create.add_argument("--days", type=int, default=365, help="Expiration en jours") apikey_create.add_argument(
apikey_create.add_argument("--rate-limit", type=int, default=60, help="Limite req/min") "--days", type=int, default=365, help="Expiration en jours"
)
apikey_create.add_argument(
"--rate-limit", type=int, default=60, help="Limite req/min"
)
apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
# apikey list
apikey_subparsers.add_parser("list", help="Lister les clés") apikey_subparsers.add_parser("list", help="Lister les clés")
# apikey revoke
apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
apikey_revoke.add_argument("key_id", help="ID de la clé") apikey_revoke.add_argument("key_id", help="ID de la clé")
# apikey verify
apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé") apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
apikey_verify.add_argument("api_key", help="Clé API à vérifier") apikey_verify.add_argument("api_key", help="Clé API à vérifier")
@ -255,7 +229,6 @@ async def main():
parser.print_help() parser.print_help()
return return
# Exécution des commandes
if args.command == "swagger": if args.command == "swagger":
if args.action == "add": if args.action == "add":
await add_swagger_user(args.username, args.password, args.full_name) await add_swagger_user(args.username, args.password, args.full_name)
@ -287,4 +260,5 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
from datetime import datetime from datetime import datetime
asyncio.run(main())
asyncio.run(main())

View file

@ -1,16 +1,7 @@
#!/usr/bin/env python3
"""
Script de test automatisé pour vérifier la sécurité de l'API
Usage:
python test_security.py --url http://votre-vps:8000 --swagger-user admin --swagger-pass password
"""
import requests import requests
import argparse import argparse
import sys import sys
from typing import Dict, Tuple from typing import Tuple
import json
class SecurityTester: class SecurityTester:
@ -20,7 +11,7 @@ class SecurityTester:
def log_test(self, name: str, passed: bool, details: str = ""): def log_test(self, name: str, passed: bool, details: str = ""):
"""Enregistrer le résultat d'un test""" """Enregistrer le résultat d'un test"""
status = " PASS" if passed else " FAIL" status = " PASS" if passed else " FAIL"
print(f"{status} - {name}") print(f"{status} - {name}")
if details: if details:
print(f" {details}") print(f" {details}")
@ -36,7 +27,7 @@ class SecurityTester:
def test_swagger_without_auth(self) -> bool: def test_swagger_without_auth(self) -> bool:
"""Test 1: Swagger UI devrait demander une authentification""" """Test 1: Swagger UI devrait demander une authentification"""
print("\n🔍 Test 1: Protection Swagger UI") print("\n Test 1: Protection Swagger UI")
try: try:
response = requests.get(f"{self.base_url}/docs", timeout=5) response = requests.get(f"{self.base_url}/docs", timeout=5)
@ -62,7 +53,7 @@ class SecurityTester:
def test_swagger_with_auth(self, username: str, password: str) -> bool: def test_swagger_with_auth(self, username: str, password: str) -> bool:
"""Test 2: Swagger UI accessible avec credentials valides""" """Test 2: Swagger UI accessible avec credentials valides"""
print("\n🔍 Test 2: Accès Swagger avec authentification") print("\n Test 2: Accès Swagger avec authentification")
try: try:
response = requests.get( response = requests.get(
@ -90,7 +81,7 @@ class SecurityTester:
def test_api_without_auth(self) -> bool: def test_api_without_auth(self) -> bool:
"""Test 3: Endpoints API devraient demander une authentification""" """Test 3: Endpoints API devraient demander une authentification"""
print("\n🔍 Test 3: Protection des endpoints API") print("\n Test 3: Protection des endpoints API")
test_endpoints = ["/api/v1/clients", "/api/v1/documents"] test_endpoints = ["/api/v1/clients", "/api/v1/documents"]
@ -100,15 +91,15 @@ class SecurityTester:
response = requests.get(f"{self.base_url}{endpoint}", timeout=5) response = requests.get(f"{self.base_url}{endpoint}", timeout=5)
if response.status_code == 401: if response.status_code == 401:
print(f" {endpoint} protégé (401)") print(f" {endpoint} protégé (401)")
else: else:
print( print(
f" {endpoint} accessible sans auth (code {response.status_code})" f" {endpoint} accessible sans auth (code {response.status_code})"
) )
all_protected = False all_protected = False
except Exception as e: except Exception as e:
print(f" ⚠️ {endpoint} erreur: {str(e)}") print(f" {endpoint} erreur: {str(e)}")
all_protected = False all_protected = False
self.log_test("Endpoints API protégés", all_protected) self.log_test("Endpoints API protégés", all_protected)
@ -116,7 +107,7 @@ class SecurityTester:
def test_health_endpoint_public(self) -> bool: def test_health_endpoint_public(self) -> bool:
"""Test 4: Endpoint /health devrait être accessible sans auth""" """Test 4: Endpoint /health devrait être accessible sans auth"""
print("\n🔍 Test 4: Endpoint /health public") print("\n Test 4: Endpoint /health public")
try: try:
response = requests.get(f"{self.base_url}/health", timeout=5) response = requests.get(f"{self.base_url}/health", timeout=5)
@ -138,10 +129,9 @@ class SecurityTester:
def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]: def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]:
"""Test 5: Créer une clé API via l'endpoint""" """Test 5: Créer une clé API via l'endpoint"""
print("\n🔍 Test 5: Création d'une clé API") print("\n Test 5: Création d'une clé API")
try: try:
# 1. Login pour obtenir un JWT
login_response = requests.post( login_response = requests.post(
f"{self.base_url}/api/v1/auth/login", f"{self.base_url}/api/v1/auth/login",
json={"email": username, "password": password}, json={"email": username, "password": password},
@ -158,7 +148,6 @@ class SecurityTester:
jwt_token = login_response.json().get("access_token") jwt_token = login_response.json().get("access_token")
# 2. Créer une clé API
create_response = requests.post( create_response = requests.post(
f"{self.base_url}/api/v1/api-keys", f"{self.base_url}/api/v1/api-keys",
headers={"Authorization": f"Bearer {jwt_token}"}, headers={"Authorization": f"Bearer {jwt_token}"},
@ -189,7 +178,7 @@ class SecurityTester:
def test_api_key_usage(self, api_key: str) -> bool: def test_api_key_usage(self, api_key: str) -> bool:
"""Test 6: Utiliser une clé API pour accéder à un endpoint""" """Test 6: Utiliser une clé API pour accéder à un endpoint"""
print("\n🔍 Test 6: Utilisation d'une clé API") print("\n Test 6: Utilisation d'une clé API")
if not api_key: if not api_key:
self.log_test("Utilisation clé API", False, "Pas de clé disponible") self.log_test("Utilisation clé API", False, "Pas de clé disponible")
@ -219,7 +208,7 @@ class SecurityTester:
def test_invalid_api_key(self) -> bool: def test_invalid_api_key(self) -> bool:
"""Test 7: Une clé invalide devrait être refusée""" """Test 7: Une clé invalide devrait être refusée"""
print("\n🔍 Test 7: Rejet de clé API invalide") print("\n Test 7: Rejet de clé API invalide")
invalid_key = "sdk_live_invalid_key_12345" invalid_key = "sdk_live_invalid_key_12345"
@ -247,13 +236,12 @@ class SecurityTester:
def test_rate_limiting(self, api_key: str) -> bool: def test_rate_limiting(self, api_key: str) -> bool:
"""Test 8: Rate limiting (optionnel, peut prendre du temps)""" """Test 8: Rate limiting (optionnel, peut prendre du temps)"""
print("\n🔍 Test 8: Rate limiting (test simple)") print("\n Test 8: Rate limiting (test simple)")
if not api_key: if not api_key:
self.log_test("Rate limiting", False, "Pas de clé disponible") self.log_test("Rate limiting", False, "Pas de clé disponible")
return False return False
# Envoyer 70 requêtes rapidement (limite = 60/min)
print(" Envoi de 70 requêtes rapides...") print(" Envoi de 70 requêtes rapides...")
rate_limited = False rate_limited = False
@ -267,7 +255,7 @@ class SecurityTester:
if response.status_code == 429: if response.status_code == 429:
rate_limited = True rate_limited = True
print(f" ⚠️ Rate limit atteint à la requête {i + 1}") print(f" Rate limit atteint à la requête {i + 1}")
break break
except Exception: except Exception:
@ -287,22 +275,22 @@ class SecurityTester:
def print_summary(self): def print_summary(self):
"""Afficher le résumé des tests""" """Afficher le résumé des tests"""
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("📊 RÉSUMÉ DES TESTS") print(" RÉSUMÉ DES TESTS")
print("=" * 60) print("=" * 60)
total = self.results["passed"] + self.results["failed"] total = self.results["passed"] + self.results["failed"]
success_rate = (self.results["passed"] / total * 100) if total > 0 else 0 success_rate = (self.results["passed"] / total * 100) if total > 0 else 0
print(f"\nTotal: {total} tests") print(f"\nTotal: {total} tests")
print(f" Réussis: {self.results['passed']}") print(f" Réussis: {self.results['passed']}")
print(f" Échoués: {self.results['failed']}") print(f" Échoués: {self.results['failed']}")
print(f"📈 Taux de réussite: {success_rate:.1f}%\n") print(f"Taux de réussite: {success_rate:.1f}%\n")
if self.results["failed"] == 0: if self.results["failed"] == 0:
print("🎉 Tous les tests sont passés ! Sécurité OK.") print("🎉 Tous les tests sont passés ! Sécurité OK.")
return 0 return 0
else: else:
print("⚠️ Certains tests ont échoué. Vérifiez la configuration.") print(" Certains tests ont échoué. Vérifiez la configuration.")
return 1 return 1
@ -333,18 +321,16 @@ def main():
args = parser.parse_args() args = parser.parse_args()
print("🚀 Démarrage des tests de sécurité") print(" Démarrage des tests de sécurité")
print(f"🎯 URL cible: {args.url}\n") print(f" URL cible: {args.url}\n")
tester = SecurityTester(args.url) tester = SecurityTester(args.url)
# Exécuter les tests
tester.test_swagger_without_auth() tester.test_swagger_without_auth()
tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass) tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass)
tester.test_api_without_auth() tester.test_api_without_auth()
tester.test_health_endpoint_public() tester.test_health_endpoint_public()
# Tests nécessitant une clé API
success, api_key = tester.test_api_key_creation( success, api_key = tester.test_api_key_creation(
args.swagger_user, args.swagger_pass args.swagger_user, args.swagger_pass
) )
@ -356,11 +342,10 @@ def main():
if not args.skip_rate_limit: if not args.skip_rate_limit:
tester.test_rate_limiting(api_key) tester.test_rate_limiting(api_key)
else: else:
print("\n⏭️ Test de rate limiting sauté") print("\n Test de rate limiting sauté")
else: else:
print("\n⚠️ Tests avec clé API sautés (création échouée)") print("\n Tests avec clé API sautés (création échouée)")
# Résumé
exit_code = tester.print_summary() exit_code = tester.print_summary()
sys.exit(exit_code) sys.exit(exit_code)

View file

@ -21,7 +21,6 @@ class ApiKeyService:
@staticmethod @staticmethod
def generate_api_key() -> str: def generate_api_key() -> str:
"""Génère une clé API unique et sécurisée""" """Génère une clé API unique et sécurisée"""
# Format: sdk_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
random_part = secrets.token_urlsafe(32) random_part = secrets.token_urlsafe(32)
return f"sdk_live_{random_part}" return f"sdk_live_{random_part}"
@ -33,7 +32,6 @@ class ApiKeyService:
@staticmethod @staticmethod
def get_key_prefix(api_key: str) -> str: def get_key_prefix(api_key: str) -> str:
"""Extrait le préfixe de la clé pour identification""" """Extrait le préfixe de la clé pour identification"""
# Retourne les 12 premiers caractères
return api_key[:12] if len(api_key) >= 12 else api_key return api_key[:12] if len(api_key) >= 12 else api_key
async def create_api_key( async def create_api_key(
@ -46,23 +44,14 @@ class ApiKeyService:
rate_limit_per_minute: int = 60, rate_limit_per_minute: int = 60,
allowed_endpoints: Optional[List[str]] = None, allowed_endpoints: Optional[List[str]] = None,
) -> tuple[ApiKey, str]: ) -> tuple[ApiKey, str]:
"""
Crée une nouvelle clé API
Returns:
tuple[ApiKey, str]: (objet ApiKey, clé en clair - à ne montrer qu'une fois)
"""
# Génération de la clé
api_key_plain = self.generate_api_key() api_key_plain = self.generate_api_key()
key_hash = self.hash_api_key(api_key_plain) key_hash = self.hash_api_key(api_key_plain)
key_prefix = self.get_key_prefix(api_key_plain) key_prefix = self.get_key_prefix(api_key_plain)
# Calcul de la date d'expiration
expires_at = None expires_at = None
if expires_in_days: if expires_in_days:
expires_at = datetime.now() + timedelta(days=expires_in_days) expires_at = datetime.now() + timedelta(days=expires_in_days)
# Création de l'objet
api_key_obj = ApiKey( api_key_obj = ApiKey(
key_hash=key_hash, key_hash=key_hash,
key_prefix=key_prefix, key_prefix=key_prefix,
@ -81,24 +70,18 @@ class ApiKeyService:
await self.session.commit() await self.session.commit()
await self.session.refresh(api_key_obj) await self.session.refresh(api_key_obj)
logger.info(f" Clé API créée: {name} (prefix: {key_prefix})") logger.info(f" Clé API créée: {name} (prefix: {key_prefix})")
return api_key_obj, api_key_plain return api_key_obj, api_key_plain
async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]: async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]:
"""
Vérifie une clé API et retourne l'objet si valide
Returns:
Optional[ApiKey]: L'objet ApiKey si valide, None sinon
"""
key_hash = self.hash_api_key(api_key_plain) key_hash = self.hash_api_key(api_key_plain)
result = await self.session.execute( result = await self.session.execute(
select(ApiKey).where( select(ApiKey).where(
and_( and_(
ApiKey.key_hash == key_hash, ApiKey.key_hash == key_hash,
ApiKey.is_active == True, ApiKey.is_active,
ApiKey.revoked_at.is_(None), ApiKey.revoked_at.is_(None),
or_( or_(
ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now() ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now()
@ -110,14 +93,13 @@ class ApiKeyService:
api_key_obj = result.scalar_one_or_none() api_key_obj = result.scalar_one_or_none()
if api_key_obj: if api_key_obj:
# Mise à jour des statistiques
api_key_obj.total_requests += 1 api_key_obj.total_requests += 1
api_key_obj.last_used_at = datetime.now() api_key_obj.last_used_at = datetime.now()
await self.session.commit() await self.session.commit()
logger.debug(f"🔑 Clé API validée: {api_key_obj.name}") logger.debug(f" Clé API validée: {api_key_obj.name}")
else: else:
logger.warning(f"⚠️ Clé API invalide ou expirée") logger.warning(" Clé API invalide ou expirée")
return api_key_obj return api_key_obj
@ -152,7 +134,7 @@ class ApiKeyService:
api_key_obj.revoked_at = datetime.now() api_key_obj.revoked_at = datetime.now()
await self.session.commit() await self.session.commit()
logger.info(f"🚫 Clé API révoquée: {api_key_obj.name}") logger.info(f" Clé API révoquée: {api_key_obj.name}")
return True return True
async def get_by_id(self, key_id: str) -> Optional[ApiKey]: async def get_by_id(self, key_id: str) -> Optional[ApiKey]:
@ -161,14 +143,6 @@ class ApiKeyService:
return result.scalar_one_or_none() return result.scalar_one_or_none()
async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]: async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]:
"""
Vérifie le rate limit d'une clé API (à implémenter avec Redis/cache)
Returns:
tuple[bool, Dict]: (is_allowed, info_dict)
"""
# TODO: Implémenter avec Redis pour un vrai rate limiting
# Pour l'instant, retourne toujours True
return True, { return True, {
"allowed": True, "allowed": True,
"limit": api_key_obj.rate_limit_per_minute, "limit": api_key_obj.rate_limit_per_minute,
@ -178,13 +152,11 @@ class ApiKeyService:
async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool: async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool:
"""Vérifie si la clé a accès à un endpoint spécifique""" """Vérifie si la clé a accès à un endpoint spécifique"""
if not api_key_obj.allowed_endpoints: if not api_key_obj.allowed_endpoints:
# Si aucune restriction, accès total
return True return True
try: try:
allowed = json.loads(api_key_obj.allowed_endpoints) allowed = json.loads(api_key_obj.allowed_endpoints)
# Support des wildcards
for pattern in allowed: for pattern in allowed:
if pattern == "*": if pattern == "*":
return True return True
@ -197,7 +169,7 @@ class ApiKeyService:
return False return False
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}") logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}")
return False return False

View file

@ -23,7 +23,7 @@ class UniversignDocumentService:
def fetch_transaction_documents(self, transaction_id: str) -> Optional[List[Dict]]: def fetch_transaction_documents(self, transaction_id: str) -> Optional[List[Dict]]:
try: try:
logger.info(f"📋 Récupération documents pour transaction: {transaction_id}") logger.info(f" Récupération documents pour transaction: {transaction_id}")
response = requests.get( response = requests.get(
f"{self.api_url}/transactions/{transaction_id}", f"{self.api_url}/transactions/{transaction_id}",

View file

@ -344,7 +344,7 @@ class UniversignSyncService:
universign_data = result["transaction"] universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft") universign_status_raw = universign_data.get("state", "draft")
logger.info(f"📊 Statut Universign brut: {universign_status_raw}") logger.info(f" Statut Universign brut: {universign_status_raw}")
new_local_status = map_universign_to_local(universign_status_raw) new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value previous_local_status = transaction.local_status.value
@ -367,7 +367,7 @@ class UniversignSyncService:
if status_changed: if status_changed:
logger.info( logger.info(
f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}" f"CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}"
) )
try: try:
@ -392,7 +392,7 @@ class UniversignSyncService:
if new_local_status == "EN_COURS" and not transaction.sent_at: if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now() transaction.sent_at = datetime.now()
logger.info("📅 Date d'envoi mise à jour") logger.info("Date d'envoi mise à jour")
if new_local_status == "SIGNE" and not transaction.signed_at: if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now() transaction.signed_at = datetime.now()
@ -404,8 +404,7 @@ class UniversignSyncService:
if new_local_status == "EXPIRE" and not transaction.expired_at: if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now() transaction.expired_at = datetime.now()
logger.info("⏰ Date d'expiration mise à jour") logger.info("Date d'expiration mise à jour")
documents = universign_data.get("documents", []) documents = universign_data.get("documents", [])
if documents: if documents:
@ -432,9 +431,7 @@ class UniversignSyncService:
logger.warning(f"Échec téléchargement: {download_error}") logger.warning(f"Échec téléchargement: {download_error}")
except Exception as e: except Exception as e:
logger.error( logger.error(f" Erreur téléchargement document: {e}", exc_info=True)
f" Erreur téléchargement document: {e}", exc_info=True
)
await self._sync_signers(session, transaction, universign_data) await self._sync_signers(session, transaction, universign_data)
@ -529,9 +526,7 @@ class UniversignSyncService:
logger.warning(f"Échec téléchargement: {download_error}") logger.warning(f"Échec téléchargement: {download_error}")
except Exception as e: except Exception as e:
logger.error( logger.error(f" Erreur téléchargement document: {e}", exc_info=True)
f" Erreur téléchargement document: {e}", exc_info=True
)
else: else:
logger.debug( logger.debug(
f"Document déjà téléchargé: {transaction.signed_document_path}" f"Document déjà téléchargé: {transaction.signed_document_path}"

View file

@ -425,7 +425,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"REFUSE": { "REFUSE": {
"fr": "Signature refusée", "fr": "Signature refusée",
"en": "Signature refused", "en": "Signature refused",
"icon": "", "icon": "",
"color": "red", "color": "red",
}, },
"EXPIRE": { "EXPIRE": {
@ -437,7 +437,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"ERREUR": { "ERREUR": {
"fr": "Erreur technique", "fr": "Erreur technique",
"en": "Technical error", "en": "Technical error",
"icon": "⚠️", "icon": "",
"color": "red", "color": "red",
}, },
} }

View file

@ -96,7 +96,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"REFUSE": { "REFUSE": {
"fr": "Signature refusée", "fr": "Signature refusée",
"en": "Signature refused", "en": "Signature refused",
"icon": "", "icon": "",
"color": "red", "color": "red",
}, },
"EXPIRE": { "EXPIRE": {