feat(universign): improve transaction sync and webhook handling

This commit is contained in:
Fanilo-Nantenaina 2026-01-06 19:43:42 +03:00
parent a3f02cbd91
commit 1ce85517be
2 changed files with 637 additions and 44 deletions

View file

@ -1,9 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy import select, func, or_, and_
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime
from datetime import datetime, timedelta
from pydantic import BaseModel, EmailStr
import logging
from data.data import templates_signature_email
@ -16,10 +16,11 @@ from database import (
LocalDocumentStatus,
SageDocumentType,
)
import json
from services.universign_sync import UniversignSyncService
from config.config import settings
from utils.generic_functions import normaliser_type_doc
from utils.universign_status_mapping import get_status_message
from utils.universign_status_mapping import get_status_message, map_universign_to_local
from database.models.email import EmailLog
from database.enum.status import StatutEmail
@ -80,29 +81,56 @@ async def create_signature(
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
):
try:
# === VÉRIFICATION DOUBLON ===
# === VÉRIFICATION DOUBLON RENFORCÉE ===
logger.info(
f"🔍 Vérification doublon pour: {request.sage_document_id} "
f"(type: {request.sage_document_type.name})"
)
existing_query = select(UniversignTransaction).where(
UniversignTransaction.sage_document_id == request.sage_document_id,
UniversignTransaction.sage_document_type == request.sage_document_type,
~UniversignTransaction.local_status.in_(
[
)
existing_result = await session.execute(existing_query)
all_existing = existing_result.scalars().all()
if all_existing:
logger.warning(
f"⚠️ {len(all_existing)} transaction(s) existante(s) trouvée(s)"
)
# Filtrer les transactions non-finales
active_txs = [
tx
for tx in all_existing
if tx.local_status
not in [
LocalDocumentStatus.SIGNED,
LocalDocumentStatus.REJECTED,
LocalDocumentStatus.EXPIRED,
LocalDocumentStatus.ERROR,
]
),
)
existing_result = await session.execute(existing_query)
existing_tx = existing_result.scalar_one_or_none()
]
if existing_tx:
raise HTTPException(
400,
f"Une demande de signature est déjà en cours pour {request.sage_document_id} "
f"(transaction: {existing_tx.transaction_id}, statut: {existing_tx.local_status.value})",
if active_txs:
active_tx = active_txs[0]
logger.error(
f"❌ Transaction active existante: {active_tx.transaction_id} "
f"(statut: {active_tx.local_status.value})"
)
raise HTTPException(
400,
f"Une demande de signature est déjà en cours pour {request.sage_document_id} "
f"(transaction: {active_tx.transaction_id}, statut: {active_tx.local_status.value}). "
f"Utilisez GET /universign/documents/{request.sage_document_id}/signatures pour voir toutes les transactions.",
)
logger.info(
"✅ Toutes les transactions existantes sont finales, création autorisée"
)
# Génération PDF
logger.info(f"📄 Génération PDF: {request.sage_document_id}")
pdf_bytes = email_queue._generate_pdf(
request.sage_document_id, normaliser_type_doc(request.sage_document_type)
)
@ -110,12 +138,16 @@ async def create_signature(
if not pdf_bytes:
raise HTTPException(400, "Échec génération PDF")
logger.info(f"✅ PDF généré: {len(pdf_bytes)} octets")
# === CRÉATION TRANSACTION UNIVERSIGN ===
import requests
import uuid
auth = (settings.universign_api_key, "")
logger.info("🔄 Création transaction Universign...")
resp = requests.post(
f"{settings.universign_api_url}/transactions",
auth=auth,
@ -128,10 +160,14 @@ async def create_signature(
)
if resp.status_code != 200:
logger.error(f"❌ Erreur Universign (création): {resp.text}")
raise HTTPException(500, f"Erreur Universign: {resp.status_code}")
universign_tx_id = resp.json().get("id")
logger.info(f"✅ Transaction Universign créée: {universign_tx_id}")
# Upload PDF
logger.info("📤 Upload PDF...")
files = {
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
}
@ -140,10 +176,14 @@ async def create_signature(
)
if resp.status_code not in [200, 201]:
logger.error(f"❌ Erreur upload: {resp.text}")
raise HTTPException(500, "Erreur upload PDF")
file_id = resp.json().get("id")
logger.info(f"✅ PDF uploadé: {file_id}")
# Attachement document
logger.info("🔗 Attachement document...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
auth=auth,
@ -156,6 +196,8 @@ async def create_signature(
document_id = resp.json().get("id")
# Création champ signature
logger.info("✍️ Création champ signature...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
auth=auth,
@ -168,6 +210,8 @@ async def create_signature(
field_id = resp.json().get("id")
# Liaison signataire
logger.info(f"👤 Liaison signataire: {request.signer_email}")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
auth=auth,
@ -178,6 +222,8 @@ async def create_signature(
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur liaison signataire")
# Démarrage transaction
logger.info("🚀 Démarrage transaction...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
auth=auth,
@ -189,6 +235,7 @@ async def create_signature(
final_data = resp.json()
# Extraction URL de signature
signer_url = ""
if final_data.get("actions"):
for action in final_data["actions"]:
@ -199,12 +246,14 @@ async def create_signature(
if not signer_url:
raise HTTPException(500, "URL de signature non retournée")
logger.info("✅ URL de signature obtenue")
# === ENREGISTREMENT LOCAL ===
local_id = str(uuid.uuid4())
transaction = UniversignTransaction(
id=local_id,
transaction_id=universign_tx_id,
transaction_id=universign_tx_id, # ⚠️ Utiliser l'ID Universign, ne jamais le changer
sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type,
universign_status=UniversignTransactionStatus.STARTED,
@ -233,6 +282,10 @@ async def create_signature(
session.add(signer)
await session.commit()
logger.info(
f"💾 Transaction sauvegardée: {local_id} (Universign: {universign_tx_id})"
)
# === ENVOI EMAIL AVEC TEMPLATE ===
template = templates_signature_email["demande_signature"]
@ -482,24 +535,72 @@ async def webhook_universign(
try:
payload = await request.json()
# 🔍 LOG COMPLET du payload pour déboguer
logger.info(
f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}"
f"📥 Webhook Universign reçu - Payload complet: {json.dumps(payload, indent=2)}"
)
success, error = await sync_service.process_webhook(session, payload)
# Extraction du transaction_id selon la structure Universign
transaction_id = None
# Universign envoie généralement :
# - "object": "transaction"
# - "id": "tr_xxx" (le vrai ID de transaction)
# - "event": "evt_xxx" (l'ID de l'événement)
if payload.get("object") == "transaction":
transaction_id = payload.get("id") # C'est ici le vrai ID
elif "transaction" in payload:
# Parfois dans un objet "transaction"
transaction_id = payload.get("transaction", {}).get("id")
elif "data" in payload:
# Ou dans "data"
transaction_id = payload.get("data", {}).get("id")
if not transaction_id:
logger.error(
f"❌ Transaction ID introuvable dans webhook. Payload: {payload}"
)
return {
"status": "error",
"message": "Transaction ID manquant dans webhook",
}, 400
logger.info(f"🔍 Transaction ID extrait: {transaction_id}")
# Vérifier si la transaction existe localement
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
logger.warning(
f"⚠️ Transaction {transaction_id} inconnue en local - création en attente"
)
# Ne pas échouer, juste logger
return {
"status": "accepted",
"message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling",
}
success, error = await sync_service.process_webhook(
session, payload, transaction_id
)
if not success:
logger.error(f"Erreur traitement webhook: {error}")
logger.error(f"Erreur traitement webhook: {error}")
return {"status": "error", "message": error}, 500
return {
"status": "processed",
"event": payload.get("event"),
"transaction_id": payload.get("transaction_id"),
"transaction_id": transaction_id,
"local_status": tx.local_status.value if tx else None,
}
except Exception as e:
logger.error(f"Erreur webhook: {e}", exc_info=True)
logger.error(f"💥 Erreur webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500
@ -796,3 +897,432 @@ async def cleanup_all_duplicates(
"total_deleted": total_deleted,
"details": cleanup_details,
}
@router.get("/admin/diagnostic", tags=["Admin"])
async def diagnostic_complet(session: AsyncSession = Depends(get_session)):
"""
Diagnostic complet de l'état des transactions Universign
"""
try:
# Statistiques générales
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
# Par statut local
statuts_query = select(
UniversignTransaction.local_status, func.count(UniversignTransaction.id)
).group_by(UniversignTransaction.local_status)
statuts_result = await session.execute(statuts_query)
statuts = {status.value: count for status, count in statuts_result.all()}
# Transactions sans sync récente
date_limite = datetime.now() - timedelta(hours=1)
sans_sync_query = select(func.count(UniversignTransaction.id)).where(
and_(
UniversignTransaction.needs_sync == True,
or_(
UniversignTransaction.last_synced_at < date_limite,
UniversignTransaction.last_synced_at.is_(None),
),
)
)
sans_sync = (await session.execute(sans_sync_query)).scalar()
# Doublons potentiels
doublons_query = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
)
doublons_result = await session.execute(doublons_query)
doublons = doublons_result.fetchall()
# Transactions avec erreurs de sync
erreurs_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.sync_error.isnot(None)
)
erreurs = (await session.execute(erreurs_query)).scalar()
# Transactions sans webhook reçu
sans_webhook_query = select(func.count(UniversignTransaction.id)).where(
and_(
UniversignTransaction.webhook_received == False,
UniversignTransaction.local_status != LocalDocumentStatus.PENDING,
)
)
sans_webhook = (await session.execute(sans_webhook_query)).scalar()
diagnostic = {
"timestamp": datetime.now().isoformat(),
"total_transactions": total,
"repartition_statuts": statuts,
"problemes_detectes": {
"sans_sync_recente": sans_sync,
"doublons_possibles": len(doublons),
"erreurs_sync": erreurs,
"sans_webhook": sans_webhook,
},
"documents_avec_doublons": [
{"document_id": doc_id, "nombre_transactions": count}
for doc_id, count in doublons
],
"recommandations": [],
}
# Recommandations
if sans_sync > 0:
diagnostic["recommandations"].append(
f"🔄 {sans_sync} transaction(s) à synchroniser. "
f"Utilisez POST /universign/sync/all"
)
if len(doublons) > 0:
diagnostic["recommandations"].append(
f"⚠️ {len(doublons)} document(s) avec doublons. "
f"Utilisez POST /universign/cleanup/all-duplicates"
)
if erreurs > 0:
diagnostic["recommandations"].append(
f"{erreurs} transaction(s) en erreur. "
f"Vérifiez les logs avec GET /universign/transactions?status=ERREUR"
)
return diagnostic
except Exception as e:
logger.error(f"Erreur diagnostic: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/force-sync-all", tags=["Admin"])
async def forcer_sync_toutes_transactions(
max_transactions: int = Query(200, le=500),
session: AsyncSession = Depends(get_session),
):
"""
Force la synchronisation de TOUTES les transactions (même finales)
À utiliser pour réparer les incohérences
"""
try:
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.order_by(UniversignTransaction.created_at.desc())
.limit(max_transactions)
)
result = await session.execute(query)
transactions = result.scalars().all()
stats = {
"total_verifie": len(transactions),
"success": 0,
"failed": 0,
"status_changes": 0,
"details": [],
}
for transaction in transactions:
try:
previous_status = transaction.local_status.value
logger.info(
f"🔄 Force sync: {transaction.transaction_id} (statut: {previous_status})"
)
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
new_status = transaction.local_status.value
if success:
stats["success"] += 1
if new_status != previous_status:
stats["status_changes"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"changement": f"{previous_status}{new_status}",
}
)
else:
stats["failed"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"erreur": error,
}
)
except Exception as e:
logger.error(f"Erreur sync {transaction.transaction_id}: {e}")
stats["failed"] += 1
return {
"success": True,
"stats": stats,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Erreur force sync: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/repair-transaction/{transaction_id}", tags=["Admin"])
async def reparer_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Répare une transaction spécifique en la re-synchronisant depuis Universign
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
old_status = transaction.local_status.value
old_universign_status = (
transaction.universign_status.value
if transaction.universign_status
else None
)
# Force sync
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
if not success:
return {
"success": False,
"transaction_id": transaction_id,
"erreur": error,
"ancien_statut": old_status,
}
return {
"success": True,
"transaction_id": transaction_id,
"reparation": {
"ancien_statut_local": old_status,
"nouveau_statut_local": transaction.local_status.value,
"ancien_statut_universign": old_universign_status,
"nouveau_statut_universign": transaction.universign_status.value,
"statut_change": old_status != transaction.local_status.value,
},
"derniere_sync": transaction.last_synced_at.isoformat(),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur réparation: {e}")
raise HTTPException(500, str(e))
@router.get("/admin/transactions-inconsistantes", tags=["Admin"])
async def trouver_transactions_inconsistantes(
session: AsyncSession = Depends(get_session),
):
"""
Trouve les transactions dont le statut local ne correspond pas au statut Universign
"""
try:
# Toutes les transactions non-finales
query = select(UniversignTransaction).where(
UniversignTransaction.local_status.in_(
[LocalDocumentStatus.PENDING, LocalDocumentStatus.IN_PROGRESS]
)
)
result = await session.execute(query)
transactions = result.scalars().all()
inconsistantes = []
for tx in transactions:
try:
# Récupérer le statut depuis Universign
universign_data = sync_service.fetch_transaction_status(
tx.transaction_id
)
if not universign_data:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Impossible de récupérer depuis Universign",
"statut_local": tx.local_status.value,
"statut_universign": None,
}
)
continue
universign_status = universign_data["transaction"].get("state")
expected_local_status = map_universign_to_local(universign_status)
if expected_local_status != tx.local_status.value:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Statut incohérent",
"statut_local": tx.local_status.value,
"statut_universign": universign_status,
"statut_attendu": expected_local_status,
"derniere_sync": tx.last_synced_at.isoformat()
if tx.last_synced_at
else None,
}
)
except Exception as e:
logger.error(f"Erreur vérification {tx.transaction_id}: {e}")
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": f"Erreur: {str(e)}",
"statut_local": tx.local_status.value,
}
)
return {
"total_verifie": len(transactions),
"inconsistantes": len(inconsistantes),
"details": inconsistantes,
"recommandation": (
"Utilisez POST /universign/admin/force-sync-all pour corriger"
if inconsistantes
else "Aucune incohérence détectée"
),
}
except Exception as e:
logger.error(f"Erreur recherche incohérences: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/nettoyer-transactions-erreur", tags=["Admin"])
async def nettoyer_transactions_erreur(
age_jours: int = Query(
7, description="Supprimer les transactions en erreur de plus de X jours"
),
session: AsyncSession = Depends(get_session),
):
"""
Nettoie les transactions en erreur anciennes
"""
try:
date_limite = datetime.now() - timedelta(days=age_jours)
query = select(UniversignTransaction).where(
and_(
UniversignTransaction.local_status == LocalDocumentStatus.ERROR,
UniversignTransaction.created_at < date_limite,
)
)
result = await session.execute(query)
transactions = result.scalars().all()
supprimees = []
for tx in transactions:
supprimees.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"date_creation": tx.created_at.isoformat(),
"erreur": tx.sync_error,
}
)
await session.delete(tx)
await session.commit()
return {
"success": True,
"transactions_supprimees": len(supprimees),
"age_limite_jours": age_jours,
"details": supprimees,
}
except Exception as e:
logger.error(f"Erreur nettoyage: {e}")
raise HTTPException(500, str(e))
@router.get("/debug/webhook-payload/{transaction_id}", tags=["Debug"])
async def voir_dernier_webhook(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Affiche le dernier payload webhook reçu pour une transaction
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
# Récupérer le dernier log de type webhook
logs_query = (
select(UniversignSyncLog)
.where(
and_(
UniversignSyncLog.transaction_id == tx.id,
UniversignSyncLog.sync_type.like("webhook:%"),
)
)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(1)
)
logs_result = await session.execute(logs_query)
last_webhook_log = logs_result.scalar_one_or_none()
if not last_webhook_log:
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_payload": None,
"message": "Aucun webhook reçu pour cette transaction",
}
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_webhook": {
"timestamp": last_webhook_log.sync_timestamp.isoformat(),
"type": last_webhook_log.sync_type,
"success": last_webhook_log.success,
"payload": json.loads(last_webhook_log.changes_detected)
if last_webhook_log.changes_detected
else None,
},
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur debug webhook: {e}")
raise HTTPException(500, str(e))

View file

@ -1,4 +1,3 @@
import requests
import json
import logging
@ -97,82 +96,118 @@ class UniversignSyncService:
transaction: UniversignTransaction,
force: bool = False,
) -> Tuple[bool, Optional[str]]:
"""
Synchronise une transaction avec Universign
CORRECTION : Met à jour correctement le statut local selon le statut distant
"""
# Si statut final et pas de force, skip
if is_final_status(transaction.local_status.value) and not force:
logger.debug(
f"Skip {transaction.transaction_id}: statut final {transaction.local_status.value}"
f"⏭️ Skip {transaction.transaction_id}: statut final {transaction.local_status.value}"
)
transaction.needs_sync = False
await session.commit()
return True, None
# Récupération du statut distant
logger.info(f"🔄 Synchronisation: {transaction.transaction_id}")
result = self.fetch_transaction_status(transaction.transaction_id)
if not result:
error = "Échec récupération données Universign"
logger.error(f"{error}: {transaction.transaction_id}")
await self._log_sync_attempt(session, transaction, "polling", False, error)
transaction.sync_attempts += 1
await session.commit()
return False, error
universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft")
logger.info(f"📊 Statut Universign brut: {universign_status_raw}")
# Convertir le statut Universign en statut local
new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value
logger.info(
f"🔄 Mapping: {universign_status_raw} (Universign) → "
f"{new_local_status} (Local) | Actuel: {previous_local_status}"
)
# Vérifier si la transition est autorisée
if not is_transition_allowed(previous_local_status, new_local_status):
logger.warning(
f"Transition refusée: {previous_local_status}{new_local_status}"
f"⚠️ Transition refusée: {previous_local_status}{new_local_status}"
)
new_local_status = resolve_status_conflict(
previous_local_status, new_local_status
)
logger.info(f"✅ Résolution conflit: statut résolu = {new_local_status}")
status_changed = previous_local_status != new_local_status
if not status_changed and not force:
logger.debug(f"Pas de changement pour {transaction.transaction_id}")
transaction.last_synced_at = datetime.now()
transaction.needs_sync = False
await session.commit()
return True, None
if status_changed:
logger.info(
f"📝 CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}"
)
else:
logger.debug(f"⏸️ Pas de changement de statut")
# Mise à jour du statut Universign brut
try:
transaction.universign_status = UniversignTransactionStatus(
universign_status_raw
)
except ValueError:
logger.warning(f"⚠️ Statut Universign inconnu: {universign_status_raw}")
transaction.universign_status = (
UniversignTransactionStatus.COMPLETED
if new_local_status == "SIGNE"
else UniversignTransactionStatus.FAILED
)
# ✅ CORRECTION PRINCIPALE : Mise à jour du statut local
transaction.local_status = LocalDocumentStatus(new_local_status)
transaction.universign_status_updated_at = datetime.now()
# Mise à jour des dates selon le nouveau statut
if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now()
logger.info("📅 Date d'envoi mise à jour")
if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now()
logger.info("✅ Date de signature mise à jour")
if new_local_status == "REFUSE" and not transaction.refused_at:
transaction.refused_at = datetime.now()
logger.info("❌ Date de refus mise à jour")
if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now()
logger.info("⏰ Date d'expiration mise à jour")
# Mise à jour des URLs
if universign_data.get("documents") and len(universign_data["documents"]) > 0:
first_doc = universign_data["documents"][0]
if first_doc.get("url"):
transaction.document_url = first_doc["url"]
logger.info("🔗 URL du document mise à jour")
# Synchroniser les signataires
await self._sync_signers(session, transaction, universign_data)
# Mise à jour des métadonnées de sync
transaction.last_synced_at = datetime.now()
transaction.sync_attempts += 1
transaction.needs_sync = not is_final_status(new_local_status)
transaction.sync_error = None
# Log de la tentative
await self._log_sync_attempt(
session=session,
transaction=transaction,
@ -192,11 +227,14 @@ class UniversignSyncService:
await session.commit()
# Exécuter les actions post-changement de statut
if status_changed:
logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
await self._execute_status_actions(session, transaction, new_local_status)
logger.info(
f"Sync OK: {transaction.transaction_id} {previous_local_status}{new_local_status}"
f"✅ Sync terminée: {transaction.transaction_id} | "
f"{previous_local_status}{new_local_status}"
)
return True, None
@ -267,15 +305,31 @@ class UniversignSyncService:
return stats
async def process_webhook(
self, session: AsyncSession, payload: Dict
self, session: AsyncSession, payload: Dict, transaction_id: str = None
) -> Tuple[bool, Optional[str]]:
"""
Traite un webhook Universign
Args:
session: Session SQLAlchemy
payload: Payload du webhook
transaction_id: ID de transaction (optionnel si déjà dans payload)
"""
try:
event_type = payload.get("event")
transaction_id = payload.get("transaction_id") or payload.get("id")
# Si transaction_id n'est pas fourni, essayer de l'extraire
if not transaction_id:
transaction_id = payload.get("id") or payload.get("transaction_id")
if not transaction_id:
return False, "Pas de transaction_id dans le webhook"
return False, "Transaction ID manquant"
event_type = payload.get("event") or payload.get("type", "webhook")
logger.info(
f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}"
)
# Récupérer la transaction locale
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
@ -285,36 +339,45 @@ class UniversignSyncService:
transaction = result.scalar_one_or_none()
if not transaction:
logger.warning(
f"Webhook reçu pour transaction inconnue: {transaction_id}"
)
logger.warning(f"⚠️ Transaction {transaction_id} inconnue localement")
return False, "Transaction inconnue"
# Marquer comme webhook reçu
transaction.webhook_received = True
# Stocker l'ancien statut pour comparaison
old_status = transaction.local_status.value
# Force la synchronisation complète
success, error = await self.sync_transaction(
session, transaction, force=True
)
# Log du changement de statut
if success and transaction.local_status.value != old_status:
logger.info(
f"✅ Webhook traité: {transaction_id} | "
f"{old_status}{transaction.local_status.value}"
)
# Enregistrer le log du webhook
await self._log_sync_attempt(
session=session,
transaction=transaction,
sync_type=f"webhook:{event_type}",
success=success,
error_message=error,
previous_status=old_status,
new_status=transaction.local_status.value,
changes=json.dumps(payload),
)
await session.commit()
logger.info(
f"Webhook traité: {transaction_id} event={event_type} success={success}"
)
return success, error
except Exception as e:
logger.error(f"Erreur traitement webhook: {e}", exc_info=True)
logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True)
return False, str(e)
async def _sync_signers(