from fastapi import FastAPI, HTTPException, Query, Depends, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field, EmailStr from typing import List, Optional, Dict from datetime import date, datetime from enum import Enum import uvicorn from contextlib import asynccontextmanager import uuid import csv import io import logging from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from routes.auth import router as auth_router from core.dependencies import get_current_user, require_role # Configuration logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("sage_api.log"), logging.StreamHandler()], ) logger = logging.getLogger(__name__) # Imports locaux from config import settings from database import ( init_db, async_session_factory, get_session, EmailLog, StatutEmail as StatutEmailEnum, WorkflowLog, SignatureLog, StatutSignature as StatutSignatureEnum, ) from email_queue import email_queue from sage_client import sage_client # ===================================================== # ENUMS # ===================================================== class TypeDocument(int, Enum): DEVIS = settings.SAGE_TYPE_DEVIS BON_COMMANDE = settings.SAGE_TYPE_BON_COMMANDE PREPARATION = settings.SAGE_TYPE_PREPARATION BON_LIVRAISON = settings.SAGE_TYPE_BON_LIVRAISON BON_RETOUR = settings.SAGE_TYPE_BON_RETOUR BON_AVOIR = settings.SAGE_TYPE_BON_AVOIR FACTURE = settings.SAGE_TYPE_FACTURE class StatutSignature(str, Enum): EN_ATTENTE = "EN_ATTENTE" ENVOYE = "ENVOYE" SIGNE = "SIGNE" REFUSE = "REFUSE" EXPIRE = "EXPIRE" class StatutEmail(str, Enum): EN_ATTENTE = "EN_ATTENTE" EN_COURS = "EN_COURS" ENVOYE = "ENVOYE" OUVERT = "OUVERT" ERREUR = "ERREUR" BOUNCE = "BOUNCE" # ===================================================== # MODÈLES PYDANTIC # ===================================================== class ClientResponse(BaseModel): numero: str intitule: str adresse: Optional[str] = None code_postal: Optional[str] = None ville: Optional[str] = None email: Optional[str] = None telephone: Optional[str] = None class ArticleResponse(BaseModel): reference: str designation: str prix_vente: float stock_reel: float class LigneDevis(BaseModel): article_code: str quantite: float prix_unitaire_ht: Optional[float] = None remise_pourcentage: Optional[float] = 0.0 @validator("article_code", pre=True) def strip_insecables(cls, v): return v.replace("\xa0", "").strip() class DevisRequest(BaseModel): client_id: str date_devis: Optional[date] = None lignes: List[LigneDevis] class DevisResponse(BaseModel): id: str client_id: str date_devis: str montant_total_ht: float montant_total_ttc: float nb_lignes: int class SignatureRequest(BaseModel): doc_id: str type_doc: TypeDocument email_signataire: EmailStr nom_signataire: str class EmailEnvoiRequest(BaseModel): destinataire: EmailStr cc: Optional[List[EmailStr]] = [] cci: Optional[List[EmailStr]] = [] sujet: str corps_html: str document_ids: Optional[List[str]] = None type_document: Optional[TypeDocument] = None class RelanceDevisRequest(BaseModel): doc_id: str message_personnalise: Optional[str] = None class BaremeRemiseResponse(BaseModel): client_id: str remise_max_autorisee: float remise_demandee: float autorisee: bool message: str # À ajouter dans api.py après les imports et avant les endpoints existants from pydantic import BaseModel from typing import List, Optional from datetime import datetime # ===================================================== # MODÈLES PYDANTIC POUR USERS # ===================================================== class UserResponse(BaseModel): """Modèle de réponse pour un utilisateur""" id: str email: str nom: str prenom: str role: str is_verified: bool is_active: bool created_at: str last_login: Optional[str] = None failed_login_attempts: int = 0 class Config: from_attributes = True # ===================================================== # SERVICES EXTERNES (Universign) # ===================================================== async def universign_envoyer( doc_id: str, pdf_bytes: bytes, email: str, nom: str ) -> Dict: """Envoi signature via API Universign""" import requests try: api_key = settings.universign_api_key api_url = settings.universign_api_url auth = (api_key, "") # Étape 1: Créer transaction response = requests.post( f"{api_url}/transactions", auth=auth, json={"name": f"Devis {doc_id}", "language": "fr"}, timeout=30, ) response.raise_for_status() transaction_id = response.json().get("id") # Étape 2: Upload PDF files = {"file": (f"Devis_{doc_id}.pdf", pdf_bytes, "application/pdf")} response = requests.post(f"{api_url}/files", auth=auth, files=files, timeout=30) response.raise_for_status() file_id = response.json().get("id") # Étape 3: Ajouter document response = requests.post( f"{api_url}/transactions/{transaction_id}/documents", auth=auth, data={"document": file_id}, timeout=30, ) response.raise_for_status() document_id = response.json().get("id") # Étape 4: Créer champ signature response = requests.post( f"{api_url}/transactions/{transaction_id}/documents/{document_id}/fields", auth=auth, data={"type": "signature"}, timeout=30, ) response.raise_for_status() field_id = response.json().get("id") # Étape 5: Assigner signataire response = requests.post( f"{api_url}/transactions/{transaction_id}/signatures", auth=auth, data={"signer": email, "field": field_id}, timeout=30, ) response.raise_for_status() # Étape 6: Démarrer transaction response = requests.post( f"{api_url}/transactions/{transaction_id}/start", auth=auth, timeout=30 ) response.raise_for_status() final_data = response.json() signer_url = ( final_data.get("actions", [{}])[0].get("url", "") if final_data.get("actions") else "" ) logger.info(f"✅ Signature Universign envoyée: {transaction_id}") return { "transaction_id": transaction_id, "signer_url": signer_url, "statut": "ENVOYE", } except Exception as e: logger.error(f"❌ Erreur Universign: {e}") return {"error": str(e), "statut": "ERREUR"} async def universign_statut(transaction_id: str) -> Dict: """Récupération statut signature""" import requests try: response = requests.get( f"{settings.universign_api_url}/transactions/{transaction_id}", auth=(settings.universign_api_key, ""), timeout=10, ) if response.status_code == 200: data = response.json() statut_map = { "draft": "EN_ATTENTE", "started": "EN_ATTENTE", "completed": "SIGNE", "refused": "REFUSE", "expired": "EXPIRE", "canceled": "REFUSE", } return { "statut": statut_map.get(data.get("state"), "EN_ATTENTE"), "date_signature": data.get("completed_at"), } else: return {"statut": "ERREUR"} except Exception as e: logger.error(f"Erreur statut Universign: {e}") return {"statut": "ERREUR", "error": str(e)} # ===================================================== # CYCLE DE VIE # ===================================================== @asynccontextmanager async def lifespan(app: FastAPI): # Init base de données await init_db() logger.info("✅ Base de données initialisée") # ✅ CORRECTION: Injecter session_factory ET sage_client dans email_queue email_queue.session_factory = async_session_factory email_queue.sage_client = sage_client logger.info("✅ sage_client injecté dans email_queue") # Démarrer queue email_queue.start(num_workers=settings.max_email_workers) logger.info(f"✅ Email queue démarrée") yield # Cleanup email_queue.stop() logger.info("👋 Services arrêtés") # ===================================================== # APPLICATION # ===================================================== app = FastAPI( title="API Sage 100c Dataven", version="2.0.0", description="API de gestion commerciale - VPS Linux", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["*"], allow_credentials=True, ) app.include_router(auth_router) # ===================================================== # ENDPOINTS - US-A1 (CRÉATION RAPIDE DEVIS) # ===================================================== @app.get("/clients", response_model=List[ClientResponse], tags=["US-A1"]) async def rechercher_clients(query: Optional[str] = Query(None)): """🔍 Recherche clients via gateway Windows""" try: clients = sage_client.lister_clients(filtre=query or "") return [ClientResponse(**c) for c in clients] except Exception as e: logger.error(f"Erreur recherche clients: {e}") raise HTTPException(500, str(e)) @app.get("/articles", response_model=List[ArticleResponse], tags=["US-A1"]) async def rechercher_articles(query: Optional[str] = Query(None)): """🔍 Recherche articles via gateway Windows""" try: articles = sage_client.lister_articles(filtre=query or "") return [ArticleResponse(**a) for a in articles] except Exception as e: logger.error(f"Erreur recherche articles: {e}") raise HTTPException(500, str(e)) @app.post("/devis", response_model=DevisResponse, status_code=201, tags=["US-A1"]) async def creer_devis(devis: DevisRequest): """📝 Création de devis via gateway Windows""" try: # Préparer les données pour la gateway devis_data = { "client_id": devis.client_id, "date_devis": devis.date_devis.isoformat() if devis.date_devis else None, "lignes": [ { "article_code": l.article_code, "quantite": l.quantite, "prix_unitaire_ht": l.prix_unitaire_ht, "remise_pourcentage": l.remise_pourcentage, } for l in devis.lignes ], } # Appel HTTP vers Windows resultat = sage_client.creer_devis(devis_data) logger.info(f"✅ Devis créé: {resultat.get('numero_devis')}") return DevisResponse( id=resultat["numero_devis"], client_id=devis.client_id, date_devis=resultat["date_devis"], montant_total_ht=resultat["total_ht"], montant_total_ttc=resultat["total_ttc"], nb_lignes=resultat["nb_lignes"], ) except Exception as e: logger.error(f"Erreur création devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis", tags=["US-A1"]) async def lister_devis( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None), inclure_lignes: bool = Query( True, description="Inclure les lignes de chaque devis" ), ): """ 📋 Liste tous les devis via gateway Windows Args: limit: Nombre maximum de devis à retourner statut: Filtre par statut (0=Devis, 2=Accepté, 5=Transformé, etc.) inclure_lignes: Si True, charge les lignes de chaque devis (par défaut: True) ✅ AMÉLIORATION: Les lignes sont maintenant incluses par défaut """ try: devis_list = sage_client.lister_devis( limit=limit, statut=statut, inclure_lignes=inclure_lignes ) return devis_list except Exception as e: logger.error(f"Erreur liste devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis/{id}", tags=["US-A1"]) async def lire_devis(id: str): """📄 Lecture d'un devis via gateway Windows""" try: devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") return devis except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis/{id}/pdf", tags=["US-A1"]) async def telecharger_devis_pdf(id: str): """📄 Téléchargement PDF (généré via email_queue)""" try: # Générer PDF en appelant la méthode de email_queue # qui elle-même appellera sage_client pour récupérer les données pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS) return StreamingResponse( iter([pdf_bytes]), media_type="application/pdf", headers={"Content-Disposition": f"attachment; filename=Devis_{id}.pdf"}, ) except Exception as e: logger.error(f"Erreur génération PDF: {e}") raise HTTPException(500, str(e)) @app.post("/devis/{id}/envoyer", tags=["US-A1"]) async def envoyer_devis_email( id: str, request: EmailEnvoiRequest, session: AsyncSession = Depends(get_session) ): """📧 Envoi devis par email""" try: # Vérifier que le devis existe devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Créer logs email pour chaque destinataire tous_destinataires = [request.destinataire] + request.cc + request.cci email_logs = [] for dest in tous_destinataires: email_log = EmailLog( id=str(uuid.uuid4()), destinataire=dest, sujet=request.sujet, corps_html=request.corps_html, document_ids=id, type_document=TypeDocument.DEVIS, statut=StatutEmailEnum.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() email_queue.enqueue(email_log.id) email_logs.append(email_log.id) await session.commit() logger.info( f"✅ Email devis {id} mis en file: {len(tous_destinataires)} destinataire(s)" ) return { "success": True, "email_log_ids": email_logs, "devis_id": id, "message": f"{len(tous_destinataires)} email(s) en file d'attente", } except HTTPException: raise except Exception as e: logger.error(f"Erreur envoi email: {e}") raise HTTPException(500, str(e)) @app.put("/devis/{id}/statut", tags=["US-A1"]) async def changer_statut_devis(id: str, nouveau_statut: int = Query(..., ge=0, le=5)): """ 📄 Changement de statut d'un devis via gateway Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) resultat = sage_client.changer_statut_devis(id, nouveau_statut) logger.info(f"✅ Statut devis {id} changé: {nouveau_statut}") return { "success": True, "devis_id": id, "statut_ancien": resultat.get("statut_ancien"), "statut_nouveau": resultat.get("statut_nouveau"), "message": "Statut mis à jour avec succès", } except Exception as e: logger.error(f"Erreur changement statut: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A2 (WORKFLOW SANS RESSAISIE) # ===================================================== @app.get("/commandes", tags=["US-A2"]) async def lister_commandes( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """ 📋 Liste toutes les commandes via gateway Windows ✅ CORRECTION : Utilise sage_client.lister_commandes() qui existe déjà Le filtrage sur type 10 est fait côté Windows dans main.py """ try: commandes = sage_client.lister_commandes(limit=limit, statut=statut) return commandes except Exception as e: logger.error(f"Erreur liste commandes: {e}") raise HTTPException(500, str(e)) @app.post("/workflow/devis/{id}/to-commande", tags=["US-A2"]) async def devis_vers_commande(id: str, session: AsyncSession = Depends(get_session)): """ 🔧 Transformation Devis → Commande ✅ CORRECTION : Utilise les VRAIS types Sage (0 → 10) """ try: resultat = sage_client.transformer_document( numero_source=id, type_source=settings.SAGE_TYPE_DEVIS, # = 0 type_cible=settings.SAGE_TYPE_BON_COMMANDE, # = 10 ) workflow_log = WorkflowLog( id=str(uuid.uuid4()), document_source=id, type_source=TypeDocument.DEVIS, document_cible=resultat.get("document_cible", ""), type_cible=TypeDocument.BON_COMMANDE, nb_lignes=resultat.get("nb_lignes", 0), date_transformation=datetime.now(), succes=True, ) session.add(workflow_log) await session.commit() logger.info( f"✅ Transformation: Devis {id} → Commande {resultat['document_cible']}" ) return { "success": True, "document_source": id, "document_cible": resultat["document_cible"], "nb_lignes": resultat["nb_lignes"], } except Exception as e: logger.error(f"Erreur transformation: {e}") raise HTTPException(500, str(e)) @app.post("/workflow/commande/{id}/to-facture", tags=["US-A2"]) async def commande_vers_facture(id: str, session: AsyncSession = Depends(get_session)): """ 🔧 Transformation Commande → Facture ✅ Utilise les VRAIS types Sage (10 → 60) """ try: resultat = sage_client.transformer_document( numero_source=id, type_source=settings.SAGE_TYPE_BON_COMMANDE, # = 10 type_cible=settings.SAGE_TYPE_FACTURE, # = 60 ) workflow_log = WorkflowLog( id=str(uuid.uuid4()), document_source=id, type_source=TypeDocument.BON_COMMANDE, document_cible=resultat.get("document_cible", ""), type_cible=TypeDocument.FACTURE, nb_lignes=resultat.get("nb_lignes", 0), date_transformation=datetime.now(), succes=True, ) session.add(workflow_log) await session.commit() logger.info( f"✅ Transformation: Commande {id} → Facture {resultat['document_cible']}" ) return { "success": True, "document_source": id, "document_cible": resultat["document_cible"], "nb_lignes": resultat["nb_lignes"], } except Exception as e: logger.error(f"Erreur transformation: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A3 (SIGNATURE ÉLECTRONIQUE) # ===================================================== @app.post("/signature/universign/send", tags=["US-A3"]) async def envoyer_signature( demande: SignatureRequest, session: AsyncSession = Depends(get_session) ): """✍️ Envoi document pour signature Universign""" try: # Générer PDF pdf_bytes = email_queue._generate_pdf(demande.doc_id, demande.type_doc) # Envoi Universign resultat = await universign_envoyer( demande.doc_id, pdf_bytes, demande.email_signataire, demande.nom_signataire ) if "error" in resultat: raise HTTPException(500, resultat["error"]) # Logger en DB signature_log = SignatureLog( id=str(uuid.uuid4()), document_id=demande.doc_id, type_document=demande.type_doc, transaction_id=resultat["transaction_id"], signer_url=resultat["signer_url"], email_signataire=demande.email_signataire, nom_signataire=demande.nom_signataire, statut=StatutSignatureEnum.ENVOYE, date_envoi=datetime.now(), ) session.add(signature_log) await session.commit() # MAJ champ libre Sage via gateway Windows sage_client.mettre_a_jour_champ_libre( demande.doc_id, demande.type_doc, "UniversignID", resultat["transaction_id"] ) logger.info(f"✅ Signature envoyée: {demande.doc_id}") return { "success": True, "transaction_id": resultat["transaction_id"], "signer_url": resultat["signer_url"], } except HTTPException: raise except Exception as e: logger.error(f"Erreur signature: {e}") raise HTTPException(500, str(e)) @app.get("/signature/universign/status", tags=["US-A3"]) async def statut_signature(docId: str = Query(...)): """🔍 Récupération du statut de signature en temps réel""" # Chercher dans la DB locale try: async with async_session_factory() as session: query = select(SignatureLog).where(SignatureLog.document_id == docId) result = await session.execute(query) signature_log = result.scalar_one_or_none() if not signature_log: raise HTTPException(404, "Signature introuvable") # Interroger Universign statut = await universign_statut(signature_log.transaction_id) return { "doc_id": docId, "statut": statut["statut"], "date_signature": statut.get("date_signature"), } except HTTPException: raise except Exception as e: logger.error(f"Erreur statut signature: {e}") raise HTTPException(500, str(e)) @app.get("/signatures", tags=["US-A3"]) async def lister_signatures( statut: Optional[StatutSignature] = Query(None), limit: int = Query(100, le=1000), session: AsyncSession = Depends(get_session), ): """📋 Liste toutes les demandes de signature""" query = select(SignatureLog).order_by(SignatureLog.date_envoi.desc()) if statut: statut_db = StatutSignatureEnum[statut.value] query = query.where(SignatureLog.statut == statut_db) query = query.limit(limit) result = await session.execute(query) signatures = result.scalars().all() return [ { "id": sig.id, "document_id": sig.document_id, "type_document": sig.type_document.value, "transaction_id": sig.transaction_id, "signer_url": sig.signer_url, "email_signataire": sig.email_signataire, "nom_signataire": sig.nom_signataire, "statut": sig.statut.value, "date_envoi": sig.date_envoi.isoformat() if sig.date_envoi else None, "date_signature": ( sig.date_signature.isoformat() if sig.date_signature else None ), "est_relance": sig.est_relance, "nb_relances": sig.nb_relances or 0, } for sig in signatures ] @app.get("/signatures/{transaction_id}/status", tags=["US-A3"]) async def statut_signature_detail( transaction_id: str, session: AsyncSession = Depends(get_session) ): """🔍 Récupération du statut détaillé d'une signature""" query = select(SignatureLog).where(SignatureLog.transaction_id == transaction_id) result = await session.execute(query) signature_log = result.scalar_one_or_none() if not signature_log: raise HTTPException(404, f"Transaction {transaction_id} introuvable") # Interroger Universign statut_universign = await universign_statut(transaction_id) if statut_universign.get("statut") != "ERREUR": statut_map = { "EN_ATTENTE": StatutSignatureEnum.EN_ATTENTE, "ENVOYE": StatutSignatureEnum.ENVOYE, "SIGNE": StatutSignatureEnum.SIGNE, "REFUSE": StatutSignatureEnum.REFUSE, "EXPIRE": StatutSignatureEnum.EXPIRE, } nouveau_statut = statut_map.get( statut_universign["statut"], StatutSignatureEnum.EN_ATTENTE ) signature_log.statut = nouveau_statut if statut_universign.get("date_signature"): signature_log.date_signature = datetime.fromisoformat( statut_universign["date_signature"].replace("Z", "+00:00") ) await session.commit() return { "transaction_id": transaction_id, "document_id": signature_log.document_id, "statut": signature_log.statut.value, "email_signataire": signature_log.email_signataire, "date_envoi": ( signature_log.date_envoi.isoformat() if signature_log.date_envoi else None ), "date_signature": ( signature_log.date_signature.isoformat() if signature_log.date_signature else None ), "signer_url": signature_log.signer_url, } @app.post("/signatures/refresh-all", tags=["US-A3"]) async def rafraichir_statuts_signatures(session: AsyncSession = Depends(get_session)): """🔄 Rafraîchit TOUS les statuts des signatures en attente""" query = select(SignatureLog).where( SignatureLog.statut.in_( [StatutSignatureEnum.EN_ATTENTE, StatutSignatureEnum.ENVOYE] ) ) result = await session.execute(query) signatures = result.scalars().all() nb_mises_a_jour = 0 for sig in signatures: try: statut_universign = await universign_statut(sig.transaction_id) if statut_universign.get("statut") != "ERREUR": statut_map = { "SIGNE": StatutSignatureEnum.SIGNE, "REFUSE": StatutSignatureEnum.REFUSE, "EXPIRE": StatutSignatureEnum.EXPIRE, } nouveau = statut_map.get(statut_universign["statut"]) if nouveau and nouveau != sig.statut: sig.statut = nouveau if statut_universign.get("date_signature"): sig.date_signature = datetime.fromisoformat( statut_universign["date_signature"].replace("Z", "+00:00") ) nb_mises_a_jour += 1 except Exception as e: logger.error(f"Erreur refresh signature {sig.transaction_id}: {e}") continue await session.commit() return { "success": True, "nb_signatures_verifiees": len(signatures), "nb_mises_a_jour": nb_mises_a_jour, } @app.post("/devis/{id}/signer", tags=["US-A3"]) async def envoyer_devis_signature( id: str, request: SignatureRequest, session: AsyncSession = Depends(get_session) ): """✏️ Envoi d'un devis pour signature électronique""" try: # Vérifier devis via gateway Windows devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Générer PDF pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS) # Envoi Universign resultat = await universign_envoyer( id, pdf_bytes, request.email_signataire, request.nom_signataire ) if "error" in resultat: raise HTTPException(500, f"Erreur Universign: {resultat['error']}") # Logger en DB signature_log = SignatureLog( id=str(uuid.uuid4()), document_id=id, type_document=TypeDocument.DEVIS, transaction_id=resultat["transaction_id"], signer_url=resultat["signer_url"], email_signataire=request.email_signataire, nom_signataire=request.nom_signataire, statut=StatutSignatureEnum.ENVOYE, date_envoi=datetime.now(), ) session.add(signature_log) await session.commit() # MAJ champ libre Sage via gateway sage_client.mettre_a_jour_champ_libre( id, TypeDocument.DEVIS, "UniversignID", resultat["transaction_id"] ) return { "success": True, "devis_id": id, "transaction_id": resultat["transaction_id"], "signer_url": resultat["signer_url"], } except HTTPException: raise except Exception as e: logger.error(f"Erreur envoi signature: {e}") raise HTTPException(500, str(e)) # ============================================ # US-A4 - ENVOI EMAILS EN LOT # ============================================ class EmailBatchRequest(BaseModel): destinataires: List[EmailStr] = Field(..., min_length=1, max_length=100) sujet: str = Field(..., min_length=1, max_length=500) corps_html: str = Field(..., min_length=1) document_ids: Optional[List[str]] = None type_document: Optional[TypeDocument] = None @app.post("/emails/send-batch", tags=["US-A4"]) async def envoyer_emails_lot( batch: EmailBatchRequest, session: AsyncSession = Depends(get_session) ): """📧 US-A4: Envoi groupé via email_queue""" resultats = [] for destinataire in batch.destinataires: email_log = EmailLog( id=str(uuid.uuid4()), destinataire=destinataire, sujet=batch.sujet, corps_html=batch.corps_html, document_ids=",".join(batch.document_ids) if batch.document_ids else None, type_document=batch.type_document, statut=StatutEmailEnum.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() email_queue.enqueue(email_log.id) resultats.append( { "destinataire": destinataire, "log_id": email_log.id, "statut": "EN_ATTENTE", } ) await session.commit() nb_documents = len(batch.document_ids) if batch.document_ids else 0 logger.info( f"✅ {len(batch.destinataires)} emails mis en file avec {nb_documents} docs" ) return { "total": len(batch.destinataires), "succes": len(batch.destinataires), "documents_attaches": nb_documents, "details": resultats, } # ===================================================== # ENDPOINTS - US-A5 # ===================================================== @app.post("/devis/valider-remise", response_model=BaremeRemiseResponse, tags=["US-A5"]) async def valider_remise( client_id: str = Query(..., min_length=1), remise_pourcentage: float = Query(0.0, ge=0, le=100), ): """ 💰 US-A5: Validation remise via barème client Sage """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) remise_max = sage_client.lire_remise_max_client(client_id) autorisee = remise_pourcentage <= remise_max if not autorisee: message = f"⚠️ Remise trop élevée (max autorisé: {remise_max}%)" logger.warning( f"Remise refusée pour {client_id}: {remise_pourcentage}% > {remise_max}%" ) else: message = "✅ Remise autorisée" return BaremeRemiseResponse( client_id=client_id, remise_max_autorisee=remise_max, remise_demandee=remise_pourcentage, autorisee=autorisee, message=message, ) except Exception as e: logger.error(f"Erreur validation remise: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A6 (RELANCE DEVIS) # ===================================================== @app.post("/devis/{id}/relancer-signature", tags=["US-A6"]) async def relancer_devis_signature( id: str, relance: RelanceDevisRequest, session: AsyncSession = Depends(get_session) ): """📧 Relance devis via Universign""" try: # Lire devis via gateway devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Récupérer contact via gateway contact = sage_client.lire_contact_client(devis["client_code"]) if not contact or not contact.get("email"): raise HTTPException(400, "Aucun email trouvé pour ce client") # Générer PDF pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS) # Envoi Universign resultat = await universign_envoyer( id, pdf_bytes, contact["email"], contact["nom"] or contact["client_intitule"], ) if "error" in resultat: raise HTTPException(500, resultat["error"]) # Logger en DB signature_log = SignatureLog( id=str(uuid.uuid4()), document_id=id, type_document=TypeDocument.DEVIS, transaction_id=resultat["transaction_id"], signer_url=resultat["signer_url"], email_signataire=contact["email"], nom_signataire=contact["nom"] or contact["client_intitule"], statut=StatutSignatureEnum.ENVOYE, date_envoi=datetime.now(), est_relance=True, nb_relances=1, ) session.add(signature_log) await session.commit() return { "success": True, "devis_id": id, "transaction_id": resultat["transaction_id"], "message": "Relance signature envoyée", } except HTTPException: raise except Exception as e: logger.error(f"Erreur relance: {e}") raise HTTPException(500, str(e)) class ContactClientResponse(BaseModel): client_code: str client_intitule: str email: Optional[str] nom: Optional[str] telephone: Optional[str] peut_etre_relance: bool @app.get("/devis/{id}/contact", response_model=ContactClientResponse, tags=["US-A6"]) async def recuperer_contact_devis(id: str): """👤 US-A6: Récupération du contact client associé au devis""" try: # Lire devis via gateway Windows devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Lire contact via gateway Windows contact = sage_client.lire_contact_client(devis["client_code"]) if not contact: raise HTTPException( 404, f"Contact introuvable pour client {devis['client_code']}" ) peut_relancer = bool(contact.get("email")) return ContactClientResponse(**contact, peut_etre_relance=peut_relancer) except HTTPException: raise except Exception as e: logger.error(f"Erreur récupération contact: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A7 # ===================================================== @app.get("/factures", tags=["US-A7"]) async def lister_factures( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """ 📋 Liste toutes les factures via gateway Windows ✅ CORRECTION : Utilise sage_client.lister_factures() qui existe déjà Le filtrage sur type 60 est fait côté Windows dans main.py """ try: factures = sage_client.lister_factures(limit=limit, statut=statut) return factures except Exception as e: logger.error(f"Erreur liste factures: {e}") raise HTTPException(500, str(e)) class RelanceFactureRequest(BaseModel): doc_id: str message_personnalise: Optional[str] = None # Templates email (si pas déjà définis) templates_email_db = { "relance_facture": { "id": "relance_facture", "nom": "Relance Facture", "sujet": "Rappel - Facture {{DO_Piece}}", "corps_html": """
Bonjour {{CT_Intitule}},
La facture {{DO_Piece}} du {{DO_Date}} d'un montant de {{DO_TotalTTC}}€ TTC reste impayée.
Merci de régulariser dans les meilleurs délais.
Cordialement,
""", "variables_disponibles": [ "DO_Piece", "DO_Date", "CT_Intitule", "DO_TotalHT", "DO_TotalTTC", ], } } @app.post("/factures/{id}/relancer", tags=["US-A7"]) async def relancer_facture( id: str, relance: RelanceFactureRequest, session: AsyncSession = Depends(get_session), ): """💸 US-A7: Relance facture en un clic""" try: # Lire facture via gateway Windows facture = sage_client.lire_document(id, TypeDocument.FACTURE) if not facture: raise HTTPException(404, f"Facture {id} introuvable") # Récupérer contact via gateway Windows contact = sage_client.lire_contact_client(facture["client_code"]) if not contact or not contact.get("email"): raise HTTPException(400, "Aucun email trouvé pour ce client") # Préparer email template = templates_email_db["relance_facture"] variables = { "DO_Piece": facture.get("numero", id), "DO_Date": str(facture.get("date", "")), "CT_Intitule": facture.get("client_intitule", ""), "DO_TotalHT": f"{facture.get('total_ht', 0):.2f}", "DO_TotalTTC": f"{facture.get('total_ttc', 0):.2f}", } sujet = template["sujet"] corps = relance.message_personnalise or template["corps_html"] for var, valeur in variables.items(): sujet = sujet.replace(f"{{{{{var}}}}}", valeur) corps = corps.replace(f"{{{{{var}}}}}", valeur) # Créer log email email_log = EmailLog( id=str(uuid.uuid4()), destinataire=contact["email"], sujet=sujet, corps_html=corps, document_ids=id, type_document=TypeDocument.FACTURE, statut=StatutEmailEnum.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() # Enqueue email_queue.enqueue(email_log.id) # ✅ MAJ champ libre via gateway Windows sage_client.mettre_a_jour_derniere_relance(id, TypeDocument.FACTURE) await session.commit() logger.info(f"✅ Relance facture: {id} → {contact['email']}") return { "success": True, "facture_id": id, "email_log_id": email_log.id, "destinataire": contact["email"], } except HTTPException: raise except Exception as e: logger.error(f"Erreur relance facture: {e}") raise HTTPException(500, str(e)) # ============================================ # US-A9 - JOURNAL DES E-MAILS # ============================================ @app.get("/emails/logs", tags=["US-A9"]) async def journal_emails( statut: Optional[StatutEmail] = Query(None), destinataire: Optional[str] = Query(None), limit: int = Query(100, le=1000), session: AsyncSession = Depends(get_session), ): """📋 US-A9: Journal des e-mails envoyés""" query = select(EmailLog) if statut: query = query.where(EmailLog.statut == StatutEmailEnum[statut.value]) if destinataire: query = query.where(EmailLog.destinataire.contains(destinataire)) query = query.order_by(EmailLog.date_creation.desc()).limit(limit) result = await session.execute(query) logs = result.scalars().all() return [ { "id": log.id, "destinataire": log.destinataire, "sujet": log.sujet, "statut": log.statut.value, "date_creation": log.date_creation.isoformat(), "date_envoi": log.date_envoi.isoformat() if log.date_envoi else None, "nb_tentatives": log.nb_tentatives, "derniere_erreur": log.derniere_erreur, "document_ids": log.document_ids, } for log in logs ] @app.get("/emails/logs/export", tags=["US-A9"]) async def exporter_logs_csv( statut: Optional[StatutEmail] = Query(None), session: AsyncSession = Depends(get_session), ): """📥 US-A9: Export CSV des logs d'envoi""" query = select(EmailLog) if statut: query = query.where(EmailLog.statut == StatutEmailEnum[statut.value]) query = query.order_by(EmailLog.date_creation.desc()) result = await session.execute(query) logs = result.scalars().all() # Génération CSV output = io.StringIO() writer = csv.writer(output) # En-têtes writer.writerow( [ "ID", "Destinataire", "Sujet", "Statut", "Date Création", "Date Envoi", "Nb Tentatives", "Erreur", "Documents", ] ) # Données for log in logs: writer.writerow( [ log.id, log.destinataire, log.sujet, log.statut.value, log.date_creation.isoformat(), log.date_envoi.isoformat() if log.date_envoi else "", log.nb_tentatives, log.derniere_erreur or "", log.document_ids or "", ] ) output.seek(0) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename=emails_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" }, ) # ============================================ # US-A10 - MODÈLES D'E-MAILS # ============================================ class TemplateEmail(BaseModel): id: Optional[str] = None nom: str sujet: str corps_html: str variables_disponibles: List[str] = [] class TemplatePreviewRequest(BaseModel): template_id: str document_id: str type_document: TypeDocument @app.get("/templates/emails", response_model=List[TemplateEmail], tags=["US-A10"]) async def lister_templates(): """📧 US-A10: Liste tous les templates d'emails""" return [TemplateEmail(**template) for template in templates_email_db.values()] @app.get( "/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"] ) async def lire_template(template_id: str): """📖 Lecture d'un template par ID""" if template_id not in templates_email_db: raise HTTPException(404, f"Template {template_id} introuvable") return TemplateEmail(**templates_email_db[template_id]) @app.post("/templates/emails", response_model=TemplateEmail, tags=["US-A10"]) async def creer_template(template: TemplateEmail): """➕ Création d'un nouveau template""" template_id = str(uuid.uuid4()) templates_email_db[template_id] = { "id": template_id, "nom": template.nom, "sujet": template.sujet, "corps_html": template.corps_html, "variables_disponibles": template.variables_disponibles, } logger.info(f"Template créé: {template_id}") return TemplateEmail(id=template_id, **template.dict()) @app.put( "/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"] ) async def modifier_template(template_id: str, template: TemplateEmail): """✏️ Modification d'un template existant""" if template_id not in templates_email_db: raise HTTPException(404, f"Template {template_id} introuvable") # Ne pas modifier les templates système if template_id in ["relance_devis", "relance_facture"]: raise HTTPException(400, "Les templates système ne peuvent pas être modifiés") templates_email_db[template_id] = { "id": template_id, "nom": template.nom, "sujet": template.sujet, "corps_html": template.corps_html, "variables_disponibles": template.variables_disponibles, } logger.info(f"Template modifié: {template_id}") return TemplateEmail(id=template_id, **template.dict()) @app.delete("/templates/emails/{template_id}", tags=["US-A10"]) async def supprimer_template(template_id: str): """🗑️ Suppression d'un template""" if template_id not in templates_email_db: raise HTTPException(404, f"Template {template_id} introuvable") if template_id in ["relance_devis", "relance_facture"]: raise HTTPException(400, "Les templates système ne peuvent pas être supprimés") del templates_email_db[template_id] logger.info(f"Template supprimé: {template_id}") return {"success": True, "message": f"Template {template_id} supprimé"} @app.post("/templates/emails/preview", tags=["US-A10"]) async def previsualiser_email(preview: TemplatePreviewRequest): """👁️ US-A10: Prévisualisation email avec fusion variables""" if preview.template_id not in templates_email_db: raise HTTPException(404, f"Template {preview.template_id} introuvable") template = templates_email_db[preview.template_id] # Lire document via gateway Windows doc = sage_client.lire_document(preview.document_id, preview.type_document) if not doc: raise HTTPException(404, f"Document {preview.document_id} introuvable") # Variables variables = { "DO_Piece": doc.get("numero", preview.document_id), "DO_Date": str(doc.get("date", "")), "CT_Intitule": doc.get("client_intitule", ""), "DO_TotalHT": f"{doc.get('total_ht', 0):.2f}", "DO_TotalTTC": f"{doc.get('total_ttc', 0):.2f}", } # Fusion sujet_preview = template["sujet"] corps_preview = template["corps_html"] for var, valeur in variables.items(): sujet_preview = sujet_preview.replace(f"{{{{{var}}}}}", valeur) corps_preview = corps_preview.replace(f"{{{{{var}}}}}", valeur) return { "template_id": preview.template_id, "document_id": preview.document_id, "sujet": sujet_preview, "corps_html": corps_preview, "variables_utilisees": variables, } # ===================================================== # ENDPOINTS - HEALTH # ===================================================== @app.get("/health", tags=["System"]) async def health_check(): """🏥 Health check""" gateway_health = sage_client.health() return { "status": "healthy", "sage_gateway": gateway_health, "email_queue": { "running": email_queue.running, "workers": len(email_queue.workers), "queue_size": email_queue.queue.qsize(), }, "timestamp": datetime.now().isoformat(), } @app.get("/", tags=["System"]) async def root(): """🏠 Page d'accueil""" return { "api": "Sage 100c Dataven - VPS Linux", "version": "2.0.0", "documentation": "/docs", "health": "/health", } # ===================================================== # ENDPOINTS - ADMIN # ===================================================== @app.get("/admin/cache/info", tags=["Admin"]) async def info_cache(): """ 📊 Informations sur l'état du cache Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) cache_info = sage_client.get_cache_info() return cache_info except Exception as e: logger.error(f"Erreur info cache: {e}") raise HTTPException(500, str(e)) # 🔧 CORRECTION ADMIN: Cache refresh (doit appeler Windows) @app.post("/admin/cache/refresh", tags=["Admin"]) async def forcer_actualisation(): """ 🔄 Force l'actualisation du cache Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) resultat = sage_client.refresh_cache() cache_info = sage_client.get_cache_info() return { "success": True, "message": "Cache actualisé sur Windows Server", "info": cache_info, } except Exception as e: logger.error(f"Erreur refresh cache: {e}") raise HTTPException(500, str(e)) # ✅ RESTE INCHANGÉ: admin/queue/status (local au VPS) @app.get("/admin/queue/status", tags=["Admin"]) async def statut_queue(): """ 📊 Statut de la queue d'emails (local VPS) """ return { "queue_size": email_queue.queue.qsize(), "workers": len(email_queue.workers), "running": email_queue.running, } # ===================================================== # ENDPOINTS - PROSPECTS # ===================================================== @app.get("/prospects", tags=["Prospects"]) async def rechercher_prospects(query: Optional[str] = Query(None)): """🔍 Recherche prospects via gateway Windows""" try: prospects = sage_client.lister_prospects(filtre=query or "") return prospects except Exception as e: logger.error(f"Erreur recherche prospects: {e}") raise HTTPException(500, str(e)) @app.get("/prospects/{code}", tags=["Prospects"]) async def lire_prospect(code: str): """📄 Lecture d'un prospect par code""" try: prospect = sage_client.lire_prospect(code) if not prospect: raise HTTPException(404, f"Prospect {code} introuvable") return prospect except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture prospect: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - FOURNISSEURS # ===================================================== @app.get("/fournisseurs", tags=["Fournisseurs"]) async def rechercher_fournisseurs(query: Optional[str] = Query(None)): """🔍 Recherche fournisseurs via gateway Windows""" try: fournisseurs = sage_client.lister_fournisseurs(filtre=query or "") return fournisseurs except Exception as e: logger.error(f"Erreur recherche fournisseurs: {e}") raise HTTPException(500, str(e)) @app.get("/fournisseurs/{code}", tags=["Fournisseurs"]) async def lire_fournisseur(code: str): """📄 Lecture d'un fournisseur par code""" try: fournisseur = sage_client.lire_fournisseur(code) if not fournisseur: raise HTTPException(404, f"Fournisseur {code} introuvable") return fournisseur except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture fournisseur: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - AVOIRS # ===================================================== @app.get("/avoirs", tags=["Avoirs"]) async def lister_avoirs( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """📋 Liste tous les avoirs via gateway Windows""" try: avoirs = sage_client.lister_avoirs(limit=limit, statut=statut) return avoirs except Exception as e: logger.error(f"Erreur liste avoirs: {e}") raise HTTPException(500, str(e)) @app.get("/avoirs/{numero}", tags=["Avoirs"]) async def lire_avoir(numero: str): """📄 Lecture d'un avoir avec ses lignes""" try: avoir = sage_client.lire_avoir(numero) if not avoir: raise HTTPException(404, f"Avoir {numero} introuvable") return avoir except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture avoir: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - LIVRAISONS # ===================================================== @app.get("/livraisons", tags=["Livraisons"]) async def lister_livraisons( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """📋 Liste tous les bons de livraison via gateway Windows""" try: livraisons = sage_client.lister_livraisons(limit=limit, statut=statut) return livraisons except Exception as e: logger.error(f"Erreur liste livraisons: {e}") raise HTTPException(500, str(e)) @app.get("/livraisons/{numero}", tags=["Livraisons"]) async def lire_livraison(numero: str): """📄 Lecture d'une livraison avec ses lignes""" try: livraison = sage_client.lire_livraison(numero) if not livraison: raise HTTPException(404, f"Livraison {numero} introuvable") return livraison except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture livraison: {e}") raise HTTPException(500, str(e)) @app.get("/debug/users", response_model=List[UserResponse], tags=["Debug"]) async def lister_utilisateurs_debug( session: AsyncSession = Depends(get_session), limit: int = Query(100, le=1000), role: Optional[str] = Query(None), verified_only: bool = Query(False), ): """ 🔓 **ROUTE DEBUG** - Liste tous les utilisateurs inscrits ⚠️ **ATTENTION**: Cette route n'est PAS protégée par authentification. À utiliser uniquement en développement ou à sécuriser en production. Args: limit: Nombre maximum d'utilisateurs à retourner role: Filtrer par rôle (user, admin, commercial) verified_only: Afficher uniquement les utilisateurs vérifiés Returns: Liste des utilisateurs avec leurs informations (mot de passe masqué) """ from database import User from sqlalchemy import select try: # Construction de la requête query = select(User) # Filtres optionnels if role: query = query.where(User.role == role) if verified_only: query = query.where(User.is_verified == True) # Tri par date de création (plus récents en premier) query = query.order_by(User.created_at.desc()).limit(limit) # Exécution result = await session.execute(query) users = result.scalars().all() # Conversion en réponse users_response = [] for user in users: users_response.append( UserResponse( id=user.id, email=user.email, nom=user.nom, prenom=user.prenom, role=user.role, is_verified=user.is_verified, is_active=user.is_active, created_at=user.created_at.isoformat() if user.created_at else "", last_login=user.last_login.isoformat() if user.last_login else None, failed_login_attempts=user.failed_login_attempts or 0, ) ) logger.info( f"📋 Liste utilisateurs retournée: {len(users_response)} résultat(s)" ) return users_response except Exception as e: logger.error(f"❌ Erreur liste utilisateurs: {e}") raise HTTPException(500, str(e)) @app.get("/debug/users/stats", tags=["Debug"]) async def statistiques_utilisateurs(session: AsyncSession = Depends(get_session)): """ 📊 **ROUTE DEBUG** - Statistiques sur les utilisateurs ⚠️ Non protégée - à sécuriser en production """ from database import User from sqlalchemy import select, func try: # Total utilisateurs total_query = select(func.count(User.id)) total_result = await session.execute(total_query) total = total_result.scalar() # Utilisateurs vérifiés verified_query = select(func.count(User.id)).where(User.is_verified == True) verified_result = await session.execute(verified_query) verified = verified_result.scalar() # Utilisateurs actifs active_query = select(func.count(User.id)).where(User.is_active == True) active_result = await session.execute(active_query) active = active_result.scalar() # Par rôle roles_query = select(User.role, func.count(User.id)).group_by(User.role) roles_result = await session.execute(roles_query) roles_stats = {role: count for role, count in roles_result.all()} return { "total_utilisateurs": total, "utilisateurs_verifies": verified, "utilisateurs_actifs": active, "utilisateurs_non_verifies": total - verified, "repartition_roles": roles_stats, "taux_verification": f"{(verified/total*100):.1f}%" if total > 0 else "0%", } except Exception as e: logger.error(f"❌ Erreur stats utilisateurs: {e}") raise HTTPException(500, str(e)) @app.get("/debug/users/{user_id}", response_model=UserResponse, tags=["Debug"]) async def lire_utilisateur_debug( user_id: str, session: AsyncSession = Depends(get_session) ): """ 👤 **ROUTE DEBUG** - Détails d'un utilisateur par ID ⚠️ Non protégée - à sécuriser en production """ from database import User from sqlalchemy import select try: query = select(User).where(User.id == user_id) result = await session.execute(query) user = result.scalar_one_or_none() if not user: raise HTTPException(404, f"Utilisateur {user_id} introuvable") return UserResponse( id=user.id, email=user.email, nom=user.nom, prenom=user.prenom, role=user.role, is_verified=user.is_verified, is_active=user.is_active, created_at=user.created_at.isoformat() if user.created_at else "", last_login=user.last_login.isoformat() if user.last_login else None, failed_login_attempts=user.failed_login_attempts or 0, ) except HTTPException: raise except Exception as e: logger.error(f"❌ Erreur lecture utilisateur: {e}") raise HTTPException(500, str(e)) @app.get("/debug/database/check", tags=["Debug"]) async def verifier_integrite_database(session: AsyncSession = Depends(get_session)): """ 🔍 Vérification de l'intégrité de la base de données Retourne des statistiques détaillées sur toutes les tables """ from database import User, RefreshToken, LoginAttempt, EmailLog, SignatureLog from sqlalchemy import func, text try: diagnostics = {} # === TABLE USERS === # Compter tous les users total_users = await session.execute(select(func.count(User.id))) diagnostics["users"] = {"total": total_users.scalar(), "details": []} # Lister tous les users avec détails all_users = await session.execute(select(User)) users_list = all_users.scalars().all() for u in users_list: diagnostics["users"]["details"].append( { "id": u.id, "email": u.email, "nom": f"{u.prenom} {u.nom}", "role": u.role, "is_active": u.is_active, "is_verified": u.is_verified, "created_at": u.created_at.isoformat() if u.created_at else None, "has_reset_token": u.reset_token is not None, "has_verification_token": u.verification_token is not None, } ) # === TABLE REFRESH_TOKENS === total_tokens = await session.execute(select(func.count(RefreshToken.id))) diagnostics["refresh_tokens"] = {"total": total_tokens.scalar()} # === TABLE LOGIN_ATTEMPTS === total_attempts = await session.execute(select(func.count(LoginAttempt.id))) diagnostics["login_attempts"] = {"total": total_attempts.scalar()} # === TABLE EMAIL_LOGS === total_emails = await session.execute(select(func.count(EmailLog.id))) diagnostics["email_logs"] = {"total": total_emails.scalar()} # === TABLE SIGNATURE_LOGS === total_signatures = await session.execute(select(func.count(SignatureLog.id))) diagnostics["signature_logs"] = {"total": total_signatures.scalar()} # === VÉRIFIER LES FICHIERS SQLITE === import os db_file = "sage_dataven.db" diagnostics["database_file"] = { "exists": os.path.exists(db_file), "size_bytes": os.path.getsize(db_file) if os.path.exists(db_file) else 0, "path": os.path.abspath(db_file), } # === TESTER UNE REQUÊTE RAW SQL === try: raw_count = await session.execute(text("SELECT COUNT(*) FROM users")) diagnostics["raw_sql_check"] = { "users_count": raw_count.scalar(), "status": "✅ Connexion DB OK", } except Exception as e: diagnostics["raw_sql_check"] = {"status": "❌ Erreur", "error": str(e)} return { "success": True, "timestamp": datetime.now().isoformat(), "diagnostics": diagnostics, } except Exception as e: logger.error(f"❌ Erreur diagnostic DB: {e}", exc_info=True) raise HTTPException(500, f"Erreur diagnostic: {str(e)}") @app.post("/debug/database/test-user-persistence", tags=["Debug"]) async def tester_persistance_utilisateur(session: AsyncSession = Depends(get_session)): """ 🧪 Test de création/lecture/modification d'un utilisateur de test Crée un utilisateur de test, le modifie, et vérifie la persistance """ import uuid from database import User from security.auth import hash_password try: test_email = f"test_{uuid.uuid4().hex[:8]}@example.com" # === ÉTAPE 1: CRÉATION === test_user = User( id=str(uuid.uuid4()), email=test_email, hashed_password=hash_password("TestPassword123!"), nom="Test", prenom="User", role="user", is_verified=True, is_active=True, created_at=datetime.now(), ) session.add(test_user) await session.flush() user_id = test_user.id await session.commit() logger.info(f"✅ ÉTAPE 1: User créé - {user_id}") # === ÉTAPE 2: LECTURE === result = await session.execute(select(User).where(User.id == user_id)) loaded_user = result.scalar_one_or_none() if not loaded_user: return { "success": False, "error": "❌ User introuvable après création !", "step": "LECTURE", } logger.info(f"✅ ÉTAPE 2: User chargé - {loaded_user.email}") # === ÉTAPE 3: MODIFICATION (simulate reset password) === loaded_user.hashed_password = hash_password("NewPassword456!") loaded_user.reset_token = None loaded_user.reset_token_expires = None session.add(loaded_user) await session.flush() await session.commit() await session.refresh(loaded_user) logger.info(f"✅ ÉTAPE 3: User modifié") # === ÉTAPE 4: RE-LECTURE === result2 = await session.execute(select(User).where(User.id == user_id)) reloaded_user = result2.scalar_one_or_none() if not reloaded_user: return { "success": False, "error": "❌ User DISPARU après modification !", "step": "RE-LECTURE", "user_id": user_id, } logger.info(f"✅ ÉTAPE 4: User re-chargé - {reloaded_user.email}") # === ÉTAPE 5: SUPPRESSION DU TEST === await session.delete(reloaded_user) await session.commit() logger.info(f"✅ ÉTAPE 5: User test supprimé") return { "success": True, "message": "✅ Tous les tests de persistance sont OK", "test_user_id": user_id, "test_email": test_email, "steps_completed": [ "1. Création", "2. Lecture", "3. Modification (reset password simulé)", "4. Re-lecture (vérification persistance)", "5. Suppression (cleanup)", ], } except Exception as e: logger.error(f"❌ Erreur test persistance: {e}", exc_info=True) # Rollback en cas d'erreur await session.rollback() return { "success": False, "error": str(e), "traceback": str(e.__class__.__name__), } # ===================================================== # LANCEMENT # ===================================================== if __name__ == "__main__": uvicorn.run( "api:app", host=settings.api_host, port=settings.api_port, reload=settings.api_reload, )