Sage100-vps/middleware/security.py
2026-01-20 13:51:09 +03:00

243 lines
8.6 KiB
Python

import base64
import logging
from datetime import datetime
from typing import Any, Optional

from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from sqlalchemy import select

from database import get_session
from database.models.api_key import SwaggerUser
from security.auth import verify_password
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# HTTP Basic security scheme instance. It is not referenced by the code visible
# in this part of the file; the middleware below parses the header itself.
security = HTTPBasic()
async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
    """Check HTTP Basic credentials against the ``SwaggerUser`` table.

    The user is looked up by ``credentials.username``. Access is granted only
    when a row exists, its ``is_active`` flag is truthy and
    ``verify_password`` accepts ``credentials.password`` against the stored
    ``hashed_password``. On success the row's ``last_login`` is set and the
    session is committed.

    Args:
        credentials: Username/password pair extracted from the request.

    Returns:
        ``True`` when the credentials are accepted; ``False`` when they are
        rejected or when any exception occurs during the check (the
        exception is logged, never propagated).
    """
    username = credentials.username
    password = credentials.password

    try:
        async for session in get_session():
            result = await session.execute(
                select(SwaggerUser).where(SwaggerUser.username == username)
            )
            swagger_user = result.scalar_one_or_none()

            if (
                swagger_user
                and swagger_user.is_active
                and verify_password(password, swagger_user.hashed_password)
            ):
                # Naive timestamp from the local clock.
                # NOTE(review): confirm the column is not expected to be UTC.
                swagger_user.last_login = datetime.now()
                await session.commit()
                logger.info(" Accès Swagger autorisé (DB): %s", username)
                return True
    except Exception as e:
        # Fail closed: any error during the lookup denies access. The
        # traceback is kept so that failures can be diagnosed.
        logger.error(
            " Erreur vérification Swagger credentials: %s", e, exc_info=True
        )
        return False

    # Reached for unknown users, inactive users and wrong passwords, and also
    # if no session was produced, so the function always returns a bool.
    logger.warning("Tentative d'accès Swagger refusée: %s", username)
    return False
class SwaggerAuthMiddleware:
    """ASGI middleware that puts the documentation pages behind HTTP Basic auth.

    Requests whose path starts with one of ``PROTECTED_PATH_PREFIXES`` must
    carry an ``Authorization: Basic ...`` header whose credentials are
    accepted by ``verify_swagger_credentials``. Every other request, and
    every non-HTTP scope, is passed to the wrapped application untouched.
    """

    # Path prefixes that require Basic authentication.
    # NOTE(review): "/openapi.json" is not listed, so the raw schema is not
    # protected by this middleware - confirm that this is intended.
    PROTECTED_PATH_PREFIXES = ("/docs", "/redoc")

    def __init__(self, app):
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection.

        Sends a 401 JSON response (with a ``WWW-Authenticate`` challenge) when
        a protected path is requested without valid Basic credentials;
        otherwise delegates to the wrapped application.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive=receive)
        if not request.url.path.startswith(self.PROTECTED_PATH_PREFIXES):
            await self.app(scope, receive, send)
            return

        auth_header = request.headers.get("Authorization") or ""
        scheme, separator, encoded_credentials = auth_header.partition(" ")
        # The authentication scheme name is case-insensitive (RFC 7617), so
        # "basic" and "BASIC" are accepted as well as "Basic".
        if not separator or scheme.lower() != "basic":
            response = self._unauthorized(
                "Authentification requise pour accéder à la documentation"
            )
            await response(scope, receive, send)
            return

        credentials = self._parse_basic_credentials(encoded_credentials)
        if credentials is None:
            response = self._unauthorized("Format d'authentification invalide")
            await response(scope, receive, send)
            return

        # Verification and response sending are deliberately outside any
        # parsing try-block: a failure while sending must not be reported as
        # a malformed header nor trigger a second response.
        if not await verify_swagger_credentials(credentials):
            response = self._unauthorized("Identifiants invalides")
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)

    @staticmethod
    def _unauthorized(detail):
        """Build the 401 JSON response carrying the Basic auth challenge."""
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": detail},
            headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
        )

    @staticmethod
    def _parse_basic_credentials(encoded_credentials):
        """Decode the base64 ``user:password`` payload of a Basic header.

        Returns an ``HTTPBasicCredentials`` instance, or ``None`` (after
        logging the error) when the payload is not valid base64, is not
        UTF-8, or contains no ``:`` separator.
        """
        try:
            decoded = base64.b64decode(encoded_credentials).decode("utf-8")
            # Only the first ":" separates the fields; the password may
            # itself contain colons.
            username, password = decoded.split(":", 1)
            return HTTPBasicCredentials(username=username, password=password)
        except ValueError as e:
            # binascii.Error, UnicodeDecodeError and the unpacking failure
            # are all ValueError subclasses.
            logger.error(" Erreur parsing auth header: %s", e)
            return None
class ApiKeyMiddleware:
    """ASGI middleware requiring a JWT bearer header or a valid API key.

    Public paths and non-HTTP scopes are passed through unchanged. Requests
    carrying an ``Authorization: Bearer ...`` header are passed through as
    well. Requests carrying ``X-API-Key`` are validated through
    ``ApiKeyService`` (key validity, rate limit, endpoint access) before
    being forwarded. Anything else receives a 401 response.
    """

    # Paths served without authentication (exact match).
    PUBLIC_EXACT_PATHS = frozenset(
        {
            "/",
            "/health",
            "/docs",
            "/redoc",
            "/openapi.json",
        }
    )
    # Path prefixes served without authentication.
    PUBLIC_PATH_PREFIXES = ("/api/v1/auth/",)

    def __init__(self, app):
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, enforcing authentication on HTTP requests."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive=receive)
        path = request.url.path

        if path in self.PUBLIC_EXACT_PATHS or path.startswith(
            self.PUBLIC_PATH_PREFIXES
        ):
            logger.debug("Chemin public: %s", path)
            await self.app(scope, receive, send)
            return

        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            # NOTE(review): the token is NOT verified here; any header that
            # starts with "Bearer " is forwarded. Presumably the routes
            # validate the JWT themselves - confirm, otherwise this is an
            # authentication bypass.
            logger.debug("🔑 JWT détecté pour %s", path)
            await self.app(scope, receive, send)
            return

        api_key = request.headers.get("X-API-Key")
        if api_key is None:
            response = JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={
                    "detail": "Authentification requise (JWT ou API Key)",
                    "hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer <jwt>'",
                    "endpoint": path,
                },
                headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'},
            )
            await response(scope, receive, send)
            return

        logger.debug("🔑 API Key détectée pour %s", path)
        await self._handle_api_key(request, api_key, path, scope, receive, send)

    async def _handle_api_key(self, request, api_key, path, scope, receive, send):
        """Validate ``api_key`` and forward the request when it is accepted.

        Responds with 401 for an unknown key, 429 when the rate limit is
        exceeded, 403 when the key may not call ``path`` and 500 when the
        validation itself fails. Exceptions raised by the wrapped application
        after the key was accepted are re-raised unchanged.
        """
        from services.api_key import ApiKeyService

        # Becomes True once the request is handed to the wrapped application,
        # so that the handler below can tell validation errors from
        # downstream errors.
        forwarded = False
        try:
            async for session in get_session():
                api_key_service = ApiKeyService(session)

                api_key_obj = await api_key_service.verify_api_key(api_key)
                if not api_key_obj:
                    response = JSONResponse(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        content={
                            "detail": "Clé API invalide ou expirée",
                            "hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer <jwt>",
                        },
                    )
                    await response(scope, receive, send)
                    return

                is_allowed, rate_info = await api_key_service.check_rate_limit(
                    api_key_obj
                )
                if not is_allowed:
                    response = JSONResponse(
                        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                        content={"detail": "Rate limit dépassé"},
                        headers={
                            "X-RateLimit-Limit": str(rate_info["limit"]),
                            "X-RateLimit-Remaining": "0",
                        },
                    )
                    await response(scope, receive, send)
                    return

                has_access = await api_key_service.check_endpoint_access(
                    api_key_obj, path
                )
                if not has_access:
                    response = JSONResponse(
                        status_code=status.HTTP_403_FORBIDDEN,
                        content={
                            "detail": "Accès non autorisé à cet endpoint",
                            "endpoint": path,
                            "api_key": api_key_obj.key_prefix + "...",
                        },
                    )
                    await response(scope, receive, send)
                    return

                # Expose the validated key to the rest of the request.
                request.state.api_key = api_key_obj
                request.state.authenticated_via = "api_key"
                logger.info(" API Key valide: %s%s", api_key_obj.name, path)

                # The downstream call stays inside the session loop, as in
                # the original, so the session that loaded ``api_key_obj`` is
                # still in scope while the request is handled.
                forwarded = True
                await self.app(scope, receive, send)
                return
        except Exception as e:
            if forwarded:
                # The failure comes from the wrapped application, not from
                # key validation: do not mask it as a validation error and do
                # not try to start a second response.
                raise
            logger.error(" Erreur validation API Key: %s", e, exc_info=True)
            response = JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content={
                    "detail": "Erreur interne lors de la validation de la clé"
                },
            )
            await response(scope, receive, send)
            return
def get_api_key_from_request(request: Request) -> Optional[Any]:
    """Return the API key object attached to the request, if any.

    ``ApiKeyMiddleware`` stores the validated key on ``request.state.api_key``.

    Args:
        request: The current request.

    Returns:
        The stored object, or ``None`` when ``request.state`` has no
        ``api_key`` attribute.
    """
    return getattr(request.state, "api_key", None)
def get_auth_method(request: Request) -> str:
    """Return the authentication method recorded on the request.

    ``ApiKeyMiddleware`` sets ``request.state.authenticated_via`` to
    ``"api_key"`` after validating a key.

    Args:
        request: The current request.

    Returns:
        The recorded value, or ``"none"`` when ``request.state`` has no
        ``authenticated_via`` attribute.
    """
    request_state = request.state
    auth_method = getattr(request_state, "authenticated_via", "none")
    return auth_method