From 09eae50952efea3ad62134eb83b8a211188e3dda Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Mon, 19 Jan 2026 20:32:40 +0300 Subject: [PATCH 1/4] refactor: clean up code by removing unnecessary comments --- api.py | 3 -- config/config.py | 8 ----- create_admin.py | 3 -- database/models/api_key.py | 64 +++++++++++++++++++++++++++++++++ routes/enterprise.py | 1 - routes/universign.py | 1 - sage_client.py | 2 -- schemas/articles/articles.py | 1 - schemas/documents/reglements.py | 6 ---- schemas/sage/sage_gateway.py | 2 -- schemas/tiers/commercial.py | 5 --- schemas/tiers/tiers.py | 13 ------- services/universign_document.py | 37 ++++++------------- services/universign_sync.py | 43 +++------------------- utils/generic_functions.py | 6 +--- 15 files changed, 80 insertions(+), 115 deletions(-) create mode 100644 database/models/api_key.py diff --git a/api.py b/api.py index 1b2e078..27f5de3 100644 --- a/api.py +++ b/api.py @@ -132,7 +132,6 @@ async def lifespan(app: FastAPI): api_url=settings.universign_api_url, api_key=settings.universign_api_key ) - # Configuration du service avec les dépendances sync_service.configure( sage_client=sage_client, email_queue=email_queue, settings=settings ) @@ -180,7 +179,6 @@ app.include_router(entreprises_router) @app.get("/clients", response_model=List[ClientDetails], tags=["Clients"]) async def obtenir_clients( query: Optional[str] = Query(None), - # sage: SageGatewayClient = Depends(get_sage_client_for_user), ): try: clients = sage_client.lister_clients(filtre=query or "") @@ -2768,7 +2766,6 @@ async def get_current_sage_config( } -# Routes Collaborateurs @app.get( "/collaborateurs", response_model=List[CollaborateurDetails], diff --git a/config/config.py b/config/config.py index 63bf99b..d60c4d8 100644 --- a/config/config.py +++ b/config/config.py @@ -7,7 +7,6 @@ class Settings(BaseSettings): env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore" ) - # === JWT & Auth === jwt_secret: str jwt_algorithm: str access_token_expire_minutes: int @@ -21,15 +20,12 @@ class Settings(BaseSettings): SAGE_TYPE_BON_AVOIR: int = 50 SAGE_TYPE_FACTURE: int = 60 - # === Sage Gateway (Windows) === sage_gateway_url: str sage_gateway_token: str frontend_url: str - # === Base de données === database_url: str = "sqlite+aiosqlite:///./data/sage_dataven.db" - # === SMTP === smtp_host: str smtp_port: int = 587 smtp_user: str @@ -37,21 +33,17 @@ class Settings(BaseSettings): smtp_from: str smtp_use_tls: bool = True - # === Universign === universign_api_key: str universign_api_url: str - # === API === api_host: str api_port: int api_reload: bool = False - # === Email Queue === max_email_workers: int = 3 max_retry_attempts: int = 3 retry_delay_seconds: int = 3 - # === CORS === cors_origins: List[str] = ["*"] diff --git a/create_admin.py b/create_admin.py index d3cb786..96197ec 100644 --- a/create_admin.py +++ b/create_admin.py @@ -19,7 +19,6 @@ async def create_admin(): print(" Création d'un compte administrateur") print("=" * 60 + "\n") - # Saisie des informations email = input("Email de l'admin: ").strip().lower() if not email or "@" not in email: print(" Email invalide") @@ -32,7 +31,6 @@ async def create_admin(): print(" Prénom et nom requis") return False - # Mot de passe avec validation while True: password = input( "Mot de passe (min 8 car., 1 maj, 1 min, 1 chiffre, 1 spécial): " @@ -58,7 +56,6 @@ async def create_admin(): print(f"\n Un utilisateur avec l'email {email} existe déjà") return False - # Créer l'admin admin = User( id=str(uuid.uuid4()), email=email, diff --git a/database/models/api_key.py b/database/models/api_key.py new file mode 100644 index 0000000..1e54342 --- /dev/null +++ b/database/models/api_key.py @@ -0,0 +1,64 @@ +from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text +from datetime import datetime +import uuid + +from database.models.generic_model import Base + + +class ApiKey(Base): + """Modèle pour les clés API publiques""" + + __tablename__ = "api_keys" + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + key_hash = Column(String(64), unique=True, nullable=False, index=True) + key_prefix = Column( + String(10), nullable=False + ) # Premiers caractères pour identification + + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + + # Métadonnées + user_id = Column(String(36), nullable=True) # Optionnel si associé à un utilisateur + created_by = Column(String(255), nullable=False) + + # Contrôle d'accès + is_active = Column(Boolean, default=True, nullable=False) + rate_limit_per_minute = Column(Integer, default=60, nullable=False) + allowed_endpoints = Column( + Text, nullable=True + ) # JSON array des endpoints autorisés + + # Statistiques + total_requests = Column(Integer, default=0, nullable=False) + last_used_at = Column(DateTime, nullable=True) + + # Dates + created_at = Column(DateTime, default=datetime.now, nullable=False) + expires_at = Column(DateTime, nullable=True) + revoked_at = Column(DateTime, nullable=True) + + def __repr__(self): + return f"" + + +class SwaggerUser(Base): + """Modèle pour les utilisateurs autorisés à accéder au Swagger""" + + __tablename__ = "swagger_users" + + id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + username = Column(String(100), unique=True, nullable=False, index=True) + hashed_password = Column(String(255), nullable=False) + + full_name = Column(String(255), nullable=True) + email = Column(String(255), nullable=True) + + is_active = Column(Boolean, default=True, nullable=False) + + created_at = Column(DateTime, default=datetime.now, nullable=False) + last_login = Column(DateTime, nullable=True) + + def __repr__(self): + return f"" diff --git a/routes/enterprise.py b/routes/enterprise.py index 2ed18d1..2de1e5f 100644 --- a/routes/enterprise.py +++ b/routes/enterprise.py @@ -22,7 +22,6 @@ async def rechercher_entreprise( try: logger.info(f" Recherche entreprise: '{q}'") - # Appel API api_response = await rechercher_entreprise_api(q, per_page) resultats_api = api_response.get("results", []) diff --git a/routes/universign.py b/routes/universign.py index 20d7960..f67d042 100644 --- a/routes/universign.py +++ b/routes/universign.py @@ -511,7 +511,6 @@ async def webhook_universign( transaction_id = None if payload.get("type", "").startswith("transaction.") and "payload" in payload: - # Le transaction_id est dans payload.object.id nested_object = payload.get("payload", {}).get("object", {}) if nested_object.get("object") == "transaction": transaction_id = nested_object.get("id") diff --git a/sage_client.py b/sage_client.py index 0137512..9ad7b50 100644 --- a/sage_client.py +++ b/sage_client.py @@ -1,4 +1,3 @@ -# sage_client.py import requests from typing import Dict, List, Optional from config.config import settings @@ -468,7 +467,6 @@ class SageGatewayClient: "tva_encaissement": tva_encaissement, } - # Champs optionnels if date_reglement: payload["date_reglement"] = date_reglement if code_journal: diff --git a/schemas/articles/articles.py b/schemas/articles/articles.py index 79b2d62..26996a7 100644 --- a/schemas/articles/articles.py +++ b/schemas/articles/articles.py @@ -76,7 +76,6 @@ class Article(BaseModel): ) nb_emplacements: int = Field(0, description="Nombre d'emplacements") - # Champs énumérés normalisés suivi_stock: Optional[int] = Field( None, description="Type de suivi de stock (AR_SuiviStock): 0=Aucun, 1=CMUP, 2=FIFO/LIFO, 3=Sérialisé", diff --git a/schemas/documents/reglements.py b/schemas/documents/reglements.py index bf6d178..5cc5e2c 100644 --- a/schemas/documents/reglements.py +++ b/schemas/documents/reglements.py @@ -10,12 +10,10 @@ logger = logging.getLogger(__name__) class ReglementFactureCreate(BaseModel): """Requête de règlement d'une facture côté VPS""" - # Montant et devise montant: Decimal = Field(..., gt=0, description="Montant à régler") devise_code: Optional[int] = Field(0, description="Code devise (0=EUR par défaut)") cours_devise: Optional[Decimal] = Field(1.0, description="Cours de la devise") - # Mode et journal mode_reglement: int = Field( ..., ge=0, description="Code mode règlement depuis /reglements/modes" ) @@ -23,13 +21,11 @@ class ReglementFactureCreate(BaseModel): ..., min_length=1, description="Code journal depuis /journaux/tresorerie" ) - # Dates date_reglement: Optional[date] = Field( None, description="Date du règlement (défaut: aujourd'hui)" ) date_echeance: Optional[date] = Field(None, description="Date d'échéance") - # Références reference: Optional[str] = Field( "", max_length=17, description="Référence pièce règlement" ) @@ -37,7 +33,6 @@ class ReglementFactureCreate(BaseModel): "", max_length=35, description="Libellé du règlement" ) - # TVA sur encaissement tva_encaissement: Optional[bool] = Field( False, description="Appliquer TVA sur encaissement" ) @@ -81,7 +76,6 @@ class ReglementMultipleCreate(BaseModel): libelle: Optional[str] = Field("") tva_encaissement: Optional[bool] = Field(False) - # Factures spécifiques (optionnel) numeros_factures: Optional[List[str]] = Field( None, description="Si vide, règle les plus anciennes en premier" ) diff --git a/schemas/sage/sage_gateway.py b/schemas/sage/sage_gateway.py index e503641..9501129 100644 --- a/schemas/sage/sage_gateway.py +++ b/schemas/sage/sage_gateway.py @@ -10,7 +10,6 @@ class GatewayHealthStatus(str, Enum): UNKNOWN = "unknown" -# === CREATE === class SageGatewayCreate(BaseModel): name: str = Field( @@ -71,7 +70,6 @@ class SageGatewayUpdate(BaseModel): return v.rstrip("/") if v else v -# === RESPONSE === class SageGatewayResponse(BaseModel): id: str diff --git a/schemas/tiers/commercial.py b/schemas/tiers/commercial.py index 5a4685b..de74165 100644 --- a/schemas/tiers/commercial.py +++ b/schemas/tiers/commercial.py @@ -9,7 +9,6 @@ class CollaborateurBase(BaseModel): prenom: Optional[str] = Field(None, max_length=50) fonction: Optional[str] = Field(None, max_length=50) - # Adresse adresse: Optional[str] = Field(None, max_length=100) complement: Optional[str] = Field(None, max_length=100) code_postal: Optional[str] = Field(None, max_length=10) @@ -17,7 +16,6 @@ class CollaborateurBase(BaseModel): code_region: Optional[str] = Field(None, max_length=50) pays: Optional[str] = Field(None, max_length=50) - # Services service: Optional[str] = Field(None, max_length=50) vendeur: bool = Field(default=False) caissier: bool = Field(default=False) @@ -25,18 +23,15 @@ class CollaborateurBase(BaseModel): chef_ventes: bool = Field(default=False) numero_chef_ventes: Optional[int] = None - # Contact telephone: Optional[str] = Field(None, max_length=20) telecopie: Optional[str] = Field(None, max_length=20) email: Optional[EmailStr] = None tel_portable: Optional[str] = Field(None, max_length=20) - # Réseaux sociaux facebook: Optional[str] = Field(None, max_length=100) linkedin: Optional[str] = Field(None, max_length=100) skype: Optional[str] = Field(None, max_length=100) - # Autres matricule: Optional[str] = Field(None, max_length=20) sommeil: bool = Field(default=False) diff --git a/schemas/tiers/tiers.py b/schemas/tiers/tiers.py index 58166a1..7a46ef5 100644 --- a/schemas/tiers/tiers.py +++ b/schemas/tiers/tiers.py @@ -14,7 +14,6 @@ class TypeTiersInt(IntEnum): class TiersDetails(BaseModel): - # IDENTIFICATION numero: Optional[str] = Field(None, description="Code tiers (CT_Num)") intitule: Optional[str] = Field( None, description="Raison sociale ou Nom complet (CT_Intitule)" @@ -37,7 +36,6 @@ class TiersDetails(BaseModel): ) code_naf: Optional[str] = Field(None, description="Code NAF/APE (CT_Ape)") - # ADRESSE contact: Optional[str] = Field( None, description="Nom du contact principal (CT_Contact)" ) @@ -50,7 +48,6 @@ class TiersDetails(BaseModel): region: Optional[str] = Field(None, description="Région/État (CT_CodeRegion)") pays: Optional[str] = Field(None, description="Pays (CT_Pays)") - # TELECOM telephone: Optional[str] = Field(None, description="Téléphone fixe (CT_Telephone)") telecopie: Optional[str] = Field(None, description="Fax (CT_Telecopie)") email: Optional[str] = Field(None, description="Email principal (CT_EMail)") @@ -58,13 +55,11 @@ class TiersDetails(BaseModel): facebook: Optional[str] = Field(None, description="Profil Facebook (CT_Facebook)") linkedin: Optional[str] = Field(None, description="Profil LinkedIn (CT_LinkedIn)") - # TAUX taux01: Optional[float] = Field(None, description="Taux personnalisé 1 (CT_Taux01)") taux02: Optional[float] = Field(None, description="Taux personnalisé 2 (CT_Taux02)") taux03: Optional[float] = Field(None, description="Taux personnalisé 3 (CT_Taux03)") taux04: Optional[float] = Field(None, description="Taux personnalisé 4 (CT_Taux04)") - # STATISTIQUES statistique01: Optional[str] = Field( None, description="Statistique 1 (CT_Statistique01)" ) @@ -96,7 +91,6 @@ class TiersDetails(BaseModel): None, description="Statistique 10 (CT_Statistique10)" ) - # COMMERCIAL encours_autorise: Optional[float] = Field( None, description="Encours maximum autorisé (CT_Encours)" ) @@ -113,7 +107,6 @@ class TiersDetails(BaseModel): None, description="Détails du commercial/collaborateur" ) - # FACTURATION lettrage_auto: Optional[bool] = Field( None, description="Lettrage automatique (CT_Lettrage)" ) @@ -146,7 +139,6 @@ class TiersDetails(BaseModel): None, description="Bon à payer obligatoire (CT_BonAPayer)" ) - # LOGISTIQUE priorite_livraison: Optional[int] = Field( None, description="Priorité livraison (CT_PrioriteLivr)" ) @@ -160,17 +152,14 @@ class TiersDetails(BaseModel): None, description="Délai appro jours (CT_DelaiAppro)" ) - # COMMENTAIRE commentaire: Optional[str] = Field( None, description="Commentaire libre (CT_Commentaire)" ) - # ANALYTIQUE section_analytique: Optional[str] = Field( None, description="Section analytique (CA_Num)" ) - # ORGANISATION / SURVEILLANCE mode_reglement_code: Optional[int] = Field( None, description="Code mode règlement (MR_No)" ) @@ -200,7 +189,6 @@ class TiersDetails(BaseModel): None, description="Résultat financier (CT_SvResultat)" ) - # COMPTE GENERAL ET CATEGORIES compte_general: Optional[str] = Field( None, description="Compte général principal (CG_NumPrinc)" ) @@ -211,7 +199,6 @@ class TiersDetails(BaseModel): None, description="Catégorie comptable (N_CatCompta)" ) - # CONTACTS contacts: Optional[List[Contact]] = Field( default_factory=list, description="Liste des contacts du tiers" ) diff --git a/services/universign_document.py b/services/universign_document.py index fa899a9..9cb714e 100644 --- a/services/universign_document.py +++ b/services/universign_document.py @@ -38,7 +38,6 @@ class UniversignDocumentService: logger.info(f"{len(documents)} document(s) trouvé(s)") - # Log détaillé de chaque document for idx, doc in enumerate(documents): logger.debug( f" Document {idx}: id={doc.get('id')}, " @@ -64,7 +63,7 @@ class UniversignDocumentService: logger.error(f"⏱️ Timeout récupération transaction {transaction_id}") return None except Exception as e: - logger.error(f"❌ Erreur fetch documents: {e}", exc_info=True) + logger.error(f" Erreur fetch documents: {e}", exc_info=True) return None def download_signed_document( @@ -94,7 +93,6 @@ class UniversignDocumentService: f"Content-Type={content_type}, Size={content_length}" ) - # Vérification du type de contenu if ( "pdf" not in content_type.lower() and "octet-stream" not in content_type.lower() @@ -104,31 +102,30 @@ class UniversignDocumentService: f"Tentative de lecture quand même..." ) - # Lecture du contenu content = response.content if len(content) < 1024: - logger.error(f"❌ Document trop petit: {len(content)} octets") + logger.error(f" Document trop petit: {len(content)} octets") return None return content elif response.status_code == 404: logger.error( - f"❌ Document {document_id} introuvable pour transaction {transaction_id}" + f" Document {document_id} introuvable pour transaction {transaction_id}" ) return None elif response.status_code == 403: logger.error( - f"❌ Accès refusé au document {document_id}. " + f" Accès refusé au document {document_id}. " f"Vérifiez que la transaction est bien signée." ) return None else: logger.error( - f"❌ Erreur HTTP {response.status_code}: {response.text[:500]}" + f" Erreur HTTP {response.status_code}: {response.text[:500]}" ) return None @@ -136,13 +133,12 @@ class UniversignDocumentService: logger.error(f"⏱️ Timeout téléchargement document {document_id}") return None except Exception as e: - logger.error(f"❌ Erreur téléchargement: {e}", exc_info=True) + logger.error(f" Erreur téléchargement: {e}", exc_info=True) return None async def download_and_store_signed_document( self, session: AsyncSession, transaction, force: bool = False ) -> Tuple[bool, Optional[str]]: - # Vérification si déjà téléchargé if not force and transaction.signed_document_path: if os.path.exists(transaction.signed_document_path): logger.debug( @@ -153,7 +149,6 @@ class UniversignDocumentService: transaction.download_attempts += 1 try: - # ÉTAPE 1: Récupérer les documents de la transaction logger.info( f"Récupération document signé pour: {transaction.transaction_id}" ) @@ -167,13 +162,11 @@ class UniversignDocumentService: await session.commit() return False, error - # ÉTAPE 2: Récupérer le premier document (ou chercher celui qui est signé) document_id = None for doc in documents: doc_id = doc.get("id") doc_status = doc.get("status", "").lower() - # Priorité aux documents marqués comme signés/complétés if doc_status in ["signed", "completed", "closed"]: document_id = doc_id logger.info( @@ -181,34 +174,30 @@ class UniversignDocumentService: ) break - # Fallback sur le premier document si aucun n'est explicitement signé if document_id is None: document_id = doc_id if not document_id: error = "Impossible de déterminer l'ID du document à télécharger" - logger.error(f"❌ {error}") + logger.error(f" {error}") transaction.download_error = error await session.commit() return False, error - # Stocker le document_id pour référence future if hasattr(transaction, "universign_document_id"): transaction.universign_document_id = document_id - # ÉTAPE 3: Télécharger le document signé pdf_content = self.download_signed_document( transaction_id=transaction.transaction_id, document_id=document_id ) if not pdf_content: error = f"Échec téléchargement document {document_id}" - logger.error(f"❌ {error}") + logger.error(f" {error}") transaction.download_error = error await session.commit() return False, error - # ÉTAPE 4: Stocker le fichier localement filename = self._generate_filename(transaction) file_path = SIGNED_DOCS_DIR / filename @@ -217,13 +206,11 @@ class UniversignDocumentService: file_size = os.path.getsize(file_path) - # Mise à jour de la transaction transaction.signed_document_path = str(file_path) transaction.signed_document_downloaded_at = datetime.now() transaction.signed_document_size_bytes = file_size transaction.download_error = None - # Stocker aussi l'URL de téléchargement pour référence transaction.document_url = ( f"{self.api_url}/transactions/{transaction.transaction_id}" f"/documents/{document_id}/download" @@ -239,14 +226,14 @@ class UniversignDocumentService: except OSError as e: error = f"Erreur filesystem: {str(e)}" - logger.error(f"❌ {error}") + logger.error(f" {error}") transaction.download_error = error await session.commit() return False, error except Exception as e: error = f"Erreur inattendue: {str(e)}" - logger.error(f"❌ {error}", exc_info=True) + logger.error(f" {error}", exc_info=True) transaction.download_error = error await session.commit() return False, error @@ -294,7 +281,6 @@ class UniversignDocumentService: return deleted, int(size_freed_mb) - # === MÉTHODES DE DIAGNOSTIC === def diagnose_transaction(self, transaction_id: str) -> Dict: """ @@ -308,7 +294,6 @@ class UniversignDocumentService: } try: - # Test 1: Récupération de la transaction logger.info(f"Diagnostic transaction: {transaction_id}") response = requests.get( @@ -334,7 +319,6 @@ class UniversignDocumentService: "participants_count": len(data.get("participants", [])), } - # Test 2: Documents disponibles documents = data.get("documents", []) result["checks"]["documents"] = [] @@ -345,7 +329,6 @@ class UniversignDocumentService: "status": doc.get("status"), } - # Test téléchargement if doc.get("id"): download_url = ( f"{self.api_url}/transactions/{transaction_id}" diff --git a/services/universign_sync.py b/services/universign_sync.py index 28e633c..807802d 100644 --- a/services/universign_sync.py +++ b/services/universign_sync.py @@ -159,7 +159,6 @@ class UniversignSyncService: return stats - # CORRECTION 1 : process_webhook dans universign_sync.py async def process_webhook( self, session: AsyncSession, payload: Dict, transaction_id: str = None ) -> Tuple[bool, Optional[str]]: @@ -167,9 +166,7 @@ class UniversignSyncService: Traite un webhook Universign - CORRECTION : meilleure gestion des payloads """ try: - # Si transaction_id n'est pas fourni, essayer de l'extraire if not transaction_id: - # Même logique que dans universign.py if ( payload.get("type", "").startswith("transaction.") and "payload" in payload @@ -195,7 +192,6 @@ class UniversignSyncService: f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}" ) - # Récupérer la transaction locale query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) @@ -208,25 +204,20 @@ class UniversignSyncService: logger.warning(f"Transaction {transaction_id} inconnue localement") return False, "Transaction inconnue" - # Marquer comme webhook reçu transaction.webhook_received = True - # Stocker l'ancien statut pour comparaison old_status = transaction.local_status.value - # Force la synchronisation complète success, error = await self.sync_transaction( session, transaction, force=True ) - # Log du changement de statut if success and transaction.local_status.value != old_status: logger.info( f"Webhook traité: {transaction_id} | " f"{old_status} → {transaction.local_status.value}" ) - # Enregistrer le log du webhook await self._log_sync_attempt( session=session, transaction=transaction, @@ -248,7 +239,6 @@ class UniversignSyncService: logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True) return False, str(e) - # CORRECTION 2 : _sync_signers - Ne pas écraser les signers existants async def _sync_signers( self, session: AsyncSession, @@ -271,7 +261,6 @@ class UniversignSyncService: logger.warning(f"Signataire sans email à l'index {idx}, ignoré") continue - # PROTECTION : gérer les statuts inconnus raw_status = signer_data.get("status") or signer_data.get( "state", "waiting" ) @@ -302,7 +291,6 @@ class UniversignSyncService: if signer_data.get("name") and not signer.name: signer.name = signer_data.get("name") else: - # Nouveau signer avec gestion d'erreur intégrée try: signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}", @@ -330,7 +318,6 @@ class UniversignSyncService: ): import json - # Si statut final et pas de force, skip if is_final_status(transaction.local_status.value) and not force: logger.debug( f"⏭️ Skip {transaction.transaction_id}: statut final " @@ -340,14 +327,13 @@ class UniversignSyncService: await session.commit() return True, None - # Récupération du statut distant logger.info(f"Synchronisation: {transaction.transaction_id}") result = self.fetch_transaction_status(transaction.transaction_id) if not result: error = "Échec récupération données Universign" - logger.error(f"❌ {error}: {transaction.transaction_id}") + logger.error(f" {error}: {transaction.transaction_id}") transaction.sync_attempts += 1 transaction.sync_error = error await self._log_sync_attempt(session, transaction, "polling", False, error) @@ -360,7 +346,6 @@ class UniversignSyncService: logger.info(f"📊 Statut Universign brut: {universign_status_raw}") - # Convertir le statut new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value @@ -369,7 +354,6 @@ class UniversignSyncService: f"{new_local_status} (Local) | Actuel: {previous_local_status}" ) - # Vérifier la transition if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" @@ -386,7 +370,6 @@ class UniversignSyncService: f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}" ) - # Mise à jour du statut Universign brut try: transaction.universign_status = UniversignTransactionStatus( universign_status_raw @@ -404,11 +387,9 @@ class UniversignSyncService: else: transaction.universign_status = UniversignTransactionStatus.STARTED - # Mise à jour du statut local transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() - # Mise à jour des dates if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() logger.info("📅 Date d'envoi mise à jour") @@ -419,15 +400,12 @@ class UniversignSyncService: if new_local_status == "REFUSE" and not transaction.refused_at: transaction.refused_at = datetime.now() - logger.info("❌ Date de refus mise à jour") + logger.info(" Date de refus mise à jour") if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() logger.info("⏰ Date d'expiration mise à jour") - # === SECTION CORRIGÉE: Gestion des documents === - # Ne plus chercher document_url dans la réponse (elle n'existe pas!) - # Le téléchargement se fait via le service document qui utilise le bon endpoint documents = universign_data.get("documents", []) if documents: @@ -437,7 +415,6 @@ class UniversignSyncService: f"status={first_doc.get('status')}" ) - # Téléchargement automatique du document signé if new_local_status == "SIGNE" and not transaction.signed_document_path: logger.info("Déclenchement téléchargement document signé...") @@ -456,20 +433,16 @@ class UniversignSyncService: except Exception as e: logger.error( - f"❌ Erreur téléchargement document: {e}", exc_info=True + f" Erreur téléchargement document: {e}", exc_info=True ) - # === FIN SECTION CORRIGÉE === - # Synchroniser les signataires await self._sync_signers(session, transaction, universign_data) - # Mise à jour des métadonnées de sync transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None - # Log de la tentative await self._log_sync_attempt( session=session, transaction=transaction, @@ -491,7 +464,6 @@ class UniversignSyncService: await session.commit() - # Exécuter les actions post-changement if status_changed: logger.info(f"🎬 Exécution actions pour statut: {new_local_status}") await self._execute_status_actions( @@ -507,7 +479,7 @@ class UniversignSyncService: except Exception as e: error_msg = f"Erreur lors de la synchronisation: {str(e)}" - logger.error(f"❌ {error_msg}", exc_info=True) + logger.error(f" {error_msg}", exc_info=True) transaction.sync_error = error_msg[:1000] transaction.sync_attempts += 1 @@ -519,20 +491,16 @@ class UniversignSyncService: return False, error_msg - # CORRECTION 3 : Amélioration du logging dans sync_transaction async def _sync_transaction_documents_corrected( self, session, transaction, universign_data: dict, new_local_status: str ): - # Récupérer et stocker les infos documents documents = universign_data.get("documents", []) if documents: - # Stocker le premier document_id pour référence first_doc = documents[0] first_doc_id = first_doc.get("id") if first_doc_id: - # Stocker l'ID du document (si le champ existe dans le modèle) if hasattr(transaction, "universign_document_id"): transaction.universign_document_id = first_doc_id @@ -543,7 +511,6 @@ class UniversignSyncService: else: logger.debug("Aucun document dans la réponse Universign") - # Téléchargement automatique si signé if new_local_status == "SIGNE": if not transaction.signed_document_path: logger.info("Déclenchement téléchargement document signé...") @@ -563,7 +530,7 @@ class UniversignSyncService: except Exception as e: logger.error( - f"❌ Erreur téléchargement document: {e}", exc_info=True + f" Erreur téléchargement document: {e}", exc_info=True ) else: logger.debug( diff --git a/utils/generic_functions.py b/utils/generic_functions.py index 41b734b..4cce52f 100644 --- a/utils/generic_functions.py +++ b/utils/generic_functions.py @@ -290,15 +290,11 @@ def _preparer_lignes_document(lignes: List) -> List[Dict]: UNIVERSIGN_TO_LOCAL: Dict[str, str] = { - # États initiaux "draft": "EN_ATTENTE", "ready": "EN_ATTENTE", - # En cours "started": "EN_COURS", - # États finaux (succès) "completed": "SIGNE", "closed": "SIGNE", - # États finaux (échec) "refused": "REFUSE", "expired": "EXPIRE", "canceled": "REFUSE", @@ -441,7 +437,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "ERREUR": { "fr": "Erreur technique", "en": "Technical error", - "icon": "", + "icon": "⚠️", "color": "red", }, } From 9f6c1de8efcb808397f4d149293ea8bdd97d86df Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Mon, 19 Jan 2026 20:35:03 +0300 Subject: [PATCH 2/4] feat(api-keys): implement api key management system --- middleware/security.py | 296 +++++++++++++++++++++++++++++++++++++++++ routes/api_keys.py | 180 +++++++++++++++++++++++++ schemas/api_key.py | 77 +++++++++++ 3 files changed, 553 insertions(+) create mode 100644 middleware/security.py create mode 100644 routes/api_keys.py create mode 100644 schemas/api_key.py diff --git a/middleware/security.py b/middleware/security.py new file mode 100644 index 0000000..17186a0 --- /dev/null +++ b/middleware/security.py @@ -0,0 +1,296 @@ +import secrets +from fastapi import Request, HTTPException, status +from fastapi.responses import JSONResponse +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from sqlalchemy import select +from typing import Optional +from datetime import datetime +import logging + +from database import get_session +from database.models.api_key import SwaggerUser +from security.auth import verify_password + +logger = logging.getLogger(__name__) + +# === Configuration Swagger === +security = HTTPBasic() + + +async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: + """ + VERSION 2: Vérification des identifiants Swagger via base de données + + ✅ Plus sécurisé + ✅ Gestion centralisée + ✅ Tracking des connexions + """ + username = credentials.username + password = credentials.password + + try: + # Utiliser get_session de manière asynchrone + async for session in get_session(): + result = await session.execute( + select(SwaggerUser).where(SwaggerUser.username == username) + ) + swagger_user = result.scalar_one_or_none() + + if swagger_user and swagger_user.is_active: + if verify_password(password, swagger_user.hashed_password): + # Mise à jour de la dernière connexion + swagger_user.last_login = datetime.now() + await session.commit() + + logger.info(f"✅ Accès Swagger autorisé (DB): {username}") + return True + + logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}") + return False + + except Exception as e: + logger.error(f"❌ Erreur vérification Swagger credentials: {e}") + return False + + +class SwaggerAuthMiddleware: + """ + Middleware pour protéger les endpoints de documentation + (/docs, /redoc, /openapi.json) + + VERSION 2: Avec vérification en base de données + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive=receive) + path = request.url.path + + # Endpoints à protéger + protected_paths = ["/docs", "/redoc", "/openapi.json"] + + if any(path.startswith(protected_path) for protected_path in protected_paths): + # Vérification de l'authentification Basic + auth_header = request.headers.get("Authorization") + + if not auth_header or not auth_header.startswith("Basic "): + # Demande d'authentification + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Authentification requise pour accéder à la documentation" + }, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + # Extraction des credentials + try: + import base64 + + encoded_credentials = auth_header.split(" ")[1] + decoded_credentials = base64.b64decode(encoded_credentials).decode( + "utf-8" + ) + username, password = decoded_credentials.split(":", 1) + + credentials = HTTPBasicCredentials(username=username, password=password) + + # Vérification via DB + if not await verify_swagger_credentials(credentials): + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"detail": "Identifiants invalides"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + except Exception as e: + logger.error(f"❌ Erreur parsing auth header: {e}") + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={"detail": "Format d'authentification invalide"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + await response(scope, receive, send) + return + + # Si tout est OK, continuer + await self.app(scope, receive, send) + + +class ApiKeyMiddleware: + """ + Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics + + ✅ Accepte X-API-Key OU Authorization: Bearer (JWT) + ✅ Les deux méthodes sont équivalentes + ✅ Les endpoints /auth/* restent accessibles sans auth + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive=receive) + path = request.url.path + + # ============================================ + # ENDPOINTS EXCLUS (accessibles sans auth) + # ============================================ + excluded_paths = [ + "/docs", + "/redoc", + "/openapi.json", + "/health", + "/", + # === ROUTES AUTH (toujours publiques) === + "/api/v1/auth/login", + "/api/v1/auth/register", + "/api/v1/auth/verify-email", + "/api/v1/auth/reset-password", + "/api/v1/auth/request-reset", + "/api/v1/auth/refresh", + ] + + if any(path.startswith(excluded_path) for excluded_path in excluded_paths): + await self.app(scope, receive, send) + return + + # ============================================ + # VÉRIFICATION HYBRIDE: API Key OU JWT + # ============================================ + + # Option 1: Vérifier si JWT présent + auth_header = request.headers.get("Authorization") + has_jwt = auth_header and auth_header.startswith("Bearer ") + + # Option 2: Vérifier si API Key présente + api_key = request.headers.get("X-API-Key") + has_api_key = api_key is not None + + # ============================================ + # LOGIQUE HYBRIDE + # ============================================ + + if has_jwt: + # JWT présent → laisser passer, sera validé par les dependencies FastAPI + logger.debug(f"🔑 JWT détecté pour {path}") + await self.app(scope, receive, send) + return + + elif has_api_key: + # API Key présente → valider la clé + logger.debug(f"🔑 API Key détectée pour {path}") + + from services.api_key import ApiKeyService + + try: + async for session in get_session(): + api_key_service = ApiKeyService(session) + api_key_obj = await api_key_service.verify_api_key(api_key) + + if not api_key_obj: + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Clé API invalide ou expirée", + "hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer ", + }, + ) + await response(scope, receive, send) + return + + # Vérification du rate limit + is_allowed, rate_info = await api_key_service.check_rate_limit( + api_key_obj + ) + if not is_allowed: + response = JSONResponse( + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + content={"detail": "Rate limit dépassé"}, + headers={ + "X-RateLimit-Limit": str(rate_info["limit"]), + "X-RateLimit-Remaining": "0", + }, + ) + await response(scope, receive, send) + return + + # Vérification de l'accès à l'endpoint + has_access = await api_key_service.check_endpoint_access( + api_key_obj, path + ) + if not has_access: + response = JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "detail": "Accès non autorisé à cet endpoint", + "endpoint": path, + "api_key": api_key_obj.key_prefix + "...", + }, + ) + await response(scope, receive, send) + return + + # ✅ Clé valide → ajouter les infos à la requête + request.state.api_key = api_key_obj + request.state.authenticated_via = "api_key" + + logger.info(f"✅ API Key valide: {api_key_obj.name} → {path}") + + # Continuer la requête + await self.app(scope, receive, send) + return + + except Exception as e: + logger.error(f"❌ Erreur validation API Key: {e}", exc_info=True) + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "detail": "Erreur interne lors de la validation de la clé" + }, + ) + await response(scope, receive, send) + return + + else: + # ❌ Ni JWT ni API Key + response = JSONResponse( + status_code=status.HTTP_401_UNAUTHORIZED, + content={ + "detail": "Authentification requise", + "hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer '", + }, + headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'}, + ) + await response(scope, receive, send) + return + + +# === Fonction helper pour récupérer l'API Key depuis la requête === +def get_api_key_from_request(request: Request) -> Optional: + """Récupère l'objet ApiKey depuis la requête si présent""" + return getattr(request.state, "api_key", None) + + +def get_auth_method(request: Request) -> str: + """ + Retourne la méthode d'authentification utilisée + + Returns: + "jwt" | "api_key" | "none" + """ + return getattr(request.state, "authenticated_via", "none") diff --git a/routes/api_keys.py b/routes/api_keys.py new file mode 100644 index 0000000..8c8f6db --- /dev/null +++ b/routes/api_keys.py @@ -0,0 +1,180 @@ +from fastapi import APIRouter, Depends, HTTPException, status, Query +from sqlalchemy.ext.asyncio import AsyncSession +import logging + +from database import get_session, User +from core.dependencies import get_current_user, require_role +from services.api_key import ApiKeyService, api_key_to_response +from schemas.api_key import ( + ApiKeyCreate, + ApiKeyCreatedResponse, + ApiKeyResponse, + ApiKeyList, +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api-keys", tags=["API Keys Management"]) + + +@router.post( + "", + response_model=ApiKeyCreatedResponse, + status_code=status.HTTP_201_CREATED, + dependencies=[Depends(require_role("admin", "super_admin"))], +) +async def create_api_key( + data: ApiKeyCreate, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 🔑 Créer une nouvelle clé API + + **Réservé aux admins** + + ⚠️ La clé en clair ne sera affichée qu'une seule fois ! + """ + service = ApiKeyService(session) + + api_key_obj, api_key_plain = await service.create_api_key( + name=data.name, + description=data.description, + created_by=user.email, + user_id=user.id, + expires_in_days=data.expires_in_days, + rate_limit_per_minute=data.rate_limit_per_minute, + allowed_endpoints=data.allowed_endpoints, + ) + + logger.info(f"✅ Clé API créée par {user.email}: {data.name}") + + response_data = api_key_to_response(api_key_obj) + response_data["api_key"] = api_key_plain + + return ApiKeyCreatedResponse(**response_data) + + +@router.get("", response_model=ApiKeyList) +async def list_api_keys( + include_revoked: bool = Query(False, description="Inclure les clés révoquées"), + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 📋 Lister toutes les clés API + + **Réservé aux admins** - liste toutes les clés + **Utilisateurs standards** - liste uniquement leurs clés + """ + service = ApiKeyService(session) + + # Si admin, voir toutes les clés, sinon seulement les siennes + user_id = None if user.role in ["admin", "super_admin"] else user.id + + keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id) + + items = [ApiKeyResponse(**api_key_to_response(k)) for k in keys] + + return ApiKeyList(total=len(items), items=items) + + +@router.get("/{key_id}", response_model=ApiKeyResponse) +async def get_api_key( + key_id: str, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """🔍 Récupérer une clé API par son ID""" + service = ApiKeyService(session) + + api_key_obj = await service.get_by_id(key_id) + + if not api_key_obj: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Clé API {key_id} introuvable", + ) + + # Vérification des permissions + if user.role not in ["admin", "super_admin"]: + if api_key_obj.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Accès refusé à cette clé", + ) + + return ApiKeyResponse(**api_key_to_response(api_key_obj)) + + +@router.delete("/{key_id}", status_code=status.HTTP_200_OK) +async def revoke_api_key( + key_id: str, + session: AsyncSession = Depends(get_session), + user: User = Depends(get_current_user), +): + """ + 🚫 Révoquer une clé API + + **Action irréversible** - la clé sera désactivée définitivement + """ + service = ApiKeyService(session) + + api_key_obj = await service.get_by_id(key_id) + + if not api_key_obj: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Clé API {key_id} introuvable", + ) + + # Vérification des permissions + if user.role not in ["admin", "super_admin"]: + if api_key_obj.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Accès refusé à cette clé", + ) + + success = await service.revoke_api_key(key_id) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Erreur lors de la révocation", + ) + + logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}") + + return { + "success": True, + "message": f"Clé API '{api_key_obj.name}' révoquée avec succès", + } + + +@router.post("/verify", status_code=status.HTTP_200_OK) +async def verify_api_key_endpoint( + api_key: str = Query(..., description="Clé API à vérifier"), + session: AsyncSession = Depends(get_session), +): + """ + ✅ Vérifier la validité d'une clé API + + **Endpoint public** - permet de tester une clé + """ + service = ApiKeyService(session) + + api_key_obj = await service.verify_api_key(api_key) + + if not api_key_obj: + return { + "valid": False, + "message": "Clé API invalide, expirée ou révoquée", + } + + return { + "valid": True, + "message": "Clé API valide", + "key_name": api_key_obj.name, + "rate_limit": api_key_obj.rate_limit_per_minute, + "expires_at": api_key_obj.expires_at, + } diff --git a/schemas/api_key.py b/schemas/api_key.py new file mode 100644 index 0000000..6a2d659 --- /dev/null +++ b/schemas/api_key.py @@ -0,0 +1,77 @@ +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime + + +class ApiKeyCreate(BaseModel): + """Schema pour créer une clé API""" + + name: str = Field(..., min_length=3, max_length=255, description="Nom de la clé") + description: Optional[str] = Field(None, description="Description de l'usage") + expires_in_days: Optional[int] = Field( + None, ge=1, le=3650, description="Expiration en jours (max 10 ans)" + ) + rate_limit_per_minute: int = Field( + 60, ge=1, le=1000, description="Limite de requêtes par minute" + ) + allowed_endpoints: Optional[List[str]] = Field( + None, description="Endpoints autorisés ([] = tous, ['/clients*'] = wildcard)" + ) + + +class ApiKeyResponse(BaseModel): + """Schema de réponse pour une clé API""" + + id: str + name: str + description: Optional[str] + key_prefix: str + is_active: bool + is_expired: bool + rate_limit_per_minute: int + allowed_endpoints: Optional[List[str]] + total_requests: int + last_used_at: Optional[datetime] + created_at: datetime + expires_at: Optional[datetime] + revoked_at: Optional[datetime] + created_by: str + + +class ApiKeyCreatedResponse(ApiKeyResponse): + """Schema de réponse après création (inclut la clé en clair)""" + + api_key: str = Field( + ..., description="⚠️ Clé API en clair - à sauvegarder immédiatement" + ) + + +class ApiKeyList(BaseModel): + """Liste de clés API""" + + total: int + items: List[ApiKeyResponse] + + +class SwaggerUserCreate(BaseModel): + """Schema pour créer un utilisateur Swagger""" + + username: str = Field(..., min_length=3, max_length=100) + password: str = Field(..., min_length=8) + full_name: Optional[str] = None + email: Optional[str] = None + + +class SwaggerUserResponse(BaseModel): + """Schema de réponse pour un utilisateur Swagger""" + + id: str + username: str + full_name: Optional[str] + email: Optional[str] + is_active: bool + created_at: datetime + last_login: Optional[datetime] + + class Config: + from_attributes = True From a10fda072cf489405d9d0d8f9870947ff75d1335 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Mon, 19 Jan 2026 20:38:01 +0300 Subject: [PATCH 3/4] feat(security): implement API key authentication system --- .gitignore | 4 +- config/cors_config.py | 321 ++++++++++++++++++++++++++++++++ core/dependencies.py | 239 ++++++++++++++++++------ scripts/manage_security.py | 290 +++++++++++++++++++++++++++++ scripts/test_security.py | 369 +++++++++++++++++++++++++++++++++++++ services/api_key.py | 233 +++++++++++++++++++++++ 6 files changed, 1395 insertions(+), 61 deletions(-) create mode 100644 config/cors_config.py create mode 100644 scripts/manage_security.py create mode 100644 scripts/test_security.py create mode 100644 services/api_key.py diff --git a/.gitignore b/.gitignore index 7d75fa8..ed8f8ab 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,6 @@ tools/ .env.staging .env.production -.trunk \ No newline at end of file +.trunk + +*clean*.py \ No newline at end of file diff --git a/config/cors_config.py b/config/cors_config.py new file mode 100644 index 0000000..6ee880e --- /dev/null +++ b/config/cors_config.py @@ -0,0 +1,321 @@ +""" +CONFIGURATION CORS POUR API AVEC CLÉS API MULTIPLES + +Problématique: +- Les clés API seront utilisées depuis de nombreux domaines/IPs différents +- Impossible de lister tous les origins autorisés à l'avance +- Solution: CORS permissif mais sécurisé par les clés API + +Stratégies: +1. CORS ouvert avec validation par clé API (RECOMMANDÉ) +2. CORS dynamique basé sur whitelist +3. CORS avec wildcard et credentials=False +""" + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from typing import List +import os +import logging + +logger = logging.getLogger(__name__) + + +# ============================================ +# STRATÉGIE 1: CORS OUVERT + VALIDATION API KEY +# ============================================ +# ✅ RECOMMANDÉ pour les API publiques avec clés +# +# Principe: +# - Accepter toutes les origines (allow_origins=["*"]) +# - La sécurité est assurée par la validation des clés API +# - Les clés API protègent l'accès, pas le CORS + + +def configure_cors_open(app: FastAPI): + """ + Configuration CORS ouverte (RECOMMANDÉE) + + ✅ Accepte toutes les origines + ✅ Sécurité assurée par les clés API + ✅ Simplifie l'utilisation pour les clients + + ⚠️ Attention: credentials=False obligatoire avec allow_origins=["*"] + """ + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Accepte toutes les origines + allow_credentials=False, # ⚠️ Obligatoire avec "*" + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["*"], # Accepte tous les headers (dont X-API-Key) + expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], + max_age=3600, # Cache preflight requests pendant 1h + ) + + logger.info("🌐 CORS configuré: Mode OUVERT (sécurisé par API Keys)") + logger.info(" - Origins: * (toutes)") + logger.info(" - Headers: * (dont X-API-Key)") + logger.info(" - Credentials: False") + + +# ============================================ +# STRATÉGIE 2: CORS AVEC WHITELIST DYNAMIQUE +# ============================================ +# 🔶 Pour environnements contrôlés +# +# Principe: +# - Lister explicitement les domaines autorisés +# - Peut inclure des patterns wildcards +# - Credentials possible (cookies, etc.) + + +def configure_cors_whitelist(app: FastAPI): + """ + Configuration CORS avec whitelist (MODE CONTRÔLÉ) + + ✅ Meilleur contrôle des origines + ✅ Credentials possible + ❌ Nécessite maintenance de la liste + + À utiliser si vous connaissez tous les domaines clients + """ + + # Charger depuis .env ou config + allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "") + + if allowed_origins_str: + allowed_origins = [ + origin.strip() + for origin in allowed_origins_str.split(",") + if origin.strip() + ] + else: + # Valeurs par défaut + allowed_origins = [ + "http://localhost:3000", # Frontend dev React/Vue + "http://localhost:5173", # Vite dev + "http://localhost:8080", # Frontend dev alternatif + "https://app.votre-domaine.com", + "https://admin.votre-domaine.com", + # Ajouter vos domaines de production + ] + + app.add_middleware( + CORSMiddleware, + allow_origins=allowed_origins, + allow_credentials=True, # ✅ Possible avec liste explicite + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["Content-Type", "Authorization", "X-API-Key"], + expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], + max_age=3600, + ) + + logger.info("🌐 CORS configuré: Mode WHITELIST") + logger.info(f" - Origins autorisées: {len(allowed_origins)}") + for origin in allowed_origins: + logger.info(f" • {origin}") + + +# ============================================ +# STRATÉGIE 3: CORS AVEC REGEX (AVANCÉ) +# ============================================ +# 🔶 Pour patterns complexes +# +# Exemple: Autoriser tous les sous-domaines *.votre-domaine.com + + +def configure_cors_regex(app: FastAPI): + """ + Configuration CORS avec patterns regex (AVANCÉ) + + ✅ Flexible pour sous-domaines + ✅ Supporte patterns complexes + ❌ Plus complexe à configurer + + Utilise allow_origin_regex au lieu de allow_origins + """ + + # Pattern regex pour autoriser tous les sous-domaines + origin_regex = r"https://.*\.votre-domaine\.com" + + app.add_middleware( + CORSMiddleware, + allow_origin_regex=origin_regex, # ⚠️ Utilise regex au lieu de liste + allow_credentials=True, + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["Content-Type", "Authorization", "X-API-Key"], + expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], + max_age=3600, + ) + + logger.info("🌐 CORS configuré: Mode REGEX") + logger.info(f" - Pattern: {origin_regex}") + + +# ============================================ +# STRATÉGIE 4: CORS HYBRIDE (PRODUCTION) +# ============================================ +# ✅ RECOMMANDÉ pour production +# +# Principe: +# - Whitelist pour les domaines connus (credentials=True) +# - Fallback sur "*" pour le reste (credentials=False) + + +def configure_cors_hybrid(app: FastAPI): + """ + Configuration CORS hybride (PRODUCTION) + + ✅ Meilleur des deux mondes + ✅ Whitelist pour domaines connus + ✅ Fallback ouvert pour API Keys externes + + Note: Nécessite un middleware custom pour gérer les deux modes + """ + from starlette.middleware.base import BaseHTTPMiddleware + from starlette.responses import Response + + class HybridCORSMiddleware(BaseHTTPMiddleware): + def __init__(self, app, known_origins: List[str]): + super().__init__(app) + self.known_origins = set(known_origins) + + async def dispatch(self, request, call_next): + origin = request.headers.get("origin") + + # Si origin connue → CORS strict avec credentials + if origin in self.known_origins: + response = await call_next(request) + response.headers["Access-Control-Allow-Origin"] = origin + response.headers["Access-Control-Allow-Credentials"] = "true" + response.headers["Access-Control-Allow-Methods"] = ( + "GET, POST, PUT, DELETE, PATCH, OPTIONS" + ) + response.headers["Access-Control-Allow-Headers"] = ( + "Content-Type, Authorization, X-API-Key" + ) + return response + + # Sinon → CORS ouvert sans credentials + response = await call_next(request) + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Methods"] = ( + "GET, POST, PUT, DELETE, PATCH, OPTIONS" + ) + response.headers["Access-Control-Allow-Headers"] = "*" + return response + + # Domaines connus (whitelist) + known_origins = [ + "https://app.votre-domaine.com", + "https://admin.votre-domaine.com", + "http://localhost:3000", + "http://localhost:5173", + ] + + app.add_middleware(HybridCORSMiddleware, known_origins=known_origins) + + logger.info("🌐 CORS configuré: Mode HYBRIDE") + logger.info(f" - Whitelist: {len(known_origins)} domaines") + logger.info(" - Fallback: * (ouvert)") + + +# ============================================ +# FONCTION PRINCIPALE +# ============================================ + + +def setup_cors(app: FastAPI, mode: str = "open"): + """ + Configure CORS selon le mode choisi + + Args: + app: Instance FastAPI + mode: "open" | "whitelist" | "regex" | "hybrid" + + Recommandations: + - Development: "open" + - Production (API publique): "open" ou "hybrid" + - Production (API interne): "whitelist" + """ + + if mode == "open": + configure_cors_open(app) + elif mode == "whitelist": + configure_cors_whitelist(app) + elif mode == "regex": + configure_cors_regex(app) + elif mode == "hybrid": + configure_cors_hybrid(app) + else: + logger.warning( + f"⚠️ Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." + ) + configure_cors_open(app) + + +# ============================================ +# EXEMPLE D'UTILISATION DANS api.py +# ============================================ + +""" +# Dans api.py + +from config.cors_config import setup_cors + +app = FastAPI(...) + +# DÉVELOPPEMENT +setup_cors(app, mode="open") + +# PRODUCTION (API publique avec clés) +setup_cors(app, mode="hybrid") + +# PRODUCTION (API interne uniquement) +setup_cors(app, mode="whitelist") +""" + + +# ============================================ +# VARIABLES D'ENVIRONNEMENT (.env) +# ============================================ + +""" +# Pour mode "whitelist" +CORS_ALLOWED_ORIGINS=https://app.example.com,https://admin.example.com,http://localhost:3000 + +# Pour mode "regex" +CORS_ORIGIN_REGEX=https://.*\.example\.com + +# Choisir le mode +CORS_MODE=open +""" + + +# ============================================ +# FAQ CORS + API KEYS +# ============================================ + +""" +Q: Pourquoi allow_origins=["*"] est sécurisé avec des API Keys ? +R: Les clés API protègent l'accès aux données. CORS empêche seulement les + navigateurs web de faire des requêtes cross-origin. Un attaquant peut + contourner CORS facilement (curl, postman), donc la vraie sécurité vient + de la validation des clés API, pas du CORS. + +Q: Pourquoi credentials=False avec allow_origins=["*"] ? +R: C'est une restriction du navigateur (spec CORS). Vous ne pouvez pas avoir + credentials=True ET origins=["*"] en même temps. + +Q: Mes clients utilisent des IPs dynamiques, que faire ? +R: Utilisez mode "open". Les clés API sont justement faites pour ça - + permettre l'accès depuis n'importe quelle origine, de manière sécurisée. + +Q: Je veux quand même utiliser des cookies/sessions ? +R: Utilisez mode "whitelist" ou "hybrid" pour les domaines qui ont besoin + de credentials, et laissez les autres utiliser X-API-Key sans credentials. + +Q: Comment tester CORS localement ? +R: Créez un simple fichier HTML avec fetch() et ouvrez-le en file:// + Ou utilisez un serveur local (python -m http.server) +""" diff --git a/core/dependencies.py b/core/dependencies.py index 039081c..6782e98 100644 --- a/core/dependencies.py +++ b/core/dependencies.py @@ -1,4 +1,4 @@ -from fastapi import Depends, HTTPException, status +from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select @@ -6,89 +6,208 @@ from database import get_session, User from security.auth import decode_token from typing import Optional from datetime import datetime +import logging -security = HTTPBearer() +logger = logging.getLogger(__name__) + +security = HTTPBearer(auto_error=False) # ⚠️ auto_error=False pour gérer manuellement -async def get_current_user( - credentials: HTTPAuthorizationCredentials = Depends(security), +async def get_current_user_hybrid( + request: Request, + credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: - token = credentials.credentials + """ + VERSION HYBRIDE: Accepte JWT OU API Key - payload = decode_token(token) - if not payload: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Token invalide ou expiré", - headers={"WWW-Authenticate": "Bearer"}, + Priorité: + 1. JWT (Authorization: Bearer) + 2. API Key (déjà validée par middleware) + + Si API Key utilisée, retourne un "user virtuel" basé sur la clé + """ + + # ============================================ + # OPTION 1: JWT (comportement standard) + # ============================================ + if credentials and credentials.credentials: + token = credentials.credentials + + payload = decode_token(token) + if not payload: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Token invalide ou expiré", + headers={"WWW-Authenticate": "Bearer"}, + ) + + if payload.get("type") != "access": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Type de token incorrect", + headers={"WWW-Authenticate": "Bearer"}, + ) + + user_id: str = payload.get("sub") + if not user_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Token malformé", + headers={"WWW-Authenticate": "Bearer"}, + ) + + result = await session.execute(select(User).where(User.id == user_id)) + user = result.scalar_one_or_none() + + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Utilisateur introuvable", + headers={"WWW-Authenticate": "Bearer"}, + ) + + if not user.is_active: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé" + ) + + if not user.is_verified: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Email non vérifié. Consultez votre boîte de réception.", + ) + + if user.locked_until and user.locked_until > datetime.now(): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Compte temporairement verrouillé suite à trop de tentatives échouées", + ) + + logger.debug(f"🔐 Authentifié via JWT: {user.email}") + return user + + # ============================================ + # OPTION 2: API Key (validée par middleware) + # ============================================ + api_key_obj = getattr(request.state, "api_key", None) + + if api_key_obj: + # Créer un "user virtuel" basé sur la clé API + # Cela permet aux routes existantes de fonctionner sans modification + + # Si la clé est associée à un vrai user, le récupérer + if api_key_obj.user_id: + result = await session.execute( + select(User).where(User.id == api_key_obj.user_id) + ) + user = result.scalar_one_or_none() + + if user: + logger.debug( + f"🔑 Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" + ) + return user + + # Sinon, créer un user virtuel (pour les clés API sans user associé) + from database import User as UserModel + + virtual_user = UserModel( + id=f"api_key_{api_key_obj.id}", + email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local", + nom="API Key", + prenom=api_key_obj.name, + role="api_client", # Rôle spécial pour les API keys + is_verified=True, + is_active=True, ) - if payload.get("type") != "access": - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Type de token incorrect", - headers={"WWW-Authenticate": "Bearer"}, - ) + # Marquer que c'est un user virtuel + virtual_user._is_api_key_user = True + virtual_user._api_key_obj = api_key_obj - user_id: str = payload.get("sub") - if not user_id: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Token malformé", - headers={"WWW-Authenticate": "Bearer"}, - ) + logger.debug(f"🔑 Authentifié via API Key: {api_key_obj.name} (user virtuel)") + return virtual_user - result = await session.execute(select(User).where(User.id == user_id)) - user = result.scalar_one_or_none() - - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Utilisateur introuvable", - headers={"WWW-Authenticate": "Bearer"}, - ) - - if not user.is_active: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé" - ) - - if not user.is_verified: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Email non vérifié. Consultez votre boîte de réception.", - ) - - if user.locked_until and user.locked_until > datetime.now(): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Compte temporairement verrouillé suite à trop de tentatives échouées", - ) - - return user + # ============================================ + # AUCUNE AUTHENTIFICATION + # ============================================ + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Authentification requise (JWT ou API Key)", + headers={"WWW-Authenticate": "Bearer"}, + ) -async def get_current_user_optional( +async def get_current_user_optional_hybrid( + request: Request, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> Optional[User]: - if not credentials: - return None - + """Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)""" try: - return await get_current_user(credentials, session) + return await get_current_user_hybrid(request, credentials, session) except HTTPException: return None -def require_role(*allowed_roles: str): - async def role_checker(user: User = Depends(get_current_user)) -> User: - if user.role not in allowed_roles: +def require_role_hybrid(*allowed_roles: str): + """ + VERSION HYBRIDE: Vérification de rôle compatible avec API Keys + + Notes: + - Les users via JWT ont leur vrai rôle + - Les users via API Key ont le rôle "api_client" par défaut + - Vous pouvez autoriser "api_client" dans allowed_roles pour permettre l'accès aux API Keys + """ + + async def role_checker( + request: Request, user: User = Depends(get_current_user_hybrid) + ) -> User: + # Vérifier si c'est un user API Key + is_api_key_user = getattr(user, "_is_api_key_user", False) + + if is_api_key_user: + # Pour les API Keys, vérifier si "api_client" est autorisé + if "api_client" not in allowed_roles and "*" not in allowed_roles: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.", + ) + logger.debug(f"✅ API Key autorisée pour cette route") + return user + + # Pour les vrais users, vérification standard + if user.role not in allowed_roles and "*" not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}", ) + return user return role_checker + + +# ============================================ +# HELPERS +# ============================================ + + +def is_api_key_user(user: User) -> bool: + """Vérifie si l'utilisateur est authentifié via API Key""" + return getattr(user, "_is_api_key_user", False) + + +def get_api_key_from_user(user: User): + """Récupère l'objet API Key depuis un user virtuel""" + return getattr(user, "_api_key_obj", None) + + +# ============================================ +# RÉTROCOMPATIBILITÉ +# ============================================ + +# Alias pour garder la compatibilité avec le code existant +get_current_user = get_current_user_hybrid +get_current_user_optional = get_current_user_optional_hybrid diff --git a/scripts/manage_security.py b/scripts/manage_security.py new file mode 100644 index 0000000..35c0cff --- /dev/null +++ b/scripts/manage_security.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +""" +Script CLI pour gérer les clés API et utilisateurs Swagger + +Usage: + python manage_security.py swagger add + python manage_security.py swagger list + python manage_security.py swagger delete + + python manage_security.py apikey create [--days 365] [--rate-limit 60] + python manage_security.py apikey list + python manage_security.py apikey revoke + python manage_security.py apikey verify +""" + +import asyncio +import sys +from pathlib import Path +import argparse + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from database import get_session +from database.models.api_key import SwaggerUser +from services.api_key import ApiKeyService +from security.auth import hash_password, verify_password +from sqlalchemy import select +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# ============================================ +# GESTION DES UTILISATEURS SWAGGER +# ============================================ + +async def add_swagger_user(username: str, password: str, full_name: str = None): + """Ajouter un utilisateur Swagger""" + async with get_session() as session: + # Vérifier si existe + result = await session.execute( + select(SwaggerUser).where(SwaggerUser.username == username) + ) + existing = result.scalar_one_or_none() + + if existing: + logger.error(f"❌ L'utilisateur {username} existe déjà") + return + + # Créer + user = SwaggerUser( + username=username, + hashed_password=hash_password(password), + full_name=full_name or username, + is_active=True, + ) + + session.add(user) + await session.commit() + + logger.info(f"✅ Utilisateur Swagger créé: {username}") + print(f"\n✅ Utilisateur créé avec succès") + print(f" Username: {username}") + print(f" Accès: https://votre-serveur/docs") + + +async def list_swagger_users(): + """Lister les utilisateurs Swagger""" + async with get_session() as session: + result = await session.execute(select(SwaggerUser)) + users = result.scalars().all() + + if not users: + print("Aucun utilisateur Swagger trouvé") + return + + print(f"\n📋 {len(users)} utilisateur(s) Swagger:\n") + for user in users: + status = "✅ Actif" if user.is_active else "❌ Inactif" + print(f" • {user.username:<20} {status}") + if user.full_name: + print(f" Nom: {user.full_name}") + if user.last_login: + print(f" Dernière connexion: {user.last_login}") + print() + + +async def delete_swagger_user(username: str): + """Supprimer un utilisateur Swagger""" + async with get_session() as session: + result = await session.execute( + select(SwaggerUser).where(SwaggerUser.username == username) + ) + user = result.scalar_one_or_none() + + if not user: + logger.error(f"❌ Utilisateur {username} introuvable") + return + + await session.delete(user) + await session.commit() + + logger.info(f"🗑️ Utilisateur supprimé: {username}") + + +# ============================================ +# GESTION DES CLÉS API +# ============================================ + +async def create_api_key( + name: str, + description: str = None, + expires_in_days: int = 365, + rate_limit: int = 60, + endpoints: list = None, +): + """Créer une clé API""" + async with get_session() as session: + service = ApiKeyService(session) + + api_key_obj, api_key_plain = await service.create_api_key( + name=name, + description=description, + created_by="CLI", + expires_in_days=expires_in_days, + rate_limit_per_minute=rate_limit, + allowed_endpoints=endpoints, + ) + + print(f"\n✅ Clé API créée avec succès\n") + print(f" ID: {api_key_obj.id}") + print(f" Nom: {name}") + print(f" Clé: {api_key_plain}") + print(f" Préfixe: {api_key_obj.key_prefix}") + print(f" Rate limit: {rate_limit} req/min") + print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}") + print(f"\n⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") + + +async def list_api_keys(): + """Lister les clés API""" + async with get_session() as session: + service = ApiKeyService(session) + keys = await service.list_api_keys() + + if not keys: + print("Aucune clé API trouvée") + return + + print(f"\n📋 {len(keys)} clé(s) API:\n") + for key in keys: + status = "✅" if key.is_active else "❌" + expired = "⏰ Expirée" if key.expires_at and key.expires_at < datetime.now() else "" + + print(f" {status} {key.name:<30} ({key.key_prefix}...)") + print(f" ID: {key.id}") + print(f" Requêtes: {key.total_requests}") + print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + if expired: + print(f" {expired}") + print() + + +async def revoke_api_key(key_id: str): + """Révoquer une clé API""" + async with get_session() as session: + service = ApiKeyService(session) + + api_key = await service.get_by_id(key_id) + if not api_key: + logger.error(f"❌ Clé {key_id} introuvable") + return + + success = await service.revoke_api_key(key_id) + + if success: + logger.info(f"🚫 Clé révoquée: {api_key.name}") + print(f"\n✅ Clé '{api_key.name}' révoquée avec succès") + else: + logger.error("❌ Erreur lors de la révocation") + + +async def verify_api_key_cmd(api_key: str): + """Vérifier une clé API""" + async with get_session() as session: + service = ApiKeyService(session) + api_key_obj = await service.verify_api_key(api_key) + + if api_key_obj: + print(f"\n✅ Clé API valide\n") + print(f" Nom: {api_key_obj.name}") + print(f" ID: {api_key_obj.id}") + print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") + print(f" Requêtes: {api_key_obj.total_requests}") + print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n") + else: + print(f"\n❌ Clé API invalide, expirée ou révoquée\n") + + +# ============================================ +# CLI PRINCIPAL +# ============================================ + +async def main(): + parser = argparse.ArgumentParser( + description="Gestion de la sécurité Sage Dataven API" + ) + + subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") + + # === SWAGGER === + swagger_parser = subparsers.add_parser("swagger", help="Gestion utilisateurs Swagger") + swagger_subparsers = swagger_parser.add_subparsers(dest="action") + + # swagger add + swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") + swagger_add.add_argument("username", help="Nom d'utilisateur") + swagger_add.add_argument("password", help="Mot de passe") + swagger_add.add_argument("--full-name", help="Nom complet") + + # swagger list + swagger_subparsers.add_parser("list", help="Lister les utilisateurs") + + # swagger delete + swagger_delete = swagger_subparsers.add_parser("delete", help="Supprimer un utilisateur") + swagger_delete.add_argument("username", help="Nom d'utilisateur") + + # === API KEYS === + apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") + apikey_subparsers = apikey_parser.add_subparsers(dest="action") + + # apikey create + apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API") + apikey_create.add_argument("name", help="Nom de la clé") + apikey_create.add_argument("--description", help="Description") + apikey_create.add_argument("--days", type=int, default=365, help="Expiration en jours") + apikey_create.add_argument("--rate-limit", type=int, default=60, help="Limite req/min") + apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") + + # apikey list + apikey_subparsers.add_parser("list", help="Lister les clés") + + # apikey revoke + apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") + apikey_revoke.add_argument("key_id", help="ID de la clé") + + # apikey verify + apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé") + apikey_verify.add_argument("api_key", help="Clé API à vérifier") + + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + # Exécution des commandes + if args.command == "swagger": + if args.action == "add": + await add_swagger_user(args.username, args.password, args.full_name) + elif args.action == "list": + await list_swagger_users() + elif args.action == "delete": + await delete_swagger_user(args.username) + else: + swagger_parser.print_help() + + elif args.command == "apikey": + if args.action == "create": + await create_api_key( + args.name, + args.description, + args.days, + args.rate_limit, + args.endpoints, + ) + elif args.action == "list": + await list_api_keys() + elif args.action == "revoke": + await revoke_api_key(args.key_id) + elif args.action == "verify": + await verify_api_key_cmd(args.api_key) + else: + apikey_parser.print_help() + + +if __name__ == "__main__": + from datetime import datetime + asyncio.run(main()) \ No newline at end of file diff --git a/scripts/test_security.py b/scripts/test_security.py new file mode 100644 index 0000000..79f0299 --- /dev/null +++ b/scripts/test_security.py @@ -0,0 +1,369 @@ +#!/usr/bin/env python3 +""" +Script de test automatisé pour vérifier la sécurité de l'API + +Usage: + python test_security.py --url http://votre-vps:8000 --swagger-user admin --swagger-pass password +""" + +import requests +import argparse +import sys +from typing import Dict, Tuple +import json + + +class SecurityTester: + def __init__(self, base_url: str): + self.base_url = base_url.rstrip("/") + self.results = {"passed": 0, "failed": 0, "tests": []} + + def log_test(self, name: str, passed: bool, details: str = ""): + """Enregistrer le résultat d'un test""" + status = "✅ PASS" if passed else "❌ FAIL" + print(f"{status} - {name}") + if details: + print(f" {details}") + + self.results["tests"].append( + {"name": name, "passed": passed, "details": details} + ) + + if passed: + self.results["passed"] += 1 + else: + self.results["failed"] += 1 + + def test_swagger_without_auth(self) -> bool: + """Test 1: Swagger UI devrait demander une authentification""" + print("\n🔍 Test 1: Protection Swagger UI") + + try: + response = requests.get(f"{self.base_url}/docs", timeout=5) + + if response.status_code == 401: + self.log_test( + "Swagger protégé", + True, + "Code 401 retourné sans authentification", + ) + return True + else: + self.log_test( + "Swagger protégé", + False, + f"Code {response.status_code} au lieu de 401", + ) + return False + + except Exception as e: + self.log_test("Swagger protégé", False, f"Erreur: {str(e)}") + return False + + def test_swagger_with_auth(self, username: str, password: str) -> bool: + """Test 2: Swagger UI accessible avec credentials valides""" + print("\n🔍 Test 2: Accès Swagger avec authentification") + + try: + response = requests.get( + f"{self.base_url}/docs", auth=(username, password), timeout=5 + ) + + if response.status_code == 200: + self.log_test( + "Accès Swagger avec auth", + True, + f"Authentifié comme {username}", + ) + return True + else: + self.log_test( + "Accès Swagger avec auth", + False, + f"Code {response.status_code}, credentials invalides?", + ) + return False + + except Exception as e: + self.log_test("Accès Swagger avec auth", False, f"Erreur: {str(e)}") + return False + + def test_api_without_auth(self) -> bool: + """Test 3: Endpoints API devraient demander une authentification""" + print("\n🔍 Test 3: Protection des endpoints API") + + test_endpoints = ["/api/v1/clients", "/api/v1/documents"] + + all_protected = True + for endpoint in test_endpoints: + try: + response = requests.get(f"{self.base_url}{endpoint}", timeout=5) + + if response.status_code == 401: + print(f" ✅ {endpoint} protégé (401)") + else: + print( + f" ❌ {endpoint} accessible sans auth (code {response.status_code})" + ) + all_protected = False + + except Exception as e: + print(f" ⚠️ {endpoint} erreur: {str(e)}") + all_protected = False + + self.log_test("Endpoints API protégés", all_protected) + return all_protected + + def test_health_endpoint_public(self) -> bool: + """Test 4: Endpoint /health devrait être accessible sans auth""" + print("\n🔍 Test 4: Endpoint /health public") + + try: + response = requests.get(f"{self.base_url}/health", timeout=5) + + if response.status_code == 200: + self.log_test("/health accessible", True, "Endpoint public fonctionne") + return True + else: + self.log_test( + "/health accessible", + False, + f"Code {response.status_code} inattendu", + ) + return False + + except Exception as e: + self.log_test("/health accessible", False, f"Erreur: {str(e)}") + return False + + def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]: + """Test 5: Créer une clé API via l'endpoint""" + print("\n🔍 Test 5: Création d'une clé API") + + try: + # 1. Login pour obtenir un JWT + login_response = requests.post( + f"{self.base_url}/api/v1/auth/login", + json={"email": username, "password": password}, + timeout=5, + ) + + if login_response.status_code != 200: + self.log_test( + "Création clé API", + False, + "Impossible de se connecter pour obtenir un JWT", + ) + return False, "" + + jwt_token = login_response.json().get("access_token") + + # 2. Créer une clé API + create_response = requests.post( + f"{self.base_url}/api/v1/api-keys", + headers={"Authorization": f"Bearer {jwt_token}"}, + json={ + "name": "Test API Key", + "description": "Clé de test automatisé", + "rate_limit_per_minute": 60, + "expires_in_days": 30, + }, + timeout=5, + ) + + if create_response.status_code == 201: + api_key = create_response.json().get("api_key") + self.log_test("Création clé API", True, f"Clé créée: {api_key[:20]}...") + return True, api_key + else: + self.log_test( + "Création clé API", + False, + f"Code {create_response.status_code}", + ) + return False, "" + + except Exception as e: + self.log_test("Création clé API", False, f"Erreur: {str(e)}") + return False, "" + + def test_api_key_usage(self, api_key: str) -> bool: + """Test 6: Utiliser une clé API pour accéder à un endpoint""" + print("\n🔍 Test 6: Utilisation d'une clé API") + + if not api_key: + self.log_test("Utilisation clé API", False, "Pas de clé disponible") + return False + + try: + response = requests.get( + f"{self.base_url}/api/v1/clients", + headers={"X-API-Key": api_key}, + timeout=5, + ) + + if response.status_code == 200: + self.log_test("Utilisation clé API", True, "Clé acceptée") + return True + else: + self.log_test( + "Utilisation clé API", + False, + f"Code {response.status_code}, clé refusée?", + ) + return False + + except Exception as e: + self.log_test("Utilisation clé API", False, f"Erreur: {str(e)}") + return False + + def test_invalid_api_key(self) -> bool: + """Test 7: Une clé invalide devrait être refusée""" + print("\n🔍 Test 7: Rejet de clé API invalide") + + invalid_key = "sdk_live_invalid_key_12345" + + try: + response = requests.get( + f"{self.base_url}/api/v1/clients", + headers={"X-API-Key": invalid_key}, + timeout=5, + ) + + if response.status_code == 401: + self.log_test("Clé invalide rejetée", True, "Code 401 comme attendu") + return True + else: + self.log_test( + "Clé invalide rejetée", + False, + f"Code {response.status_code} au lieu de 401", + ) + return False + + except Exception as e: + self.log_test("Clé invalide rejetée", False, f"Erreur: {str(e)}") + return False + + def test_rate_limiting(self, api_key: str) -> bool: + """Test 8: Rate limiting (optionnel, peut prendre du temps)""" + print("\n🔍 Test 8: Rate limiting (test simple)") + + if not api_key: + self.log_test("Rate limiting", False, "Pas de clé disponible") + return False + + # Envoyer 70 requêtes rapidement (limite = 60/min) + print(" Envoi de 70 requêtes rapides...") + + rate_limited = False + for i in range(70): + try: + response = requests.get( + f"{self.base_url}/health", + headers={"X-API-Key": api_key}, + timeout=1, + ) + + if response.status_code == 429: + rate_limited = True + print(f" ⚠️ Rate limit atteint à la requête {i + 1}") + break + + except Exception: + pass + + if rate_limited: + self.log_test("Rate limiting", True, "Rate limit détecté") + return True + else: + self.log_test( + "Rate limiting", + True, + "Aucun rate limit détecté (peut être normal si pas implémenté)", + ) + return True + + def print_summary(self): + """Afficher le résumé des tests""" + print("\n" + "=" * 60) + print("📊 RÉSUMÉ DES TESTS") + print("=" * 60) + + total = self.results["passed"] + self.results["failed"] + success_rate = (self.results["passed"] / total * 100) if total > 0 else 0 + + print(f"\nTotal: {total} tests") + print(f"✅ Réussis: {self.results['passed']}") + print(f"❌ Échoués: {self.results['failed']}") + print(f"📈 Taux de réussite: {success_rate:.1f}%\n") + + if self.results["failed"] == 0: + print("🎉 Tous les tests sont passés ! Sécurité OK.") + return 0 + else: + print("⚠️ Certains tests ont échoué. Vérifiez la configuration.") + return 1 + + +def main(): + parser = argparse.ArgumentParser( + description="Test automatisé de la sécurité de l'API" + ) + + parser.add_argument( + "--url", + required=True, + help="URL de base de l'API (ex: http://localhost:8000)", + ) + + parser.add_argument( + "--swagger-user", required=True, help="Utilisateur Swagger pour les tests" + ) + + parser.add_argument( + "--swagger-pass", required=True, help="Mot de passe Swagger pour les tests" + ) + + parser.add_argument( + "--skip-rate-limit", + action="store_true", + help="Sauter le test de rate limiting (long)", + ) + + args = parser.parse_args() + + print("🚀 Démarrage des tests de sécurité") + print(f"🎯 URL cible: {args.url}\n") + + tester = SecurityTester(args.url) + + # Exécuter les tests + tester.test_swagger_without_auth() + tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass) + tester.test_api_without_auth() + tester.test_health_endpoint_public() + + # Tests nécessitant une clé API + success, api_key = tester.test_api_key_creation( + args.swagger_user, args.swagger_pass + ) + + if success and api_key: + tester.test_api_key_usage(api_key) + tester.test_invalid_api_key() + + if not args.skip_rate_limit: + tester.test_rate_limiting(api_key) + else: + print("\n⏭️ Test de rate limiting sauté") + else: + print("\n⚠️ Tests avec clé API sautés (création échouée)") + + # Résumé + exit_code = tester.print_summary() + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/services/api_key.py b/services/api_key.py new file mode 100644 index 0000000..d54943a --- /dev/null +++ b/services/api_key.py @@ -0,0 +1,233 @@ +import secrets +import hashlib +import json +from datetime import datetime, timedelta +from typing import Optional, List, Dict +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_, or_ +import logging + +from database.models.api_key import ApiKey + +logger = logging.getLogger(__name__) + + +class ApiKeyService: + """Service de gestion des clés API""" + + def __init__(self, session: AsyncSession): + self.session = session + + @staticmethod + def generate_api_key() -> str: + """Génère une clé API unique et sécurisée""" + # Format: sdk_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + random_part = secrets.token_urlsafe(32) + return f"sdk_live_{random_part}" + + @staticmethod + def hash_api_key(api_key: str) -> str: + """Hash la clé API pour stockage sécurisé""" + return hashlib.sha256(api_key.encode()).hexdigest() + + @staticmethod + def get_key_prefix(api_key: str) -> str: + """Extrait le préfixe de la clé pour identification""" + # Retourne les 12 premiers caractères + return api_key[:12] if len(api_key) >= 12 else api_key + + async def create_api_key( + self, + name: str, + description: Optional[str] = None, + created_by: str = "system", + user_id: Optional[str] = None, + expires_in_days: Optional[int] = None, + rate_limit_per_minute: int = 60, + allowed_endpoints: Optional[List[str]] = None, + ) -> tuple[ApiKey, str]: + """ + Crée une nouvelle clé API + + Returns: + tuple[ApiKey, str]: (objet ApiKey, clé en clair - à ne montrer qu'une fois) + """ + # Génération de la clé + api_key_plain = self.generate_api_key() + key_hash = self.hash_api_key(api_key_plain) + key_prefix = self.get_key_prefix(api_key_plain) + + # Calcul de la date d'expiration + expires_at = None + if expires_in_days: + expires_at = datetime.now() + timedelta(days=expires_in_days) + + # Création de l'objet + api_key_obj = ApiKey( + key_hash=key_hash, + key_prefix=key_prefix, + name=name, + description=description, + created_by=created_by, + user_id=user_id, + expires_at=expires_at, + rate_limit_per_minute=rate_limit_per_minute, + allowed_endpoints=json.dumps(allowed_endpoints) + if allowed_endpoints + else None, + ) + + self.session.add(api_key_obj) + await self.session.commit() + await self.session.refresh(api_key_obj) + + logger.info(f"✅ Clé API créée: {name} (prefix: {key_prefix})") + + return api_key_obj, api_key_plain + + async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]: + """ + Vérifie une clé API et retourne l'objet si valide + + Returns: + Optional[ApiKey]: L'objet ApiKey si valide, None sinon + """ + key_hash = self.hash_api_key(api_key_plain) + + result = await self.session.execute( + select(ApiKey).where( + and_( + ApiKey.key_hash == key_hash, + ApiKey.is_active == True, + ApiKey.revoked_at.is_(None), + or_( + ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now() + ), + ) + ) + ) + + api_key_obj = result.scalar_one_or_none() + + if api_key_obj: + # Mise à jour des statistiques + api_key_obj.total_requests += 1 + api_key_obj.last_used_at = datetime.now() + await self.session.commit() + + logger.debug(f"🔑 Clé API validée: {api_key_obj.name}") + else: + logger.warning(f"⚠️ Clé API invalide ou expirée") + + return api_key_obj + + async def list_api_keys( + self, + include_revoked: bool = False, + user_id: Optional[str] = None, + ) -> List[ApiKey]: + """Liste les clés API""" + query = select(ApiKey) + + if not include_revoked: + query = query.where(ApiKey.revoked_at.is_(None)) + + if user_id: + query = query.where(ApiKey.user_id == user_id) + + query = query.order_by(ApiKey.created_at.desc()) + + result = await self.session.execute(query) + return list(result.scalars().all()) + + async def revoke_api_key(self, key_id: str) -> bool: + """Révoque une clé API""" + result = await self.session.execute(select(ApiKey).where(ApiKey.id == key_id)) + api_key_obj = result.scalar_one_or_none() + + if not api_key_obj: + return False + + api_key_obj.is_active = False + api_key_obj.revoked_at = datetime.now() + await self.session.commit() + + logger.info(f"🚫 Clé API révoquée: {api_key_obj.name}") + return True + + async def get_by_id(self, key_id: str) -> Optional[ApiKey]: + """Récupère une clé API par son ID""" + result = await self.session.execute(select(ApiKey).where(ApiKey.id == key_id)) + return result.scalar_one_or_none() + + async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]: + """ + Vérifie le rate limit d'une clé API (à implémenter avec Redis/cache) + + Returns: + tuple[bool, Dict]: (is_allowed, info_dict) + """ + # TODO: Implémenter avec Redis pour un vrai rate limiting + # Pour l'instant, retourne toujours True + return True, { + "allowed": True, + "limit": api_key_obj.rate_limit_per_minute, + "remaining": api_key_obj.rate_limit_per_minute, + } + + async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool: + """Vérifie si la clé a accès à un endpoint spécifique""" + if not api_key_obj.allowed_endpoints: + # Si aucune restriction, accès total + return True + + try: + allowed = json.loads(api_key_obj.allowed_endpoints) + + # Support des wildcards + for pattern in allowed: + if pattern == "*": + return True + if pattern.endswith("*"): + prefix = pattern[:-1] + if endpoint.startswith(prefix): + return True + if pattern == endpoint: + return True + + return False + except json.JSONDecodeError: + logger.error(f"❌ Erreur parsing allowed_endpoints pour {api_key_obj.id}") + return False + + +def api_key_to_response(api_key_obj: ApiKey, show_key: bool = False) -> Dict: + """Convertit un objet ApiKey en réponse API""" + + allowed_endpoints = None + if api_key_obj.allowed_endpoints: + try: + allowed_endpoints = json.loads(api_key_obj.allowed_endpoints) + except json.JSONDecodeError: + pass + + is_expired = False + if api_key_obj.expires_at: + is_expired = api_key_obj.expires_at < datetime.now() + + return { + "id": api_key_obj.id, + "name": api_key_obj.name, + "description": api_key_obj.description, + "key_prefix": api_key_obj.key_prefix, + "is_active": api_key_obj.is_active, + "is_expired": is_expired, + "rate_limit_per_minute": api_key_obj.rate_limit_per_minute, + "allowed_endpoints": allowed_endpoints, + "total_requests": api_key_obj.total_requests, + "last_used_at": api_key_obj.last_used_at, + "created_at": api_key_obj.created_at, + "expires_at": api_key_obj.expires_at, + "revoked_at": api_key_obj.revoked_at, + "created_by": api_key_obj.created_by, + } From 1164c7975ae2742c6248cd443a0b3175d26c0a97 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 20 Jan 2026 06:31:17 +0300 Subject: [PATCH 4/4] refactor: remove emojis and clean up code comments --- config/cors_config.py | 254 ++++------------------------- core/dependencies.py | 59 +------ database/models/api_key.py | 14 +- middleware/security.py | 90 ++-------- routes/api_keys.py | 32 +--- schemas/api_key.py | 2 +- scripts/manage_security.py | 108 +++++------- scripts/test_security.py | 61 +++---- services/api_key.py | 40 +---- services/universign_document.py | 2 +- services/universign_sync.py | 17 +- utils/generic_functions.py | 4 +- utils/universign_status_mapping.py | 2 +- 13 files changed, 137 insertions(+), 548 deletions(-) diff --git a/config/cors_config.py b/config/cors_config.py index 6ee880e..0f3a4d2 100644 --- a/config/cors_config.py +++ b/config/cors_config.py @@ -1,17 +1,3 @@ -""" -CONFIGURATION CORS POUR API AVEC CLÉS API MULTIPLES - -Problématique: -- Les clés API seront utilisées depuis de nombreux domaines/IPs différents -- Impossible de lister tous les origins autorisés à l'avance -- Solution: CORS permissif mais sécurisé par les clés API - -Stratégies: -1. CORS ouvert avec validation par clé API (RECOMMANDÉ) -2. CORS dynamique basé sur whitelist -3. CORS avec wildcard et credentials=False -""" - from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from typing import List @@ -21,66 +7,24 @@ import logging logger = logging.getLogger(__name__) -# ============================================ -# STRATÉGIE 1: CORS OUVERT + VALIDATION API KEY -# ============================================ -# ✅ RECOMMANDÉ pour les API publiques avec clés -# -# Principe: -# - Accepter toutes les origines (allow_origins=["*"]) -# - La sécurité est assurée par la validation des clés API -# - Les clés API protègent l'accès, pas le CORS - - def configure_cors_open(app: FastAPI): - """ - Configuration CORS ouverte (RECOMMANDÉE) - - ✅ Accepte toutes les origines - ✅ Sécurité assurée par les clés API - ✅ Simplifie l'utilisation pour les clients - - ⚠️ Attention: credentials=False obligatoire avec allow_origins=["*"] - """ app.add_middleware( CORSMiddleware, - allow_origins=["*"], # Accepte toutes les origines - allow_credentials=False, # ⚠️ Obligatoire avec "*" + allow_origins=["*"], + allow_credentials=False, allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], - allow_headers=["*"], # Accepte tous les headers (dont X-API-Key) + allow_headers=["*"], expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], - max_age=3600, # Cache preflight requests pendant 1h + max_age=3600, ) - logger.info("🌐 CORS configuré: Mode OUVERT (sécurisé par API Keys)") + logger.info(" CORS configuré: Mode OUVERT (sécurisé par API Keys)") logger.info(" - Origins: * (toutes)") logger.info(" - Headers: * (dont X-API-Key)") logger.info(" - Credentials: False") -# ============================================ -# STRATÉGIE 2: CORS AVEC WHITELIST DYNAMIQUE -# ============================================ -# 🔶 Pour environnements contrôlés -# -# Principe: -# - Lister explicitement les domaines autorisés -# - Peut inclure des patterns wildcards -# - Credentials possible (cookies, etc.) - - def configure_cors_whitelist(app: FastAPI): - """ - Configuration CORS avec whitelist (MODE CONTRÔLÉ) - - ✅ Meilleur contrôle des origines - ✅ Credentials possible - ❌ Nécessite maintenance de la liste - - À utiliser si vous connaissez tous les domaines clients - """ - - # Charger depuis .env ou config allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "") if allowed_origins_str: @@ -90,57 +34,11 @@ def configure_cors_whitelist(app: FastAPI): if origin.strip() ] else: - # Valeurs par défaut - allowed_origins = [ - "http://localhost:3000", # Frontend dev React/Vue - "http://localhost:5173", # Vite dev - "http://localhost:8080", # Frontend dev alternatif - "https://app.votre-domaine.com", - "https://admin.votre-domaine.com", - # Ajouter vos domaines de production - ] + allowed_origins = ["*"] app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, - allow_credentials=True, # ✅ Possible avec liste explicite - allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], - allow_headers=["Content-Type", "Authorization", "X-API-Key"], - expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], - max_age=3600, - ) - - logger.info("🌐 CORS configuré: Mode WHITELIST") - logger.info(f" - Origins autorisées: {len(allowed_origins)}") - for origin in allowed_origins: - logger.info(f" • {origin}") - - -# ============================================ -# STRATÉGIE 3: CORS AVEC REGEX (AVANCÉ) -# ============================================ -# 🔶 Pour patterns complexes -# -# Exemple: Autoriser tous les sous-domaines *.votre-domaine.com - - -def configure_cors_regex(app: FastAPI): - """ - Configuration CORS avec patterns regex (AVANCÉ) - - ✅ Flexible pour sous-domaines - ✅ Supporte patterns complexes - ❌ Plus complexe à configurer - - Utilise allow_origin_regex au lieu de allow_origins - """ - - # Pattern regex pour autoriser tous les sous-domaines - origin_regex = r"https://.*\.votre-domaine\.com" - - app.add_middleware( - CORSMiddleware, - allow_origin_regex=origin_regex, # ⚠️ Utilise regex au lieu de liste allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], allow_headers=["Content-Type", "Authorization", "X-API-Key"], @@ -148,32 +46,31 @@ def configure_cors_regex(app: FastAPI): max_age=3600, ) - logger.info("🌐 CORS configuré: Mode REGEX") + logger.info(" CORS configuré: Mode WHITELIST") + logger.info(f" - Origins autorisées: {len(allowed_origins)}") + for origin in allowed_origins: + logger.info(f" • {origin}") + + +def configure_cors_regex(app: FastAPI): + origin_regex = r"*" + + app.add_middleware( + CORSMiddleware, + allow_origin_regex=origin_regex, + allow_credentials=True, + allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"], + allow_headers=["Content-Type", "Authorization", "X-API-Key"], + expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"], + max_age=3600, + ) + + logger.info(" CORS configuré: Mode REGEX") logger.info(f" - Pattern: {origin_regex}") -# ============================================ -# STRATÉGIE 4: CORS HYBRIDE (PRODUCTION) -# ============================================ -# ✅ RECOMMANDÉ pour production -# -# Principe: -# - Whitelist pour les domaines connus (credentials=True) -# - Fallback sur "*" pour le reste (credentials=False) - - def configure_cors_hybrid(app: FastAPI): - """ - Configuration CORS hybride (PRODUCTION) - - ✅ Meilleur des deux mondes - ✅ Whitelist pour domaines connus - ✅ Fallback ouvert pour API Keys externes - - Note: Nécessite un middleware custom pour gérer les deux modes - """ from starlette.middleware.base import BaseHTTPMiddleware - from starlette.responses import Response class HybridCORSMiddleware(BaseHTTPMiddleware): def __init__(self, app, known_origins: List[str]): @@ -183,7 +80,6 @@ def configure_cors_hybrid(app: FastAPI): async def dispatch(self, request, call_next): origin = request.headers.get("origin") - # Si origin connue → CORS strict avec credentials if origin in self.known_origins: response = await call_next(request) response.headers["Access-Control-Allow-Origin"] = origin @@ -196,7 +92,6 @@ def configure_cors_hybrid(app: FastAPI): ) return response - # Sinon → CORS ouvert sans credentials response = await call_next(request) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = ( @@ -205,40 +100,16 @@ def configure_cors_hybrid(app: FastAPI): response.headers["Access-Control-Allow-Headers"] = "*" return response - # Domaines connus (whitelist) - known_origins = [ - "https://app.votre-domaine.com", - "https://admin.votre-domaine.com", - "http://localhost:3000", - "http://localhost:5173", - ] + known_origins = ["*"] app.add_middleware(HybridCORSMiddleware, known_origins=known_origins) - logger.info("🌐 CORS configuré: Mode HYBRIDE") + logger.info(" CORS configuré: Mode HYBRIDE") logger.info(f" - Whitelist: {len(known_origins)} domaines") logger.info(" - Fallback: * (ouvert)") -# ============================================ -# FONCTION PRINCIPALE -# ============================================ - - def setup_cors(app: FastAPI, mode: str = "open"): - """ - Configure CORS selon le mode choisi - - Args: - app: Instance FastAPI - mode: "open" | "whitelist" | "regex" | "hybrid" - - Recommandations: - - Development: "open" - - Production (API publique): "open" ou "hybrid" - - Production (API interne): "whitelist" - """ - if mode == "open": configure_cors_open(app) elif mode == "whitelist": @@ -249,73 +120,6 @@ def setup_cors(app: FastAPI, mode: str = "open"): configure_cors_hybrid(app) else: logger.warning( - f"⚠️ Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." + f" Mode CORS inconnu: {mode}. Utilisation de 'open' par défaut." ) configure_cors_open(app) - - -# ============================================ -# EXEMPLE D'UTILISATION DANS api.py -# ============================================ - -""" -# Dans api.py - -from config.cors_config import setup_cors - -app = FastAPI(...) - -# DÉVELOPPEMENT -setup_cors(app, mode="open") - -# PRODUCTION (API publique avec clés) -setup_cors(app, mode="hybrid") - -# PRODUCTION (API interne uniquement) -setup_cors(app, mode="whitelist") -""" - - -# ============================================ -# VARIABLES D'ENVIRONNEMENT (.env) -# ============================================ - -""" -# Pour mode "whitelist" -CORS_ALLOWED_ORIGINS=https://app.example.com,https://admin.example.com,http://localhost:3000 - -# Pour mode "regex" -CORS_ORIGIN_REGEX=https://.*\.example\.com - -# Choisir le mode -CORS_MODE=open -""" - - -# ============================================ -# FAQ CORS + API KEYS -# ============================================ - -""" -Q: Pourquoi allow_origins=["*"] est sécurisé avec des API Keys ? -R: Les clés API protègent l'accès aux données. CORS empêche seulement les - navigateurs web de faire des requêtes cross-origin. Un attaquant peut - contourner CORS facilement (curl, postman), donc la vraie sécurité vient - de la validation des clés API, pas du CORS. - -Q: Pourquoi credentials=False avec allow_origins=["*"] ? -R: C'est une restriction du navigateur (spec CORS). Vous ne pouvez pas avoir - credentials=True ET origins=["*"] en même temps. - -Q: Mes clients utilisent des IPs dynamiques, que faire ? -R: Utilisez mode "open". Les clés API sont justement faites pour ça - - permettre l'accès depuis n'importe quelle origine, de manière sécurisée. - -Q: Je veux quand même utiliser des cookies/sessions ? -R: Utilisez mode "whitelist" ou "hybrid" pour les domaines qui ont besoin - de credentials, et laissez les autres utiliser X-API-Key sans credentials. - -Q: Comment tester CORS localement ? -R: Créez un simple fichier HTML avec fetch() et ouvrez-le en file:// - Ou utilisez un serveur local (python -m http.server) -""" diff --git a/core/dependencies.py b/core/dependencies.py index 6782e98..ff443a6 100644 --- a/core/dependencies.py +++ b/core/dependencies.py @@ -10,7 +10,7 @@ import logging logger = logging.getLogger(__name__) -security = HTTPBearer(auto_error=False) # ⚠️ auto_error=False pour gérer manuellement +security = HTTPBearer(auto_error=False) async def get_current_user_hybrid( @@ -18,19 +18,6 @@ async def get_current_user_hybrid( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), session: AsyncSession = Depends(get_session), ) -> User: - """ - VERSION HYBRIDE: Accepte JWT OU API Key - - Priorité: - 1. JWT (Authorization: Bearer) - 2. API Key (déjà validée par middleware) - - Si API Key utilisée, retourne un "user virtuel" basé sur la clé - """ - - # ============================================ - # OPTION 1: JWT (comportement standard) - # ============================================ if credentials and credentials.credentials: token = credentials.credentials @@ -84,19 +71,12 @@ async def get_current_user_hybrid( detail="Compte temporairement verrouillé suite à trop de tentatives échouées", ) - logger.debug(f"🔐 Authentifié via JWT: {user.email}") + logger.debug(f" Authentifié via JWT: {user.email}") return user - # ============================================ - # OPTION 2: API Key (validée par middleware) - # ============================================ api_key_obj = getattr(request.state, "api_key", None) if api_key_obj: - # Créer un "user virtuel" basé sur la clé API - # Cela permet aux routes existantes de fonctionner sans modification - - # Si la clé est associée à un vrai user, le récupérer if api_key_obj.user_id: result = await session.execute( select(User).where(User.id == api_key_obj.user_id) @@ -105,11 +85,10 @@ async def get_current_user_hybrid( if user: logger.debug( - f"🔑 Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" + f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}" ) return user - # Sinon, créer un user virtuel (pour les clés API sans user associé) from database import User as UserModel virtual_user = UserModel( @@ -117,21 +96,17 @@ async def get_current_user_hybrid( email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local", nom="API Key", prenom=api_key_obj.name, - role="api_client", # Rôle spécial pour les API keys + role="api_client", is_verified=True, is_active=True, ) - # Marquer que c'est un user virtuel virtual_user._is_api_key_user = True virtual_user._api_key_obj = api_key_obj - logger.debug(f"🔑 Authentifié via API Key: {api_key_obj.name} (user virtuel)") + logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)") return virtual_user - # ============================================ - # AUCUNE AUTHENTIFICATION - # ============================================ raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise (JWT ou API Key)", @@ -152,32 +127,20 @@ async def get_current_user_optional_hybrid( def require_role_hybrid(*allowed_roles: str): - """ - VERSION HYBRIDE: Vérification de rôle compatible avec API Keys - - Notes: - - Les users via JWT ont leur vrai rôle - - Les users via API Key ont le rôle "api_client" par défaut - - Vous pouvez autoriser "api_client" dans allowed_roles pour permettre l'accès aux API Keys - """ - async def role_checker( request: Request, user: User = Depends(get_current_user_hybrid) ) -> User: - # Vérifier si c'est un user API Key is_api_key_user = getattr(user, "_is_api_key_user", False) if is_api_key_user: - # Pour les API Keys, vérifier si "api_client" est autorisé if "api_client" not in allowed_roles and "*" not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.", ) - logger.debug(f"✅ API Key autorisée pour cette route") + logger.debug(" API Key autorisée pour cette route") return user - # Pour les vrais users, vérification standard if user.role not in allowed_roles and "*" not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, @@ -189,11 +152,6 @@ def require_role_hybrid(*allowed_roles: str): return role_checker -# ============================================ -# HELPERS -# ============================================ - - def is_api_key_user(user: User) -> bool: """Vérifie si l'utilisateur est authentifié via API Key""" return getattr(user, "_is_api_key_user", False) @@ -204,10 +162,5 @@ def get_api_key_from_user(user: User): return getattr(user, "_api_key_obj", None) -# ============================================ -# RÉTROCOMPATIBILITÉ -# ============================================ - -# Alias pour garder la compatibilité avec le code existant get_current_user = get_current_user_hybrid get_current_user_optional = get_current_user_optional_hybrid diff --git a/database/models/api_key.py b/database/models/api_key.py index 1e54342..0d246ab 100644 --- a/database/models/api_key.py +++ b/database/models/api_key.py @@ -12,29 +12,21 @@ class ApiKey(Base): id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) key_hash = Column(String(64), unique=True, nullable=False, index=True) - key_prefix = Column( - String(10), nullable=False - ) # Premiers caractères pour identification + key_prefix = Column(String(10), nullable=False) name = Column(String(255), nullable=False) description = Column(Text, nullable=True) - # Métadonnées - user_id = Column(String(36), nullable=True) # Optionnel si associé à un utilisateur + user_id = Column(String(36), nullable=True) created_by = Column(String(255), nullable=False) - # Contrôle d'accès is_active = Column(Boolean, default=True, nullable=False) rate_limit_per_minute = Column(Integer, default=60, nullable=False) - allowed_endpoints = Column( - Text, nullable=True - ) # JSON array des endpoints autorisés + allowed_endpoints = Column(Text, nullable=True) - # Statistiques total_requests = Column(Integer, default=0, nullable=False) last_used_at = Column(DateTime, nullable=True) - # Dates created_at = Column(DateTime, default=datetime.now, nullable=False) expires_at = Column(DateTime, nullable=True) revoked_at = Column(DateTime, nullable=True) diff --git a/middleware/security.py b/middleware/security.py index 17186a0..137e7dd 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -1,5 +1,4 @@ -import secrets -from fastapi import Request, HTTPException, status +from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from sqlalchemy import select @@ -13,23 +12,14 @@ from security.auth import verify_password logger = logging.getLogger(__name__) -# === Configuration Swagger === security = HTTPBasic() async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: - """ - VERSION 2: Vérification des identifiants Swagger via base de données - - ✅ Plus sécurisé - ✅ Gestion centralisée - ✅ Tracking des connexions - """ username = credentials.username password = credentials.password try: - # Utiliser get_session de manière asynchrone async for session in get_session(): result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -38,29 +28,21 @@ async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool: if swagger_user and swagger_user.is_active: if verify_password(password, swagger_user.hashed_password): - # Mise à jour de la dernière connexion swagger_user.last_login = datetime.now() await session.commit() - logger.info(f"✅ Accès Swagger autorisé (DB): {username}") + logger.info(f" Accès Swagger autorisé (DB): {username}") return True - logger.warning(f"⚠️ Tentative d'accès Swagger refusée: {username}") + logger.warning(f" Tentative d'accès Swagger refusée: {username}") return False except Exception as e: - logger.error(f"❌ Erreur vérification Swagger credentials: {e}") + logger.error(f" Erreur vérification Swagger credentials: {e}") return False class SwaggerAuthMiddleware: - """ - Middleware pour protéger les endpoints de documentation - (/docs, /redoc, /openapi.json) - - VERSION 2: Avec vérification en base de données - """ - def __init__(self, app): self.app = app @@ -72,15 +54,12 @@ class SwaggerAuthMiddleware: request = Request(scope, receive=receive) path = request.url.path - # Endpoints à protéger protected_paths = ["/docs", "/redoc", "/openapi.json"] if any(path.startswith(protected_path) for protected_path in protected_paths): - # Vérification de l'authentification Basic auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): - # Demande d'authentification response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -91,7 +70,6 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return - # Extraction des credentials try: import base64 @@ -103,7 +81,6 @@ class SwaggerAuthMiddleware: credentials = HTTPBasicCredentials(username=username, password=password) - # Vérification via DB if not await verify_swagger_credentials(credentials): response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, @@ -114,7 +91,7 @@ class SwaggerAuthMiddleware: return except Exception as e: - logger.error(f"❌ Erreur parsing auth header: {e}") + logger.error(f" Erreur parsing auth header: {e}") response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Format d'authentification invalide"}, @@ -123,19 +100,10 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return - # Si tout est OK, continuer await self.app(scope, receive, send) class ApiKeyMiddleware: - """ - Middleware HYBRIDE pour vérifier les clés API sur les endpoints publics - - ✅ Accepte X-API-Key OU Authorization: Bearer (JWT) - ✅ Les deux méthodes sont équivalentes - ✅ Les endpoints /auth/* restent accessibles sans auth - """ - def __init__(self, app): self.app = app @@ -147,53 +115,37 @@ class ApiKeyMiddleware: request = Request(scope, receive=receive) path = request.url.path - # ============================================ - # ENDPOINTS EXCLUS (accessibles sans auth) - # ============================================ excluded_paths = [ "/docs", "/redoc", "/openapi.json", "/health", "/", - # === ROUTES AUTH (toujours publiques) === - "/api/v1/auth/login", - "/api/v1/auth/register", - "/api/v1/auth/verify-email", - "/api/v1/auth/reset-password", - "/api/v1/auth/request-reset", - "/api/v1/auth/refresh", + "/auth/login", + "/auth/register", + "/auth/verify-email", + "/auth/reset-password", + "/auth/request-reset", + "/auth/refresh", ] if any(path.startswith(excluded_path) for excluded_path in excluded_paths): await self.app(scope, receive, send) return - # ============================================ - # VÉRIFICATION HYBRIDE: API Key OU JWT - # ============================================ - - # Option 1: Vérifier si JWT présent auth_header = request.headers.get("Authorization") has_jwt = auth_header and auth_header.startswith("Bearer ") - # Option 2: Vérifier si API Key présente api_key = request.headers.get("X-API-Key") has_api_key = api_key is not None - # ============================================ - # LOGIQUE HYBRIDE - # ============================================ - if has_jwt: - # JWT présent → laisser passer, sera validé par les dependencies FastAPI - logger.debug(f"🔑 JWT détecté pour {path}") + logger.debug(f" JWT détecté pour {path}") await self.app(scope, receive, send) return elif has_api_key: - # API Key présente → valider la clé - logger.debug(f"🔑 API Key détectée pour {path}") + logger.debug(f" API Key détectée pour {path}") from services.api_key import ApiKeyService @@ -213,7 +165,6 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # Vérification du rate limit is_allowed, rate_info = await api_key_service.check_rate_limit( api_key_obj ) @@ -229,7 +180,6 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # Vérification de l'accès à l'endpoint has_access = await api_key_service.check_endpoint_access( api_key_obj, path ) @@ -245,18 +195,16 @@ class ApiKeyMiddleware: await response(scope, receive, send) return - # ✅ Clé valide → ajouter les infos à la requête request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" - logger.info(f"✅ API Key valide: {api_key_obj.name} → {path}") + logger.info(f" API Key valide: {api_key_obj.name} → {path}") - # Continuer la requête await self.app(scope, receive, send) return except Exception as e: - logger.error(f"❌ Erreur validation API Key: {e}", exc_info=True) + logger.error(f" Erreur validation API Key: {e}", exc_info=True) response = JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={ @@ -267,7 +215,6 @@ class ApiKeyMiddleware: return else: - # ❌ Ni JWT ni API Key response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -280,17 +227,10 @@ class ApiKeyMiddleware: return -# === Fonction helper pour récupérer l'API Key depuis la requête === def get_api_key_from_request(request: Request) -> Optional: """Récupère l'objet ApiKey depuis la requête si présent""" return getattr(request.state, "api_key", None) def get_auth_method(request: Request) -> str: - """ - Retourne la méthode d'authentification utilisée - - Returns: - "jwt" | "api_key" | "none" - """ return getattr(request.state, "authenticated_via", "none") diff --git a/routes/api_keys.py b/routes/api_keys.py index 8c8f6db..27f0efc 100644 --- a/routes/api_keys.py +++ b/routes/api_keys.py @@ -27,13 +27,6 @@ async def create_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 🔑 Créer une nouvelle clé API - - **Réservé aux admins** - - ⚠️ La clé en clair ne sera affichée qu'une seule fois ! - """ service = ApiKeyService(session) api_key_obj, api_key_plain = await service.create_api_key( @@ -46,7 +39,7 @@ async def create_api_key( allowed_endpoints=data.allowed_endpoints, ) - logger.info(f"✅ Clé API créée par {user.email}: {data.name}") + logger.info(f" Clé API créée par {user.email}: {data.name}") response_data = api_key_to_response(api_key_obj) response_data["api_key"] = api_key_plain @@ -60,15 +53,8 @@ async def list_api_keys( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 📋 Lister toutes les clés API - - **Réservé aux admins** - liste toutes les clés - **Utilisateurs standards** - liste uniquement leurs clés - """ service = ApiKeyService(session) - # Si admin, voir toutes les clés, sinon seulement les siennes user_id = None if user.role in ["admin", "super_admin"] else user.id keys = await service.list_api_keys(include_revoked=include_revoked, user_id=user_id) @@ -84,7 +70,7 @@ async def get_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """🔍 Récupérer une clé API par son ID""" + """ Récupérer une clé API par son ID""" service = ApiKeyService(session) api_key_obj = await service.get_by_id(key_id) @@ -95,7 +81,6 @@ async def get_api_key( detail=f"Clé API {key_id} introuvable", ) - # Vérification des permissions if user.role not in ["admin", "super_admin"]: if api_key_obj.user_id != user.id: raise HTTPException( @@ -112,11 +97,6 @@ async def revoke_api_key( session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user), ): - """ - 🚫 Révoquer une clé API - - **Action irréversible** - la clé sera désactivée définitivement - """ service = ApiKeyService(session) api_key_obj = await service.get_by_id(key_id) @@ -127,7 +107,6 @@ async def revoke_api_key( detail=f"Clé API {key_id} introuvable", ) - # Vérification des permissions if user.role not in ["admin", "super_admin"]: if api_key_obj.user_id != user.id: raise HTTPException( @@ -143,7 +122,7 @@ async def revoke_api_key( detail="Erreur lors de la révocation", ) - logger.info(f"🚫 Clé API révoquée par {user.email}: {api_key_obj.name}") + logger.info(f" Clé API révoquée par {user.email}: {api_key_obj.name}") return { "success": True, @@ -156,11 +135,6 @@ async def verify_api_key_endpoint( api_key: str = Query(..., description="Clé API à vérifier"), session: AsyncSession = Depends(get_session), ): - """ - ✅ Vérifier la validité d'une clé API - - **Endpoint public** - permet de tester une clé - """ service = ApiKeyService(session) api_key_obj = await service.verify_api_key(api_key) diff --git a/schemas/api_key.py b/schemas/api_key.py index 6a2d659..4ec49b6 100644 --- a/schemas/api_key.py +++ b/schemas/api_key.py @@ -42,7 +42,7 @@ class ApiKeyCreatedResponse(ApiKeyResponse): """Schema de réponse après création (inclut la clé en clair)""" api_key: str = Field( - ..., description="⚠️ Clé API en clair - à sauvegarder immédiatement" + ..., description=" Clé API en clair - à sauvegarder immédiatement" ) diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 35c0cff..1f234b9 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,18 +1,3 @@ -#!/usr/bin/env python3 -""" -Script CLI pour gérer les clés API et utilisateurs Swagger - -Usage: - python manage_security.py swagger add - python manage_security.py swagger list - python manage_security.py swagger delete - - python manage_security.py apikey create [--days 365] [--rate-limit 60] - python manage_security.py apikey list - python manage_security.py apikey revoke - python manage_security.py apikey verify -""" - import asyncio import sys from pathlib import Path @@ -23,7 +8,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from database import get_session from database.models.api_key import SwaggerUser from services.api_key import ApiKeyService -from security.auth import hash_password, verify_password +from security.auth import hash_password from sqlalchemy import select import logging @@ -31,24 +16,18 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# ============================================ -# GESTION DES UTILISATEURS SWAGGER -# ============================================ - async def add_swagger_user(username: str, password: str, full_name: str = None): """Ajouter un utilisateur Swagger""" async with get_session() as session: - # Vérifier si existe result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) ) existing = result.scalar_one_or_none() if existing: - logger.error(f"❌ L'utilisateur {username} existe déjà") + logger.error(f" L'utilisateur {username} existe déjà") return - # Créer user = SwaggerUser( username=username, hashed_password=hash_password(password), @@ -59,10 +38,10 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): session.add(user) await session.commit() - logger.info(f"✅ Utilisateur Swagger créé: {username}") - print(f"\n✅ Utilisateur créé avec succès") + logger.info(f" Utilisateur Swagger créé: {username}") + print("\n Utilisateur créé avec succès") print(f" Username: {username}") - print(f" Accès: https://votre-serveur/docs") + print(" Accès: https://votre-serveur/docs") async def list_swagger_users(): @@ -75,9 +54,9 @@ async def list_swagger_users(): print("Aucun utilisateur Swagger trouvé") return - print(f"\n📋 {len(users)} utilisateur(s) Swagger:\n") + print(f"\n {len(users)} utilisateur(s) Swagger:\n") for user in users: - status = "✅ Actif" if user.is_active else "❌ Inactif" + status = " Actif" if user.is_active else " Inactif" print(f" • {user.username:<20} {status}") if user.full_name: print(f" Nom: {user.full_name}") @@ -95,7 +74,7 @@ async def delete_swagger_user(username: str): user = result.scalar_one_or_none() if not user: - logger.error(f"❌ Utilisateur {username} introuvable") + logger.error(f" Utilisateur {username} introuvable") return await session.delete(user) @@ -104,10 +83,6 @@ async def delete_swagger_user(username: str): logger.info(f"🗑️ Utilisateur supprimé: {username}") -# ============================================ -# GESTION DES CLÉS API -# ============================================ - async def create_api_key( name: str, description: str = None, @@ -128,14 +103,14 @@ async def create_api_key( allowed_endpoints=endpoints, ) - print(f"\n✅ Clé API créée avec succès\n") + print("\n Clé API créée avec succès\n") print(f" ID: {api_key_obj.id}") print(f" Nom: {name}") print(f" Clé: {api_key_plain}") print(f" Préfixe: {api_key_obj.key_prefix}") print(f" Rate limit: {rate_limit} req/min") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}") - print(f"\n⚠️ IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") + print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n") async def list_api_keys(): @@ -148,11 +123,15 @@ async def list_api_keys(): print("Aucune clé API trouvée") return - print(f"\n📋 {len(keys)} clé(s) API:\n") + print(f"\n {len(keys)} clé(s) API:\n") for key in keys: - status = "✅" if key.is_active else "❌" - expired = "⏰ Expirée" if key.expires_at and key.expires_at < datetime.now() else "" - + status = "" if key.is_active else "" + expired = ( + "⏰ Expirée" + if key.expires_at and key.expires_at < datetime.now() + else "" + ) + print(f" {status} {key.name:<30} ({key.key_prefix}...)") print(f" ID: {key.id}") print(f" Requêtes: {key.total_requests}") @@ -166,19 +145,19 @@ async def revoke_api_key(key_id: str): """Révoquer une clé API""" async with get_session() as session: service = ApiKeyService(session) - + api_key = await service.get_by_id(key_id) if not api_key: - logger.error(f"❌ Clé {key_id} introuvable") + logger.error(f" Clé {key_id} introuvable") return success = await service.revoke_api_key(key_id) - + if success: - logger.info(f"🚫 Clé révoquée: {api_key.name}") - print(f"\n✅ Clé '{api_key.name}' révoquée avec succès") + logger.info(f" Clé révoquée: {api_key.name}") + print(f"\n Clé '{api_key.name}' révoquée avec succès") else: - logger.error("❌ Erreur lors de la révocation") + logger.error(" Erreur lors de la révocation") async def verify_api_key_cmd(api_key: str): @@ -188,64 +167,59 @@ async def verify_api_key_cmd(api_key: str): api_key_obj = await service.verify_api_key(api_key) if api_key_obj: - print(f"\n✅ Clé API valide\n") + print("\n Clé API valide\n") print(f" Nom: {api_key_obj.name}") print(f" ID: {api_key_obj.id}") print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") print(f" Requêtes: {api_key_obj.total_requests}") print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n") else: - print(f"\n❌ Clé API invalide, expirée ou révoquée\n") + print("\n Clé API invalide, expirée ou révoquée\n") -# ============================================ -# CLI PRINCIPAL -# ============================================ - async def main(): parser = argparse.ArgumentParser( description="Gestion de la sécurité Sage Dataven API" ) - + subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles") - # === SWAGGER === - swagger_parser = subparsers.add_parser("swagger", help="Gestion utilisateurs Swagger") + swagger_parser = subparsers.add_parser( + "swagger", help="Gestion utilisateurs Swagger" + ) swagger_subparsers = swagger_parser.add_subparsers(dest="action") - # swagger add swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur") swagger_add.add_argument("username", help="Nom d'utilisateur") swagger_add.add_argument("password", help="Mot de passe") swagger_add.add_argument("--full-name", help="Nom complet") - # swagger list swagger_subparsers.add_parser("list", help="Lister les utilisateurs") - # swagger delete - swagger_delete = swagger_subparsers.add_parser("delete", help="Supprimer un utilisateur") + swagger_delete = swagger_subparsers.add_parser( + "delete", help="Supprimer un utilisateur" + ) swagger_delete.add_argument("username", help="Nom d'utilisateur") - # === API KEYS === apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API") apikey_subparsers = apikey_parser.add_subparsers(dest="action") - # apikey create apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API") apikey_create.add_argument("name", help="Nom de la clé") apikey_create.add_argument("--description", help="Description") - apikey_create.add_argument("--days", type=int, default=365, help="Expiration en jours") - apikey_create.add_argument("--rate-limit", type=int, default=60, help="Limite req/min") + apikey_create.add_argument( + "--days", type=int, default=365, help="Expiration en jours" + ) + apikey_create.add_argument( + "--rate-limit", type=int, default=60, help="Limite req/min" + ) apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés") - # apikey list apikey_subparsers.add_parser("list", help="Lister les clés") - # apikey revoke apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé") apikey_revoke.add_argument("key_id", help="ID de la clé") - # apikey verify apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé") apikey_verify.add_argument("api_key", help="Clé API à vérifier") @@ -255,7 +229,6 @@ async def main(): parser.print_help() return - # Exécution des commandes if args.command == "swagger": if args.action == "add": await add_swagger_user(args.username, args.password, args.full_name) @@ -287,4 +260,5 @@ async def main(): if __name__ == "__main__": from datetime import datetime - asyncio.run(main()) \ No newline at end of file + + asyncio.run(main()) diff --git a/scripts/test_security.py b/scripts/test_security.py index 79f0299..497870e 100644 --- a/scripts/test_security.py +++ b/scripts/test_security.py @@ -1,16 +1,7 @@ -#!/usr/bin/env python3 -""" -Script de test automatisé pour vérifier la sécurité de l'API - -Usage: - python test_security.py --url http://votre-vps:8000 --swagger-user admin --swagger-pass password -""" - import requests import argparse import sys -from typing import Dict, Tuple -import json +from typing import Tuple class SecurityTester: @@ -20,7 +11,7 @@ class SecurityTester: def log_test(self, name: str, passed: bool, details: str = ""): """Enregistrer le résultat d'un test""" - status = "✅ PASS" if passed else "❌ FAIL" + status = " PASS" if passed else " FAIL" print(f"{status} - {name}") if details: print(f" {details}") @@ -36,7 +27,7 @@ class SecurityTester: def test_swagger_without_auth(self) -> bool: """Test 1: Swagger UI devrait demander une authentification""" - print("\n🔍 Test 1: Protection Swagger UI") + print("\n Test 1: Protection Swagger UI") try: response = requests.get(f"{self.base_url}/docs", timeout=5) @@ -62,7 +53,7 @@ class SecurityTester: def test_swagger_with_auth(self, username: str, password: str) -> bool: """Test 2: Swagger UI accessible avec credentials valides""" - print("\n🔍 Test 2: Accès Swagger avec authentification") + print("\n Test 2: Accès Swagger avec authentification") try: response = requests.get( @@ -90,7 +81,7 @@ class SecurityTester: def test_api_without_auth(self) -> bool: """Test 3: Endpoints API devraient demander une authentification""" - print("\n🔍 Test 3: Protection des endpoints API") + print("\n Test 3: Protection des endpoints API") test_endpoints = ["/api/v1/clients", "/api/v1/documents"] @@ -100,15 +91,15 @@ class SecurityTester: response = requests.get(f"{self.base_url}{endpoint}", timeout=5) if response.status_code == 401: - print(f" ✅ {endpoint} protégé (401)") + print(f" {endpoint} protégé (401)") else: print( - f" ❌ {endpoint} accessible sans auth (code {response.status_code})" + f" {endpoint} accessible sans auth (code {response.status_code})" ) all_protected = False except Exception as e: - print(f" ⚠️ {endpoint} erreur: {str(e)}") + print(f" {endpoint} erreur: {str(e)}") all_protected = False self.log_test("Endpoints API protégés", all_protected) @@ -116,7 +107,7 @@ class SecurityTester: def test_health_endpoint_public(self) -> bool: """Test 4: Endpoint /health devrait être accessible sans auth""" - print("\n🔍 Test 4: Endpoint /health public") + print("\n Test 4: Endpoint /health public") try: response = requests.get(f"{self.base_url}/health", timeout=5) @@ -138,10 +129,9 @@ class SecurityTester: def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]: """Test 5: Créer une clé API via l'endpoint""" - print("\n🔍 Test 5: Création d'une clé API") + print("\n Test 5: Création d'une clé API") try: - # 1. Login pour obtenir un JWT login_response = requests.post( f"{self.base_url}/api/v1/auth/login", json={"email": username, "password": password}, @@ -158,7 +148,6 @@ class SecurityTester: jwt_token = login_response.json().get("access_token") - # 2. Créer une clé API create_response = requests.post( f"{self.base_url}/api/v1/api-keys", headers={"Authorization": f"Bearer {jwt_token}"}, @@ -189,7 +178,7 @@ class SecurityTester: def test_api_key_usage(self, api_key: str) -> bool: """Test 6: Utiliser une clé API pour accéder à un endpoint""" - print("\n🔍 Test 6: Utilisation d'une clé API") + print("\n Test 6: Utilisation d'une clé API") if not api_key: self.log_test("Utilisation clé API", False, "Pas de clé disponible") @@ -219,7 +208,7 @@ class SecurityTester: def test_invalid_api_key(self) -> bool: """Test 7: Une clé invalide devrait être refusée""" - print("\n🔍 Test 7: Rejet de clé API invalide") + print("\n Test 7: Rejet de clé API invalide") invalid_key = "sdk_live_invalid_key_12345" @@ -247,13 +236,12 @@ class SecurityTester: def test_rate_limiting(self, api_key: str) -> bool: """Test 8: Rate limiting (optionnel, peut prendre du temps)""" - print("\n🔍 Test 8: Rate limiting (test simple)") + print("\n Test 8: Rate limiting (test simple)") if not api_key: self.log_test("Rate limiting", False, "Pas de clé disponible") return False - # Envoyer 70 requêtes rapidement (limite = 60/min) print(" Envoi de 70 requêtes rapides...") rate_limited = False @@ -267,7 +255,7 @@ class SecurityTester: if response.status_code == 429: rate_limited = True - print(f" ⚠️ Rate limit atteint à la requête {i + 1}") + print(f" Rate limit atteint à la requête {i + 1}") break except Exception: @@ -287,22 +275,22 @@ class SecurityTester: def print_summary(self): """Afficher le résumé des tests""" print("\n" + "=" * 60) - print("📊 RÉSUMÉ DES TESTS") + print(" RÉSUMÉ DES TESTS") print("=" * 60) total = self.results["passed"] + self.results["failed"] success_rate = (self.results["passed"] / total * 100) if total > 0 else 0 print(f"\nTotal: {total} tests") - print(f"✅ Réussis: {self.results['passed']}") - print(f"❌ Échoués: {self.results['failed']}") - print(f"📈 Taux de réussite: {success_rate:.1f}%\n") + print(f" Réussis: {self.results['passed']}") + print(f" Échoués: {self.results['failed']}") + print(f"Taux de réussite: {success_rate:.1f}%\n") if self.results["failed"] == 0: print("🎉 Tous les tests sont passés ! Sécurité OK.") return 0 else: - print("⚠️ Certains tests ont échoué. Vérifiez la configuration.") + print(" Certains tests ont échoué. Vérifiez la configuration.") return 1 @@ -333,18 +321,16 @@ def main(): args = parser.parse_args() - print("🚀 Démarrage des tests de sécurité") - print(f"🎯 URL cible: {args.url}\n") + print(" Démarrage des tests de sécurité") + print(f" URL cible: {args.url}\n") tester = SecurityTester(args.url) - # Exécuter les tests tester.test_swagger_without_auth() tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass) tester.test_api_without_auth() tester.test_health_endpoint_public() - # Tests nécessitant une clé API success, api_key = tester.test_api_key_creation( args.swagger_user, args.swagger_pass ) @@ -356,11 +342,10 @@ def main(): if not args.skip_rate_limit: tester.test_rate_limiting(api_key) else: - print("\n⏭️ Test de rate limiting sauté") + print("\n Test de rate limiting sauté") else: - print("\n⚠️ Tests avec clé API sautés (création échouée)") + print("\n Tests avec clé API sautés (création échouée)") - # Résumé exit_code = tester.print_summary() sys.exit(exit_code) diff --git a/services/api_key.py b/services/api_key.py index d54943a..ad3cf6f 100644 --- a/services/api_key.py +++ b/services/api_key.py @@ -21,7 +21,6 @@ class ApiKeyService: @staticmethod def generate_api_key() -> str: """Génère une clé API unique et sécurisée""" - # Format: sdk_live_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx random_part = secrets.token_urlsafe(32) return f"sdk_live_{random_part}" @@ -33,7 +32,6 @@ class ApiKeyService: @staticmethod def get_key_prefix(api_key: str) -> str: """Extrait le préfixe de la clé pour identification""" - # Retourne les 12 premiers caractères return api_key[:12] if len(api_key) >= 12 else api_key async def create_api_key( @@ -46,23 +44,14 @@ class ApiKeyService: rate_limit_per_minute: int = 60, allowed_endpoints: Optional[List[str]] = None, ) -> tuple[ApiKey, str]: - """ - Crée une nouvelle clé API - - Returns: - tuple[ApiKey, str]: (objet ApiKey, clé en clair - à ne montrer qu'une fois) - """ - # Génération de la clé api_key_plain = self.generate_api_key() key_hash = self.hash_api_key(api_key_plain) key_prefix = self.get_key_prefix(api_key_plain) - # Calcul de la date d'expiration expires_at = None if expires_in_days: expires_at = datetime.now() + timedelta(days=expires_in_days) - # Création de l'objet api_key_obj = ApiKey( key_hash=key_hash, key_prefix=key_prefix, @@ -81,24 +70,18 @@ class ApiKeyService: await self.session.commit() await self.session.refresh(api_key_obj) - logger.info(f"✅ Clé API créée: {name} (prefix: {key_prefix})") + logger.info(f" Clé API créée: {name} (prefix: {key_prefix})") return api_key_obj, api_key_plain async def verify_api_key(self, api_key_plain: str) -> Optional[ApiKey]: - """ - Vérifie une clé API et retourne l'objet si valide - - Returns: - Optional[ApiKey]: L'objet ApiKey si valide, None sinon - """ key_hash = self.hash_api_key(api_key_plain) result = await self.session.execute( select(ApiKey).where( and_( ApiKey.key_hash == key_hash, - ApiKey.is_active == True, + ApiKey.is_active, ApiKey.revoked_at.is_(None), or_( ApiKey.expires_at.is_(None), ApiKey.expires_at > datetime.now() @@ -110,14 +93,13 @@ class ApiKeyService: api_key_obj = result.scalar_one_or_none() if api_key_obj: - # Mise à jour des statistiques api_key_obj.total_requests += 1 api_key_obj.last_used_at = datetime.now() await self.session.commit() - logger.debug(f"🔑 Clé API validée: {api_key_obj.name}") + logger.debug(f" Clé API validée: {api_key_obj.name}") else: - logger.warning(f"⚠️ Clé API invalide ou expirée") + logger.warning(" Clé API invalide ou expirée") return api_key_obj @@ -152,7 +134,7 @@ class ApiKeyService: api_key_obj.revoked_at = datetime.now() await self.session.commit() - logger.info(f"🚫 Clé API révoquée: {api_key_obj.name}") + logger.info(f" Clé API révoquée: {api_key_obj.name}") return True async def get_by_id(self, key_id: str) -> Optional[ApiKey]: @@ -161,14 +143,6 @@ class ApiKeyService: return result.scalar_one_or_none() async def check_rate_limit(self, api_key_obj: ApiKey) -> tuple[bool, Dict]: - """ - Vérifie le rate limit d'une clé API (à implémenter avec Redis/cache) - - Returns: - tuple[bool, Dict]: (is_allowed, info_dict) - """ - # TODO: Implémenter avec Redis pour un vrai rate limiting - # Pour l'instant, retourne toujours True return True, { "allowed": True, "limit": api_key_obj.rate_limit_per_minute, @@ -178,13 +152,11 @@ class ApiKeyService: async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool: """Vérifie si la clé a accès à un endpoint spécifique""" if not api_key_obj.allowed_endpoints: - # Si aucune restriction, accès total return True try: allowed = json.loads(api_key_obj.allowed_endpoints) - # Support des wildcards for pattern in allowed: if pattern == "*": return True @@ -197,7 +169,7 @@ class ApiKeyService: return False except json.JSONDecodeError: - logger.error(f"❌ Erreur parsing allowed_endpoints pour {api_key_obj.id}") + logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}") return False diff --git a/services/universign_document.py b/services/universign_document.py index 9cb714e..394c3ce 100644 --- a/services/universign_document.py +++ b/services/universign_document.py @@ -23,7 +23,7 @@ class UniversignDocumentService: def fetch_transaction_documents(self, transaction_id: str) -> Optional[List[Dict]]: try: - logger.info(f"📋 Récupération documents pour transaction: {transaction_id}") + logger.info(f" Récupération documents pour transaction: {transaction_id}") response = requests.get( f"{self.api_url}/transactions/{transaction_id}", diff --git a/services/universign_sync.py b/services/universign_sync.py index 807802d..da634f2 100644 --- a/services/universign_sync.py +++ b/services/universign_sync.py @@ -344,7 +344,7 @@ class UniversignSyncService: universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") - logger.info(f"📊 Statut Universign brut: {universign_status_raw}") + logger.info(f" Statut Universign brut: {universign_status_raw}") new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value @@ -367,7 +367,7 @@ class UniversignSyncService: if status_changed: logger.info( - f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}" + f"CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}" ) try: @@ -392,7 +392,7 @@ class UniversignSyncService: if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() - logger.info("📅 Date d'envoi mise à jour") + logger.info("Date d'envoi mise à jour") if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() @@ -404,8 +404,7 @@ class UniversignSyncService: if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() - logger.info("⏰ Date d'expiration mise à jour") - + logger.info("Date d'expiration mise à jour") documents = universign_data.get("documents", []) if documents: @@ -432,9 +431,7 @@ class UniversignSyncService: logger.warning(f"Échec téléchargement: {download_error}") except Exception as e: - logger.error( - f" Erreur téléchargement document: {e}", exc_info=True - ) + logger.error(f" Erreur téléchargement document: {e}", exc_info=True) await self._sync_signers(session, transaction, universign_data) @@ -529,9 +526,7 @@ class UniversignSyncService: logger.warning(f"Échec téléchargement: {download_error}") except Exception as e: - logger.error( - f" Erreur téléchargement document: {e}", exc_info=True - ) + logger.error(f" Erreur téléchargement document: {e}", exc_info=True) else: logger.debug( f"Document déjà téléchargé: {transaction.signed_document_path}" diff --git a/utils/generic_functions.py b/utils/generic_functions.py index 4cce52f..29a361e 100644 --- a/utils/generic_functions.py +++ b/utils/generic_functions.py @@ -425,7 +425,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "REFUSE": { "fr": "Signature refusée", "en": "Signature refused", - "icon": "❌", + "icon": "", "color": "red", }, "EXPIRE": { @@ -437,7 +437,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "ERREUR": { "fr": "Erreur technique", "en": "Technical error", - "icon": "⚠️", + "icon": "", "color": "red", }, } diff --git a/utils/universign_status_mapping.py b/utils/universign_status_mapping.py index 50e29cc..8391698 100644 --- a/utils/universign_status_mapping.py +++ b/utils/universign_status_mapping.py @@ -96,7 +96,7 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = { "REFUSE": { "fr": "Signature refusée", "en": "Signature refused", - "icon": "❌", + "icon": "", "color": "red", }, "EXPIRE": {