Sage100-vps/routes/universign.py
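"""Universign e-signature routes: signature creation, transaction listing and
synchronization, webhook handling, signed-document download and admin /
maintenance endpoints."""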


from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import FileResponse, JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import false, select, func, or_, and_, true
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime, timedelta
from pydantic import BaseModel, EmailStr
import logging
from data.data import templates_signature_email
from email_queue import email_queue
from database import UniversignSignerStatus, UniversignTransactionStatus, get_session
from database import (
UniversignTransaction,
UniversignSigner,
UniversignSyncLog,
LocalDocumentStatus,
SageDocumentType,
)
import os
from pathlib import Path
import json
from services.universign_document import UniversignDocumentService
from services.universign_sync import UniversignSyncService
from config.config import settings
from utils.generic_functions import normaliser_type_doc
from utils.universign_status_mapping import get_status_message, map_universign_to_local
from database.models.email import EmailLog
from database.enum.status import StatutEmail
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/universign", tags=["Universign"])
sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key
)
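# Shared Universign sync service, configured from settings and used by the
# sync, webhook and admin endpoints below.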
class CreateSignatureRequest(BaseModel):
"""Demande de création d'une signature"""
sage_document_id: str
sage_document_type: SageDocumentType
signer_email: EmailStr
signer_name: str
document_name: Optional[str] = None
class TransactionResponse(BaseModel):
"""Réponse détaillée d'une transaction"""
id: str
transaction_id: str
sage_document_id: str
sage_document_type: str
universign_status: str
local_status: str
local_status_label: str
signer_url: Optional[str]
document_url: Optional[str]
created_at: datetime
sent_at: Optional[datetime]
signed_at: Optional[datetime]
last_synced_at: Optional[datetime]
needs_sync: bool
signers: List[dict]
signed_document_available: bool = False
signed_document_downloaded_at: Optional[datetime] = None
signed_document_size_kb: Optional[float] = None
class SyncStatsResponse(BaseModel):
"""Statistiques de synchronisation"""
total_transactions: int
pending_sync: int
signed: int
in_progress: int
refused: int
expired: int
last_sync_at: Optional[datetime]
@router.post("/signatures/create", response_model=TransactionResponse)
async def create_signature(
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
):
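    """Create a signature request for a Sage document.

    Checks for duplicates, generates the PDF, creates and starts the Universign
    transaction (file upload, document attachment, signature field, signer),
    stores it locally, queues the notification e-mail, then updates the Sage status.
    """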
try:
        # === STRICT DUPLICATE CHECK ===
logger.info(
f"🔍 Vérification doublon pour: {request.sage_document_id} "
f"(type: {request.sage_document_type.name})"
)
existing_query = select(UniversignTransaction).where(
UniversignTransaction.sage_document_id == request.sage_document_id,
UniversignTransaction.sage_document_type == request.sage_document_type,
)
existing_result = await session.execute(existing_query)
all_existing = existing_result.scalars().all()
if all_existing:
logger.warning(
f"{len(all_existing)} transaction(s) existante(s) trouvée(s)"
)
            # Keep only non-final transactions
active_txs = [
tx
for tx in all_existing
if tx.local_status
not in [
LocalDocumentStatus.SIGNED,
LocalDocumentStatus.REJECTED,
LocalDocumentStatus.EXPIRED,
LocalDocumentStatus.ERROR,
]
]
if active_txs:
active_tx = active_txs[0]
logger.error(
f"Transaction active existante: {active_tx.transaction_id} "
f"(statut: {active_tx.local_status.value})"
)
raise HTTPException(
400,
f"Une demande de signature est déjà en cours pour {request.sage_document_id} "
f"(transaction: {active_tx.transaction_id}, statut: {active_tx.local_status.value}). "
f"Utilisez GET /universign/documents/{request.sage_document_id}/signatures pour voir toutes les transactions.",
)
logger.info(
"Toutes les transactions existantes sont finales, création autorisée"
)
        # PDF generation
logger.info(f"📄 Génération PDF: {request.sage_document_id}")
pdf_bytes = email_queue._generate_pdf(
request.sage_document_id, normaliser_type_doc(request.sage_document_type)
)
if not pdf_bytes:
raise HTTPException(400, "Échec génération PDF")
logger.info(f"PDF généré: {len(pdf_bytes)} octets")
        # === UNIVERSIGN TRANSACTION CREATION ===
import requests
import uuid
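        # Note: the requests.* calls below are synchronous and block the event
        # loop while Universign responds; an async HTTP client (e.g. httpx)
        # would avoid this, but it is acceptable at low traffic.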
auth = (settings.universign_api_key, "")
logger.info("🔄 Création transaction Universign...")
resp = requests.post(
f"{settings.universign_api_url}/transactions",
auth=auth,
json={
"name": request.document_name
or f"{request.sage_document_type.name} {request.sage_document_id}",
"language": "fr",
},
timeout=30,
)
if resp.status_code != 200:
logger.error(f"Erreur Universign (création): {resp.text}")
raise HTTPException(500, f"Erreur Universign: {resp.status_code}")
universign_tx_id = resp.json().get("id")
logger.info(f"Transaction Universign créée: {universign_tx_id}")
# Upload PDF
logger.info("📤 Upload PDF...")
files = {
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
}
resp = requests.post(
f"{settings.universign_api_url}/files", auth=auth, files=files, timeout=60
)
if resp.status_code not in [200, 201]:
logger.error(f"Erreur upload: {resp.text}")
raise HTTPException(500, "Erreur upload PDF")
file_id = resp.json().get("id")
logger.info(f"PDF uploadé: {file_id}")
        # Attach the document to the transaction
logger.info("🔗 Attachement document...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
auth=auth,
data={"document": file_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur attachement document")
document_id = resp.json().get("id")
        # Create the signature field
logger.info("✍️ Création champ signature...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
auth=auth,
data={"type": "signature"},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur création champ signature")
field_id = resp.json().get("id")
        # Link the signer
logger.info(f"👤 Liaison signataire: {request.signer_email}")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
auth=auth,
data={"signer": request.signer_email, "field": field_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur liaison signataire")
        # Start the transaction
logger.info("🚀 Démarrage transaction...")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
auth=auth,
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur démarrage transaction")
final_data = resp.json()
        # Extract the signing URL
signer_url = ""
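        # The /start response lists per-signer actions; with a single signer,
        # the first action exposing a URL is used as the signing link.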
if final_data.get("actions"):
for action in final_data["actions"]:
if action.get("url"):
signer_url = action["url"]
break
if not signer_url:
raise HTTPException(500, "URL de signature non retournée")
logger.info("URL de signature obtenue")
        # === LOCAL PERSISTENCE ===
local_id = str(uuid.uuid4())
transaction = UniversignTransaction(
id=local_id,
            transaction_id=universign_tx_id,  # Use the Universign ID; never change it
sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type,
universign_status=UniversignTransactionStatus.STARTED,
local_status=LocalDocumentStatus.IN_PROGRESS,
signer_url=signer_url,
requester_email=request.signer_email,
requester_name=request.signer_name,
document_name=request.document_name,
created_at=datetime.now(),
sent_at=datetime.now(),
is_test=True,
needs_sync=True,
)
session.add(transaction)
signer = UniversignSigner(
id=f"{local_id}_signer_0",
transaction_id=local_id,
email=request.signer_email,
name=request.signer_name,
status=UniversignSignerStatus.WAITING,
order_index=0,
)
session.add(signer)
await session.commit()
logger.info(
f"💾 Transaction sauvegardée: {local_id} (Universign: {universign_tx_id})"
)
        # === NOTIFICATION E-MAIL (template) ===
template = templates_signature_email["demande_signature"]
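        # Sage document type codes mapped to the labels used in the e-mail template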
type_labels = {
0: "Devis",
10: "Commande",
30: "Bon de Livraison",
60: "Facture",
50: "Avoir",
}
doc_info = email_queue.sage_client.lire_document(
request.sage_document_id, request.sage_document_type.value
)
montant_ttc = f"{doc_info.get('total_ttc', 0):.2f}" if doc_info else "0.00"
date_doc = (
doc_info.get("date", datetime.now().strftime("%d/%m/%Y"))
if doc_info
else datetime.now().strftime("%d/%m/%Y")
)
variables = {
"NOM_SIGNATAIRE": request.signer_name,
"TYPE_DOC": type_labels.get(request.sage_document_type.value, "Document"),
"NUMERO": request.sage_document_id,
"DATE": date_doc,
"MONTANT_TTC": montant_ttc,
"SIGNER_URL": signer_url,
"CONTACT_EMAIL": settings.smtp_from,
}
sujet = template["sujet"]
corps = template["corps_html"]
for var, valeur in variables.items():
sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
corps = corps.replace(f"{{{{{var}}}}}", str(valeur))
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=request.signer_email,
sujet=sujet,
corps_html=corps,
document_ids=request.sage_document_id,
type_document=request.sage_document_type.value,
statut=StatutEmail.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.commit()
email_queue.enqueue(email_log.id)
        # === SAGE STATUS UPDATE (Confirmed = 1) ===
try:
from sage_client import sage_client
sage_client.changer_statut_document(
document_type_code=request.sage_document_type.value,
numero=request.sage_document_id,
nouveau_statut=1,
)
logger.info(
f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)"
)
except Exception as e:
logger.warning(f"Impossible de mettre à jour le statut Sage: {e}")
        # === RESPONSE ===
return TransactionResponse(
id=transaction.id,
transaction_id=transaction.transaction_id,
sage_document_id=transaction.sage_document_id,
sage_document_type=transaction.sage_document_type.name,
universign_status=transaction.universign_status.value,
local_status=transaction.local_status.value,
local_status_label=get_status_message(transaction.local_status.value),
signer_url=transaction.signer_url,
document_url=None,
created_at=transaction.created_at,
sent_at=transaction.sent_at,
signed_at=None,
last_synced_at=None,
needs_sync=True,
signers=[
{
"email": signer.email,
"name": signer.name,
"status": signer.status.value,
}
],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur création signature: {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
async def list_transactions(
status: Optional[LocalDocumentStatus] = None,
sage_document_id: Optional[str] = None,
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions"""
query = select(UniversignTransaction).options(
selectinload(UniversignTransaction.signers)
)
if status:
query = query.where(UniversignTransaction.local_status == status)
if sage_document_id:
query = query.where(UniversignTransaction.sage_document_id == sage_document_id)
query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit)
result = await session.execute(query)
transactions = result.scalars().all()
return [
TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
            # Signed-document availability fields
signed_document_available=bool(
tx.signed_document_path and Path(tx.signed_document_path).exists()
),
signed_document_downloaded_at=tx.signed_document_downloaded_at,
signed_document_size_kb=(
tx.signed_document_size_bytes / 1024
if tx.signed_document_size_bytes
else None
),
)
for tx in transactions
]
@router.get("/transactions/{transaction_id}", response_model=TransactionResponse)
async def get_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""Récupère une transaction par son ID"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.transaction_id == transaction_id)
.options(selectinload(UniversignTransaction.signers))
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
return TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
        # Signed-document availability fields
signed_document_available=bool(
tx.signed_document_path and Path(tx.signed_document_path).exists()
),
signed_document_downloaded_at=tx.signed_document_downloaded_at,
signed_document_size_kb=(
tx.signed_document_size_bytes / 1024
if tx.signed_document_size_bytes
else None
),
)
@router.post("/transactions/{transaction_id}/sync")
async def sync_single_transaction(
transaction_id: str,
force: bool = Query(False),
session: AsyncSession = Depends(get_session),
):
"""Force la synchronisation d'une transaction"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, "Transaction introuvable")
success, error = await sync_service.sync_transaction(
session, transaction, force=force
)
if not success:
raise HTTPException(500, error or "Échec synchronisation")
return {
"success": True,
"transaction_id": transaction_id,
"new_status": transaction.local_status.value,
"synced_at": transaction.last_synced_at.isoformat(),
}
@router.post("/sync/all")
async def sync_all_transactions(
max_transactions: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
"""Synchronise toutes les transactions en attente"""
stats = await sync_service.sync_all_pending(session, max_transactions)
return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()}
@router.post("/webhook")
@router.post("/webhook/")
async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session)
):
"""
CORRECTION : Extraction correcte du transaction_id selon la structure réelle d'Universign
"""
try:
payload = await request.json()
        # Log the full payload for debugging
logger.info(
f"📥 Webhook Universign reçu - Type: {payload.get('type', 'unknown')}"
)
logger.debug(f"Payload complet: {json.dumps(payload, indent=2)}")
        # Transaction ID extraction
transaction_id = None
        # Shape 1: events with a nested payload (the most common case),
        # e.g. transaction.lifecycle.created, transaction.lifecycle.started, ...
if payload.get("type", "").startswith("transaction.") and "payload" in payload:
            # The transaction_id lives in payload.object.id
nested_object = payload.get("payload", {}).get("object", {})
if nested_object.get("object") == "transaction":
transaction_id = nested_object.get("id")
logger.info(
f"Transaction ID extrait de payload.object.id: {transaction_id}"
)
        # Shape 2: action events (action.opened, action.completed)
elif payload.get("type", "").startswith("action."):
            # The transaction_id is directly in payload.object.transaction_id
transaction_id = (
payload.get("payload", {}).get("object", {}).get("transaction_id")
)
logger.info(
f"Transaction ID extrait de payload.object.transaction_id: {transaction_id}"
)
        # Shape 3: direct transaction object (fallback)
elif payload.get("object") == "transaction":
transaction_id = payload.get("id")
logger.info(f"Transaction ID extrait direct: {transaction_id}")
        # Shape 4: legacy format (backward compatibility)
elif "transaction" in payload:
transaction_id = payload.get("transaction", {}).get("id")
logger.info(f"Transaction ID extrait de transaction.id: {transaction_id}")
        # Extraction failed
if not transaction_id:
logger.error(
f"Transaction ID introuvable dans webhook\n"
f"Type d'événement: {payload.get('type', 'unknown')}\n"
f"Clés racine: {list(payload.keys())}\n"
f"Payload simplifié: {json.dumps({k: v if k != 'payload' else '...' for k, v in payload.items()})}"
)
            return JSONResponse(
                status_code=400,
                content={
                    "status": "error",
                    "message": "Transaction ID manquant dans webhook",
                    "event_type": payload.get("type"),
                    "event_id": payload.get("id"),
                },
            )
logger.info(f"🎯 Transaction ID identifié: {transaction_id}")
        # Check whether the transaction exists locally
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
logger.warning(
f"Transaction {transaction_id} inconnue en local\n"
f"Type d'événement: {payload.get('type')}\n"
f"Elle sera synchronisée au prochain polling"
)
return {
"status": "accepted",
"message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling",
"transaction_id": transaction_id,
"event_type": payload.get("type"),
}
        # Process the webhook
success, error = await sync_service.process_webhook(
session, payload, transaction_id
)
if not success:
logger.error(f"Erreur traitement webhook: {error}")
            return JSONResponse(
                status_code=500,
                content={
                    "status": "error",
                    "message": error,
                    "transaction_id": transaction_id,
                },
            )
        # Success
logger.info(
f"Webhook traité avec succès\n"
f"Transaction: {transaction_id}\n"
f"Nouveau statut: {tx.local_status.value if tx else 'unknown'}\n"
f"Type d'événement: {payload.get('type')}"
)
return {
"status": "processed",
"transaction_id": transaction_id,
"local_status": tx.local_status.value if tx else None,
"event_type": payload.get("type"),
"event_id": payload.get("id"),
}
except Exception as e:
logger.error(f"💥 Erreur critique webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500
@router.get("/stats", response_model=SyncStatsResponse)
async def get_sync_stats(session: AsyncSession = Depends(get_session)):
"""Statistiques globales de synchronisation"""
# Total
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
    # Awaiting sync
pending_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.needs_sync
)
pending = (await session.execute(pending_query)).scalar()
    # Counts per status
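    # Note: one COUNT query per status below; a single GROUP BY query (as in
    # /admin/diagnostic) would reduce the number of round-trips.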
signed_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.SIGNED
)
signed = (await session.execute(signed_query)).scalar()
in_progress_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.IN_PROGRESS
)
in_progress = (await session.execute(in_progress_query)).scalar()
refused_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.REJECTED
)
refused = (await session.execute(refused_query)).scalar()
expired_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EXPIRED
)
expired = (await session.execute(expired_query)).scalar()
    # Last sync
last_sync_query = select(func.max(UniversignTransaction.last_synced_at))
last_sync = (await session.execute(last_sync_query)).scalar()
return SyncStatsResponse(
total_transactions=total,
pending_sync=pending,
signed=signed,
in_progress=in_progress,
refused=refused,
expired=expired,
last_sync_at=last_sync,
)
@router.get("/transactions/{transaction_id}/logs")
async def get_transaction_logs(
transaction_id: str,
limit: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
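    """Return the synchronization log history of a transaction."""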
    # Find the transaction
tx_query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
tx_result = await session.execute(tx_query)
tx = tx_result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
# Logs
logs_query = (
select(UniversignSyncLog)
.where(UniversignSyncLog.transaction_id == tx.id)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(limit)
)
logs_result = await session.execute(logs_query)
logs = logs_result.scalars().all()
return {
"transaction_id": transaction_id,
"total_syncs": len(logs),
"logs": [
{
"sync_type": log.sync_type,
"timestamp": log.sync_timestamp.isoformat(),
"success": log.success,
"previous_status": log.previous_status,
"new_status": log.new_status,
"error_message": log.error_message,
"response_time_ms": log.response_time_ms,
}
for log in logs
],
}
@router.get("/documents/{sage_document_id}/signatures")
async def get_signatures_for_document(
sage_document_id: str,
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions de signature pour un document Sage"""
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(UniversignTransaction.created_at.desc())
)
result = await session.execute(query)
transactions = result.scalars().all()
return [
{
"id": tx.id,
"transaction_id": tx.transaction_id,
"local_status": tx.local_status.value,
"universign_status": tx.universign_status.value
if tx.universign_status
else None,
"created_at": tx.created_at.isoformat(),
"signed_at": tx.signed_at.isoformat() if tx.signed_at else None,
"signer_url": tx.signer_url,
"signers_count": len(tx.signers),
}
for tx in transactions
]
@router.delete("/documents/{sage_document_id}/duplicates")
async def cleanup_duplicate_signatures(
sage_document_id: str,
keep_latest: bool = Query(
        True, description="Keep the most recent transaction (True) or the oldest (False)"
),
session: AsyncSession = Depends(get_session),
):
"""
Supprime les doublons de signatures pour un document.
Garde une seule transaction (la plus récente ou ancienne selon le paramètre).
"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(
UniversignTransaction.created_at.desc()
if keep_latest
else UniversignTransaction.created_at.asc()
)
)
result = await session.execute(query)
transactions = result.scalars().all()
if len(transactions) <= 1:
return {
"success": True,
"message": "Aucun doublon trouvé",
"kept": transactions[0].transaction_id if transactions else None,
"deleted_count": 0,
}
    # Keep the first one (given the ordering above), delete the rest
to_keep = transactions[0]
to_delete = transactions[1:]
deleted_ids = []
for tx in to_delete:
deleted_ids.append(tx.transaction_id)
await session.delete(tx)
await session.commit()
logger.info(
f"Nettoyage doublons {sage_document_id}: gardé {to_keep.transaction_id}, supprimé {deleted_ids}"
)
return {
"success": True,
"document_id": sage_document_id,
"kept": {
"id": to_keep.id,
"transaction_id": to_keep.transaction_id,
"status": to_keep.local_status.value,
"created_at": to_keep.created_at.isoformat(),
},
"deleted_count": len(deleted_ids),
"deleted_transaction_ids": deleted_ids,
}
@router.delete("/transactions/{transaction_id}")
async def delete_transaction(
transaction_id: str,
session: AsyncSession = Depends(get_session),
):
"""Supprime une transaction spécifique par son ID Universign"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
    # Capture the document id before delete/commit (the instance may be expired afterwards)
    document_id = tx.sage_document_id
    await session.delete(tx)
    await session.commit()
    logger.info(f"Transaction {transaction_id} supprimée")
    return {
        "success": True,
        "deleted_transaction_id": transaction_id,
        "document_id": document_id,
    }
@router.post("/cleanup/all-duplicates")
async def cleanup_all_duplicates(
session: AsyncSession = Depends(get_session),
):
"""
Nettoie tous les doublons dans la base.
Pour chaque document avec plusieurs transactions, garde la plus récente non-erreur ou la plus récente.
"""
from sqlalchemy import func
# Trouver les documents avec plusieurs transactions
subquery = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
).subquery()
duplicates_query = select(subquery.c.sage_document_id)
duplicates_result = await session.execute(duplicates_query)
duplicate_docs = [row[0] for row in duplicates_result.fetchall()]
total_deleted = 0
cleanup_details = []
for doc_id in duplicate_docs:
        # Fetch all transactions for this document
tx_query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == doc_id)
.order_by(UniversignTransaction.created_at.desc())
)
tx_result = await session.execute(tx_query)
transactions = tx_result.scalars().all()
        # Priority: SIGNE > EN_COURS > EN_ATTENTE > anything else
priority = {"SIGNE": 0, "EN_COURS": 1, "EN_ATTENTE": 2}
def sort_key(tx):
status_priority = priority.get(tx.local_status.value, 99)
return (status_priority, -tx.created_at.timestamp())
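        # Keep order: preferred status first, then most recent creation date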
sorted_txs = sorted(transactions, key=sort_key)
to_keep = sorted_txs[0]
to_delete = sorted_txs[1:]
for tx in to_delete:
await session.delete(tx)
total_deleted += 1
cleanup_details.append(
{
"document_id": doc_id,
"kept": to_keep.transaction_id,
"kept_status": to_keep.local_status.value,
"deleted_count": len(to_delete),
}
)
await session.commit()
logger.info(
f"Nettoyage global: {total_deleted} doublons supprimés sur {len(duplicate_docs)} documents"
)
return {
"success": True,
"documents_processed": len(duplicate_docs),
"total_deleted": total_deleted,
"details": cleanup_details,
}
@router.get("/admin/diagnostic", tags=["Admin"])
async def diagnostic_complet(session: AsyncSession = Depends(get_session)):
"""
Diagnostic complet de l'état des transactions Universign
"""
try:
        # Overall statistics
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
        # Breakdown by local status
statuts_query = select(
UniversignTransaction.local_status, func.count(UniversignTransaction.id)
).group_by(UniversignTransaction.local_status)
statuts_result = await session.execute(statuts_query)
statuts = {status.value: count for status, count in statuts_result.all()}
        # Transactions without a recent sync
date_limite = datetime.now() - timedelta(hours=1)
sans_sync_query = select(func.count(UniversignTransaction.id)).where(
and_(
UniversignTransaction.needs_sync,
or_(
UniversignTransaction.last_synced_at < date_limite,
UniversignTransaction.last_synced_at.is_(None),
),
)
)
sans_sync = (await session.execute(sans_sync_query)).scalar()
        # Potential duplicates
doublons_query = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
)
doublons_result = await session.execute(doublons_query)
doublons = doublons_result.fetchall()
        # Transactions with sync errors
erreurs_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.sync_error.isnot(None)
)
erreurs = (await session.execute(erreurs_query)).scalar()
        # Transactions for which no webhook was received
        sans_webhook_query = select(func.count(UniversignTransaction.id)).where(
            and_(
                UniversignTransaction.webhook_received == false(),
                UniversignTransaction.local_status != LocalDocumentStatus.PENDING,
            )
        )
sans_webhook = (await session.execute(sans_webhook_query)).scalar()
diagnostic = {
"timestamp": datetime.now().isoformat(),
"total_transactions": total,
"repartition_statuts": statuts,
"problemes_detectes": {
"sans_sync_recente": sans_sync,
"doublons_possibles": len(doublons),
"erreurs_sync": erreurs,
"sans_webhook": sans_webhook,
},
"documents_avec_doublons": [
{"document_id": doc_id, "nombre_transactions": count}
for doc_id, count in doublons
],
"recommandations": [],
}
        # Recommendations
if sans_sync > 0:
diagnostic["recommandations"].append(
f"🔄 {sans_sync} transaction(s) à synchroniser. "
f"Utilisez POST /universign/sync/all"
)
if len(doublons) > 0:
diagnostic["recommandations"].append(
f"{len(doublons)} document(s) avec doublons. "
f"Utilisez POST /universign/cleanup/all-duplicates"
)
if erreurs > 0:
diagnostic["recommandations"].append(
f"{erreurs} transaction(s) en erreur. "
f"Vérifiez les logs avec GET /universign/transactions?status=ERREUR"
)
return diagnostic
except Exception as e:
logger.error(f"Erreur diagnostic: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/force-sync-all", tags=["Admin"])
async def forcer_sync_toutes_transactions(
max_transactions: int = Query(200, le=500),
session: AsyncSession = Depends(get_session),
):
"""
Force la synchronisation de TOUTES les transactions (même finales)
À utiliser pour réparer les incohérences
"""
try:
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.order_by(UniversignTransaction.created_at.desc())
.limit(max_transactions)
)
result = await session.execute(query)
transactions = result.scalars().all()
stats = {
"total_verifie": len(transactions),
"success": 0,
"failed": 0,
"status_changes": 0,
"details": [],
}
for transaction in transactions:
try:
previous_status = transaction.local_status.value
logger.info(
f"🔄 Force sync: {transaction.transaction_id} (statut: {previous_status})"
)
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
new_status = transaction.local_status.value
if success:
stats["success"] += 1
if new_status != previous_status:
stats["status_changes"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"changement": f"{previous_status}{new_status}",
}
)
else:
stats["failed"] += 1
stats["details"].append(
{
"transaction_id": transaction.transaction_id,
"document_id": transaction.sage_document_id,
"erreur": error,
}
)
except Exception as e:
logger.error(f"Erreur sync {transaction.transaction_id}: {e}")
stats["failed"] += 1
return {
"success": True,
"stats": stats,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Erreur force sync: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/repair-transaction/{transaction_id}", tags=["Admin"])
async def reparer_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Répare une transaction spécifique en la re-synchronisant depuis Universign
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
old_status = transaction.local_status.value
old_universign_status = (
transaction.universign_status.value
if transaction.universign_status
else None
)
# Force sync
success, error = await sync_service.sync_transaction(
session, transaction, force=True
)
if not success:
return {
"success": False,
"transaction_id": transaction_id,
"erreur": error,
"ancien_statut": old_status,
}
return {
"success": True,
"transaction_id": transaction_id,
"reparation": {
"ancien_statut_local": old_status,
"nouveau_statut_local": transaction.local_status.value,
"ancien_statut_universign": old_universign_status,
"nouveau_statut_universign": transaction.universign_status.value,
"statut_change": old_status != transaction.local_status.value,
},
"derniere_sync": transaction.last_synced_at.isoformat(),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur réparation: {e}")
raise HTTPException(500, str(e))
@router.get("/admin/transactions-inconsistantes", tags=["Admin"])
async def trouver_transactions_inconsistantes(
session: AsyncSession = Depends(get_session),
):
"""
Trouve les transactions dont le statut local ne correspond pas au statut Universign
"""
try:
        # All non-final transactions
query = select(UniversignTransaction).where(
UniversignTransaction.local_status.in_(
[LocalDocumentStatus.PENDING, LocalDocumentStatus.IN_PROGRESS]
)
)
result = await session.execute(query)
transactions = result.scalars().all()
inconsistantes = []
for tx in transactions:
try:
                # Fetch the status from Universign
universign_data = sync_service.fetch_transaction_status(
tx.transaction_id
)
if not universign_data:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Impossible de récupérer depuis Universign",
"statut_local": tx.local_status.value,
"statut_universign": None,
}
)
continue
universign_status = universign_data["transaction"].get("state")
expected_local_status = map_universign_to_local(universign_status)
if expected_local_status != tx.local_status.value:
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": "Statut incohérent",
"statut_local": tx.local_status.value,
"statut_universign": universign_status,
"statut_attendu": expected_local_status,
"derniere_sync": tx.last_synced_at.isoformat()
if tx.last_synced_at
else None,
}
)
except Exception as e:
logger.error(f"Erreur vérification {tx.transaction_id}: {e}")
inconsistantes.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"probleme": f"Erreur: {str(e)}",
"statut_local": tx.local_status.value,
}
)
return {
"total_verifie": len(transactions),
"inconsistantes": len(inconsistantes),
"details": inconsistantes,
"recommandation": (
"Utilisez POST /universign/admin/force-sync-all pour corriger"
if inconsistantes
else "Aucune incohérence détectée"
),
}
except Exception as e:
logger.error(f"Erreur recherche incohérences: {e}")
raise HTTPException(500, str(e))
@router.post("/admin/nettoyer-transactions-erreur", tags=["Admin"])
async def nettoyer_transactions_erreur(
age_jours: int = Query(
        7, description="Delete error transactions older than X days"
),
session: AsyncSession = Depends(get_session),
):
"""
Nettoie les transactions en erreur anciennes
"""
try:
date_limite = datetime.now() - timedelta(days=age_jours)
query = select(UniversignTransaction).where(
and_(
UniversignTransaction.local_status == LocalDocumentStatus.ERROR,
UniversignTransaction.created_at < date_limite,
)
)
result = await session.execute(query)
transactions = result.scalars().all()
supprimees = []
for tx in transactions:
supprimees.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"date_creation": tx.created_at.isoformat(),
"erreur": tx.sync_error,
}
)
await session.delete(tx)
await session.commit()
return {
"success": True,
"transactions_supprimees": len(supprimees),
"age_limite_jours": age_jours,
"details": supprimees,
}
except Exception as e:
logger.error(f"Erreur nettoyage: {e}")
raise HTTPException(500, str(e))
@router.get("/debug/webhook-payload/{transaction_id}", tags=["Debug"])
async def voir_dernier_webhook(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Affiche le dernier payload webhook reçu pour une transaction
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
        # Fetch the most recent webhook-type log entry
logs_query = (
select(UniversignSyncLog)
.where(
and_(
UniversignSyncLog.transaction_id == tx.id,
UniversignSyncLog.sync_type.like("webhook:%"),
)
)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(1)
)
logs_result = await session.execute(logs_query)
last_webhook_log = logs_result.scalar_one_or_none()
if not last_webhook_log:
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_payload": None,
"message": "Aucun webhook reçu pour cette transaction",
}
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_webhook": {
"timestamp": last_webhook_log.sync_timestamp.isoformat(),
"type": last_webhook_log.sync_type,
"success": last_webhook_log.success,
"payload": json.loads(last_webhook_log.changes_detected)
if last_webhook_log.changes_detected
else None,
},
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur debug webhook: {e}")
raise HTTPException(500, str(e))
@router.get(
"/transactions/{transaction_id}/document/download", tags=["Documents Signés"]
)
async def telecharger_document_signe(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Télécharge le document signé localement stocké
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
if not transaction.signed_document_path:
raise HTTPException(
404,
"Document signé non disponible localement. "
"Utilisez POST /admin/download-missing-documents pour le récupérer.",
)
file_path = Path(transaction.signed_document_path)
if not file_path.exists():
            # File lost; it can be re-downloaded
logger.warning(f"Fichier perdu : {file_path}")
raise HTTPException(
404,
"Fichier introuvable sur le serveur. "
"Utilisez POST /admin/download-missing-documents pour le récupérer.",
)
        # Build the download file name
download_name = (
f"{transaction.sage_document_id}_"
f"{transaction.sage_document_type.name}_"
f"signe.pdf"
)
return FileResponse(
path=str(file_path), media_type="application/pdf", filename=download_name
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur téléchargement document : {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.get("/transactions/{transaction_id}/document/info", tags=["Documents Signés"])
async def info_document_signe(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Informations sur le document signé
"""
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
file_exists = False
file_size_mb = None
if transaction.signed_document_path:
file_path = Path(transaction.signed_document_path)
file_exists = file_path.exists()
if file_exists:
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
return {
"transaction_id": transaction_id,
"document_available_locally": file_exists,
"document_url_universign": transaction.document_url,
"downloaded_at": (
transaction.signed_document_downloaded_at.isoformat()
if transaction.signed_document_downloaded_at
else None
),
"file_size_mb": round(file_size_mb, 2) if file_size_mb else None,
"download_attempts": transaction.download_attempts,
"last_download_error": transaction.download_error,
"local_path": transaction.signed_document_path if file_exists else None,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur info document : {e}")
raise HTTPException(500, str(e))
@router.post("/admin/download-missing-documents", tags=["Admin"])
async def telecharger_documents_manquants(
force_redownload: bool = Query(
        False, description="Force re-download even if the file is already present"
),
session: AsyncSession = Depends(get_session),
):
"""
Télécharge tous les documents signés manquants pour les transactions SIGNE
"""
try:
        # Signed transactions without a local document (or all of them when force_redownload)
        query = select(UniversignTransaction).where(
            UniversignTransaction.local_status == LocalDocumentStatus.SIGNED,
            or_(
                UniversignTransaction.signed_document_path.is_(None),
                true() if force_redownload else false(),
            ),
        )
result = await session.execute(query)
transactions = result.scalars().all()
logger.info(f"📥 {len(transactions)} document(s) à télécharger")
document_service = UniversignDocumentService(
api_url=settings.universign_api_url,
api_key=settings.universign_api_key, timeout=60
)
results = {"total": len(transactions), "success": 0, "failed": 0, "details": []}
for transaction in transactions:
try:
(
success,
error,
) = await document_service.download_and_store_signed_document(
session=session, transaction=transaction, force=force_redownload
)
if success:
results["success"] += 1
results["details"].append(
{
"transaction_id": transaction.transaction_id,
"sage_document_id": transaction.sage_document_id,
"status": "success",
}
)
else:
results["failed"] += 1
results["details"].append(
{
"transaction_id": transaction.transaction_id,
"sage_document_id": transaction.sage_document_id,
"status": "failed",
"error": error,
}
)
except Exception as e:
logger.error(f"Erreur téléchargement {transaction.transaction_id}: {e}")
results["failed"] += 1
results["details"].append(
{"transaction_id": transaction.transaction_id, "error": str(e)}
)
await session.commit()
logger.info(
f"Téléchargement terminé : {results['success']}/{results['total']} réussis"
)
return results
except Exception as e:
logger.error(f"Erreur téléchargement batch : {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.post("/admin/cleanup-old-documents", tags=["Admin"])
async def nettoyer_anciens_documents(
days_to_keep: int = Query(
90, ge=7, le=365, description="Nombre de jours à conserver"
),
):
"""
Supprime les documents signés de plus de X jours (par défaut 90)
"""
try:
document_service = UniversignDocumentService(
api_url=settings.universign_api_url,
api_key=settings.universign_api_key
)
deleted, size_freed_mb = await document_service.cleanup_old_documents(
days_to_keep=days_to_keep
)
return {
"success": True,
"files_deleted": deleted,
"space_freed_mb": size_freed_mb,
"days_kept": days_to_keep,
}
except Exception as e:
logger.error(f"Erreur nettoyage : {e}")
raise HTTPException(500, str(e))
@router.get("/transactions/{transaction_id}/diagnose", tags=["Debug"])
async def diagnose_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""
Diagnostic complet d'une transaction Universign
Utile pour débugger les problèmes de récupération de documents
"""
from services.universign_document import UniversignDocumentService
try:
        # Fetch the local transaction
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
local_info = None
if transaction:
local_info = {
"id": transaction.id,
"sage_document_id": transaction.sage_document_id,
"local_status": transaction.local_status.value,
"document_url": transaction.document_url,
"signed_document_path": transaction.signed_document_path,
"download_attempts": transaction.download_attempts,
"download_error": transaction.download_error,
}
        # Diagnose against the Universign API
document_service = UniversignDocumentService(
api_url=settings.universign_api_url,
api_key=settings.universign_api_key,
timeout=30,
)
api_diagnosis = document_service.diagnose_transaction(transaction_id)
return {
"transaction_id": transaction_id,
"local_data": local_info,
"api_diagnosis": api_diagnosis,
"recommendations": _generate_recommendations(local_info, api_diagnosis),
}
except Exception as e:
logger.error(f"Erreur diagnostic: {e}", exc_info=True)
raise HTTPException(500, str(e))
def _generate_recommendations(local_info, api_diagnosis):
"""Génère des recommandations basées sur le diagnostic"""
recommendations = []
if not local_info:
recommendations.append(
"Transaction introuvable localement. Vérifiez le transaction_id."
)
return recommendations
if not api_diagnosis.get("success"):
recommendations.append(
f"Erreur API Universign: {api_diagnosis.get('error')}. "
f"Vérifiez la connectivité et les credentials."
)
return recommendations
state = api_diagnosis.get("checks", {}).get("transaction_data", {}).get("state")
if state not in ["completed", "closed"]:
recommendations.append(
f"La transaction n'est pas encore signée (state={state}). "
f"Attendez que le signataire complète la signature."
)
docs = api_diagnosis.get("checks", {}).get("documents", [])
if not docs:
recommendations.append("Aucun document trouvé dans la transaction Universign.")
else:
for doc in docs:
dl_check = doc.get("download_check", {})
if not dl_check.get("accessible"):
recommendations.append(
f"Document {doc.get('id')} non accessible: "
f"status_code={dl_check.get('status_code')}. "
f"Vérifiez que la signature est complète."
)
if local_info.get("download_error"):
recommendations.append(
f"Dernière erreur de téléchargement: {local_info['download_error']}"
)
if not recommendations:
recommendations.append(
"Tout semble correct. Essayez POST /admin/download-missing-documents "
"avec force_redownload=true"
)
return recommendations