""" Service de synchronisation Universign Architecture : polling + webhooks avec retry et gestion d'erreurs """ import requests import json import logging from typing import Dict, List, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ from sqlalchemy.orm import selectinload from database import ( UniversignTransaction, UniversignSigner, UniversignSyncLog, UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, ) from utils.universign_status_mapping import ( map_universign_to_local, get_sage_status_code, is_transition_allowed, get_status_actions, is_final_status, resolve_status_conflict, ) logger = logging.getLogger(__name__) class UniversignSyncService: """ Service centralisé de synchronisation Universign """ def __init__(self, api_url: str, api_key: str, timeout: int = 30): self.api_url = api_url.rstrip("/") self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") # ======================================== # 1. 
RÉCUPÉRATION DEPUIS UNIVERSIGN # ======================================== def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: """ Récupère le statut actuel d'une transaction depuis Universign Args: transaction_id: ID Universign (ex: tr_abc123) Returns: Dictionnaire avec les données Universign ou None """ start_time = datetime.now() try: response = requests.get( f"{self.api_url}/transactions/{transaction_id}", auth=self.auth, timeout=self.timeout, headers={"Accept": "application/json"}, ) response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000) if response.status_code == 200: data = response.json() logger.info( f"✓ Fetch OK: {transaction_id} " f"status={data.get('state')} " f"({response_time_ms}ms)" ) return { "transaction": data, "http_status": 200, "response_time_ms": response_time_ms, "fetched_at": datetime.now(), } elif response.status_code == 404: logger.warning( f"Transaction {transaction_id} introuvable sur Universign" ) return None else: logger.error( f"Erreur HTTP {response.status_code} " f"pour {transaction_id}: {response.text}" ) return None except requests.exceptions.Timeout: logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)") return None except Exception as e: logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) return None # ======================================== # 2. 
SYNCHRONISATION UNITAIRE # ======================================== async def sync_transaction( self, session: AsyncSession, transaction: UniversignTransaction, force: bool = False, ) -> Tuple[bool, Optional[str]]: """ Synchronise UNE transaction avec Universign Args: session: Session BDD transaction: Transaction à synchroniser force: Forcer même si statut final Returns: (success: bool, error_message: Optional[str]) """ # === VÉRIFICATIONS PRÉALABLES === # Si statut final et pas de force, skip if is_final_status(transaction.local_status.value) and not force: logger.debug( f"Skip {transaction.transaction_id}: " f"statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None # === FETCH UNIVERSIGN === result = self.fetch_transaction_status(transaction.transaction_id) if not result: error = "Échec récupération données Universign" await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error # === EXTRACTION DONNÉES === universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") # === MAPPING STATUT === new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value # === VALIDATION TRANSITION === if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) # En cas de conflit, résoudre par priorité new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) # === DÉTECTION CHANGEMENT === status_changed = previous_local_status != new_local_status if not status_changed and not force: logger.debug(f"Pas de changement pour {transaction.transaction_id}") transaction.last_synced_at = datetime.now() transaction.needs_sync = False await session.commit() return True, None # === MISE À JOUR TRANSACTION === transaction.universign_status = UniversignTransactionStatus( 
universign_status_raw ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() # === DATES SPÉCIFIQUES === if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() if new_local_status == "REFUSE" and not transaction.refused_at: transaction.refused_at = datetime.now() if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() # === URLS === if "signers" in universign_data and len(universign_data["signers"]) > 0: first_signer = universign_data["signers"][0] if "url" in first_signer: transaction.signer_url = first_signer["url"] if "documents" in universign_data and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] if "url" in first_doc: transaction.document_url = first_doc["url"] # === SIGNATAIRES === await self._sync_signers(session, transaction, universign_data) # === FLAGS === transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None # === LOG === await self._log_sync_attempt( session=session, transaction=transaction, sync_type="polling", success=True, error_message=None, previous_status=previous_local_status, new_status=new_local_status, changes=json.dumps( { "status_changed": status_changed, "universign_raw": universign_status_raw, "response_time_ms": result.get("response_time_ms"), } ), ) await session.commit() # === ACTIONS MÉTIER === if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( f"✓ Sync OK: {transaction.transaction_id} " f"{previous_local_status} → {new_local_status}" ) return True, None # ======================================== # 3. 
SYNCHRONISATION DE MASSE (POLLING) # ======================================== async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: """ Synchronise toutes les transactions en attente Args: session: Session BDD max_transactions: Nombre max de transactions à traiter Returns: Statistiques de synchronisation """ # === SÉLECTION TRANSACTIONS À SYNCHRONISER === query = ( select(UniversignTransaction) .where( and_( UniversignTransaction.needs_sync, or_( # Transactions non finales ~UniversignTransaction.local_status.in_( [ LocalDocumentStatus.SIGNED, LocalDocumentStatus.REJECTED, LocalDocumentStatus.EXPIRED, ] ), # OU dernière sync > 1h (vérification finale) UniversignTransaction.last_synced_at < (datetime.now() - timedelta(hours=1)), # OU jamais synchronisé UniversignTransaction.last_synced_at.is_(None), ), ) ) .order_by(UniversignTransaction.created_at.asc()) .limit(max_transactions) ) result = await session.execute(query) transactions = result.scalars().all() # === STATISTIQUES === stats = { "total_found": len(transactions), "success": 0, "failed": 0, "skipped": 0, "status_changes": 0, } # === TRAITEMENT === for transaction in transactions: try: previous_status = transaction.local_status.value success, error = await self.sync_transaction( session, transaction, force=False ) if success: stats["success"] += 1 if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: stats["failed"] += 1 except Exception as e: logger.error( f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True ) stats["failed"] += 1 logger.info( f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " f"{stats['status_changes']} changements détectés" ) return stats # ======================================== # 4. 
WEBHOOK HANDLER # ======================================== async def process_webhook( self, session: AsyncSession, payload: Dict ) -> Tuple[bool, Optional[str]]: """ Traite un webhook Universign Args: session: Session BDD payload: Corps du webhook Returns: (success: bool, error_message: Optional[str]) """ try: # === EXTRACTION DONNÉES === event_type = payload.get("event") transaction_id = payload.get("transaction_id") or payload.get("id") if not transaction_id: return False, "Pas de transaction_id dans le webhook" # === RECHERCHE TRANSACTION === query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: logger.warning( f"Webhook reçu pour transaction inconnue: {transaction_id}" ) return False, "Transaction inconnue" # === MARQUAGE WEBHOOK === transaction.webhook_received = True # === SYNCHRONISATION === success, error = await self.sync_transaction( session, transaction, force=True ) # === LOG WEBHOOK === await self._log_sync_attempt( session=session, transaction=transaction, sync_type=f"webhook:{event_type}", success=success, error_message=error, changes=json.dumps(payload), ) await session.commit() logger.info( f"✓ Webhook traité: {transaction_id} " f"event={event_type} success={success}" ) return success, error except Exception as e: logger.error(f"Erreur traitement webhook: {e}", exc_info=True) return False, str(e) # ======================================== # 5. 
HELPERS PRIVÉS # ======================================== async def _sync_signers( self, session: AsyncSession, transaction: UniversignTransaction, universign_data: Dict, ): """Synchronise les signataires""" signers_data = universign_data.get("signers", []) # Supprimer les anciens signataires for signer in transaction.signers: await session.delete(signer) # Créer les nouveaux for idx, signer_data in enumerate(signers_data): signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}", transaction_id=transaction.id, email=signer_data.get("email", ""), name=signer_data.get("name"), status=UniversignSignerStatus(signer_data.get("status", "waiting")), order_index=idx, viewed_at=self._parse_date(signer_data.get("viewed_at")), signed_at=self._parse_date(signer_data.get("signed_at")), refused_at=self._parse_date(signer_data.get("refused_at")), ) session.add(signer) async def _log_sync_attempt( self, session: AsyncSession, transaction: UniversignTransaction, sync_type: str, success: bool, error_message: Optional[str] = None, previous_status: Optional[str] = None, new_status: Optional[str] = None, changes: Optional[str] = None, ): """Enregistre une tentative de sync dans les logs""" log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, sync_timestamp=datetime.now(), previous_status=previous_status, new_status=new_status, changes_detected=changes, success=success, error_message=error_message, ) session.add(log) async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): """Exécute les actions métier associées au statut""" actions = get_status_actions(new_status) if not actions: return # Mise à jour Sage if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) # Déclencher workflow if actions.get("trigger_workflow"): await self._trigger_workflow(transaction) # Notifications if actions.get("send_notification"): await self._send_notification(transaction, 
new_status) # Archive if actions.get("archive_document"): await self._archive_signed_document(transaction) async def _update_sage_status(self, transaction, status): """Met à jour le statut dans Sage""" # TODO: Appeler sage_client.mettre_a_jour_champ_libre() logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") async def _trigger_workflow(self, transaction): """Déclenche un workflow (ex: devis→commande)""" logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") async def _send_notification(self, transaction, status): """Envoie une notification email""" logger.info(f"TODO: Notif pour {transaction.sage_document_id}") async def _archive_signed_document(self, transaction): """Archive le document signé""" logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: """Parse une date ISO 8601""" if not date_str: return None try: return datetime.fromisoformat(date_str.replace("Z", "+00:00")) except: return None # ======================================== # 6. 
# ========================================
# 6. SCHEDULED TASK (BACKGROUND)
# ========================================


class UniversignSyncScheduler:
    """Automatic synchronization scheduler (periodic polling loop)."""

    def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
        """
        Args:
            sync_service: Service whose sync_all_pending() is called each cycle
            interval_minutes: Pause between two polling cycles, in minutes
        """
        self.sync_service = sync_service
        self.interval_minutes = interval_minutes
        self.is_running = False
        # asyncio.Event created by start(), from within the running event
        # loop; None until then.
        self._stop_event = None

    async def start(self, session_factory):
        """
        Run the polling loop until stop() is called.

        Args:
            session_factory: Callable returning an async context manager that
                yields the session passed to sync_all_pending()
        """
        import asyncio

        # Each run owns its event, so a later start() cannot revive a loop
        # that has already been asked to stop.
        stop_event = asyncio.Event()
        self._stop_event = stop_event
        self.is_running = True

        logger.info(
            f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
        )

        while self.is_running and not stop_event.is_set():
            try:
                async with session_factory() as session:
                    stats = await self.sync_service.sync_all_pending(session)

                    logger.info(
                        f"Polling: {stats['success']} transactions synchronisées, "
                        f"{stats['status_changes']} changements"
                    )

            except Exception as e:
                # A failed cycle must not kill the scheduler
                logger.error(f"Erreur polling: {e}", exc_info=True)

            # Wait before the next cycle; unlike a plain sleep, this wakes up
            # as soon as stop() sets the event.
            try:
                await asyncio.wait_for(
                    stop_event.wait(), timeout=self.interval_minutes * 60
                )
            except asyncio.TimeoutError:
                # Normal case: the interval elapsed without a stop request
                pass

    def stop(self):
        """Stop the polling loop (takes effect immediately if it is waiting between cycles)."""
        self.is_running = False
        if self._stop_event is not None:
            self._stop_event.set()
        logger.info("Arrêt polling Universign")