Sage100-vps/routes/universign.py
Fanilo-Nantenaina ba4a9d4773 feat(universign): improve transaction and signer management
- Update signer handling
2026-01-06 18:31:53 +03:00

795 lines
25 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, EmailStr
import logging
from data.data import templates_signature_email
from email_queue import email_queue
from database import UniversignSignerStatus, UniversignTransactionStatus, get_session
from database import (
UniversignTransaction,
UniversignSigner,
UniversignSyncLog,
LocalDocumentStatus,
SageDocumentType,
)
from services.universign_sync import UniversignSyncService
from config.config import settings
from utils.generic_functions import normaliser_type_doc
from utils.universign_status_mapping import get_status_message
from database.models.email import EmailLog
from database.enum.status import StatutEmail
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/universign", tags=["Universign"])
sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key
)
class CreateSignatureRequest(BaseModel):
"""Demande de création d'une signature"""
sage_document_id: str
sage_document_type: SageDocumentType
signer_email: EmailStr
signer_name: str
document_name: Optional[str] = None
class TransactionResponse(BaseModel):
"""Réponse détaillée d'une transaction"""
id: str
transaction_id: str
sage_document_id: str
sage_document_type: str
universign_status: str
local_status: str
local_status_label: str
signer_url: Optional[str]
document_url: Optional[str]
created_at: datetime
sent_at: Optional[datetime]
signed_at: Optional[datetime]
last_synced_at: Optional[datetime]
needs_sync: bool
signers: List[dict]
class SyncStatsResponse(BaseModel):
"""Statistiques de synchronisation"""
total_transactions: int
pending_sync: int
signed: int
in_progress: int
refused: int
expired: int
last_sync_at: Optional[datetime]
@router.post("/signatures/create", response_model=TransactionResponse)
async def create_signature(
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
):
try:
# === VÉRIFICATION DOUBLON ===
existing_query = select(UniversignTransaction).where(
UniversignTransaction.sage_document_id == request.sage_document_id,
UniversignTransaction.sage_document_type == request.sage_document_type,
~UniversignTransaction.local_status.in_(
[
LocalDocumentStatus.SIGNED,
LocalDocumentStatus.REJECTED,
LocalDocumentStatus.EXPIRED,
LocalDocumentStatus.ERROR,
]
),
)
existing_result = await session.execute(existing_query)
existing_tx = existing_result.scalar_one_or_none()
if existing_tx:
raise HTTPException(
400,
f"Une demande de signature est déjà en cours pour {request.sage_document_id} "
f"(transaction: {existing_tx.transaction_id}, statut: {existing_tx.local_status.value})",
)
pdf_bytes = email_queue._generate_pdf(
request.sage_document_id, normaliser_type_doc(request.sage_document_type)
)
if not pdf_bytes:
raise HTTPException(400, "Échec génération PDF")
# === CRÉATION TRANSACTION UNIVERSIGN ===
import requests
import uuid
auth = (settings.universign_api_key, "")
resp = requests.post(
f"{settings.universign_api_url}/transactions",
auth=auth,
json={
"name": request.document_name
or f"{request.sage_document_type.name} {request.sage_document_id}",
"language": "fr",
},
timeout=30,
)
if resp.status_code != 200:
raise HTTPException(500, f"Erreur Universign: {resp.status_code}")
universign_tx_id = resp.json().get("id")
files = {
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
}
resp = requests.post(
f"{settings.universign_api_url}/files", auth=auth, files=files, timeout=60
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur upload PDF")
file_id = resp.json().get("id")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
auth=auth,
data={"document": file_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur attachement document")
document_id = resp.json().get("id")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
auth=auth,
data={"type": "signature"},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur création champ signature")
field_id = resp.json().get("id")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
auth=auth,
data={"signer": request.signer_email, "field": field_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur liaison signataire")
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
auth=auth,
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur démarrage transaction")
final_data = resp.json()
signer_url = ""
if final_data.get("actions"):
for action in final_data["actions"]:
if action.get("url"):
signer_url = action["url"]
break
if not signer_url:
raise HTTPException(500, "URL de signature non retournée")
# === ENREGISTREMENT LOCAL ===
local_id = str(uuid.uuid4())
transaction = UniversignTransaction(
id=local_id,
transaction_id=universign_tx_id,
sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type,
universign_status=UniversignTransactionStatus.STARTED,
local_status=LocalDocumentStatus.IN_PROGRESS,
signer_url=signer_url,
requester_email=request.signer_email,
requester_name=request.signer_name,
document_name=request.document_name,
created_at=datetime.now(),
sent_at=datetime.now(),
is_test=True,
needs_sync=True,
)
session.add(transaction)
signer = UniversignSigner(
id=f"{local_id}_signer_0",
transaction_id=local_id,
email=request.signer_email,
name=request.signer_name,
status=UniversignSignerStatus.WAITING,
order_index=0,
)
session.add(signer)
await session.commit()
# === ENVOI EMAIL AVEC TEMPLATE ===
template = templates_signature_email["demande_signature"]
type_labels = {
0: "Devis",
10: "Commande",
30: "Bon de Livraison",
60: "Facture",
50: "Avoir",
}
doc_info = email_queue.sage_client.lire_document(
request.sage_document_id, request.sage_document_type.value
)
montant_ttc = f"{doc_info.get('total_ttc', 0):.2f}" if doc_info else "0.00"
date_doc = (
doc_info.get("date", datetime.now().strftime("%d/%m/%Y"))
if doc_info
else datetime.now().strftime("%d/%m/%Y")
)
variables = {
"NOM_SIGNATAIRE": request.signer_name,
"TYPE_DOC": type_labels.get(request.sage_document_type.value, "Document"),
"NUMERO": request.sage_document_id,
"DATE": date_doc,
"MONTANT_TTC": montant_ttc,
"SIGNER_URL": signer_url,
"CONTACT_EMAIL": settings.smtp_from,
}
sujet = template["sujet"]
corps = template["corps_html"]
for var, valeur in variables.items():
sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
corps = corps.replace(f"{{{{{var}}}}}", str(valeur))
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=request.signer_email,
sujet=sujet,
corps_html=corps,
document_ids=request.sage_document_id,
type_document=request.sage_document_type.value,
statut=StatutEmail.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.commit()
email_queue.enqueue(email_log.id)
# === MISE À JOUR STATUT SAGE (Confirmé = 1) ===
try:
from sage_client import sage_client
sage_client.changer_statut_document(
document_type_code=request.sage_document_type.value,
numero=request.sage_document_id,
nouveau_statut=1,
)
logger.info(
f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)"
)
except Exception as e:
logger.warning(f"Impossible de mettre à jour le statut Sage: {e}")
# === RÉPONSE ===
return TransactionResponse(
id=transaction.id,
transaction_id=transaction.transaction_id,
sage_document_id=transaction.sage_document_id,
sage_document_type=transaction.sage_document_type.name,
universign_status=transaction.universign_status.value,
local_status=transaction.local_status.value,
local_status_label=get_status_message(transaction.local_status.value),
signer_url=transaction.signer_url,
document_url=None,
created_at=transaction.created_at,
sent_at=transaction.sent_at,
signed_at=None,
last_synced_at=None,
needs_sync=True,
signers=[
{
"email": signer.email,
"name": signer.name,
"status": signer.status.value,
}
],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur création signature: {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
async def list_transactions(
status: Optional[LocalDocumentStatus] = None,
sage_document_id: Optional[str] = None,
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions"""
query = select(UniversignTransaction).options(
selectinload(UniversignTransaction.signers)
)
if status:
query = query.where(UniversignTransaction.local_status == status)
if sage_document_id:
query = query.where(UniversignTransaction.sage_document_id == sage_document_id)
query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit)
result = await session.execute(query)
transactions = result.scalars().all()
return [
TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
)
for tx in transactions
]
@router.get("/transactions/{transaction_id}", response_model=TransactionResponse)
async def get_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""Récupère une transaction par son ID"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.transaction_id == transaction_id)
.options(selectinload(UniversignTransaction.signers))
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
return TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
)
@router.post("/transactions/{transaction_id}/sync")
async def sync_single_transaction(
transaction_id: str,
force: bool = Query(False),
session: AsyncSession = Depends(get_session),
):
"""Force la synchronisation d'une transaction"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, "Transaction introuvable")
success, error = await sync_service.sync_transaction(
session, transaction, force=force
)
if not success:
raise HTTPException(500, error or "Échec synchronisation")
return {
"success": True,
"transaction_id": transaction_id,
"new_status": transaction.local_status.value,
"synced_at": transaction.last_synced_at.isoformat(),
}
@router.post("/sync/all")
async def sync_all_transactions(
max_transactions: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
"""Synchronise toutes les transactions en attente"""
stats = await sync_service.sync_all_pending(session, max_transactions)
return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()}
@router.post("/webhook")
@router.post("/webhook/")
async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session)
):
try:
payload = await request.json()
logger.info(
f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}"
)
success, error = await sync_service.process_webhook(session, payload)
if not success:
logger.error(f"Erreur traitement webhook: {error}")
return {"status": "error", "message": error}, 500
return {
"status": "processed",
"event": payload.get("event"),
"transaction_id": payload.get("transaction_id"),
}
except Exception as e:
logger.error(f"Erreur webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500
@router.get("/stats", response_model=SyncStatsResponse)
async def get_sync_stats(session: AsyncSession = Depends(get_session)):
"""Statistiques globales de synchronisation"""
# Total
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
# En attente de sync
pending_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.needs_sync
)
pending = (await session.execute(pending_query)).scalar()
# Par statut
signed_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.SIGNED
)
signed = (await session.execute(signed_query)).scalar()
in_progress_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.IN_PROGRESS
)
in_progress = (await session.execute(in_progress_query)).scalar()
refused_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.REJECTED
)
refused = (await session.execute(refused_query)).scalar()
expired_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EXPIRED
)
expired = (await session.execute(expired_query)).scalar()
# Dernière sync
last_sync_query = select(func.max(UniversignTransaction.last_synced_at))
last_sync = (await session.execute(last_sync_query)).scalar()
return SyncStatsResponse(
total_transactions=total,
pending_sync=pending,
signed=signed,
in_progress=in_progress,
refused=refused,
expired=expired,
last_sync_at=last_sync,
)
@router.get("/transactions/{transaction_id}/logs")
async def get_transaction_logs(
transaction_id: str,
limit: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
# Trouver la transaction
tx_query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
tx_result = await session.execute(tx_query)
tx = tx_result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
# Logs
logs_query = (
select(UniversignSyncLog)
.where(UniversignSyncLog.transaction_id == tx.id)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(limit)
)
logs_result = await session.execute(logs_query)
logs = logs_result.scalars().all()
return {
"transaction_id": transaction_id,
"total_syncs": len(logs),
"logs": [
{
"sync_type": log.sync_type,
"timestamp": log.sync_timestamp.isoformat(),
"success": log.success,
"previous_status": log.previous_status,
"new_status": log.new_status,
"error_message": log.error_message,
"response_time_ms": log.response_time_ms,
}
for log in logs
],
}
@router.get("/documents/{sage_document_id}/signatures")
async def get_signatures_for_document(
sage_document_id: str,
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions de signature pour un document Sage"""
query = (
select(UniversignTransaction)
.options(selectinload(UniversignTransaction.signers))
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(UniversignTransaction.created_at.desc())
)
result = await session.execute(query)
transactions = result.scalars().all()
return [
{
"id": tx.id,
"transaction_id": tx.transaction_id,
"local_status": tx.local_status.value,
"universign_status": tx.universign_status.value
if tx.universign_status
else None,
"created_at": tx.created_at.isoformat(),
"signed_at": tx.signed_at.isoformat() if tx.signed_at else None,
"signer_url": tx.signer_url,
"signers_count": len(tx.signers),
}
for tx in transactions
]
@router.delete("/documents/{sage_document_id}/duplicates")
async def cleanup_duplicate_signatures(
sage_document_id: str,
keep_latest: bool = Query(
True, description="Garder la plus récente (True) ou la plus ancienne (False)"
),
session: AsyncSession = Depends(get_session),
):
"""
Supprime les doublons de signatures pour un document.
Garde une seule transaction (la plus récente ou ancienne selon le paramètre).
"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == sage_document_id)
.order_by(
UniversignTransaction.created_at.desc()
if keep_latest
else UniversignTransaction.created_at.asc()
)
)
result = await session.execute(query)
transactions = result.scalars().all()
if len(transactions) <= 1:
return {
"success": True,
"message": "Aucun doublon trouvé",
"kept": transactions[0].transaction_id if transactions else None,
"deleted_count": 0,
}
# Garder la première (selon l'ordre), supprimer les autres
to_keep = transactions[0]
to_delete = transactions[1:]
deleted_ids = []
for tx in to_delete:
deleted_ids.append(tx.transaction_id)
await session.delete(tx)
await session.commit()
logger.info(
f"Nettoyage doublons {sage_document_id}: gardé {to_keep.transaction_id}, supprimé {deleted_ids}"
)
return {
"success": True,
"document_id": sage_document_id,
"kept": {
"id": to_keep.id,
"transaction_id": to_keep.transaction_id,
"status": to_keep.local_status.value,
"created_at": to_keep.created_at.isoformat(),
},
"deleted_count": len(deleted_ids),
"deleted_transaction_ids": deleted_ids,
}
@router.delete("/transactions/{transaction_id}")
async def delete_transaction(
transaction_id: str,
session: AsyncSession = Depends(get_session),
):
"""Supprime une transaction spécifique par son ID Universign"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
await session.delete(tx)
await session.commit()
logger.info(f"Transaction {transaction_id} supprimée")
return {
"success": True,
"deleted_transaction_id": transaction_id,
"document_id": tx.sage_document_id,
}
@router.post("/cleanup/all-duplicates")
async def cleanup_all_duplicates(
session: AsyncSession = Depends(get_session),
):
"""
Nettoie tous les doublons dans la base.
Pour chaque document avec plusieurs transactions, garde la plus récente non-erreur ou la plus récente.
"""
from sqlalchemy import func
# Trouver les documents avec plusieurs transactions
subquery = (
select(
UniversignTransaction.sage_document_id,
func.count(UniversignTransaction.id).label("count"),
)
.group_by(UniversignTransaction.sage_document_id)
.having(func.count(UniversignTransaction.id) > 1)
).subquery()
duplicates_query = select(subquery.c.sage_document_id)
duplicates_result = await session.execute(duplicates_query)
duplicate_docs = [row[0] for row in duplicates_result.fetchall()]
total_deleted = 0
cleanup_details = []
for doc_id in duplicate_docs:
# Récupérer toutes les transactions pour ce document
tx_query = (
select(UniversignTransaction)
.where(UniversignTransaction.sage_document_id == doc_id)
.order_by(UniversignTransaction.created_at.desc())
)
tx_result = await session.execute(tx_query)
transactions = tx_result.scalars().all()
# Priorité: SIGNE > EN_COURS > EN_ATTENTE > autres
priority = {"SIGNE": 0, "EN_COURS": 1, "EN_ATTENTE": 2}
def sort_key(tx):
status_priority = priority.get(tx.local_status.value, 99)
return (status_priority, -tx.created_at.timestamp())
sorted_txs = sorted(transactions, key=sort_key)
to_keep = sorted_txs[0]
to_delete = sorted_txs[1:]
for tx in to_delete:
await session.delete(tx)
total_deleted += 1
cleanup_details.append(
{
"document_id": doc_id,
"kept": to_keep.transaction_id,
"kept_status": to_keep.local_status.value,
"deleted_count": len(to_delete),
}
)
await session.commit()
logger.info(
f"Nettoyage global: {total_deleted} doublons supprimés sur {len(duplicate_docs)} documents"
)
return {
"success": True,
"documents_processed": len(duplicate_docs),
"total_deleted": total_deleted,
"details": cleanup_details,
}