Sage100-vps/scripts/manage_security.py

264 lines
8.6 KiB
Python

import asyncio
import sys
from pathlib import Path
import argparse
sys.path.insert(0, str(Path(__file__).parent.parent))
from database import get_session
from database.models.api_key import SwaggerUser
from services.api_key import ApiKeyService
from security.auth import hash_password
from sqlalchemy import select
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def add_swagger_user(username: str, password: str, full_name: str = None):
"""Ajouter un utilisateur Swagger"""
async with get_session() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
existing = result.scalar_one_or_none()
if existing:
logger.error(f" L'utilisateur {username} existe déjà")
return
user = SwaggerUser(
username=username,
hashed_password=hash_password(password),
full_name=full_name or username,
is_active=True,
)
session.add(user)
await session.commit()
logger.info(f" Utilisateur Swagger créé: {username}")
print("\n Utilisateur créé avec succès")
print(f" Username: {username}")
print(" Accès: https://votre-serveur/docs")
async def list_swagger_users():
"""Lister les utilisateurs Swagger"""
async with get_session() as session:
result = await session.execute(select(SwaggerUser))
users = result.scalars().all()
if not users:
print("Aucun utilisateur Swagger trouvé")
return
print(f"\n {len(users)} utilisateur(s) Swagger:\n")
for user in users:
status = " Actif" if user.is_active else " Inactif"
print(f"{user.username:<20} {status}")
if user.full_name:
print(f" Nom: {user.full_name}")
if user.last_login:
print(f" Dernière connexion: {user.last_login}")
print()
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with get_session() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur {username} introuvable")
return
await session.delete(user)
await session.commit()
logger.info(f"🗑️ Utilisateur supprimé: {username}")
async def create_api_key(
name: str,
description: str = None,
expires_in_days: int = 365,
rate_limit: int = 60,
endpoints: list = None,
):
"""Créer une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
name=name,
description=description,
created_by="CLI",
expires_in_days=expires_in_days,
rate_limit_per_minute=rate_limit,
allowed_endpoints=endpoints,
)
print("\n Clé API créée avec succès\n")
print(f" ID: {api_key_obj.id}")
print(f" Nom: {name}")
print(f" Clé: {api_key_plain}")
print(f" Préfixe: {api_key_obj.key_prefix}")
print(f" Rate limit: {rate_limit} req/min")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}")
print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n")
async def list_api_keys():
"""Lister les clés API"""
async with get_session() as session:
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
print("Aucune clé API trouvée")
return
print(f"\n {len(keys)} clé(s) API:\n")
for key in keys:
status = "" if key.is_active else ""
expired = (
"⏰ Expirée"
if key.expires_at and key.expires_at < datetime.now()
else ""
)
print(f" {status} {key.name:<30} ({key.key_prefix}...)")
print(f" ID: {key.id}")
print(f" Requêtes: {key.total_requests}")
print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if expired:
print(f" {expired}")
print()
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key = await service.get_by_id(key_id)
if not api_key:
logger.error(f" Clé {key_id} introuvable")
return
success = await service.revoke_api_key(key_id)
if success:
logger.info(f" Clé révoquée: {api_key.name}")
print(f"\n Clé '{api_key.name}' révoquée avec succès")
else:
logger.error(" Erreur lors de la révocation")
async def verify_api_key_cmd(api_key: str):
"""Vérifier une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
if api_key_obj:
print("\n Clé API valide\n")
print(f" Nom: {api_key_obj.name}")
print(f" ID: {api_key_obj.id}")
print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
print(f" Requêtes: {api_key_obj.total_requests}")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n")
else:
print("\n Clé API invalide, expirée ou révoquée\n")
async def main():
parser = argparse.ArgumentParser(
description="Gestion de la sécurité Sage Dataven API"
)
subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")
swagger_parser = subparsers.add_parser(
"swagger", help="Gestion utilisateurs Swagger"
)
swagger_subparsers = swagger_parser.add_subparsers(dest="action")
swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
swagger_add.add_argument("username", help="Nom d'utilisateur")
swagger_add.add_argument("password", help="Mot de passe")
swagger_add.add_argument("--full-name", help="Nom complet")
swagger_subparsers.add_parser("list", help="Lister les utilisateurs")
swagger_delete = swagger_subparsers.add_parser(
"delete", help="Supprimer un utilisateur"
)
swagger_delete.add_argument("username", help="Nom d'utilisateur")
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_subparsers = apikey_parser.add_subparsers(dest="action")
apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API")
apikey_create.add_argument("name", help="Nom de la clé")
apikey_create.add_argument("--description", help="Description")
apikey_create.add_argument(
"--days", type=int, default=365, help="Expiration en jours"
)
apikey_create.add_argument(
"--rate-limit", type=int, default=60, help="Limite req/min"
)
apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
apikey_subparsers.add_parser("list", help="Lister les clés")
apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
apikey_revoke.add_argument("key_id", help="ID de la clé")
apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
apikey_verify.add_argument("api_key", help="Clé API à vérifier")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
if args.command == "swagger":
if args.action == "add":
await add_swagger_user(args.username, args.password, args.full_name)
elif args.action == "list":
await list_swagger_users()
elif args.action == "delete":
await delete_swagger_user(args.username)
else:
swagger_parser.print_help()
elif args.command == "apikey":
if args.action == "create":
await create_api_key(
args.name,
args.description,
args.days,
args.rate_limit,
args.endpoints,
)
elif args.action == "list":
await list_api_keys()
elif args.action == "revoke":
await revoke_api_key(args.key_id)
elif args.action == "verify":
await verify_api_key_cmd(args.api_key)
else:
apikey_parser.print_help()
if __name__ == "__main__":
from datetime import datetime
asyncio.run(main())