"""Command-line tool for managing Swagger users and API keys.

Two command groups are exposed (run with ``--help`` for details)::

    swagger add|list|delete ...
    apikey  create|list|revoke|verify ...
"""

import argparse
import asyncio
import getpass
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple

# Put the directory above this script's folder on sys.path so the
# project-local packages imported below resolve when the file is run directly.
# This must stay ahead of those imports.
sys.path.insert(0, str(Path(__file__).parent.parent))

from database import get_session  # noqa: E402
from database.models.api_key import SwaggerUser  # noqa: E402
from services.api_key import ApiKeyService  # noqa: E402
from security.auth import hash_password  # noqa: E402
from sqlalchemy import select  # noqa: E402

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def add_swagger_user(
    username: str, password: str, full_name: Optional[str] = None
) -> None:
    """Create an active Swagger user unless the username is already taken.

    Args:
        username: Login name; looked up first to detect duplicates.
        password: Plain-text password; only its ``hash_password`` result
            is stored.
        full_name: Display name; falls back to ``username`` when empty.
    """
    async with get_session() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        existing = result.scalar_one_or_none()

        if existing:
            logger.error(f" L'utilisateur {username} existe déjà")
            return

        user = SwaggerUser(
            username=username,
            hashed_password=hash_password(password),
            full_name=full_name or username,
            is_active=True,
        )
        session.add(user)
        await session.commit()

        logger.info(f" Utilisateur Swagger créé: {username}")
        print("\n Utilisateur créé avec succès")
        print(f" Username: {username}")
        print(" Accès: https://votre-serveur/docs")


async def list_swagger_users() -> None:
    """Print every Swagger user with its status and optional details."""
    async with get_session() as session:
        result = await session.execute(select(SwaggerUser))
        users = result.scalars().all()

        if not users:
            print("Aucun utilisateur Swagger trouvé")
            return

        print(f"\n {len(users)} utilisateur(s) Swagger:\n")
        for user in users:
            status = " Actif" if user.is_active else " Inactif"
            print(f" • {user.username:<20} {status}")
            if user.full_name:
                print(f" Nom: {user.full_name}")
            if user.last_login:
                print(f" Dernière connexion: {user.last_login}")
            print()


async def delete_swagger_user(username: str) -> None:
    """Delete the Swagger user named ``username``; log an error if absent."""
    async with get_session() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()

        if not user:
            logger.error(f" Utilisateur {username} introuvable")
            return

        await session.delete(user)
        await session.commit()
        logger.info(f"🗑️ Utilisateur supprimé: {username}")


async def create_api_key(
    name: str,
    description: Optional[str] = None,
    expires_in_days: int = 365,
    rate_limit: int = 60,
    endpoints: Optional[List[str]] = None,
) -> None:
    """Create an API key through ``ApiKeyService`` and print it once.

    Args:
        name: Name given to the key.
        description: Optional free-text description.
        expires_in_days: Forwarded to the service as ``expires_in_days``.
        rate_limit: Forwarded to the service as ``rate_limit_per_minute``.
        endpoints: Forwarded to the service as ``allowed_endpoints``.
    """
    async with get_session() as session:
        service = ApiKeyService(session)

        api_key_obj, api_key_plain = await service.create_api_key(
            name=name,
            description=description,
            created_by="CLI",
            expires_in_days=expires_in_days,
            rate_limit_per_minute=rate_limit,
            allowed_endpoints=endpoints,
        )

        print("\n Clé API créée avec succès\n")
        print(f" ID: {api_key_obj.id}")
        print(f" Nom: {name}")
        print(f" Clé: {api_key_plain}")
        print(f" Préfixe: {api_key_obj.key_prefix}")
        print(f" Rate limit: {rate_limit} req/min")
        print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}")
        print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n")


def _is_expired(expires_at: Optional[datetime]) -> bool:
    """Return True when ``expires_at`` is set and lies in the past.

    "Now" is taken in the same tzinfo as ``expires_at`` so that a
    timezone-aware value does not raise ``TypeError`` on comparison.
    A naive value is compared with naive local time.
    NOTE(review): if naive values are stored as UTC, the local-time
    comparison is off by the UTC offset - confirm against the model.
    """
    if expires_at is None:
        return False
    return expires_at < datetime.now(expires_at.tzinfo)


async def list_api_keys() -> None:
    """Print every API key with its status, usage counters and expiry flag."""
    async with get_session() as session:
        service = ApiKeyService(session)
        keys = await service.list_api_keys()

        if not keys:
            print("Aucune clé API trouvée")
            return

        print(f"\n {len(keys)} clé(s) API:\n")
        for key in keys:
            # Both branches used to be the empty string, which hid the
            # active/inactive state from the listing.
            status = "Active" if key.is_active else "Inactive"
            expired = "⏰ Expirée" if _is_expired(key.expires_at) else ""

            print(f" {status:<8} {key.name:<30} ({key.key_prefix}...)")
            print(f" ID: {key.id}")
            print(f" Requêtes: {key.total_requests}")
            print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
            if expired:
                print(f" {expired}")
            print()


async def revoke_api_key(key_id: str) -> None:
    """Revoke the API key identified by ``key_id`` and report the outcome."""
    async with get_session() as session:
        service = ApiKeyService(session)

        # Fetch first so the key's name is available for the messages below.
        api_key = await service.get_by_id(key_id)
        if not api_key:
            logger.error(f" Clé {key_id} introuvable")
            return

        success = await service.revoke_api_key(key_id)
        if success:
            logger.info(f" Clé révoquée: {api_key.name}")
            print(f"\n Clé '{api_key.name}' révoquée avec succès")
        else:
            logger.error(" Erreur lors de la révocation")


async def verify_api_key_cmd(api_key: str) -> None:
    """Check ``api_key`` with ``ApiKeyService.verify_api_key`` and print the result."""
    async with get_session() as session:
        service = ApiKeyService(session)
        api_key_obj = await service.verify_api_key(api_key)

        if api_key_obj:
            print("\n Clé API valide\n")
            print(f" Nom: {api_key_obj.name}")
            print(f" ID: {api_key_obj.id}")
            print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
            print(f" Requêtes: {api_key_obj.total_requests}")
            print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n")
        else:
            print("\n Clé API invalide, expirée ou révoquée\n")


def _build_parser() -> Tuple[
    argparse.ArgumentParser, argparse.ArgumentParser, argparse.ArgumentParser
]:
    """Build the CLI parser.

    Returns:
        ``(root, swagger, apikey)`` parsers; the two group parsers are
        returned so the caller can print group-specific help.
    """
    parser = argparse.ArgumentParser(
        description="Gestion de la sécurité Sage Dataven API"
    )
    subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")

    # --- swagger group -----------------------------------------------------
    swagger_parser = subparsers.add_parser(
        "swagger", help="Gestion utilisateurs Swagger"
    )
    swagger_subparsers = swagger_parser.add_subparsers(dest="action")

    swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
    swagger_add.add_argument("username", help="Nom d'utilisateur")
    # Optional so the secret need not appear in shell history or the
    # process list; when omitted it is prompted for without echo.
    swagger_add.add_argument("password", nargs="?", help="Mot de passe")
    swagger_add.add_argument("--full-name", help="Nom complet")

    swagger_subparsers.add_parser("list", help="Lister les utilisateurs")

    swagger_delete = swagger_subparsers.add_parser(
        "delete", help="Supprimer un utilisateur"
    )
    swagger_delete.add_argument("username", help="Nom d'utilisateur")

    # --- apikey group ------------------------------------------------------
    apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
    apikey_subparsers = apikey_parser.add_subparsers(dest="action")

    apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API")
    apikey_create.add_argument("name", help="Nom de la clé")
    apikey_create.add_argument("--description", help="Description")
    apikey_create.add_argument(
        "--days", type=int, default=365, help="Expiration en jours"
    )
    apikey_create.add_argument(
        "--rate-limit", type=int, default=60, help="Limite req/min"
    )
    apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")

    apikey_subparsers.add_parser("list", help="Lister les clés")

    apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
    apikey_revoke.add_argument("key_id", help="ID de la clé")

    apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
    apikey_verify.add_argument("api_key", help="Clé API à vérifier")

    return parser, swagger_parser, apikey_parser


async def main() -> None:
    """Parse the command line and dispatch to the matching coroutine.

    Prints the relevant help text when the command or its action is missing.
    """
    parser, swagger_parser, apikey_parser = _build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    if args.command == "swagger":
        if args.action == "add":
            password = args.password
            if password is None:
                password = getpass.getpass("Mot de passe: ")
            await add_swagger_user(args.username, password, args.full_name)
        elif args.action == "list":
            await list_swagger_users()
        elif args.action == "delete":
            await delete_swagger_user(args.username)
        else:
            swagger_parser.print_help()

    elif args.command == "apikey":
        if args.action == "create":
            await create_api_key(
                args.name,
                args.description,
                args.days,
                args.rate_limit,
                args.endpoints,
            )
        elif args.action == "list":
            await list_api_keys()
        elif args.action == "revoke":
            await revoke_api_key(args.key_id)
        elif args.action == "verify":
            await verify_api_key_cmd(args.api_key)
        else:
            apikey_parser.print_help()


if __name__ == "__main__":
    asyncio.run(main())