Sage100-vps/routes/universign.py

535 lines
17 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, EmailStr
import logging
from api import normaliser_type_doc
from email_queue import email_queue
from database import get_session
from database import (
UniversignTransaction,
UniversignSigner,
UniversignSyncLog,
LocalDocumentStatus,
SageDocumentType,
)
from services.universign_sync import UniversignSyncService
from config.config import settings
from utils.universign_status_mapping import get_status_message
from database.models.email import EmailLog
from database.enum.status import StatutEmail
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/universign", tags=["Universign"])
sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key
)
class CreateSignatureRequest(BaseModel):
"""Demande de création d'une signature"""
sage_document_id: str
sage_document_type: SageDocumentType
signer_email: EmailStr
signer_name: str
document_name: Optional[str] = None
class TransactionResponse(BaseModel):
"""Réponse détaillée d'une transaction"""
id: str
transaction_id: str
sage_document_id: str
sage_document_type: str
universign_status: str
local_status: str
local_status_label: str
signer_url: Optional[str]
document_url: Optional[str]
created_at: datetime
sent_at: Optional[datetime]
signed_at: Optional[datetime]
last_synced_at: Optional[datetime]
needs_sync: bool
signers: List[dict]
class SyncStatsResponse(BaseModel):
"""Statistiques de synchronisation"""
total_transactions: int
pending_sync: int
signed: int
in_progress: int
refused: int
expired: int
last_sync_at: Optional[datetime]
@router.post("/signatures/create", response_model=TransactionResponse)
async def create_signature(
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
):
try:
pdf_bytes = email_queue._generate_pdf(
request.doc_id, normaliser_type_doc(request.type_doc)
)
if not pdf_bytes:
raise HTTPException(400, "Échec génération PDF")
# === 2. CRÉATION TRANSACTION UNIVERSIGN ===
import requests
import uuid
auth = (settings.universign_api_key, "")
# Créer la transaction
resp = requests.post(
f"{settings.universign_api_url}/transactions",
auth=auth,
json={
"name": request.document_name
or f"{request.sage_document_type.name} {request.sage_document_id}",
"language": "fr",
},
timeout=30,
)
if resp.status_code != 200:
raise HTTPException(500, f"Erreur Universign: {resp.status_code}")
universign_tx_id = resp.json().get("id")
# Upload le fichier
files = {
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
}
resp = requests.post(
f"{settings.universign_api_url}/files", auth=auth, files=files, timeout=60
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur upload PDF")
file_id = resp.json().get("id")
# Attacher le document
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
auth=auth,
data={"document": file_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur attachement document")
document_id = resp.json().get("id")
# Créer le champ de signature
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
auth=auth,
data={"type": "signature"},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur création champ signature")
field_id = resp.json().get("id")
# Lier le signataire
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
auth=auth,
data={"signer": request.signer_email, "field": field_id},
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur liaison signataire")
# Démarrer la transaction
resp = requests.post(
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
auth=auth,
timeout=30,
)
if resp.status_code not in [200, 201]:
raise HTTPException(500, "Erreur démarrage transaction")
final_data = resp.json()
# Extraire l'URL de signature
signer_url = ""
if final_data.get("actions"):
for action in final_data["actions"]:
if action.get("url"):
signer_url = action["url"]
break
if not signer_url:
raise HTTPException(500, "URL de signature non retournée")
# === 3. ENREGISTREMENT LOCAL ===
local_id = str(uuid.uuid4())
transaction = UniversignTransaction(
id=local_id,
transaction_id=universign_tx_id,
sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type,
universign_status="started",
local_status=LocalDocumentStatus.EN_COURS,
signer_url=signer_url,
requester_email=request.signer_email,
requester_name=request.signer_name,
document_name=request.document_name,
created_at=datetime.now(),
sent_at=datetime.now(),
is_test=True, # Environnement .alpha
needs_sync=True,
)
session.add(transaction)
# Signataire
signer = UniversignSigner(
id=f"{local_id}_signer_0",
transaction_id=local_id,
email=request.signer_email,
name=request.signer_name,
status="waiting",
order_index=0,
)
session.add(signer)
await session.commit()
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=request.signer_email,
sujet=f"Signature requise - {request.sage_document_type.name} {request.sage_document_id}",
corps_html=f"""
<p>Bonjour {request.signer_name},</p>
<p>Merci de signer le document suivant :</p>
<p><a href="{signer_url}">Cliquez ici pour signer</a></p>
""",
document_ids=request.sage_document_id,
type_document=request.sage_document_type.value,
statut=StatutEmail.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.commit()
email_queue.enqueue(email_log.id)
# === RÉPONSE ===
return TransactionResponse(
id=transaction.id,
transaction_id=transaction.transaction_id,
sage_document_id=transaction.sage_document_id,
sage_document_type=transaction.sage_document_type.name,
universign_status=transaction.universign_status.value,
local_status=transaction.local_status.value,
local_status_label=get_status_message(transaction.local_status.value),
signer_url=transaction.signer_url,
document_url=None,
created_at=transaction.created_at,
sent_at=transaction.sent_at,
signed_at=None,
last_synced_at=None,
needs_sync=True,
signers=[
{
"email": signer.email,
"name": signer.name,
"status": signer.status.value,
}
],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur création signature: {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.get("/transactions", response_model=List[TransactionResponse])
async def list_transactions(
status: Optional[LocalDocumentStatus] = None,
sage_document_id: Optional[str] = None,
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""Liste toutes les transactions"""
query = select(UniversignTransaction).options(
selectinload(UniversignTransaction.signers)
)
if status:
query = query.where(UniversignTransaction.local_status == status)
if sage_document_id:
query = query.where(UniversignTransaction.sage_document_id == sage_document_id)
query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit)
result = await session.execute(query)
transactions = result.scalars().all()
return [
TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
)
for tx in transactions
]
@router.get("/transactions/{transaction_id}", response_model=TransactionResponse)
async def get_transaction(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""Récupère une transaction par son ID"""
query = (
select(UniversignTransaction)
.where(UniversignTransaction.transaction_id == transaction_id)
.options(selectinload(UniversignTransaction.signers))
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
return TransactionResponse(
id=tx.id,
transaction_id=tx.transaction_id,
sage_document_id=tx.sage_document_id,
sage_document_type=tx.sage_document_type.name,
universign_status=tx.universign_status.value,
local_status=tx.local_status.value,
local_status_label=get_status_message(tx.local_status.value),
signer_url=tx.signer_url,
document_url=tx.document_url,
created_at=tx.created_at,
sent_at=tx.sent_at,
signed_at=tx.signed_at,
last_synced_at=tx.last_synced_at,
needs_sync=tx.needs_sync,
signers=[
{
"email": s.email,
"name": s.name,
"status": s.status.value,
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
}
for s in tx.signers
],
)
@router.post("/transactions/{transaction_id}/sync")
async def sync_single_transaction(
transaction_id: str,
force: bool = Query(False),
session: AsyncSession = Depends(get_session),
):
"""Force la synchronisation d'une transaction"""
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, "Transaction introuvable")
success, error = await sync_service.sync_transaction(
session, transaction, force=force
)
if not success:
raise HTTPException(500, error or "Échec synchronisation")
return {
"success": True,
"transaction_id": transaction_id,
"new_status": transaction.local_status.value,
"synced_at": transaction.last_synced_at.isoformat(),
}
@router.post("/sync/all")
async def sync_all_transactions(
max_transactions: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
"""Synchronise toutes les transactions en attente"""
stats = await sync_service.sync_all_pending(session, max_transactions)
return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()}
@router.post("/webhook")
async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session)
):
try:
payload = await request.json()
logger.info(
f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}"
)
success, error = await sync_service.process_webhook(session, payload)
if not success:
logger.error(f"Erreur traitement webhook: {error}")
return {"status": "error", "message": error}, 500
return {
"status": "processed",
"event": payload.get("event"),
"transaction_id": payload.get("transaction_id"),
}
except Exception as e:
logger.error(f"Erreur webhook: {e}", exc_info=True)
return {"status": "error", "message": str(e)}, 500
@router.get("/stats", response_model=SyncStatsResponse)
async def get_sync_stats(session: AsyncSession = Depends(get_session)):
"""Statistiques globales de synchronisation"""
# Total
total_query = select(func.count(UniversignTransaction.id))
total = (await session.execute(total_query)).scalar()
# En attente de sync
pending_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.needs_sync
)
pending = (await session.execute(pending_query)).scalar()
# Par statut
signed_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.SIGNE
)
signed = (await session.execute(signed_query)).scalar()
in_progress_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EN_COURS
)
in_progress = (await session.execute(in_progress_query)).scalar()
refused_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.REFUSE
)
refused = (await session.execute(refused_query)).scalar()
expired_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EXPIRE
)
expired = (await session.execute(expired_query)).scalar()
# Dernière sync
last_sync_query = select(func.max(UniversignTransaction.last_synced_at))
last_sync = (await session.execute(last_sync_query)).scalar()
return SyncStatsResponse(
total_transactions=total,
pending_sync=pending,
signed=signed,
in_progress=in_progress,
refused=refused,
expired=expired,
last_sync_at=last_sync,
)
@router.get("/transactions/{transaction_id}/logs")
async def get_transaction_logs(
transaction_id: str,
limit: int = Query(50, le=500),
session: AsyncSession = Depends(get_session),
):
# Trouver la transaction
tx_query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
tx_result = await session.execute(tx_query)
tx = tx_result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
# Logs
logs_query = (
select(UniversignSyncLog)
.where(UniversignSyncLog.transaction_id == tx.id)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(limit)
)
logs_result = await session.execute(logs_query)
logs = logs_result.scalars().all()
return {
"transaction_id": transaction_id,
"total_syncs": len(logs),
"logs": [
{
"sync_type": log.sync_type,
"timestamp": log.sync_timestamp.isoformat(),
"success": log.success,
"previous_status": log.previous_status,
"new_status": log.new_status,
"error_message": log.error_message,
"response_time_ms": log.response_time_ms,
}
for log in logs
],
}