From cc0062b3bc716f784f17f5df33b5a14787621c3a Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 20 Jan 2026 12:11:20 +0300 Subject: [PATCH] refactor(security): improve security management script with better logging and structure --- api.py | 2 - routes/universign.py | 1 - scripts/manage_security.py | 298 ++++++++++++++++++------------- tools/extract_pydantic_models.py | 2 - 4 files changed, 177 insertions(+), 126 deletions(-) diff --git a/api.py b/api.py index 851608c..0196fa7 100644 --- a/api.py +++ b/api.py @@ -181,7 +181,6 @@ def custom_openapi(): openapi_schema = app.openapi() - # Définir deux schémas de sécurité openapi_schema["components"]["securitySchemes"] = { "HTTPBearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}, "ApiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-API-Key"}, @@ -193,7 +192,6 @@ def custom_openapi(): return app.openapi_schema -# Après app = FastAPI(...), ajouter: app.openapi = custom_openapi diff --git a/routes/universign.py b/routes/universign.py index 2bf0e7a..bada5aa 100644 --- a/routes/universign.py +++ b/routes/universign.py @@ -35,7 +35,6 @@ logger = logging.getLogger(__name__) router = APIRouter( prefix="/universign", tags=["Universign"], - # dependencies=[Depends(get_current_user)] ) sync_service = UniversignSyncService( diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 1f234b9..3745f53 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,86 +1,94 @@ import asyncio import sys -from pathlib import Path -import argparse - -sys.path.insert(0, str(Path(__file__).parent.parent)) - from database import get_session -from database.models.api_key import SwaggerUser +from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService from security.auth import hash_password from sqlalchemy import select +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import argparse +from datetime import datetime import logging -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logger = logging.getLogger(__name__) async def add_swagger_user(username: str, password: str, full_name: str = None): """Ajouter un utilisateur Swagger""" - async with get_session() as session: + + async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) existing = result.scalar_one_or_none() if existing: - logger.error(f" L'utilisateur {username} existe déjà") + logger.error(f" L'utilisateur '{username}' existe déjà") return - user = SwaggerUser( + swagger_user = SwaggerUser( username=username, hashed_password=hash_password(password), full_name=full_name or username, is_active=True, ) - session.add(user) + session.add(swagger_user) await session.commit() logger.info(f" Utilisateur Swagger créé: {username}") - print("\n Utilisateur créé avec succès") - print(f" Username: {username}") - print(" Accès: https://votre-serveur/docs") + logger.info(f" Nom complet: {swagger_user.full_name}") + logger.info(f" Actif: {swagger_user.is_active}") + + break async def list_swagger_users(): - """Lister les utilisateurs Swagger""" - async with get_session() as session: + """Lister tous les utilisateurs Swagger""" + + async for session in get_session(): result = await session.execute(select(SwaggerUser)) users = result.scalars().all() if not users: - print("Aucun utilisateur Swagger trouvé") - return + logger.info(" Aucun utilisateur Swagger") + break + + logger.info(f" {len(users)} utilisateur(s) Swagger:\n") - print(f"\n {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = " Actif" if user.is_active else " Inactif" - print(f" • {user.username:<20} {status}") - if user.full_name: - print(f" Nom: {user.full_name}") - if user.last_login: - print(f" Dernière connexion: {user.last_login}") - print() + status = "" if user.is_active else "" + logger.info(f" {status} {user.username}") + logger.info(f" Nom: {user.full_name}") + logger.info(f" Créé: {user.created_at}") + logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}") + logger.info("") + + break async def delete_swagger_user(username: str): """Supprimer un utilisateur Swagger""" - async with get_session() as session: + + async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) user = result.scalar_one_or_none() if not user: - logger.error(f" Utilisateur {username} introuvable") - return + logger.error(f" Utilisateur '{username}' introuvable") + break await session.delete(user) await session.commit() - logger.info(f"🗑️ Utilisateur supprimé: {username}") + logger.info(f" Utilisateur Swagger supprimé: {username}") + break async def create_api_key( @@ -91,137 +99,180 @@ async def create_api_key( endpoints: list = None, ): """Créer une clé API""" - async with get_session() as session: + + async for session in get_session(): service = ApiKeyService(session) api_key_obj, api_key_plain = await service.create_api_key( name=name, description=description, - created_by="CLI", + created_by="cli", expires_in_days=expires_in_days, rate_limit_per_minute=rate_limit, allowed_endpoints=endpoints, ) - print("\n Clé API créée avec succès\n") - print(f" ID: {api_key_obj.id}") - print(f" Nom: {name}") - print(f" Clé: {api_key_plain}") - print(f" Préfixe: {api_key_obj.key_prefix}") - print(f" Rate limit: {rate_limit} req/min") - print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}") - print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") + logger.info("=" * 60) + logger.info(" Clé API créée avec succès") + logger.info("=" * 60) + logger.info(f" ID: {api_key_obj.id}") + logger.info(f" Nom: {api_key_obj.name}") + logger.info(f" Clé: {api_key_plain}") + logger.info(f" Préfixe: {api_key_obj.key_prefix}") + logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") + logger.info(f" Créée le: {api_key_obj.created_at}") + logger.info(f" Expire le: {api_key_obj.expires_at}") + + if api_key_obj.allowed_endpoints: + logger.info( + f" Endpoints autorisés: {', '.join(api_key_obj.allowed_endpoints)}" + ) + else: + logger.info(" Endpoints autorisés: Tous") + + logger.info("=" * 60) + logger.info(" IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !") + logger.info("=" * 60) + + break async def list_api_keys(): - """Lister les clés API""" - async with get_session() as session: + """Lister toutes les clés API""" + + async for session in get_session(): service = ApiKeyService(session) keys = await service.list_api_keys() if not keys: - print("Aucune clé API trouvée") - return + logger.info(" Aucune clé API") + break + + logger.info(f" {len(keys)} clé(s) API:\n") - print(f"\n {len(keys)} clé(s) API:\n") for key in keys: - status = "" if key.is_active else "" - expired = ( - "⏰ Expirée" - if key.expires_at and key.expires_at < datetime.now() + status = ( + "" + if key.is_active + and (not key.expires_at or key.expires_at > datetime.now()) else "" ) - print(f" {status} {key.name:<30} ({key.key_prefix}...)") - print(f" ID: {key.id}") - print(f" Requêtes: {key.total_requests}") - print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") - if expired: - print(f" {expired}") - print() + logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") + logger.info(f" ID: {key.id}") + logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") + logger.info(f" Requêtes: {key.total_requests}") + logger.info(f" Créée le: {key.created_at}") + logger.info(f" Expire le: {key.expires_at or 'Jamais'}") + logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + + if key.allowed_endpoints: + logger.info( + f" Endpoints: {', '.join(key.allowed_endpoints[:3])}..." + ) + + logger.info("") + + break async def revoke_api_key(key_id: str): """Révoquer une clé API""" - async with get_session() as session: + + async for session in get_session(): service = ApiKeyService(session) - api_key = await service.get_by_id(key_id) - if not api_key: - logger.error(f" Clé {key_id} introuvable") - return + result = await session.execute(select(ApiKey).where(ApiKey.id == key_id)) + key = result.scalar_one_or_none() - success = await service.revoke_api_key(key_id) + if not key: + logger.error(f" Clé API '{key_id}' introuvable") + break - if success: - logger.info(f" Clé révoquée: {api_key.name}") - print(f"\n Clé '{api_key.name}' révoquée avec succès") - else: - logger.error(" Erreur lors de la révocation") + key.is_active = False + await session.commit() + + logger.info(f" Clé API révoquée: {key.name}") + logger.info(f" ID: {key.id}") + logger.info(f" Préfixe: {key.key_prefix}") + + break -async def verify_api_key_cmd(api_key: str): +async def verify_api_key(api_key: str): """Vérifier une clé API""" - async with get_session() as session: - service = ApiKeyService(session) - api_key_obj = await service.verify_api_key(api_key) - if api_key_obj: - print("\n Clé API valide\n") - print(f" Nom: {api_key_obj.name}") - print(f" ID: {api_key_obj.id}") - print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") - print(f" Requêtes: {api_key_obj.total_requests}") - print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n") - else: - print("\n Clé API invalide, expirée ou révoquée\n") + async for session in get_session(): + service = ApiKeyService(session) + + key = await service.verify_api_key(api_key) + + if not key: + logger.error(" Clé API invalide ou expirée") + break + + logger.info("=" * 60) + logger.info(" Clé API valide") + logger.info("=" * 60) + logger.info(f" Nom: {key.name}") + logger.info(f" ID: {key.id}") + logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") + logger.info(f" Requêtes totales: {key.total_requests}") + logger.info(f" Expire le: {key.expires_at or 'Jamais'}") + logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + logger.info("=" * 60) + + break async def main(): parser = argparse.ArgumentParser( - description="Gestion de la sécurité Sage Dataven API" + description="Gestion des utilisateurs Swagger et clés API" ) - subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") swagger_parser = subparsers.add_parser( - "swagger", help="Gestion utilisateurs Swagger" + "swagger", help="Gestion des utilisateurs Swagger" ) - swagger_subparsers = swagger_parser.add_subparsers(dest="action") + swagger_subparsers = swagger_parser.add_subparsers(dest="swagger_command") - swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") - swagger_add.add_argument("username", help="Nom d'utilisateur") - swagger_add.add_argument("password", help="Mot de passe") - swagger_add.add_argument("--full-name", help="Nom complet") + add_parser = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") + add_parser.add_argument("username", help="Nom d'utilisateur") + add_parser.add_argument("password", help="Mot de passe") + add_parser.add_argument("--full-name", help="Nom complet (optionnel)") swagger_subparsers.add_parser("list", help="Lister les utilisateurs") - swagger_delete = swagger_subparsers.add_parser( + delete_parser = swagger_subparsers.add_parser( "delete", help="Supprimer un utilisateur" ) - swagger_delete.add_argument("username", help="Nom d'utilisateur") + delete_parser.add_argument("username", help="Nom d'utilisateur") - apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") - apikey_subparsers = apikey_parser.add_subparsers(dest="action") + apikey_parser = subparsers.add_parser("apikey", help="Gestion des clés API") + apikey_subparsers = apikey_parser.add_subparsers(dest="apikey_command") - apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API") - apikey_create.add_argument("name", help="Nom de la clé") - apikey_create.add_argument("--description", help="Description") - apikey_create.add_argument( - "--days", type=int, default=365, help="Expiration en jours" + create_parser = apikey_subparsers.add_parser("create", help="Créer une clé API") + create_parser.add_argument("name", help="Nom de la clé") + create_parser.add_argument("--description", help="Description (optionnel)") + create_parser.add_argument( + "--days", type=int, default=365, help="Jours avant expiration (défaut: 365)" ) - apikey_create.add_argument( - "--rate-limit", type=int, default=60, help="Limite req/min" + create_parser.add_argument( + "--rate-limit", type=int, default=60, help="Requêtes par minute (défaut: 60)" + ) + create_parser.add_argument( + "--endpoints", + nargs="+", + help="Endpoints autorisés (ex: /clients /articles)", ) - apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") - apikey_subparsers.add_parser("list", help="Lister les clés") + apikey_subparsers.add_parser("list", help="Lister les clés API") - apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") - apikey_revoke.add_argument("key_id", help="ID de la clé") + revoke_parser = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") + revoke_parser.add_argument("key_id", help="ID de la clé") - apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé") - apikey_verify.add_argument("api_key", help="Clé API à vérifier") + verify_parser = apikey_subparsers.add_parser("verify", help="Vérifier une clé") + verify_parser.add_argument("api_key", help="Clé API complète") args = parser.parse_args() @@ -230,35 +281,40 @@ async def main(): return if args.command == "swagger": - if args.action == "add": + if args.swagger_command == "add": await add_swagger_user(args.username, args.password, args.full_name) - elif args.action == "list": + elif args.swagger_command == "list": await list_swagger_users() - elif args.action == "delete": + elif args.swagger_command == "delete": await delete_swagger_user(args.username) else: swagger_parser.print_help() elif args.command == "apikey": - if args.action == "create": + if args.apikey_command == "create": await create_api_key( - args.name, - args.description, - args.days, - args.rate_limit, - args.endpoints, + name=args.name, + description=args.description, + expires_in_days=args.days, + rate_limit=args.rate_limit, + endpoints=args.endpoints, ) - elif args.action == "list": + elif args.apikey_command == "list": await list_api_keys() - elif args.action == "revoke": + elif args.apikey_command == "revoke": await revoke_api_key(args.key_id) - elif args.action == "verify": - await verify_api_key_cmd(args.api_key) + elif args.apikey_command == "verify": + await verify_api_key(args.api_key) else: apikey_parser.print_help() if __name__ == "__main__": - from datetime import datetime - - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("\n👋 Interrupted") + sys.exit(0) + except Exception as e: + logger.error(f" Erreur: {e}") + sys.exit(1) diff --git a/tools/extract_pydantic_models.py b/tools/extract_pydantic_models.py index 595e15f..5718790 100644 --- a/tools/extract_pydantic_models.py +++ b/tools/extract_pydantic_models.py @@ -24,7 +24,6 @@ for node in tree.body: continue other_nodes.append(node) -# --- Extraction des classes --- imports = """ from pydantic import BaseModel, Field from typing import Optional, List @@ -44,7 +43,6 @@ for cls in pydantic_classes: print(f"✅ Modèle extrait : {class_name} → {file_path}") -# --- Réécriture du fichier source sans les modèles --- new_tree = ast.Module(body=other_nodes, type_ignores=[]) new_source = ast.unparse(new_tree)