From 67ef83c4e332f4c40983b879581e08df94c01575 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 20 Jan 2026 16:01:54 +0300 Subject: [PATCH] refactor(security): improve authentication logging and endpoint checks --- middleware/security.py | 99 +++++++++++++++++++++----------------- scripts/manage_security.py | 24 ++++----- services/api_key.py | 34 ++++++++++--- 3 files changed, 92 insertions(+), 65 deletions(-) diff --git a/middleware/security.py b/middleware/security.py index 8b2d90a..e9ff831 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -1,11 +1,10 @@ - from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import ASGIApp from sqlalchemy import select -from typing import Optional, Callable +from typing import Callable from datetime import datetime import logging import base64 @@ -16,7 +15,6 @@ security = HTTPBasic() class SwaggerAuthMiddleware: - PROTECTED_PATHS = ["/docs", "/redoc", "/openapi.json"] def __init__(self, app: ASGIApp): @@ -111,46 +109,11 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): "/redoc", "/openapi.json", "/", - "/auth/login", - "/auth/register", - "/auth/verify-email", - "/auth/reset-password", - "/auth/request-reset", - "/auth/refresh", + "/health", + "/auth", + "/api-keys/verify", ] - async def dispatch(self, request: Request, call_next: Callable): - path = request.url.path - method = request.method - - if self._is_excluded_path(path): - return await call_next(request) - - auth_header = request.headers.get("Authorization") - has_jwt = auth_header and auth_header.startswith("Bearer ") - - api_key = request.headers.get("X-API-Key") - has_api_key = bool(api_key) - - if has_jwt: - logger.debug(f"JWT détecté pour {method} {path}") - return await call_next(request) - - if has_api_key: - logger.debug(f"API Key détectée pour {method} {path}") - return await self._handle_api_key_auth( - request, api_key, path, method, call_next - ) - - return JSONResponse( - status_code=status.HTTP_401_UNAUTHORIZED, - content={ - "detail": "Authentification requise", - "hint": "Utilisez 'X-API-Key: sdk_live_xxx' ou 'Authorization: Bearer '", - }, - headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'}, - ) - def _is_excluded_path(self, path: str) -> bool: """Vérifie si le chemin est exclu de l'authentification""" if path == "/": @@ -164,6 +127,41 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): return False + async def dispatch(self, request: Request, call_next: Callable): + path = request.url.path + method = request.method + + if self._is_excluded_path(path): + logger.debug(f" Route publique: {method} {path}") + return await call_next(request) + + auth_header = request.headers.get("Authorization") + has_jwt = auth_header and auth_header.startswith("Bearer ") + + api_key = request.headers.get("X-API-Key") + has_api_key = bool(api_key) + + if has_jwt: + logger.debug(f" JWT détecté pour {method} {path}") + return await call_next(request) + + if has_api_key: + logger.debug(f" API Key détectée pour {method} {path}") + return await self._handle_api_key_auth( + request, api_key, path, method, call_next + ) + + logger.warning(f" Aucune authentification pour {method} {path}") + return JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Authentification requise", + "hint": "Utilisez 'X-API-Key: sdk_live_xxx' ou 'Authorization: Bearer '", + "path": path, + }, + headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'}, + ) + async def _handle_api_key_auth( self, request: Request, @@ -179,9 +177,11 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): async with async_session_factory() as session: service = ApiKeyService(session) + api_key_obj = await service.verify_api_key(api_key) if not api_key_obj: + logger.warning(f" Clé API invalide pour {method} {path}") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -192,6 +192,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): is_allowed, rate_info = await service.check_rate_limit(api_key_obj) if not is_allowed: + logger.warning(f"⚠️ Rate limit dépassé: {api_key_obj.name}") return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": "Rate limit dépassé"}, @@ -203,24 +204,32 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): has_access = await service.check_endpoint_access(api_key_obj, path) if not has_access: + logger.warning( + f"Accès refusé: {api_key_obj.name} → {method} {path}" + ) return JSONResponse( status_code=status.HTTP_403_FORBIDDEN, content={ "detail": "Accès non autorisé à cet endpoint", "endpoint": path, - "api_key": api_key_obj.key_prefix + "...", + "api_key_name": api_key_obj.name, + "allowed_endpoints": ( + api_key_obj.allowed_endpoints + if api_key_obj.allowed_endpoints + else "Tous" + ), }, ) request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" - logger.info(f"✓ API Key valide: {api_key_obj.name} → {method} {path}") + logger.info(f"✅ API Key valide: {api_key_obj.name} → {method} {path}") return await call_next(request) except Exception as e: - logger.error(f"Erreur validation API Key: {e}", exc_info=True) + logger.error(f" Erreur validation API Key: {e}", exc_info=True) return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": "Erreur interne lors de la validation"}, @@ -243,7 +252,7 @@ def get_auth_method(request: Request) -> str: __all__ = [ "SwaggerAuthMiddleware", "ApiKeyMiddlewareHTTP", - "ApiKeyMiddleware", # Alias + "ApiKeyMiddleware", "get_api_key_from_request", "get_auth_method", ] diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 59f7ee2..6c5ac01 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -31,9 +31,9 @@ print("\nDEBUG: Vérification des imports...") for module in _test_imports: try: __import__(module) - print(f" ✅ {module}") + print(f" {module}") except ImportError as e: - print(f" ❌ {module}: {e}") + print(f" {module}: {e}") import asyncio import argparse @@ -49,7 +49,7 @@ try: from services.api_key import ApiKeyService from security.auth import hash_password except ImportError as e: - print(f"\n❌ ERREUR D'IMPORT: {e}") + print(f"\n ERREUR D'IMPORT: {e}") print(f" Vérifiez que vous êtes dans /app") print(f" Commande correcte: cd /app && python scripts/manage_security.py ...") sys.exit(1) @@ -67,7 +67,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): existing = result.scalar_one_or_none() if existing: - logger.error(f"❌ L'utilisateur '{username}' existe déjà") + logger.error(f" L'utilisateur '{username}' existe déjà") return swagger_user = SwaggerUser( @@ -80,7 +80,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): session.add(swagger_user) await session.commit() - logger.info(f"✅ Utilisateur Swagger créé: {username}") + logger.info(f" Utilisateur Swagger créé: {username}") logger.info(f" Nom complet: {swagger_user.full_name}") @@ -96,7 +96,7 @@ async def list_swagger_users(): logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = "✅" if user.is_active else "❌" + status = "" if user.is_active else "" logger.info(f" {status} {user.username}") logger.info(f" Nom: {user.full_name}") logger.info(f" Créé: {user.created_at}") @@ -112,7 +112,7 @@ async def delete_swagger_user(username: str): user = result.scalar_one_or_none() if not user: - logger.error(f"❌ Utilisateur '{username}' introuvable") + logger.error(f" Utilisateur '{username}' introuvable") return await session.delete(user) @@ -182,7 +182,7 @@ async def list_api_keys(): is_valid = key.is_active and ( not key.expires_at or key.expires_at > datetime.now() ) - status = "✅" if is_valid else "❌" + status = "" if is_valid else "" logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") logger.info(f" ID: {key.id}") @@ -214,7 +214,7 @@ async def revoke_api_key(key_id: str): key = result.scalar_one_or_none() if not key: - logger.error(f"❌ Clé API '{key_id}' introuvable") + logger.error(f" Clé API '{key_id}' introuvable") return key.is_active = False @@ -232,11 +232,11 @@ async def verify_api_key(api_key: str): key = await service.verify_api_key(api_key) if not key: - logger.error("❌ Clé API invalide ou expirée") + logger.error(" Clé API invalide ou expirée") return logger.info("=" * 60) - logger.info("✅ Clé API valide") + logger.info(" Clé API valide") logger.info("=" * 60) logger.info(f" Nom: {key.name}") logger.info(f" ID: {key.id}") @@ -346,7 +346,7 @@ if __name__ == "__main__": print("\nℹ️ Interrupted") sys.exit(0) except Exception as e: - logger.error(f"❌ Erreur: {e}") + logger.error(f" Erreur: {e}") import traceback traceback.print_exc() diff --git a/services/api_key.py b/services/api_key.py index ad3cf6f..04e271e 100644 --- a/services/api_key.py +++ b/services/api_key.py @@ -134,7 +134,7 @@ class ApiKeyService: api_key_obj.revoked_at = datetime.now() await self.session.commit() - logger.info(f" Clé API révoquée: {api_key_obj.name}") + logger.info(f"🗑️ Clé API révoquée: {api_key_obj.name}") return True async def get_by_id(self, key_id: str) -> Optional[ApiKey]: @@ -150,24 +150,42 @@ class ApiKeyService: } async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool: - """Vérifie si la clé a accès à un endpoint spécifique""" if not api_key_obj.allowed_endpoints: + logger.debug( + f"🔓 API Key {api_key_obj.name}: Aucune restriction d'endpoint" + ) return True try: allowed = json.loads(api_key_obj.allowed_endpoints) + if "*" in allowed or "/*" in allowed: + logger.debug(f"🔓 API Key {api_key_obj.name}: Accès global autorisé") + return True + for pattern in allowed: - if pattern == "*": - return True - if pattern.endswith("*"): - prefix = pattern[:-1] - if endpoint.startswith(prefix): - return True if pattern == endpoint: + logger.debug(f" Match exact: {pattern} == {endpoint}") return True + if pattern.endswith("/*"): + base = pattern[:-2] # "/clients/*" → "/clients" + if endpoint == base or endpoint.startswith(base + "/"): + logger.debug(f" Match wildcard: {pattern} ↔ {endpoint}") + return True + + elif pattern.endswith("*"): + base = pattern[:-1] # "/clients*" → "/clients" + if endpoint.startswith(base): + logger.debug(f" Match prefix: {pattern} ↔ {endpoint}") + return True + + logger.warning( + f" API Key {api_key_obj.name}: Accès refusé à {endpoint}\n" + f" Endpoints autorisés: {allowed}" + ) return False + except json.JSONDecodeError: logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}") return False