Sage100-vps/scripts/manage_security.py

446 lines
14 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script de gestion avancée des utilisateurs Swagger et API Keys
avec configuration des schémas d'authentification
"""
import sys
import os
from pathlib import Path
import asyncio
import argparse
import logging
from datetime import datetime
from typing import Optional, List
import json
from sqlalchemy import select
# Locate the application root: the parent of the directory holding this script.
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
_app_dir = _script_dir.parent
# Remove an existing entry first so that the insert below leaves the
# application root at the very front of sys.path.
if str(_app_dir) in sys.path:
    sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir))
# NOTE(review): the working directory is switched to the application root,
# presumably so relative paths used by the imported modules resolve — confirm.
os.chdir(str(_app_dir))
# Project imports are guarded: a failed import prints a hint and exits with
# status 1 instead of showing a raw traceback.
try:
    from database.db_config import async_session_factory
    from database.models.api_key import SwaggerUser, ApiKey
    from services.api_key import ApiKeyService
    from security.auth import hash_password
except ImportError as e:
    print(f"\n ERREUR D'IMPORT: {e}")
    print(" Vérifiez que vous êtes dans /app")
    sys.exit(1)
# Message-only log format: the script uses the logger as its console output.
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Tag name -> human-readable description.
# The keys are the tag names stored (JSON-encoded) in SwaggerUser.allowed_tags;
# the values are only used for display when listing users and tags.
AVAILABLE_TAGS = {
    "Authentication": "🔐 Authentification et gestion des comptes",
    "API Keys Management": " Gestion des clés API",
    "Clients": "👥 Gestion des clients",
    "Fournisseurs": "🏭 Gestion des fournisseurs",
    "Prospects": "🎯 Gestion des prospects",
    "Tiers": "📋 Gestion générale des tiers",
    "Contacts": "📞 Contacts des tiers",
    "Articles": "📦 Catalogue articles",
    "Familles": "🏷️ Familles d'articles",
    "Stock": "📊 Mouvements de stock",
    "Devis": "📄 Devis",
    "Commandes": "🛒 Commandes",
    "Livraisons": "🚚 Bons de livraison",
    "Factures": "💰 Factures",
    "Avoirs": "↩️ Avoirs",
    "Règlements": "💳 Règlements et encaissements",
    "Workflows": "🔄 Transformations de documents",
    "Documents": "📑 Gestion documents (PDF)",
    "Emails": "📧 Envoi d'emails",
    "Validation": " Validations métier",
    "Collaborateurs": "👔 Collaborateurs internes",
    "Société": "🏢 Informations société",
    "Référentiels": " Données de référence",
    "System": "⚙️ Système et santé",
    "Admin": "🛠️ Administration",
    "Debug": "🐛 Debug et diagnostics",
}
# Preset name -> list of tag names granted by that preset.
# A preset is applied by storing its tag list as the user's allowed tags.
PRESET_PROFILES = {
    "commercial": [
        "Clients",
        "Contacts",
        "Devis",
        "Commandes",
        "Factures",
        "Articles",
        "Documents",
        "Emails",
    ],
    "comptable": [
        "Clients",
        "Fournisseurs",
        "Factures",
        "Avoirs",
        "Règlements",
        "Documents",
        "Emails",
    ],
    "logistique": [
        "Articles",
        "Stock",
        "Commandes",
        "Livraisons",
        "Fournisseurs",
        "Documents",
    ],
    "readonly": [
        "Clients",
        "Articles",
        "Devis",
        "Commandes",
        "Factures",
        "Documents",
    ],
    "developer": [
        "Authentication",
        "API Keys Management",
        "System",
        "Clients",
        "Articles",
        "Devis",
        "Commandes",
        "Factures",
    ],
}
async def add_swagger_user(
    username: str,
    password: str,
    full_name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    preset: Optional[str] = None,
):
    """Create a Swagger user, optionally restricted to a set of tags.

    Args:
        username: Login of the new user. Creation is aborted (error logged)
            when a user with this name already exists.
        password: Plain-text password; it is passed through ``hash_password``
            and only the result is stored.
        full_name: Display name; falls back to ``username`` when empty.
        tags: Tag names the user may access. ``None`` or an empty list stores
            no restriction, which this script reports as full admin access.
        preset: Key of ``PRESET_PROFILES``. When given it replaces ``tags``;
            an unknown preset aborts the creation (error logged).

    Returns:
        None. Outcomes are reported through the module logger.
    """
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        if result.scalar_one_or_none() is not None:
            logger.error(f" L'utilisateur '{username}' existe déjà")
            return

        if preset:
            if preset not in PRESET_PROFILES:
                logger.error(
                    f" Preset '{preset}' inconnu. Disponibles: {list(PRESET_PROFILES.keys())}"
                )
                return
            # Copy so the shared preset definition can never be mutated
            # through the local list.
            tags = list(PRESET_PROFILES[preset])
            logger.info(f"📋 Application du preset '{preset}': {len(tags)} tags")

        # A misspelled tag would otherwise be stored silently; warn but keep
        # going, since AVAILABLE_TAGS is only a descriptive catalogue here.
        unknown_tags = [tag for tag in tags or [] if tag not in AVAILABLE_TAGS]
        if unknown_tags:
            logger.warning(f"⚠️ Tags inconnus (absents de AVAILABLE_TAGS): {unknown_tags}")

        # Resolve the display name before committing: ORM attributes may be
        # expired by the commit, and reading them afterwards on an async
        # session would require an implicit (non-awaited) refresh.
        display_name = full_name or username
        swagger_user = SwaggerUser(
            username=username,
            hashed_password=hash_password(password),
            full_name=display_name,
            is_active=True,
            allowed_tags=json.dumps(tags) if tags else None,
        )
        session.add(swagger_user)
        await session.commit()

        logger.info(f" Utilisateur Swagger créé: {username}")
        logger.info(f" Nom complet: {display_name}")
        if tags:
            logger.info(f" 🏷️ Tags autorisés ({len(tags)}):")
            for tag in tags:
                desc = AVAILABLE_TAGS.get(tag, "")
                logger.info(f"{tag} {desc}")
        else:
            logger.info(" 👑 Accès ADMIN COMPLET (tous les tags)")
async def list_swagger_users():
    """Log every Swagger user with its status, dates, tags and auth schemes.

    Users without stored tags (or with an empty tag list) are reported as
    full admins. Stored tags that are not valid JSON are reported as a format
    error for that user.
    """
    admin_lines = (
        "👑 Tags autorisés: ADMIN COMPLET (tous)",
        "🔐 Authentification: JWT + X-API-Key (tout)",
    )

    async with async_session_factory() as session:
        query_result = await session.execute(select(SwaggerUser))
        users = query_result.scalars().all()
        if not users:
            logger.info("🔭 Aucun utilisateur Swagger")
            return

        logger.info(f"\n👥 {len(users)} utilisateur(s) Swagger:\n")
        logger.info("=" * 80)

        for user in users:
            status = " ACTIF" if user.is_active else " NON ACTIF"
            logger.info(f"\n{status} {user.username}")
            logger.info(f"📛 Nom: {user.full_name}")
            logger.info(f"🆔 ID: {user.id}")
            logger.info(f"📅 Créé: {user.created_at}")
            logger.info(f"🕐 Dernière connexion: {user.last_login or 'Jamais'}")

            # Decode the stored tag list; "no tags" and "empty list" both
            # fall through to the admin report below.
            tags = None
            if user.allowed_tags:
                try:
                    tags = json.loads(user.allowed_tags)
                except json.JSONDecodeError:
                    logger.info("⚠️ Tags: Erreur format")
                    continue

            if not tags:
                for line in admin_lines:
                    logger.info(line)
                continue

            logger.info(f"🏷️ Tags autorisés ({len(tags)}):")
            for tag in tags:
                desc = AVAILABLE_TAGS.get(tag, "")
                logger.info(f"{tag} {desc}")

            # Display-only summary of auth schemes derived from the tag list;
            # JWT is the fallback when neither rule matches.
            wants_jwt = "Authentication" in tags
            wants_api_key = "API Keys Management" in tags or len(tags) > 3
            auth_schemes = []
            if wants_jwt:
                auth_schemes.append("JWT (Bearer)")
            if wants_api_key:
                auth_schemes.append("X-API-Key")
            if not auth_schemes:
                auth_schemes.append("JWT (Bearer)")
            logger.info(
                f"🔐 Authentification autorisée: {', '.join(auth_schemes)}"
            )

        logger.info("\n" + "=" * 80)
async def update_swagger_user(
    username: str,
    add_tags: Optional[List[str]] = None,
    remove_tags: Optional[List[str]] = None,
    set_tags: Optional[List[str]] = None,
    preset: Optional[str] = None,
    active: Optional[bool] = None,
):
    """Update the allowed tags and/or the active flag of a Swagger user.

    Tag options are applied by priority and only the first one present is
    used: ``preset``, then ``set_tags``, then ``add_tags``/``remove_tags``.

    Args:
        username: Login of the user to update; unknown users log an error.
        add_tags: Tag names to append when not already present.
        remove_tags: Tag names to drop when present.
        set_tags: Replacement tag list. An empty list clears the restriction,
            which this script treats as full admin access.
        preset: Key of ``PRESET_PROFILES`` whose tags replace the current ones.
        active: New value of ``is_active``; ``None`` leaves it unchanged.

    Returns:
        None. Nothing is committed when no change was requested, or when the
        update is refused (unknown preset, or a removal that would leave the
        user without any tag).
    """
    async with async_session_factory() as session:
        result = await session.execute(
            select(SwaggerUser).where(SwaggerUser.username == username)
        )
        user = result.scalar_one_or_none()
        if not user:
            logger.error(f" Utilisateur '{username}' introuvable")
            return

        modified = False
        if preset:
            if preset not in PRESET_PROFILES:
                logger.error(f" Preset '{preset}' inconnu")
                return
            user.allowed_tags = json.dumps(PRESET_PROFILES[preset])
            logger.info(f"📋 Preset '{preset}' appliqué")
            modified = True
        elif set_tags is not None:
            user.allowed_tags = json.dumps(set_tags) if set_tags else None
            logger.info(f"🔄 Tags remplacés: {len(set_tags) if set_tags else 0}")
            modified = True
        elif add_tags or remove_tags:
            current_tags = []
            if user.allowed_tags:
                try:
                    current_tags = json.loads(user.allowed_tags)
                except json.JSONDecodeError:
                    current_tags = []
                    logger.warning("⚠️ Tags existants illisibles (JSON invalide), ignorés")
                if not isinstance(current_tags, list):
                    # Anything but a JSON array cannot be edited tag by tag.
                    current_tags = []
                    logger.warning("⚠️ Tags existants dans un format inattendu, ignorés")

            # Work on a copy and log only once the change is accepted.
            new_tags = list(current_tags)
            added = []
            for tag in add_tags or []:
                if tag not in new_tags:
                    new_tags.append(tag)
                    added.append(tag)
            removed = []
            for tag in remove_tags or []:
                if tag in new_tags:
                    new_tags.remove(tag)
                    removed.append(tag)

            if added or removed:
                if not new_tags:
                    # An empty tag list is stored as "no restriction", i.e.
                    # full admin access. Removing the last tag must not
                    # silently escalate privileges; --set-tags with no value
                    # remains the explicit way to grant full access.
                    logger.error(
                        f" Retrait refusé: '{username}' n'aurait plus aucun tag, "
                        "ce qui équivaut à un accès ADMIN COMPLET. "
                        "Utilisez --set-tags sans valeur pour l'accorder explicitement."
                    )
                    return
                for tag in added:
                    logger.info(f" Tag ajouté: {tag}")
                for tag in removed:
                    logger.info(f" Tag retiré: {tag}")
                user.allowed_tags = json.dumps(new_tags)
                modified = True

        if active is not None:
            user.is_active = active
            logger.info(f"🔄 Statut: {'ACTIF' if active else 'INACTIF'}")
            modified = True

        if modified:
            await session.commit()
            logger.info(f" Utilisateur '{username}' mis à jour")
        else:
            logger.info(" Aucune modification effectuée")
async def list_available_tags():
    """Log the tag catalogue followed by the preset profiles.

    Each tag is shown as its description and its name; each preset is shown
    with its comma-separated tag list. Output goes through the module logger.
    """
    separator = "=" * 80

    logger.info("\n TAGS DISPONIBLES:\n")
    logger.info(separator)
    for tag_name, description in AVAILABLE_TAGS.items():
        logger.info(f" {description}")
        logger.info(f" Nom: {tag_name}\n")
    logger.info(separator)

    logger.info("\n📦 PRESETS DISPONIBLES:\n")
    for profile, profile_tags in PRESET_PROFILES.items():
        logger.info(f" {profile}:")
        logger.info(f" {', '.join(profile_tags)}\n")
    logger.info(separator)
async def delete_swagger_user(username: str):
    """Delete the Swagger user named ``username``.

    Logs an error and leaves the database untouched when no such user exists;
    otherwise deletes the row, commits, and logs the deletion.
    """
    async with async_session_factory() as session:
        lookup = select(SwaggerUser).where(SwaggerUser.username == username)
        user = (await session.execute(lookup)).scalar_one_or_none()
        if not user:
            logger.error(f" Utilisateur '{username}' introuvable")
            return

        await session.delete(user)
        await session.commit()
        logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
async def main():
    """Parse the command line and dispatch to the matching Swagger action.

    Supported invocations are ``swagger add|update|list|delete|tags``. With no
    command the top-level help is printed; with ``swagger`` but no (known)
    sub-command the ``swagger`` help is printed.
    """
    parser = argparse.ArgumentParser(
        description="Gestion avancée des utilisateurs Swagger et clés API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
EXEMPLES D'UTILISATION:
1. Créer un utilisateur avec preset:
python scripts/manage_security.py swagger add commercial Pass123! --preset commercial
2. Créer un admin complet:
python scripts/manage_security.py swagger add admin AdminPass
3. Créer avec tags spécifiques:
python scripts/manage_security.py swagger add client Pass123! --tags Clients Devis Factures
4. Mettre à jour un utilisateur (ajouter des tags):
python scripts/manage_security.py swagger update client --add-tags Commandes Livraisons
5. Changer complètement les tags:
python scripts/manage_security.py swagger update client --set-tags Clients Articles
6. Appliquer un preset:
python scripts/manage_security.py swagger update client --preset comptable
7. Lister les tags disponibles:
python scripts/manage_security.py swagger tags
8. Désactiver temporairement:
python scripts/manage_security.py swagger update client --inactive
""",
    )
    subparsers = parser.add_subparsers(dest="command", help="Commandes")
    swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
    swagger_sub = swagger_parser.add_subparsers(dest="swagger_command")

    # swagger add
    # NOTE(review): the password is a positional argument, so it is visible in
    # shell history and process listings — consider prompting for it instead.
    add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
    add_p.add_argument("username", help="Nom d'utilisateur")
    add_p.add_argument("password", help="Mot de passe")
    add_p.add_argument("--full-name", help="Nom complet", default=None)
    add_p.add_argument(
        "--tags",
        nargs="*",
        help="Tags autorisés. Vide = admin complet",
        default=None,
    )
    add_p.add_argument(
        "--preset",
        choices=list(PRESET_PROFILES.keys()),
        help="Appliquer un preset de tags",
    )

    # swagger update
    update_p = swagger_sub.add_parser("update", help="Mettre à jour utilisateur")
    update_p.add_argument("username", help="Nom d'utilisateur")
    update_p.add_argument("--add-tags", nargs="+", help="Ajouter des tags")
    update_p.add_argument("--remove-tags", nargs="+", help="Retirer des tags")
    update_p.add_argument("--set-tags", nargs="*", help="Définir les tags (remplace)")
    update_p.add_argument(
        "--preset", choices=list(PRESET_PROFILES.keys()), help="Appliquer preset"
    )
    # --active and --inactive contradict each other; let argparse reject the
    # combination instead of silently letting --active win.
    status_group = update_p.add_mutually_exclusive_group()
    status_group.add_argument(
        "--active", action="store_true", help="Activer l'utilisateur"
    )
    status_group.add_argument(
        "--inactive", action="store_true", help="Désactiver l'utilisateur"
    )

    # swagger list / delete / tags
    swagger_sub.add_parser("list", help="Lister utilisateurs")
    del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
    del_p.add_argument("username", help="Nom d'utilisateur")
    swagger_sub.add_parser("tags", help="Lister les tags disponibles")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    if args.command == "swagger":
        if args.swagger_command == "add":
            await add_swagger_user(
                args.username,
                args.password,
                args.full_name,
                args.tags,
                args.preset,
            )
        elif args.swagger_command == "update":
            # Tri-state: True / False when a flag was given, None to leave
            # the current status untouched.
            active = None
            if args.active:
                active = True
            elif args.inactive:
                active = False
            await update_swagger_user(
                args.username,
                add_tags=args.add_tags,
                remove_tags=args.remove_tags,
                set_tags=args.set_tags,
                preset=args.preset,
                active=active,
            )
        elif args.swagger_command == "list":
            await list_swagger_users()
        elif args.swagger_command == "delete":
            await delete_swagger_user(args.username)
        elif args.swagger_command == "tags":
            await list_available_tags()
        else:
            swagger_parser.print_help()
if __name__ == "__main__":
    # Script entry point: run the async CLI and translate the outcome into a
    # process exit status (0 on Ctrl-C, 1 on any other failure).
    import traceback

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n Interrupted")
        sys.exit(0)
    except Exception as exc:
        logger.error(f" Erreur: {exc}")
        traceback.print_exc()
        sys.exit(1)