Merge pull request 'debug/universign_3' (#3) from debug/universign_3 into develop

Reviewed-on: fanilo/backend_vps#3
This commit is contained in:
fanilo 2026-01-07 03:13:26 +00:00
commit 0be28f6744
8 changed files with 1268 additions and 240 deletions

9
api.py
View file

@ -129,14 +129,19 @@ async def lifespan(app: FastAPI):
api_url=settings.universign_api_url, api_key=settings.universign_api_key api_url=settings.universign_api_url, api_key=settings.universign_api_key
) )
# Configuration du service avec les dépendances
sync_service.configure(
sage_client=sage_client, email_queue=email_queue, settings=settings
)
scheduler = UniversignSyncScheduler( scheduler = UniversignSyncScheduler(
sync_service=sync_service, sync_service=sync_service,
interval_minutes=5, # Synchronisation toutes les 5 minutes interval_minutes=5,
) )
sync_task = asyncio.create_task(scheduler.start(async_session_factory)) sync_task = asyncio.create_task(scheduler.start(async_session_factory))
logger.info("Synchronisation Universign démarrée (5min)") logger.info("Synchronisation Universign démarrée (5min)")
yield yield

View file

@ -1,12 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de création du premier utilisateur administrateur
Usage:
python create_admin.py
"""
import asyncio import asyncio
import sys import sys
from pathlib import Path from pathlib import Path

View file

@ -29,10 +29,14 @@ class UniversignTransactionStatus(str, Enum):
class UniversignSignerStatus(str, Enum): class UniversignSignerStatus(str, Enum):
WAITING = "waiting" WAITING = "waiting"
OPEN = "open"
VIEWED = "viewed" VIEWED = "viewed"
SIGNED = "signed" SIGNED = "signed"
COMPLETED = "completed"
REFUSED = "refused" REFUSED = "refused"
EXPIRED = "expired" EXPIRED = "expired"
STALLED = "stalled"
UNKNOWN = "unknown"
class LocalDocumentStatus(str, Enum): class LocalDocumentStatus(str, Enum):

View file

@ -285,12 +285,12 @@ class EmailQueue:
y -= 0.8 * cm y -= 0.8 * cm
pdf.setFont("Helvetica-Bold", 11) pdf.setFont("Helvetica-Bold", 11)
pdf.drawString(12 * cm, y, "Total HT:") pdf.drawString(12 * cm, y, "Total HT NET:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ht') or 0:.2f}") pdf.drawString(15 * cm, y, f"{doc.get('total_ht_net') or 0:.2f}")
y -= 0.6 * cm y -= 0.6 * cm
pdf.drawString(12 * cm, y, "TVA (20%):") pdf.drawString(12 * cm, y, "TVA (20%):")
tva = (doc.get("total_ttc") or 0) - (doc.get("total_ht") or 0) tva = (doc.get("total_ttc") or 0) - (doc.get("total_ht_net") or 0)
pdf.drawString(15 * cm, y, f"{tva:.2f}") pdf.drawString(15 * cm, y, f"{tva:.2f}")
y -= 0.6 * cm y -= 0.6 * cm

View file

@ -1,9 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func from sqlalchemy import select, func, or_, and_
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from typing import List, Optional from typing import List, Optional
from datetime import datetime from datetime import datetime, timedelta
from pydantic import BaseModel, EmailStr from pydantic import BaseModel, EmailStr
import logging import logging
from data.data import templates_signature_email from data.data import templates_signature_email
@ -16,10 +16,11 @@ from database import (
LocalDocumentStatus, LocalDocumentStatus,
SageDocumentType, SageDocumentType,
) )
import json
from services.universign_sync import UniversignSyncService from services.universign_sync import UniversignSyncService
from config.config import settings from config.config import settings
from utils.generic_functions import normaliser_type_doc from utils.generic_functions import normaliser_type_doc
from utils.universign_status_mapping import get_status_message from utils.universign_status_mapping import get_status_message, map_universign_to_local
from database.models.email import EmailLog from database.models.email import EmailLog
from database.enum.status import StatutEmail from database.enum.status import StatutEmail
@ -80,6 +81,56 @@ async def create_signature(
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session) request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
): ):
try: try:
# === VÉRIFICATION DOUBLON RENFORCÉE ===
logger.info(
f"🔍 Vérification doublon pour: {request.sage_document_id} "
f"(type: {request.sage_document_type.name})"
)
existing_query = select(UniversignTransaction).where(
UniversignTransaction.sage_document_id == request.sage_document_id,
UniversignTransaction.sage_document_type == request.sage_document_type,
)
existing_result = await session.execute(existing_query)
all_existing = existing_result.scalars().all()
if all_existing:
logger.warning(
f"⚠️ {len(all_existing)} transaction(s) existante(s) trouvée(s)"
)
# Filtrer les transactions non-finales
active_txs = [
tx
for tx in all_existing
if tx.local_status
not in [
LocalDocumentStatus.SIGNED,
LocalDocumentStatus.REJECTED,
LocalDocumentStatus.EXPIRED,
LocalDocumentStatus.ERROR,
]
]
if active_txs:
active_tx = active_txs[0]
logger.error(
f"❌ Transaction active existante: {active_tx.transaction_id} "
f"(statut: {active_tx.local_status.value})"
)
raise HTTPException(
400,
f"Une demande de signature est déjà en cours pour {request.sage_document_id} "
f"(transaction: {active_tx.transaction_id}, statut: {active_tx.local_status.value}). "
f"Utilisez GET /universign/documents/{request.sage_document_id}/signatures pour voir toutes les transactions.",
)
logger.info(
"✅ Toutes les transactions existantes sont finales, création autorisée"
)
# Génération PDF
logger.info(f"📄 Génération PDF: {request.sage_document_id}")
pdf_bytes = email_queue._generate_pdf( pdf_bytes = email_queue._generate_pdf(
request.sage_document_id, normaliser_type_doc(request.sage_document_type) request.sage_document_id, normaliser_type_doc(request.sage_document_type)
) )
@ -87,12 +138,16 @@ async def create_signature(
if not pdf_bytes: if not pdf_bytes:
raise HTTPException(400, "Échec génération PDF") raise HTTPException(400, "Échec génération PDF")
logger.info(f"✅ PDF généré: {len(pdf_bytes)} octets")
# === CRÉATION TRANSACTION UNIVERSIGN === # === CRÉATION TRANSACTION UNIVERSIGN ===
import requests import requests
import uuid import uuid
auth = (settings.universign_api_key, "") auth = (settings.universign_api_key, "")
logger.info("🔄 Création transaction Universign...")
resp = requests.post( resp = requests.post(
f"{settings.universign_api_url}/transactions", f"{settings.universign_api_url}/transactions",
auth=auth, auth=auth,
@ -105,10 +160,14 @@ async def create_signature(
) )
if resp.status_code != 200: if resp.status_code != 200:
logger.error(f"❌ Erreur Universign (création): {resp.text}")
raise HTTPException(500, f"Erreur Universign: {resp.status_code}") raise HTTPException(500, f"Erreur Universign: {resp.status_code}")
universign_tx_id = resp.json().get("id") universign_tx_id = resp.json().get("id")
logger.info(f"✅ Transaction Universign créée: {universign_tx_id}")
# Upload PDF
logger.info("📤 Upload PDF...")
files = { files = {
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf") "file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
} }
@ -117,10 +176,14 @@ async def create_signature(
) )
if resp.status_code not in [200, 201]: if resp.status_code not in [200, 201]:
logger.error(f"❌ Erreur upload: {resp.text}")
raise HTTPException(500, "Erreur upload PDF") raise HTTPException(500, "Erreur upload PDF")
file_id = resp.json().get("id") file_id = resp.json().get("id")
logger.info(f"✅ PDF uploadé: {file_id}")
# Attachement document
logger.info("🔗 Attachement document...")
resp = requests.post( resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents", f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
auth=auth, auth=auth,
@ -133,6 +196,8 @@ async def create_signature(
document_id = resp.json().get("id") document_id = resp.json().get("id")
# Création champ signature
logger.info("✍️ Création champ signature...")
resp = requests.post( resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields", f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
auth=auth, auth=auth,
@ -145,6 +210,8 @@ async def create_signature(
field_id = resp.json().get("id") field_id = resp.json().get("id")
# Liaison signataire
logger.info(f"👤 Liaison signataire: {request.signer_email}")
resp = requests.post( resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures", f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
auth=auth, auth=auth,
@ -155,6 +222,8 @@ async def create_signature(
if resp.status_code not in [200, 201]: if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur liaison signataire") raise HTTPException(500, "Erreur liaison signataire")
# Démarrage transaction
logger.info("🚀 Démarrage transaction...")
resp = requests.post( resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start", f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
auth=auth, auth=auth,
@ -166,6 +235,7 @@ async def create_signature(
final_data = resp.json() final_data = resp.json()
# Extraction URL de signature
signer_url = "" signer_url = ""
if final_data.get("actions"): if final_data.get("actions"):
for action in final_data["actions"]: for action in final_data["actions"]:
@ -176,12 +246,14 @@ async def create_signature(
if not signer_url: if not signer_url:
raise HTTPException(500, "URL de signature non retournée") raise HTTPException(500, "URL de signature non retournée")
logger.info("✅ URL de signature obtenue")
# === ENREGISTREMENT LOCAL === # === ENREGISTREMENT LOCAL ===
local_id = str(uuid.uuid4()) local_id = str(uuid.uuid4())
transaction = UniversignTransaction( transaction = UniversignTransaction(
id=local_id, id=local_id,
transaction_id=universign_tx_id, transaction_id=universign_tx_id, # ⚠️ Utiliser l'ID Universign, ne jamais le changer
sage_document_id=request.sage_document_id, sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type, sage_document_type=request.sage_document_type,
universign_status=UniversignTransactionStatus.STARTED, universign_status=UniversignTransactionStatus.STARTED,
@ -210,6 +282,10 @@ async def create_signature(
session.add(signer) session.add(signer)
await session.commit() await session.commit()
logger.info(
f"💾 Transaction sauvegardée: {local_id} (Universign: {universign_tx_id})"
)
# === ENVOI EMAIL AVEC TEMPLATE === # === ENVOI EMAIL AVEC TEMPLATE ===
template = templates_signature_email["demande_signature"] template = templates_signature_email["demande_signature"]
@ -265,6 +341,21 @@ async def create_signature(
email_queue.enqueue(email_log.id) email_queue.enqueue(email_log.id)
# === MISE À JOUR STATUT SAGE (Confirmé = 1) ===
try:
from sage_client import sage_client
sage_client.changer_statut_document(
document_type_code=request.sage_document_type.value,
numero=request.sage_document_id,
nouveau_statut=1,
)
logger.info(
f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)"
)
except Exception as e:
logger.warning(f"Impossible de mettre à jour le statut Sage: {e}")
# === RÉPONSE === # === RÉPONSE ===
return TransactionResponse( return TransactionResponse(
id=transaction.id, id=transaction.id,
@ -441,27 +532,122 @@ async def sync_all_transactions(
async def webhook_universign( async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session) request: Request, session: AsyncSession = Depends(get_session)
): ):
"""
CORRECTION : Extraction correcte du transaction_id selon la structure réelle d'Universign
"""
try: try:
payload = await request.json() payload = await request.json()
# 📋 LOG COMPLET du payload pour débogage
logger.info( logger.info(
f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}" f"📥 Webhook Universign reçu - Type: {payload.get('type', 'unknown')}"
)
logger.debug(f"Payload complet: {json.dumps(payload, indent=2)}")
# ✅ EXTRACTION CORRECTE DU TRANSACTION_ID
transaction_id = None
# 🔍 Structure 1 : Événements avec payload imbriqué (la plus courante)
# Exemple : transaction.lifecycle.created, transaction.lifecycle.started, etc.
if payload.get("type", "").startswith("transaction.") and "payload" in payload:
# Le transaction_id est dans payload.object.id
nested_object = payload.get("payload", {}).get("object", {})
if nested_object.get("object") == "transaction":
transaction_id = nested_object.get("id")
logger.info(
f"✅ Transaction ID extrait de payload.object.id: {transaction_id}"
) )
success, error = await sync_service.process_webhook(session, payload) # 🔍 Structure 2 : Action événements (action.opened, action.completed)
elif payload.get("type", "").startswith("action."):
# Le transaction_id est directement dans payload.object.transaction_id
transaction_id = (
payload.get("payload", {}).get("object", {}).get("transaction_id")
)
logger.info(
f"✅ Transaction ID extrait de payload.object.transaction_id: {transaction_id}"
)
# 🔍 Structure 3 : Transaction directe (fallback)
elif payload.get("object") == "transaction":
transaction_id = payload.get("id")
logger.info(f"✅ Transaction ID extrait direct: {transaction_id}")
# 🔍 Structure 4 : Ancien format (pour rétro-compatibilité)
elif "transaction" in payload:
transaction_id = payload.get("transaction", {}).get("id")
logger.info(
f"✅ Transaction ID extrait de transaction.id: {transaction_id}"
)
# ❌ Échec d'extraction
if not transaction_id:
logger.error(
f"❌ Transaction ID introuvable dans webhook\n"
f"Type d'événement: {payload.get('type', 'unknown')}\n"
f"Clés racine: {list(payload.keys())}\n"
f"Payload simplifié: {json.dumps({k: v if k != 'payload' else '...' for k, v in payload.items()})}"
)
return {
"status": "error",
"message": "Transaction ID manquant dans webhook",
"event_type": payload.get("type"),
"event_id": payload.get("id"),
}, 400
logger.info(f"🎯 Transaction ID identifié: {transaction_id}")
# Vérifier si la transaction existe localement
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
logger.warning(
f"⚠️ Transaction {transaction_id} inconnue en local\n"
f"Type d'événement: {payload.get('type')}\n"
f"Elle sera synchronisée au prochain polling"
)
return {
"status": "accepted",
"message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling",
"transaction_id": transaction_id,
"event_type": payload.get("type"),
}
# Traiter le webhook
success, error = await sync_service.process_webhook(
session, payload, transaction_id
)
if not success: if not success:
logger.error(f"Erreur traitement webhook: {error}") logger.error(f"❌ Erreur traitement webhook: {error}")
return {"status": "error", "message": error}, 500 return {
"status": "error",
"message": error,
"transaction_id": transaction_id,
}, 500
# ✅ Succès
logger.info(
f"✅ Webhook traité avec succès\n"
f"Transaction: {transaction_id}\n"
f"Nouveau statut: {tx.local_status.value if tx else 'unknown'}\n"
f"Type d'événement: {payload.get('type')}"
)
return { return {
"status": "processed", "status": "processed",
"event": payload.get("event"), "transaction_id": transaction_id,
"transaction_id": payload.get("transaction_id"), "local_status": tx.local_status.value if tx else None,
"event_type": payload.get("type"),
"event_id": payload.get("id"),
} }
except Exception as e: except Exception as e:
logger.error(f"Erreur webhook: {e}", exc_info=True) logger.error(f"💥 Erreur critique webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500 return {"status": "error", "message": str(e)}, 500
@ -558,3 +744,632 @@ async def get_transaction_logs(
for log in logs for log in logs
], ],
} }
# Ajouter ces routes dans universign.py
@router.get("/documents/{sage_document_id}/signatures")
async def get_signatures_for_document(
sage_document_id: str,
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions de signature pour un document Sage"""
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(UniversignTransaction.created_at.desc())
)
result = await session.execute(query)
transactions = result.scalars().all()
return [
{
"id": tx.id,
"transaction_id": tx.transaction_id,
"local_status": tx.local_status.value,
"universign_status": tx.universign_status.value
if tx.universign_status
else None,
"created_at": tx.created_at.isoformat(),
"signed_at": tx.signed_at.isoformat() if tx.signed_at else None,
"signer_url": tx.signer_url,
"signers_count": len(tx.signers),
}
for tx in transactions
]
@router.delete("/documents/{sage_document_id}/duplicates")
async def cleanup_duplicate_signatures(
sage_document_id: str,
keep_latest: bool = Query(
True, description="Garder la plus récente (True) ou la plus ancienne (False)"
),
session: AsyncSession = Depends(get_session),
):
"""
Supprime les doublons de signatures pour un document.
Garde une seule transaction (la plus récente ou ancienne selon le paramètre).
"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(
UniversignTransaction.created_at.desc()
if keep_latest
else UniversignTransaction.created_at.asc()
)
)
result = await session.execute(query)
transactions = result.scalars().all()
if len(transactions) <= 1:
return {
"success": True,
"message": "Aucun doublon trouvé",
"kept": transactions[0].transaction_id if transactions else None,
"deleted_count": 0,
}
# Garder la première (selon l'ordre), supprimer les autres
to_keep = transactions[0]
to_delete = transactions[1:]
deleted_ids = []
for tx in to_delete:
deleted_ids.append(tx.transaction_id)
await session.delete(tx)
await session.commit()
logger.info(
f"Nettoyage doublons {sage_document_id}: gardé {to_keep.transaction_id}, supprimé {deleted_ids}"
)
return {
"success": True,
"document_id": sage_document_id,
"kept": {
"id": to_keep.id,
"transaction_id": to_keep.transaction_id,
"status": to_keep.local_status.value,
"created_at": to_keep.created_at.isoformat(),
},
"deleted_count": len(deleted_ids),
"deleted_transaction_ids": deleted_ids,
}
@router.delete("/transactions/{transaction_id}")
async def delete_transaction(
transaction_id: str,
session: AsyncSession = Depends(get_session),
):
"""Supprime une transaction spécifique par son ID Universign"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
await session.delete(tx)
await session.commit()
logger.info(f"Transaction {transaction_id} supprimée")
return {
"success": True,
"deleted_transaction_id": transaction_id,
"document_id": tx.sage_document_id,
}
@router.post("/cleanup/all-duplicates")
async def cleanup_all_duplicates(
session: AsyncSession = Depends(get_session),
):
"""
Nettoie tous les doublons dans la base.
Pour chaque document avec plusieurs transactions, garde la plus récente non-erreur ou la plus récente.
"""
from sqlalchemy import func
# Trouver les documents avec plusieurs transactions
subquery = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
).subquery()
duplicates_query = select(subquery.c.sage_document_id)
duplicates_result = await session.execute(duplicates_query)
duplicate_docs = [row[0] for row in duplicates_result.fetchall()]
total_deleted = 0
cleanup_details = []
for doc_id in duplicate_docs:
# Récupérer toutes les transactions pour ce document
tx_query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == doc_id)
.order_by(UniversignTransaction.created_at.desc())
)
tx_result = await session.execute(tx_query)
transactions = tx_result.scalars().all()
# Priorité: SIGNE > EN_COURS > EN_ATTENTE > autres
priority = {"SIGNE": 0, "EN_COURS": 1, "EN_ATTENTE": 2}
def sort_key(tx):
status_priority = priority.get(tx.local_status.value, 99)
return (status_priority, -tx.created_at.timestamp())
sorted_txs = sorted(transactions, key=sort_key)
to_keep = sorted_txs[0]
to_delete = sorted_txs[1:]
for tx in to_delete:
await session.delete(tx)
total_deleted += 1
cleanup_details.append(
{
"document_id": doc_id,
"kept": to_keep.transaction_id,
"kept_status": to_keep.local_status.value,
"deleted_count": len(to_delete),
}
)
await session.commit()
logger.info(
f"Nettoyage global: {total_deleted} doublons supprimés sur {len(duplicate_docs)} documents"
)
return {
"success": True,
"documents_processed": len(duplicate_docs),
"total_deleted": total_deleted,
"details": cleanup_details,
}
@router.get("/admin/diagnostic", tags=["Admin"])
async def diagnostic_complet(session: AsyncSession = Depends(get_session)):
"""
Diagnostic complet de l'état des transactions Universign
"""
try:
# Statistiques générales
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
# Par statut local
statuts_query = select(
UniversignTransaction.local_status, func.count(UniversignTransaction.id)
).group_by(UniversignTransaction.local_status)
statuts_result = await session.execute(statuts_query)
statuts = {status.value: count for status, count in statuts_result.all()}
# Transactions sans sync récente
date_limite = datetime.now() - timedelta(hours=1)
sans_sync_query = select(func.count(UniversignTransaction.id)).where(
and_(
UniversignTransaction.needs_sync == True,
or_(
UniversignTransaction.last_synced_at < date_limite,
UniversignTransaction.last_synced_at.is_(None),
),
)
)
sans_sync = (await session.execute(sans_sync_query)).scalar()
# Doublons potentiels
doublons_query = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
)
doublons_result = await session.execute(doublons_query)
doublons = doublons_result.fetchall()
# Transactions avec erreurs de sync
erreurs_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.sync_error.isnot(None)
)
erreurs = (await session.execute(erreurs_query)).scalar()
# Transactions sans webhook reçu
sans_webhook_query = select(func.count(UniversignTransaction.id)).where(
and_(
UniversignTransaction.webhook_received == False,
UniversignTransaction.local_status != LocalDocumentStatus.PENDING,
)
)
sans_webhook = (await session.execute(sans_webhook_query)).scalar()
diagnostic = {
"timestamp": datetime.now().isoformat(),
"total_transactions": total,
"repartition_statuts": statuts,
"problemes_detectes": {
"sans_sync_recente": sans_sync,
"doublons_possibles": len(doublons),
"erreurs_sync": erreurs,
"sans_webhook": sans_webhook,
},
"documents_avec_doublons": [
{"document_id": doc_id, "nombre_transactions": count}
for doc_id, count in doublons
],
"recommandations": [],
}
# Recommandations
if sans_sync > 0:
diagnostic["recommandations"].append(
f"🔄 {sans_sync} transaction(s) à synchroniser. "
f"Utilisez POST /universign/sync/all"
)
if len(doublons) > 0:
diagnostic["recommandations"].append(
f"⚠️ {len(doublons)} document(s) avec doublons. "
f"Utilisez POST /universign/cleanup/all-duplicates"
)
if erreurs > 0:
diagnostic["recommandations"].append(
f"{erreurs} transaction(s) en erreur. "
f"Vérifiez les logs avec GET /universign/transactions?status=ERREUR"
)
return diagnostic
except Exception as e:
logger.error(f"Erreur diagnostic: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/force-sync-all", tags=["Admin"])
async def forcer_sync_toutes_transactions(
max_transactions: int = Query(200, le=500),
session: AsyncSession = Depends(get_session),
):
"""
Force la synchronisation de TOUTES les transactions (même finales)
À utiliser pour réparer les incohérences
"""
try:
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.order_by(UniversignTransaction.created_at.desc())
.limit(max_transactions)
)
result = await session.execute(query)
transactions = result.scalars().all()
stats = {
"total_verifie": len(transactions),
"success": 0,
"failed": 0,
"status_changes": 0,
"details": [],
}
for transaction in transactions:
try:
previous_status = transaction.local_status.value
logger.info(
f"🔄 Force sync: {transaction.transaction_id} (statut: {previous_status})"
)
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
new_status = transaction.local_status.value
if success:
stats["success"] += 1
if new_status != previous_status:
stats["status_changes"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"changement": f"{previous_status}{new_status}",
}
)
else:
stats["failed"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"erreur": error,
}
)
except Exception as e:
logger.error(f"Erreur sync {transaction.transaction_id}: {e}")
stats["failed"] += 1
return {
"success": True,
"stats": stats,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Erreur force sync: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/repair-transaction/{transaction_id}", tags=["Admin"])
async def reparer_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Répare une transaction spécifique en la re-synchronisant depuis Universign
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
old_status = transaction.local_status.value
old_universign_status = (
transaction.universign_status.value
if transaction.universign_status
else None
)
# Force sync
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
if not success:
return {
"success": False,
"transaction_id": transaction_id,
"erreur": error,
"ancien_statut": old_status,
}
return {
"success": True,
"transaction_id": transaction_id,
"reparation": {
"ancien_statut_local": old_status,
"nouveau_statut_local": transaction.local_status.value,
"ancien_statut_universign": old_universign_status,
"nouveau_statut_universign": transaction.universign_status.value,
"statut_change": old_status != transaction.local_status.value,
},
"derniere_sync": transaction.last_synced_at.isoformat(),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur réparation: {e}")
raise HTTPException(500, str(e))
@router.get("/admin/transactions-inconsistantes", tags=["Admin"])
async def trouver_transactions_inconsistantes(
session: AsyncSession = Depends(get_session),
):
"""
Trouve les transactions dont le statut local ne correspond pas au statut Universign
"""
try:
# Toutes les transactions non-finales
query = select(UniversignTransaction).where(
UniversignTransaction.local_status.in_(
[LocalDocumentStatus.PENDING, LocalDocumentStatus.IN_PROGRESS]
)
)
result = await session.execute(query)
transactions = result.scalars().all()
inconsistantes = []
for tx in transactions:
try:
# Récupérer le statut depuis Universign
universign_data = sync_service.fetch_transaction_status(
tx.transaction_id
)
if not universign_data:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Impossible de récupérer depuis Universign",
"statut_local": tx.local_status.value,
"statut_universign": None,
}
)
continue
universign_status = universign_data["transaction"].get("state")
expected_local_status = map_universign_to_local(universign_status)
if expected_local_status != tx.local_status.value:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Statut incohérent",
"statut_local": tx.local_status.value,
"statut_universign": universign_status,
"statut_attendu": expected_local_status,
"derniere_sync": tx.last_synced_at.isoformat()
if tx.last_synced_at
else None,
}
)
except Exception as e:
logger.error(f"Erreur vérification {tx.transaction_id}: {e}")
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": f"Erreur: {str(e)}",
"statut_local": tx.local_status.value,
}
)
return {
"total_verifie": len(transactions),
"inconsistantes": len(inconsistantes),
"details": inconsistantes,
"recommandation": (
"Utilisez POST /universign/admin/force-sync-all pour corriger"
if inconsistantes
else "Aucune incohérence détectée"
),
}
except Exception as e:
logger.error(f"Erreur recherche incohérences: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/nettoyer-transactions-erreur", tags=["Admin"])
async def nettoyer_transactions_erreur(
age_jours: int = Query(
7, description="Supprimer les transactions en erreur de plus de X jours"
),
session: AsyncSession = Depends(get_session),
):
"""
Nettoie les transactions en erreur anciennes
"""
try:
date_limite = datetime.now() - timedelta(days=age_jours)
query = select(UniversignTransaction).where(
and_(
UniversignTransaction.local_status == LocalDocumentStatus.ERROR,
UniversignTransaction.created_at < date_limite,
)
)
result = await session.execute(query)
transactions = result.scalars().all()
supprimees = []
for tx in transactions:
supprimees.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"date_creation": tx.created_at.isoformat(),
"erreur": tx.sync_error,
}
)
await session.delete(tx)
await session.commit()
return {
"success": True,
"transactions_supprimees": len(supprimees),
"age_limite_jours": age_jours,
"details": supprimees,
}
except Exception as e:
logger.error(f"Erreur nettoyage: {e}")
raise HTTPException(500, str(e))
@router.get("/debug/webhook-payload/{transaction_id}", tags=["Debug"])
async def voir_dernier_webhook(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Affiche le dernier payload webhook reçu pour une transaction
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
# Récupérer le dernier log de type webhook
logs_query = (
select(UniversignSyncLog)
.where(
and_(
UniversignSyncLog.transaction_id == tx.id,
UniversignSyncLog.sync_type.like("webhook:%"),
)
)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(1)
)
logs_result = await session.execute(logs_query)
last_webhook_log = logs_result.scalar_one_or_none()
if not last_webhook_log:
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_payload": None,
"message": "Aucun webhook reçu pour cette transaction",
}
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_webhook": {
"timestamp": last_webhook_log.sync_timestamp.isoformat(),
"type": last_webhook_log.sync_type,
"success": last_webhook_log.success,
"payload": json.loads(last_webhook_log.changes_detected)
if last_webhook_log.changes_detected
else None,
},
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur debug webhook: {e}")
raise HTTPException(500, str(e))

View file

@ -1,10 +1,12 @@
import requests import requests
import json import json
import logging import logging
import uuid
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_ from sqlalchemy import select, and_, or_
from sqlalchemy.orm import selectinload
from database import ( from database import (
UniversignTransaction, UniversignTransaction,
@ -13,7 +15,10 @@ from database import (
UniversignTransactionStatus, UniversignTransactionStatus,
LocalDocumentStatus, LocalDocumentStatus,
UniversignSignerStatus, UniversignSignerStatus,
EmailLog,
StatutEmail,
) )
from data.data import templates_signature_email
from utils.universign_status_mapping import ( from utils.universign_status_mapping import (
map_universign_to_local, map_universign_to_local,
is_transition_allowed, is_transition_allowed,
@ -31,6 +36,14 @@ class UniversignSyncService:
self.api_key = api_key self.api_key = api_key
self.timeout = timeout self.timeout = timeout
self.auth = (api_key, "") self.auth = (api_key, "")
self.sage_client = None
self.email_queue = None
self.settings = None
def configure(self, sage_client, email_queue, settings):
self.sage_client = sage_client
self.email_queue = email_queue
self.settings = settings
def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
start_time = datetime.now() start_time = datetime.now()
@ -48,9 +61,7 @@ class UniversignSyncService:
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
logger.info( logger.info(
f"✓ Fetch OK: {transaction_id} " f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)"
f"status={data.get('state')} "
f"({response_time_ms}ms)"
) )
return { return {
"transaction": data, "transaction": data,
@ -67,8 +78,7 @@ class UniversignSyncService:
else: else:
logger.error( logger.error(
f"Erreur HTTP {response.status_code} " f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}"
f"pour {transaction_id}: {response.text}"
) )
return None return None
@ -80,151 +90,12 @@ class UniversignSyncService:
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
return None return None
async def sync_transaction(
self,
session: AsyncSession,
transaction: UniversignTransaction,
force: bool = False,
) -> Tuple[bool, Optional[str]]:
if is_final_status(transaction.local_status.value) and not force:
logger.debug(
f"Skip {transaction.transaction_id}: "
f"statut final {transaction.local_status.value}"
)
transaction.needs_sync = False
await session.commit()
return True, None
# === FETCH UNIVERSIGN ===
result = self.fetch_transaction_status(transaction.transaction_id)
if not result:
error = "Échec récupération données Universign"
await self._log_sync_attempt(session, transaction, "polling", False, error)
return False, error
# === EXTRACTION DONNÉES ===
universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft")
# === MAPPING STATUT ===
new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value
# === VALIDATION TRANSITION ===
if not is_transition_allowed(previous_local_status, new_local_status):
logger.warning(
f"Transition refusée: {previous_local_status}{new_local_status}"
)
# En cas de conflit, résoudre par priorité
new_local_status = resolve_status_conflict(
previous_local_status, new_local_status
)
# === DÉTECTION CHANGEMENT ===
status_changed = previous_local_status != new_local_status
if not status_changed and not force:
logger.debug(f"Pas de changement pour {transaction.transaction_id}")
transaction.last_synced_at = datetime.now()
transaction.needs_sync = False
await session.commit()
return True, None
# === MISE À JOUR TRANSACTION ===
transaction.universign_status = UniversignTransactionStatus(
universign_status_raw
)
transaction.local_status = LocalDocumentStatus(new_local_status)
transaction.universign_status_updated_at = datetime.now()
# === DATES SPÉCIFIQUES ===
if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now()
if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now()
if new_local_status == "REFUSE" and not transaction.refused_at:
transaction.refused_at = datetime.now()
if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now()
# === URLS ===
if "signers" in universign_data and len(universign_data["signers"]) > 0:
first_signer = universign_data["signers"][0]
if "url" in first_signer:
transaction.signer_url = first_signer["url"]
if "documents" in universign_data and len(universign_data["documents"]) > 0:
first_doc = universign_data["documents"][0]
if "url" in first_doc:
transaction.document_url = first_doc["url"]
# === SIGNATAIRES ===
await self._sync_signers(session, transaction, universign_data)
# === FLAGS ===
transaction.last_synced_at = datetime.now()
transaction.sync_attempts += 1
transaction.needs_sync = not is_final_status(new_local_status)
transaction.sync_error = None
# === LOG ===
await self._log_sync_attempt(
session=session,
transaction=transaction,
sync_type="polling",
success=True,
error_message=None,
previous_status=previous_local_status,
new_status=new_local_status,
changes=json.dumps(
{
"status_changed": status_changed,
"universign_raw": universign_status_raw,
"response_time_ms": result.get("response_time_ms"),
}
),
)
await session.commit()
# === ACTIONS MÉTIER ===
if status_changed:
await self._execute_status_actions(session, transaction, new_local_status)
logger.info(
f"✓ Sync OK: {transaction.transaction_id} "
f"{previous_local_status}{new_local_status}"
)
return True, None
async def sync_all_pending( async def sync_all_pending(
self, session: AsyncSession, max_transactions: int = 50 self, session: AsyncSession, max_transactions: int = 50
) -> Dict[str, int]: ) -> Dict[str, int]:
"""
Synchronise toutes les transactions en attente
"""
from sqlalchemy.orm import selectinload # Si pas déjà importé en haut
query = ( query = (
select(UniversignTransaction) select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers)) # AJOUTER CETTE LIGNE .options(selectinload(UniversignTransaction.signers))
.where( .where(
and_( and_(
UniversignTransaction.needs_sync, UniversignTransaction.needs_sync,
@ -267,7 +138,6 @@ class UniversignSyncService:
if success: if success:
stats["success"] += 1 stats["success"] += 1
if transaction.local_status.value != previous_status: if transaction.local_status.value != previous_status:
stats["status_changes"] += 1 stats["status_changes"] += 1
else: else:
@ -280,89 +150,351 @@ class UniversignSyncService:
stats["failed"] += 1 stats["failed"] += 1
logger.info( logger.info(
f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés"
f"{stats['status_changes']} changements détectés"
) )
return stats return stats
# CORRECTION 1 : process_webhook dans universign_sync.py
async def process_webhook( async def process_webhook(
self, session: AsyncSession, payload: Dict self, session: AsyncSession, payload: Dict, transaction_id: str = None
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
"""
Traite un webhook Universign - CORRECTION : meilleure gestion des payloads
"""
try: try:
event_type = payload.get("event") # Si transaction_id n'est pas fourni, essayer de l'extraire
transaction_id = payload.get("transaction_id") or payload.get("id") if not transaction_id:
# Même logique que dans universign.py
if (
payload.get("type", "").startswith("transaction.")
and "payload" in payload
):
nested_object = payload.get("payload", {}).get("object", {})
if nested_object.get("object") == "transaction":
transaction_id = nested_object.get("id")
elif payload.get("type", "").startswith("action."):
transaction_id = (
payload.get("payload", {})
.get("object", {})
.get("transaction_id")
)
elif payload.get("object") == "transaction":
transaction_id = payload.get("id")
if not transaction_id: if not transaction_id:
return False, "Pas de transaction_id dans le webhook" return False, "Transaction ID manquant"
query = select(UniversignTransaction).where( event_type = payload.get("type", "webhook")
UniversignTransaction.transaction_id == transaction_id
logger.info(
f"📨 Traitement webhook: transaction={transaction_id}, event={event_type}"
)
# Récupérer la transaction locale
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.where(UniversignTransaction.transaction_id == transaction_id)
) )
result = await session.execute(query) result = await session.execute(query)
transaction = result.scalar_one_or_none() transaction = result.scalar_one_or_none()
if not transaction: if not transaction:
logger.warning( logger.warning(f"⚠️ Transaction {transaction_id} inconnue localement")
f"Webhook reçu pour transaction inconnue: {transaction_id}"
)
return False, "Transaction inconnue" return False, "Transaction inconnue"
# Marquer comme webhook reçu
transaction.webhook_received = True transaction.webhook_received = True
# Stocker l'ancien statut pour comparaison
old_status = transaction.local_status.value
# Force la synchronisation complète
success, error = await self.sync_transaction( success, error = await self.sync_transaction(
session, transaction, force=True session, transaction, force=True
) )
# Log du changement de statut
if success and transaction.local_status.value != old_status:
logger.info(
f"✅ Webhook traité: {transaction_id} | "
f"{old_status}{transaction.local_status.value}"
)
# Enregistrer le log du webhook
await self._log_sync_attempt( await self._log_sync_attempt(
session=session, session=session,
transaction=transaction, transaction=transaction,
sync_type=f"webhook:{event_type}", sync_type=f"webhook:{event_type}",
success=success, success=success,
error_message=error, error_message=error,
changes=json.dumps(payload), previous_status=old_status,
new_status=transaction.local_status.value,
changes=json.dumps(
payload, default=str
), # ✅ Ajout default=str pour éviter les erreurs JSON
) )
await session.commit() await session.commit()
logger.info(
f"✓ Webhook traité: {transaction_id} "
f"event={event_type} success={success}"
)
return success, error return success, error
except Exception as e: except Exception as e:
logger.error(f"Erreur traitement webhook: {e}", exc_info=True) logger.error(f"💥 Erreur traitement webhook: {e}", exc_info=True)
return False, str(e) return False, str(e)
# CORRECTION 2 : _sync_signers - Ne pas écraser les signers existants
async def _sync_signers( async def _sync_signers(
self, self,
session: AsyncSession, session: AsyncSession,
transaction: UniversignTransaction, transaction: UniversignTransaction,
universign_data: Dict, universign_data: Dict,
): ):
"""Synchronise les signataires""" signers_data = universign_data.get("participants", [])
if not signers_data:
signers_data = universign_data.get("signers", []) signers_data = universign_data.get("signers", [])
# Supprimer les anciens signataires if not signers_data:
for signer in transaction.signers: logger.debug("Aucun signataire dans les données Universign")
await session.delete(signer) return
existing_signers = {s.email: s for s in transaction.signers}
# Créer les nouveaux
for idx, signer_data in enumerate(signers_data): for idx, signer_data in enumerate(signers_data):
email = signer_data.get("email", "")
if not email:
logger.warning(f"Signataire sans email à l'index {idx}, ignoré")
continue
# ✅ PROTECTION : gérer les statuts inconnus
raw_status = signer_data.get("status") or signer_data.get(
"state", "waiting"
)
try:
status = UniversignSignerStatus(raw_status)
except ValueError:
logger.warning(
f"Statut inconnu pour signer {email}: {raw_status}, utilisation de 'unknown'"
)
status = UniversignSignerStatus.UNKNOWN
if email in existing_signers:
signer = existing_signers[email]
signer.status = status
viewed_at = self._parse_date(signer_data.get("viewed_at"))
if viewed_at and not signer.viewed_at:
signer.viewed_at = viewed_at
signed_at = self._parse_date(signer_data.get("signed_at"))
if signed_at and not signer.signed_at:
signer.signed_at = signed_at
refused_at = self._parse_date(signer_data.get("refused_at"))
if refused_at and not signer.refused_at:
signer.refused_at = refused_at
if signer_data.get("name") and not signer.name:
signer.name = signer_data.get("name")
else:
# ✅ Nouveau signer avec gestion d'erreur intégrée
try:
signer = UniversignSigner( signer = UniversignSigner(
id=f"{transaction.id}_signer_{idx}", id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}",
transaction_id=transaction.id, transaction_id=transaction.id,
email=signer_data.get("email", ""), email=email,
name=signer_data.get("name"), name=signer_data.get("name"),
status=UniversignSignerStatus(signer_data.get("status", "waiting")), status=status,
order_index=idx, order_index=idx,
viewed_at=self._parse_date(signer_data.get("viewed_at")), viewed_at=self._parse_date(signer_data.get("viewed_at")),
signed_at=self._parse_date(signer_data.get("signed_at")), signed_at=self._parse_date(signer_data.get("signed_at")),
refused_at=self._parse_date(signer_data.get("refused_at")), refused_at=self._parse_date(signer_data.get("refused_at")),
) )
session.add(signer) session.add(signer)
logger.info(
f" Nouveau signataire ajouté: {email} (statut: {status.value})"
)
except Exception as e:
logger.error(f"Erreur création signer {email}: {e}")
# CORRECTION 3 : Amélioration du logging dans sync_transaction
async def sync_transaction(
self,
session: AsyncSession,
transaction: UniversignTransaction,
force: bool = False,
) -> Tuple[bool, Optional[str]]:
"""
CORRECTION : Meilleur logging et gestion d'erreurs
"""
# Si statut final et pas de force, skip
if is_final_status(transaction.local_status.value) and not force:
logger.debug(
f"⏭️ Skip {transaction.transaction_id}: statut final {transaction.local_status.value}"
)
transaction.needs_sync = False
await session.commit()
return True, None
# Récupération du statut distant
logger.info(f"🔄 Synchronisation: {transaction.transaction_id}")
result = self.fetch_transaction_status(transaction.transaction_id)
if not result:
error = "Échec récupération données Universign"
logger.error(f"{error}: {transaction.transaction_id}")
# ✅ CORRECTION : Incrémenter les tentatives MÊME en cas d'échec
transaction.sync_attempts += 1
transaction.sync_error = error
await self._log_sync_attempt(session, transaction, "polling", False, error)
await session.commit()
return False, error
try:
universign_data = result["transaction"]
universign_status_raw = universign_data.get("state", "draft")
logger.info(f"📊 Statut Universign brut: {universign_status_raw}")
# Convertir le statut
new_local_status = map_universign_to_local(universign_status_raw)
previous_local_status = transaction.local_status.value
logger.info(
f"🔄 Mapping: {universign_status_raw} (Universign) → "
f"{new_local_status} (Local) | Actuel: {previous_local_status}"
)
# Vérifier la transition
if not is_transition_allowed(previous_local_status, new_local_status):
logger.warning(
f"⚠️ Transition refusée: {previous_local_status}{new_local_status}"
)
new_local_status = resolve_status_conflict(
previous_local_status, new_local_status
)
logger.info(
f"✅ Résolution conflit: statut résolu = {new_local_status}"
)
status_changed = previous_local_status != new_local_status
if status_changed:
logger.info(
f"🔔 CHANGEMENT DÉTECTÉ: {previous_local_status}{new_local_status}"
)
# Mise à jour du statut Universign brut
try:
transaction.universign_status = UniversignTransactionStatus(
universign_status_raw
)
except ValueError:
logger.warning(f"⚠️ Statut Universign inconnu: {universign_status_raw}")
# Fallback intelligent
if new_local_status == "SIGNE":
transaction.universign_status = (
UniversignTransactionStatus.COMPLETED
)
elif new_local_status == "REFUSE":
transaction.universign_status = UniversignTransactionStatus.REFUSED
elif new_local_status == "EXPIRE":
transaction.universign_status = UniversignTransactionStatus.EXPIRED
else:
transaction.universign_status = UniversignTransactionStatus.STARTED
# ✅ Mise à jour du statut local
transaction.local_status = LocalDocumentStatus(new_local_status)
transaction.universign_status_updated_at = datetime.now()
# Mise à jour des dates
if new_local_status == "EN_COURS" and not transaction.sent_at:
transaction.sent_at = datetime.now()
logger.info("📅 Date d'envoi mise à jour")
if new_local_status == "SIGNE" and not transaction.signed_at:
transaction.signed_at = datetime.now()
logger.info("✅ Date de signature mise à jour")
if new_local_status == "REFUSE" and not transaction.refused_at:
transaction.refused_at = datetime.now()
logger.info("❌ Date de refus mise à jour")
if new_local_status == "EXPIRE" and not transaction.expired_at:
transaction.expired_at = datetime.now()
logger.info("⏰ Date d'expiration mise à jour")
# Mise à jour des URLs
if (
universign_data.get("documents")
and len(universign_data["documents"]) > 0
):
first_doc = universign_data["documents"][0]
if first_doc.get("url"):
transaction.document_url = first_doc["url"]
# Synchroniser les signataires
await self._sync_signers(session, transaction, universign_data)
# Mise à jour des métadonnées de sync
transaction.last_synced_at = datetime.now()
transaction.sync_attempts += 1
transaction.needs_sync = not is_final_status(new_local_status)
transaction.sync_error = None # ✅ Effacer l'erreur précédente
# Log de la tentative
await self._log_sync_attempt(
session=session,
transaction=transaction,
sync_type="polling",
success=True,
error_message=None,
previous_status=previous_local_status,
new_status=new_local_status,
changes=json.dumps(
{
"status_changed": status_changed,
"universign_raw": universign_status_raw,
"response_time_ms": result.get("response_time_ms"),
},
default=str, # ✅ Éviter les erreurs de sérialisation
),
)
await session.commit()
# Exécuter les actions post-changement
if status_changed:
logger.info(f"🎬 Exécution actions pour statut: {new_local_status}")
await self._execute_status_actions(
session, transaction, new_local_status
)
logger.info(
f"✅ Sync terminée: {transaction.transaction_id} | "
f"{previous_local_status}{new_local_status}"
)
return True, None
except Exception as e:
error_msg = f"Erreur lors de la synchronisation: {str(e)}"
logger.error(f"{error_msg}", exc_info=True)
transaction.sync_error = error_msg[:1000] # Tronquer si trop long
transaction.sync_attempts += 1
await self._log_sync_attempt(
session, transaction, "polling", False, error_msg
)
await session.commit()
return False, error_msg
async def _log_sync_attempt( async def _log_sync_attempt(
self, self,
@ -375,7 +507,6 @@ class UniversignSyncService:
new_status: Optional[str] = None, new_status: Optional[str] = None,
changes: Optional[str] = None, changes: Optional[str] = None,
): ):
"""Enregistre une tentative de sync dans les logs"""
log = UniversignSyncLog( log = UniversignSyncLog(
transaction_id=transaction.id, transaction_id=transaction.id,
sync_type=sync_type, sync_type=sync_type,
@ -391,48 +522,119 @@ class UniversignSyncService:
async def _execute_status_actions( async def _execute_status_actions(
self, session: AsyncSession, transaction: UniversignTransaction, new_status: str self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
): ):
"""Exécute les actions métier associées au statut"""
actions = get_status_actions(new_status) actions = get_status_actions(new_status)
if not actions: if not actions:
return return
# Mise à jour Sage if actions.get("update_sage_status") and self.sage_client:
if actions.get("update_sage_status"):
await self._update_sage_status(transaction, new_status) await self._update_sage_status(transaction, new_status)
elif actions.get("update_sage_status"):
logger.debug(
f"sage_client non configuré, skip MAJ Sage pour {transaction.sage_document_id}"
)
# Déclencher workflow if actions.get("send_notification") and self.email_queue and self.settings:
if actions.get("trigger_workflow"): await self._send_notification(session, transaction, new_status)
await self._trigger_workflow(transaction) elif actions.get("send_notification"):
logger.debug(
f"email_queue/settings non configuré, skip notification pour {transaction.transaction_id}"
)
# Notifications async def _update_sage_status(
if actions.get("send_notification"): self, transaction: UniversignTransaction, status: str
await self._send_notification(transaction, new_status) ):
if not self.sage_client:
logger.warning("sage_client non configuré pour mise à jour Sage")
return
# Archive try:
if actions.get("archive_document"): type_doc = transaction.sage_document_type.value
await self._archive_signed_document(transaction) doc_id = transaction.sage_document_id
async def _update_sage_status(self, transaction, status): if status == "SIGNE":
"""Met à jour le statut dans Sage""" self.sage_client.changer_statut_document(
# TODO: Appeler sage_client.mettre_a_jour_champ_libre() document_type_code=type_doc, numero=doc_id, nouveau_statut=2
logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") )
logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)")
async def _trigger_workflow(self, transaction): elif status == "EN_COURS":
"""Déclenche un workflow (ex: devis→commande)""" self.sage_client.changer_statut_document(
logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") document_type_code=type_doc, numero=doc_id, nouveau_statut=1
)
logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)")
async def _send_notification(self, transaction, status): except Exception as e:
"""Envoie une notification email""" logger.error(
logger.info(f"TODO: Notif pour {transaction.sage_document_id}") f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}"
)
async def _archive_signed_document(self, transaction): async def _send_notification(
"""Archive le document signé""" self, session: AsyncSession, transaction: UniversignTransaction, status: str
logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") ):
if not self.email_queue or not self.settings:
logger.warning("email_queue ou settings non configuré")
return
try:
if status == "SIGNE":
template = templates_signature_email["signature_confirmee"]
type_labels = {
0: "Devis",
10: "Commande",
30: "Bon de Livraison",
60: "Facture",
50: "Avoir",
}
variables = {
"NOM_SIGNATAIRE": transaction.requester_name or "Client",
"TYPE_DOC": type_labels.get(
transaction.sage_document_type.value, "Document"
),
"NUMERO": transaction.sage_document_id,
"DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M")
if transaction.signed_at
else datetime.now().strftime("%d/%m/%Y à %H:%M"),
"TRANSACTION_ID": transaction.transaction_id,
"CONTACT_EMAIL": self.settings.smtp_from,
}
sujet = template["sujet"]
corps = template["corps_html"]
for var, valeur in variables.items():
sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
corps = corps.replace(f"{{{{{var}}}}}", str(valeur))
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=transaction.requester_email,
sujet=sujet,
corps_html=corps,
document_ids=transaction.sage_document_id,
type_document=transaction.sage_document_type.value,
statut=StatutEmail.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
self.email_queue.enqueue(email_log.id)
logger.info(
f"Email confirmation signature envoyé à {transaction.requester_email}"
)
except Exception as e:
logger.error(
f"Erreur envoi notification pour {transaction.transaction_id}: {e}"
)
@staticmethod @staticmethod
def _parse_date(date_str: Optional[str]) -> Optional[datetime]: def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
"""Parse une date ISO 8601"""
if not date_str: if not date_str:
return None return None
try: try:
@ -448,7 +650,6 @@ class UniversignSyncScheduler:
self.is_running = False self.is_running = False
async def start(self, session_factory): async def start(self, session_factory):
"""Démarre le polling automatique"""
import asyncio import asyncio
self.is_running = True self.is_running = True
@ -470,10 +671,8 @@ class UniversignSyncScheduler:
except Exception as e: except Exception as e:
logger.error(f"Erreur polling: {e}", exc_info=True) logger.error(f"Erreur polling: {e}", exc_info=True)
# Attendre avant le prochain cycle
await asyncio.sleep(self.interval_minutes * 60) await asyncio.sleep(self.interval_minutes * 60)
def stop(self): def stop(self):
"""Arrête le polling"""
self.is_running = False self.is_running = False
logger.info("Arrêt polling Universign") logger.info("Arrêt polling Universign")

View file

@ -297,6 +297,7 @@ UNIVERSIGN_TO_LOCAL: Dict[str, str] = {
"started": "EN_COURS", "started": "EN_COURS",
# États finaux (succès) # États finaux (succès)
"completed": "SIGNE", "completed": "SIGNE",
"closed": "SIGNE",
# États finaux (échec) # États finaux (échec)
"refused": "REFUSE", "refused": "REFUSE",
"expired": "EXPIRE", "expired": "EXPIRE",

View file

@ -1,4 +1,8 @@
from typing import Dict, Any from typing import Dict, Any
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
UNIVERSIGN_TO_LOCAL: Dict[str, str] = { UNIVERSIGN_TO_LOCAL: Dict[str, str] = {
"draft": "EN_ATTENTE", "draft": "EN_ATTENTE",
@ -111,8 +115,17 @@ STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
def map_universign_to_local(universign_status: str) -> str: def map_universign_to_local(universign_status: str) -> str:
"""Convertit un statut Universign en statut local.""" """Convertit un statut Universign en statut local avec fallback robuste."""
return UNIVERSIGN_TO_LOCAL.get(universign_status.lower(), "ERREUR") normalized = universign_status.lower().strip()
mapped = UNIVERSIGN_TO_LOCAL.get(normalized)
if not mapped:
logger.warning(
f"Statut Universign inconnu: '{universign_status}', mapping vers ERREUR"
)
return "ERREUR"
return mapped
def get_sage_status_code(local_status: str) -> int: def get_sage_status_code(local_status: str) -> int: