fix(universign): improve webhook payload handling and transaction sync

This commit is contained in:
Fanilo-Nantenaina 2026-01-06 19:56:39 +03:00
parent 1ce85517be
commit fbaa43e3fd
2 changed files with 332 additions and 206 deletions

View file

@ -532,41 +532,70 @@ async def sync_all_transactions(
async def webhook_universign( async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session) request: Request, session: AsyncSession = Depends(get_session)
): ):
"""
CORRECTION : Extraction correcte du transaction_id selon la structure réelle d'Universign
"""
try: try:
payload = await request.json() payload = await request.json()
# 🔍 LOG COMPLET du payload pour déboguer # 📋 LOG COMPLET du payload pour débogage
logger.info( logger.info(
f"📥 Webhook Universign reçu - Payload complet: {json.dumps(payload, indent=2)}" f"📥 Webhook Universign reçu - Type: {payload.get('type', 'unknown')}"
) )
logger.debug(f"Payload complet: {json.dumps(payload, indent=2)}")
# Extraction du transaction_id selon la structure Universign # ✅ EXTRACTION CORRECTE DU TRANSACTION_ID
transaction_id = None transaction_id = None
# Universign envoie généralement : # 🔍 Structure 1 : Événements avec payload imbriqué (la plus courante)
# - "object": "transaction" # Exemple : transaction.lifecycle.created, transaction.lifecycle.started, etc.
# - "id": "tr_xxx" (le vrai ID de transaction) if payload.get("type", "").startswith("transaction.") and "payload" in payload:
# - "event": "evt_xxx" (l'ID de l'événement) # Le transaction_id est dans payload.object.id
nested_object = payload.get("payload", {}).get("object", {})
if nested_object.get("object") == "transaction":
transaction_id = nested_object.get("id")
logger.info(
f"✅ Transaction ID extrait de payload.object.id: {transaction_id}"
)
if payload.get("object") == "transaction": # 🔍 Structure 2 : Action événements (action.opened, action.completed)
transaction_id = payload.get("id") # C'est ici le vrai ID elif payload.get("type", "").startswith("action."):
# Le transaction_id est directement dans payload.object.transaction_id
transaction_id = (
payload.get("payload", {}).get("object", {}).get("transaction_id")
)
logger.info(
f"✅ Transaction ID extrait de payload.object.transaction_id: {transaction_id}"
)
# 🔍 Structure 3 : Transaction directe (fallback)
elif payload.get("object") == "transaction":
transaction_id = payload.get("id")
logger.info(f"✅ Transaction ID extrait direct: {transaction_id}")
# 🔍 Structure 4 : Ancien format (pour rétro-compatibilité)
elif "transaction" in payload: elif "transaction" in payload:
# Parfois dans un objet "transaction"
transaction_id = payload.get("transaction", {}).get("id") transaction_id = payload.get("transaction", {}).get("id")
elif "data" in payload: logger.info(
# Ou dans "data" f"✅ Transaction ID extrait de transaction.id: {transaction_id}"
transaction_id = payload.get("data", {}).get("id") )
# ❌ Échec d'extraction
if not transaction_id: if not transaction_id:
logger.error( logger.error(
f"❌ Transaction ID introuvable dans webhook. Payload: {payload}" f"❌ Transaction ID introuvable dans webhook\n"
f"Type d'événement: {payload.get('type', 'unknown')}\n"
f"Clés racine: {list(payload.keys())}\n"
f"Payload simplifié: {json.dumps({k: v if k != 'payload' else '...' for k, v in payload.items()})}"
) )
return { return {
"status": "error", "status": "error",
"message": "Transaction ID manquant dans webhook", "message": "Transaction ID manquant dans webhook",
"event_type": payload.get("type"),
"event_id": payload.get("id"),
}, 400 }, 400
logger.info(f"🔍 Transaction ID extrait: {transaction_id}") logger.info(f"🎯 Transaction ID identifié: {transaction_id}")
# Vérifier si la transaction existe localement # Vérifier si la transaction existe localement
query = select(UniversignTransaction).where( query = select(UniversignTransaction).where(
@ -577,30 +606,48 @@ async def webhook_universign(
if not tx: if not tx:
logger.warning( logger.warning(
f"⚠️ Transaction {transaction_id} inconnue en local - création en attente" f"⚠️ Transaction {transaction_id} inconnue en local\n"
f"Type d'événement: {payload.get('type')}\n"
f"Elle sera synchronisée au prochain polling"
) )
# Ne pas échouer, juste logger
return { return {
"status": "accepted", "status": "accepted",
"message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling", "message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling",
"transaction_id": transaction_id,
"event_type": payload.get("type"),
} }
# Traiter le webhook
success, error = await sync_service.process_webhook( success, error = await sync_service.process_webhook(
session, payload, transaction_id session, payload, transaction_id
) )
if not success: if not success:
logger.error(f"❌ Erreur traitement webhook: {error}") logger.error(f"❌ Erreur traitement webhook: {error}")
return {"status": "error", "message": error}, 500 return {
"status": "error",
"message": error,
"transaction_id": transaction_id,
}, 500
# ✅ Succès
logger.info(
f"✅ Webhook traité avec succès\n"
f"Transaction: {transaction_id}\n"
f"Nouveau statut: {tx.local_status.value if tx else 'unknown'}\n"
f"Type d'événement: {payload.get('type')}"
)
return { return {
"status": "processed", "status": "processed",
"transaction_id": transaction_id, "transaction_id": transaction_id,
"local_status": tx.local_status.value if tx else None, "local_status": tx.local_status.value if tx else None,
"event_type": payload.get("type"),
"event_id": payload.get("id"),
} }
except Exception as e: except Exception as e:
logger.error(f"💥 Erreur webhook: {e}", exc_info=True) logger.error(f"💥 Erreur critique webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500 return {"status": "error", "message": str(e)}, 500

View file

@ -90,155 +90,6 @@ class UniversignSyncService:
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
return None return None
async def sync_transaction(
self,
session: AsyncSession,
transaction: UniversignTransaction,
force: bool = False,
) -> Tuple[bool, Optional[str]]:
"""
Synchronise une transaction avec Universign
CORRECTION : Met à jour correctement le statut local selon le statut distant
"""
# Si statut final et pas de force, skip
if is_final_status(transaction.local_status.value) and not force:
logger.debug(
f"⏭️ Skip {transaction.transaction_id}: statut final {transaction.local_status.value}"
)
transaction.needs_sync = False
await session.commit()
return True, None
# Récupération du statut distant
logger.info(f"🔄 Synchronisation: {transaction.transaction_id}")
result = self.fetch_transaction_status(transaction.transaction_id)
if not result:
error = "Échec récupération données Universign"
logger.error(f"{error}: {transaction.transaction_id}")
await self._log_sync_attempt(session, transaction, "polling", False, error)
transaction.sync_attempts += 1
await session.commit()
return False, error
universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft")
logger.info(f"📊 Statut Universign brut: {universign_status_raw}")
# Convertir le statut Universign en statut local
new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value
logger.info(
f"🔄 Mapping: {universign_status_raw} (Universign) → "
f"{new_local_status} (Local) | Actuel: {previous_local_status}"
)
# Vérifier si la transition est autorisée
if not is_transition_allowed(previous_local_status, new_local_status):
logger.warning(
f"⚠️ Transition refusée: {previous_local_status}{new_local_status}"
)
new_local_status = resolve_status_conflict(
previous_local_status, new_local_status
)
logger.info(f"✅ Résolution conflit: statut résolu = {new_local_status}")
status_changed = previous_local_status != new_local_status
if status_changed:
logger.info(
f"📝 CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}"
)
else:
logger.debug(f"⏸️ Pas de changement de statut")
# Mise à jour du statut Universign brut
try:
transaction.universign_status = UniversignTransactionStatus(
universign_status_raw
)
except ValueError:
logger.warning(f"⚠️ Statut Universign inconnu: {universign_status_raw}")
transaction.universign_status = (
UniversignTransactionStatus.COMPLETED
if new_local_status == "SIGNE"
else UniversignTransactionStatus.FAILED
)
# ✅ CORRECTION PRINCIPALE : Mise à jour du statut local
transaction.local_status = LocalDocumentStatus(new_local_status)
transaction.universign_status_updated_at = datetime.now()
# Mise à jour des dates selon le nouveau statut
if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now()
logger.info("📅 Date d'envoi mise à jour")
if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now()
logger.info("✅ Date de signature mise à jour")
if new_local_status == "REFUSE" and not transaction.refused_at:
transaction.refused_at = datetime.now()
logger.info("❌ Date de refus mise à jour")
if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now()
logger.info("⏰ Date d'expiration mise à jour")
# Mise à jour des URLs
if universign_data.get("documents") and len(universign_data["documents"]) > 0:
first_doc = universign_data["documents"][0]
if first_doc.get("url"):
transaction.document_url = first_doc["url"]
logger.info("🔗 URL du document mise à jour")
# Synchroniser les signataires
await self._sync_signers(session, transaction, universign_data)
# Mise à jour des métadonnées de sync
transaction.last_synced_at = datetime.now()
transaction.sync_attempts += 1
transaction.needs_sync = not is_final_status(new_local_status)
transaction.sync_error = None
# Log de la tentative
await self._log_sync_attempt(
session=session,
transaction=transaction,
sync_type="polling",
success=True,
error_message=None,
previous_status=previous_local_status,
new_status=new_local_status,
changes=json.dumps(
{
"status_changed": status_changed,
"universign_raw": universign_status_raw,
"response_time_ms": result.get("response_time_ms"),
}
),
)
await session.commit()
# Exécuter les actions post-changement de statut
if status_changed:
logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
await self._execute_status_actions(session, transaction, new_local_status)
logger.info(
f"✅ Sync terminée: {transaction.transaction_id} | "
f"{previous_local_status}{new_local_status}"
)
return True, None
async def sync_all_pending( async def sync_all_pending(
self, session: AsyncSession, max_transactions: int = 50 self, session: AsyncSession, max_transactions: int = 50
) -> Dict[str, int]: ) -> Dict[str, int]:
@ -304,26 +155,37 @@ class UniversignSyncService:
return stats return stats
# CORRECTION 1 : process_webhook dans universign_sync.py
async def process_webhook( async def process_webhook(
self, session: AsyncSession, payload: Dict, transaction_id: str = None self, session: AsyncSession, payload: Dict, transaction_id: str = None
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
""" """
Traite un webhook Universign Traite un webhook Universign - CORRECTION : meilleure gestion des payloads
Args:
session: Session SQLAlchemy
payload: Payload du webhook
transaction_id: ID de transaction (optionnel si déjà dans payload)
""" """
try: try:
# Si transaction_id n'est pas fourni, essayer de l'extraire # Si transaction_id n'est pas fourni, essayer de l'extraire
if not transaction_id: if not transaction_id:
transaction_id = payload.get("id") or payload.get("transaction_id") # Même logique que dans universign.py
if (
payload.get("type", "").startswith("transaction.")
and "payload" in payload
):
nested_object = payload.get("payload", {}).get("object", {})
if nested_object.get("object") == "transaction":
transaction_id = nested_object.get("id")
elif payload.get("type", "").startswith("action."):
transaction_id = (
payload.get("payload", {})
.get("object", {})
.get("transaction_id")
)
elif payload.get("object") == "transaction":
transaction_id = payload.get("id")
if not transaction_id: if not transaction_id:
return False, "Transaction ID manquant" return False, "Transaction ID manquant"
event_type = payload.get("event") or payload.get("type", "webhook") event_type = payload.get("type", "webhook")
logger.info( logger.info(
f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}" f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}"
@ -369,7 +231,9 @@ class UniversignSyncService:
error_message=error, error_message=error,
previous_status=old_status, previous_status=old_status,
new_status=transaction.local_status.value, new_status=transaction.local_status.value,
changes=json.dumps(payload), changes=json.dumps(
payload, default=str
), # ✅ Ajout default=str pour éviter les erreurs JSON
) )
await session.commit() await session.commit()
@ -380,53 +244,268 @@ class UniversignSyncService:
logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True) logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True)
return False, str(e) return False, str(e)
# CORRECTION 2 : _sync_signers - Ne pas écraser les signers existants
async def _sync_signers( async def _sync_signers(
self, self,
session: AsyncSession, session: AsyncSession,
transaction: UniversignTransaction, transaction: UniversignTransaction,
universign_data: Dict, universign_data: Dict,
): ):
"""
CORRECTION : Synchronise les signataires sans perdre les données locales
"""
# Récupérer les participants depuis différents endroits possibles
signers_data = universign_data.get("participants", [])
if not signers_data:
signers_data = universign_data.get("signers", []) signers_data = universign_data.get("signers", [])
# Ne pas toucher aux signers existants si Universign n'en retourne pas # ⚠️ IMPORTANT : Ne pas toucher aux signers si Universign n'en retourne pas
if not signers_data: if not signers_data:
logger.debug(
"Aucun signataire dans les données Universign, conservation des données locales"
)
return return
# Mettre à jour les signers existants ou en créer de nouveaux # Créer un mapping email -> signer existant
existing_signers = {s.email: s for s in transaction.signers} existing_signers = {s.email: s for s in transaction.signers}
for idx, signer_data in enumerate(signers_data): for idx, signer_data in enumerate(signers_data):
email = signer_data.get("email", "") email = signer_data.get("email", "")
if not email:
logger.warning(f"Signataire sans email à l'index {idx}, ignoré")
continue
if email in existing_signers: if email in existing_signers:
# Mise à jour du signer existant # Mise à jour du signer existant (ne pas écraser si None)
signer = existing_signers[email] signer = existing_signers[email]
signer.status = UniversignSignerStatus(
signer_data.get("status", "waiting") # Mise à jour du statut
) new_status = signer_data.get("status") or signer_data.get("state")
signer.viewed_at = ( if new_status:
self._parse_date(signer_data.get("viewed_at")) or signer.viewed_at try:
) signer.status = UniversignSignerStatus(new_status)
signer.signed_at = ( except ValueError:
self._parse_date(signer_data.get("signed_at")) or signer.signed_at logger.warning(
) f"Statut inconnu pour signer {email}: {new_status}"
signer.refused_at = (
self._parse_date(signer_data.get("refused_at")) or signer.refused_at
) )
# Mise à jour des dates (ne pas écraser si déjà renseignées)
viewed_at = self._parse_date(signer_data.get("viewed_at"))
if viewed_at and not signer.viewed_at:
signer.viewed_at = viewed_at
signed_at = self._parse_date(signer_data.get("signed_at"))
if signed_at and not signer.signed_at:
signer.signed_at = signed_at
refused_at = self._parse_date(signer_data.get("refused_at"))
if refused_at and not signer.refused_at:
signer.refused_at = refused_at
# Mise à jour du nom si manquant
if signer_data.get("name") and not signer.name:
signer.name = signer_data.get("name")
else: else:
# Nouveau signer # ✅ Nouveau signer
try:
status = signer_data.get("status") or signer_data.get(
"state", "waiting"
)
signer = UniversignSigner( signer = UniversignSigner(
id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}", id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}",
transaction_id=transaction.id, transaction_id=transaction.id,
email=email, email=email,
name=signer_data.get("name"), name=signer_data.get("name"),
status=UniversignSignerStatus(signer_data.get("status", "waiting")), status=UniversignSignerStatus(status),
order_index=idx, order_index=idx,
viewed_at=self._parse_date(signer_data.get("viewed_at")), viewed_at=self._parse_date(signer_data.get("viewed_at")),
signed_at=self._parse_date(signer_data.get("signed_at")), signed_at=self._parse_date(signer_data.get("signed_at")),
refused_at=self._parse_date(signer_data.get("refused_at")), refused_at=self._parse_date(signer_data.get("refused_at")),
) )
session.add(signer) session.add(signer)
logger.info(f" Nouveau signataire ajouté: {email}")
except Exception as e:
logger.error(f"Erreur création signer {email}: {e}")
# CORRECTION 3 : Amélioration du logging dans sync_transaction
async def sync_transaction(
self,
session: AsyncSession,
transaction: UniversignTransaction,
force: bool = False,
) -> Tuple[bool, Optional[str]]:
"""
CORRECTION : Meilleur logging et gestion d'erreurs
"""
# Si statut final et pas de force, skip
if is_final_status(transaction.local_status.value) and not force:
logger.debug(
f"⏭️ Skip {transaction.transaction_id}: statut final {transaction.local_status.value}"
)
transaction.needs_sync = False
await session.commit()
return True, None
# Récupération du statut distant
logger.info(f"🔄 Synchronisation: {transaction.transaction_id}")
result = self.fetch_transaction_status(transaction.transaction_id)
if not result:
error = "Échec récupération données Universign"
logger.error(f"{error}: {transaction.transaction_id}")
# ✅ CORRECTION : Incrémenter les tentatives MÊME en cas d'échec
transaction.sync_attempts += 1
transaction.sync_error = error
await self._log_sync_attempt(session, transaction, "polling", False, error)
await session.commit()
return False, error
try:
universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft")
logger.info(f"📊 Statut Universign brut: {universign_status_raw}")
# Convertir le statut
new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value
logger.info(
f"🔄 Mapping: {universign_status_raw} (Universign) → "
f"{new_local_status} (Local) | Actuel: {previous_local_status}"
)
# Vérifier la transition
if not is_transition_allowed(previous_local_status, new_local_status):
logger.warning(
f"⚠️ Transition refusée: {previous_local_status}{new_local_status}"
)
new_local_status = resolve_status_conflict(
previous_local_status, new_local_status
)
logger.info(
f"✅ Résolution conflit: statut résolu = {new_local_status}"
)
status_changed = previous_local_status != new_local_status
if status_changed:
logger.info(
f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}"
)
# Mise à jour du statut Universign brut
try:
transaction.universign_status = UniversignTransactionStatus(
universign_status_raw
)
except ValueError:
logger.warning(f"⚠️ Statut Universign inconnu: {universign_status_raw}")
# Fallback intelligent
if new_local_status == "SIGNE":
transaction.universign_status = (
UniversignTransactionStatus.COMPLETED
)
elif new_local_status == "REFUSE":
transaction.universign_status = UniversignTransactionStatus.REFUSED
elif new_local_status == "EXPIRE":
transaction.universign_status = UniversignTransactionStatus.EXPIRED
else:
transaction.universign_status = UniversignTransactionStatus.STARTED
# ✅ Mise à jour du statut local
transaction.local_status = LocalDocumentStatus(new_local_status)
transaction.universign_status_updated_at = datetime.now()
# Mise à jour des dates
if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now()
logger.info("📅 Date d'envoi mise à jour")
if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now()
logger.info("✅ Date de signature mise à jour")
if new_local_status == "REFUSE" and not transaction.refused_at:
transaction.refused_at = datetime.now()
logger.info("❌ Date de refus mise à jour")
if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now()
logger.info("⏰ Date d'expiration mise à jour")
# Mise à jour des URLs
if (
universign_data.get("documents")
and len(universign_data["documents"]) > 0
):
first_doc = universign_data["documents"][0]
if first_doc.get("url"):
transaction.document_url = first_doc["url"]
# Synchroniser les signataires
await self._sync_signers(session, transaction, universign_data)
# Mise à jour des métadonnées de sync
transaction.last_synced_at = datetime.now()
transaction.sync_attempts += 1
transaction.needs_sync = not is_final_status(new_local_status)
transaction.sync_error = None # ✅ Effacer l'erreur précédente
# Log de la tentative
await self._log_sync_attempt(
session=session,
transaction=transaction,
sync_type="polling",
success=True,
error_message=None,
previous_status=previous_local_status,
new_status=new_local_status,
changes=json.dumps(
{
"status_changed": status_changed,
"universign_raw": universign_status_raw,
"response_time_ms": result.get("response_time_ms"),
},
default=str, # ✅ Éviter les erreurs de sérialisation
),
)
await session.commit()
# Exécuter les actions post-changement
if status_changed:
logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
await self._execute_status_actions(
session, transaction, new_local_status
)
logger.info(
f"✅ Sync terminée: {transaction.transaction_id} | "
f"{previous_local_status}{new_local_status}"
)
return True, None
except Exception as e:
error_msg = f"Erreur lors de la synchronisation: {str(e)}"
logger.error(f"{error_msg}", exc_info=True)
transaction.sync_error = error_msg[:1000] # Tronquer si trop long
transaction.sync_attempts += 1
await self._log_sync_attempt(
session, transaction, "polling", False, error_msg
)
await session.commit()
return False, error_msg
async def _log_sync_attempt( async def _log_sync_attempt(
self, self,