Compare commits

...

3 commits

Author SHA1 Message Date
Fanilo-Nantenaina
1c6c45465f fix(security): improve api key and jwt validation handling 2026-01-21 14:00:57 +03:00
Fanilo-Nantenaina
574d82f3c4 fix(security): improve auth handling and openapi schema generation 2026-01-21 13:50:44 +03:00
Fanilo-Nantenaina
a6a623d1ab refactor(api): simplify auth scheme handling and improve schema filtering 2026-01-21 13:41:37 +03:00
2 changed files with 85 additions and 39 deletions

86
api.py
View file

@ -278,18 +278,21 @@ def get_auth_schemes_for_user(swagger_user: dict) -> dict:
allowed_tags = swagger_user.get("allowed_tags") allowed_tags = swagger_user.get("allowed_tags")
if not allowed_tags: if not allowed_tags:
# Admin complet
return { return {
"HTTPBearer": { "HTTPBearer": {
"type": "http", "type": "http",
"scheme": "bearer", "scheme": "bearer",
"bearerFormat": "JWT", "bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)", "description": "🎫 Authentification JWT pour utilisateurs (POST /auth/login). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
}, },
"ApiKeyAuth": { "ApiKeyAuth": {
"type": "apiKey", "type": "apiKey",
"in": "header", "in": "header",
"name": "X-API-Key", "name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)", "description": " Clé API pour intégrations externes (format: sdk_live_xxx). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
}, },
} }
@ -300,23 +303,16 @@ def get_auth_schemes_for_user(swagger_user: dict) -> dict:
"type": "http", "type": "http",
"scheme": "bearer", "scheme": "bearer",
"bearerFormat": "JWT", "bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)", "description": "🎫 Authentification JWT pour utilisateurs (POST /auth/login). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
} }
if "API Keys Management" in allowed_tags or len(allowed_tags) > 3:
schemes["ApiKeyAuth"] = { schemes["ApiKeyAuth"] = {
"type": "apiKey", "type": "apiKey",
"in": "header", "in": "header",
"name": "X-API-Key", "name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)", "description": " Clé API pour intégrations externes (format: sdk_live_xxx). "
} "Utilisez SOIT JWT SOIT API Key, pas les deux.",
if not schemes:
schemes["HTTPBearer"] = {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification requise",
} }
return schemes return schemes
@ -352,16 +348,10 @@ def generate_filtered_openapi_schema(
base_schema["components"]["securitySchemes"] = auth_schemes base_schema["components"]["securitySchemes"] = auth_schemes
security_requirements = [] base_schema["security"] = []
if "HTTPBearer" in auth_schemes:
security_requirements.append({"HTTPBearer": []})
if "ApiKeyAuth" in auth_schemes:
security_requirements.append({"ApiKeyAuth": []})
base_schema["security"] = security_requirements if security_requirements else []
if not allowed_tags: if not allowed_tags:
logger.info(" Schéma OpenAPI complet (admin)") logger.info("⚙️ Schéma OpenAPI complet (admin)")
return base_schema return base_schema
filtered_paths = {} filtered_paths = {}
@ -384,6 +374,13 @@ def generate_filtered_openapi_schema(
operation_tags = operation.get("tags", []) operation_tags = operation.get("tags", [])
if any(tag in allowed_tags for tag in operation_tags): if any(tag in allowed_tags for tag in operation_tags):
operation_security = []
if "HTTPBearer" in auth_schemes:
operation_security.append({"HTTPBearer": []})
if "ApiKeyAuth" in auth_schemes:
operation_security.append({"ApiKeyAuth": []})
operation["security"] = operation_security
filtered_operations[method] = operation filtered_operations[method] = operation
if filtered_operations: if filtered_operations:
@ -400,16 +397,47 @@ def generate_filtered_openapi_schema(
if "components" in base_schema and "schemas" in base_schema["components"]: if "components" in base_schema and "schemas" in base_schema["components"]:
all_schemas = base_schema["components"]["schemas"] all_schemas = base_schema["components"]["schemas"]
filtered_schemas = get_schemas_for_tags(allowed_tags, all_schemas)
referenced_schemas = set()
def extract_schema_refs(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key == "$ref" and isinstance(value, str):
schema_name = value.split("/")[-1]
referenced_schemas.add(schema_name)
else:
extract_schema_refs(value)
elif isinstance(obj, list):
for item in obj:
extract_schema_refs(item)
extract_schema_refs(filtered_paths)
def add_dependencies(schema_name):
if schema_name not in all_schemas:
return
schema_def = all_schemas[schema_name]
extract_schema_refs(schema_def)
initial_schemas = referenced_schemas.copy()
for schema_name in initial_schemas:
add_dependencies(schema_name)
filtered_schemas = {}
for schema_name in referenced_schemas:
if schema_name in all_schemas:
filtered_schemas[schema_name] = all_schemas[schema_name]
base_schema["components"]["schemas"] = filtered_schemas base_schema["components"]["schemas"] = filtered_schemas
logger.info( logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, " f"🔍 Schéma filtré: {len(filtered_paths)} paths, "
f"{len(filtered_schemas)} schémas, tags: {allowed_tags}" f"{len(filtered_schemas)} schémas, tags: {allowed_tags}"
) )
else: else:
logger.info( logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}" f"🔍 Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}"
) )
return base_schema return base_schema
@ -429,10 +457,14 @@ async def custom_openapi_endpoint(request: Request):
username = swagger_user.get("username", "unknown") username = swagger_user.get("username", "unknown")
allowed_tags = swagger_user.get("allowed_tags") allowed_tags = swagger_user.get("allowed_tags")
logger.info(f" OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}") logger.info(f"📖 OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}")
schema = generate_filtered_openapi_schema(app, allowed_tags, swagger_user) schema = generate_filtered_openapi_schema(app, allowed_tags, swagger_user)
if request.url.scheme == "https":
if "servers" not in schema or not schema["servers"]:
schema["servers"] = [{"url": str(request.base_url).rstrip("/")}]
return JSONResponse(content=schema) return JSONResponse(content=schema)
@ -456,7 +488,9 @@ async def custom_swagger_ui(request: Request):
"displayRequestDuration": True, "displayRequestDuration": True,
"filter": True, "filter": True,
"tryItOutEnabled": True, "tryItOutEnabled": True,
"docExpansion": "list", # Meilleure UX "docExpansion": "list",
# CORRECTIF : Ne pas pré-remplir les credentials
"preAuthorizeApiKey": False,
}, },
) )

View file

@ -158,15 +158,18 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
auth_header = request.headers.get("Authorization") auth_header = request.headers.get("Authorization")
api_key_header = request.headers.get("X-API-Key") api_key_header = request.headers.get("X-API-Key")
# CORRECTIF : Nettoyer et valider la clé API
if api_key_header: if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}") api_key_header = api_key_header.strip()
return await self._handle_api_key_auth( # Si la clé est vide ou juste des espaces, la considérer comme absente
request, api_key_header, path, method, call_next if not api_key_header or api_key_header == "":
) api_key_header = None
# Vérifier si c'est un token JWT Bearer
if auth_header and auth_header.startswith("Bearer "): if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1] token = auth_header.split(" ", 1)[1].strip()
# CORRECTIF : Vérifier si c'est une API Key envoyée par erreur dans Authorization
if token.startswith("sdk_live_"): if token.startswith("sdk_live_"):
logger.warning( logger.warning(
"⚠️ API Key envoyée dans Authorization au lieu de X-API-Key" "⚠️ API Key envoyée dans Authorization au lieu de X-API-Key"
@ -175,11 +178,20 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
request, token, path, method, call_next request, token, path, method, call_next
) )
# C'est un JWT valide, déléguer à FastAPI
logger.debug(f"🎫 JWT détecté pour {method} {path} → délégation à FastAPI") logger.debug(f"🎫 JWT détecté pour {method} {path} → délégation à FastAPI")
request.state.authenticated_via = "jwt" request.state.authenticated_via = "jwt"
return await call_next(request) return await call_next(request)
logger.debug(f"❓ Aucune auth pour {method} {path} → délégation à FastAPI") # CORRECTIF : Si une clé API est présente ET non vide, la traiter
if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}")
return await self._handle_api_key_auth(
request, api_key_header, path, method, call_next
)
# Aucune authentification fournie, déléguer à FastAPI qui renverra 401
logger.debug(f"❌ Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request) return await call_next(request)
async def _handle_api_key_auth( async def _handle_api_key_auth(
@ -200,7 +212,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
api_key_obj = await service.verify_api_key(api_key) api_key_obj = await service.verify_api_key(api_key)
if not api_key_obj: if not api_key_obj:
logger.warning(f" Clé API invalide: {method} {path}") logger.warning(f"🔒 Clé API invalide: {method} {path}")
return JSONResponse( return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={ content={
@ -250,7 +262,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
request.state.api_key = api_key_obj request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key" request.state.authenticated_via = "api_key"
logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name}{method} {path}") logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name}{method} {path}")
return await call_next(request) return await call_next(request)