""" Service de synchronisation Universign Architecture : polling + webhooks avec retry et gestion d'erreurs """ import requests import json import logging import uuid from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ from sqlalchemy.orm import selectinload from database import ( UniversignTransaction, UniversignSigner, UniversignSyncLog, UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, EmailLog, StatutEmail, ) from data.data import templates_signature_email from utils.universign_status_mapping import ( map_universign_to_local, is_transition_allowed, get_status_actions, is_final_status, resolve_status_conflict, ) logger = logging.getLogger(__name__) class UniversignSyncService: def __init__(self, api_url: str, api_key: str, timeout: int = 30): self.api_url = api_url.rstrip("/") self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") self.sage_client = None self.email_queue = None self.settings = None def configure(self, sage_client, email_queue, settings): self.sage_client = sage_client self.email_queue = email_queue self.settings = settings def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: start_time = datetime.now() try: response = requests.get( f"{self.api_url}/transactions/{transaction_id}", auth=self.auth, timeout=self.timeout, headers={"Accept": "application/json"}, ) response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000) if response.status_code == 200: data = response.json() logger.info( f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)" ) return { "transaction": data, "http_status": 200, "response_time_ms": response_time_ms, "fetched_at": datetime.now(), } elif response.status_code == 404: logger.warning( f"Transaction {transaction_id} introuvable sur Universign" ) return None else: logger.error( f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}" ) return None except requests.exceptions.Timeout: logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)") return None except Exception as e: logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) return None async def sync_transaction( self, session: AsyncSession, transaction: UniversignTransaction, force: bool = False, ) -> Tuple[bool, Optional[str]]: if is_final_status(transaction.local_status.value) and not force: logger.debug( f"Skip {transaction.transaction_id}: statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None result = self.fetch_transaction_status(transaction.transaction_id) if not result: error = "Échec récupération données Universign" await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) status_changed = previous_local_status != new_local_status if not status_changed and not force: logger.debug(f"Pas de changement pour {transaction.transaction_id}") transaction.last_synced_at = datetime.now() transaction.needs_sync = False await session.commit() return True, None try: transaction.universign_status = UniversignTransactionStatus( universign_status_raw ) except ValueError: transaction.universign_status = ( UniversignTransactionStatus.COMPLETED if new_local_status == "SIGNE" else UniversignTransactionStatus.FAILED ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() if new_local_status == "REFUSE" and not transaction.refused_at: transaction.refused_at = datetime.now() if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() if universign_data.get("documents") and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] if first_doc.get("url"): transaction.document_url = first_doc["url"] await self._sync_signers(session, transaction, universign_data) transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None await self._log_sync_attempt( session=session, transaction=transaction, sync_type="polling", success=True, error_message=None, previous_status=previous_local_status, new_status=new_local_status, changes=json.dumps( { "status_changed": status_changed, "universign_raw": universign_status_raw, "response_time_ms": result.get("response_time_ms"), } ), ) await session.commit() if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( f"Sync OK: {transaction.transaction_id} {previous_local_status} → {new_local_status}" ) return True, None async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .where( and_( UniversignTransaction.needs_sync, or_( ~UniversignTransaction.local_status.in_( [ LocalDocumentStatus.SIGNED, LocalDocumentStatus.REJECTED, LocalDocumentStatus.EXPIRED, ] ), UniversignTransaction.last_synced_at < (datetime.now() - timedelta(hours=1)), UniversignTransaction.last_synced_at.is_(None), ), ) ) .order_by(UniversignTransaction.created_at.asc()) .limit(max_transactions) ) result = await session.execute(query) transactions = result.scalars().all() stats = { "total_found": len(transactions), "success": 0, "failed": 0, "skipped": 0, "status_changes": 0, } for transaction in transactions: try: previous_status = transaction.local_status.value success, error = await self.sync_transaction( session, transaction, force=False ) if success: stats["success"] += 1 if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: stats["failed"] += 1 except Exception as e: logger.error( f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True ) stats["failed"] += 1 logger.info( f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés" ) return stats async def process_webhook( self, session: AsyncSession, payload: Dict ) -> Tuple[bool, Optional[str]]: try: event_type = payload.get("event") transaction_id = payload.get("transaction_id") or payload.get("id") if not transaction_id: return False, "Pas de transaction_id dans le webhook" query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .where(UniversignTransaction.transaction_id == transaction_id) ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: logger.warning( f"Webhook reçu pour transaction inconnue: {transaction_id}" ) return False, "Transaction inconnue" transaction.webhook_received = True success, error = await self.sync_transaction( session, transaction, force=True ) await self._log_sync_attempt( session=session, transaction=transaction, sync_type=f"webhook:{event_type}", success=success, error_message=error, changes=json.dumps(payload), ) await session.commit() logger.info( f"Webhook traité: {transaction_id} event={event_type} success={success}" ) return success, error except Exception as e: logger.error(f"Erreur traitement webhook: {e}", exc_info=True) return False, str(e) async def _sync_signers( self, session: AsyncSession, transaction: UniversignTransaction, universign_data: Dict, ): signers_data = universign_data.get("signers", []) if not signers_data: return existing_ids = {s.id for s in transaction.signers} for signer in list(transaction.signers): await session.delete(signer) for idx, signer_data in enumerate(signers_data): signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}", transaction_id=transaction.id, email=signer_data.get("email", ""), name=signer_data.get("name"), status=UniversignSignerStatus(signer_data.get("status", "waiting")), order_index=idx, viewed_at=self._parse_date(signer_data.get("viewed_at")), signed_at=self._parse_date(signer_data.get("signed_at")), refused_at=self._parse_date(signer_data.get("refused_at")), ) session.add(signer) async def _log_sync_attempt( self, session: AsyncSession, transaction: UniversignTransaction, sync_type: str, success: bool, error_message: Optional[str] = None, previous_status: Optional[str] = None, new_status: Optional[str] = None, changes: Optional[str] = None, ): log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, sync_timestamp=datetime.now(), previous_status=previous_status, new_status=new_status, changes_detected=changes, success=success, error_message=error_message, ) session.add(log) async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): actions = get_status_actions(new_status) if not actions: return if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) if actions.get("send_notification"): await self._send_notification(session, transaction, new_status) async def _update_sage_status( self, transaction: UniversignTransaction, status: str ): if not self.sage_client: logger.warning("sage_client non configuré pour mise à jour Sage") return try: type_doc = transaction.sage_document_type.value doc_id = transaction.sage_document_id if status == "SIGNE": self.sage_client.changer_statut_document( document_type_code=type_doc, numero=doc_id, nouveau_statut=2 ) logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)") elif status == "EN_COURS": self.sage_client.changer_statut_document( document_type_code=type_doc, numero=doc_id, nouveau_statut=1 ) logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)") except Exception as e: logger.error( f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}" ) async def _send_notification( self, session: AsyncSession, transaction: UniversignTransaction, status: str ): if not self.email_queue or not self.settings: logger.warning("email_queue ou settings non configuré") return try: if status == "SIGNE": template = templates_signature_email["signature_confirmee"] type_labels = { 0: "Devis", 10: "Commande", 30: "Bon de Livraison", 60: "Facture", 50: "Avoir", } variables = { "NOM_SIGNATAIRE": transaction.requester_name or "Client", "TYPE_DOC": type_labels.get( transaction.sage_document_type.value, "Document" ), "NUMERO": transaction.sage_document_id, "DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M") if transaction.signed_at else datetime.now().strftime("%d/%m/%Y à %H:%M"), "TRANSACTION_ID": transaction.transaction_id, "CONTACT_EMAIL": self.settings.smtp_from, } sujet = template["sujet"] corps = template["corps_html"] for var, valeur in variables.items(): sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) email_log = EmailLog( id=str(uuid.uuid4()), destinataire=transaction.requester_email, sujet=sujet, corps_html=corps, document_ids=transaction.sage_document_id, type_document=transaction.sage_document_type.value, statut=StatutEmail.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() self.email_queue.enqueue(email_log.id) logger.info( f"Email confirmation signature envoyé à {transaction.requester_email}" ) except Exception as e: logger.error( f"Erreur envoi notification pour {transaction.transaction_id}: {e}" ) @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: if not date_str: return None try: return datetime.fromisoformat(date_str.replace("Z", "+00:00")) except Exception: return None class UniversignSyncScheduler: def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5): self.sync_service = sync_service self.interval_minutes = interval_minutes self.is_running = False async def start(self, session_factory): import asyncio self.is_running = True logger.info( f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)" ) while self.is_running: try: async with session_factory() as session: stats = await self.sync_service.sync_all_pending(session) logger.info( f"Polling: {stats['success']} transactions synchronisées, " f"{stats['status_changes']} changements" ) except Exception as e: logger.error(f"Erreur polling: {e}", exc_info=True) await asyncio.sleep(self.interval_minutes * 60) def stop(self): self.is_running = False logger.info("Arrêt polling Universign")