diff --git a/api.py b/api.py index 06f57e4..97d4425 100644 --- a/api.py +++ b/api.py @@ -1,5 +1,5 @@ from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body -from fastapi.middleware.cors import CORSMiddleware +from fastapi.openapi.utils import get_openapi from fastapi.responses import StreamingResponse, HTMLResponse, Response from fastapi.encoders import jsonable_encoder from pydantic import BaseModel, Field, EmailStr @@ -179,7 +179,12 @@ def custom_openapi(): if app.openapi_schema: return app.openapi_schema - openapi_schema = app.openapi() + openapi_schema = get_openapi( + title=app.title, + version=app.version, + description=app.description, + routes=app.routes, + ) openapi_schema["components"]["securitySchemes"] = { "HTTPBearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}, @@ -192,8 +197,7 @@ def custom_openapi(): return app.openapi_schema -""" # Après app = FastAPI(...), ajouter: -app.openapi = custom_openapi """ +app.openapi = custom_openapi setup_cors(app, mode="open") diff --git a/middleware/security.py b/middleware/security.py index 493ed3e..8b2d90a 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -1,3 +1,4 @@ + from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials @@ -15,6 +16,7 @@ security = HTTPBasic() class SwaggerAuthMiddleware: + PROTECTED_PATHS = ["/docs", "/redoc", "/openapi.json"] def __init__(self, app: ASGIApp): @@ -241,7 +243,7 @@ def get_auth_method(request: Request) -> str: __all__ = [ "SwaggerAuthMiddleware", "ApiKeyMiddlewareHTTP", - "ApiKeyMiddleware", + "ApiKeyMiddleware", # Alias "get_api_key_from_request", "get_auth_method", ] diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 52b21c7..e2c0297 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,28 +1,44 @@ -import asyncio +#!/usr/bin/env python3 +""" +Script de gestion des utilisateurs Swagger et clés API +====================================================== + +Usage (depuis /app dans le container Docker): + python scripts/manage_security.py swagger add + python scripts/manage_security.py swagger list + python scripts/manage_security.py apikey create --endpoints "/clients" "/devis" + python scripts/manage_security.py apikey list + python scripts/manage_security.py apikey verify +""" + import sys from pathlib import Path -from database import get_session + +_script_dir = Path(__file__).resolve().parent +_app_dir = _script_dir.parent +if str(_app_dir) not in sys.path: + sys.path.insert(0, str(_app_dir)) + +import asyncio +import argparse +import logging +from datetime import datetime + +from sqlalchemy import select + +from database.db_config import get_session from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService from security.auth import hash_password -from sqlalchemy import select - -import argparse -from datetime import datetime -import logging - -current_dir = Path(__file__).resolve().parent -parent_dir = current_dir.parent -sys.path.insert(0, str(parent_dir)) - logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logger = logging.getLogger(__name__) + + async def add_swagger_user(username: str, password: str, full_name: str = None): """Ajouter un utilisateur Swagger""" - async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -45,14 +61,11 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): logger.info(f"✅ Utilisateur Swagger créé: {username}") logger.info(f" Nom complet: {swagger_user.full_name}") - logger.info(f" Actif: {swagger_user.is_active}") - break async def list_swagger_users(): """Lister tous les utilisateurs Swagger""" - async for session in get_session(): result = await session.execute(select(SwaggerUser)) users = result.scalars().all() @@ -62,21 +75,17 @@ async def list_swagger_users(): break logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") - for user in users: status = "✅" if user.is_active else "❌" logger.info(f" {status} {user.username}") logger.info(f" Nom: {user.full_name}") logger.info(f" Créé: {user.created_at}") - logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}") - logger.info("") - + logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n") break async def delete_swagger_user(username: str): """Supprimer un utilisateur Swagger""" - async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -89,11 +98,12 @@ async def delete_swagger_user(username: str): await session.delete(user) await session.commit() - logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") break + + async def create_api_key( name: str, description: str = None, @@ -102,7 +112,6 @@ async def create_api_key( endpoints: list = None, ): """Créer une clé API""" - async for session in get_session(): service = ApiKeyService(session) @@ -123,27 +132,27 @@ async def create_api_key( logger.info(f" Clé: {api_key_plain}") logger.info(f" Préfixe: {api_key_obj.key_prefix}") logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") - logger.info(f" Créée le: {api_key_obj.created_at}") logger.info(f" Expire le: {api_key_obj.expires_at}") if api_key_obj.allowed_endpoints: import json - endpoints_list = json.loads(api_key_obj.allowed_endpoints) - logger.info(f" Endpoints autorisés: {', '.join(endpoints_list)}") + try: + endpoints_list = json.loads(api_key_obj.allowed_endpoints) + logger.info(f" Endpoints: {', '.join(endpoints_list)}") + except: + logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}") else: - logger.info(" Endpoints autorisés: Tous") + logger.info(" Endpoints: Tous (aucune restriction)") logger.info("=" * 70) - logger.info("⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !") + logger.info("⚠️ SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !") logger.info("=" * 70) - break async def list_api_keys(): """Lister toutes les clés API""" - async for session in get_session(): service = ApiKeyService(session) keys = await service.list_api_keys() @@ -155,19 +164,16 @@ async def list_api_keys(): logger.info(f"🔑 {len(keys)} clé(s) API:\n") for key in keys: - status = ( - "✅" - if key.is_active - and (not key.expires_at or key.expires_at > datetime.now()) - else "❌" + is_valid = key.is_active and ( + not key.expires_at or key.expires_at > datetime.now() ) + status = "✅" if is_valid else "❌" logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") logger.info(f" ID: {key.id}") logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") logger.info(f" Requêtes: {key.total_requests}") - logger.info(f" Créée le: {key.created_at}") - logger.info(f" Expire le: {key.expires_at or 'Jamais'}") + logger.info(f" Expire: {key.expires_at or 'Jamais'}") logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") if key.allowed_endpoints: @@ -175,23 +181,21 @@ async def list_api_keys(): try: endpoints = json.loads(key.allowed_endpoints) - logger.info( - f" Endpoints: {', '.join(endpoints[:5])}{'...' if len(endpoints) > 5 else ''}" - ) + display = ", ".join(endpoints[:4]) + if len(endpoints) > 4: + display += f"... (+{len(endpoints) - 4})" + logger.info(f" Endpoints: {display}") except: pass - + else: + logger.info(" Endpoints: Tous") logger.info("") - break async def revoke_api_key(key_id: str): """Révoquer une clé API""" - async for session in get_session(): - service = ApiKeyService(session) - result = await session.execute(select(ApiKey).where(ApiKey.id == key_id)) key = result.scalar_one_or_none() @@ -200,21 +204,18 @@ async def revoke_api_key(key_id: str): break key.is_active = False + key.revoked_at = datetime.now() await session.commit() logger.info(f"🗑️ Clé API révoquée: {key.name}") logger.info(f" ID: {key.id}") - logger.info(f" Préfixe: {key.key_prefix}") - break async def verify_api_key(api_key: str): """Vérifier une clé API""" - async for session in get_session(): service = ApiKeyService(session) - key = await service.verify_api_key(api_key) if not key: @@ -228,8 +229,7 @@ async def verify_api_key(api_key: str): logger.info(f" ID: {key.id}") logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") logger.info(f" Requêtes totales: {key.total_requests}") - logger.info(f" Expire le: {key.expires_at or 'Jamais'}") - logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + logger.info(f" Expire: {key.expires_at or 'Jamais'}") if key.allowed_endpoints: import json @@ -241,60 +241,58 @@ async def verify_api_key(api_key: str): pass else: logger.info(" Endpoints autorisés: Tous") - logger.info("=" * 60) - break + + async def main(): parser = argparse.ArgumentParser( - description="Gestion des utilisateurs Swagger et clés API" + description="Gestion des utilisateurs Swagger et clés API", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Exemples: + python scripts/manage_security.py swagger add admin MyP@ssw0rd + python scripts/manage_security.py swagger list + python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 + python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" + python scripts/manage_security.py apikey list + python scripts/manage_security.py apikey verify sdk_live_xxxxx + """, ) - subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") + subparsers = parser.add_subparsers(dest="command", help="Commandes") - swagger_parser = subparsers.add_parser( - "swagger", help="Gestion des utilisateurs Swagger" - ) - swagger_subparsers = swagger_parser.add_subparsers(dest="swagger_command") + swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger") + swagger_sub = swagger_parser.add_subparsers(dest="swagger_command") - add_parser = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") - add_parser.add_argument("username", help="Nom d'utilisateur") - add_parser.add_argument("password", help="Mot de passe") - add_parser.add_argument("--full-name", help="Nom complet (optionnel)") + add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur") + add_p.add_argument("username", help="Nom d'utilisateur") + add_p.add_argument("password", help="Mot de passe") + add_p.add_argument("--full-name", help="Nom complet") - swagger_subparsers.add_parser("list", help="Lister les utilisateurs") + swagger_sub.add_parser("list", help="Lister utilisateurs") - delete_parser = swagger_subparsers.add_parser( - "delete", help="Supprimer un utilisateur" - ) - delete_parser.add_argument("username", help="Nom d'utilisateur") + del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur") + del_p.add_argument("username", help="Nom d'utilisateur") - apikey_parser = subparsers.add_parser("apikey", help="Gestion des clés API") - apikey_subparsers = apikey_parser.add_subparsers(dest="apikey_command") + apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") + apikey_sub = apikey_parser.add_subparsers(dest="apikey_command") - create_parser = apikey_subparsers.add_parser("create", help="Créer une clé API") - create_parser.add_argument("name", help="Nom de la clé") - create_parser.add_argument("--description", help="Description (optionnel)") - create_parser.add_argument( - "--days", type=int, default=365, help="Jours avant expiration (défaut: 365)" - ) - create_parser.add_argument( - "--rate-limit", type=int, default=60, help="Requêtes par minute (défaut: 60)" - ) - create_parser.add_argument( - "--endpoints", - nargs="+", - help="Endpoints autorisés (ex: /clients /articles /devis/*)", - ) + create_p = apikey_sub.add_parser("create", help="Créer clé API") + create_p.add_argument("name", help="Nom de la clé") + create_p.add_argument("--description", help="Description") + create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)") + create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min") + create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") - apikey_subparsers.add_parser("list", help="Lister les clés API") + apikey_sub.add_parser("list", help="Lister clés") - revoke_parser = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") - revoke_parser.add_argument("key_id", help="ID de la clé") + rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé") + rev_p.add_argument("key_id", help="ID de la clé") - verify_parser = apikey_subparsers.add_parser("verify", help="Vérifier une clé") - verify_parser.add_argument("api_key", help="Clé API complète") + ver_p = apikey_sub.add_parser("verify", help="Vérifier clé") + ver_p.add_argument("api_key", help="Clé API complète") args = parser.parse_args() @@ -335,7 +333,7 @@ if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - logger.info("\n⏹️ Interrupted") + print("\n⏹️ Interrupted") sys.exit(0) except Exception as e: logger.error(f"❌ Erreur: {e}")