diff --git a/middleware/security.py b/middleware/security.py index 2bea533..493ed3e 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -4,7 +4,7 @@ from fastapi.security import HTTPBasic, HTTPBasicCredentials from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import ASGIApp from sqlalchemy import select -from typing import Callable +from typing import Optional, Callable from datetime import datetime import logging import base64 diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 17df3af..52b21c7 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,7 +1,6 @@ import asyncio import sys from pathlib import Path - from database import get_session from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService @@ -31,7 +30,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): existing = result.scalar_one_or_none() if existing: - logger.error(f" L'utilisateur '{username}' existe déjà") + logger.error(f"❌ L'utilisateur '{username}' existe déjà") return swagger_user = SwaggerUser( @@ -44,7 +43,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): session.add(swagger_user) await session.commit() - logger.info(f" Utilisateur Swagger créé: {username}") + logger.info(f"✅ Utilisateur Swagger créé: {username}") logger.info(f" Nom complet: {swagger_user.full_name}") logger.info(f" Actif: {swagger_user.is_active}") @@ -59,13 +58,13 @@ async def list_swagger_users(): users = result.scalars().all() if not users: - logger.info(" Aucun utilisateur Swagger") + logger.info("📭 Aucun utilisateur Swagger") break - logger.info(f" {len(users)} utilisateur(s) Swagger:\n") + logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = "" if user.is_active else "" + status = "✅" if user.is_active else "❌" logger.info(f" {status} {user.username}") logger.info(f" Nom: {user.full_name}") logger.info(f" Créé: {user.created_at}") @@ -85,13 +84,13 @@ async def delete_swagger_user(username: str): user = result.scalar_one_or_none() if not user: - logger.error(f" Utilisateur '{username}' introuvable") + logger.error(f"❌ Utilisateur '{username}' introuvable") break await session.delete(user) await session.commit() - logger.info(f" Utilisateur Swagger supprimé: {username}") + logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") break @@ -116,9 +115,9 @@ async def create_api_key( allowed_endpoints=endpoints, ) - logger.info("=" * 60) - logger.info(" Clé API créée avec succès") - logger.info("=" * 60) + logger.info("=" * 70) + logger.info("🔑 Clé API créée avec succès") + logger.info("=" * 70) logger.info(f" ID: {api_key_obj.id}") logger.info(f" Nom: {api_key_obj.name}") logger.info(f" Clé: {api_key_plain}") @@ -128,15 +127,16 @@ async def create_api_key( logger.info(f" Expire le: {api_key_obj.expires_at}") if api_key_obj.allowed_endpoints: - logger.info( - f" Endpoints autorisés: {', '.join(api_key_obj.allowed_endpoints)}" - ) + import json + + endpoints_list = json.loads(api_key_obj.allowed_endpoints) + logger.info(f" Endpoints autorisés: {', '.join(endpoints_list)}") else: logger.info(" Endpoints autorisés: Tous") - logger.info("=" * 60) - logger.info(" IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !") - logger.info("=" * 60) + logger.info("=" * 70) + logger.info("⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !") + logger.info("=" * 70) break @@ -149,17 +149,17 @@ async def list_api_keys(): keys = await service.list_api_keys() if not keys: - logger.info(" Aucune clé API") + logger.info("📭 Aucune clé API") break - logger.info(f" {len(keys)} clé(s) API:\n") + logger.info(f"🔑 {len(keys)} clé(s) API:\n") for key in keys: status = ( - "" + "✅" if key.is_active and (not key.expires_at or key.expires_at > datetime.now()) - else "" + else "❌" ) logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") @@ -171,9 +171,15 @@ async def list_api_keys(): logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") if key.allowed_endpoints: - logger.info( - f" Endpoints: {', '.join(key.allowed_endpoints[:3])}..." - ) + import json + + try: + endpoints = json.loads(key.allowed_endpoints) + logger.info( + f" Endpoints: {', '.join(endpoints[:5])}{'...' if len(endpoints) > 5 else ''}" + ) + except: + pass logger.info("") @@ -190,13 +196,13 @@ async def revoke_api_key(key_id: str): key = result.scalar_one_or_none() if not key: - logger.error(f" Clé API '{key_id}' introuvable") + logger.error(f"❌ Clé API '{key_id}' introuvable") break key.is_active = False await session.commit() - logger.info(f" Clé API révoquée: {key.name}") + logger.info(f"🗑️ Clé API révoquée: {key.name}") logger.info(f" ID: {key.id}") logger.info(f" Préfixe: {key.key_prefix}") @@ -212,11 +218,11 @@ async def verify_api_key(api_key: str): key = await service.verify_api_key(api_key) if not key: - logger.error(" Clé API invalide ou expirée") + logger.error("❌ Clé API invalide ou expirée") break logger.info("=" * 60) - logger.info(" Clé API valide") + logger.info("✅ Clé API valide") logger.info("=" * 60) logger.info(f" Nom: {key.name}") logger.info(f" ID: {key.id}") @@ -224,6 +230,18 @@ async def verify_api_key(api_key: str): logger.info(f" Requêtes totales: {key.total_requests}") logger.info(f" Expire le: {key.expires_at or 'Jamais'}") logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + + if key.allowed_endpoints: + import json + + try: + endpoints = json.loads(key.allowed_endpoints) + logger.info(f" Endpoints autorisés: {endpoints}") + except: + pass + else: + logger.info(" Endpoints autorisés: Tous") + logger.info("=" * 60) break @@ -267,7 +285,7 @@ async def main(): create_parser.add_argument( "--endpoints", nargs="+", - help="Endpoints autorisés (ex: /clients /articles)", + help="Endpoints autorisés (ex: /clients /articles /devis/*)", ) apikey_subparsers.add_parser("list", help="Lister les clés API") @@ -317,10 +335,10 @@ if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - logger.info("\n Interrupted") + logger.info("\n⏹️ Interrupted") sys.exit(0) except Exception as e: - logger.error(f" Erreur: {e}") + logger.error(f"❌ Erreur: {e}") import traceback traceback.print_exc()