"""Command-line management of Swagger users and API keys.

The script lives in ``<app>/scripts`` and bootstraps ``sys.path`` and the
working directory to the application root before importing project packages,
so it can be launched from any location.
"""

import sys
import os
from pathlib import Path

# --- Bootstrap -------------------------------------------------------------
# Must run before any project import: the application root (parent of the
# directory holding this script) is moved to the front of sys.path and made
# the current working directory.
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
_app_dir = _script_dir.parent

print(f"DEBUG: Script path: {_current_file}")
print(f"DEBUG: App dir: {_app_dir}")
print(f"DEBUG: Current working dir: {os.getcwd()}")

# Remove-then-insert guarantees the app dir ends up at index 0 even when it
# was already present further down the path.
if str(_app_dir) in sys.path:
    sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir))
os.chdir(str(_app_dir))

print(f"DEBUG: sys.path[0]: {sys.path[0]}")
print(f"DEBUG: New working dir: {os.getcwd()}")

# Diagnostic pass only: report which project packages are importable.
# Failures are printed, not fatal; the real imports below decide.
_test_imports = [
    "database",
    "database.db_config",
    "database.models",
    "services",
    "security",
]

print("\nDEBUG: Vérification des imports...")
for module in _test_imports:
    try:
        __import__(module)
        print(f" {module}")
    except ImportError as e:
        print(f" {module}: {e}")

import asyncio
import argparse
import json
import logging
import traceback
from datetime import datetime
from typing import Optional

from sqlalchemy import select

try:
    from database.db_config import async_session_factory
    from database.models.user import User
    from database.models.api_key import SwaggerUser, ApiKey
    from services.api_key import ApiKeyService
    from security.auth import hash_password
except ImportError as e:
    print(f"\n ERREUR D'IMPORT: {e}")
    print(" Vérifiez que vous êtes dans /app")
    print(" Commande correcte: cd /app && python scripts/manage_security.py ...")
    sys.exit(1)

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def _decode_endpoints(raw) -> Optional[list]:
    """Decode a JSON-encoded endpoint list.

    Args:
        raw: Value of an ``allowed_endpoints`` attribute.

    Returns:
        The decoded list, or ``None`` when *raw* is not valid JSON or does
        not decode to a list. Callers fall back to displaying *raw* as-is.
    """
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-str input.
        return None
    return decoded if isinstance(decoded, list) else None


async def add_swagger_user(
    username: str, password: str, full_name: Optional[str] = None
) -> None:
    """Create a Swagger user unless the username is already taken.

    Args:
        username: Login to create.
        password: Plain-text password; passed through ``hash_password``
            before being stored.
        full_name: Display name; defaults to *username* when empty.
    """
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        existing = result.scalar_one_or_none()

        if existing:
            logger.error(f" L'utilisateur '{username}' existe déjà")
            return

        swagger_user = SwaggerUser(
            username=username,
            hashed_password=hash_password(password),
            full_name=full_name or username,
            is_active=True,
        )
        session.add(swagger_user)
        await session.commit()

        logger.info(f" Utilisateur Swagger créé: {username}")
        logger.info(f" Nom complet: {swagger_user.full_name}")


async def list_swagger_users() -> None:
    """Log every Swagger user with its status, name and timestamps."""
    async with async_session_factory() as session:
        result = await session.execute(select(SwaggerUser))
        users = result.scalars().all()

        if not users:
            logger.info("🔭 Aucun utilisateur Swagger")
            return

        logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n")
        for user in users:
            # The marker must differ between the two states to be useful.
            status = "✅" if user.is_active else "❌"
            logger.info(f" {status} {user.username}")
            logger.info(f" Nom: {user.full_name}")
            logger.info(f" Créé: {user.created_at}")
            logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n")


async def delete_swagger_user(username: str) -> None:
    """Delete the Swagger user named *username*, if it exists."""
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()

        if not user:
            logger.error(f" Utilisateur '{username}' introuvable")
            return

        await session.delete(user)
        await session.commit()
        logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")


async def create_api_key(
    name: str,
    description: Optional[str] = None,
    expires_in_days: int = 365,
    rate_limit: int = 60,
    endpoints: Optional[list] = None,
) -> None:
    """Create an API key through ``ApiKeyService`` and log its details.

    The plain-text key is logged here; the closing banner tells the operator
    it will not be displayed again.

    Args:
        name: Name of the key.
        description: Optional description.
        expires_in_days: Forwarded to the service as ``expires_in_days``.
        rate_limit: Forwarded to the service as ``rate_limit_per_minute``.
        endpoints: Forwarded to the service as ``allowed_endpoints``.
    """
    async with async_session_factory() as session:
        service = ApiKeyService(session)
        api_key_obj, api_key_plain = await service.create_api_key(
            name=name,
            description=description,
            created_by="cli",
            expires_in_days=expires_in_days,
            rate_limit_per_minute=rate_limit,
            allowed_endpoints=endpoints,
        )

        logger.info("=" * 70)
        logger.info("🔑 Clé API créée avec succès")
        logger.info("=" * 70)
        logger.info(f" ID: {api_key_obj.id}")
        logger.info(f" Nom: {api_key_obj.name}")
        logger.info(f" Clé: {api_key_plain}")
        logger.info(f" Préfixe: {api_key_obj.key_prefix}")
        logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
        logger.info(f" Expire le: {api_key_obj.expires_at}")

        if api_key_obj.allowed_endpoints:
            endpoints_list = _decode_endpoints(api_key_obj.allowed_endpoints)
            if endpoints_list is None:
                # Not a JSON list: show the stored value untouched.
                logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
            else:
                logger.info(f" Endpoints: {', '.join(map(str, endpoints_list))}")
        else:
            logger.info(" Endpoints: Tous (aucune restriction)")

        logger.info("=" * 70)
        logger.info(" SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !")
        logger.info("=" * 70)


async def list_api_keys() -> None:
    """Log every API key with its validity, limits and usage counters."""
    async with async_session_factory() as session:
        service = ApiKeyService(session)
        keys = await service.list_api_keys()

        if not keys:
            logger.info("🔭 Aucune clé API")
            return

        logger.info(f"🔑 {len(keys)} clé(s) API:\n")
        for key in keys:
            # NOTE(review): datetime.now() is naive; this comparison raises
            # TypeError if expires_at is timezone-aware - confirm the column type.
            is_valid = key.is_active and (
                not key.expires_at or key.expires_at > datetime.now()
            )
            # The marker must differ between the two states to be useful.
            status = "✅" if is_valid else "❌"

            logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
            logger.info(f" ID: {key.id}")
            logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
            logger.info(f" Requêtes: {key.total_requests}")
            logger.info(f" Expire: {key.expires_at or 'Jamais'}")
            logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")

            if key.allowed_endpoints:
                endpoints = _decode_endpoints(key.allowed_endpoints)
                if endpoints is None:
                    # Previously swallowed silently; show the raw value instead.
                    logger.info(f" Endpoints: {key.allowed_endpoints}")
                else:
                    # Show at most four endpoints, then a "+N" remainder count.
                    display = ", ".join(map(str, endpoints[:4]))
                    if len(endpoints) > 4:
                        display += f"... (+{len(endpoints) - 4})"
                    logger.info(f" Endpoints: {display}")
            else:
                logger.info(" Endpoints: Tous")
            logger.info("")


async def revoke_api_key(key_id: str) -> None:
    """Deactivate the API key with id *key_id* and stamp its revocation time."""
    async with async_session_factory() as session:
        result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
        key = result.scalar_one_or_none()

        if not key:
            logger.error(f" Clé API '{key_id}' introuvable")
            return

        key.is_active = False
        key.revoked_at = datetime.now()
        await session.commit()

        logger.info(f"🗑️ Clé API révoquée: {key.name}")
        logger.info(f" ID: {key.id}")


async def verify_api_key(api_key: str) -> None:
    """Check *api_key* with ``ApiKeyService`` and log the key's details."""
    async with async_session_factory() as session:
        service = ApiKeyService(session)
        key = await service.verify_api_key(api_key)

        if not key:
            logger.error(" Clé API invalide ou expirée")
            return

        logger.info("=" * 60)
        logger.info(" Clé API valide")
        logger.info("=" * 60)
        logger.info(f" Nom: {key.name}")
        logger.info(f" ID: {key.id}")
        logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
        logger.info(f" Requêtes totales: {key.total_requests}")
        logger.info(f" Expire: {key.expires_at or 'Jamais'}")

        if key.allowed_endpoints:
            endpoints = _decode_endpoints(key.allowed_endpoints)
            if endpoints is None:
                # Previously swallowed silently; show the raw value instead.
                logger.info(f" Endpoints autorisés: {key.allowed_endpoints}")
            else:
                logger.info(f" Endpoints autorisés: {endpoints}")
        else:
            logger.info(" Endpoints autorisés: Tous")
        logger.info("=" * 60)


async def main() -> None:
    """Parse the command line and dispatch to the matching coroutine.

    Two command groups exist: ``swagger`` (add / list / delete) and
    ``apikey`` (create / list / revoke / verify). A missing command or
    sub-command prints the relevant help text.
    """
    parser = argparse.ArgumentParser(
        description="Gestion des utilisateurs Swagger et clés API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Exemples:
  python scripts/manage_security.py swagger add admin MyP@ssw0rd
  python scripts/manage_security.py swagger list
  python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
  python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
  python scripts/manage_security.py apikey list
  python scripts/manage_security.py apikey verify sdk_live_xxxxx
        """,
    )
    subparsers = parser.add_subparsers(dest="command", help="Commandes")

    # --- swagger <add|list|delete> ---
    swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
    swagger_sub = swagger_parser.add_subparsers(dest="swagger_command")

    add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
    add_p.add_argument("username", help="Nom d'utilisateur")
    add_p.add_argument("password", help="Mot de passe")
    add_p.add_argument("--full-name", help="Nom complet")

    swagger_sub.add_parser("list", help="Lister utilisateurs")

    del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
    del_p.add_argument("username", help="Nom d'utilisateur")

    # --- apikey <create|list|revoke|verify> ---
    apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
    apikey_sub = apikey_parser.add_subparsers(dest="apikey_command")

    create_p = apikey_sub.add_parser("create", help="Créer clé API")
    create_p.add_argument("name", help="Nom de la clé")
    create_p.add_argument("--description", help="Description")
    create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)")
    create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min")
    create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")

    apikey_sub.add_parser("list", help="Lister clés")

    rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé")
    rev_p.add_argument("key_id", help="ID de la clé")

    ver_p = apikey_sub.add_parser("verify", help="Vérifier clé")
    ver_p.add_argument("api_key", help="Clé API complète")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    if args.command == "swagger":
        if args.swagger_command == "add":
            await add_swagger_user(args.username, args.password, args.full_name)
        elif args.swagger_command == "list":
            await list_swagger_users()
        elif args.swagger_command == "delete":
            await delete_swagger_user(args.username)
        else:
            swagger_parser.print_help()
    elif args.command == "apikey":
        if args.apikey_command == "create":
            await create_api_key(
                name=args.name,
                description=args.description,
                expires_in_days=args.days,
                rate_limit=args.rate_limit,
                endpoints=args.endpoints,
            )
        elif args.apikey_command == "list":
            await list_api_keys()
        elif args.apikey_command == "revoke":
            await revoke_api_key(args.key_id)
        elif args.apikey_command == "verify":
            await verify_api_key(args.api_key)
        else:
            apikey_parser.print_help()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nℹ️ Interrupted")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report the failure with its traceback, exit 1.
        logger.error(f" Erreur: {e}")
        traceback.print_exc()
        sys.exit(1)