"""Command-line tool for managing Swagger users and API keys.

Sub-commands:
    swagger add|list|delete   -- manage ``SwaggerUser`` rows
    apikey  create|list|revoke|verify -- manage ``ApiKey`` rows

Every command obtains a database session from ``get_session()`` and logs
its result through the module logger (user-facing messages are in French).
"""

import argparse
import asyncio
import getpass
import json
import logging
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Optional

# The parent of this script's directory is put on sys.path so the project
# packages below can be imported when the script is launched directly.
# This MUST happen before those imports, otherwise it has no effect on them.
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
sys.path.insert(0, str(parent_dir))

from sqlalchemy import select  # noqa: E402

from database import get_session  # noqa: E402
from database.models.api_key import SwaggerUser, ApiKey  # noqa: E402
from services.api_key import ApiKeyService  # noqa: E402
from security.auth import hash_password  # noqa: E402

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def _parse_endpoints(raw) -> Optional[list]:
    """Decode the JSON-encoded endpoint list stored on an API key.

    Returns the decoded list, or ``None`` (after logging a warning) when
    the stored value is not valid JSON or does not decode to a list.
    Display code treats this as best-effort and must not crash on it.
    """
    try:
        endpoints = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-str input.
        logger.warning(" ⚠️ Endpoints illisibles (JSON invalide)")
        return None
    if not isinstance(endpoints, list):
        logger.warning(" ⚠️ Endpoints illisibles (liste attendue)")
        return None
    return endpoints


def _is_key_usable(key) -> bool:
    """Return True when *key* is active and its expiry (if any) is in the future."""
    if not key.is_active:
        return False
    expires_at = key.expires_at
    if not expires_at:
        return True
    # Compare like with like: a naive expiry is compared with naive local
    # "now" (tzinfo is None), an aware expiry with "now" in the same zone.
    # Mixing naive and aware datetimes would raise TypeError.
    return expires_at > datetime.now(expires_at.tzinfo)


async def add_swagger_user(
    username: str, password: str, full_name: Optional[str] = None
):
    """Create a Swagger user.

    Logs an error and does nothing if *username* already exists. The
    password is stored hashed via ``hash_password``; *full_name* defaults
    to *username* when omitted.
    """
    # get_session() is consumed with ``async for``; only the first yielded
    # session is used, hence the ``break`` at the end of every path.
    async for session in get_session():
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        existing = result.scalar_one_or_none()

        if existing:
            logger.error(f"❌ L'utilisateur '{username}' existe déjà")
            break

        swagger_user = SwaggerUser(
            username=username,
            hashed_password=hash_password(password),
            full_name=full_name or username,
            is_active=True,
        )

        session.add(swagger_user)
        await session.commit()

        logger.info(f"✅ Utilisateur Swagger créé: {username}")
        logger.info(f" Nom complet: {swagger_user.full_name}")
        logger.info(f" Actif: {swagger_user.is_active}")
        break


async def list_swagger_users():
    """Log every Swagger user with its status, name and timestamps."""
    async for session in get_session():
        result = await session.execute(select(SwaggerUser))
        users = result.scalars().all()

        if not users:
            logger.info("📭 Aucun utilisateur Swagger")
            break

        logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n")

        for user in users:
            status = "✅" if user.is_active else "❌"
            logger.info(f" {status} {user.username}")
            logger.info(f" Nom: {user.full_name}")
            logger.info(f" Créé: {user.created_at}")
            logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}")
            logger.info("")
        break


async def delete_swagger_user(username: str):
    """Delete the Swagger user named *username*; log an error if not found."""
    async for session in get_session():
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()

        if not user:
            logger.error(f"❌ Utilisateur '{username}' introuvable")
            break

        await session.delete(user)
        await session.commit()

        logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
        break


async def create_api_key(
    name: str,
    description: Optional[str] = None,
    expires_in_days: int = 365,
    rate_limit: int = 60,
    endpoints: Optional[List[str]] = None,
):
    """Create an API key through ``ApiKeyService`` and log its details.

    The plain-text key returned by the service is logged here; the final
    banner tells the operator it will not be displayed again.

    Args:
        name: Name of the key.
        description: Optional description.
        expires_in_days: Forwarded to the service as ``expires_in_days``.
        rate_limit: Forwarded as ``rate_limit_per_minute``.
        endpoints: Forwarded as ``allowed_endpoints``; ``None`` is shown
            as "all endpoints" in the summary.
    """
    async for session in get_session():
        service = ApiKeyService(session)

        api_key_obj, api_key_plain = await service.create_api_key(
            name=name,
            description=description,
            created_by="cli",
            expires_in_days=expires_in_days,
            rate_limit_per_minute=rate_limit,
            allowed_endpoints=endpoints,
        )

        logger.info("=" * 70)
        logger.info("🔑 Clé API créée avec succès")
        logger.info("=" * 70)
        logger.info(f" ID: {api_key_obj.id}")
        logger.info(f" Nom: {api_key_obj.name}")
        logger.info(f" Clé: {api_key_plain}")
        logger.info(f" Préfixe: {api_key_obj.key_prefix}")
        logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
        logger.info(f" Créée le: {api_key_obj.created_at}")
        logger.info(f" Expire le: {api_key_obj.expires_at}")

        if api_key_obj.allowed_endpoints:
            # Best-effort display: a decoding problem must not abort the
            # command before the "save this key" banner is shown, because
            # the key has already been created at this point.
            endpoints_list = _parse_endpoints(api_key_obj.allowed_endpoints)
            if endpoints_list is not None:
                shown = ", ".join(str(item) for item in endpoints_list)
            else:
                shown = str(api_key_obj.allowed_endpoints)
            logger.info(f" Endpoints autorisés: {shown}")
        else:
            logger.info(" Endpoints autorisés: Tous")

        logger.info("=" * 70)
        logger.info("⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !")
        logger.info("=" * 70)
        break


async def list_api_keys():
    """Log every API key with its status, limits, usage and endpoints."""
    async for session in get_session():
        service = ApiKeyService(session)
        keys = await service.list_api_keys()

        if not keys:
            logger.info("📭 Aucune clé API")
            break

        logger.info(f"🔑 {len(keys)} clé(s) API:\n")

        for key in keys:
            status = "✅" if _is_key_usable(key) else "❌"

            logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
            logger.info(f" ID: {key.id}")
            logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
            logger.info(f" Requêtes: {key.total_requests}")
            logger.info(f" Créée le: {key.created_at}")
            logger.info(f" Expire le: {key.expires_at or 'Jamais'}")
            logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")

            if key.allowed_endpoints:
                endpoints = _parse_endpoints(key.allowed_endpoints)
                if endpoints is not None:
                    # Show at most five endpoints, with an ellipsis if truncated.
                    preview = ", ".join(str(item) for item in endpoints[:5])
                    suffix = "..." if len(endpoints) > 5 else ""
                    logger.info(f" Endpoints: {preview}{suffix}")

            logger.info("")
        break


async def revoke_api_key(key_id: str):
    """Deactivate the API key with id *key_id*; log an error if not found.

    The row is kept and only ``is_active`` is set to ``False``.
    """
    async for session in get_session():
        result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
        key = result.scalar_one_or_none()

        if not key:
            logger.error(f"❌ Clé API '{key_id}' introuvable")
            break

        key.is_active = False
        await session.commit()

        logger.info(f"🗑️ Clé API révoquée: {key.name}")
        logger.info(f" ID: {key.id}")
        logger.info(f" Préfixe: {key.key_prefix}")
        break


async def verify_api_key(api_key: str):
    """Check a plain-text API key via ``ApiKeyService.verify_api_key``.

    Logs the key's details when the service returns a key object, or an
    error when it returns a falsy value.
    """
    async for session in get_session():
        service = ApiKeyService(session)
        key = await service.verify_api_key(api_key)

        if not key:
            logger.error("❌ Clé API invalide ou expirée")
            break

        logger.info("=" * 60)
        logger.info("✅ Clé API valide")
        logger.info("=" * 60)
        logger.info(f" Nom: {key.name}")
        logger.info(f" ID: {key.id}")
        logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
        logger.info(f" Requêtes totales: {key.total_requests}")
        logger.info(f" Expire le: {key.expires_at or 'Jamais'}")
        logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")

        if key.allowed_endpoints:
            endpoints = _parse_endpoints(key.allowed_endpoints)
            if endpoints is not None:
                logger.info(f" Endpoints autorisés: {endpoints}")
        else:
            logger.info(" Endpoints autorisés: Tous")

        logger.info("=" * 60)
        break


async def main():
    """Parse the command line and dispatch to the matching coroutine.

    Prints the relevant help text when no command or sub-command is given.
    """
    parser = argparse.ArgumentParser(
        description="Gestion des utilisateurs Swagger et clés API"
    )
    subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")

    # --- swagger ... ---------------------------------------------------
    swagger_parser = subparsers.add_parser(
        "swagger", help="Gestion des utilisateurs Swagger"
    )
    swagger_subparsers = swagger_parser.add_subparsers(dest="swagger_command")

    add_parser = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
    add_parser.add_argument("username", help="Nom d'utilisateur")
    # Optional so the password can be typed at a hidden prompt instead of
    # being exposed in shell history and the process list. Supplying it
    # on the command line still works as before.
    add_parser.add_argument(
        "password",
        nargs="?",
        default=None,
        help="Mot de passe (demandé interactivement si omis)",
    )
    add_parser.add_argument("--full-name", help="Nom complet (optionnel)")

    swagger_subparsers.add_parser("list", help="Lister les utilisateurs")

    delete_parser = swagger_subparsers.add_parser(
        "delete", help="Supprimer un utilisateur"
    )
    delete_parser.add_argument("username", help="Nom d'utilisateur")

    # --- apikey ... ----------------------------------------------------
    apikey_parser = subparsers.add_parser("apikey", help="Gestion des clés API")
    apikey_subparsers = apikey_parser.add_subparsers(dest="apikey_command")

    create_parser = apikey_subparsers.add_parser("create", help="Créer une clé API")
    create_parser.add_argument("name", help="Nom de la clé")
    create_parser.add_argument("--description", help="Description (optionnel)")
    create_parser.add_argument(
        "--days", type=int, default=365, help="Jours avant expiration (défaut: 365)"
    )
    create_parser.add_argument(
        "--rate-limit", type=int, default=60, help="Requêtes par minute (défaut: 60)"
    )
    create_parser.add_argument(
        "--endpoints",
        nargs="+",
        help="Endpoints autorisés (ex: /clients /articles /devis/*)",
    )

    apikey_subparsers.add_parser("list", help="Lister les clés API")

    revoke_parser = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
    revoke_parser.add_argument("key_id", help="ID de la clé")

    verify_parser = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
    verify_parser.add_argument("api_key", help="Clé API complète")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    if args.command == "swagger":
        if args.swagger_command == "add":
            password = args.password
            if password is None:
                password = getpass.getpass("Mot de passe: ")
                if not password:
                    logger.error("❌ Mot de passe vide, opération annulée")
                    return
            await add_swagger_user(args.username, password, args.full_name)
        elif args.swagger_command == "list":
            await list_swagger_users()
        elif args.swagger_command == "delete":
            await delete_swagger_user(args.username)
        else:
            swagger_parser.print_help()

    elif args.command == "apikey":
        if args.apikey_command == "create":
            await create_api_key(
                name=args.name,
                description=args.description,
                expires_in_days=args.days,
                rate_limit=args.rate_limit,
                endpoints=args.endpoints,
            )
        elif args.apikey_command == "list":
            await list_api_keys()
        elif args.apikey_command == "revoke":
            await revoke_api_key(args.key_id)
        elif args.apikey_command == "verify":
            await verify_api_key(args.api_key)
        else:
            apikey_parser.print_help()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("\n⏹️ Interrupted")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report the error with its traceback, exit non-zero.
        logger.error(f"❌ Erreur: {e}")
        traceback.print_exc()
        sys.exit(1)