From a68f5af72eaa8d9f1bd7322cbacd2a9eeced3dc7 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Mon, 5 Jan 2026 23:37:17 +0300 Subject: [PATCH] feat(universign): implement comprehensive e-signature integration --- api.py | 382 ++----------------- database/__init__.py | 12 + database/models/universign.py | 322 ++++++++++++++++ routes/universign.py | 538 +++++++++++++++++++++++++++ services/universign_sync.py | 575 +++++++++++++++++++++++++++++ utils/generic_functions.py | 271 +++++++++++++- utils/universign_status_mapping.py | 274 ++++++++++++++ 7 files changed, 2020 insertions(+), 354 deletions(-) create mode 100644 database/models/universign.py create mode 100644 routes/universign.py create mode 100644 services/universign_sync.py create mode 100644 utils/universign_status_mapping.py diff --git a/api.py b/api.py index 73fded5..c37cbde 100644 --- a/api.py +++ b/api.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field, EmailStr from typing import List, Optional from datetime import datetime import uvicorn +import asyncio from contextlib import asynccontextmanager import uuid import csv @@ -74,17 +75,24 @@ from schemas import ( from schemas.tiers.commercial import ( CollaborateurCreate, CollaborateurDetails, - CollaborateurListe, CollaborateurUpdate, ) from utils.normalization import normaliser_type_tiers from routes.sage_gateway import router as sage_gateway_router +from routes.universign import router as universign_router + +from services.universign_sync import UniversignSyncService, UniversignSyncScheduler + from core.sage_context import ( get_sage_client_for_user, get_gateway_context_for_user, GatewayContext, ) -from utils.generic_functions import _preparer_lignes_document +from utils.generic_functions import ( + _preparer_lignes_document, + universign_envoyer, + universign_statut, +) if os.path.exists("/app"): LOGS_DIR = FilePath("/app/logs") @@ -118,8 +126,23 @@ async def lifespan(app: FastAPI): email_queue.start(num_workers=settings.max_email_workers) logger.info("Email queue démarrée") + sync_service = UniversignSyncService( + api_url=settings.universign_api_url, api_key=settings.universign_api_key + ) + + scheduler = UniversignSyncScheduler( + sync_service=sync_service, + interval_minutes=5, # Synchronisation toutes les 5 minutes + ) + + sync_task = asyncio.create_task(scheduler.start(async_session_factory)) + + logger.info("✓ Synchronisation Universign démarrée (5min)") + yield + scheduler.stop() + sync_task.cancel() email_queue.stop() logger.info("Services arrêtés") @@ -143,181 +166,7 @@ app.add_middleware( app.include_router(auth_router) app.include_router(sage_gateway_router) - - -async def universign_envoyer( - doc_id: str, - pdf_bytes: bytes, - email: str, - nom: str, - doc_data: dict, - session: AsyncSession, -) -> dict: - import requests - - try: - api_key = settings.universign_api_key - api_url = settings.universign_api_url - auth = (api_key, "") - - logger.info(f"Démarrage processus Universign pour {email}") - - if not pdf_bytes or len(pdf_bytes) == 0: - raise Exception("Le PDF généré est vide") - - response = requests.post( - f"{api_url}/transactions", - auth=auth, - json={ - "name": f"{doc_data.get('type_label', 'Document')} {doc_id}", - "language": "fr", - }, - timeout=30, - ) - if response.status_code != 200: - raise Exception(f"Erreur création transaction: {response.status_code}") - transaction_id = response.json().get("id") - - files = { - "file": ( - f"{doc_data.get('type_label', 'Document')}_{doc_id}.pdf", - pdf_bytes, - "application/pdf", - ) - } - response = requests.post(f"{api_url}/files", auth=auth, files=files, timeout=60) - if response.status_code not in [200, 201]: - raise Exception(f"Erreur upload fichier: {response.status_code}") - file_id = response.json().get("id") - - response = requests.post( - f"{api_url}/transactions/{transaction_id}/documents", - auth=auth, - data={"document": file_id}, - timeout=30, - ) - if response.status_code not in [200, 201]: - raise Exception(f"Erreur ajout document: {response.status_code}") - document_id = response.json().get("id") - - response = requests.post( - f"{api_url}/transactions/{transaction_id}/documents/{document_id}/fields", - auth=auth, - data={"type": "signature"}, - timeout=30, - ) - if response.status_code not in [200, 201]: - raise Exception(f"Erreur création champ: {response.status_code}") - field_id = response.json().get("id") - - response = requests.post( - f"{api_url}/transactions/{transaction_id}/signatures", - auth=auth, - data={"signer": email, "field": field_id}, - timeout=30, - ) - if response.status_code not in [200, 201]: - raise Exception(f"Erreur liaison signataire: {response.status_code}") - - response = requests.post( - f"{api_url}/transactions/{transaction_id}/start", auth=auth, timeout=30 - ) - if response.status_code not in [200, 201]: - raise Exception(f"Erreur démarrage: {response.status_code}") - final_data = response.json() - - signer_url = "" - if final_data.get("actions"): - for action in final_data["actions"]: - if action.get("url"): - signer_url = action["url"] - break - if not signer_url and final_data.get("signers"): - for signer in final_data["signers"]: - if signer.get("email") == email: - signer_url = signer.get("url", "") - break - if not signer_url: - raise ValueError("URL de signature non retournée par Universign") - - template = templates_signature_email["demande_signature"] - type_labels = { - 0: "Devis", - 10: "Commande", - 30: "Bon de Livraison", - 60: "Facture", - 50: "Avoir", - } - variables = { - "NOM_SIGNATAIRE": nom, - "TYPE_DOC": type_labels.get(doc_data.get("type_doc", 0), "Document"), - "NUMERO": doc_id, - "DATE": doc_data.get("date", datetime.now().strftime("%d/%m/%Y")), - "MONTANT_TTC": f"{doc_data.get('montant_ttc', 0):.2f}", - "SIGNER_URL": signer_url, - "CONTACT_EMAIL": settings.smtp_from, - } - sujet = template["sujet"] - corps = template["corps_html"] - for var, valeur in variables.items(): - sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) - corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) - - email_log = EmailLog( - id=str(uuid.uuid4()), - destinataire=email, - sujet=sujet, - corps_html=corps, - document_ids=doc_id, - type_document=doc_data.get("type_doc"), - statut=StatutEmailDB.EN_ATTENTE, - date_creation=datetime.now(), - nb_tentatives=0, - ) - session.add(email_log) - await session.flush() - email_queue.enqueue(email_log.id) - - return { - "transaction_id": transaction_id, - "signer_url": signer_url, - "statut": "ENVOYE", - "email_log_id": email_log.id, - "email_sent": True, - } - - except Exception as e: - logger.error(f"Erreur Universign: {e}", exc_info=True) - return {"error": str(e), "statut": "ERREUR", "email_sent": False} - - -async def universign_statut(transaction_id: str) -> dict: - import requests - - try: - response = requests.get( - f"{settings.universign_api_url}/transactions/{transaction_id}", - auth=(settings.universign_api_key, ""), - timeout=10, - ) - if response.status_code == 200: - data = response.json() - statut_map = { - "draft": "EN_ATTENTE", - "started": "EN_ATTENTE", - "completed": "SIGNE", - "refused": "REFUSE", - "expired": "EXPIRE", - "canceled": "REFUSE", - } - return { - "statut": statut_map.get(data.get("state"), "EN_ATTENTE"), - "date_signature": data.get("completed_at"), - } - return {"statut": "ERREUR"} - except Exception as e: - logger.error(f"Erreur statut Universign: {e}") - return {"statut": "ERREUR", "error": str(e)} +app.include_router(universign_router) @app.get("/clients", response_model=List[ClientDetails], tags=["Clients"]) @@ -1086,181 +935,6 @@ def normaliser_type_doc(type_doc: int) -> int: return type_doc if type_doc == 0 else type_doc // 10 -@app.post("/signature/universign/send", tags=["Signatures"]) -async def envoyer_signature_optimise( - demande: Signature, session: AsyncSession = Depends(get_session) -): - try: - doc = sage_client.lire_document( - demande.doc_id, normaliser_type_doc(demande.type_doc) - ) - if not doc: - raise HTTPException(404, f"Document {demande.doc_id} introuvable") - - pdf_bytes = email_queue._generate_pdf( - demande.doc_id, normaliser_type_doc(demande.type_doc) - ) - - doc_data = { - "type_doc": demande.type_doc, - "type_label": { - 0: "Devis", - 10: "Commande", - 30: "Bon de Livraison", - 60: "Facture", - 50: "Avoir", - }.get(demande.type_doc, "Document"), - "montant_ttc": doc.get("total_ttc", 0), - "date": doc.get("date", datetime.now().strftime("%d/%m/%Y")), - } - - resultat = await universign_envoyer( - doc_id=demande.doc_id, - pdf_bytes=pdf_bytes, - email=demande.email_signataire, - nom=demande.nom_signataire, - doc_data=doc_data, - session=session, - ) - - if "error" in resultat: - raise HTTPException(500, resultat["error"]) - - signature_log = SignatureLog( - id=str(uuid.uuid4()), - document_id=demande.doc_id, - type_document=demande.type_doc, - transaction_id=resultat["transaction_id"], - signer_url=resultat["signer_url"], - email_signataire=demande.email_signataire, - nom_signataire=demande.nom_signataire, - statut=StatutSignatureDB.ENVOYE, - date_envoi=datetime.now(), - ) - - session.add(signature_log) - await session.commit() - - sage_client.mettre_a_jour_champ_libre( - demande.doc_id, demande.type_doc, "UniversignID", resultat["transaction_id"] - ) - - logger.info( - f"Signature envoyée: {demande.doc_id} (Email: {resultat['email_sent']})" - ) - - return { - "success": True, - "transaction_id": resultat["transaction_id"], - "signer_url": resultat["signer_url"], - "email_sent": resultat["email_sent"], - "email_log_id": resultat.get("email_log_id"), - "message": f"Demande de signature envoyée à {demande.email_signataire}", - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Erreur signature: {e}") - raise HTTPException(500, str(e)) - - -@app.post("/webhooks/universign", tags=["Signatures"]) -async def webhook_universign( - request: Request, session: AsyncSession = Depends(get_session) -): - try: - payload = await request.json() - - event_type = payload.get("event") - transaction_id = payload.get("transaction_id") - - if not transaction_id: - logger.warning("Webhook sans transaction_id") - return {"status": "ignored"} - - query = select(SignatureLog).where( - SignatureLog.transaction_id == transaction_id - ) - result = await session.execute(query) - signature_log = result.scalar_one_or_none() - - if not signature_log: - logger.warning(f"Transaction {transaction_id} introuvable en DB") - return {"status": "not_found"} - - if event_type == "transaction.completed": - signature_log.statut = StatutSignatureDB.SIGNE - signature_log.date_signature = datetime.now() - - logger.info(f"Signature confirmée: {signature_log.document_id}") - - template = templates_signature_email["signature_confirmee"] - - type_labels = { - 0: "Devis", - 10: "Commande", - 30: "Bon de Livraison", - 60: "Facture", - 50: "Avoir", - } - - variables = { - "NOM_SIGNATAIRE": signature_log.nom_signataire, - "TYPE_DOC": type_labels.get(signature_log.type_document, "Document"), - "NUMERO": signature_log.document_id, - "DATE_SIGNATURE": datetime.now().strftime("%d/%m/%Y à %H:%M"), - "TRANSACTION_ID": transaction_id, - "CONTACT_EMAIL": settings.smtp_from, - } - - sujet = template["sujet"] - corps = template["corps_html"] - - for var, valeur in variables.items(): - sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) - corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) - - email_log = EmailLog( - id=str(uuid.uuid4()), - destinataire=signature_log.email_signataire, - sujet=sujet, - corps_html=corps, - document_ids=signature_log.document_id, - type_document=signature_log.type_document, - statut=StatutEmailDB.EN_ATTENTE, - date_creation=datetime.now(), - nb_tentatives=0, - ) - - session.add(email_log) - email_queue.enqueue(email_log.id) - - logger.info( - f" Email de confirmation envoyé: {signature_log.email_signataire}" - ) - - elif event_type == "transaction.refused": - signature_log.statut = StatutSignatureDB.REFUSE - logger.warning(f"Signature refusée: {signature_log.document_id}") - - elif event_type == "transaction.expired": - signature_log.statut = StatutSignatureDB.EXPIRE - logger.warning(f"⏰ Transaction expirée: {signature_log.document_id}") - - await session.commit() - - return { - "status": "processed", - "event": event_type, - "transaction_id": transaction_id, - } - - except Exception as e: - logger.error(f"Erreur webhook Universign: {e}") - return {"status": "error", "message": str(e)} - - @app.get("/admin/signatures/relances-auto", tags=["Admin"]) async def relancer_signatures_automatique(session: AsyncSession = Depends(get_session)): try: @@ -3149,7 +2823,9 @@ async def get_current_sage_config( # Routes Collaborateurs @app.get( - "/collaborateurs", response_model=List[CollaborateurDetails], tags=["Collaborateurs"] + "/collaborateurs", + response_model=List[CollaborateurDetails], + tags=["Collaborateurs"], ) async def lister_collaborateurs( filtre: Optional[str] = Query(None, description="Filtre sur nom/prénom"), diff --git a/database/__init__.py b/database/__init__.py index 5a459cd..1a4a9ff 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -20,6 +20,13 @@ from database.enum.status import ( StatutSignature, ) from database.models.workflow import WorkflowLog +from database.models.universign import ( + UniversignTransaction, + UniversignSigner, + UniversignSyncLog, + LocalDocumentStatus, + SageDocumentType, +) __all__ = [ "engine", @@ -39,4 +46,9 @@ __all__ = [ "RefreshToken", "LoginAttempt", "SageGatewayConfig", + "UniversignTransaction", + "UniversignSigner", + "UniversignSyncLog", + "LocalDocumentStatus", + "SageDocumentType", ] diff --git a/database/models/universign.py b/database/models/universign.py new file mode 100644 index 0000000..f34a4ae --- /dev/null +++ b/database/models/universign.py @@ -0,0 +1,322 @@ +from sqlalchemy import ( + Column, + String, + DateTime, + Boolean, + Integer, + Text, + Enum as SQLEnum, + ForeignKey, + Index, +) +from sqlalchemy.orm import relationship +from datetime import datetime +from enum import Enum +from database.models.generic_model import Base + + +# ======================================== +# ENUMS DE STATUTS +# ======================================== + + +class UniversignTransactionStatus(str, Enum): + """Statuts de transaction Universign (exhaustifs)""" + + DRAFT = "draft" # Transaction créée, non démarrée + READY = "ready" # Prête à démarrer + STARTED = "started" # Démarrée, en cours + COMPLETED = "completed" # Tous signés ✓ + REFUSED = "refused" # Refusé par un signataire + EXPIRED = "expired" # Délai expiré + CANCELED = "canceled" # Annulée manuellement + FAILED = "failed" # Erreur technique + + +class UniversignSignerStatus(str, Enum): + """Statuts d'un signataire individuel""" + + WAITING = "waiting" # En attente + VIEWED = "viewed" # Document ouvert + SIGNED = "signed" # Signé ✓ + REFUSED = "refused" # Refusé + EXPIRED = "expired" # Expiré + + +class LocalDocumentStatus(str, Enum): + """Statuts métier locaux (simplifié pour l'UI)""" + + PENDING = "EN_ATTENTE" # Transaction non démarrée ou en attente + IN_PROGRESS = "EN_COURS" # Envoyé, en cours de signature + SIGNED = "SIGNE" # Signé avec succès + REJECTED = "REFUSE" # Refusé ou annulé + EXPIRED = "EXPIRE" # Expiré + ERROR = "ERREUR" # Erreur technique + + +class SageDocumentType(int, Enum): + """Types de documents Sage (synchronisé avec Sage)""" + + DEVIS = 0 + BON_COMMANDE = 10 + PREPARATION = 20 + BON_LIVRAISON = 30 + BON_RETOUR = 40 + BON_AVOIR = 50 + FACTURE = 60 + + +# ======================================== +# TABLE PRINCIPALE : TRANSACTIONS UNIVERSIGN +# ======================================== + + +class UniversignTransaction(Base): + """ + Table centrale : synchronisation bidirectionnelle Universign ↔ Local + """ + + __tablename__ = "universign_transactions" + + # === IDENTIFIANTS === + id = Column(String(36), primary_key=True) # UUID local + transaction_id = Column( + String(255), + unique=True, + nullable=False, + index=True, + comment="ID Universign (ex: tr_abc123)", + ) + + # === LIEN AVEC LE DOCUMENT SAGE === + sage_document_id = Column( + String(50), + nullable=False, + index=True, + comment="Numéro du document Sage (ex: DE00123)", + ) + sage_document_type = Column( + SQLEnum(SageDocumentType), nullable=False, comment="Type de document Sage" + ) + + # === STATUTS UNIVERSIGN (SOURCE DE VÉRITÉ) === + universign_status = Column( + SQLEnum(UniversignTransactionStatus), + nullable=False, + default=UniversignTransactionStatus.DRAFT, + index=True, + comment="Statut brut Universign", + ) + universign_status_updated_at = Column( + DateTime, nullable=True, comment="Dernière MAJ du statut Universign" + ) + + # === STATUT LOCAL (DÉDUIT) === + local_status = Column( + SQLEnum(LocalDocumentStatus), + nullable=False, + default=LocalDocumentStatus.PENDING, + index=True, + comment="Statut métier simplifié pour l'UI", + ) + + # === URLS ET MÉTADONNÉES UNIVERSIGN === + signer_url = Column(Text, nullable=True, comment="URL de signature") + document_url = Column(Text, nullable=True, comment="URL du document signé") + certificate_url = Column(Text, nullable=True, comment="URL du certificat") + + # === SIGNATAIRES === + signers_data = Column( + Text, nullable=True, comment="JSON des signataires (snapshot)" + ) + + # === INFORMATIONS MÉTIER === + requester_email = Column(String(255), nullable=True) + requester_name = Column(String(255), nullable=True) + document_name = Column(String(500), nullable=True) + + # === DATES CLÉS === + created_at = Column( + DateTime, + default=datetime.now, + nullable=False, + comment="Date de création locale", + ) + sent_at = Column( + DateTime, nullable=True, comment="Date d'envoi Universign (started)" + ) + signed_at = Column(DateTime, nullable=True, comment="Date de signature complète") + refused_at = Column(DateTime, nullable=True) + expired_at = Column(DateTime, nullable=True) + canceled_at = Column(DateTime, nullable=True) + + # === SYNCHRONISATION === + last_synced_at = Column( + DateTime, nullable=True, comment="Dernière sync réussie avec Universign" + ) + sync_attempts = Column(Integer, default=0, comment="Nombre de tentatives de sync") + sync_error = Column(Text, nullable=True) + + # === FLAGS === + is_test = Column( + Boolean, default=False, comment="Transaction en environnement .alpha" + ) + needs_sync = Column( + Boolean, default=True, index=True, comment="À synchroniser avec Universign" + ) + webhook_received = Column(Boolean, default=False, comment="Webhook Universign reçu") + + # === RELATION === + signers = relationship( + "UniversignSigner", back_populates="transaction", cascade="all, delete-orphan" + ) + sync_logs = relationship( + "UniversignSyncLog", back_populates="transaction", cascade="all, delete-orphan" + ) + + # === INDEXES COMPOSITES === + __table_args__ = ( + Index("idx_sage_doc", "sage_document_id", "sage_document_type"), + Index("idx_sync_status", "needs_sync", "universign_status"), + Index("idx_dates", "created_at", "signed_at"), + ) + + def __repr__(self): + return ( + f"" + ) + + +# ======================================== +# TABLE SECONDAIRE : SIGNATAIRES +# ======================================== + + +class UniversignSigner(Base): + """ + Détail de chaque signataire d'une transaction + """ + + __tablename__ = "universign_signers" + + id = Column(String(36), primary_key=True) + transaction_id = Column( + String(36), + ForeignKey("universign_transactions.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # === DONNÉES SIGNATAIRE === + email = Column(String(255), nullable=False, index=True) + name = Column(String(255), nullable=True) + phone = Column(String(50), nullable=True) + + # === STATUT === + status = Column( + SQLEnum(UniversignSignerStatus), + default=UniversignSignerStatus.WAITING, + nullable=False, + ) + + # === ACTIONS === + viewed_at = Column(DateTime, nullable=True) + signed_at = Column(DateTime, nullable=True) + refused_at = Column(DateTime, nullable=True) + refusal_reason = Column(Text, nullable=True) + + # === MÉTADONNÉES === + ip_address = Column(String(45), nullable=True) + user_agent = Column(Text, nullable=True) + signature_method = Column(String(50), nullable=True) + + # === ORDRE === + order_index = Column(Integer, default=0) + + # === RELATION === + transaction = relationship("UniversignTransaction", back_populates="signers") + + def __repr__(self): + return f"" + + +# ======================================== +# TABLE DE LOGS : SYNCHRONISATION +# ======================================== + + +class UniversignSyncLog(Base): + """ + Journal de toutes les synchronisations (audit trail) + """ + + __tablename__ = "universign_sync_logs" + + id = Column(Integer, primary_key=True, autoincrement=True) + transaction_id = Column( + String(36), + ForeignKey("universign_transactions.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + + # === SYNC INFO === + sync_type = Column(String(50), nullable=False, comment="webhook, polling, manual") + sync_timestamp = Column(DateTime, default=datetime.now, nullable=False, index=True) + + # === CHANGEMENTS DÉTECTÉS === + previous_status = Column(String(50), nullable=True) + new_status = Column(String(50), nullable=True) + changes_detected = Column(Text, nullable=True, comment="JSON des changements") + + # === RÉSULTAT === + success = Column(Boolean, default=True) + error_message = Column(Text, nullable=True) + http_status_code = Column(Integer, nullable=True) + response_time_ms = Column(Integer, nullable=True) + + # === RELATION === + transaction = relationship("UniversignSyncLog", back_populates="sync_logs") + + def __repr__(self): + return f"" + + +# ======================================== +# TABLE DE CONFIGURATION +# ======================================== + + +class UniversignConfig(Base): + """ + Configuration Universign par environnement/utilisateur + """ + + __tablename__ = "universign_configs" + + id = Column(String(36), primary_key=True) + user_id = Column(String(36), nullable=True, index=True) + + environment = Column( + String(50), nullable=False, default="alpha", comment="alpha, prod" + ) + + api_url = Column(String(500), nullable=False) + api_key = Column(String(500), nullable=False, comment="⚠️ À chiffrer") + + # === OPTIONS === + webhook_url = Column(String(500), nullable=True) + webhook_secret = Column(String(255), nullable=True) + + auto_sync_enabled = Column(Boolean, default=True) + sync_interval_minutes = Column(Integer, default=5) + + signature_expiry_days = Column(Integer, default=30) + + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.now) + + def __repr__(self): + return f"" diff --git a/routes/universign.py b/routes/universign.py new file mode 100644 index 0000000..faa6588 --- /dev/null +++ b/routes/universign.py @@ -0,0 +1,538 @@ +from fastapi import APIRouter, Depends, HTTPException, Query, Request +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, func +from sqlalchemy.orm import selectinload +from typing import List, Optional +from datetime import datetime +from pydantic import BaseModel, EmailStr +import logging + +from database import get_session +from database import ( + UniversignTransaction, + UniversignSigner, + UniversignSyncLog, + LocalDocumentStatus, + SageDocumentType, +) +from services.universign_sync import UniversignSyncService +from config.config import settings +from utils.universign_status_mapping import get_status_message + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/universign", tags=["Universign"]) + +sync_service = UniversignSyncService( + api_url=settings.universign_api_url, api_key=settings.universign_api_key +) + + +class CreateSignatureRequest(BaseModel): + """Demande de création d'une signature""" + + sage_document_id: str + sage_document_type: SageDocumentType + signer_email: EmailStr + signer_name: str + document_name: Optional[str] = None + + +class TransactionResponse(BaseModel): + """Réponse détaillée d'une transaction""" + + id: str + transaction_id: str + sage_document_id: str + sage_document_type: str + universign_status: str + local_status: str + local_status_label: str + signer_url: Optional[str] + document_url: Optional[str] + created_at: datetime + sent_at: Optional[datetime] + signed_at: Optional[datetime] + last_synced_at: Optional[datetime] + needs_sync: bool + signers: List[dict] + + +class SyncStatsResponse(BaseModel): + """Statistiques de synchronisation""" + + total_transactions: int + pending_sync: int + signed: int + in_progress: int + refused: int + expired: int + last_sync_at: Optional[datetime] + + +@router.post("/signatures/create", response_model=TransactionResponse) +async def create_signature( + request: CreateSignatureRequest, session: AsyncSession = Depends(get_session) +): + try: + # === 1. GÉNÉRATION PDF === + from sage_client import sage_client + + pdf_bytes = sage_client.generer_pdf_document( + request.sage_document_id, request.sage_document_type.value + ) + + if not pdf_bytes: + raise HTTPException(400, "Échec génération PDF") + + # === 2. CRÉATION TRANSACTION UNIVERSIGN === + import requests + import uuid + + auth = (settings.universign_api_key, "") + + # Créer la transaction + resp = requests.post( + f"{settings.universign_api_url}/transactions", + auth=auth, + json={ + "name": request.document_name + or f"{request.sage_document_type.name} {request.sage_document_id}", + "language": "fr", + }, + timeout=30, + ) + + if resp.status_code != 200: + raise HTTPException(500, f"Erreur Universign: {resp.status_code}") + + universign_tx_id = resp.json().get("id") + + # Upload le fichier + files = { + "file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf") + } + resp = requests.post( + f"{settings.universign_api_url}/files", auth=auth, files=files, timeout=60 + ) + + if resp.status_code not in [200, 201]: + raise HTTPException(500, "Erreur upload PDF") + + file_id = resp.json().get("id") + + # Attacher le document + resp = requests.post( + f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents", + auth=auth, + data={"document": file_id}, + timeout=30, + ) + + if resp.status_code not in [200, 201]: + raise HTTPException(500, "Erreur attachement document") + + document_id = resp.json().get("id") + + # Créer le champ de signature + resp = requests.post( + f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields", + auth=auth, + data={"type": "signature"}, + timeout=30, + ) + + if resp.status_code not in [200, 201]: + raise HTTPException(500, "Erreur création champ signature") + + field_id = resp.json().get("id") + + # Lier le signataire + resp = requests.post( + f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures", + auth=auth, + data={"signer": request.signer_email, "field": field_id}, + timeout=30, + ) + + if resp.status_code not in [200, 201]: + raise HTTPException(500, "Erreur liaison signataire") + + # Démarrer la transaction + resp = requests.post( + f"{settings.universign_api_url}/transactions/{universign_tx_id}/start", + auth=auth, + timeout=30, + ) + + if resp.status_code not in [200, 201]: + raise HTTPException(500, "Erreur démarrage transaction") + + final_data = resp.json() + + # Extraire l'URL de signature + signer_url = "" + if final_data.get("actions"): + for action in final_data["actions"]: + if action.get("url"): + signer_url = action["url"] + break + + if not signer_url: + raise HTTPException(500, "URL de signature non retournée") + + # === 3. ENREGISTREMENT LOCAL === + + local_id = str(uuid.uuid4()) + + transaction = UniversignTransaction( + id=local_id, + transaction_id=universign_tx_id, + sage_document_id=request.sage_document_id, + sage_document_type=request.sage_document_type, + universign_status="started", + local_status=LocalDocumentStatus.EN_COURS, + signer_url=signer_url, + requester_email=request.signer_email, + requester_name=request.signer_name, + document_name=request.document_name, + created_at=datetime.now(), + sent_at=datetime.now(), + is_test=True, # Environnement .alpha + needs_sync=True, + ) + + session.add(transaction) + + # Signataire + signer = UniversignSigner( + id=f"{local_id}_signer_0", + transaction_id=local_id, + email=request.signer_email, + name=request.signer_name, + status="waiting", + order_index=0, + ) + + session.add(signer) + + await session.commit() + + # === 4. ENVOI EMAIL (via email_queue) === + from email_queue import email_queue + from database.models.email import EmailLog + from database.enum.status import StatutEmail + + email_log = EmailLog( + id=str(uuid.uuid4()), + destinataire=request.signer_email, + sujet=f"Signature requise - {request.sage_document_type.name} {request.sage_document_id}", + corps_html=f""" +

Bonjour {request.signer_name},

+

Merci de signer le document suivant :

+

Cliquez ici pour signer

+ """, + document_ids=request.sage_document_id, + type_document=request.sage_document_type.value, + statut=StatutEmail.EN_ATTENTE, + date_creation=datetime.now(), + nb_tentatives=0, + ) + + session.add(email_log) + await session.commit() + + email_queue.enqueue(email_log.id) + + # === RÉPONSE === + + return TransactionResponse( + id=transaction.id, + transaction_id=transaction.transaction_id, + sage_document_id=transaction.sage_document_id, + sage_document_type=transaction.sage_document_type.name, + universign_status=transaction.universign_status.value, + local_status=transaction.local_status.value, + local_status_label=get_status_message(transaction.local_status.value), + signer_url=transaction.signer_url, + document_url=None, + created_at=transaction.created_at, + sent_at=transaction.sent_at, + signed_at=None, + last_synced_at=None, + needs_sync=True, + signers=[ + { + "email": signer.email, + "name": signer.name, + "status": signer.status.value, + } + ], + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Erreur création signature: {e}", exc_info=True) + raise HTTPException(500, str(e)) + + +@router.get("/transactions", response_model=List[TransactionResponse]) +async def list_transactions( + status: Optional[LocalDocumentStatus] = None, + sage_document_id: Optional[str] = None, + limit: int = Query(100, le=1000), + session: AsyncSession = Depends(get_session), +): + """Liste toutes les transactions""" + query = select(UniversignTransaction).options( + selectinload(UniversignTransaction.signers) + ) + + if status: + query = query.where(UniversignTransaction.local_status == status) + + if sage_document_id: + query = query.where(UniversignTransaction.sage_document_id == sage_document_id) + + query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit) + + result = await session.execute(query) + transactions = result.scalars().all() + + return [ + TransactionResponse( + id=tx.id, + transaction_id=tx.transaction_id, + sage_document_id=tx.sage_document_id, + sage_document_type=tx.sage_document_type.name, + universign_status=tx.universign_status.value, + local_status=tx.local_status.value, + local_status_label=get_status_message(tx.local_status.value), + signer_url=tx.signer_url, + document_url=tx.document_url, + created_at=tx.created_at, + sent_at=tx.sent_at, + signed_at=tx.signed_at, + last_synced_at=tx.last_synced_at, + needs_sync=tx.needs_sync, + signers=[ + { + "email": s.email, + "name": s.name, + "status": s.status.value, + "signed_at": s.signed_at.isoformat() if s.signed_at else None, + } + for s in tx.signers + ], + ) + for tx in transactions + ] + + +@router.get("/transactions/{transaction_id}", response_model=TransactionResponse) +async def get_transaction( + transaction_id: str, session: AsyncSession = Depends(get_session) +): + """Récupère une transaction par son ID""" + query = ( + select(UniversignTransaction) + .where(UniversignTransaction.transaction_id == transaction_id) + .options(selectinload(UniversignTransaction.signers)) + ) + + result = await session.execute(query) + tx = result.scalar_one_or_none() + + if not tx: + raise HTTPException(404, "Transaction introuvable") + + return TransactionResponse( + id=tx.id, + transaction_id=tx.transaction_id, + sage_document_id=tx.sage_document_id, + sage_document_type=tx.sage_document_type.name, + universign_status=tx.universign_status.value, + local_status=tx.local_status.value, + local_status_label=get_status_message(tx.local_status.value), + signer_url=tx.signer_url, + document_url=tx.document_url, + created_at=tx.created_at, + sent_at=tx.sent_at, + signed_at=tx.signed_at, + last_synced_at=tx.last_synced_at, + needs_sync=tx.needs_sync, + signers=[ + { + "email": s.email, + "name": s.name, + "status": s.status.value, + "signed_at": s.signed_at.isoformat() if s.signed_at else None, + } + for s in tx.signers + ], + ) + + +@router.post("/transactions/{transaction_id}/sync") +async def sync_single_transaction( + transaction_id: str, + force: bool = Query(False), + session: AsyncSession = Depends(get_session), +): + """Force la synchronisation d'une transaction""" + query = select(UniversignTransaction).where( + UniversignTransaction.transaction_id == transaction_id + ) + result = await session.execute(query) + transaction = result.scalar_one_or_none() + + if not transaction: + raise HTTPException(404, "Transaction introuvable") + + success, error = await sync_service.sync_transaction( + session, transaction, force=force + ) + + if not success: + raise HTTPException(500, error or "Échec synchronisation") + + return { + "success": True, + "transaction_id": transaction_id, + "new_status": transaction.local_status.value, + "synced_at": transaction.last_synced_at.isoformat(), + } + + +@router.post("/sync/all") +async def sync_all_transactions( + max_transactions: int = Query(50, le=500), + session: AsyncSession = Depends(get_session), +): + """Synchronise toutes les transactions en attente""" + stats = await sync_service.sync_all_pending(session, max_transactions) + + return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()} + + +@router.post("/webhook") +async def webhook_universign( + request: Request, session: AsyncSession = Depends(get_session) +): + try: + payload = await request.json() + + logger.info( + f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}" + ) + + success, error = await sync_service.process_webhook(session, payload) + + if not success: + logger.error(f"Erreur traitement webhook: {error}") + return {"status": "error", "message": error}, 500 + + return { + "status": "processed", + "event": payload.get("event"), + "transaction_id": payload.get("transaction_id"), + } + + except Exception as e: + logger.error(f"Erreur webhook: {e}", exc_info=True) + return {"status": "error", "message": str(e)}, 500 + + +@router.get("/stats", response_model=SyncStatsResponse) +async def get_sync_stats(session: AsyncSession = Depends(get_session)): + """Statistiques globales de synchronisation""" + + # Total + total_query = select(func.count(UniversignTransaction.id)) + total = (await session.execute(total_query)).scalar() + + # En attente de sync + pending_query = select(func.count(UniversignTransaction.id)).where( + UniversignTransaction.needs_sync + ) + pending = (await session.execute(pending_query)).scalar() + + # Par statut + signed_query = select(func.count(UniversignTransaction.id)).where( + UniversignTransaction.local_status == LocalDocumentStatus.SIGNE + ) + signed = (await session.execute(signed_query)).scalar() + + in_progress_query = select(func.count(UniversignTransaction.id)).where( + UniversignTransaction.local_status == LocalDocumentStatus.EN_COURS + ) + in_progress = (await session.execute(in_progress_query)).scalar() + + refused_query = select(func.count(UniversignTransaction.id)).where( + UniversignTransaction.local_status == LocalDocumentStatus.REFUSE + ) + refused = (await session.execute(refused_query)).scalar() + + expired_query = select(func.count(UniversignTransaction.id)).where( + UniversignTransaction.local_status == LocalDocumentStatus.EXPIRE + ) + expired = (await session.execute(expired_query)).scalar() + + # Dernière sync + last_sync_query = select(func.max(UniversignTransaction.last_synced_at)) + last_sync = (await session.execute(last_sync_query)).scalar() + + return SyncStatsResponse( + total_transactions=total, + pending_sync=pending, + signed=signed, + in_progress=in_progress, + refused=refused, + expired=expired, + last_sync_at=last_sync, + ) + + +@router.get("/transactions/{transaction_id}/logs") +async def get_transaction_logs( + transaction_id: str, + limit: int = Query(50, le=500), + session: AsyncSession = Depends(get_session), +): + # Trouver la transaction + tx_query = select(UniversignTransaction).where( + UniversignTransaction.transaction_id == transaction_id + ) + tx_result = await session.execute(tx_query) + tx = tx_result.scalar_one_or_none() + + if not tx: + raise HTTPException(404, "Transaction introuvable") + + # Logs + logs_query = ( + select(UniversignSyncLog) + .where(UniversignSyncLog.transaction_id == tx.id) + .order_by(UniversignSyncLog.sync_timestamp.desc()) + .limit(limit) + ) + + logs_result = await session.execute(logs_query) + logs = logs_result.scalars().all() + + return { + "transaction_id": transaction_id, + "total_syncs": len(logs), + "logs": [ + { + "sync_type": log.sync_type, + "timestamp": log.sync_timestamp.isoformat(), + "success": log.success, + "previous_status": log.previous_status, + "new_status": log.new_status, + "error_message": log.error_message, + "response_time_ms": log.response_time_ms, + } + for log in logs + ], + } diff --git a/services/universign_sync.py b/services/universign_sync.py new file mode 100644 index 0000000..c9af7c9 --- /dev/null +++ b/services/universign_sync.py @@ -0,0 +1,575 @@ +""" +Service de synchronisation Universign +Architecture : polling + webhooks avec retry et gestion d'erreurs +""" + +import requests +import json +import logging +from typing import Dict, List, Optional, Tuple +from datetime import datetime, timedelta +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_, or_ +from sqlalchemy.orm import selectinload + +from database.models.universign_models import ( + UniversignTransaction, + UniversignSigner, + UniversignSyncLog, + UniversignTransactionStatus, + LocalDocumentStatus, + UniversignSignerStatus, +) +from status_mapping import ( + map_universign_to_local, + get_sage_status_code, + is_transition_allowed, + get_status_actions, + is_final_status, + resolve_status_conflict, +) + +logger = logging.getLogger(__name__) + + +class UniversignSyncService: + """ + Service centralisé de synchronisation Universign + """ + + def __init__(self, api_url: str, api_key: str, timeout: int = 30): + self.api_url = api_url.rstrip("/") + self.api_key = api_key + self.timeout = timeout + self.auth = (api_key, "") + + # ======================================== + # 1. RÉCUPÉRATION DEPUIS UNIVERSIGN + # ======================================== + + def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: + """ + Récupère le statut actuel d'une transaction depuis Universign + + Args: + transaction_id: ID Universign (ex: tr_abc123) + + Returns: + Dictionnaire avec les données Universign ou None + """ + start_time = datetime.now() + + try: + response = requests.get( + f"{self.api_url}/transactions/{transaction_id}", + auth=self.auth, + timeout=self.timeout, + headers={"Accept": "application/json"}, + ) + + response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000) + + if response.status_code == 200: + data = response.json() + logger.info( + f"✓ Fetch OK: {transaction_id} " + f"status={data.get('state')} " + f"({response_time_ms}ms)" + ) + return { + "transaction": data, + "http_status": 200, + "response_time_ms": response_time_ms, + "fetched_at": datetime.now(), + } + + elif response.status_code == 404: + logger.warning( + f"Transaction {transaction_id} introuvable sur Universign" + ) + return None + + else: + logger.error( + f"Erreur HTTP {response.status_code} " + f"pour {transaction_id}: {response.text}" + ) + return None + + except requests.exceptions.Timeout: + logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)") + return None + + except Exception as e: + logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) + return None + + # ======================================== + # 2. SYNCHRONISATION UNITAIRE + # ======================================== + + async def sync_transaction( + self, + session: AsyncSession, + transaction: UniversignTransaction, + force: bool = False, + ) -> Tuple[bool, Optional[str]]: + """ + Synchronise UNE transaction avec Universign + + Args: + session: Session BDD + transaction: Transaction à synchroniser + force: Forcer même si statut final + + Returns: + (success: bool, error_message: Optional[str]) + """ + # === VÉRIFICATIONS PRÉALABLES === + + # Si statut final et pas de force, skip + if is_final_status(transaction.local_status.value) and not force: + logger.debug( + f"Skip {transaction.transaction_id}: " + f"statut final {transaction.local_status.value}" + ) + transaction.needs_sync = False + await session.commit() + return True, None + + # === FETCH UNIVERSIGN === + + result = self.fetch_transaction_status(transaction.transaction_id) + + if not result: + error = "Échec récupération données Universign" + await self._log_sync_attempt(session, transaction, "polling", False, error) + return False, error + + # === EXTRACTION DONNÉES === + + universign_data = result["transaction"] + universign_status_raw = universign_data.get("state", "draft") + + # === MAPPING STATUT === + + new_local_status = map_universign_to_local(universign_status_raw) + previous_local_status = transaction.local_status.value + + # === VALIDATION TRANSITION === + + if not is_transition_allowed(previous_local_status, new_local_status): + logger.warning( + f"Transition refusée: {previous_local_status} → {new_local_status}" + ) + # En cas de conflit, résoudre par priorité + new_local_status = resolve_status_conflict( + previous_local_status, new_local_status + ) + + # === DÉTECTION CHANGEMENT === + + status_changed = previous_local_status != new_local_status + + if not status_changed and not force: + logger.debug(f"Pas de changement pour {transaction.transaction_id}") + transaction.last_synced_at = datetime.now() + transaction.needs_sync = False + await session.commit() + return True, None + + # === MISE À JOUR TRANSACTION === + + transaction.universign_status = UniversignTransactionStatus( + universign_status_raw + ) + transaction.local_status = LocalDocumentStatus(new_local_status) + transaction.universign_status_updated_at = datetime.now() + + # === DATES SPÉCIFIQUES === + + if new_local_status == "EN_COURS" and not transaction.sent_at: + transaction.sent_at = datetime.now() + + if new_local_status == "SIGNE" and not transaction.signed_at: + transaction.signed_at = datetime.now() + + if new_local_status == "REFUSE" and not transaction.refused_at: + transaction.refused_at = datetime.now() + + if new_local_status == "EXPIRE" and not transaction.expired_at: + transaction.expired_at = datetime.now() + + # === URLS === + + if "signers" in universign_data and len(universign_data["signers"]) > 0: + first_signer = universign_data["signers"][0] + if "url" in first_signer: + transaction.signer_url = first_signer["url"] + + if "documents" in universign_data and len(universign_data["documents"]) > 0: + first_doc = universign_data["documents"][0] + if "url" in first_doc: + transaction.document_url = first_doc["url"] + + # === SIGNATAIRES === + + await self._sync_signers(session, transaction, universign_data) + + # === FLAGS === + + transaction.last_synced_at = datetime.now() + transaction.sync_attempts += 1 + transaction.needs_sync = not is_final_status(new_local_status) + transaction.sync_error = None + + # === LOG === + + await self._log_sync_attempt( + session=session, + transaction=transaction, + sync_type="polling", + success=True, + error_message=None, + previous_status=previous_local_status, + new_status=new_local_status, + changes=json.dumps( + { + "status_changed": status_changed, + "universign_raw": universign_status_raw, + "response_time_ms": result.get("response_time_ms"), + } + ), + ) + + await session.commit() + + # === ACTIONS MÉTIER === + + if status_changed: + await self._execute_status_actions(session, transaction, new_local_status) + + logger.info( + f"✓ Sync OK: {transaction.transaction_id} " + f"{previous_local_status} → {new_local_status}" + ) + + return True, None + + # ======================================== + # 3. SYNCHRONISATION DE MASSE (POLLING) + # ======================================== + + async def sync_all_pending( + self, session: AsyncSession, max_transactions: int = 50 + ) -> Dict[str, int]: + """ + Synchronise toutes les transactions en attente + + Args: + session: Session BDD + max_transactions: Nombre max de transactions à traiter + + Returns: + Statistiques de synchronisation + """ + # === SÉLECTION TRANSACTIONS À SYNCHRONISER === + + query = ( + select(UniversignTransaction) + .where( + and_( + UniversignTransaction.needs_sync == True, + or_( + # Transactions non finales + ~UniversignTransaction.local_status.in_( + [ + LocalDocumentStatus.SIGNE, + LocalDocumentStatus.REFUSE, + LocalDocumentStatus.EXPIRE, + ] + ), + # OU dernière sync > 1h (vérification finale) + UniversignTransaction.last_synced_at + < (datetime.now() - timedelta(hours=1)), + # OU jamais synchronisé + UniversignTransaction.last_synced_at.is_(None), + ), + ) + ) + .order_by(UniversignTransaction.created_at.asc()) + .limit(max_transactions) + ) + + result = await session.execute(query) + transactions = result.scalars().all() + + # === STATISTIQUES === + + stats = { + "total_found": len(transactions), + "success": 0, + "failed": 0, + "skipped": 0, + "status_changes": 0, + } + + # === TRAITEMENT === + + for transaction in transactions: + try: + previous_status = transaction.local_status.value + + success, error = await self.sync_transaction( + session, transaction, force=False + ) + + if success: + stats["success"] += 1 + + if transaction.local_status.value != previous_status: + stats["status_changes"] += 1 + else: + stats["failed"] += 1 + + except Exception as e: + logger.error( + f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True + ) + stats["failed"] += 1 + + logger.info( + f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " + f"{stats['status_changes']} changements détectés" + ) + + return stats + + # ======================================== + # 4. WEBHOOK HANDLER + # ======================================== + + async def process_webhook( + self, session: AsyncSession, payload: Dict + ) -> Tuple[bool, Optional[str]]: + """ + Traite un webhook Universign + + Args: + session: Session BDD + payload: Corps du webhook + + Returns: + (success: bool, error_message: Optional[str]) + """ + try: + # === EXTRACTION DONNÉES === + + event_type = payload.get("event") + transaction_id = payload.get("transaction_id") or payload.get("id") + + if not transaction_id: + return False, "Pas de transaction_id dans le webhook" + + # === RECHERCHE TRANSACTION === + + query = select(UniversignTransaction).where( + UniversignTransaction.transaction_id == transaction_id + ) + result = await session.execute(query) + transaction = result.scalar_one_or_none() + + if not transaction: + logger.warning( + f"Webhook reçu pour transaction inconnue: {transaction_id}" + ) + return False, "Transaction inconnue" + + # === MARQUAGE WEBHOOK === + + transaction.webhook_received = True + + # === SYNCHRONISATION === + + success, error = await self.sync_transaction( + session, transaction, force=True + ) + + # === LOG WEBHOOK === + + await self._log_sync_attempt( + session=session, + transaction=transaction, + sync_type=f"webhook:{event_type}", + success=success, + error_message=error, + changes=json.dumps(payload), + ) + + await session.commit() + + logger.info( + f"✓ Webhook traité: {transaction_id} " + f"event={event_type} success={success}" + ) + + return success, error + + except Exception as e: + logger.error(f"Erreur traitement webhook: {e}", exc_info=True) + return False, str(e) + + # ======================================== + # 5. HELPERS PRIVÉS + # ======================================== + + async def _sync_signers( + self, + session: AsyncSession, + transaction: UniversignTransaction, + universign_data: Dict, + ): + """Synchronise les signataires""" + signers_data = universign_data.get("signers", []) + + # Supprimer les anciens signataires + for signer in transaction.signers: + await session.delete(signer) + + # Créer les nouveaux + for idx, signer_data in enumerate(signers_data): + signer = UniversignSigner( + id=f"{transaction.id}_signer_{idx}", + transaction_id=transaction.id, + email=signer_data.get("email", ""), + name=signer_data.get("name"), + status=UniversignSignerStatus(signer_data.get("status", "waiting")), + order_index=idx, + viewed_at=self._parse_date(signer_data.get("viewed_at")), + signed_at=self._parse_date(signer_data.get("signed_at")), + refused_at=self._parse_date(signer_data.get("refused_at")), + ) + session.add(signer) + + async def _log_sync_attempt( + self, + session: AsyncSession, + transaction: UniversignTransaction, + sync_type: str, + success: bool, + error_message: Optional[str] = None, + previous_status: Optional[str] = None, + new_status: Optional[str] = None, + changes: Optional[str] = None, + ): + """Enregistre une tentative de sync dans les logs""" + log = UniversignSyncLog( + transaction_id=transaction.id, + sync_type=sync_type, + sync_timestamp=datetime.now(), + previous_status=previous_status, + new_status=new_status, + changes_detected=changes, + success=success, + error_message=error_message, + ) + session.add(log) + + async def _execute_status_actions( + self, session: AsyncSession, transaction: UniversignTransaction, new_status: str + ): + """Exécute les actions métier associées au statut""" + actions = get_status_actions(new_status) + + if not actions: + return + + # Mise à jour Sage + if actions.get("update_sage_status"): + await self._update_sage_status(transaction, new_status) + + # Déclencher workflow + if actions.get("trigger_workflow"): + await self._trigger_workflow(transaction) + + # Notifications + if actions.get("send_notification"): + await self._send_notification(transaction, new_status) + + # Archive + if actions.get("archive_document"): + await self._archive_signed_document(transaction) + + async def _update_sage_status(self, transaction, status): + """Met à jour le statut dans Sage""" + # TODO: Appeler sage_client.mettre_a_jour_champ_libre() + logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") + + async def _trigger_workflow(self, transaction): + """Déclenche un workflow (ex: devis→commande)""" + logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") + + async def _send_notification(self, transaction, status): + """Envoie une notification email""" + logger.info(f"TODO: Notif pour {transaction.sage_document_id}") + + async def _archive_signed_document(self, transaction): + """Archive le document signé""" + logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") + + @staticmethod + def _parse_date(date_str: Optional[str]) -> Optional[datetime]: + """Parse une date ISO 8601""" + if not date_str: + return None + try: + return datetime.fromisoformat(date_str.replace("Z", "+00:00")) + except: + return None + + +# ======================================== +# 6. TÂCHE PLANIFIÉE (BACKGROUND) +# ======================================== + + +class UniversignSyncScheduler: + """ + Planificateur de synchronisation automatique + """ + + def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5): + self.sync_service = sync_service + self.interval_minutes = interval_minutes + self.is_running = False + + async def start(self, session_factory): + """Démarre le polling automatique""" + import asyncio + + self.is_running = True + + logger.info( + f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)" + ) + + while self.is_running: + try: + async with session_factory() as session: + stats = await self.sync_service.sync_all_pending(session) + + logger.info( + f"Polling: {stats['success']} transactions synchronisées, " + f"{stats['status_changes']} changements" + ) + + except Exception as e: + logger.error(f"Erreur polling: {e}", exc_info=True) + + # Attendre avant le prochain cycle + await asyncio.sleep(self.interval_minutes * 60) + + def stop(self): + """Arrête le polling""" + self.is_running = False + logger.info("Arrêt polling Universign") diff --git a/utils/generic_functions.py b/utils/generic_functions.py index 221d5b7..94eb4f2 100644 --- a/utils/generic_functions.py +++ b/utils/generic_functions.py @@ -1,6 +1,7 @@ -from typing import Dict, List +from typing import Dict, List, Optional from config.config import settings import logging +from enum import Enum from datetime import datetime import uuid @@ -278,4 +279,272 @@ def _preparer_lignes_document(lignes: List) -> List[Dict]: ] +# ======================================== +# MAPPING UNIVERSIGN → LOCAL +# ======================================== + +UNIVERSIGN_TO_LOCAL: Dict[str, str] = { + # États initiaux + "draft": "EN_ATTENTE", # Transaction créée + "ready": "EN_ATTENTE", # Prête mais pas envoyée + # En cours + "started": "EN_COURS", # Envoyée, en attente de signature + # États finaux (succès) + "completed": "SIGNE", # ✓ Signé avec succès + # États finaux (échec) + "refused": "REFUSE", # ✗ Refusé par un signataire + "expired": "EXPIRE", # ⏰ Délai expiré + "canceled": "REFUSE", # ✗ Annulé manuellement + "failed": "ERREUR", # ⚠️ Erreur technique +} + + +# ======================================== +# MAPPING LOCAL → SAGE (CHAMP LIBRE) +# ======================================== + +LOCAL_TO_SAGE_STATUS: Dict[str, int] = { + """ + À stocker dans un champ libre Sage (ex: CB_STATUT_SIGNATURE) + """ + "EN_ATTENTE": 0, # Non envoyé + "EN_COURS": 1, # Envoyé, en attente + "SIGNE": 2, # ✓ Signé (peut déclencher workflow) + "REFUSE": 3, # ✗ Refusé + "EXPIRE": 4, # ⏰ Expiré + "ERREUR": 5, # ⚠️ Erreur +} + + +# ======================================== +# ACTIONS MÉTIER PAR STATUT +# ======================================== + +STATUS_ACTIONS: Dict[str, Dict[str, any]] = { + """ + Actions automatiques à déclencher selon le statut + """ + "SIGNE": { + "update_sage_status": True, # Mettre à jour Sage + "trigger_workflow": True, # Déclencher transformation (devis→commande) + "send_notification": True, # Email de confirmation + "archive_document": True, # Archiver le PDF signé + "update_sage_field": "CB_DateSignature", # Champ libre Sage + }, + "REFUSE": { + "update_sage_status": True, + "trigger_workflow": False, + "send_notification": True, + "archive_document": False, + "alert_sales": True, # Alerter commercial + }, + "EXPIRE": { + "update_sage_status": True, + "trigger_workflow": False, + "send_notification": True, + "archive_document": False, + "schedule_reminder": True, # Programmer relance + }, + "ERREUR": { + "update_sage_status": False, + "trigger_workflow": False, + "send_notification": False, + "log_error": True, + "retry_sync": True, + }, +} + + +# ======================================== +# RÈGLES DE TRANSITION +# ======================================== + +ALLOWED_TRANSITIONS: Dict[str, list] = { + """ + Transitions de statuts autorisées (validation) + """ + "EN_ATTENTE": ["EN_COURS", "ERREUR"], + "EN_COURS": ["SIGNE", "REFUSE", "EXPIRE", "ERREUR"], + "SIGNE": [], # État final, pas de retour + "REFUSE": [], # État final + "EXPIRE": [], # État final + "ERREUR": ["EN_ATTENTE", "EN_COURS"], # Retry possible +} + + +# ======================================== +# FONCTION DE MAPPING +# ======================================== + + +def map_universign_to_local(universign_status: str) -> str: + """ + Convertit un statut Universign en statut local + + Args: + universign_status: Statut brut Universign + + Returns: + Statut local normalisé + """ + return UNIVERSIGN_TO_LOCAL.get( + universign_status.lower(), + "ERREUR", # Fallback si statut inconnu + ) + + +def get_sage_status_code(local_status: str) -> int: + """ + Obtient le code numérique pour Sage + + Args: + local_status: Statut local (EN_ATTENTE, SIGNE, etc.) + + Returns: + Code numérique pour champ libre Sage + """ + return LOCAL_TO_SAGE_STATUS.get(local_status, 5) + + +def is_transition_allowed(from_status: str, to_status: str) -> bool: + """ + Vérifie si une transition de statut est valide + + Args: + from_status: Statut actuel + to_status: Nouveau statut + + Returns: + True si la transition est autorisée + """ + if from_status == to_status: + return True # Même statut = OK (idempotence) + + allowed = ALLOWED_TRANSITIONS.get(from_status, []) + return to_status in allowed + + +def get_status_actions(local_status: str) -> Dict[str, any]: + """ + Obtient les actions à exécuter pour un statut + + Args: + local_status: Statut local + + Returns: + Dictionnaire des actions à exécuter + """ + return STATUS_ACTIONS.get(local_status, {}) + + +def is_final_status(local_status: str) -> bool: + """ + Détermine si le statut est final (pas de synchronisation nécessaire) + + Args: + local_status: Statut local + + Returns: + True si statut final + """ + return local_status in ["SIGNE", "REFUSE", "EXPIRE"] + + +# ======================================== +# PRIORITÉS DE STATUTS (POUR CONFLITS) +# ======================================== + +STATUS_PRIORITY: Dict[str, int] = { + """ + En cas de conflit de synchronisation, prendre le statut + avec la priorité la plus élevée + """ + "ERREUR": 0, # Priorité la plus basse + "EN_ATTENTE": 1, + "EN_COURS": 2, + "EXPIRE": 3, + "REFUSE": 4, + "SIGNE": 5, # Priorité la plus élevée (état final souhaité) +} + + +def resolve_status_conflict(status_a: str, status_b: str) -> str: + """ + Résout un conflit entre deux statuts (prend le plus prioritaire) + + Args: + status_a: Premier statut + status_b: Second statut + + Returns: + Statut prioritaire + """ + priority_a = STATUS_PRIORITY.get(status_a, 0) + priority_b = STATUS_PRIORITY.get(status_b, 0) + + return status_a if priority_a >= priority_b else status_b + + +# ======================================== +# MESSAGES UTILISATEUR +# ======================================== + +STATUS_MESSAGES: Dict[str, Dict[str, str]] = { + "EN_ATTENTE": { + "fr": "Document en attente d'envoi", + "en": "Document pending", + "icon": "⏳", + "color": "gray", + }, + "EN_COURS": { + "fr": "En attente de signature", + "en": "Awaiting signature", + "icon": "✍️", + "color": "blue", + }, + "SIGNE": { + "fr": "Signé avec succès", + "en": "Successfully signed", + "icon": "✅", + "color": "green", + }, + "REFUSE": { + "fr": "Signature refusée", + "en": "Signature refused", + "icon": "❌", + "color": "red", + }, + "EXPIRE": { + "fr": "Délai de signature expiré", + "en": "Signature expired", + "icon": "⏰", + "color": "orange", + }, + "ERREUR": { + "fr": "Erreur technique", + "en": "Technical error", + "icon": "⚠️", + "color": "red", + }, +} + + +def get_status_message(local_status: str, lang: str = "fr") -> str: + """ + Obtient le message utilisateur pour un statut + + Args: + local_status: Statut local + lang: Langue (fr, en) + + Returns: + Message formaté + """ + status_info = STATUS_MESSAGES.get(local_status, {}) + icon = status_info.get("icon", "") + message = status_info.get(lang, local_status) + + return f"{icon} {message}" + + __all__ = ["_preparer_lignes_document"] diff --git a/utils/universign_status_mapping.py b/utils/universign_status_mapping.py new file mode 100644 index 0000000..6bc0c9e --- /dev/null +++ b/utils/universign_status_mapping.py @@ -0,0 +1,274 @@ +""" +Mapping exhaustif : Universign ↔ Local ↔ Sage +""" + +from typing import Dict, Optional +from enum import Enum + + +# ======================================== +# MAPPING UNIVERSIGN → LOCAL +# ======================================== + +UNIVERSIGN_TO_LOCAL: Dict[str, str] = { + # États initiaux + "draft": "EN_ATTENTE", # Transaction créée + "ready": "EN_ATTENTE", # Prête mais pas envoyée + # En cours + "started": "EN_COURS", # Envoyée, en attente de signature + # États finaux (succès) + "completed": "SIGNE", # ✓ Signé avec succès + # États finaux (échec) + "refused": "REFUSE", # ✗ Refusé par un signataire + "expired": "EXPIRE", # ⏰ Délai expiré + "canceled": "REFUSE", # ✗ Annulé manuellement + "failed": "ERREUR", # ⚠️ Erreur technique +} + + +# ======================================== +# MAPPING LOCAL → SAGE (CHAMP LIBRE) +# ======================================== + +LOCAL_TO_SAGE_STATUS: Dict[str, int] = { + """ + À stocker dans un champ libre Sage (ex: CB_STATUT_SIGNATURE) + """ + "EN_ATTENTE": 0, # Non envoyé + "EN_COURS": 1, # Envoyé, en attente + "SIGNE": 2, # ✓ Signé (peut déclencher workflow) + "REFUSE": 3, # ✗ Refusé + "EXPIRE": 4, # ⏰ Expiré + "ERREUR": 5, # ⚠️ Erreur +} + + +# ======================================== +# ACTIONS MÉTIER PAR STATUT +# ======================================== + +STATUS_ACTIONS: Dict[str, Dict[str, any]] = { + """ + Actions automatiques à déclencher selon le statut + """ + "SIGNE": { + "update_sage_status": True, # Mettre à jour Sage + "trigger_workflow": True, # Déclencher transformation (devis→commande) + "send_notification": True, # Email de confirmation + "archive_document": True, # Archiver le PDF signé + "update_sage_field": "CB_DateSignature", # Champ libre Sage + }, + "REFUSE": { + "update_sage_status": True, + "trigger_workflow": False, + "send_notification": True, + "archive_document": False, + "alert_sales": True, # Alerter commercial + }, + "EXPIRE": { + "update_sage_status": True, + "trigger_workflow": False, + "send_notification": True, + "archive_document": False, + "schedule_reminder": True, # Programmer relance + }, + "ERREUR": { + "update_sage_status": False, + "trigger_workflow": False, + "send_notification": False, + "log_error": True, + "retry_sync": True, + }, +} + + +# ======================================== +# RÈGLES DE TRANSITION +# ======================================== + +ALLOWED_TRANSITIONS: Dict[str, list] = { + """ + Transitions de statuts autorisées (validation) + """ + "EN_ATTENTE": ["EN_COURS", "ERREUR"], + "EN_COURS": ["SIGNE", "REFUSE", "EXPIRE", "ERREUR"], + "SIGNE": [], # État final, pas de retour + "REFUSE": [], # État final + "EXPIRE": [], # État final + "ERREUR": ["EN_ATTENTE", "EN_COURS"], # Retry possible +} + + +# ======================================== +# FONCTION DE MAPPING +# ======================================== + + +def map_universign_to_local(universign_status: str) -> str: + """ + Convertit un statut Universign en statut local + + Args: + universign_status: Statut brut Universign + + Returns: + Statut local normalisé + """ + return UNIVERSIGN_TO_LOCAL.get( + universign_status.lower(), + "ERREUR", # Fallback si statut inconnu + ) + + +def get_sage_status_code(local_status: str) -> int: + """ + Obtient le code numérique pour Sage + + Args: + local_status: Statut local (EN_ATTENTE, SIGNE, etc.) + + Returns: + Code numérique pour champ libre Sage + """ + return LOCAL_TO_SAGE_STATUS.get(local_status, 5) + + +def is_transition_allowed(from_status: str, to_status: str) -> bool: + """ + Vérifie si une transition de statut est valide + + Args: + from_status: Statut actuel + to_status: Nouveau statut + + Returns: + True si la transition est autorisée + """ + if from_status == to_status: + return True # Même statut = OK (idempotence) + + allowed = ALLOWED_TRANSITIONS.get(from_status, []) + return to_status in allowed + + +def get_status_actions(local_status: str) -> Dict[str, any]: + """ + Obtient les actions à exécuter pour un statut + + Args: + local_status: Statut local + + Returns: + Dictionnaire des actions à exécuter + """ + return STATUS_ACTIONS.get(local_status, {}) + + +def is_final_status(local_status: str) -> bool: + """ + Détermine si le statut est final (pas de synchronisation nécessaire) + + Args: + local_status: Statut local + + Returns: + True si statut final + """ + return local_status in ["SIGNE", "REFUSE", "EXPIRE"] + + +# ======================================== +# PRIORITÉS DE STATUTS (POUR CONFLITS) +# ======================================== + +STATUS_PRIORITY: Dict[str, int] = { + """ + En cas de conflit de synchronisation, prendre le statut + avec la priorité la plus élevée + """ + "ERREUR": 0, # Priorité la plus basse + "EN_ATTENTE": 1, + "EN_COURS": 2, + "EXPIRE": 3, + "REFUSE": 4, + "SIGNE": 5, # Priorité la plus élevée (état final souhaité) +} + + +def resolve_status_conflict(status_a: str, status_b: str) -> str: + """ + Résout un conflit entre deux statuts (prend le plus prioritaire) + + Args: + status_a: Premier statut + status_b: Second statut + + Returns: + Statut prioritaire + """ + priority_a = STATUS_PRIORITY.get(status_a, 0) + priority_b = STATUS_PRIORITY.get(status_b, 0) + + return status_a if priority_a >= priority_b else status_b + + +# ======================================== +# MESSAGES UTILISATEUR +# ======================================== + +STATUS_MESSAGES: Dict[str, Dict[str, str]] = { + "EN_ATTENTE": { + "fr": "Document en attente d'envoi", + "en": "Document pending", + "icon": "⏳", + "color": "gray", + }, + "EN_COURS": { + "fr": "En attente de signature", + "en": "Awaiting signature", + "icon": "✍️", + "color": "blue", + }, + "SIGNE": { + "fr": "Signé avec succès", + "en": "Successfully signed", + "icon": "✅", + "color": "green", + }, + "REFUSE": { + "fr": "Signature refusée", + "en": "Signature refused", + "icon": "❌", + "color": "red", + }, + "EXPIRE": { + "fr": "Délai de signature expiré", + "en": "Signature expired", + "icon": "⏰", + "color": "orange", + }, + "ERREUR": { + "fr": "Erreur technique", + "en": "Technical error", + "icon": "⚠️", + "color": "red", + }, +} + + +def get_status_message(local_status: str, lang: str = "fr") -> str: + """ + Obtient le message utilisateur pour un statut + + Args: + local_status: Statut local + lang: Langue (fr, en) + + Returns: + Message formaté + """ + status_info = STATUS_MESSAGES.get(local_status, {}) + icon = status_info.get("icon", "") + message = status_info.get(lang, local_status) + + return f"{icon} {message}"