From 28c8fb3008d004b9ecad3e94df3131c0ac7ddbcc Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 20 Jan 2026 15:27:26 +0300 Subject: [PATCH] refactor(security): improve user management and session handling --- core/dependencies.py | 5 ++- scripts/manage_security.py | 85 ++++++++++++++------------------------ 2 files changed, 35 insertions(+), 55 deletions(-) diff --git a/core/dependencies.py b/core/dependencies.py index 76c85be..c1468dd 100644 --- a/core/dependencies.py +++ b/core/dependencies.py @@ -33,9 +33,12 @@ async def get_current_user_hybrid( virtual_user = User( id=f"api_key_{api_key_obj.id}", email=f"api_key_{api_key_obj.id}@virtual.local", - username=api_key_obj.name, + nom=api_key_obj.name, + prenom="API", + hashed_password="", role="api_client", is_active=True, + is_verified=True, ) virtual_user._is_api_key_user = True diff --git a/scripts/manage_security.py b/scripts/manage_security.py index e2c0297..cfeadcf 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,24 +1,6 @@ -#!/usr/bin/env python3 -""" -Script de gestion des utilisateurs Swagger et clés API -====================================================== - -Usage (depuis /app dans le container Docker): - python scripts/manage_security.py swagger add - python scripts/manage_security.py swagger list - python scripts/manage_security.py apikey create --endpoints "/clients" "/devis" - python scripts/manage_security.py apikey list - python scripts/manage_security.py apikey verify -""" - import sys +import os from pathlib import Path - -_script_dir = Path(__file__).resolve().parent -_app_dir = _script_dir.parent -if str(_app_dir) not in sys.path: - sys.path.insert(0, str(_app_dir)) - import asyncio import argparse import logging @@ -26,20 +8,26 @@ from datetime import datetime from sqlalchemy import select -from database.db_config import get_session +from database.db_config import async_session_factory +from database.models.user import User from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService from security.auth import hash_password +_script_dir = Path(__file__).resolve().parent +_app_dir = _script_dir.parent + +sys.path.insert(0, str(_app_dir)) +os.chdir(str(_app_dir)) + + logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logger = logging.getLogger(__name__) - - async def add_swagger_user(username: str, password: str, full_name: str = None): """Ajouter un utilisateur Swagger""" - async for session in get_session(): + async with async_session_factory() as session: result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) @@ -61,18 +49,17 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): logger.info(f"✅ Utilisateur Swagger créé: {username}") logger.info(f" Nom complet: {swagger_user.full_name}") - break async def list_swagger_users(): """Lister tous les utilisateurs Swagger""" - async for session in get_session(): + async with async_session_factory() as session: result = await session.execute(select(SwaggerUser)) users = result.scalars().all() if not users: - logger.info("📭 Aucun utilisateur Swagger") - break + logger.info("🔭 Aucun utilisateur Swagger") + return logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") for user in users: @@ -81,12 +68,11 @@ async def list_swagger_users(): logger.info(f" Nom: {user.full_name}") logger.info(f" Créé: {user.created_at}") logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n") - break async def delete_swagger_user(username: str): """Supprimer un utilisateur Swagger""" - async for session in get_session(): + async with async_session_factory() as session: result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) @@ -94,14 +80,11 @@ async def delete_swagger_user(username: str): if not user: logger.error(f"❌ Utilisateur '{username}' introuvable") - break + return await session.delete(user) await session.commit() logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") - break - - async def create_api_key( @@ -112,7 +95,7 @@ async def create_api_key( endpoints: list = None, ): """Créer une clé API""" - async for session in get_session(): + async with async_session_factory() as session: service = ApiKeyService(session) api_key_obj, api_key_plain = await service.create_api_key( @@ -148,18 +131,17 @@ async def create_api_key( logger.info("=" * 70) logger.info("⚠️ SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !") logger.info("=" * 70) - break async def list_api_keys(): """Lister toutes les clés API""" - async for session in get_session(): + async with async_session_factory() as session: service = ApiKeyService(session) keys = await service.list_api_keys() if not keys: - logger.info("📭 Aucune clé API") - break + logger.info("🔭 Aucune clé API") + return logger.info(f"🔑 {len(keys)} clé(s) API:\n") @@ -190,18 +172,17 @@ async def list_api_keys(): else: logger.info(" Endpoints: Tous") logger.info("") - break async def revoke_api_key(key_id: str): """Révoquer une clé API""" - async for session in get_session(): + async with async_session_factory() as session: result = await session.execute(select(ApiKey).where(ApiKey.id == key_id)) key = result.scalar_one_or_none() if not key: logger.error(f"❌ Clé API '{key_id}' introuvable") - break + return key.is_active = False key.revoked_at = datetime.now() @@ -209,18 +190,17 @@ async def revoke_api_key(key_id: str): logger.info(f"🗑️ Clé API révoquée: {key.name}") logger.info(f" ID: {key.id}") - break async def verify_api_key(api_key: str): """Vérifier une clé API""" - async for session in get_session(): + async with async_session_factory() as session: service = ApiKeyService(session) key = await service.verify_api_key(api_key) if not key: logger.error("❌ Clé API invalide ou expirée") - break + return logger.info("=" * 60) logger.info("✅ Clé API valide") @@ -242,9 +222,6 @@ async def verify_api_key(api_key: str): else: logger.info(" Endpoints autorisés: Tous") logger.info("=" * 60) - break - - async def main(): @@ -253,12 +230,12 @@ async def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemples: - python scripts/manage_security.py swagger add admin MyP@ssw0rd - python scripts/manage_security.py swagger list - python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 - python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" - python scripts/manage_security.py apikey list - python scripts/manage_security.py apikey verify sdk_live_xxxxx + python scripts/manage_security.py swagger add admin MyP@ssw0rd + python scripts/manage_security.py swagger list + python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 + python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" + python scripts/manage_security.py apikey list + python scripts/manage_security.py apikey verify sdk_live_xxxxx """, ) subparsers = parser.add_subparsers(dest="command", help="Commandes") @@ -333,7 +310,7 @@ if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - print("\n⏹️ Interrupted") + print("\nℹ️ Interrupted") sys.exit(0) except Exception as e: logger.error(f"❌ Erreur: {e}")