From 7d1a68f4e521f43ce9cec8a1240238f722db40a2 Mon Sep 17 00:00:00 2001 From: Fanilo-Nantenaina Date: Tue, 6 Jan 2026 17:07:09 +0300 Subject: [PATCH] feat(universign): add transaction validation and status synchronization --- api.py | 9 +- routes/universign.py | 33 ++++++ services/universign_sync.py | 225 ++++++++++++++++++++++-------------- 3 files changed, 176 insertions(+), 91 deletions(-) diff --git a/api.py b/api.py index db1a6dd..a78fcbc 100644 --- a/api.py +++ b/api.py @@ -129,14 +129,19 @@ async def lifespan(app: FastAPI): api_url=settings.universign_api_url, api_key=settings.universign_api_key ) + # Configuration du service avec les dépendances + sync_service.configure( + sage_client=sage_client, email_queue=email_queue, settings=settings + ) + scheduler = UniversignSyncScheduler( sync_service=sync_service, - interval_minutes=5, # Synchronisation toutes les 5 minutes + interval_minutes=5, ) sync_task = asyncio.create_task(scheduler.start(async_session_factory)) - logger.info("✓ Synchronisation Universign démarrée (5min)") + logger.info("Synchronisation Universign démarrée (5min)") yield diff --git a/routes/universign.py b/routes/universign.py index 582047d..37a3c8c 100644 --- a/routes/universign.py +++ b/routes/universign.py @@ -80,6 +80,24 @@ async def create_signature( request: CreateSignatureRequest, session: AsyncSession = Depends(get_session) ): try: + # === VÉRIFICATION DOUBLON === + existing_query = select(UniversignTransaction).where( + UniversignTransaction.sage_document_id == request.sage_document_id, + UniversignTransaction.sage_document_type == request.sage_document_type, + UniversignTransaction.local_status.in_( + [LocalDocumentStatus.PENDING, LocalDocumentStatus.IN_PROGRESS] + ), + ) + existing_result = await session.execute(existing_query) + existing_tx = existing_result.scalar_one_or_none() + + if existing_tx: + raise HTTPException( + 400, + f"Une demande de signature est déjà en cours pour {request.sage_document_id} " + f"(transaction: {existing_tx.transaction_id})", + ) + pdf_bytes = email_queue._generate_pdf( request.sage_document_id, normaliser_type_doc(request.sage_document_type) ) @@ -265,6 +283,21 @@ async def create_signature( email_queue.enqueue(email_log.id) + # === MISE À JOUR STATUT SAGE (Confirmé = 1) === + try: + from sage_client import sage_client + + sage_client.changer_statut_document( + document_type_code=request.sage_document_type.value, + numero=request.sage_document_id, + nouveau_statut=1, + ) + logger.info( + f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)" + ) + except Exception as e: + logger.warning(f"Impossible de mettre à jour le statut Sage: {e}") + # === RÉPONSE === return TransactionResponse( id=transaction.id, diff --git a/services/universign_sync.py b/services/universign_sync.py index f4f288c..f1d4c77 100644 --- a/services/universign_sync.py +++ b/services/universign_sync.py @@ -1,10 +1,17 @@ +""" +Service de synchronisation Universign +Architecture : polling + webhooks avec retry et gestion d'erreurs +""" + import requests import json import logging +import uuid from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ +from sqlalchemy.orm import selectinload from database import ( UniversignTransaction, @@ -13,7 +20,10 @@ from database import ( UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, + EmailLog, + StatutEmail, ) +from data.data import templates_signature_email from utils.universign_status_mapping import ( map_universign_to_local, is_transition_allowed, @@ -31,6 +41,14 @@ class UniversignSyncService: self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") + self.sage_client = None + self.email_queue = None + self.settings = None + + def configure(self, sage_client, email_queue, settings): + self.sage_client = sage_client + self.email_queue = email_queue + self.settings = settings def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: start_time = datetime.now() @@ -48,9 +66,7 @@ class UniversignSyncService: if response.status_code == 200: data = response.json() logger.info( - f"✓ Fetch OK: {transaction_id} " - f"status={data.get('state')} " - f"({response_time_ms}ms)" + f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)" ) return { "transaction": data, @@ -67,8 +83,7 @@ class UniversignSyncService: else: logger.error( - f"Erreur HTTP {response.status_code} " - f"pour {transaction_id}: {response.text}" + f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}" ) return None @@ -88,15 +103,12 @@ class UniversignSyncService: ) -> Tuple[bool, Optional[str]]: if is_final_status(transaction.local_status.value) and not force: logger.debug( - f"Skip {transaction.transaction_id}: " - f"statut final {transaction.local_status.value}" + f"Skip {transaction.transaction_id}: statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None - # === FETCH UNIVERSIGN === - result = self.fetch_transaction_status(transaction.transaction_id) if not result: @@ -104,29 +116,20 @@ class UniversignSyncService: await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error - # === EXTRACTION DONNÉES === - universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") - # === MAPPING STATUT === - new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value - # === VALIDATION TRANSITION === - if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) - # En cas de conflit, résoudre par priorité new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) - # === DÉTECTION CHANGEMENT === - status_changed = previous_local_status != new_local_status if not status_changed and not force: @@ -136,16 +139,20 @@ class UniversignSyncService: await session.commit() return True, None - # === MISE À JOUR TRANSACTION === + try: + transaction.universign_status = UniversignTransactionStatus( + universign_status_raw + ) + except ValueError: + transaction.universign_status = ( + UniversignTransactionStatus.COMPLETED + if new_local_status == "SIGNE" + else UniversignTransactionStatus.FAILED + ) - transaction.universign_status = UniversignTransactionStatus( - universign_status_raw - ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() - # === DATES SPÉCIFIQUES === - if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() @@ -158,31 +165,18 @@ class UniversignSyncService: if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() - # === URLS === - - if "signers" in universign_data and len(universign_data["signers"]) > 0: - first_signer = universign_data["signers"][0] - if "url" in first_signer: - transaction.signer_url = first_signer["url"] - - if "documents" in universign_data and len(universign_data["documents"]) > 0: + if universign_data.get("documents") and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] - if "url" in first_doc: + if first_doc.get("url"): transaction.document_url = first_doc["url"] - # === SIGNATAIRES === - await self._sync_signers(session, transaction, universign_data) - # === FLAGS === - transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None - # === LOG === - await self._log_sync_attempt( session=session, transaction=transaction, @@ -202,14 +196,11 @@ class UniversignSyncService: await session.commit() - # === ACTIONS MÉTIER === - if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( - f"✓ Sync OK: {transaction.transaction_id} " - f"{previous_local_status} → {new_local_status}" + f"Sync OK: {transaction.transaction_id} {previous_local_status} → {new_local_status}" ) return True, None @@ -217,14 +208,9 @@ class UniversignSyncService: async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: - """ - Synchronise toutes les transactions en attente - """ - from sqlalchemy.orm import selectinload # Si pas déjà importé en haut - query = ( select(UniversignTransaction) - .options(selectinload(UniversignTransaction.signers)) # AJOUTER CETTE LIGNE + .options(selectinload(UniversignTransaction.signers)) .where( and_( UniversignTransaction.needs_sync, @@ -267,7 +253,6 @@ class UniversignSyncService: if success: stats["success"] += 1 - if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: @@ -280,8 +265,7 @@ class UniversignSyncService: stats["failed"] += 1 logger.info( - f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " - f"{stats['status_changes']} changements détectés" + f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés" ) return stats @@ -296,8 +280,10 @@ class UniversignSyncService: if not transaction_id: return False, "Pas de transaction_id dans le webhook" - query = select(UniversignTransaction).where( - UniversignTransaction.transaction_id == transaction_id + query = ( + select(UniversignTransaction) + .options(selectinload(UniversignTransaction.signers)) + .where(UniversignTransaction.transaction_id == transaction_id) ) result = await session.execute(query) transaction = result.scalar_one_or_none() @@ -326,8 +312,7 @@ class UniversignSyncService: await session.commit() logger.info( - f"✓ Webhook traité: {transaction_id} " - f"event={event_type} success={success}" + f"Webhook traité: {transaction_id} event={event_type} success={success}" ) return success, error @@ -342,14 +327,16 @@ class UniversignSyncService: transaction: UniversignTransaction, universign_data: Dict, ): - """Synchronise les signataires""" signers_data = universign_data.get("signers", []) - # Supprimer les anciens signataires - for signer in transaction.signers: + if not signers_data: + return + + existing_ids = {s.id for s in transaction.signers} + + for signer in list(transaction.signers): await session.delete(signer) - # Créer les nouveaux for idx, signer_data in enumerate(signers_data): signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}", @@ -375,7 +362,6 @@ class UniversignSyncService: new_status: Optional[str] = None, changes: Optional[str] = None, ): - """Enregistre une tentative de sync dans les logs""" log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, @@ -391,48 +377,112 @@ class UniversignSyncService: async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): - """Exécute les actions métier associées au statut""" actions = get_status_actions(new_status) if not actions: return - # Mise à jour Sage if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) - # Déclencher workflow - if actions.get("trigger_workflow"): - await self._trigger_workflow(transaction) - - # Notifications if actions.get("send_notification"): - await self._send_notification(transaction, new_status) + await self._send_notification(session, transaction, new_status) - # Archive - if actions.get("archive_document"): - await self._archive_signed_document(transaction) + async def _update_sage_status( + self, transaction: UniversignTransaction, status: str + ): + if not self.sage_client: + logger.warning("sage_client non configuré pour mise à jour Sage") + return - async def _update_sage_status(self, transaction, status): - """Met à jour le statut dans Sage""" - # TODO: Appeler sage_client.mettre_a_jour_champ_libre() - logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") + try: + type_doc = transaction.sage_document_type.value + doc_id = transaction.sage_document_id - async def _trigger_workflow(self, transaction): - """Déclenche un workflow (ex: devis→commande)""" - logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") + if status == "SIGNE": + self.sage_client.changer_statut_document( + document_type_code=type_doc, numero=doc_id, nouveau_statut=2 + ) + logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)") - async def _send_notification(self, transaction, status): - """Envoie une notification email""" - logger.info(f"TODO: Notif pour {transaction.sage_document_id}") + elif status == "EN_COURS": + self.sage_client.changer_statut_document( + document_type_code=type_doc, numero=doc_id, nouveau_statut=1 + ) + logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)") - async def _archive_signed_document(self, transaction): - """Archive le document signé""" - logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") + except Exception as e: + logger.error( + f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}" + ) + + async def _send_notification( + self, session: AsyncSession, transaction: UniversignTransaction, status: str + ): + if not self.email_queue or not self.settings: + logger.warning("email_queue ou settings non configuré") + return + + try: + if status == "SIGNE": + template = templates_signature_email["signature_confirmee"] + + type_labels = { + 0: "Devis", + 10: "Commande", + 30: "Bon de Livraison", + 60: "Facture", + 50: "Avoir", + } + + variables = { + "NOM_SIGNATAIRE": transaction.requester_name or "Client", + "TYPE_DOC": type_labels.get( + transaction.sage_document_type.value, "Document" + ), + "NUMERO": transaction.sage_document_id, + "DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M") + if transaction.signed_at + else datetime.now().strftime("%d/%m/%Y à %H:%M"), + "TRANSACTION_ID": transaction.transaction_id, + "CONTACT_EMAIL": self.settings.smtp_from, + } + + sujet = template["sujet"] + corps = template["corps_html"] + + for var, valeur in variables.items(): + sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) + corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) + + email_log = EmailLog( + id=str(uuid.uuid4()), + destinataire=transaction.requester_email, + sujet=sujet, + corps_html=corps, + document_ids=transaction.sage_document_id, + type_document=transaction.sage_document_type.value, + statut=StatutEmail.EN_ATTENTE, + date_creation=datetime.now(), + nb_tentatives=0, + ) + + session.add(email_log) + await session.flush() + + self.email_queue.enqueue(email_log.id) + + logger.info( + f"Email confirmation signature envoyé à {transaction.requester_email}" + ) + + except Exception as e: + logger.error( + f"Erreur envoi notification pour {transaction.transaction_id}: {e}" + ) @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: - """Parse une date ISO 8601""" if not date_str: return None try: @@ -448,7 +498,6 @@ class UniversignSyncScheduler: self.is_running = False async def start(self, session_factory): - """Démarre le polling automatique""" import asyncio self.is_running = True @@ -470,10 +519,8 @@ class UniversignSyncScheduler: except Exception as e: logger.error(f"Erreur polling: {e}", exc_info=True) - # Attendre avant le prochain cycle await asyncio.sleep(self.interval_minutes * 60) def stop(self): - """Arrête le polling""" self.is_running = False logger.info("Arrêt polling Universign")