From 92597a1143a6be459ddc2506346f6ba4383f2633 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Wed, 21 Jan 2026 12:05:06 +0300 Subject: [PATCH 1/4] feat(api): add tag-based OpenAPI schema filtering for Swagger users --- api.py | 95 +++++++++++++++++++++++++++++++++---- database/models/api_key.py | 17 +++++++ scripts/manage_security.py | 97 +++++++++++++++++++++++++------------- 3 files changed, 166 insertions(+), 43 deletions(-) diff --git a/api.py b/api.py index 7eb5984..67dd846 100644 --- a/api.py +++ b/api.py @@ -1,4 +1,5 @@ -from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body +from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body, Request +from fastapi.responses import JSONResponse from fastapi.openapi.utils import get_openapi from fastapi.responses import StreamingResponse, HTMLResponse, Response from fastapi.encoders import jsonable_encoder @@ -175,18 +176,17 @@ app = FastAPI( ) """ -def custom_openapi(): - if app.openapi_schema: - return app.openapi_schema +def generate_filtered_openapi(app: FastAPI, allowed_tags: Optional[List[str]] = None): + """Génère le schéma OpenAPI filtré selon les tags autorisés""" - openapi_schema = get_openapi( + base_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, ) - openapi_schema["components"]["securitySchemes"] = { + base_schema["components"]["securitySchemes"] = { "HTTPBearer": { "type": "http", "scheme": "bearer", @@ -201,13 +201,68 @@ def custom_openapi(): }, } - openapi_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}] + base_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}] - app.openapi_schema = openapi_schema - return app.openapi_schema + if not allowed_tags: + return base_schema + + filtered_paths = {} + + for path, path_item in base_schema.get("paths", {}).items(): + for method, operation in path_item.items(): + if method in ["get", "post", "put", "delete", "patch", "options"]: + operation_tags = operation.get("tags", []) + + if any(tag in allowed_tags for tag in operation_tags): + if path not in filtered_paths: + filtered_paths[path] = {} + filtered_paths[path][method] = operation + + base_schema["paths"] = filtered_paths + + if "tags" in base_schema: + base_schema["tags"] = [ + tag_obj + for tag_obj in base_schema["tags"] + if tag_obj.get("name") in allowed_tags + ] + + return base_schema -app.openapi = custom_openapi +async def get_swagger_user_from_request(request: Request) -> Optional[dict]: + """Récupère l'utilisateur Swagger depuis la requête authentifiée""" + auth_header = request.headers.get("Authorization") + + if not auth_header or not auth_header.startswith("Basic "): + return None + + import base64 + from fastapi.security import HTTPBasicCredentials + from database.db_config import async_session_factory + from database.models.api_key import SwaggerUser + from sqlalchemy import select + + try: + encoded_credentials = auth_header.split(" ")[1] + decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") + username, _ = decoded_credentials.split(":", 1) + + async with async_session_factory() as session: + result = await session.execute( + select(SwaggerUser).where(SwaggerUser.username == username) + ) + swagger_user = result.scalar_one_or_none() + + if swagger_user and swagger_user.is_active: + return { + "username": swagger_user.username, + "allowed_tags": swagger_user.allowed_tags_list, + } + except Exception as e: + logger.error(f"Erreur récupération utilisateur Swagger: {e}") + + return None setup_cors(app, mode="open") @@ -221,6 +276,26 @@ app.include_router(universign_router) app.include_router(entreprises_router) +@app.get("/openapi.json", include_in_schema=False) +async def get_openapi_filtered(request: Request): + """Retourne le schéma OpenAPI filtré selon l'utilisateur""" + + swagger_user = await get_swagger_user_from_request(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + allowed_tags = swagger_user.get("allowed_tags") + + schema = generate_filtered_openapi(app, allowed_tags) + + return JSONResponse(content=schema) + + @app.get("/clients", response_model=List[ClientDetails], tags=["Clients"]) async def obtenir_clients( query: Optional[str] = Query(None), diff --git a/database/models/api_key.py b/database/models/api_key.py index 0d246ab..b4ffbb5 100644 --- a/database/models/api_key.py +++ b/database/models/api_key.py @@ -1,4 +1,6 @@ from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text +from typing import Optional, List +import json from datetime import datetime import uuid @@ -49,8 +51,23 @@ class SwaggerUser(Base): is_active = Column(Boolean, default=True, nullable=False) + allowed_tags = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.now, nullable=False) last_login = Column(DateTime, nullable=True) + @property + def allowed_tags_list(self) -> Optional[List[str]]: + if self.allowed_tags: + try: + return json.loads(self.allowed_tags) + except json.JSONDecodeError: + return None + return None + + @allowed_tags_list.setter + def allowed_tags_list(self, tags: Optional[List[str]]): + self.allowed_tags = json.dumps(tags) if tags is not None else None + def __repr__(self): return f"" diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 1e5cab9..c814dc1 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,6 +1,13 @@ import sys import os from pathlib import Path +import asyncio +import argparse +import logging +from datetime import datetime +from typing import Optional, List +import json +from sqlalchemy import select _current_file = Path(__file__).resolve() _script_dir = _current_file.parent @@ -35,30 +42,28 @@ for module in _test_imports: except ImportError as e: print(f" {module}: {e}") -import asyncio -import argparse -import logging -from datetime import datetime - -from sqlalchemy import select try: from database.db_config import async_session_factory - from database.models.user import User from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService from security.auth import hash_password except ImportError as e: print(f"\n ERREUR D'IMPORT: {e}") - print(f" Vérifiez que vous êtes dans /app") - print(f" Commande correcte: cd /app && python scripts/manage_security.py ...") + print(" Vérifiez que vous êtes dans /app") + print(" Commande correcte: cd /app && python scripts/manage_security.py ...") sys.exit(1) logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logger = logging.getLogger(__name__) -async def add_swagger_user(username: str, password: str, full_name: str = None): +async def add_swagger_user( + username: str, + password: str, + full_name: str = None, + tags: Optional[List[str]] = None, +): """Ajouter un utilisateur Swagger""" async with async_session_factory() as session: result = await session.execute( @@ -75,6 +80,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): hashed_password=hash_password(password), full_name=full_name or username, is_active=True, + allowed_tags=json.dumps(tags) if tags else None, ) session.add(swagger_user) @@ -96,14 +102,28 @@ async def list_swagger_users(): logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = "" if user.is_active else "" + status = "ACTIF" if user.is_active else "NON ACTIF" logger.info(f" {status} {user.username}") logger.info(f" Nom: {user.full_name}") logger.info(f" Créé: {user.created_at}") - logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n") + logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}") + + if user.allowed_tags: + try: + tags = json.loads(user.allowed_tags) + if tags: + logger.info(f" Tags autorisés: {', '.join(tags)}") + else: + logger.info(" Tags autorisés: Tous (admin)") + except json.JSONDecodeError: + logger.info(" Tags: Erreur format") + else: + logger.info(" Tags autorisés: Tous (admin)") + + logger.info("") -async def delete_swagger_user(username: str): +async def delete_swagger_user(username: str, tags: Optional[List[str]] = None): """Supprimer un utilisateur Swagger""" async with async_session_factory() as session: result = await session.execute( @@ -117,7 +137,7 @@ async def delete_swagger_user(username: str): await session.delete(user) await session.commit() - logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") + logger.info("🗑️ Utilisateur Swagger supprimé: {}".format(username)) async def create_api_key( @@ -143,21 +163,23 @@ async def create_api_key( logger.info("=" * 70) logger.info("🔑 Clé API créée avec succès") logger.info("=" * 70) - logger.info(f" ID: {api_key_obj.id}") - logger.info(f" Nom: {api_key_obj.name}") - logger.info(f" Clé: {api_key_plain}") - logger.info(f" Préfixe: {api_key_obj.key_prefix}") - logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") - logger.info(f" Expire le: {api_key_obj.expires_at}") + logger.info(" ID: {}".format(api_key_obj.id)) + logger.info(" Nom: {}".format(api_key_obj.name)) + logger.info(" Clé: {}".format(api_key_plain)) + logger.info(" Préfixe: {}".format(api_key_obj.key_prefix)) + logger.info( + " Rate limit: {} req/min".format(api_key_obj.rate_limit_per_minute) + ) + logger.info(" Expire le: {}".format(api_key_obj.expires_at)) if api_key_obj.allowed_endpoints: import json try: endpoints_list = json.loads(api_key_obj.allowed_endpoints) - logger.info(f" Endpoints: {', '.join(endpoints_list)}") - except: - logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}") + logger.info(" Endpoints: {}".format(", ".join(endpoints_list))) + except Exception: + logger.info(" Endpoints: {}".format(api_key_obj.allowed_endpoints)) else: logger.info(" Endpoints: Tous (aucune restriction)") @@ -200,7 +222,7 @@ async def list_api_keys(): if len(endpoints) > 4: display += f"... (+{len(endpoints) - 4})" logger.info(f" Endpoints: {display}") - except: + except Exception: pass else: logger.info(" Endpoints: Tous") @@ -250,7 +272,7 @@ async def verify_api_key(api_key: str): try: endpoints = json.loads(key.allowed_endpoints) logger.info(f" Endpoints autorisés: {endpoints}") - except: + except Exception: pass else: logger.info(" Endpoints autorisés: Tous") @@ -263,12 +285,14 @@ async def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemples: - python scripts/manage_security.py swagger add admin MyP@ssw0rd - python scripts/manage_security.py swagger list - python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 - python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" - python scripts/manage_security.py apikey list - python scripts/manage_security.py apikey verify sdk_live_xxxxx + python scripts/manage_security.py swagger add admin MyP@ssw0rd + python scripts/manage_security.py swagger list + python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 + python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" + python scripts/manage_security.py apikey list + python scripts/manage_security.py apikey verify sdk_live_xxxxx + python scripts/manage_security.py swagger add client_user Secret123 --full-name "Client Tech IT" --tags Authentication Clients Devis Factures + python scripts/manage_security.py swagger add admin_user AdminPass --tags # vide = tout voir """, ) subparsers = parser.add_subparsers(dest="command", help="Commandes") @@ -280,6 +304,11 @@ Exemples: add_p.add_argument("username", help="Nom d'utilisateur") add_p.add_argument("password", help="Mot de passe") add_p.add_argument("--full-name", help="Nom complet") + add_p.add_argument( + "--tags", + nargs="*", + help="Tags OpenAPI autorisés (ex. Clients Devis Authentication)", + ) swagger_sub.add_parser("list", help="Lister utilisateurs") @@ -312,11 +341,13 @@ Exemples: if args.command == "swagger": if args.swagger_command == "add": - await add_swagger_user(args.username, args.password, args.full_name) + await add_swagger_user( + args.username, args.password, args.full_name, args.tags + ) elif args.swagger_command == "list": await list_swagger_users() elif args.swagger_command == "delete": - await delete_swagger_user(args.username) + await delete_swagger_user(args.username, args.tags) else: swagger_parser.print_help() From 8a22e285df4ec09208d9a60715d50239a3b5dab6 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Wed, 21 Jan 2026 12:12:35 +0300 Subject: [PATCH 2/4] refactor(security): improve logging format and argument handling --- scripts/manage_security.py | 49 +++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/scripts/manage_security.py b/scripts/manage_security.py index c814dc1..94cd00d 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -100,25 +100,25 @@ async def list_swagger_users(): logger.info("🔭 Aucun utilisateur Swagger") return - logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") + logger.info("👥 {} utilisateur(s) Swagger:\n".format(len(users))) for user in users: status = "ACTIF" if user.is_active else "NON ACTIF" - logger.info(f" {status} {user.username}") - logger.info(f" Nom: {user.full_name}") - logger.info(f" Créé: {user.created_at}") - logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}") + logger.info(" {} {}".format(status, user.username)) + logger.info("Nom: {}".format(user.full_name)) + logger.info("Créé: {}".format(user.created_at)) + logger.info(" Dernière connexion: {}".format(user.last_login or "Jamais")) if user.allowed_tags: try: tags = json.loads(user.allowed_tags) if tags: - logger.info(f" Tags autorisés: {', '.join(tags)}") + logger.info("Tags autorisés: {}".format(", ".join(tags))) else: - logger.info(" Tags autorisés: Tous (admin)") + logger.info("Tags autorisés: Tous (admin)") except json.JSONDecodeError: - logger.info(" Tags: Erreur format") + logger.info("Tags: Erreur format") else: - logger.info(" Tags autorisés: Tous (admin)") + logger.info("Tags autorisés: Tous (admin)") logger.info("") @@ -198,7 +198,7 @@ async def list_api_keys(): logger.info("🔭 Aucune clé API") return - logger.info(f"🔑 {len(keys)} clé(s) API:\n") + logger.info("🔑 {} clé(s) API:\n".format(len(keys))) for key in keys: is_valid = key.is_active and ( @@ -207,11 +207,11 @@ async def list_api_keys(): status = "" if is_valid else "" logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") - logger.info(f" ID: {key.id}") - logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") - logger.info(f" Requêtes: {key.total_requests}") - logger.info(f" Expire: {key.expires_at or 'Jamais'}") - logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + logger.info(f" ID: {key.id}") + logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") + logger.info(f" Requêtes: {key.total_requests}") + logger.info(f" Expire: {key.expires_at or 'Jamais'}") + logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") if key.allowed_endpoints: import json @@ -221,11 +221,11 @@ async def list_api_keys(): display = ", ".join(endpoints[:4]) if len(endpoints) > 4: display += f"... (+{len(endpoints) - 4})" - logger.info(f" Endpoints: {display}") + logger.info(f" Endpoints: {display}") except Exception: pass else: - logger.info(" Endpoints: Tous") + logger.info("Endpoints: Tous") logger.info("") @@ -303,11 +303,12 @@ Exemples: add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur") add_p.add_argument("username", help="Nom d'utilisateur") add_p.add_argument("password", help="Mot de passe") - add_p.add_argument("--full-name", help="Nom complet") + add_p.add_argument("--full-name", help="Nom complet", default=None) add_p.add_argument( "--tags", - nargs="*", - help="Tags OpenAPI autorisés (ex. Clients Devis Authentication)", + nargs="+", # Au moins 1 tag requis SI spécifié + help="Tags OpenAPI autorisés (ex: Clients Devis). Vide = admin complet", + default=None, ) swagger_sub.add_parser("list", help="Lister utilisateurs") @@ -341,13 +342,13 @@ Exemples: if args.command == "swagger": if args.swagger_command == "add": - await add_swagger_user( - args.username, args.password, args.full_name, args.tags - ) + tags = args.tags if args.tags is not None else None + await add_swagger_user(args.username, args.password, args.full_name, tags) elif args.swagger_command == "list": await list_swagger_users() elif args.swagger_command == "delete": - await delete_swagger_user(args.username, args.tags) + tags = args.tags if args.tags is not None else None + await delete_swagger_user(args.username, tags) else: swagger_parser.print_help() From 5f40c677a8dd8c949aed5eb6fed80b05fae94a97 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Wed, 21 Jan 2026 12:21:10 +0300 Subject: [PATCH 3/4] refactor(manage_security): simplify delete_swagger_user function --- scripts/manage_security.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 94cd00d..c90b310 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -123,8 +123,7 @@ async def list_swagger_users(): logger.info("") -async def delete_swagger_user(username: str, tags: Optional[List[str]] = None): - """Supprimer un utilisateur Swagger""" +async def delete_swagger_user(username: str): async with async_session_factory() as session: result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -132,12 +131,12 @@ async def delete_swagger_user(username: str, tags: Optional[List[str]] = None): user = result.scalar_one_or_none() if not user: - logger.error(f" Utilisateur '{username}' introuvable") + logger.error(f"❌ Utilisateur '{username}' introuvable") return await session.delete(user) await session.commit() - logger.info("🗑️ Utilisateur Swagger supprimé: {}".format(username)) + logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") async def create_api_key( @@ -306,8 +305,8 @@ Exemples: add_p.add_argument("--full-name", help="Nom complet", default=None) add_p.add_argument( "--tags", - nargs="+", # Au moins 1 tag requis SI spécifié - help="Tags OpenAPI autorisés (ex: Clients Devis). Vide = admin complet", + nargs="*", + help="Tags autorisés (Clients Devis etc). Vide ou omis = admin complet", default=None, ) @@ -342,13 +341,12 @@ Exemples: if args.command == "swagger": if args.swagger_command == "add": - tags = args.tags if args.tags is not None else None + tags = args.tags if args.tags else None await add_swagger_user(args.username, args.password, args.full_name, tags) elif args.swagger_command == "list": await list_swagger_users() elif args.swagger_command == "delete": - tags = args.tags if args.tags is not None else None - await delete_swagger_user(args.username, tags) + await delete_swagger_user(args.username) else: swagger_parser.print_help() From 797aed0240120c2e3e9e49fe963dbb581e6b37ba Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Wed, 21 Jan 2026 12:56:02 +0300 Subject: [PATCH 4/4] feat(security): enhance swagger auth with user context and filtered docs --- api.py | 224 +++++++++++++++++++++++++---------------- middleware/security.py | 66 ++++++++---- 2 files changed, 184 insertions(+), 106 deletions(-) diff --git a/api.py b/api.py index 67dd846..97e2c7e 100644 --- a/api.py +++ b/api.py @@ -3,6 +3,8 @@ from fastapi.responses import JSONResponse from fastapi.openapi.utils import get_openapi from fastapi.responses import StreamingResponse, HTMLResponse, Response from fastapi.encoders import jsonable_encoder +from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html + from pydantic import BaseModel, Field, EmailStr from typing import List, Optional from datetime import datetime, date @@ -96,7 +98,6 @@ from utils.generic_functions import ( universign_envoyer, ) - from middleware.security import SwaggerAuthMiddleware, ApiKeyMiddlewareHTTP from core.dependencies import get_current_user from config.cors_config import setup_cors @@ -123,12 +124,12 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): + """Lifecycle de l'application""" await init_db() logger.info("Base de données initialisée") email_queue.session_factory = async_session_factory email_queue.sage_client = sage_client - logger.info("sage_client injecté dans email_queue") email_queue.start(num_workers=settings.max_email_workers) @@ -137,18 +138,12 @@ async def lifespan(app: FastAPI): sync_service = UniversignSyncService( api_url=settings.universign_api_url, api_key=settings.universign_api_key ) - sync_service.configure( sage_client=sage_client, email_queue=email_queue, settings=settings ) - scheduler = UniversignSyncScheduler( - sync_service=sync_service, - interval_minutes=5, - ) - + scheduler = UniversignSyncScheduler(sync_service=sync_service, interval_minutes=5) sync_task = asyncio.create_task(scheduler.start(async_session_factory)) - logger.info("Synchronisation Universign démarrée (5min)") yield @@ -160,25 +155,24 @@ async def lifespan(app: FastAPI): app = FastAPI( - title="Sage Gateways", + title="Sage Gateways API", version="3.0.0", - description="Configuration multi-tenant des connexions Sage Gateway", + description="API multi-tenant pour Sage 100c avec authentification hybride", lifespan=lifespan, openapi_tags=TAGS_METADATA, + docs_url=None, + redoc_url=None, + openapi_url=None, ) -""" app.add_middleware( - CORSMiddleware, - allow_origins=settings.cors_origins, - allow_methods=["GET", "POST", "PUT", "DELETE"], - allow_headers=["*"], - allow_credentials=True, -) """ + +def get_swagger_user_from_state(request: Request) -> Optional[dict]: + return getattr(request.state, "swagger_user", None) -def generate_filtered_openapi(app: FastAPI, allowed_tags: Optional[List[str]] = None): - """Génère le schéma OpenAPI filtré selon les tags autorisés""" - +def generate_filtered_openapi_schema( + app: FastAPI, allowed_tags: Optional[List[str]] = None +) -> dict: base_schema = get_openapi( title=app.title, version=app.version, @@ -204,19 +198,33 @@ def generate_filtered_openapi(app: FastAPI, allowed_tags: Optional[List[str]] = base_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}] if not allowed_tags: + logger.info("📚 Schéma OpenAPI complet (admin)") return base_schema filtered_paths = {} for path, path_item in base_schema.get("paths", {}).items(): - for method, operation in path_item.items(): - if method in ["get", "post", "put", "delete", "patch", "options"]: - operation_tags = operation.get("tags", []) + filtered_operations = {} - if any(tag in allowed_tags for tag in operation_tags): - if path not in filtered_paths: - filtered_paths[path] = {} - filtered_paths[path][method] = operation + for method, operation in path_item.items(): + if method not in [ + "get", + "post", + "put", + "delete", + "patch", + "options", + "head", + ]: + continue + + operation_tags = operation.get("tags", []) + + if any(tag in allowed_tags for tag in operation_tags): + filtered_operations[method] = operation + + if filtered_operations: + filtered_paths[path] = filtered_operations base_schema["paths"] = filtered_paths @@ -227,48 +235,81 @@ def generate_filtered_openapi(app: FastAPI, allowed_tags: Optional[List[str]] = if tag_obj.get("name") in allowed_tags ] + logger.info(f"🔒 Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}") + return base_schema -async def get_swagger_user_from_request(request: Request) -> Optional[dict]: - """Récupère l'utilisateur Swagger depuis la requête authentifiée""" - auth_header = request.headers.get("Authorization") +@app.get("/openapi.json", include_in_schema=False) +async def custom_openapi_endpoint(request: Request): + swagger_user = get_swagger_user_from_state(request) - if not auth_header or not auth_header.startswith("Basic "): - return None + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) - import base64 - from fastapi.security import HTTPBasicCredentials - from database.db_config import async_session_factory - from database.models.api_key import SwaggerUser - from sqlalchemy import select + username = swagger_user.get("username", "unknown") + allowed_tags = swagger_user.get("allowed_tags") - try: - encoded_credentials = auth_header.split(" ")[1] - decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8") - username, _ = decoded_credentials.split(":", 1) + logger.info(f"📖 OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}") - async with async_session_factory() as session: - result = await session.execute( - select(SwaggerUser).where(SwaggerUser.username == username) - ) - swagger_user = result.scalar_one_or_none() + schema = generate_filtered_openapi_schema(app, allowed_tags) - if swagger_user and swagger_user.is_active: - return { - "username": swagger_user.username, - "allowed_tags": swagger_user.allowed_tags_list, - } - except Exception as e: - logger.error(f"Erreur récupération utilisateur Swagger: {e}") + return JSONResponse(content=schema) - return None + +@app.get("/docs", include_in_schema=False) +async def custom_swagger_ui(request: Request): + swagger_user = get_swagger_user_from_state(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + return get_swagger_ui_html( + openapi_url="/openapi.json", + title=f"{app.title} - Documentation", + swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", + swagger_ui_parameters={ + "persistAuthorization": True, + "displayRequestDuration": True, + "filter": True, + "tryItOutEnabled": True, + }, + ) + + +@app.get("/redoc", include_in_schema=False) +async def custom_redoc(request: Request): + swagger_user = get_swagger_user_from_state(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + return get_redoc_html( + openapi_url="/openapi.json", + title=f"{app.title} - Documentation", + redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", + ) setup_cors(app, mode="open") + app.add_middleware(SwaggerAuthMiddleware) + app.add_middleware(ApiKeyMiddlewareHTTP) + app.include_router(api_keys_router) app.include_router(auth_router) app.include_router(sage_gateway_router) @@ -276,26 +317,6 @@ app.include_router(universign_router) app.include_router(entreprises_router) -@app.get("/openapi.json", include_in_schema=False) -async def get_openapi_filtered(request: Request): - """Retourne le schéma OpenAPI filtré selon l'utilisateur""" - - swagger_user = await get_swagger_user_from_request(request) - - if not swagger_user: - return JSONResponse( - status_code=401, - content={"detail": "Authentification requise"}, - headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, - ) - - allowed_tags = swagger_user.get("allowed_tags") - - schema = generate_filtered_openapi(app, allowed_tags) - - return JSONResponse(content=schema) - - @app.get("/clients", response_model=List[ClientDetails], tags=["Clients"]) async def obtenir_clients( query: Optional[str] = Query(None), @@ -3409,7 +3430,7 @@ async def get_reglement_detail(rg_no): return sage_client.get_reglement_detail(rg_no) -@app.get("/health", tags=["System"]) +""" @app.get("/health", tags=["System"]) async def health_check( user: User = Depends(get_current_user), sage: SageGatewayClient = Depends(get_sage_client_for_user), @@ -3426,29 +3447,64 @@ async def health_check( "queue_size": email_queue.queue.qsize(), }, "timestamp": datetime.now().isoformat(), - } + } """ @app.get("/", tags=["System"]) async def root(): + """ + Point d'entrée de l'API + """ return { - "api": "Sage 100c Dataven - VPS Linux", + "api": "Sage 100c Dataven API", "version": "3.0.0", - "documentation": "/docs (authentification requise)", - "health": "/health", + "status": "operational", + "documentation": { + "swagger": "/docs", + "redoc": "/redoc", + "openapi": "/openapi.json", + }, "authentication": { "methods": [ { - "type": "JWT", + "type": "JWT (Bearer Token)", "header": "Authorization: Bearer ", - "endpoint": "/api/auth/login", + "obtain_token": "POST /auth/login", + "description": "Pour les utilisateurs finaux", }, { "type": "API Key", "header": "X-API-Key: sdk_live_xxx", - "endpoint": "/api/api-keys", + "manage_keys": "GET /api-keys", + "description": "Pour les intégrations externes", }, - ] + ], + "note": "Les routes acceptent JWT OU API Key (au choix)", + }, + "swagger_access": { + "authentication": "HTTP Basic Auth (voir /scripts/manage_security.py)", + "filtering": "Les routes visibles dépendent des tags autorisés de l'utilisateur", + }, + } + + +@app.get("/health", tags=["System"]) +async def health_check(): + """ + Vérification de santé de l'API (sans authentification) + """ + return { + "status": "healthy", + "timestamp": "2025-01-21T00:00:00Z", + "services": { + "api": "operational", + "database": "connected", + "email_queue": { + "running": email_queue.running, + "workers": len(email_queue.workers) + if hasattr(email_queue, "workers") + else 0, + }, }, } diff --git a/middleware/security.py b/middleware/security.py index 88df4f6..010d4d8 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -2,12 +2,13 @@ from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from starlette.middleware.base import BaseHTTPMiddleware -from starlette.types import ASGIApp +from starlette.types import ASGIApp, Receive, Send from sqlalchemy import select -from typing import Callable +from typing import Callable, Optional from datetime import datetime import logging import base64 +import json logger = logging.getLogger(__name__) @@ -20,7 +21,7 @@ class SwaggerAuthMiddleware: def __init__(self, app: ASGIApp): self.app = app - async def __call__(self, scope, receive, send): + async def __call__(self, scope, receive: Receive, send: Send): if scope["type"] != "http": await self.app(scope, receive, send) return @@ -50,7 +51,9 @@ class SwaggerAuthMiddleware: credentials = HTTPBasicCredentials(username=username, password=password) - if not await self._verify_credentials(credentials): + swagger_user = await self._verify_credentials(credentials) + + if not swagger_user: response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Identifiants invalides"}, @@ -59,6 +62,14 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return + if "state" not in scope: + scope["state"] = {} + scope["state"]["swagger_user"] = swagger_user + + logger.info( + f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}" + ) + except Exception as e: logger.error(f"Erreur parsing auth header: {e}") response = JSONResponse( @@ -71,8 +82,9 @@ class SwaggerAuthMiddleware: await self.app(scope, receive, send) - async def _verify_credentials(self, credentials: HTTPBasicCredentials) -> bool: - """Vérifie les identifiants dans la base de données""" + async def _verify_credentials( + self, credentials: HTTPBasicCredentials + ) -> Optional[dict]: from database.db_config import async_session_factory from database.models.api_key import SwaggerUser from security.auth import verify_password @@ -92,15 +104,22 @@ class SwaggerAuthMiddleware: ): swagger_user.last_login = datetime.now() await session.commit() + logger.info(f"✓ Accès Swagger autorisé: {credentials.username}") - return True + + return { + "id": swagger_user.id, + "username": swagger_user.username, + "allowed_tags": swagger_user.allowed_tags_list, # None = admin complet + "is_active": swagger_user.is_active, + } logger.warning(f"✗ Accès Swagger refusé: {credentials.username}") - return False + return None except Exception as e: - logger.error(f"Erreur vérification credentials: {e}") - return False + logger.error(f"Erreur vérification credentials: {e}", exc_info=True) + return None class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): @@ -116,7 +135,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ] def _is_excluded_path(self, path: str) -> bool: - """Vérifie si le chemin est exclu de l'authentification""" + """Vérifie si le chemin est exclu de l'authentification API Key""" if path == "/": return True @@ -149,7 +168,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): if token.startswith("sdk_live_"): logger.warning( - " API Key envoyée dans Authorization au lieu de X-API-Key" + "⚠ API Key envoyée dans Authorization au lieu de X-API-Key" ) return await self._handle_api_key_auth( request, token, path, method, call_next @@ -159,7 +178,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): request.state.authenticated_via = "jwt" return await call_next(request) - logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI") + logger.debug(f"❓ Aucune auth pour {method} {path} → délégation à FastAPI") return await call_next(request) async def _handle_api_key_auth( @@ -170,7 +189,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): method: str, call_next: Callable, ): - """Gère l'authentification par API Key avec vérification STRICTE""" try: from database.db_config import async_session_factory from services.api_key import ApiKeyService @@ -181,7 +199,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): api_key_obj = await service.verify_api_key(api_key) if not api_key_obj: - logger.warning(f" Clé API invalide: {method} {path}") + logger.warning(f"❌ Clé API invalide: {method} {path}") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -192,7 +210,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): is_allowed, rate_info = await service.check_rate_limit(api_key_obj) if not is_allowed: - logger.warning(f" Rate limit: {api_key_obj.name}") + logger.warning(f"⏱ Rate limit: {api_key_obj.name}") return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": "Rate limit dépassé"}, @@ -205,8 +223,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): has_access = await service.check_endpoint_access(api_key_obj, path) if not has_access: - import json - allowed = ( json.loads(api_key_obj.allowed_endpoints) if api_key_obj.allowed_endpoints @@ -214,7 +230,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ) logger.warning( - f" ACCÈS REFUSÉ: {api_key_obj.name}\n" + f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n" f" Endpoint demandé: {path}\n" f" Endpoints autorisés: {allowed}" ) @@ -233,12 +249,12 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" - logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name} → {method} {path}") + logger.info(f"✅ ACCÈS AUTORISÉ: {api_key_obj.name} → {method} {path}") return await call_next(request) except Exception as e: - logger.error(f" Erreur validation API Key: {e}", exc_info=True) + logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True) return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": f"Erreur interne: {str(e)}"}, @@ -248,7 +264,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ApiKeyMiddleware = ApiKeyMiddlewareHTTP -def get_api_key_from_request(request: Request): +def get_api_key_from_request(request: Request) -> Optional: """Récupère l'objet ApiKey depuis la requête si présent""" return getattr(request.state, "api_key", None) @@ -258,10 +274,16 @@ def get_auth_method(request: Request) -> str: return getattr(request.state, "authenticated_via", "none") +def get_swagger_user_from_request(request: Request) -> Optional[dict]: + """Récupère l'utilisateur Swagger depuis la requête""" + return getattr(request.state, "swagger_user", None) + + __all__ = [ "SwaggerAuthMiddleware", "ApiKeyMiddlewareHTTP", "ApiKeyMiddleware", "get_api_key_from_request", "get_auth_method", + "get_swagger_user_from_request", ]