from fastapi import FastAPI, HTTPException, Query, Depends, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field, EmailStr from typing import List, Optional, Dict from datetime import date, datetime from enum import Enum import uvicorn from contextlib import asynccontextmanager import uuid import csv import io import logging from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select # Configuration logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("sage_api.log"), logging.StreamHandler()], ) logger = logging.getLogger(__name__) # Imports locaux from config import settings from database import ( init_db, async_session_factory, get_session, EmailLog, StatutEmail as StatutEmailEnum, WorkflowLog, SignatureLog, StatutSignature as StatutSignatureEnum, ) from email_queue import email_queue from sage_client import sage_client # ===================================================== # ENUMS # ===================================================== class TypeDocument(int, Enum): DEVIS = 0 BON_LIVRAISON = 1 BON_RETOUR = 2 COMMANDE = 3 PREPARATION = 4 FACTURE = 5 class StatutSignature(str, Enum): EN_ATTENTE = "EN_ATTENTE" ENVOYE = "ENVOYE" SIGNE = "SIGNE" REFUSE = "REFUSE" EXPIRE = "EXPIRE" class StatutEmail(str, Enum): EN_ATTENTE = "EN_ATTENTE" EN_COURS = "EN_COURS" ENVOYE = "ENVOYE" OUVERT = "OUVERT" ERREUR = "ERREUR" BOUNCE = "BOUNCE" # ===================================================== # MODÈLES PYDANTIC # ===================================================== class ClientResponse(BaseModel): numero: str intitule: str adresse: Optional[str] = None code_postal: Optional[str] = None ville: Optional[str] = None email: Optional[str] = None telephone: Optional[str] = None class ArticleResponse(BaseModel): reference: str designation: str prix_vente: float stock_reel: float class LigneDevis(BaseModel): article_code: str quantite: float prix_unitaire_ht: Optional[float] = None remise_pourcentage: Optional[float] = 0.0 class DevisRequest(BaseModel): client_id: str date_devis: Optional[date] = None lignes: List[LigneDevis] class DevisResponse(BaseModel): id: str client_id: str date_devis: str montant_total_ht: float montant_total_ttc: float nb_lignes: int class SignatureRequest(BaseModel): doc_id: str type_doc: TypeDocument email_signataire: EmailStr nom_signataire: str class EmailEnvoiRequest(BaseModel): destinataire: EmailStr cc: Optional[List[EmailStr]] = [] cci: Optional[List[EmailStr]] = [] sujet: str corps_html: str document_ids: Optional[List[str]] = None type_document: Optional[TypeDocument] = None class RelanceDevisRequest(BaseModel): doc_id: str message_personnalise: Optional[str] = None class BaremeRemiseResponse(BaseModel): client_id: str remise_max_autorisee: float remise_demandee: float autorisee: bool message: str # ===================================================== # SERVICES EXTERNES (Universign) # ===================================================== async def universign_envoyer( doc_id: str, pdf_bytes: bytes, email: str, nom: str ) -> Dict: """Envoi signature via API Universign""" import requests try: api_key = settings.universign_api_key api_url = settings.universign_api_url auth = (api_key, "") # Étape 1: Créer transaction response = requests.post( f"{api_url}/transactions", auth=auth, json={"name": f"Devis {doc_id}", "language": "fr"}, timeout=30, ) response.raise_for_status() transaction_id = response.json().get("id") # Étape 2: Upload PDF files = {"file": (f"Devis_{doc_id}.pdf", pdf_bytes, "application/pdf")} response = requests.post(f"{api_url}/files", auth=auth, files=files, timeout=30) response.raise_for_status() file_id = response.json().get("id") # Étape 3: Ajouter document response = requests.post( f"{api_url}/transactions/{transaction_id}/documents", auth=auth, data={"document": file_id}, timeout=30, ) response.raise_for_status() document_id = response.json().get("id") # Étape 4: Créer champ signature response = requests.post( f"{api_url}/transactions/{transaction_id}/documents/{document_id}/fields", auth=auth, data={"type": "signature"}, timeout=30, ) response.raise_for_status() field_id = response.json().get("id") # Étape 5: Assigner signataire response = requests.post( f"{api_url}/transactions/{transaction_id}/signatures", auth=auth, data={"signer": email, "field": field_id}, timeout=30, ) response.raise_for_status() # Étape 6: Démarrer transaction response = requests.post( f"{api_url}/transactions/{transaction_id}/start", auth=auth, timeout=30 ) response.raise_for_status() final_data = response.json() signer_url = ( final_data.get("actions", [{}])[0].get("url", "") if final_data.get("actions") else "" ) logger.info(f"✅ Signature Universign envoyée: {transaction_id}") return { "transaction_id": transaction_id, "signer_url": signer_url, "statut": "ENVOYE", } except Exception as e: logger.error(f"❌ Erreur Universign: {e}") return {"error": str(e), "statut": "ERREUR"} async def universign_statut(transaction_id: str) -> Dict: """Récupération statut signature""" import requests try: response = requests.get( f"{settings.universign_api_url}/transactions/{transaction_id}", auth=(settings.universign_api_key, ""), timeout=10, ) if response.status_code == 200: data = response.json() statut_map = { "draft": "EN_ATTENTE", "started": "EN_ATTENTE", "completed": "SIGNE", "refused": "REFUSE", "expired": "EXPIRE", "canceled": "REFUSE", } return { "statut": statut_map.get(data.get("state"), "EN_ATTENTE"), "date_signature": data.get("completed_at"), } else: return {"statut": "ERREUR"} except Exception as e: logger.error(f"Erreur statut Universign: {e}") return {"statut": "ERREUR", "error": str(e)} # ===================================================== # CYCLE DE VIE # ===================================================== @asynccontextmanager async def lifespan(app: FastAPI): # Init base de données await init_db() logger.info("✅ Base de données initialisée") # Injecter session_factory dans email_queue email_queue.session_factory = async_session_factory # ⚠️ PAS de sage_connector ici (c'est sur Windows !) # email_queue utilisera sage_client pour générer les PDFs via HTTP # Démarrer queue email_queue.start(num_workers=settings.max_email_workers) logger.info(f"✅ Email queue démarrée") yield # Cleanup email_queue.stop() logger.info("👋 Services arrêtés") # ===================================================== # APPLICATION # ===================================================== app = FastAPI( title="API Sage 100c Dataven", version="2.0.0", description="API de gestion commerciale - VPS Linux", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["*"], allow_credentials=True, ) # ===================================================== # ENDPOINTS - US-A1 (CRÉATION RAPIDE DEVIS) # ===================================================== @app.get("/clients", response_model=List[ClientResponse], tags=["US-A1"]) async def rechercher_clients(query: Optional[str] = Query(None)): """🔍 Recherche clients via gateway Windows""" try: clients = sage_client.lister_clients(filtre=query or "") return [ClientResponse(**c) for c in clients] except Exception as e: logger.error(f"Erreur recherche clients: {e}") raise HTTPException(500, str(e)) @app.get("/articles", response_model=List[ArticleResponse], tags=["US-A1"]) async def rechercher_articles(query: Optional[str] = Query(None)): """🔍 Recherche articles via gateway Windows""" try: articles = sage_client.lister_articles(filtre=query or "") return [ArticleResponse(**a) for a in articles] except Exception as e: logger.error(f"Erreur recherche articles: {e}") raise HTTPException(500, str(e)) @app.post("/devis", response_model=DevisResponse, status_code=201, tags=["US-A1"]) async def creer_devis(devis: DevisRequest): """📝 Création de devis via gateway Windows""" try: # Préparer les données pour la gateway devis_data = { "client_id": devis.client_id, "date_devis": devis.date_devis.isoformat() if devis.date_devis else None, "lignes": [ { "article_code": l.article_code, "quantite": l.quantite, "prix_unitaire_ht": l.prix_unitaire_ht, "remise_pourcentage": l.remise_pourcentage, } for l in devis.lignes ], } # Appel HTTP vers Windows resultat = sage_client.creer_devis(devis_data) logger.info(f"✅ Devis créé: {resultat.get('numero_devis')}") return DevisResponse( id=resultat["numero_devis"], client_id=devis.client_id, date_devis=resultat["date_devis"], montant_total_ht=resultat["total_ht"], montant_total_ttc=resultat["total_ttc"], nb_lignes=resultat["nb_lignes"], ) except Exception as e: logger.error(f"Erreur création devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis", tags=["US-A1"]) async def lister_devis( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """ 📋 Liste tous les devis via gateway Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) devis_list = sage_client.lister_devis(limit=limit, statut=statut) return devis_list except Exception as e: logger.error(f"Erreur liste devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis/{id}", tags=["US-A1"]) async def lire_devis(id: str): """📄 Lecture d'un devis via gateway Windows""" try: devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") return devis except HTTPException: raise except Exception as e: logger.error(f"Erreur lecture devis: {e}") raise HTTPException(500, str(e)) @app.get("/devis/{id}/pdf", tags=["US-A1"]) async def telecharger_devis_pdf(id: str): """📄 Téléchargement PDF (généré via email_queue)""" try: # Générer PDF en appelant la méthode de email_queue # qui elle-même appellera sage_client pour récupérer les données pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS) return StreamingResponse( iter([pdf_bytes]), media_type="application/pdf", headers={"Content-Disposition": f"attachment; filename=Devis_{id}.pdf"}, ) except Exception as e: logger.error(f"Erreur génération PDF: {e}") raise HTTPException(500, str(e)) @app.post("/devis/{id}/envoyer", tags=["US-A1"]) async def envoyer_devis_email( id: str, request: EmailEnvoiRequest, session: AsyncSession = Depends(get_session) ): """📧 Envoi devis par email""" try: # Vérifier que le devis existe devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Créer logs email pour chaque destinataire tous_destinataires = [request.destinataire] + request.cc + request.cci email_logs = [] for dest in tous_destinataires: email_log = EmailLog( id=str(uuid.uuid4()), destinataire=dest, sujet=request.sujet, corps_html=request.corps_html, document_ids=id, type_document=TypeDocument.DEVIS, statut=StatutEmailEnum.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() email_queue.enqueue(email_log.id) email_logs.append(email_log.id) await session.commit() logger.info( f"✅ Email devis {id} mis en file: {len(tous_destinataires)} destinataire(s)" ) return { "success": True, "email_log_ids": email_logs, "devis_id": id, "message": f"{len(tous_destinataires)} email(s) en file d'attente", } except HTTPException: raise except Exception as e: logger.error(f"Erreur envoi email: {e}") raise HTTPException(500, str(e)) @app.put("/devis/{id}/statut", tags=["US-A1"]) async def changer_statut_devis(id: str, nouveau_statut: int = Query(..., ge=0, le=5)): """ 📄 Changement de statut d'un devis via gateway Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) resultat = sage_client.changer_statut_devis(id, nouveau_statut) logger.info(f"✅ Statut devis {id} changé: {nouveau_statut}") return { "success": True, "devis_id": id, "statut_ancien": resultat.get("statut_ancien"), "statut_nouveau": resultat.get("statut_nouveau"), "message": "Statut mis à jour avec succès", } except Exception as e: logger.error(f"Erreur changement statut: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A2 (WORKFLOW SANS RESSAISIE) # ===================================================== @app.get("/commandes", tags=["US-A2"]) async def lister_commandes( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """ 📋 Liste toutes les commandes via gateway Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) commandes = sage_client.lister_commandes(limit=limit, statut=statut) return commandes except Exception as e: logger.error(f"Erreur liste commandes: {e}") raise HTTPException(500, str(e)) @app.post("/workflow/devis/{id}/to-commande", tags=["US-A2"]) async def devis_vers_commande(id: str, session: AsyncSession = Depends(get_session)): """🔧 Transformation Devis → Commande via gateway Windows""" try: # Appel HTTP vers Windows resultat = sage_client.transformer_document( numero_source=id, type_source=TypeDocument.DEVIS, type_cible=TypeDocument.COMMANDE, ) # Logger en DB workflow_log = WorkflowLog( id=str(uuid.uuid4()), document_source=id, type_source=TypeDocument.DEVIS, document_cible=resultat.get("document_cible", ""), type_cible=TypeDocument.COMMANDE, nb_lignes=resultat.get("nb_lignes", 0), date_transformation=datetime.now(), succes=True, ) session.add(workflow_log) await session.commit() logger.info( f"✅ Transformation: Devis {id} → Commande {resultat['document_cible']}" ) return { "success": True, "document_source": id, "document_cible": resultat["document_cible"], "nb_lignes": resultat["nb_lignes"], } except Exception as e: logger.error(f"Erreur transformation: {e}") raise HTTPException(500, str(e)) @app.post("/workflow/commande/{id}/to-facture", tags=["US-A2"]) async def commande_vers_facture(id: str, session: AsyncSession = Depends(get_session)): """🔧 Transformation Commande → Facture via gateway Windows""" try: resultat = sage_client.transformer_document( numero_source=id, type_source=TypeDocument.COMMANDE, type_cible=TypeDocument.FACTURE, ) workflow_log = WorkflowLog( id=str(uuid.uuid4()), document_source=id, type_source=TypeDocument.COMMANDE, document_cible=resultat.get("document_cible", ""), type_cible=TypeDocument.FACTURE, nb_lignes=resultat.get("nb_lignes", 0), date_transformation=datetime.now(), succes=True, ) session.add(workflow_log) await session.commit() return resultat except Exception as e: logger.error(f"Erreur transformation: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A3 (SIGNATURE ÉLECTRONIQUE) # ===================================================== @app.post("/signature/universign/send", tags=["US-A3"]) async def envoyer_signature( demande: SignatureRequest, session: AsyncSession = Depends(get_session) ): """✍️ Envoi document pour signature Universign""" try: # Générer PDF pdf_bytes = email_queue._generate_pdf(demande.doc_id, demande.type_doc) # Envoi Universign resultat = await universign_envoyer( demande.doc_id, pdf_bytes, demande.email_signataire, demande.nom_signataire ) if "error" in resultat: raise HTTPException(500, resultat["error"]) # Logger en DB signature_log = SignatureLog( id=str(uuid.uuid4()), document_id=demande.doc_id, type_document=demande.type_doc, transaction_id=resultat["transaction_id"], signer_url=resultat["signer_url"], email_signataire=demande.email_signataire, nom_signataire=demande.nom_signataire, statut=StatutSignatureEnum.ENVOYE, date_envoi=datetime.now(), ) session.add(signature_log) await session.commit() # MAJ champ libre Sage via gateway Windows sage_client.mettre_a_jour_champ_libre( demande.doc_id, demande.type_doc, "UniversignID", resultat["transaction_id"] ) logger.info(f"✅ Signature envoyée: {demande.doc_id}") return { "success": True, "transaction_id": resultat["transaction_id"], "signer_url": resultat["signer_url"], } except HTTPException: raise except Exception as e: logger.error(f"Erreur signature: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A5 # ===================================================== @app.post("/devis/valider-remise", response_model=BaremeRemiseResponse, tags=["US-A5"]) async def valider_remise( client_id: str = Query(..., min_length=1), remise_pourcentage: float = Query(0.0, ge=0, le=100), ): """ 💰 US-A5: Validation remise via barème client Sage """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) remise_max = sage_client.lire_remise_max_client(client_id) autorisee = remise_pourcentage <= remise_max if not autorisee: message = f"⚠️ Remise trop élevée (max autorisé: {remise_max}%)" logger.warning( f"Remise refusée pour {client_id}: {remise_pourcentage}% > {remise_max}%" ) else: message = "✅ Remise autorisée" return BaremeRemiseResponse( client_id=client_id, remise_max_autorisee=remise_max, remise_demandee=remise_pourcentage, autorisee=autorisee, message=message, ) except Exception as e: logger.error(f"Erreur validation remise: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A6 (RELANCE DEVIS) # ===================================================== @app.post("/devis/{id}/relancer-signature", tags=["US-A6"]) async def relancer_devis_signature( id: str, relance: RelanceDevisRequest, session: AsyncSession = Depends(get_session) ): """📧 Relance devis via Universign""" try: # Lire devis via gateway devis = sage_client.lire_devis(id) if not devis: raise HTTPException(404, f"Devis {id} introuvable") # Récupérer contact via gateway contact = sage_client.lire_contact_client(devis["client_code"]) if not contact or not contact.get("email"): raise HTTPException(400, "Aucun email trouvé pour ce client") # Générer PDF pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS) # Envoi Universign resultat = await universign_envoyer( id, pdf_bytes, contact["email"], contact["nom"] or contact["client_intitule"], ) if "error" in resultat: raise HTTPException(500, resultat["error"]) # Logger en DB signature_log = SignatureLog( id=str(uuid.uuid4()), document_id=id, type_document=TypeDocument.DEVIS, transaction_id=resultat["transaction_id"], signer_url=resultat["signer_url"], email_signataire=contact["email"], nom_signataire=contact["nom"] or contact["client_intitule"], statut=StatutSignatureEnum.ENVOYE, date_envoi=datetime.now(), est_relance=True, nb_relances=1, ) session.add(signature_log) await session.commit() return { "success": True, "devis_id": id, "transaction_id": resultat["transaction_id"], "message": "Relance signature envoyée", } except HTTPException: raise except Exception as e: logger.error(f"Erreur relance: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - US-A7 # ===================================================== @app.get("/factures", tags=["US-A7"]) async def lister_factures( limit: int = Query(100, le=1000), statut: Optional[int] = Query(None) ): """ 📋 Liste toutes les factures via gateway Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) factures = sage_client.lister_factures(limit=limit, statut=statut) return factures except Exception as e: logger.error(f"Erreur liste factures: {e}") raise HTTPException(500, str(e)) # ===================================================== # ENDPOINTS - HEALTH # ===================================================== @app.get("/health", tags=["System"]) async def health_check(): """🏥 Health check""" gateway_health = sage_client.health() return { "status": "healthy", "sage_gateway": gateway_health, "email_queue": { "running": email_queue.running, "workers": len(email_queue.workers), "queue_size": email_queue.queue.qsize(), }, "timestamp": datetime.now().isoformat(), } @app.get("/", tags=["System"]) async def root(): """🏠 Page d'accueil""" return { "api": "Sage 100c Dataven - VPS Linux", "version": "2.0.0", "documentation": "/docs", "health": "/health", } # ===================================================== # ENDPOINTS - ADMIN # ===================================================== @app.get("/admin/cache/info", tags=["Admin"]) async def info_cache(): """ 📊 Informations sur l'état du cache Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) cache_info = sage_client.get_cache_info() return cache_info except Exception as e: logger.error(f"Erreur info cache: {e}") raise HTTPException(500, str(e)) # 🔧 CORRECTION ADMIN: Cache refresh (doit appeler Windows) @app.post("/admin/cache/refresh", tags=["Admin"]) async def forcer_actualisation(): """ 🔄 Force l'actualisation du cache Windows """ try: # ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows) resultat = sage_client.refresh_cache() cache_info = sage_client.get_cache_info() return { "success": True, "message": "Cache actualisé sur Windows Server", "info": cache_info, } except Exception as e: logger.error(f"Erreur refresh cache: {e}") raise HTTPException(500, str(e)) # ✅ RESTE INCHANGÉ: admin/queue/status (local au VPS) @app.get("/admin/queue/status", tags=["Admin"]) async def statut_queue(): """ 📊 Statut de la queue d'emails (local VPS) """ return { "queue_size": email_queue.queue.qsize(), "workers": len(email_queue.workers), "running": email_queue.running, } # ===================================================== # LANCEMENT # ===================================================== if __name__ == "__main__": uvicorn.run( "api:app", host=settings.api_host, port=settings.api_port, reload=settings.api_reload, )