"""Synchronisation of Universign signature transactions with local state.

Provides ``UniversignSyncService`` (polling + webhook driven synchronisation
of a single transaction or of all pending ones) and
``UniversignSyncScheduler`` (a simple periodic polling loop).
"""

import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

import requests
from sqlalchemy import select, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from database import (
    UniversignTransaction,
    UniversignSigner,
    UniversignSyncLog,
    UniversignTransactionStatus,
    LocalDocumentStatus,
    UniversignSignerStatus,
    EmailLog,
    StatutEmail,
)
from data.data import templates_signature_email
from services.universign_document import UniversignDocumentService
from utils.universign_status_mapping import (
    map_universign_to_local,
    is_transition_allowed,
    get_status_actions,
    is_final_status,
    resolve_status_conflict,
)

logger = logging.getLogger(__name__)


class UniversignSyncService:
    """Keep local ``UniversignTransaction`` rows in sync with the Universign API.

    Optional collaborators (Sage client, e-mail queue, settings) are injected
    later through :meth:`configure`; until then the corresponding post-sync
    actions are skipped.
    """

    def __init__(self, api_url: str, api_key: str, timeout: int = 30):
        """Store API credentials and build the document download service.

        Args:
            api_url: Base URL of the Universign API (trailing ``/`` is stripped
                for this service's own requests).
            api_key: API key, used as the basic-auth user name with an empty
                password.
            timeout: Timeout in seconds for status requests.
        """
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        self.auth = (api_key, "")
        self.sage_client = None
        self.email_queue = None
        self.settings = None
        # The document service receives the URL as given and its own,
        # longer, fixed timeout.
        self.document_service = UniversignDocumentService(
            api_url=api_url, api_key=api_key, timeout=60
        )

    def configure(self, sage_client, email_queue, settings):
        """Inject the optional collaborators used by post-sync actions."""
        self.sage_client = sage_client
        self.email_queue = email_queue
        self.settings = settings

    def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
        """Fetch a transaction from Universign (blocking HTTP call).

        Args:
            transaction_id: Universign identifier of the transaction.

        Returns:
            A dict with keys ``transaction`` (decoded JSON body),
            ``http_status``, ``response_time_ms`` and ``fetched_at`` on HTTP
            200; ``None`` on any other status, on timeout, or on any other
            error (all of which are logged, never raised).
        """
        start_time = datetime.now()

        try:
            response = requests.get(
                f"{self.api_url}/transactions/{transaction_id}",
                auth=self.auth,
                timeout=self.timeout,
                headers={"Accept": "application/json"},
            )

            response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)

            if response.status_code == 200:
                data = response.json()
                logger.info(
                    f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)"
                )
                return {
                    "transaction": data,
                    "http_status": 200,
                    "response_time_ms": response_time_ms,
                    "fetched_at": datetime.now(),
                }

            elif response.status_code == 404:
                logger.warning(
                    f"Transaction {transaction_id} introuvable sur Universign"
                )
                return None

            else:
                logger.error(
                    f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}"
                )
                return None

        except requests.exceptions.Timeout:
            logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)")
            return None

        except Exception as e:
            logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
            return None

    async def sync_all_pending(
        self, session: AsyncSession, max_transactions: int = 50
    ) -> Dict[str, int]:
        """Synchronise every transaction that is flagged as needing a sync.

        A transaction is selected when ``needs_sync`` is set and it is either
        not in a SIGNED/REJECTED/EXPIRED local status, or was last synced more
        than one hour ago, or was never synced. Oldest transactions come
        first, limited to ``max_transactions``.

        Args:
            session: Async database session used for the query and updates.
            max_transactions: Maximum number of transactions handled per call.

        Returns:
            Counters ``total_found``, ``success``, ``failed``, ``skipped`` and
            ``status_changes``. ``skipped`` is part of the result shape but is
            never incremented by this method.
        """
        query = (
            select(UniversignTransaction)
            .options(selectinload(UniversignTransaction.signers))
            .where(
                and_(
                    UniversignTransaction.needs_sync,
                    or_(
                        ~UniversignTransaction.local_status.in_(
                            [
                                LocalDocumentStatus.SIGNED,
                                LocalDocumentStatus.REJECTED,
                                LocalDocumentStatus.EXPIRED,
                            ]
                        ),
                        UniversignTransaction.last_synced_at
                        < (datetime.now() - timedelta(hours=1)),
                        UniversignTransaction.last_synced_at.is_(None),
                    ),
                )
            )
            .order_by(UniversignTransaction.created_at.asc())
            .limit(max_transactions)
        )

        result = await session.execute(query)
        transactions = result.scalars().all()

        stats = {
            "total_found": len(transactions),
            "success": 0,
            "failed": 0,
            "skipped": 0,
            "status_changes": 0,
        }

        for transaction in transactions:
            try:
                previous_status = transaction.local_status.value

                success, error = await self.sync_transaction(
                    session, transaction, force=False
                )

                if success:
                    stats["success"] += 1
                    if transaction.local_status.value != previous_status:
                        stats["status_changes"] += 1
                else:
                    stats["failed"] += 1

            except Exception as e:
                # One failing transaction must not abort the whole batch.
                logger.error(
                    f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True
                )
                stats["failed"] += 1

        logger.info(
            f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés"
        )

        return stats

    @staticmethod
    def _extract_transaction_id(payload: Dict) -> Optional[str]:
        """Return the transaction id carried by a webhook payload, if any.

        Three payload shapes are recognised: ``transaction.*`` events with a
        nested ``payload.object`` of type ``transaction``, ``action.*`` events
        carrying ``payload.object.transaction_id``, and a bare transaction
        object at the top level.
        """
        # ``or ""`` also covers an explicit ``"type": null``, which would
        # otherwise fail on ``.startswith``.
        event_type = payload.get("type") or ""

        if event_type.startswith("transaction.") and "payload" in payload:
            nested_object = payload.get("payload", {}).get("object", {})
            if nested_object.get("object") == "transaction":
                return nested_object.get("id")
            return None

        if event_type.startswith("action."):
            return payload.get("payload", {}).get("object", {}).get("transaction_id")

        if payload.get("object") == "transaction":
            return payload.get("id")

        return None

    async def process_webhook(
        self, session: AsyncSession, payload: Dict, transaction_id: Optional[str] = None
    ) -> Tuple[bool, Optional[str]]:
        """Handle a Universign webhook by forcing a full sync of its transaction.

        Args:
            session: Async database session.
            payload: Decoded webhook body.
            transaction_id: Transaction id when the caller already knows it;
                otherwise it is extracted from ``payload``.

        Returns:
            ``(success, error_message)``. Never raises: any exception is
            logged and reported as ``(False, str(exception))``.
        """
        try:
            # If the id was not provided, try to extract it from the payload.
            if not transaction_id:
                transaction_id = self._extract_transaction_id(payload)

            if not transaction_id:
                return False, "Transaction ID manquant"

            event_type = payload.get("type", "webhook")

            logger.info(
                f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}"
            )

            # Load the local transaction together with its signers.
            query = (
                select(UniversignTransaction)
                .options(selectinload(UniversignTransaction.signers))
                .where(UniversignTransaction.transaction_id == transaction_id)
            )
            result = await session.execute(query)
            transaction = result.scalar_one_or_none()

            if not transaction:
                logger.warning(f"Transaction {transaction_id} inconnue localement")
                return False, "Transaction inconnue"

            # Flag the transaction as having received a webhook.
            transaction.webhook_received = True

            # Remember the previous status for comparison and logging.
            old_status = transaction.local_status.value

            # Force a full synchronisation, even for final statuses.
            success, error = await self.sync_transaction(
                session, transaction, force=True
            )

            if success and transaction.local_status.value != old_status:
                logger.info(
                    f"Webhook traité: {transaction_id} | "
                    f"{old_status} → {transaction.local_status.value}"
                )

            # Record the webhook itself in the sync log; default=str keeps
            # non-JSON-serialisable payload values from raising.
            await self._log_sync_attempt(
                session=session,
                transaction=transaction,
                sync_type=f"webhook:{event_type}",
                success=success,
                error_message=error,
                previous_status=old_status,
                new_status=transaction.local_status.value,
                changes=json.dumps(payload, default=str),
            )

            await session.commit()

            return success, error

        except Exception as e:
            logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True)
            return False, str(e)

    async def _sync_signers(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        universign_data: Dict,
    ):
        """Update or create local signers from the Universign response.

        Signers are matched by e-mail. Existing signers get their status
        refreshed, while already-set dates and names are never overwritten.
        Entries without an e-mail are ignored.

        Args:
            session: Async database session (new signers are added to it).
            transaction: Local transaction whose ``signers`` are updated.
            universign_data: Transaction body returned by Universign; signers
                are read from ``participants`` or, failing that, ``signers``.
        """
        signers_data = universign_data.get("participants", [])
        if not signers_data:
            signers_data = universign_data.get("signers", [])

        if not signers_data:
            logger.debug("Aucun signataire dans les données Universign")
            return

        existing_signers = {s.email: s for s in transaction.signers}

        for idx, signer_data in enumerate(signers_data):
            email = signer_data.get("email", "")
            if not email:
                logger.warning(f"Signataire sans email à l'index {idx}, ignoré")
                continue

            # Guard against statuses unknown to the local enum.
            raw_status = signer_data.get("status") or signer_data.get(
                "state", "waiting"
            )
            try:
                status = UniversignSignerStatus(raw_status)
            except ValueError:
                logger.warning(
                    f"Statut inconnu pour signer {email}: {raw_status}, utilisation de 'unknown'"
                )
                status = UniversignSignerStatus.UNKNOWN

            if email in existing_signers:
                signer = existing_signers[email]
                signer.status = status

                viewed_at = self._parse_date(signer_data.get("viewed_at"))
                if viewed_at and not signer.viewed_at:
                    signer.viewed_at = viewed_at

                signed_at = self._parse_date(signer_data.get("signed_at"))
                if signed_at and not signer.signed_at:
                    signer.signed_at = signed_at

                refused_at = self._parse_date(signer_data.get("refused_at"))
                if refused_at and not signer.refused_at:
                    signer.refused_at = refused_at

                if signer_data.get("name") and not signer.name:
                    signer.name = signer_data.get("name")
            else:
                # New signer; a failure here must not abort the other signers.
                try:
                    signer = UniversignSigner(
                        id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}",
                        transaction_id=transaction.id,
                        email=email,
                        name=signer_data.get("name"),
                        status=status,
                        order_index=idx,
                        viewed_at=self._parse_date(signer_data.get("viewed_at")),
                        signed_at=self._parse_date(signer_data.get("signed_at")),
                        refused_at=self._parse_date(signer_data.get("refused_at")),
                    )
                    session.add(signer)
                    # Register the new signer so a repeated e-mail later in the
                    # same payload updates it instead of inserting a duplicate.
                    existing_signers[email] = signer
                    logger.info(
                        f"➕ Nouveau signataire ajouté: {email} (statut: {status.value})"
                    )
                except Exception as e:
                    logger.error(f"Erreur création signer {email}: {e}")

    async def sync_transaction(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        force: bool = False,
    ) -> Tuple[bool, Optional[str]]:
        """Synchronise one transaction with its remote Universign state.

        Fetches the remote state, maps it to a local status (resolving
        disallowed transitions), updates dates, signers and sync metadata,
        writes a sync-log entry, commits, and finally runs the post-change
        actions when the local status changed.

        Args:
            session: Async database session; committed by this method.
            transaction: Local transaction to synchronise.
            force: When true, sync even if the local status is already final.

        Returns:
            ``(True, None)`` on success (including the "final status, nothing
            to do" case) or ``(False, error_message)`` on failure.
        """
        # Final status and no force: nothing to do except clear the flag.
        if is_final_status(transaction.local_status.value) and not force:
            logger.debug(
                f"⏭️ Skip {transaction.transaction_id}: statut final "
                f"{transaction.local_status.value}"
            )
            transaction.needs_sync = False
            await session.commit()
            return True, None

        # Fetch the remote status. The HTTP call is blocking, so it runs in
        # the default executor to keep the event loop responsive.
        logger.info(f"🔄 Synchronisation: {transaction.transaction_id}")

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, self.fetch_transaction_status, transaction.transaction_id
        )

        if not result:
            error = "Échec récupération données Universign"
            logger.error(f"❌ {error}: {transaction.transaction_id}")
            transaction.sync_attempts += 1
            transaction.sync_error = error
            await self._log_sync_attempt(session, transaction, "polling", False, error)
            await session.commit()
            return False, error

        try:
            universign_data = result["transaction"]
            universign_status_raw = universign_data.get("state", "draft")

            logger.info(f"📊 Statut Universign brut: {universign_status_raw}")

            # Convert the remote status to the local vocabulary.
            new_local_status = map_universign_to_local(universign_status_raw)
            previous_local_status = transaction.local_status.value

            logger.info(
                f"🔄 Mapping: {universign_status_raw} (Universign) → "
                f"{new_local_status} (Local) | Actuel: {previous_local_status}"
            )

            # Check the transition; fall back to conflict resolution.
            if not is_transition_allowed(previous_local_status, new_local_status):
                logger.warning(
                    f"⚠️ Transition refusée: {previous_local_status} → {new_local_status}"
                )
                new_local_status = resolve_status_conflict(
                    previous_local_status, new_local_status
                )
                logger.info(f"Résolution conflit: statut résolu = {new_local_status}")

            status_changed = previous_local_status != new_local_status

            if status_changed:
                logger.info(
                    f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status} → {new_local_status}"
                )

            # Store the raw Universign status; when it is not a known enum
            # value, derive one from the resolved local status.
            try:
                transaction.universign_status = UniversignTransactionStatus(
                    universign_status_raw
                )
            except ValueError:
                logger.warning(f"⚠️ Statut Universign inconnu: {universign_status_raw}")
                if new_local_status == "SIGNE":
                    transaction.universign_status = UniversignTransactionStatus.COMPLETED
                elif new_local_status == "REFUSE":
                    transaction.universign_status = UniversignTransactionStatus.REFUSED
                elif new_local_status == "EXPIRE":
                    transaction.universign_status = UniversignTransactionStatus.EXPIRED
                else:
                    transaction.universign_status = UniversignTransactionStatus.STARTED

            # Update the local status.
            transaction.local_status = LocalDocumentStatus(new_local_status)
            transaction.universign_status_updated_at = datetime.now()

            # Stamp lifecycle dates the first time each status is reached.
            if new_local_status == "EN_COURS" and not transaction.sent_at:
                transaction.sent_at = datetime.now()
                logger.info("📅 Date d'envoi mise à jour")

            if new_local_status == "SIGNE" and not transaction.signed_at:
                transaction.signed_at = datetime.now()
                logger.info("✅ Date de signature mise à jour")

            if new_local_status == "REFUSE" and not transaction.refused_at:
                transaction.refused_at = datetime.now()
                logger.info("❌ Date de refus mise à jour")

            if new_local_status == "EXPIRE" and not transaction.expired_at:
                transaction.expired_at = datetime.now()
                logger.info("⏰ Date d'expiration mise à jour")

            # Document handling: no document URL is read from the response;
            # the download goes through the document service instead.
            documents = universign_data.get("documents", [])

            if documents:
                first_doc = documents[0]
                logger.info(
                    f"📄 Document Universign trouvé: id={first_doc.get('id')}, "
                    f"status={first_doc.get('status')}"
                )

            # Automatically download the signed document once signed.
            if new_local_status == "SIGNE" and not transaction.signed_document_path:
                logger.info("📥 Déclenchement téléchargement document signé...")

                try:
                    (
                        download_success,
                        download_error,
                    ) = await self.document_service.download_and_store_signed_document(
                        session=session, transaction=transaction, force=False
                    )

                    if download_success:
                        logger.info("✅ Document signé téléchargé et stocké")
                    else:
                        logger.warning(f"⚠️ Échec téléchargement: {download_error}")

                except Exception as e:
                    # A failed download must not fail the status sync itself.
                    logger.error(
                        f"❌ Erreur téléchargement document: {e}", exc_info=True
                    )

            # Synchronise the signers.
            await self._sync_signers(session, transaction, universign_data)

            # Update the sync metadata.
            transaction.last_synced_at = datetime.now()
            transaction.sync_attempts += 1
            transaction.needs_sync = not is_final_status(new_local_status)
            transaction.sync_error = None

            # Log the attempt.
            await self._log_sync_attempt(
                session=session,
                transaction=transaction,
                sync_type="polling",
                success=True,
                error_message=None,
                previous_status=previous_local_status,
                new_status=new_local_status,
                changes=json.dumps(
                    {
                        "status_changed": status_changed,
                        "universign_raw": universign_status_raw,
                        "documents_count": len(documents),
                        "response_time_ms": result.get("response_time_ms"),
                    },
                    default=str,
                ),
            )

            await session.commit()

            # Run the post-change actions.
            if status_changed:
                logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
                await self._execute_status_actions(session, transaction, new_local_status)
                # The actions may add rows (the notification e-mail log is
                # only flushed); commit so they survive the session closing.
                await session.commit()

            logger.info(
                f"✅ Sync terminée: {transaction.transaction_id} | "
                f"{previous_local_status} → {new_local_status}"
            )

            return True, None

        except Exception as e:
            error_msg = f"Erreur lors de la synchronisation: {str(e)}"
            logger.error(f"❌ {error_msg}", exc_info=True)

            transaction.sync_error = error_msg[:1000]
            transaction.sync_attempts += 1

            await self._log_sync_attempt(
                session, transaction, "polling", False, error_msg
            )
            await session.commit()

            return False, error_msg

    async def _sync_transaction_documents_corrected(
        self, session, transaction, universign_data: dict, new_local_status: str
    ):
        """Record document info and download the signed document when signed.

        Stores the first document id on the transaction when the model has a
        ``universign_document_id`` attribute, then triggers the signed
        document download if the status is ``SIGNE`` and no document path is
        stored yet. Not called from any code visible in this file.
        """
        # Read the document information from the response.
        documents = universign_data.get("documents", [])

        if documents:
            # Keep the first document id for reference.
            first_doc = documents[0]
            first_doc_id = first_doc.get("id")

            if first_doc_id:
                # Only store the id if the model actually has the field.
                if hasattr(transaction, "universign_document_id"):
                    transaction.universign_document_id = first_doc_id

                logger.info(
                    f"📄 Document Universign: id={first_doc_id}, "
                    f"name={first_doc.get('name')}, status={first_doc.get('status')}"
                )
        else:
            logger.debug("Aucun document dans la réponse Universign")

        # Automatic download once signed.
        if new_local_status == "SIGNE":
            if not transaction.signed_document_path:
                logger.info("📥 Déclenchement téléchargement document signé...")

                try:
                    (
                        download_success,
                        download_error,
                    ) = await self.document_service.download_and_store_signed_document(
                        session=session, transaction=transaction, force=False
                    )

                    if download_success:
                        logger.info("✅ Document signé téléchargé avec succès")
                    else:
                        logger.warning(f"⚠️ Échec téléchargement: {download_error}")

                except Exception as e:
                    logger.error(
                        f"❌ Erreur téléchargement document: {e}", exc_info=True
                    )
            else:
                logger.debug(
                    f"Document déjà téléchargé: {transaction.signed_document_path}"
                )

    async def _log_sync_attempt(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        sync_type: str,
        success: bool,
        error_message: Optional[str] = None,
        previous_status: Optional[str] = None,
        new_status: Optional[str] = None,
        changes: Optional[str] = None,
    ):
        """Add a ``UniversignSyncLog`` row to the session (no flush/commit)."""
        log = UniversignSyncLog(
            transaction_id=transaction.id,
            sync_type=sync_type,
            sync_timestamp=datetime.now(),
            previous_status=previous_status,
            new_status=new_status,
            changes_detected=changes,
            success=success,
            error_message=error_message,
        )
        session.add(log)

    async def _execute_status_actions(
        self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
    ):
        """Run the actions configured for ``new_status``.

        Actions come from ``get_status_actions``; each one is skipped (with a
        debug log) when the collaborator it needs has not been configured.
        """
        actions = get_status_actions(new_status)
        if not actions:
            return

        if actions.get("update_sage_status") and self.sage_client:
            await self._update_sage_status(transaction, new_status)
        elif actions.get("update_sage_status"):
            logger.debug(
                f"sage_client non configuré, skip MAJ Sage pour {transaction.sage_document_id}"
            )

        if actions.get("send_notification") and self.email_queue and self.settings:
            await self._send_notification(session, transaction, new_status)
        elif actions.get("send_notification"):
            logger.debug(
                f"email_queue/settings non configuré, skip notification pour {transaction.transaction_id}"
            )

    async def _update_sage_status(
        self, transaction: UniversignTransaction, status: str
    ):
        """Push the document status to Sage for ``SIGNE`` and ``EN_COURS``.

        Errors from the Sage client are logged and swallowed.
        """
        if not self.sage_client:
            logger.warning("sage_client non configuré pour mise à jour Sage")
            return

        try:
            type_doc = transaction.sage_document_type.value
            doc_id = transaction.sage_document_id

            if status == "SIGNE":
                self.sage_client.changer_statut_document(
                    document_type_code=type_doc, numero=doc_id, nouveau_statut=2
                )
                logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)")

            elif status == "EN_COURS":
                self.sage_client.changer_statut_document(
                    document_type_code=type_doc, numero=doc_id, nouveau_statut=1
                )
                logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)")

        except Exception as e:
            logger.error(
                f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}"
            )

    async def _send_notification(
        self, session: AsyncSession, transaction: UniversignTransaction, status: str
    ):
        """Queue the signature-confirmation e-mail when ``status`` is ``SIGNE``.

        Renders the ``signature_confirmee`` template, stores an ``EmailLog``
        row (flushed, not committed) and enqueues its id. Other statuses do
        nothing. Errors are logged and swallowed.
        """
        if not self.email_queue or not self.settings:
            logger.warning("email_queue ou settings non configuré")
            return

        try:
            if status == "SIGNE":
                template = templates_signature_email["signature_confirmee"]

                type_labels = {
                    0: "Devis",
                    10: "Commande",
                    30: "Bon de Livraison",
                    60: "Facture",
                    50: "Avoir",
                }

                variables = {
                    "NOM_SIGNATAIRE": transaction.requester_name or "Client",
                    "TYPE_DOC": type_labels.get(
                        transaction.sage_document_type.value, "Document"
                    ),
                    "NUMERO": transaction.sage_document_id,
                    "DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M")
                    if transaction.signed_at
                    else datetime.now().strftime("%d/%m/%Y à %H:%M"),
                    "TRANSACTION_ID": transaction.transaction_id,
                    "CONTACT_EMAIL": self.settings.smtp_from,
                }

                sujet = template["sujet"]
                corps = template["corps_html"]

                # Placeholders in the templates have the form {{NAME}}.
                for var, valeur in variables.items():
                    sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
                    corps = corps.replace(f"{{{{{var}}}}}", str(valeur))

                email_log = EmailLog(
                    id=str(uuid.uuid4()),
                    destinataire=transaction.requester_email,
                    sujet=sujet,
                    corps_html=corps,
                    document_ids=transaction.sage_document_id,
                    type_document=transaction.sage_document_type.value,
                    statut=StatutEmail.EN_ATTENTE,
                    date_creation=datetime.now(),
                    nb_tentatives=0,
                )

                session.add(email_log)
                await session.flush()

                self.email_queue.enqueue(email_log.id)

                logger.info(
                    f"Email confirmation signature envoyé à {transaction.requester_email}"
                )

        except Exception as e:
            logger.error(
                f"Erreur envoi notification pour {transaction.transaction_id}: {e}"
            )

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 string (a trailing ``Z`` is read as ``+00:00``).

        Returns ``None`` for empty input or when parsing fails.
        """
        if not date_str:
            return None
        try:
            return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        except Exception:
            return None


class UniversignSyncScheduler:
    """Periodic polling loop around ``UniversignSyncService.sync_all_pending``."""

    def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
        """Store the service to poll with and the pause between two runs."""
        self.sync_service = sync_service
        self.interval_minutes = interval_minutes
        self.is_running = False

    async def start(self, session_factory):
        """Poll until :meth:`stop` is called.

        Args:
            session_factory: Callable returning an async context manager that
                yields a database session; a new session is opened per run.
        """
        self.is_running = True

        logger.info(
            f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
        )

        while self.is_running:
            try:
                async with session_factory() as session:
                    stats = await self.sync_service.sync_all_pending(session)

                    logger.info(
                        f"Polling: {stats['success']} transactions synchronisées, "
                        f"{stats['status_changes']} changements"
                    )

            except Exception as e:
                # Keep the loop alive whatever a single run raises.
                logger.error(f"Erreur polling: {e}", exc_info=True)

            await asyncio.sleep(self.interval_minutes * 60)

    def stop(self):
        """Ask the loop to exit; takes effect after the current sleep ends."""
        self.is_running = False
        logger.info("Arrêt polling Universign")