#!/usr/bin/env python3
"""
Advanced management script for Swagger users and API keys,
including configuration of the authentication schemes.
"""

import sys
import os
from pathlib import Path
import asyncio
import argparse
import logging
from datetime import datetime
from typing import Optional, List
import json

from sqlalchemy import select

# Locate the application root: the parent of the directory holding this script.
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
_app_dir = _script_dir.parent

# Move the application root to the front of sys.path (dropping one earlier
# occurrence if present) so the project imports below resolve against it.
try:
    sys.path.remove(str(_app_dir))
except ValueError:
    pass
sys.path.insert(0, str(_app_dir))

# The working directory is switched to the application root as well.
os.chdir(str(_app_dir))

# Project-local imports; must stay below the sys.path setup above.
try:
    from database.db_config import async_session_factory
    from database.models.api_key import SwaggerUser, ApiKey
    from services.api_key import ApiKeyService
    from security.auth import hash_password
except ImportError as import_error:
    print(f"\n ERREUR D'IMPORT: {import_error}")
    print(" Vérifiez que vous êtes dans /app")
    sys.exit(1)

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Tag name -> human-readable description, as shown by the "tags" command
# and next to each tag when users are created or listed.
AVAILABLE_TAGS = {
    "Authentication": "🔐 Authentification et gestion des comptes",
    "API Keys Management": " Gestion des clés API",
    "Clients": "👥 Gestion des clients",
    "Fournisseurs": "🏭 Gestion des fournisseurs",
    "Prospects": "🎯 Gestion des prospects",
    "Tiers": "📋 Gestion générale des tiers",
    "Contacts": "📞 Contacts des tiers",
    "Articles": "📦 Catalogue articles",
    "Familles": "🏷️ Familles d'articles",
    "Stock": "📊 Mouvements de stock",
    "Devis": "📄 Devis",
    "Commandes": "🛒 Commandes",
    "Livraisons": "🚚 Bons de livraison",
    "Factures": "💰 Factures",
    "Avoirs": "↩️ Avoirs",
    "Règlements": "💳 Règlements et encaissements",
    "Workflows": "🔄 Transformations de documents",
    "Documents": "📑 Gestion documents (PDF)",
    "Emails": "📧 Envoi d'emails",
    "Validation": " Validations métier",
    "Collaborateurs": "👔 Collaborateurs internes",
    "Société": "🏢 Informations société",
    "Référentiels": " Données de référence",
    "System": "⚙️ Système et santé",
    "Admin": "🛠️ Administration",
    "Debug": "🐛 Debug et diagnostics",
}
# Named tag bundles that can be applied with ``--preset``.
PRESET_PROFILES = {
    "commercial": [
        "Clients",
        "Contacts",
        "Devis",
        "Commandes",
        "Factures",
        "Articles",
        "Documents",
        "Emails",
    ],
    "comptable": [
        "Clients",
        "Fournisseurs",
        "Factures",
        "Avoirs",
        "Règlements",
        "Documents",
        "Emails",
    ],
    "logistique": [
        "Articles",
        "Stock",
        "Commandes",
        "Livraisons",
        "Fournisseurs",
        "Documents",
    ],
    "readonly": ["Clients", "Articles", "Devis", "Commandes", "Factures", "Documents"],
    "developer": [
        "Authentication",
        "API Keys Management",
        "System",
        "Clients",
        "Articles",
        "Devis",
        "Commandes",
        "Factures",
    ],
}

# Printed verbatim by argparse (RawDescriptionHelpFormatter), so the line
# layout of this text is what the user sees under ``--help``.
_USAGE_EXAMPLES = """
EXEMPLES D'UTILISATION:

1. Créer un utilisateur avec preset:
   python scripts/manage_security.py swagger add commercial Pass123! --preset commercial

2. Créer un admin complet:
   python scripts/manage_security.py swagger add admin AdminPass

3. Créer avec tags spécifiques:
   python scripts/manage_security.py swagger add client Pass123! --tags Clients Devis Factures

4. Mettre à jour un utilisateur (ajouter des tags):
   python scripts/manage_security.py swagger update client --add-tags Commandes Livraisons

5. Changer complètement les tags:
   python scripts/manage_security.py swagger update client --set-tags Clients Articles

6. Appliquer un preset:
   python scripts/manage_security.py swagger update client --preset comptable

7. Lister les tags disponibles:
   python scripts/manage_security.py swagger tags

8. Désactiver temporairement:
   python scripts/manage_security.py swagger update client --inactive
"""


def _parse_allowed_tags(raw):
    """Decode the JSON text stored in ``allowed_tags`` into a list of tags.

    Returns an empty list when ``raw`` is empty/None.

    Raises:
        ValueError: if ``raw`` is not valid JSON or does not decode to a list.
    """
    if not raw:
        return []
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
        raise ValueError("allowed_tags is not valid JSON") from exc
    if not isinstance(decoded, list):
        raise ValueError("allowed_tags does not contain a JSON list")
    return decoded


def _warn_unknown_tags(tags):
    """Log a warning for every tag that is not a key of ``AVAILABLE_TAGS``."""
    unknown = [str(tag) for tag in (tags or []) if tag not in AVAILABLE_TAGS]
    if unknown:
        logger.warning(
            f"⚠️ Tags inconnus (absents de AVAILABLE_TAGS): {', '.join(unknown)}"
        )


async def add_swagger_user(
    username: str,
    password: str,
    full_name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    preset: Optional[str] = None,
):
    """Create a Swagger user, optionally restricted to a set of tags.

    Args:
        username: Login of the user to create; creation is aborted (with an
            error log) when a user with this name already exists.
        password: Clear-text password; stored through ``hash_password``.
        full_name: Display name; falls back to ``username`` when empty.
        tags: Tags the user may access. ``None`` or an empty list stores no
            restriction, which this script reports as full admin access.
        preset: Name of a ``PRESET_PROFILES`` entry. When given it takes
            precedence over ``tags``; an unknown name aborts with an error log.
    """
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        if result.scalar_one_or_none():
            logger.error(f" L'utilisateur '{username}' existe déjà")
            return

        if preset:
            if preset not in PRESET_PROFILES:
                logger.error(
                    f" Preset '{preset}' inconnu. Disponibles: {list(PRESET_PROFILES.keys())}"
                )
                return
            if tags:
                # The preset wins; say so instead of dropping --tags silently.
                logger.warning(
                    f"⚠️ Tags explicites ignorés: le preset '{preset}' est prioritaire"
                )
            tags = list(PRESET_PROFILES[preset])
            logger.info(f"📋 Application du preset '{preset}': {len(tags)} tags")
        else:
            _warn_unknown_tags(tags)

        # Computed up front so nothing is read back from the ORM instance
        # after commit (attributes may be expired at that point).
        display_name = full_name or username

        swagger_user = SwaggerUser(
            username=username,
            hashed_password=hash_password(password),
            full_name=display_name,
            is_active=True,
            allowed_tags=json.dumps(tags) if tags else None,
        )
        session.add(swagger_user)
        await session.commit()

        logger.info(f" Utilisateur Swagger créé: {username}")
        logger.info(f" Nom complet: {display_name}")

        if tags:
            logger.info(f" 🏷️ Tags autorisés ({len(tags)}):")
            for tag in tags:
                desc = AVAILABLE_TAGS.get(tag, "")
                logger.info(f" • {tag} {desc}")
        else:
            logger.info(" 👑 Accès ADMIN COMPLET (tous les tags)")


async def list_swagger_users():
    """Log every Swagger user with its status, tags and derived auth schemes."""
    async with async_session_factory() as session:
        result = await session.execute(select(SwaggerUser))
        users = result.scalars().all()

        if not users:
            logger.info("🔭 Aucun utilisateur Swagger")
            return

        logger.info(f"\n👥 {len(users)} utilisateur(s) Swagger:\n")
        logger.info("=" * 80)

        for user in users:
            status = " ACTIF" if user.is_active else " NON ACTIF"
            logger.info(f"\n{status} {user.username}")
            logger.info(f"📛 Nom: {user.full_name}")
            logger.info(f"🆔 ID: {user.id}")
            logger.info(f"📅 Créé: {user.created_at}")
            logger.info(f"🕐 Dernière connexion: {user.last_login or 'Jamais'}")

            try:
                tags = _parse_allowed_tags(user.allowed_tags)
            except ValueError:
                # Invalid JSON *or* JSON that is not a list of tags.
                logger.info("⚠️ Tags: Erreur format")
                tags = None

            if tags is None:
                pass  # format error already reported above
            elif tags:
                logger.info(f"🏷️ Tags autorisés ({len(tags)}):")
                for tag in tags:
                    desc = AVAILABLE_TAGS.get(tag, "")
                    logger.info(f" • {tag} {desc}")

                # Display-only summary derived from the tag list.
                auth_schemes = []
                if "Authentication" in tags:
                    auth_schemes.append("JWT (Bearer)")
                if "API Keys Management" in tags or len(tags) > 3:
                    auth_schemes.append("X-API-Key")
                if not auth_schemes:
                    auth_schemes.append("JWT (Bearer)")
                logger.info(
                    f"🔐 Authentification autorisée: {', '.join(auth_schemes)}"
                )
            else:
                logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
                logger.info("🔐 Authentification: JWT + X-API-Key (tout)")

        logger.info("\n" + "=" * 80)


async def update_swagger_user(
    username: str,
    add_tags: Optional[List[str]] = None,
    remove_tags: Optional[List[str]] = None,
    set_tags: Optional[List[str]] = None,
    preset: Optional[str] = None,
    active: Optional[bool] = None,
):
    """Update the tags and/or the active flag of an existing Swagger user.

    Tag options are applied by priority: ``preset``, then ``set_tags``, then
    ``add_tags``/``remove_tags``. Nothing is committed when no change results.

    Args:
        username: Login of the user to update; unknown users abort with an
            error log.
        add_tags: Tags to append to the current list (duplicates skipped).
        remove_tags: Tags to drop from the current list. Removing the last
            remaining tag is refused, because an empty tag list means full
            admin access; use ``set_tags=[]`` to grant that explicitly.
        set_tags: Replacement tag list; an empty list clears the restriction.
        preset: Name of a ``PRESET_PROFILES`` entry to apply.
        active: New active flag, or ``None`` to leave it untouched.
    """
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()

        if not user:
            logger.error(f" Utilisateur '{username}' introuvable")
            return

        modified = False

        if preset:
            if preset not in PRESET_PROFILES:
                logger.error(f" Preset '{preset}' inconnu")
                return
            user.allowed_tags = json.dumps(PRESET_PROFILES[preset])
            logger.info(f"📋 Preset '{preset}' appliqué")
            modified = True
        elif set_tags is not None:
            _warn_unknown_tags(set_tags)
            user.allowed_tags = json.dumps(set_tags) if set_tags else None
            logger.info(f"🔄 Tags remplacés: {len(set_tags) if set_tags else 0}")
            if not set_tags:
                logger.warning("⚠️ Aucun tag défini: accès ADMIN COMPLET (tous les tags)")
            modified = True
        elif add_tags or remove_tags:
            try:
                current_tags = _parse_allowed_tags(user.allowed_tags)
            except ValueError:
                logger.warning(
                    "⚠️ Tags existants illisibles: traitement à partir d'une liste vide"
                )
                current_tags = []

            _warn_unknown_tags(add_tags)
            tags_changed = False

            for tag in add_tags or []:
                if tag not in current_tags:
                    current_tags.append(tag)
                    logger.info(f"➕ Tag ajouté: {tag}")
                    tags_changed = True

            for tag in remove_tags or []:
                if tag in current_tags:
                    current_tags.remove(tag)
                    logger.info(f"➖ Tag retiré: {tag}")
                    tags_changed = True

            if tags_changed:
                if not current_tags:
                    # An empty list would be stored as "no restriction",
                    # i.e. a removal would silently grant full admin access.
                    logger.error(
                        f" Retrait refusé: '{username}' n'aurait plus aucun tag "
                        "(= accès ADMIN COMPLET). Utilisez --set-tags sans valeur "
                        "pour l'accorder explicitement."
                    )
                    return
                # Only rewrite the column when the tag list really changed.
                user.allowed_tags = json.dumps(current_tags)
                modified = True

        if active is not None:
            user.is_active = active
            logger.info(f"🔄 Statut: {'ACTIF' if active else 'INACTIF'}")
            modified = True

        if modified:
            await session.commit()
            logger.info(f" Utilisateur '{username}' mis à jour")
        else:
            logger.info("ℹ️ Aucune modification effectuée")


async def list_available_tags():
    """Log every known tag with its description, then every preset."""
    logger.info("\n TAGS DISPONIBLES:\n")
    logger.info("=" * 80)

    for tag, desc in AVAILABLE_TAGS.items():
        logger.info(f" {desc}")
        logger.info(f" Nom: {tag}\n")

    logger.info("=" * 80)
    logger.info("\n📦 PRESETS DISPONIBLES:\n")

    for preset_name, tags in PRESET_PROFILES.items():
        logger.info(f" {preset_name}:")
        logger.info(f" {', '.join(tags)}\n")

    logger.info("=" * 80)


async def delete_swagger_user(username: str):
    """Delete the Swagger user named ``username``; log an error if absent."""
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()

        if not user:
            logger.error(f" Utilisateur '{username}' introuvable")
            return

        await session.delete(user)
        await session.commit()
        logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")


def _build_parser():
    """Build the CLI; return ``(root_parser, swagger_parser)``."""
    parser = argparse.ArgumentParser(
        description="Gestion avancée des utilisateurs Swagger et clés API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_USAGE_EXAMPLES,
    )
    subparsers = parser.add_subparsers(dest="command", help="Commandes")

    swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
    swagger_sub = swagger_parser.add_subparsers(dest="swagger_command")

    add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
    add_p.add_argument("username", help="Nom d'utilisateur")
    add_p.add_argument("password", help="Mot de passe")
    add_p.add_argument("--full-name", help="Nom complet", default=None)
    add_p.add_argument(
        "--tags",
        nargs="*",
        help="Tags autorisés. Vide = admin complet",
        default=None,
    )
    add_p.add_argument(
        "--preset",
        choices=list(PRESET_PROFILES.keys()),
        help="Appliquer un preset de tags",
    )

    update_p = swagger_sub.add_parser("update", help="Mettre à jour utilisateur")
    update_p.add_argument("username", help="Nom d'utilisateur")
    update_p.add_argument("--add-tags", nargs="+", help="Ajouter des tags")
    update_p.add_argument("--remove-tags", nargs="+", help="Retirer des tags")
    update_p.add_argument("--set-tags", nargs="*", help="Définir les tags (remplace)")
    update_p.add_argument(
        "--preset", choices=list(PRESET_PROFILES.keys()), help="Appliquer preset"
    )
    # --active and --inactive contradict each other: let argparse reject the
    # combination instead of silently preferring one of them.
    status_group = update_p.add_mutually_exclusive_group()
    status_group.add_argument(
        "--active", action="store_true", help="Activer l'utilisateur"
    )
    status_group.add_argument(
        "--inactive", action="store_true", help="Désactiver l'utilisateur"
    )

    swagger_sub.add_parser("list", help="Lister utilisateurs")

    del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
    del_p.add_argument("username", help="Nom d'utilisateur")

    swagger_sub.add_parser("tags", help="Lister les tags disponibles")

    return parser, swagger_parser


async def main():
    """Parse ``sys.argv`` and dispatch to the matching Swagger command.

    Prints the root help when no command is given, and the ``swagger`` help
    when the sub-command is missing.
    """
    parser, swagger_parser = _build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    if args.command == "swagger":
        if args.swagger_command == "add":
            await add_swagger_user(
                args.username,
                args.password,
                args.full_name,
                args.tags,
                args.preset,
            )
        elif args.swagger_command == "update":
            # Tri-state: None means "leave the active flag untouched".
            active = None
            if args.active:
                active = True
            elif args.inactive:
                active = False

            await update_swagger_user(
                args.username,
                add_tags=args.add_tags,
                remove_tags=args.remove_tags,
                set_tags=args.set_tags,
                preset=args.preset,
                active=active,
            )
        elif args.swagger_command == "list":
            await list_swagger_users()
        elif args.swagger_command == "delete":
            await delete_swagger_user(args.username)
        elif args.swagger_command == "tags":
            await list_available_tags()
        else:
            swagger_parser.print_help()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nℹ️ Interrupted")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: log the message together with the traceback.
        logger.exception(f" Erreur: {e}")
        sys.exit(1)