diff --git a/config/cors_config.py b/config/cors_config.py index 6ee880e..0f3a4d2 100644 --- a/config/cors_config.py +++ b/config/cors_config.py @@ -1,17 +1,3 @@ -""" -CONFIGURATION CORS POUR API AVEC CLÉS API MULTIPLES - -Problématique: -- Les clés API seront utilisées depuis de nombreux domaines/IPs différents -- Impossible de lister tous les origins autorisés à l'avance -- Solution: CORS permissif mais sécurisé par les clés API - -Stratégies: -1. CORS ouvert avec validation par clé API (RECOMMANDÉ) -2. CORS dynamique basé sur whitelist -3. CORS avec wildcard et credentials=False -""" - from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from typing import List @@ -21,66 +7,24 @@ import logging logger = logging.getLogger(__name__) -# ============================================ -# STRATÉGIE 1: CORS OUVERT + VALIDATION API KEY -# ============================================ -# ✅ RECOMMANDÉ pour les API publiques avec clés -# -# Principe: -# - Accepter toutes les origines (allow_origins=["*"]) -# - La sécurité est assurée par la validation des clés API -# - Les clés API protègent l'accès, pas le CORS - - def configure_cors_open(app: FastAPI): - """ - Configuration CORS ouverte (RECOMMANDÉE) - - ✅ Accepte toutes les origines - ✅ Sécurité assurée par les clés API - ✅ Simplifie l'utilisation pour les clients - - ⚠️ Attention: credentials=False obligatoire avec allow_origins=["*"] - """ app.add_middleware( CORSMiddleware, - allow_origins=["*"], # Accepte toutes les origines - allow_credentials=False, # ⚠️ Obligatoire avec "*" + allow_origins=["*"], + allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], - allow_headers=["*"], # Accepte tous les headers (dont X-API-Key) + allow_headers=["*"], expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], - max_age=3600, # Cache preflight requests pendant 1h + max_age=3600, ) - logger.info("🌐 CORS configuré: Mode OUVERT (sécurisé par API Keys)") + logger.info(" CORS configuré: Mode OUVERT (sécurisé par API Keys)") logger.info(" - Origins: * (toutes)") logger.info(" - Headers: * (dont X-API-Key)") logger.info(" - Credentials: False") -# ============================================ -# STRATÉGIE 2: CORS AVEC WHITELIST DYNAMIQUE -# ============================================ -# 🔶 Pour environnements contrôlés -# -# Principe: -# - Lister explicitement les domaines autorisés -# - Peut inclure des patterns wildcards -# - Credentials possible (cookies, etc.) - - def configure_cors_whitelist(app: FastAPI): - """ - Configuration CORS avec whitelist (MODE CONTRÔLÉ) - - ✅ Meilleur contrôle des origines - ✅ Credentials possible - ❌ Nécessite maintenance de la liste - - À utiliser si vous connaissez tous les domaines clients - """ - - # Charger depuis .env ou config allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "") if allowed_origins_str: @@ -90,57 +34,11 @@ def configure_cors_whitelist(app: FastAPI): if origin.strip() ] else: - # Valeurs par défaut - allowed_origins = [ - "http://localhost:3000", # Frontend dev React/Vue - "http://localhost:5173", # Vite dev - "http://localhost:8080", # Frontend dev alternatif - "https://app.votre-domaine.com", - "https://admin.votre-domaine.com", - # Ajouter vos domaines de production - ] + allowed_origins = ["*"] app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, - allow_credentials=True, # ✅ Possible avec liste explicite - allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], - allow_headers=["Content-Type", "Authorization", "X-API-Key"], - expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], - max_age=3600, - ) - - logger.info("🌐 CORS configuré: Mode WHITELIST") - logger.info(f" - Origins autorisées: {len(allowed_origins)}") - for origin in allowed_origins: - logger.info(f" • {origin}") - - -# ============================================ -# STRATÉGIE 3: CORS AVEC REGEX (AVANCÉ) -# ============================================ -# 🔶 Pour patterns complexes -# -# Exemple: Autoriser tous les sous-domaines *.votre-domaine.com - - -def configure_cors_regex(app: FastAPI): - """ - Configuration CORS avec patterns regex (AVANCÉ) - - ✅ Flexible pour sous-domaines - ✅ Supporte patterns complexes - ❌ Plus complexe à configurer - - Utilise allow_origin_regex au lieu de allow_origins - """ - - # Pattern regex pour autoriser tous les sous-domaines - origin_regex = r"https://.*\.votre-domaine\.com" - - app.add_middleware( - CORSMiddleware, - allow_origin_regex=origin_regex, # ⚠️ Utilise regex au lieu de liste allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], allow_headers=["Content-Type", "Authorization", "X-API-Key"], @@ -148,32 +46,31 @@ def configure_cors_regex(app: FastAPI): max_age=3600, ) - logger.info("🌐 CORS configuré: Mode REGEX") + logger.info(" CORS configuré: Mode WHITELIST") + logger.info(f" - Origins autorisées: {len(allowed_origins)}") + for origin in allowed_origins: + logger.info(f" • {origin}") + + +def configure_cors_regex(app: FastAPI): + origin_regex = r"*" + + app.add_middleware( + CORSMiddleware, + allow_origin_regex=origin_regex, + allow_credentials=True, + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["Content-Type", "Authorization", "X-API-Key"], + expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], + max_age=3600, + ) + + logger.info(" CORS configuré: Mode REGEX") logger.info(f" - Pattern: {origin_regex}") -# ============================================ -# STRATÉGIE 4: CORS HYBRIDE (PRODUCTION) -# ============================================ -# ✅ RECOMMANDÉ pour production -# -# Principe: -# - Whitelist pour les domaines connus (credentials=True) -# - Fallback sur "*" pour le reste (credentials=False) - - def configure_cors_hybrid(app: FastAPI): - """ - Configuration CORS hybride (PRODUCTION) - - ✅ Meilleur des deux mondes - ✅ Whitelist pour domaines connus - ✅ Fallback ouvert pour API Keys externes - - Note: Nécessite un middleware custom pour gérer les deux modes - """ from starlette.middleware.base import BaseHTTPMiddleware - from starlette.responses import Response class HybridCORSMiddleware(BaseHTTPMiddleware): def __init__(self, app, known_origins: List[str]): @@ -183,7 +80,6 @@ def configure_cors_hybrid(app: FastAPI): async def dispatch(self, request, call_next): origin = request.headers.get("origin") - # Si origin connue → CORS strict avec credentials if origin in self.known_origins: response = await call_next(request) response.headers["Access-Control-Allow-Origin"] = origin @@ -196,7 +92,6 @@ def configure_cors_hybrid(app: FastAPI): ) return response - # Sinon → CORS ouvert sans credentials response = await call_next(request) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = ( @@ -205,40 +100,16 @@ def configure_cors_hybrid(app: FastAPI): response.headers["Access-Control-Allow-Headers"] = "*" return response - # Domaines connus (whitelist) - known_origins = [ - "https://app.votre-domaine.com", - "https://admin.votre-domaine.com", - "http://localhost:3000", - "http://localhost:5173", - ] + known_origins = ["*"] app.add_middleware(HybridCORSMiddleware, known_origins=known_origins) - logger.info("🌐 CORS configuré: Mode HYBRIDE") + logger.info(" CORS configuré: Mode HYBRIDE") logger.info(f" - Whitelist: {len(known_origins)} domaines") logger.info(" - Fallback: * (ouvert)") -# ============================================ -# FONCTION PRINCIPALE -# ============================================ - - def setup_cors(app: FastAPI, mode: str = "open"): - """ - Configure CORS selon le mode choisi - - Args: - app: Instance FastAPI - mode: "open" | "whitelist" | "regex" | "hybrid" - - Recommandations: - - Development: "open" - - Production (API publique): "open" ou "hybrid" - - Production (API interne): "whitelist" - """ - if mode == "open": configure_cors_open(app) elif mode == "whitelist": @@ -249,73 +120,6 @@ def setup_cors(app: FastAPI, mode: str = "open"): configure_cors_hybrid(app) else: logger.warning( - f"⚠️ Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." + f" Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." ) configure_cors_open(app) - - -# ============================================ -# EXEMPLE D'UTILISATION DANS api.py -# ============================================ - -""" -# Dans api.py - -from config.cors_config import setup_cors - -app = FastAPI(...) - -# DÉVELOPPEMENT -setup_cors(app, mode="open") - -# PRODUCTION (API publique avec clés) -setup_cors(app, mode="hybrid") - -# PRODUCTION (API interne uniquement) -setup_cors(app, mode="whitelist") -""" - - -# ============================================ -# VARIABLES D'ENVIRONNEMENT (.env) -# ============================================ - -""" -# Pour mode "whitelist" -CORS_ALLOWED_ORIGINS=https://app.example.com,https://admin.example.com,http://localhost:3000 - -# Pour mode "regex" -CORS_ORIGIN_REGEX=https://.*\.example\.com - -# Choisir le mode -CORS_MODE=open -""" - - -# ============================================ -# FAQ CORS + API KEYS -# ============================================ - -""" -Q: Pourquoi allow_origins=["*"] est sécurisé avec des API Keys ? -R: Les clés API protègent l'accès aux données. CORS empêche seulement les - navigateurs web de faire des requêtes cross-origin. Un attaquant peut - contourner CORS facilement (curl, postman), donc la vraie sécurité vient - de la validation des clés API, pas du CORS. - -Q: Pourquoi credentials=False avec allow_origins=["*"] ? -R: C'est une restriction du navigateur (spec CORS). Vous ne pouvez pas avoir - credentials=True ET origins=["*"] en même temps. - -Q: Mes clients utilisent des IPs dynamiques, que faire ? -R: Utilisez mode "open". Les clés API sont justement faites pour ça - - permettre l'accès depuis n'importe quelle origine, de manière sécurisée. - -Q: Je veux quand même utiliser des cookies/sessions ? -R: Utilisez mode "whitelist" ou "hybrid" pour les domaines qui ont besoin - de credentials, et laissez les autres utiliser X-API-Key sans credentials. - -Q: Comment tester CORS localement ? -R: Créez un simple fichier HTML avec fetch() et ouvrez-le en file:// - Ou utilisez un serveur local (python -m http.server) -""" diff --git a/core/dependencies.py b/core/dependencies.py index 6782e98..ff443a6 100644 --- a/core/dependencies.py +++ b/core/dependencies.py @@ -10,7 +10,7 @@ import logging logger = logging.getLogger(__name__) -security = HTTPBearer(auto_error=False) # ⚠️ auto_error=False pour gérer manuellement +security = HTTPBearer(auto_error=False) async def get_current_user_hybrid( @@ -18,19 +18,6 @@ async def get_current_user_hybrid( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: - """ - VERSION HYBRIDE: Accepte JWT OU API Key - - Priorité: - 1. JWT (Authorization: Bearer) - 2. API Key (déjà validée par middleware) - - Si API Key utilisée, retourne un "user virtuel" basé sur la clé - """ - - # ============================================ - # OPTION 1: JWT (comportement standard) - # ============================================ if credentials and credentials.credentials: token = credentials.credentials @@ -84,19 +71,12 @@ async def get_current_user_hybrid( detail="Compte temporairement verrouillé suite à trop de tentatives échouées", ) - logger.debug(f"🔐 Authentifié via JWT: {user.email}") + logger.debug(f" Authentifié via JWT: {user.email}") return user - # ============================================ - # OPTION 2: API Key (validée par middleware) - # ============================================ api_key_obj = getattr(request.state, "api_key", None) if api_key_obj: - # Créer un "user virtuel" basé sur la clé API - # Cela permet aux routes existantes de fonctionner sans modification - - # Si la clé est associée à un vrai user, le récupérer if api_key_obj.user_id: result = await session.execute( select(User).where(User.id == api_key_obj.user_id) @@ -105,11 +85,10 @@ async def get_current_user_hybrid( if user: logger.debug( - f"🔑 Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" + f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" ) return user - # Sinon, créer un user virtuel (pour les clés API sans user associé) from database import User as UserModel virtual_user = UserModel( @@ -117,21 +96,17 @@ async def get_current_user_hybrid( email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local", nom="API Key", prenom=api_key_obj.name, - role="api_client", # Rôle spécial pour les API keys + role="api_client", is_verified=True, is_active=True, ) - # Marquer que c'est un user virtuel virtual_user._is_api_key_user = True virtual_user._api_key_obj = api_key_obj - logger.debug(f"🔑 Authentifié via API Key: {api_key_obj.name} (user virtuel)") + logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)") return virtual_user - # ============================================ - # AUCUNE AUTHENTIFICATION - # ============================================ raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise (JWT ou API Key)", @@ -152,32 +127,20 @@ async def get_current_user_optional_hybrid( def require_role_hybrid(*allowed_roles: str): - """ - VERSION HYBRIDE: Vérification de rôle compatible avec API Keys - - Notes: - - Les users via JWT ont leur vrai rôle - - Les users via API Key ont le rôle "api_client" par défaut - - Vous pouvez autoriser "api_client" dans allowed_roles pour permettre l'accès aux API Keys - """ - async def role_checker( request: Request, user: User = Depends(get_current_user_hybrid) ) -> User: - # Vérifier si c'est un user API Key is_api_key_user = getattr(user, "_is_api_key_user", False) if is_api_key_user: - # Pour les API Keys, vérifier si "api_client" est autorisé if "api_client" not in allowed_roles and "*" not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.", ) - logger.debug(f"✅ API Key autorisée pour cette route") + logger.debug(" API Key autorisée pour cette route") return user - # Pour les vrais users, vérification standard if user.role not in allowed_roles and "*" not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, @@ -189,11 +152,6 @@ def require_role_hybrid(*allowed_roles: str): return role_checker -# ============================================ -# HELPERS -# ============================================ - - def is_api_key_user(user: User) -> bool: """Vérifie si l'utilisateur est authentifié via API Key""" return getattr(user, "_is_api_key_user", False) @@ -204,10 +162,5 @@ def get_api_key_from_user(user: User): return getattr(user, "_api_key_obj", None) -# ============================================ -# RÉTROCOMPATIBILITÉ -# ============================================ - -# Alias pour garder la compatibilité avec le code existant get_current_user = get_current_user_hybrid get_current_user_optional = get_current_user_optional_hybrid diff --git a/database/models/api_key.py b/database/models/api_key.py index 1e54342..0d246ab 100644 --- a/database/models/api_key.py +++ b/database/models/api_key.py @@ -12,29 +12,21 @@ class ApiKey(Base): id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) key_hash = Column(String(64), unique=True, nullable=False, index=True) - key_prefix = Column( - String(10), nullable=False - ) # Premiers caractères pour identification + key_prefix = Column(String(10), nullable=False) name = Column(String(255), nullable=False) description = Column(Text, nullable=True) - # Métadonnées - user_id = Column(String(36), nullable=True) # Optionnel si associé à un utilisateur + user_id = Column(String(36), nullable=True) created_by = Column(String(255), nullable=False) - # Contrôle d'accès is_active = Column(Boolean, default=True, nullable=False) rate_limit_per_minute = Column(Integer, default=60, nullable=False) - allowed_endpoints = Column( - Text, nullable=True - ) # JSON array des endpoints autorisés + allowed_endpoints = Column(Text, nullable=True) - # Statistiques total_requests = Column(Integer, default=0, nullable=False) last_used_at = Column(DateTime, nullable=True) - # Dates created_at = Column(DateTime, default=datetime.now, nullable=False) expires_at = Column(DateTime, nullable=True) revoked_at = Column(DateTime, nullable=True) diff --git a/middleware/security.py b/middleware/security.py index 17186a0..137e7dd 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -1,5 +1,4 @@ -import secrets -from fastapi import Request, HTTPException, status +from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from sqlalchemy import select @@ -13,23 +12,14 @@ from security.auth import verify_password logger = logging.getLogger(__name__) -# === Configuration Swagger === security = HTTPBasic() async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: - """ - VERSION 2: Vérification des identifiants Swagger via base de données - - ✅ Plus sécurisé - ✅ Gestion centralisée - ✅ Tracking des connexions - """ username = credentials.username password = credentials.password try: - # Utiliser get_session de manière asynchrone async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -38,29 +28,21 @@ async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: if swagger_user and swagger_user.is_active: if verify_password(password, swagger_user.hashed_password): - # Mise à jour de la dernière connexion swagger_user.last_login = datetime.now() await session.commit() - logger.info(f"✅ Accès Swagger autorisé (DB): {username}") + logger.info(f" Accès Swagger autorisé (DB): {username}") return True - logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}") + logger.warning(f" Tentative d'accès Swagger refusée: {username}") return False except Exception as e: - logger.error(f"❌ Erreur vérification Swagger credentials: {e}") + logger.error(f" Erreur vérification Swagger credentials: {e}") return False class SwaggerAuthMiddleware: - """ - Middleware pour protéger les endpoints de documentation - (/docs, /redoc, /openapi.json) - - VERSION 2: Avec vérification en base de données - """ - def __init__(self, app): self.app = app @@ -72,15 +54,12 @@ class SwaggerAuthMiddleware: request = Request(scope, receive=receive) path = request.url.path - # Endpoints à protéger protected_paths = ["/docs", "/redoc", "/openapi.json"] if any(path.startswith(protected_path) for protected_path in protected_paths): - # Vérification de l'authentification Basic auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): - # Demande d'authentification response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -91,7 +70,6 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return - # Extraction des credentials try: import base64 @@ -103,7 +81,6 @@ class SwaggerAuthMiddleware: credentials = HTTPBasicCredentials(username=username, password=password) - # Vérification via DB if not await verify_swagger_credentials(credentials): response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, @@ -114,7 +91,7 @@ class SwaggerAuthMiddleware: return except Exception as e: - logger.error(f"❌ Erreur parsing auth header: {e}") + logger.error(f" Erreur parsing auth header: {e}") response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Format d'authentification invalide"}, @@ -123,19 +100,10 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return - # Si tout est OK, continuer await self.app(scope, receive, send) class ApiKeyMiddleware: - """ - Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics - - ✅ Accepte X-API-Key OU Authorization: Bearer (JWT) - ✅ Les deux méthodes sont équivalentes - ✅ Les endpoints /auth/* restent accessibles sans auth - """ - def __init__(self, app): self.app = app @@ -147,53 +115,37 @@ class ApiKeyMiddleware: request = Request(scope, receive=receive) path = request.url.path - # ============================================ - # ENDPOINTS EXCLUS (accessibles sans auth) - # ============================================ excluded_paths = [ "/docs", "/redoc", "/openapi.json", "/health", "/", - # === ROUTES AUTH (toujours publiques) === - "/api/v1/auth/login", - "/api/v1/auth/register", - "/api/v1/auth/verify-email", - "/api/v1/auth/reset-password", - "/api/v1/auth/request-reset", - "/api/v1/auth/refresh", + "/auth/login", + "/auth/register", + "/auth/verify-email", + "/auth/reset-password", + "/auth/request-reset", + "/auth/refresh", ] if any(path.startswith(excluded_path) for excluded_path in excluded_paths): await self.app(scope, receive, send) return - # ============================================ - # VÉRIFICATION HYBRIDE: API Key OU JWT - # ============================================ - - # Option 1: Vérifier si JWT présent auth_header = request.headers.get("Authorization") has_jwt = auth_header and auth_header.startswith("Bearer ") - # Option 2: Vérifier si API Key présente api_key = request.headers.get("X-API-Key") has_api_key = api_key is not None - # ============================================ - # LOGIQUE HYBRIDE - # ============================================ - if has_jwt: - # JWT présent → laisser passer, sera validé par les dependencies FastAPI - logger.debug(f"🔑 JWT détecté pour {path}") + logger.debug(f" JWT détecté pour {path}") await self.app(scope, receive, send) return elif has_api_key: - # API Key présente → valider la clé - logger.debug(f"🔑 API Key détectée pour {path}") + logger.debug(f" API Key détectée pour {path}") from services.api_key import ApiKeyService @@ -213,7 +165,6 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # Vérification du rate limit is_allowed, rate_info = await api_key_service.check_rate_limit( api_key_obj ) @@ -229,7 +180,6 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # Vérification de l'accès à l'endpoint has_access = await api_key_service.check_endpoint_access( api_key_obj, path ) @@ -245,18 +195,16 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # ✅ Clé valide → ajouter les infos à la requête request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" - logger.info(f"✅ API Key valide: {api_key_obj.name} → {path}") + logger.info(f" API Key valide: {api_key_obj.name} → {path}") - # Continuer la requête await self.app(scope, receive, send) return except Exception as e: - logger.error(f"❌ Erreur validation API Key: {e}", exc_info=True) + logger.error(f" Erreur validation API Key: {e}", exc_info=True) response = JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ @@ -267,7 +215,6 @@ class ApiKeyMiddleware: return else: - # ❌ Ni JWT ni API Key response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -280,17 +227,10 @@ class ApiKeyMiddleware: return -# === Fonction helper pour récupérer l'API Key depuis la requête === def get_api_key_from_request(request: Request) -> Optional: """Récupère l'objet ApiKey depuis la requête si présent""" return getattr(request.state, "api_key", None) def get_auth_method(request: Request) -> str: - """ - Retourne la méthode d'authentification utilisée - - Returns: - "jwt" | "api_key" | "none" - """ return getattr(request.state, "authenticated_via", "none") diff --git a/routes/api_keys.py b/routes/api_keys.py index 8c8f6db..27f0efc 100644 --- a/routes/api_keys.py +++ b/routes/api_keys.py @@ -27,13 +27,6 @@ async def create_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 🔑 Créer une nouvelle clé API - - **Réservé aux admins** - - ⚠️ La clé en clair ne sera affichée qu'une seule fois ! - """ service = ApiKeyService(session) api_key_obj, api_key_plain = await service.create_api_key( @@ -46,7 +39,7 @@ async def create_api_key( allowed_endpoints=data.allowed_endpoints, ) - logger.info(f"✅ Clé API créée par {user.email}: {data.name}") + logger.info(f" Clé API créée par {user.email}: {data.name}") response_data = api_key_to_response(api_key_obj) response_data["api_key"] = api_key_plain @@ -60,15 +53,8 @@ async def list_api_keys( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 📋 Lister toutes les clés API - - **Réservé aux admins** - liste toutes les clés - **Utilisateurs standards** - liste uniquement leurs clés - """ service = ApiKeyService(session) - # Si admin, voir toutes les clés, sinon seulement les siennes user_id = None if user.role in ["admin", "super_admin"] else user.id keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id) @@ -84,7 +70,7 @@ async def get_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """🔍 Récupérer une clé API par son ID""" + """ Récupérer une clé API par son ID""" service = ApiKeyService(session) api_key_obj = await service.get_by_id(key_id) @@ -95,7 +81,6 @@ async def get_api_key( detail=f"Clé API {key_id} introuvable", ) - # Vérification des permissions if user.role not in ["admin", "super_admin"]: if api_key_obj.user_id != user.id: raise HTTPException( @@ -112,11 +97,6 @@ async def revoke_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 🚫 Révoquer une clé API - - **Action irréversible** - la clé sera désactivée définitivement - """ service = ApiKeyService(session) api_key_obj = await service.get_by_id(key_id) @@ -127,7 +107,6 @@ async def revoke_api_key( detail=f"Clé API {key_id} introuvable", ) - # Vérification des permissions if user.role not in ["admin", "super_admin"]: if api_key_obj.user_id != user.id: raise HTTPException( @@ -143,7 +122,7 @@ async def revoke_api_key( detail="Erreur lors de la révocation", ) - logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}") + logger.info(f" Clé API révoquée par {user.email}: {api_key_obj.name}") return { "success": True, @@ -156,11 +135,6 @@ async def verify_api_key_endpoint( api_key: str = Query(..., description="Clé API à vérifier"), session: AsyncSession = Depends(get_session), ): - """ - ✅ Vérifier la validité d'une clé API - - **Endpoint public** - permet de tester une clé - """ service = ApiKeyService(session) api_key_obj = await service.verify_api_key(api_key) diff --git a/schemas/api_key.py b/schemas/api_key.py index 6a2d659..4ec49b6 100644 --- a/schemas/api_key.py +++ b/schemas/api_key.py @@ -42,7 +42,7 @@ class ApiKeyCreatedResponse(ApiKeyResponse): """Schema de réponse après création (inclut la clé en clair)""" api_key: str = Field( - ..., description="⚠️ Clé API en clair - à sauvegarder immédiatement" + ..., description=" Clé API en clair - à sauvegarder immédiatement" ) diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 35c0cff..1f234b9 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,18 +1,3 @@ -#!/usr/bin/env python3 -""" -Script CLI pour gérer les clés API et utilisateurs Swagger - -Usage: - python manage_security.py swagger add - python manage_security.py swagger list - python manage_security.py swagger delete - - python manage_security.py apikey create [--days 365] [--rate-limit 60] - python manage_security.py apikey list - python manage_security.py apikey revoke - python manage_security.py apikey verify -""" - import asyncio import sys from pathlib import Path @@ -23,7 +8,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from database import get_session from database.models.api_key import SwaggerUser from services.api_key import ApiKeyService -from security.auth import hash_password, verify_password +from security.auth import hash_password from sqlalchemy import select import logging @@ -31,24 +16,18 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# ============================================ -# GESTION DES UTILISATEURS SWAGGER -# ============================================ - async def add_swagger_user(username: str, password: str, full_name: str = None): """Ajouter un utilisateur Swagger""" async with get_session() as session: - # Vérifier si existe result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) existing = result.scalar_one_or_none() if existing: - logger.error(f"❌ L'utilisateur {username} existe déjà") + logger.error(f" L'utilisateur {username} existe déjà") return - # Créer user = SwaggerUser( username=username, hashed_password=hash_password(password), @@ -59,10 +38,10 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): session.add(user) await session.commit() - logger.info(f"✅ Utilisateur Swagger créé: {username}") - print(f"\n✅ Utilisateur créé avec succès") + logger.info(f" Utilisateur Swagger créé: {username}") + print("\n Utilisateur créé avec succès") print(f" Username: {username}") - print(f" Accès: https://votre-serveur/docs") + print(" Accès: https://votre-serveur/docs") async def list_swagger_users(): @@ -75,9 +54,9 @@ async def list_swagger_users(): print("Aucun utilisateur Swagger trouvé") return - print(f"\n📋 {len(users)} utilisateur(s) Swagger:\n") + print(f"\n {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = "✅ Actif" if user.is_active else "❌ Inactif" + status = " Actif" if user.is_active else " Inactif" print(f" • {user.username:<20} {status}") if user.full_name: print(f" Nom: {user.full_name}") @@ -95,7 +74,7 @@ async def delete_swagger_user(username: str): user = result.scalar_one_or_none() if not user: - logger.error(f"❌ Utilisateur {username} introuvable") + logger.error(f" Utilisateur {username} introuvable") return await session.delete(user) @@ -104,10 +83,6 @@ async def delete_swagger_user(username: str): logger.info(f"🗑️ Utilisateur supprimé: {username}") -# ============================================ -# GESTION DES CLÉS API -# ============================================ - async def create_api_key( name: str, description: str = None, @@ -128,14 +103,14 @@ async def create_api_key( allowed_endpoints=endpoints, ) - print(f"\n✅ Clé API créée avec succès\n") + print("\n Clé API créée avec succès\n") print(f" ID: {api_key_obj.id}") print(f" Nom: {name}") print(f" Clé: {api_key_plain}") print(f" Préfixe: {api_key_obj.key_prefix}") print(f" Rate limit: {rate_limit} req/min") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}") - print(f"\n⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") + print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") async def list_api_keys(): @@ -148,11 +123,15 @@ async def list_api_keys(): print("Aucune clé API trouvée") return - print(f"\n📋 {len(keys)} clé(s) API:\n") + print(f"\n {len(keys)} clé(s) API:\n") for key in keys: - status = "✅" if key.is_active else "❌" - expired = "⏰ Expirée" if key.expires_at and key.expires_at < datetime.now() else "" - + status = "" if key.is_active else "" + expired = ( + "⏰ Expirée" + if key.expires_at and key.expires_at < datetime.now() + else "" + ) + print(f" {status} {key.name:<30} ({key.key_prefix}...)") print(f" ID: {key.id}") print(f" Requêtes: {key.total_requests}") @@ -166,19 +145,19 @@ async def revoke_api_key(key_id: str): """Révoquer une clé API""" async with get_session() as session: service = ApiKeyService(session) - + api_key = await service.get_by_id(key_id) if not api_key: - logger.error(f"❌ Clé {key_id} introuvable") + logger.error(f" Clé {key_id} introuvable") return success = await service.revoke_api_key(key_id) - + if success: - logger.info(f"🚫 Clé révoquée: {api_key.name}") - print(f"\n✅ Clé '{api_key.name}' révoquée avec succès") + logger.info(f" Clé révoquée: {api_key.name}") + print(f"\n Clé '{api_key.name}' révoquée avec succès") else: - logger.error("❌ Erreur lors de la révocation") + logger.error(" Erreur lors de la révocation") async def verify_api_key_cmd(api_key: str): @@ -188,64 +167,59 @@ async def verify_api_key_cmd(api_key: str): api_key_obj = await service.verify_api_key(api_key) if api_key_obj: - print(f"\n✅ Clé API valide\n") + print("\n Clé API valide\n") print(f" Nom: {api_key_obj.name}") print(f" ID: {api_key_obj.id}") print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") print(f" Requêtes: {api_key_obj.total_requests}") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n") else: - print(f"\n❌ Clé API invalide, expirée ou révoquée\n") + print("\n Clé API invalide, expirée ou révoquée\n") -# ============================================ -# CLI PRINCIPAL -# ============================================ - async def main(): parser = argparse.ArgumentParser( description="Gestion de la sécurité Sage Dataven API" ) - + subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") - # === SWAGGER === - swagger_parser = subparsers.add_parser("swagger", help="Gestion utilisateurs Swagger") + swagger_parser = subparsers.add_parser( + "swagger", help="Gestion utilisateurs Swagger" + ) swagger_subparsers = swagger_parser.add_subparsers(dest="action") - # swagger add swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") swagger_add.add_argument("username", help="Nom d'utilisateur") swagger_add.add_argument("password", help="Mot de passe") swagger_add.add_argument("--full-name", help="Nom complet") - # swagger list swagger_subparsers.add_parser("list", help="Lister les utilisateurs") - # swagger delete - swagger_delete = swagger_subparsers.add_parser("delete", help="Supprimer un utilisateur") + swagger_delete = swagger_subparsers.add_parser( + "delete", help="Supprimer un utilisateur" + ) swagger_delete.add_argument("username", help="Nom d'utilisateur") - # === API KEYS === apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") apikey_subparsers = apikey_parser.add_subparsers(dest="action") - # apikey create apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API") apikey_create.add_argument("name", help="Nom de la clé") apikey_create.add_argument("--description", help="Description") - apikey_create.add_argument("--days", type=int, default=365, help="Expiration en jours") - apikey_create.add_argument("--rate-limit", type=int, default=60, help="Limite req/min") + apikey_create.add_argument( + "--days", type=int, default=365, help="Expiration en jours" + ) + apikey_create.add_argument( + "--rate-limit", type=int, default=60, help="Limite req/min" + ) apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") - # apikey list apikey_subparsers.add_parser("list", help="Lister les clés") - # apikey revoke apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") apikey_revoke.add_argument("key_id", help="ID de la clé") - # apikey verify apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé") apikey_verify.add_argument("api_key", help="Clé API à vérifier") @@ -255,7 +229,6 @@ async def main(): parser.print_help() return - # Exécution des commandes if args.command == "swagger": if args.action == "add": await add_swagger_user(args.username, args.password, args.full_name) @@ -287,4 +260,5 @@ async def main(): if __name__ == "__main__": from datetime import datetime - asyncio.run(main()) \ No newline at end of file + + asyncio.run(main()) diff --git a/scripts/test_security.py b/scripts/test_security.py index 79f0299..497870e 100644 --- a/scripts/test_security.py +++ b/scripts/test_security.py @@ -1,16 +1,7 @@ -#!/usr/bin/env python3 -""" -Script de test automatisé pour vérifier la sécurité de l'API - -Usage: - python test_security.py --url http://votre-vps:8000 --swagger-user admin --swagger-pass password -""" - import requests import argparse import sys -from typing import Dict, Tuple -import json +from typing import Tuple class SecurityTester: @@ -20,7 +11,7 @@ class SecurityTester: def log_test(self, name: str, passed: bool, details: str = ""): """Enregistrer le résultat d'un test""" - status = "✅ PASS" if passed else "❌ FAIL" + status = " PASS" if passed else " FAIL" print(f"{status} - {name}") if details: print(f" {details}") @@ -36,7 +27,7 @@ class SecurityTester: def test_swagger_without_auth(self) -> bool: """Test 1: Swagger UI devrait demander une authentification""" - print("\n🔍 Test 1: Protection Swagger UI") + print("\n Test 1: Protection Swagger UI") try: response = requests.get(f"{self.base_url}/docs", timeout=5) @@ -62,7 +53,7 @@ class SecurityTester: def test_swagger_with_auth(self, username: str, password: str) -> bool: """Test 2: Swagger UI accessible avec credentials valides""" - print("\n🔍 Test 2: Accès Swagger avec authentification") + print("\n Test 2: Accès Swagger avec authentification") try: response = requests.get( @@ -90,7 +81,7 @@ class SecurityTester: def test_api_without_auth(self) -> bool: """Test 3: Endpoints API devraient demander une authentification""" - print("\n🔍 Test 3: Protection des endpoints API") + print("\n Test 3: Protection des endpoints API") test_endpoints = ["/api/v1/clients", "/api/v1/documents"] @@ -100,15 +91,15 @@ class SecurityTester: response = requests.get(f"{self.base_url}{endpoint}", timeout=5) if response.status_code == 401: - print(f" ✅ {endpoint} protégé (401)") + print(f" {endpoint} protégé (401)") else: print( - f" ❌ {endpoint} accessible sans auth (code {response.status_code})" + f" {endpoint} accessible sans auth (code {response.status_code})" ) all_protected = False except Exception as e: - print(f" ⚠️ {endpoint} erreur: {str(e)}") + print(f" {endpoint} erreur: {str(e)}") all_protected = False self.log_test("Endpoints API protégés", all_protected) @@ -116,7 +107,7 @@ class SecurityTester: def test_health_endpoint_public(self) -> bool: """Test 4: Endpoint /health devrait être accessible sans auth""" - print("\n🔍 Test 4: Endpoint /health public") + print("\n Test 4: Endpoint /health public") try: response = requests.get(f"{self.base_url}/health", timeout=5) @@ -138,10 +129,9 @@ class SecurityTester: def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]: """Test 5: Créer une clé API via l'endpoint""" - print("\n🔍 Test 5: Création d'une clé API") + print("\n Test 5: Création d'une clé API") try: - # 1. Login pour obtenir un JWT login_response = requests.post( f"{self.base_url}/api/v1/auth/login", json={"email": username, "password": password}, @@ -158,7 +148,6 @@ class SecurityTester: jwt_token = login_response.json().get("access_token") - # 2. Créer une clé API create_response = requests.post( f"{self.base_url}/api/v1/api-keys", headers={"Authorization": f"Bearer {jwt_token}"}, @@ -189,7 +178,7 @@ class SecurityTester: def test_api_key_usage(self, api_key: str) -> bool: """Test 6: Utiliser une clé API pour accéder à un endpoint""" - print("\n🔍 Test 6: Utilisation d'une clé API") + print("\n Test 6: Utilisation d'une clé API") if not api_key: self.log_test("Utilisation clé API", False, "Pas de clé disponible") @@ -219,7 +208,7 @@ class SecurityTester: def test_invalid_api_key(self) -> bool: """Test 7: Une clé invalide devrait être refusée""" - print("\n🔍 Test 7: Rejet de clé API invalide") + print("\n Test 7: Rejet de clé API invalide") invalid_key = "sdk_live_invalid_key_12345" @@ -247,13 +236,12 @@ class SecurityTester: def test_rate_limiting(self, api_key: str) -> bool: """Test 8: Rate limiting (optionnel, peut prendre du temps)""" - print("\n🔍 Test 8: Rate limiting (test simple)") + print("\n Test 8: Rate limiting (test simple)") if not api_key: self.log_test("Rate limiting", False, "Pas de clé disponible") return False - # Envoyer 70 requêtes rapidement (limite = 60/min) print(" Envoi de 70 requêtes rapides...") rate_limited = False @@ -267,7 +255,7 @@ class SecurityTester: if response.status_code == 429: rate_limited = True - print(f" ⚠️ Rate limit atteint à la requête {i + 1}") + print(f" Rate limit atteint à la requête {i + 1}") break except Exception: @@ -287,22 +275,22 @@ class SecurityTester: def print_summary(self): """Afficher le résumé des tests""" print("\n" + "=" * 60) - print("📊 RÉSUMÉ DES TESTS") + print(" RÉSUMÉ DES TESTS") print("=" * 60) total = self.results["passed"] + self.results["failed"] success_rate = (self.results["passed"] / total * 100) if total > 0 else 0 print(f"\nTotal: {total} tests") - print(f"✅ Réussis: {self.results['passed']}") - print(f"❌ Échoués: {self.results['failed']}") - print(f"📈 Taux de réussite: {success_rate:.1f}%\n") + print(f" Réussis: {self.results['passed']}") + print(f" Échoués: {self.results['failed']}") + print(f"Taux de réussite: {success_rate:.1f}%\n") if self.results["failed"] == 0: print("🎉 Tous les tests sont passés ! Sécurité OK.") return 0 else: - print("⚠️ Certains tests ont échoué. Vérifiez la configuration.") + print(" Certains tests ont échoué. Vérifiez la configuration.") return 1 @@ -333,18 +321,16 @@ def main(): args = parser.parse_args() - print("🚀 Démarrage des tests de sécurité") - print(f"🎯 URL cible: {args.url}\n") + print(" Démarrage des tests de sécurité") + print(f" URL cible: {args.url}\n") tester = SecurityTester(args.url) - # Exécuter les tests tester.test_swagger_without_auth() tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass) tester.test_api_without_auth() tester.test_health_endpoint_public() - # Tests nécessitant une clé API success, api_key = tester.test_api_key_creation( args.swagger_user, args.swagger_pass ) @@ -356,11 +342,10 @@ def main(): if not args.skip_rate_limit: tester.test_rate_limiting(api_key) else: - print("\n⏭️ Test de rate limiting sauté") + print("\n Test de rate limiting sauté") else: - print("\n⚠️ Tests avec clé API sautés (création échouée)") + print("\n Tests avec clé API sautés (création échouée)") - # Résumé exit_code = tester.print_summary() sys.exit(exit_code) diff --git a/services/api_key.py b/services/api_key.py index d54943a..ad3cf6f 100644 --- a/services/api_key.py +++ b/services/api_key.py @@ -21,7 +21,6 @@ class ApiKeyService: @staticmethod def generate_api_key() -> str: """Génère une clé API unique et sécurisée""" - # Format: sdk_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx random_part = secrets.token_urlsafe(32) return f"sdk_live_{random_part}" @@ -33,7 +32,6 @@ class ApiKeyService: @staticmethod def get_key_prefix(api_key: str) -> str: """Extrait le préfixe de la clé pour identification""" - # Retourne les 12 premiers caractères return api_key[:12] if len(api_key) >= 12 else api_key async def create_api_key( @@ -46,23 +44,14 @@ class ApiKeyService: rate_limit_per_minute: int = 60, allowed_endpoints: Optional[List[str]] = None, ) -> tuple[ApiKey, str]: - """ - Crée une nouvelle clé API - - Returns: - tuple[ApiKey, str]: (objet ApiKey, clé en clair - à ne montrer qu'une fois) - """ - # Génération de la clé api_key_plain = self.generate_api_key() key_hash = self.hash_api_key(api_key_plain) key_prefix = self.get_key_prefix(api_key_plain) - # Calcul de la date d'expiration expires_at = None if expires_in_days: expires_at = datetime.now() + timedelta(days=expires_in_days) - # Création de l'objet api_key_obj = ApiKey( key_hash=key_hash, key_prefix=key_prefix, @@ -81,24 +70,18 @@ class ApiKeyService: await self.session.commit() await self.session.refresh(api_key_obj) - logger.info(f"✅ Clé API créée: {name} (prefix: {key_prefix})") + logger.info(f" Clé API créée: {name} (prefix: {key_prefix})") return api_key_obj, api_key_plain async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]: - """ - Vérifie une clé API et retourne l'objet si valide - - Returns: - Optional[ApiKey]: L'objet ApiKey si valide, None sinon - """ key_hash = self.hash_api_key(api_key_plain) result = await self.session.execute( select(ApiKey).where( and_( ApiKey.key_hash == key_hash, - ApiKey.is_active == True, + ApiKey.is_active, ApiKey.revoked_at.is_(None), or_( ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now() @@ -110,14 +93,13 @@ class ApiKeyService: api_key_obj = result.scalar_one_or_none() if api_key_obj: - # Mise à jour des statistiques api_key_obj.total_requests += 1 api_key_obj.last_used_at = datetime.now() await self.session.commit() - logger.debug(f"🔑 Clé API validée: {api_key_obj.name}") + logger.debug(f" Clé API validée: {api_key_obj.name}") else: - logger.warning(f"⚠️ Clé API invalide ou expirée") + logger.warning(" Clé API invalide ou expirée") return api_key_obj @@ -152,7 +134,7 @@ class ApiKeyService: api_key_obj.revoked_at = datetime.now() await self.session.commit() - logger.info(f"🚫 Clé API révoquée: {api_key_obj.name}") + logger.info(f" Clé API révoquée: {api_key_obj.name}") return True async def get_by_id(self, key_id: str) -> Optional[ApiKey]: @@ -161,14 +143,6 @@ class ApiKeyService: return result.scalar_one_or_none() async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]: - """ - Vérifie le rate limit d'une clé API (à implémenter avec Redis/cache) - - Returns: - tuple[bool, Dict]: (is_allowed, info_dict) - """ - # TODO: Implémenter avec Redis pour un vrai rate limiting - # Pour l'instant, retourne toujours True return True, { "allowed": True, "limit": api_key_obj.rate_limit_per_minute, @@ -178,13 +152,11 @@ class ApiKeyService: async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool: """Vérifie si la clé a accès à un endpoint spécifique""" if not api_key_obj.allowed_endpoints: - # Si aucune restriction, accès total return True try: allowed = json.loads(api_key_obj.allowed_endpoints) - # Support des wildcards for pattern in allowed: if pattern == "*": return True @@ -197,7 +169,7 @@ class ApiKeyService: return False except json.JSONDecodeError: - logger.error(f"❌ Erreur parsing allowed_endpoints pour {api_key_obj.id}") + logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}") return False diff --git a/services/universign_document.py b/services/universign_document.py index 9cb714e..394c3ce 100644 --- a/services/universign_document.py +++ b/services/universign_document.py @@ -23,7 +23,7 @@ class UniversignDocumentService: def fetch_transaction_documents(self, transaction_id: str) -> Optional[List[Dict]]: try: - logger.info(f"📋 Récupération documents pour transaction: {transaction_id}") + logger.info(f" Récupération documents pour transaction: {transaction_id}") response = requests.get( f"{self.api_url}/transactions/{transaction_id}", diff --git a/services/universign_sync.py b/services/universign_sync.py index 807802d..da634f2 100644 --- a/services/universign_sync.py +++ b/services/universign_sync.py @@ -344,7 +344,7 @@ class UniversignSyncService: universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") - logger.info(f"📊 Statut Universign brut: {universign_status_raw}") + logger.info(f" Statut Universign brut: {universign_status_raw}") new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value @@ -367,7 +367,7 @@ class UniversignSyncService: if status_changed: logger.info( - f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}" + f"CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}" ) try: @@ -392,7 +392,7 @@ class UniversignSyncService: if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() - logger.info("📅 Date d'envoi mise à jour") + logger.info("Date d'envoi mise à jour") if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() @@ -404,8 +404,7 @@ class UniversignSyncService: if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() - logger.info("⏰ Date d'expiration mise à jour") - + logger.info("Date d'expiration mise à jour") documents = universign_data.get("documents", []) if documents: @@ -432,9 +431,7 @@ class UniversignSyncService: logger.warning(f"Échec téléchargement: {download_error}") except Exception as e: - logger.error( - f" Erreur téléchargement document: {e}", exc_info=True - ) + logger.error(f" Erreur téléchargement document: {e}", exc_info=True) await self._sync_signers(session, transaction, universign_data) @@ -529,9 +526,7 @@ class UniversignSyncService: logger.warning(f"Échec téléchargement: {download_error}") except Exception as e: - logger.error( - f" Erreur téléchargement document: {e}", exc_info=True - ) + logger.error(f" Erreur téléchargement document: {e}", exc_info=True) else: logger.debug( f"Document déjà téléchargé: {transaction.signed_document_path}" diff --git a/utils/generic_functions.py b/utils/generic_functions.py index 4cce52f..29a361e 100644 --- a/utils/generic_functions.py +++ b/utils/generic_functions.py @@ -425,7 +425,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "REFUSE": { "fr": "Signature refusée", "en": "Signature refused", - "icon": "❌", + "icon": "", "color": "red", }, "EXPIRE": { @@ -437,7 +437,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "ERREUR": { "fr": "Erreur technique", "en": "Technical error", - "icon": "⚠️", + "icon": "", "color": "red", }, } diff --git a/utils/universign_status_mapping.py b/utils/universign_status_mapping.py index 50e29cc..8391698 100644 --- a/utils/universign_status_mapping.py +++ b/utils/universign_status_mapping.py @@ -96,7 +96,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "REFUSE": { "fr": "Signature refusée", "en": "Signature refused", - "icon": "❌", + "icon": "", "color": "red", }, "EXPIRE": {