from fastapi import APIRouter, Depends, HTTPException, Query, Request from fastapi.responses import FileResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import false, select, func, or_, and_, true from sqlalchemy.orm import selectinload from typing import List, Optional from datetime import datetime, timedelta from pydantic import BaseModel, EmailStr import logging from data.data import templates_signature_email from email_queue import email_queue from database import UniversignSignerStatus, UniversignTransactionStatus, get_session from database import ( UniversignTransaction, UniversignSigner, UniversignSyncLog, LocalDocumentStatus, SageDocumentType, ) import os from pathlib import Path import json from services.universign_document import UniversignDocumentService from services.universign_sync import UniversignSyncService from config.config import settings from utils.generic_functions import normaliser_type_doc from utils.universign_status_mapping import get_status_message, map_universign_to_local from database.models.email import EmailLog from database.enum.status import StatutEmail logger = logging.getLogger(__name__) router = APIRouter(prefix="/universign", tags=["Universign"]) sync_service = UniversignSyncService( api_url=settings.universign_api_url, api_key=settings.universign_api_key ) class CreateSignatureRequest(BaseModel): """Demande de création d'une signature""" sage_document_id: str sage_document_type: SageDocumentType signer_email: EmailStr signer_name: str document_name: Optional[str] = None class TransactionResponse(BaseModel): """Réponse détaillée d'une transaction""" id: str transaction_id: str sage_document_id: str sage_document_type: str universign_status: str local_status: str local_status_label: str signer_url: Optional[str] document_url: Optional[str] created_at: datetime sent_at: Optional[datetime] signed_at: Optional[datetime] last_synced_at: Optional[datetime] needs_sync: bool signers: List[dict] signed_document_available: bool = False signed_document_downloaded_at: Optional[datetime] = None signed_document_size_kb: Optional[float] = None class SyncStatsResponse(BaseModel): """Statistiques de synchronisation""" total_transactions: int pending_sync: int signed: int in_progress: int refused: int expired: int last_sync_at: Optional[datetime] @router.post("/signatures/create", response_model=TransactionResponse) async def create_signature( request: CreateSignatureRequest, session: AsyncSession = Depends(get_session) ): try: # === VÉRIFICATION DOUBLON RENFORCÉE === logger.info( f"🔍 Vérification doublon pour: {request.sage_document_id} " f"(type: {request.sage_document_type.name})" ) existing_query = select(UniversignTransaction).where( UniversignTransaction.sage_document_id == request.sage_document_id, UniversignTransaction.sage_document_type == request.sage_document_type, ) existing_result = await session.execute(existing_query) all_existing = existing_result.scalars().all() if all_existing: logger.warning( f"{len(all_existing)} transaction(s) existante(s) trouvée(s)" ) # Filtrer les transactions non-finales active_txs = [ tx for tx in all_existing if tx.local_status not in [ LocalDocumentStatus.SIGNED, LocalDocumentStatus.REJECTED, LocalDocumentStatus.EXPIRED, LocalDocumentStatus.ERROR, ] ] if active_txs: active_tx = active_txs[0] logger.error( f"Transaction active existante: {active_tx.transaction_id} " f"(statut: {active_tx.local_status.value})" ) raise HTTPException( 400, f"Une demande de signature est déjà en cours pour {request.sage_document_id} " f"(transaction: {active_tx.transaction_id}, statut: {active_tx.local_status.value}). " f"Utilisez GET /universign/documents/{request.sage_document_id}/signatures pour voir toutes les transactions.", ) logger.info( "Toutes les transactions existantes sont finales, création autorisée" ) # Génération PDF logger.info(f"📄 Génération PDF: {request.sage_document_id}") pdf_bytes = email_queue._generate_pdf( request.sage_document_id, normaliser_type_doc(request.sage_document_type) ) if not pdf_bytes: raise HTTPException(400, "Échec génération PDF") logger.info(f"PDF généré: {len(pdf_bytes)} octets") # === CRÉATION TRANSACTION UNIVERSIGN === import requests import uuid auth = (settings.universign_api_key, "") logger.info("🔄 Création transaction Universign...") resp = requests.post( f"{settings.universign_api_url}/transactions", auth=auth, json={ "name": request.document_name or f"{request.sage_document_type.name} {request.sage_document_id}", "language": "fr", }, timeout=30, ) if resp.status_code != 200: logger.error(f"Erreur Universign (création): {resp.text}") raise HTTPException(500, f"Erreur Universign: {resp.status_code}") universign_tx_id = resp.json().get("id") logger.info(f"Transaction Universign créée: {universign_tx_id}") # Upload PDF logger.info("📤 Upload PDF...") files = { "file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf") } resp = requests.post( f"{settings.universign_api_url}/files", auth=auth, files=files, timeout=60 ) if resp.status_code not in [200, 201]: logger.error(f"Erreur upload: {resp.text}") raise HTTPException(500, "Erreur upload PDF") file_id = resp.json().get("id") logger.info(f"PDF uploadé: {file_id}") # Attachement document logger.info("🔗 Attachement document...") resp = requests.post( f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents", auth=auth, data={"document": file_id}, timeout=30, ) if resp.status_code not in [200, 201]: raise HTTPException(500, "Erreur attachement document") document_id = resp.json().get("id") # Création champ signature logger.info("✍️ Création champ signature...") resp = requests.post( f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields", auth=auth, data={"type": "signature"}, timeout=30, ) if resp.status_code not in [200, 201]: raise HTTPException(500, "Erreur création champ signature") field_id = resp.json().get("id") # Liaison signataire logger.info(f"👤 Liaison signataire: {request.signer_email}") resp = requests.post( f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures", auth=auth, data={"signer": request.signer_email, "field": field_id}, timeout=30, ) if resp.status_code not in [200, 201]: raise HTTPException(500, "Erreur liaison signataire") # Démarrage transaction logger.info("🚀 Démarrage transaction...") resp = requests.post( f"{settings.universign_api_url}/transactions/{universign_tx_id}/start", auth=auth, timeout=30, ) if resp.status_code not in [200, 201]: raise HTTPException(500, "Erreur démarrage transaction") final_data = resp.json() # Extraction URL de signature signer_url = "" if final_data.get("actions"): for action in final_data["actions"]: if action.get("url"): signer_url = action["url"] break if not signer_url: raise HTTPException(500, "URL de signature non retournée") logger.info("URL de signature obtenue") # === ENREGISTREMENT LOCAL === local_id = str(uuid.uuid4()) transaction = UniversignTransaction( id=local_id, transaction_id=universign_tx_id, # Utiliser l'ID Universign, ne jamais le changer sage_document_id=request.sage_document_id, sage_document_type=request.sage_document_type, universign_status=UniversignTransactionStatus.STARTED, local_status=LocalDocumentStatus.IN_PROGRESS, signer_url=signer_url, requester_email=request.signer_email, requester_name=request.signer_name, document_name=request.document_name, created_at=datetime.now(), sent_at=datetime.now(), is_test=True, needs_sync=True, ) session.add(transaction) signer = UniversignSigner( id=f"{local_id}_signer_0", transaction_id=local_id, email=request.signer_email, name=request.signer_name, status=UniversignSignerStatus.WAITING, order_index=0, ) session.add(signer) await session.commit() logger.info( f"💾 Transaction sauvegardée: {local_id} (Universign: {universign_tx_id})" ) # === ENVOI EMAIL AVEC TEMPLATE === template = templates_signature_email["demande_signature"] type_labels = { 0: "Devis", 10: "Commande", 30: "Bon de Livraison", 60: "Facture", 50: "Avoir", } doc_info = email_queue.sage_client.lire_document( request.sage_document_id, request.sage_document_type.value ) montant_ttc = f"{doc_info.get('total_ttc', 0):.2f}" if doc_info else "0.00" date_doc = ( doc_info.get("date", datetime.now().strftime("%d/%m/%Y")) if doc_info else datetime.now().strftime("%d/%m/%Y") ) variables = { "NOM_SIGNATAIRE": request.signer_name, "TYPE_DOC": type_labels.get(request.sage_document_type.value, "Document"), "NUMERO": request.sage_document_id, "DATE": date_doc, "MONTANT_TTC": montant_ttc, "SIGNER_URL": signer_url, "CONTACT_EMAIL": settings.smtp_from, } sujet = template["sujet"] corps = template["corps_html"] for var, valeur in variables.items(): sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) email_log = EmailLog( id=str(uuid.uuid4()), destinataire=request.signer_email, sujet=sujet, corps_html=corps, document_ids=request.sage_document_id, type_document=request.sage_document_type.value, statut=StatutEmail.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.commit() email_queue.enqueue(email_log.id) # === MISE À JOUR STATUT SAGE (Confirmé = 1) === try: from sage_client import sage_client sage_client.changer_statut_document( document_type_code=request.sage_document_type.value, numero=request.sage_document_id, nouveau_statut=1, ) logger.info( f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)" ) except Exception as e: logger.warning(f"Impossible de mettre à jour le statut Sage: {e}") # === RÉPONSE === return TransactionResponse( id=transaction.id, transaction_id=transaction.transaction_id, sage_document_id=transaction.sage_document_id, sage_document_type=transaction.sage_document_type.name, universign_status=transaction.universign_status.value, local_status=transaction.local_status.value, local_status_label=get_status_message(transaction.local_status.value), signer_url=transaction.signer_url, document_url=None, created_at=transaction.created_at, sent_at=transaction.sent_at, signed_at=None, last_synced_at=None, needs_sync=True, signers=[ { "email": signer.email, "name": signer.name, "status": signer.status.value, } ], ) except HTTPException: raise except Exception as e: logger.error(f"Erreur création signature: {e}", exc_info=True) raise HTTPException(500, str(e)) @router.get("/transactions", response_model=List[TransactionResponse]) async def list_transactions( status: Optional[LocalDocumentStatus] = None, sage_document_id: Optional[str] = None, limit: int = Query(100, le=1000), session: AsyncSession = Depends(get_session), ): """Liste toutes les transactions""" query = select(UniversignTransaction).options( selectinload(UniversignTransaction.signers) ) if status: query = query.where(UniversignTransaction.local_status == status) if sage_document_id: query = query.where(UniversignTransaction.sage_document_id == sage_document_id) query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit) result = await session.execute(query) transactions = result.scalars().all() return [ TransactionResponse( id=tx.id, transaction_id=tx.transaction_id, sage_document_id=tx.sage_document_id, sage_document_type=tx.sage_document_type.name, universign_status=tx.universign_status.value, local_status=tx.local_status.value, local_status_label=get_status_message(tx.local_status.value), signer_url=tx.signer_url, document_url=tx.document_url, created_at=tx.created_at, sent_at=tx.sent_at, signed_at=tx.signed_at, last_synced_at=tx.last_synced_at, needs_sync=tx.needs_sync, signers=[ { "email": s.email, "name": s.name, "status": s.status.value, "signed_at": s.signed_at.isoformat() if s.signed_at else None, } for s in tx.signers ], # ✅ NOUVEAUX CHAMPS signed_document_available=bool( tx.signed_document_path and Path(tx.signed_document_path).exists() ), signed_document_downloaded_at=tx.signed_document_downloaded_at, signed_document_size_kb=( tx.signed_document_size_bytes / 1024 if tx.signed_document_size_bytes else None ), ) for tx in transactions ] @router.get("/transactions/{transaction_id}", response_model=TransactionResponse) async def get_transaction( transaction_id: str, session: AsyncSession = Depends(get_session) ): """Récupère une transaction par son ID""" query = ( select(UniversignTransaction) .where(UniversignTransaction.transaction_id == transaction_id) .options(selectinload(UniversignTransaction.signers)) ) result = await session.execute(query) tx = result.scalar_one_or_none() if not tx: raise HTTPException(404, "Transaction introuvable") return TransactionResponse( id=tx.id, transaction_id=tx.transaction_id, sage_document_id=tx.sage_document_id, sage_document_type=tx.sage_document_type.name, universign_status=tx.universign_status.value, local_status=tx.local_status.value, local_status_label=get_status_message(tx.local_status.value), signer_url=tx.signer_url, document_url=tx.document_url, created_at=tx.created_at, sent_at=tx.sent_at, signed_at=tx.signed_at, last_synced_at=tx.last_synced_at, needs_sync=tx.needs_sync, signers=[ { "email": s.email, "name": s.name, "status": s.status.value, "signed_at": s.signed_at.isoformat() if s.signed_at else None, } for s in tx.signers ], # ✅ NOUVEAUX CHAMPS signed_document_available=bool( tx.signed_document_path and Path(tx.signed_document_path).exists() ), signed_document_downloaded_at=tx.signed_document_downloaded_at, signed_document_size_kb=( tx.signed_document_size_bytes / 1024 if tx.signed_document_size_bytes else None ), ) @router.post("/transactions/{transaction_id}/sync") async def sync_single_transaction( transaction_id: str, force: bool = Query(False), session: AsyncSession = Depends(get_session), ): """Force la synchronisation d'une transaction""" query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: raise HTTPException(404, "Transaction introuvable") success, error = await sync_service.sync_transaction( session, transaction, force=force ) if not success: raise HTTPException(500, error or "Échec synchronisation") return { "success": True, "transaction_id": transaction_id, "new_status": transaction.local_status.value, "synced_at": transaction.last_synced_at.isoformat(), } @router.post("/sync/all") async def sync_all_transactions( max_transactions: int = Query(50, le=500), session: AsyncSession = Depends(get_session), ): """Synchronise toutes les transactions en attente""" stats = await sync_service.sync_all_pending(session, max_transactions) return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()} @router.post("/webhook") @router.post("/webhook/") async def webhook_universign( request: Request, session: AsyncSession = Depends(get_session) ): """ CORRECTION : Extraction correcte du transaction_id selon la structure réelle d'Universign """ try: payload = await request.json() # 📋 LOG COMPLET du payload pour débogage logger.info( f"📥 Webhook Universign reçu - Type: {payload.get('type', 'unknown')}" ) logger.debug(f"Payload complet: {json.dumps(payload, indent=2)}") # EXTRACTION CORRECTE DU TRANSACTION_ID transaction_id = None # 🔍 Structure 1 : Événements avec payload imbriqué (la plus courante) # Exemple : transaction.lifecycle.created, transaction.lifecycle.started, etc. if payload.get("type", "").startswith("transaction.") and "payload" in payload: # Le transaction_id est dans payload.object.id nested_object = payload.get("payload", {}).get("object", {}) if nested_object.get("object") == "transaction": transaction_id = nested_object.get("id") logger.info( f"Transaction ID extrait de payload.object.id: {transaction_id}" ) # 🔍 Structure 2 : Action événements (action.opened, action.completed) elif payload.get("type", "").startswith("action."): # Le transaction_id est directement dans payload.object.transaction_id transaction_id = ( payload.get("payload", {}).get("object", {}).get("transaction_id") ) logger.info( f"Transaction ID extrait de payload.object.transaction_id: {transaction_id}" ) # 🔍 Structure 3 : Transaction directe (fallback) elif payload.get("object") == "transaction": transaction_id = payload.get("id") logger.info(f"Transaction ID extrait direct: {transaction_id}") # 🔍 Structure 4 : Ancien format (pour rétro-compatibilité) elif "transaction" in payload: transaction_id = payload.get("transaction", {}).get("id") logger.info(f"Transaction ID extrait de transaction.id: {transaction_id}") # Échec d'extraction if not transaction_id: logger.error( f"Transaction ID introuvable dans webhook\n" f"Type d'événement: {payload.get('type', 'unknown')}\n" f"Clés racine: {list(payload.keys())}\n" f"Payload simplifié: {json.dumps({k: v if k != 'payload' else '...' for k, v in payload.items()})}" ) return { "status": "error", "message": "Transaction ID manquant dans webhook", "event_type": payload.get("type"), "event_id": payload.get("id"), }, 400 logger.info(f"🎯 Transaction ID identifié: {transaction_id}") # Vérifier si la transaction existe localement query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) tx = result.scalar_one_or_none() if not tx: logger.warning( f"Transaction {transaction_id} inconnue en local\n" f"Type d'événement: {payload.get('type')}\n" f"Elle sera synchronisée au prochain polling" ) return { "status": "accepted", "message": f"Transaction {transaction_id} non trouvée localement, sera synchronisée au prochain polling", "transaction_id": transaction_id, "event_type": payload.get("type"), } # Traiter le webhook success, error = await sync_service.process_webhook( session, payload, transaction_id ) if not success: logger.error(f"Erreur traitement webhook: {error}") return { "status": "error", "message": error, "transaction_id": transaction_id, }, 500 # Succès logger.info( f"Webhook traité avec succès\n" f"Transaction: {transaction_id}\n" f"Nouveau statut: {tx.local_status.value if tx else 'unknown'}\n" f"Type d'événement: {payload.get('type')}" ) return { "status": "processed", "transaction_id": transaction_id, "local_status": tx.local_status.value if tx else None, "event_type": payload.get("type"), "event_id": payload.get("id"), } except Exception as e: logger.error(f"💥 Erreur critique webhook: {e}", exc_info=True) return {"status": "error", "message": str(e)}, 500 @router.get("/stats", response_model=SyncStatsResponse) async def get_sync_stats(session: AsyncSession = Depends(get_session)): """Statistiques globales de synchronisation""" # Total total_query = select(func.count(UniversignTransaction.id)) total = (await session.execute(total_query)).scalar() # En attente de sync pending_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.needs_sync ) pending = (await session.execute(pending_query)).scalar() # Par statut signed_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.local_status == LocalDocumentStatus.SIGNED ) signed = (await session.execute(signed_query)).scalar() in_progress_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.local_status == LocalDocumentStatus.IN_PROGRESS ) in_progress = (await session.execute(in_progress_query)).scalar() refused_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.local_status == LocalDocumentStatus.REJECTED ) refused = (await session.execute(refused_query)).scalar() expired_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.local_status == LocalDocumentStatus.EXPIRED ) expired = (await session.execute(expired_query)).scalar() # Dernière sync last_sync_query = select(func.max(UniversignTransaction.last_synced_at)) last_sync = (await session.execute(last_sync_query)).scalar() return SyncStatsResponse( total_transactions=total, pending_sync=pending, signed=signed, in_progress=in_progress, refused=refused, expired=expired, last_sync_at=last_sync, ) @router.get("/transactions/{transaction_id}/logs") async def get_transaction_logs( transaction_id: str, limit: int = Query(50, le=500), session: AsyncSession = Depends(get_session), ): # Trouver la transaction tx_query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) tx_result = await session.execute(tx_query) tx = tx_result.scalar_one_or_none() if not tx: raise HTTPException(404, "Transaction introuvable") # Logs logs_query = ( select(UniversignSyncLog) .where(UniversignSyncLog.transaction_id == tx.id) .order_by(UniversignSyncLog.sync_timestamp.desc()) .limit(limit) ) logs_result = await session.execute(logs_query) logs = logs_result.scalars().all() return { "transaction_id": transaction_id, "total_syncs": len(logs), "logs": [ { "sync_type": log.sync_type, "timestamp": log.sync_timestamp.isoformat(), "success": log.success, "previous_status": log.previous_status, "new_status": log.new_status, "error_message": log.error_message, "response_time_ms": log.response_time_ms, } for log in logs ], } # Ajouter ces routes dans universign.py @router.get("/documents/{sage_document_id}/signatures") async def get_signatures_for_document( sage_document_id: str, session: AsyncSession = Depends(get_session), ): """Liste toutes les transactions de signature pour un document Sage""" query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .where(UniversignTransaction.sage_document_id == sage_document_id) .order_by(UniversignTransaction.created_at.desc()) ) result = await session.execute(query) transactions = result.scalars().all() return [ { "id": tx.id, "transaction_id": tx.transaction_id, "local_status": tx.local_status.value, "universign_status": tx.universign_status.value if tx.universign_status else None, "created_at": tx.created_at.isoformat(), "signed_at": tx.signed_at.isoformat() if tx.signed_at else None, "signer_url": tx.signer_url, "signers_count": len(tx.signers), } for tx in transactions ] @router.delete("/documents/{sage_document_id}/duplicates") async def cleanup_duplicate_signatures( sage_document_id: str, keep_latest: bool = Query( True, description="Garder la plus récente (True) ou la plus ancienne (False)" ), session: AsyncSession = Depends(get_session), ): """ Supprime les doublons de signatures pour un document. Garde une seule transaction (la plus récente ou ancienne selon le paramètre). """ query = ( select(UniversignTransaction) .where(UniversignTransaction.sage_document_id == sage_document_id) .order_by( UniversignTransaction.created_at.desc() if keep_latest else UniversignTransaction.created_at.asc() ) ) result = await session.execute(query) transactions = result.scalars().all() if len(transactions) <= 1: return { "success": True, "message": "Aucun doublon trouvé", "kept": transactions[0].transaction_id if transactions else None, "deleted_count": 0, } # Garder la première (selon l'ordre), supprimer les autres to_keep = transactions[0] to_delete = transactions[1:] deleted_ids = [] for tx in to_delete: deleted_ids.append(tx.transaction_id) await session.delete(tx) await session.commit() logger.info( f"Nettoyage doublons {sage_document_id}: gardé {to_keep.transaction_id}, supprimé {deleted_ids}" ) return { "success": True, "document_id": sage_document_id, "kept": { "id": to_keep.id, "transaction_id": to_keep.transaction_id, "status": to_keep.local_status.value, "created_at": to_keep.created_at.isoformat(), }, "deleted_count": len(deleted_ids), "deleted_transaction_ids": deleted_ids, } @router.delete("/transactions/{transaction_id}") async def delete_transaction( transaction_id: str, session: AsyncSession = Depends(get_session), ): """Supprime une transaction spécifique par son ID Universign""" query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) tx = result.scalar_one_or_none() if not tx: raise HTTPException(404, f"Transaction {transaction_id} introuvable") await session.delete(tx) await session.commit() logger.info(f"Transaction {transaction_id} supprimée") return { "success": True, "deleted_transaction_id": transaction_id, "document_id": tx.sage_document_id, } @router.post("/cleanup/all-duplicates") async def cleanup_all_duplicates( session: AsyncSession = Depends(get_session), ): """ Nettoie tous les doublons dans la base. Pour chaque document avec plusieurs transactions, garde la plus récente non-erreur ou la plus récente. """ from sqlalchemy import func # Trouver les documents avec plusieurs transactions subquery = ( select( UniversignTransaction.sage_document_id, func.count(UniversignTransaction.id).label("count"), ) .group_by(UniversignTransaction.sage_document_id) .having(func.count(UniversignTransaction.id) > 1) ).subquery() duplicates_query = select(subquery.c.sage_document_id) duplicates_result = await session.execute(duplicates_query) duplicate_docs = [row[0] for row in duplicates_result.fetchall()] total_deleted = 0 cleanup_details = [] for doc_id in duplicate_docs: # Récupérer toutes les transactions pour ce document tx_query = ( select(UniversignTransaction) .where(UniversignTransaction.sage_document_id == doc_id) .order_by(UniversignTransaction.created_at.desc()) ) tx_result = await session.execute(tx_query) transactions = tx_result.scalars().all() # Priorité: SIGNE > EN_COURS > EN_ATTENTE > autres priority = {"SIGNE": 0, "EN_COURS": 1, "EN_ATTENTE": 2} def sort_key(tx): status_priority = priority.get(tx.local_status.value, 99) return (status_priority, -tx.created_at.timestamp()) sorted_txs = sorted(transactions, key=sort_key) to_keep = sorted_txs[0] to_delete = sorted_txs[1:] for tx in to_delete: await session.delete(tx) total_deleted += 1 cleanup_details.append( { "document_id": doc_id, "kept": to_keep.transaction_id, "kept_status": to_keep.local_status.value, "deleted_count": len(to_delete), } ) await session.commit() logger.info( f"Nettoyage global: {total_deleted} doublons supprimés sur {len(duplicate_docs)} documents" ) return { "success": True, "documents_processed": len(duplicate_docs), "total_deleted": total_deleted, "details": cleanup_details, } @router.get("/admin/diagnostic", tags=["Admin"]) async def diagnostic_complet(session: AsyncSession = Depends(get_session)): """ Diagnostic complet de l'état des transactions Universign """ try: # Statistiques générales total_query = select(func.count(UniversignTransaction.id)) total = (await session.execute(total_query)).scalar() # Par statut local statuts_query = select( UniversignTransaction.local_status, func.count(UniversignTransaction.id) ).group_by(UniversignTransaction.local_status) statuts_result = await session.execute(statuts_query) statuts = {status.value: count for status, count in statuts_result.all()} # Transactions sans sync récente date_limite = datetime.now() - timedelta(hours=1) sans_sync_query = select(func.count(UniversignTransaction.id)).where( and_( UniversignTransaction.needs_sync, or_( UniversignTransaction.last_synced_at < date_limite, UniversignTransaction.last_synced_at.is_(None), ), ) ) sans_sync = (await session.execute(sans_sync_query)).scalar() # Doublons potentiels doublons_query = ( select( UniversignTransaction.sage_document_id, func.count(UniversignTransaction.id).label("count"), ) .group_by(UniversignTransaction.sage_document_id) .having(func.count(UniversignTransaction.id) > 1) ) doublons_result = await session.execute(doublons_query) doublons = doublons_result.fetchall() # Transactions avec erreurs de sync erreurs_query = select(func.count(UniversignTransaction.id)).where( UniversignTransaction.sync_error.isnot(None) ) erreurs = (await session.execute(erreurs_query)).scalar() # Transactions sans webhook reçu sans_webhook_query = select(func.count(UniversignTransaction.id)).where( and_( not UniversignTransaction.webhook_received, UniversignTransaction.local_status != LocalDocumentStatus.PENDING, ) ) sans_webhook = (await session.execute(sans_webhook_query)).scalar() diagnostic = { "timestamp": datetime.now().isoformat(), "total_transactions": total, "repartition_statuts": statuts, "problemes_detectes": { "sans_sync_recente": sans_sync, "doublons_possibles": len(doublons), "erreurs_sync": erreurs, "sans_webhook": sans_webhook, }, "documents_avec_doublons": [ {"document_id": doc_id, "nombre_transactions": count} for doc_id, count in doublons ], "recommandations": [], } # Recommandations if sans_sync > 0: diagnostic["recommandations"].append( f"🔄 {sans_sync} transaction(s) à synchroniser. " f"Utilisez POST /universign/sync/all" ) if len(doublons) > 0: diagnostic["recommandations"].append( f"{len(doublons)} document(s) avec doublons. " f"Utilisez POST /universign/cleanup/all-duplicates" ) if erreurs > 0: diagnostic["recommandations"].append( f"{erreurs} transaction(s) en erreur. " f"Vérifiez les logs avec GET /universign/transactions?status=ERREUR" ) return diagnostic except Exception as e: logger.error(f"Erreur diagnostic: {e}") raise HTTPException(500, str(e)) @router.post("/admin/force-sync-all", tags=["Admin"]) async def forcer_sync_toutes_transactions( max_transactions: int = Query(200, le=500), session: AsyncSession = Depends(get_session), ): """ Force la synchronisation de TOUTES les transactions (même finales) À utiliser pour réparer les incohérences """ try: query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .order_by(UniversignTransaction.created_at.desc()) .limit(max_transactions) ) result = await session.execute(query) transactions = result.scalars().all() stats = { "total_verifie": len(transactions), "success": 0, "failed": 0, "status_changes": 0, "details": [], } for transaction in transactions: try: previous_status = transaction.local_status.value logger.info( f"🔄 Force sync: {transaction.transaction_id} (statut: {previous_status})" ) success, error = await sync_service.sync_transaction( session, transaction, force=True ) new_status = transaction.local_status.value if success: stats["success"] += 1 if new_status != previous_status: stats["status_changes"] += 1 stats["details"].append( { "transaction_id": transaction.transaction_id, "document_id": transaction.sage_document_id, "changement": f"{previous_status} → {new_status}", } ) else: stats["failed"] += 1 stats["details"].append( { "transaction_id": transaction.transaction_id, "document_id": transaction.sage_document_id, "erreur": error, } ) except Exception as e: logger.error(f"Erreur sync {transaction.transaction_id}: {e}") stats["failed"] += 1 return { "success": True, "stats": stats, "timestamp": datetime.now().isoformat(), } except Exception as e: logger.error(f"Erreur force sync: {e}") raise HTTPException(500, str(e)) @router.post("/admin/repair-transaction/{transaction_id}", tags=["Admin"]) async def reparer_transaction( transaction_id: str, session: AsyncSession = Depends(get_session) ): """ Répare une transaction spécifique en la re-synchronisant depuis Universign """ try: query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: raise HTTPException(404, f"Transaction {transaction_id} introuvable") old_status = transaction.local_status.value old_universign_status = ( transaction.universign_status.value if transaction.universign_status else None ) # Force sync success, error = await sync_service.sync_transaction( session, transaction, force=True ) if not success: return { "success": False, "transaction_id": transaction_id, "erreur": error, "ancien_statut": old_status, } return { "success": True, "transaction_id": transaction_id, "reparation": { "ancien_statut_local": old_status, "nouveau_statut_local": transaction.local_status.value, "ancien_statut_universign": old_universign_status, "nouveau_statut_universign": transaction.universign_status.value, "statut_change": old_status != transaction.local_status.value, }, "derniere_sync": transaction.last_synced_at.isoformat(), } except HTTPException: raise except Exception as e: logger.error(f"Erreur réparation: {e}") raise HTTPException(500, str(e)) @router.get("/admin/transactions-inconsistantes", tags=["Admin"]) async def trouver_transactions_inconsistantes( session: AsyncSession = Depends(get_session), ): """ Trouve les transactions dont le statut local ne correspond pas au statut Universign """ try: # Toutes les transactions non-finales query = select(UniversignTransaction).where( UniversignTransaction.local_status.in_( [LocalDocumentStatus.PENDING, LocalDocumentStatus.IN_PROGRESS] ) ) result = await session.execute(query) transactions = result.scalars().all() inconsistantes = [] for tx in transactions: try: # Récupérer le statut depuis Universign universign_data = sync_service.fetch_transaction_status( tx.transaction_id ) if not universign_data: inconsistantes.append( { "transaction_id": tx.transaction_id, "document_id": tx.sage_document_id, "probleme": "Impossible de récupérer depuis Universign", "statut_local": tx.local_status.value, "statut_universign": None, } ) continue universign_status = universign_data["transaction"].get("state") expected_local_status = map_universign_to_local(universign_status) if expected_local_status != tx.local_status.value: inconsistantes.append( { "transaction_id": tx.transaction_id, "document_id": tx.sage_document_id, "probleme": "Statut incohérent", "statut_local": tx.local_status.value, "statut_universign": universign_status, "statut_attendu": expected_local_status, "derniere_sync": tx.last_synced_at.isoformat() if tx.last_synced_at else None, } ) except Exception as e: logger.error(f"Erreur vérification {tx.transaction_id}: {e}") inconsistantes.append( { "transaction_id": tx.transaction_id, "document_id": tx.sage_document_id, "probleme": f"Erreur: {str(e)}", "statut_local": tx.local_status.value, } ) return { "total_verifie": len(transactions), "inconsistantes": len(inconsistantes), "details": inconsistantes, "recommandation": ( "Utilisez POST /universign/admin/force-sync-all pour corriger" if inconsistantes else "Aucune incohérence détectée" ), } except Exception as e: logger.error(f"Erreur recherche incohérences: {e}") raise HTTPException(500, str(e)) @router.post("/admin/nettoyer-transactions-erreur", tags=["Admin"]) async def nettoyer_transactions_erreur( age_jours: int = Query( 7, description="Supprimer les transactions en erreur de plus de X jours" ), session: AsyncSession = Depends(get_session), ): """ Nettoie les transactions en erreur anciennes """ try: date_limite = datetime.now() - timedelta(days=age_jours) query = select(UniversignTransaction).where( and_( UniversignTransaction.local_status == LocalDocumentStatus.ERROR, UniversignTransaction.created_at < date_limite, ) ) result = await session.execute(query) transactions = result.scalars().all() supprimees = [] for tx in transactions: supprimees.append( { "transaction_id": tx.transaction_id, "document_id": tx.sage_document_id, "date_creation": tx.created_at.isoformat(), "erreur": tx.sync_error, } ) await session.delete(tx) await session.commit() return { "success": True, "transactions_supprimees": len(supprimees), "age_limite_jours": age_jours, "details": supprimees, } except Exception as e: logger.error(f"Erreur nettoyage: {e}") raise HTTPException(500, str(e)) @router.get("/debug/webhook-payload/{transaction_id}", tags=["Debug"]) async def voir_dernier_webhook( transaction_id: str, session: AsyncSession = Depends(get_session) ): """ Affiche le dernier payload webhook reçu pour une transaction """ try: query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) tx = result.scalar_one_or_none() if not tx: raise HTTPException(404, "Transaction introuvable") # Récupérer le dernier log de type webhook logs_query = ( select(UniversignSyncLog) .where( and_( UniversignSyncLog.transaction_id == tx.id, UniversignSyncLog.sync_type.like("webhook:%"), ) ) .order_by(UniversignSyncLog.sync_timestamp.desc()) .limit(1) ) logs_result = await session.execute(logs_query) last_webhook_log = logs_result.scalar_one_or_none() if not last_webhook_log: return { "transaction_id": transaction_id, "webhook_recu": tx.webhook_received, "dernier_payload": None, "message": "Aucun webhook reçu pour cette transaction", } return { "transaction_id": transaction_id, "webhook_recu": tx.webhook_received, "dernier_webhook": { "timestamp": last_webhook_log.sync_timestamp.isoformat(), "type": last_webhook_log.sync_type, "success": last_webhook_log.success, "payload": json.loads(last_webhook_log.changes_detected) if last_webhook_log.changes_detected else None, }, } except HTTPException: raise except Exception as e: logger.error(f"Erreur debug webhook: {e}") raise HTTPException(500, str(e)) @router.get( "/transactions/{transaction_id}/document/download", tags=["Documents Signés"] ) async def telecharger_document_signe( transaction_id: str, session: AsyncSession = Depends(get_session) ): """ Télécharge le document signé localement stocké """ try: query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: raise HTTPException(404, f"Transaction {transaction_id} introuvable") if not transaction.signed_document_path: raise HTTPException( 404, "Document signé non disponible localement. " "Utilisez POST /admin/download-missing-documents pour le récupérer.", ) file_path = Path(transaction.signed_document_path) if not file_path.exists(): # Document perdu, on peut tenter de le retélécharger logger.warning(f"Fichier perdu : {file_path}") raise HTTPException( 404, "Fichier introuvable sur le serveur. " "Utilisez POST /admin/download-missing-documents pour le récupérer.", ) # Génération du nom de fichier pour le téléchargement download_name = ( f"{transaction.sage_document_id}_" f"{transaction.sage_document_type.name}_" f"signe.pdf" ) return FileResponse( path=str(file_path), media_type="application/pdf", filename=download_name ) except HTTPException: raise except Exception as e: logger.error(f"Erreur téléchargement document : {e}", exc_info=True) raise HTTPException(500, str(e)) @router.get("/transactions/{transaction_id}/document/info", tags=["Documents Signés"]) async def info_document_signe( transaction_id: str, session: AsyncSession = Depends(get_session) ): """ Informations sur le document signé """ try: query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: raise HTTPException(404, f"Transaction {transaction_id} introuvable") file_exists = False file_size_mb = None if transaction.signed_document_path: file_path = Path(transaction.signed_document_path) file_exists = file_path.exists() if file_exists: file_size_mb = os.path.getsize(file_path) / (1024 * 1024) return { "transaction_id": transaction_id, "document_available_locally": file_exists, "document_url_universign": transaction.document_url, "downloaded_at": ( transaction.signed_document_downloaded_at.isoformat() if transaction.signed_document_downloaded_at else None ), "file_size_mb": round(file_size_mb, 2) if file_size_mb else None, "download_attempts": transaction.download_attempts, "last_download_error": transaction.download_error, "local_path": transaction.signed_document_path if file_exists else None, } except HTTPException: raise except Exception as e: logger.error(f"Erreur info document : {e}") raise HTTPException(500, str(e)) @router.post("/admin/download-missing-documents", tags=["Admin"]) async def telecharger_documents_manquants( force_redownload: bool = Query( False, description="Forcer le retéléchargement même si déjà présent" ), session: AsyncSession = Depends(get_session), ): """ Télécharge tous les documents signés manquants pour les transactions SIGNE """ try: # Transactions signées sans document local query = select(UniversignTransaction).where( UniversignTransaction.local_status == LocalDocumentStatus.SIGNED, or_( UniversignTransaction.signed_document_path.is_(None), force_redownload, ), ) result = await session.execute(query) transactions = result.scalars().all() logger.info(f"📥 {len(transactions)} document(s) à télécharger") document_service = UniversignDocumentService( api_key=settings.universign_api_key, timeout=60 ) results = {"total": len(transactions), "success": 0, "failed": 0, "details": []} for transaction in transactions: try: ( success, error, ) = await document_service.download_and_store_signed_document( session=session, transaction=transaction, force=force_redownload ) if success: results["success"] += 1 results["details"].append( { "transaction_id": transaction.transaction_id, "sage_document_id": transaction.sage_document_id, "status": "success", } ) else: results["failed"] += 1 results["details"].append( { "transaction_id": transaction.transaction_id, "sage_document_id": transaction.sage_document_id, "status": "failed", "error": error, } ) except Exception as e: logger.error(f"Erreur téléchargement {transaction.transaction_id}: {e}") results["failed"] += 1 results["details"].append( {"transaction_id": transaction.transaction_id, "error": str(e)} ) await session.commit() logger.info( f"Téléchargement terminé : {results['success']}/{results['total']} réussis" ) return results except Exception as e: logger.error(f"Erreur téléchargement batch : {e}", exc_info=True) raise HTTPException(500, str(e)) @router.post("/admin/cleanup-old-documents", tags=["Admin"]) async def nettoyer_anciens_documents( days_to_keep: int = Query( 90, ge=7, le=365, description="Nombre de jours à conserver" ), ): """ Supprime les documents signés de plus de X jours (par défaut 90) """ try: document_service = UniversignDocumentService( api_key=settings.universign_api_key ) deleted, size_freed_mb = await document_service.cleanup_old_documents( days_to_keep=days_to_keep ) return { "success": True, "files_deleted": deleted, "space_freed_mb": size_freed_mb, "days_kept": days_to_keep, } except Exception as e: logger.error(f"Erreur nettoyage : {e}") raise HTTPException(500, str(e))