Compare commits
No commits in common. "9bc2a3f7c91e049d9248420d8281f40b94a4adeb" and "92a2b95cbbff363feafc668f67ce19b1c4d8dc40" have entirely different histories.
9bc2a3f7c9
...
92a2b95cbb
5 changed files with 733 additions and 785 deletions
14
api.py
14
api.py
|
|
@ -80,7 +80,7 @@ from utils.normalization import normaliser_type_tiers
|
|||
from routes.sage_gateway import router as sage_gateway_router
|
||||
from routes.universign import router as universign_router
|
||||
|
||||
from services.universign_sync import UniversignSyncService
|
||||
from services.universign_sync import UniversignSyncService, UniversignSyncScheduler
|
||||
|
||||
from core.sage_context import (
|
||||
get_sage_client_for_user,
|
||||
|
|
@ -129,15 +129,19 @@ async def lifespan(app: FastAPI):
|
|||
api_url=settings.universign_api_url, api_key=settings.universign_api_key
|
||||
)
|
||||
|
||||
# Configuration du service avec les dépendances
|
||||
sync_service.configure(
|
||||
sage_client=sage_client, email_queue=email_queue, settings=settings
|
||||
scheduler = UniversignSyncScheduler(
|
||||
sync_service=sync_service,
|
||||
interval_minutes=5, # Synchronisation toutes les 5 minutes
|
||||
)
|
||||
|
||||
logger.info("Synchronisation Universign démarrée (5min)")
|
||||
sync_task = asyncio.create_task(scheduler.start(async_session_factory))
|
||||
|
||||
logger.info("✓ Synchronisation Universign démarrée (5min)")
|
||||
|
||||
yield
|
||||
|
||||
scheduler.stop()
|
||||
sync_task.cancel()
|
||||
email_queue.stop()
|
||||
logger.info("Services arrêtés")
|
||||
|
||||
|
|
|
|||
|
|
@ -145,12 +145,6 @@ class UniversignTransaction(Base):
|
|||
)
|
||||
webhook_received = Column(Boolean, default=False, comment="Webhook Universign reçu")
|
||||
|
||||
signed_document_path = Column(
|
||||
Text, nullable=True, comment="Chemin local du document signé téléchargé"
|
||||
)
|
||||
signed_document_downloaded_at = Column(
|
||||
DateTime, nullable=True, comment="Date de téléchargement du document signé"
|
||||
)
|
||||
# === RELATION ===
|
||||
signers = relationship(
|
||||
"UniversignSigner", back_populates="transaction", cascade="all, delete-orphan"
|
||||
|
|
|
|||
|
|
@ -1,48 +1,36 @@
|
|||
"""
|
||||
Routes API Universign améliorées
|
||||
Intègre la logique métier complète de gestion des signatures
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Path, Request
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.orm import selectinload
|
||||
from typing import Optional
|
||||
from typing import List, Optional
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel, EmailStr
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from data.data import templates_signature_email
|
||||
from email_queue import email_queue
|
||||
from database import UniversignSignerStatus, UniversignTransactionStatus, get_session
|
||||
from database import (
|
||||
UniversignTransaction,
|
||||
UniversignSigner,
|
||||
UniversignSyncLog,
|
||||
LocalDocumentStatus,
|
||||
SageDocumentType,
|
||||
UniversignTransactionStatus,
|
||||
UniversignSignerStatus,
|
||||
get_session,
|
||||
EmailLog,
|
||||
StatutEmail,
|
||||
)
|
||||
from services.universign_sync import UniversignSyncService
|
||||
from services.signed_documents import signed_documents
|
||||
from config.config import settings
|
||||
from email_queue import email_queue
|
||||
from sage_client import sage_client
|
||||
from data.data import templates_signature_email
|
||||
from utils.generic_functions import normaliser_type_doc
|
||||
from utils.universign_status_mapping import get_status_message
|
||||
|
||||
from database.models.email import EmailLog
|
||||
from database.enum.status import StatutEmail
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/universign", tags=["Universign Enhanced"])
|
||||
router = APIRouter(prefix="/universign", tags=["Universign"])
|
||||
|
||||
# Service de synchronisation amélioré
|
||||
universign_sync = UniversignSyncService(
|
||||
sync_service = UniversignSyncService(
|
||||
api_url=settings.universign_api_url, api_key=settings.universign_api_key
|
||||
)
|
||||
universign_sync.configure(
|
||||
sage_client=sage_client, email_queue=email_queue, settings=settings
|
||||
)
|
||||
|
||||
|
||||
class CreateSignatureRequest(BaseModel):
|
||||
|
|
@ -55,52 +43,43 @@ class CreateSignatureRequest(BaseModel):
|
|||
document_name: Optional[str] = None
|
||||
|
||||
|
||||
@router.post("/signatures/create-enhanced")
|
||||
async def create_signature_enhanced(
|
||||
class TransactionResponse(BaseModel):
|
||||
"""Réponse détaillée d'une transaction"""
|
||||
|
||||
id: str
|
||||
transaction_id: str
|
||||
sage_document_id: str
|
||||
sage_document_type: str
|
||||
universign_status: str
|
||||
local_status: str
|
||||
local_status_label: str
|
||||
signer_url: Optional[str]
|
||||
document_url: Optional[str]
|
||||
created_at: datetime
|
||||
sent_at: Optional[datetime]
|
||||
signed_at: Optional[datetime]
|
||||
last_synced_at: Optional[datetime]
|
||||
needs_sync: bool
|
||||
signers: List[dict]
|
||||
|
||||
|
||||
class SyncStatsResponse(BaseModel):
|
||||
"""Statistiques de synchronisation"""
|
||||
|
||||
total_transactions: int
|
||||
pending_sync: int
|
||||
signed: int
|
||||
in_progress: int
|
||||
refused: int
|
||||
expired: int
|
||||
last_sync_at: Optional[datetime]
|
||||
|
||||
|
||||
@router.post("/signatures/create", response_model=TransactionResponse)
|
||||
async def create_signature(
|
||||
request: CreateSignatureRequest, session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
"""
|
||||
Création de signature avec logique métier stricte:
|
||||
- Vérifie le statut Sage actuel
|
||||
- Ne met à jour à 1 QUE si statut = 0
|
||||
- Crée la transaction Universign
|
||||
- Envoie l'email de demande
|
||||
"""
|
||||
try:
|
||||
# === VÉRIFICATION STATUT SAGE ACTUEL ===
|
||||
doc = sage_client.lire_document(
|
||||
request.sage_document_id, request.sage_document_type.value
|
||||
)
|
||||
|
||||
if not doc:
|
||||
raise HTTPException(404, f"Document {request.sage_document_id} introuvable")
|
||||
|
||||
statut_actuel = doc.get("statut", 0)
|
||||
logger.info(f"📊 Statut Sage actuel: {statut_actuel}")
|
||||
|
||||
# === VÉRIFICATION DOUBLON ===
|
||||
existing_query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.sage_document_id == request.sage_document_id,
|
||||
UniversignTransaction.sage_document_type == request.sage_document_type,
|
||||
~UniversignTransaction.local_status.in_(
|
||||
[
|
||||
LocalDocumentStatus.SIGNED,
|
||||
LocalDocumentStatus.REJECTED,
|
||||
LocalDocumentStatus.EXPIRED,
|
||||
LocalDocumentStatus.ERROR,
|
||||
]
|
||||
),
|
||||
)
|
||||
existing_result = await session.execute(existing_query)
|
||||
existing_tx = existing_result.scalar_one_or_none()
|
||||
|
||||
if existing_tx:
|
||||
raise HTTPException(
|
||||
400,
|
||||
f"Une demande de signature est déjà en cours pour {request.sage_document_id}",
|
||||
)
|
||||
|
||||
# === GÉNÉRATION PDF ===
|
||||
pdf_bytes = email_queue._generate_pdf(
|
||||
request.sage_document_id, normaliser_type_doc(request.sage_document_type)
|
||||
)
|
||||
|
|
@ -110,10 +89,10 @@ async def create_signature_enhanced(
|
|||
|
||||
# === CRÉATION TRANSACTION UNIVERSIGN ===
|
||||
import requests
|
||||
import uuid
|
||||
|
||||
auth = (settings.universign_api_key, "")
|
||||
|
||||
# 1. Créer transaction
|
||||
resp = requests.post(
|
||||
f"{settings.universign_api_url}/transactions",
|
||||
auth=auth,
|
||||
|
|
@ -130,7 +109,6 @@ async def create_signature_enhanced(
|
|||
|
||||
universign_tx_id = resp.json().get("id")
|
||||
|
||||
# 2. Upload PDF
|
||||
files = {
|
||||
"file": (f"{request.sage_document_id}.pdf", pdf_bytes, "application/pdf")
|
||||
}
|
||||
|
|
@ -143,7 +121,6 @@ async def create_signature_enhanced(
|
|||
|
||||
file_id = resp.json().get("id")
|
||||
|
||||
# 3. Attacher document
|
||||
resp = requests.post(
|
||||
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents",
|
||||
auth=auth,
|
||||
|
|
@ -156,7 +133,6 @@ async def create_signature_enhanced(
|
|||
|
||||
document_id = resp.json().get("id")
|
||||
|
||||
# 4. Créer champ signature
|
||||
resp = requests.post(
|
||||
f"{settings.universign_api_url}/transactions/{universign_tx_id}/documents/{document_id}/fields",
|
||||
auth=auth,
|
||||
|
|
@ -169,7 +145,6 @@ async def create_signature_enhanced(
|
|||
|
||||
field_id = resp.json().get("id")
|
||||
|
||||
# 5. Lier signataire
|
||||
resp = requests.post(
|
||||
f"{settings.universign_api_url}/transactions/{universign_tx_id}/signatures",
|
||||
auth=auth,
|
||||
|
|
@ -180,7 +155,6 @@ async def create_signature_enhanced(
|
|||
if resp.status_code not in [200, 201]:
|
||||
raise HTTPException(500, "Erreur liaison signataire")
|
||||
|
||||
# 6. Démarrer transaction
|
||||
resp = requests.post(
|
||||
f"{settings.universign_api_url}/transactions/{universign_tx_id}/start",
|
||||
auth=auth,
|
||||
|
|
@ -192,7 +166,6 @@ async def create_signature_enhanced(
|
|||
|
||||
final_data = resp.json()
|
||||
|
||||
# 7. Extraire URL de signature
|
||||
signer_url = ""
|
||||
if final_data.get("actions"):
|
||||
for action in final_data["actions"]:
|
||||
|
|
@ -237,7 +210,7 @@ async def create_signature_enhanced(
|
|||
session.add(signer)
|
||||
await session.commit()
|
||||
|
||||
# === ENVOI EMAIL ===
|
||||
# === ENVOI EMAIL AVEC TEMPLATE ===
|
||||
template = templates_signature_email["demande_signature"]
|
||||
|
||||
type_labels = {
|
||||
|
|
@ -248,7 +221,7 @@ async def create_signature_enhanced(
|
|||
50: "Avoir",
|
||||
}
|
||||
|
||||
doc_info = sage_client.lire_document(
|
||||
doc_info = email_queue.sage_client.lire_document(
|
||||
request.sage_document_id, request.sage_document_type.value
|
||||
)
|
||||
montant_ttc = f"{doc_info.get('total_ttc', 0):.2f}" if doc_info else "0.00"
|
||||
|
|
@ -292,37 +265,30 @@ async def create_signature_enhanced(
|
|||
|
||||
email_queue.enqueue(email_log.id)
|
||||
|
||||
# === MISE À JOUR STATUT SAGE (LOGIQUE STRICTE) ===
|
||||
statut_sage_updated = False
|
||||
|
||||
if statut_actuel == 0:
|
||||
try:
|
||||
sage_client.changer_statut_document(
|
||||
document_type_code=request.sage_document_type.value,
|
||||
numero=request.sage_document_id,
|
||||
nouveau_statut=1, # Confirmé
|
||||
)
|
||||
logger.info(f"✅ Statut Sage mis à jour: 0 → 1")
|
||||
statut_sage_updated = True
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Impossible de mettre à jour le statut Sage: {e}")
|
||||
else:
|
||||
logger.info(f"ℹ️ Statut Sage non modifié (était {statut_actuel}, ≠ 0)")
|
||||
|
||||
# === RÉPONSE ===
|
||||
return {
|
||||
"success": True,
|
||||
"transaction_id": transaction.transaction_id,
|
||||
"sage_document_id": transaction.sage_document_id,
|
||||
"signer_url": transaction.signer_url,
|
||||
"statut_sage_initial": statut_actuel,
|
||||
"statut_sage_updated": statut_sage_updated,
|
||||
"nouveau_statut_sage": 1 if statut_sage_updated else statut_actuel,
|
||||
"message": (
|
||||
f"Signature créée. Statut Sage: {statut_actuel} → "
|
||||
f"{1 if statut_sage_updated else statut_actuel}"
|
||||
),
|
||||
return TransactionResponse(
|
||||
id=transaction.id,
|
||||
transaction_id=transaction.transaction_id,
|
||||
sage_document_id=transaction.sage_document_id,
|
||||
sage_document_type=transaction.sage_document_type.name,
|
||||
universign_status=transaction.universign_status.value,
|
||||
local_status=transaction.local_status.value,
|
||||
local_status_label=get_status_message(transaction.local_status.value),
|
||||
signer_url=transaction.signer_url,
|
||||
document_url=None,
|
||||
created_at=transaction.created_at,
|
||||
sent_at=transaction.sent_at,
|
||||
signed_at=None,
|
||||
last_synced_at=None,
|
||||
needs_sync=True,
|
||||
signers=[
|
||||
{
|
||||
"email": signer.email,
|
||||
"name": signer.name,
|
||||
"status": signer.status.value,
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
|
@ -331,108 +297,167 @@ async def create_signature_enhanced(
|
|||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@router.post("/webhook-enhanced")
|
||||
@router.post("/webhook-enhanced/")
|
||||
async def webhook_universign_enhanced(
|
||||
request: Request, session: AsyncSession = Depends(get_session)
|
||||
@router.get("/transactions", response_model=List[TransactionResponse])
|
||||
async def list_transactions(
|
||||
status: Optional[LocalDocumentStatus] = None,
|
||||
sage_document_id: Optional[str] = None,
|
||||
limit: int = Query(100, le=1000),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""
|
||||
Webhook Universign amélioré:
|
||||
- Détecte l'événement 'closed' (signature complétée)
|
||||
- Télécharge automatiquement le document signé
|
||||
- Met à jour le statut Sage à 2
|
||||
- Envoie la notification avec lien de téléchargement
|
||||
"""
|
||||
try:
|
||||
payload = await request.json()
|
||||
"""Liste toutes les transactions"""
|
||||
query = select(UniversignTransaction).options(
|
||||
selectinload(UniversignTransaction.signers)
|
||||
)
|
||||
|
||||
event_type = payload.get("event")
|
||||
transaction_id = payload.get("transaction_id") or payload.get("id")
|
||||
if status:
|
||||
query = query.where(UniversignTransaction.local_status == status)
|
||||
|
||||
logger.info(f"📨 Webhook reçu: event={event_type}, tx={transaction_id}")
|
||||
if sage_document_id:
|
||||
query = query.where(UniversignTransaction.sage_document_id == sage_document_id)
|
||||
|
||||
if not transaction_id:
|
||||
return {"status": "error", "message": "Pas de transaction_id"}, 400
|
||||
query = query.order_by(UniversignTransaction.created_at.desc()).limit(limit)
|
||||
|
||||
# Récupérer la transaction locale
|
||||
result = await session.execute(query)
|
||||
transactions = result.scalars().all()
|
||||
|
||||
return [
|
||||
TransactionResponse(
|
||||
id=tx.id,
|
||||
transaction_id=tx.transaction_id,
|
||||
sage_document_id=tx.sage_document_id,
|
||||
sage_document_type=tx.sage_document_type.name,
|
||||
universign_status=tx.universign_status.value,
|
||||
local_status=tx.local_status.value,
|
||||
local_status_label=get_status_message(tx.local_status.value),
|
||||
signer_url=tx.signer_url,
|
||||
document_url=tx.document_url,
|
||||
created_at=tx.created_at,
|
||||
sent_at=tx.sent_at,
|
||||
signed_at=tx.signed_at,
|
||||
last_synced_at=tx.last_synced_at,
|
||||
needs_sync=tx.needs_sync,
|
||||
signers=[
|
||||
{
|
||||
"email": s.email,
|
||||
"name": s.name,
|
||||
"status": s.status.value,
|
||||
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
|
||||
}
|
||||
for s in tx.signers
|
||||
],
|
||||
)
|
||||
for tx in transactions
|
||||
]
|
||||
|
||||
|
||||
@router.get("/transactions/{transaction_id}", response_model=TransactionResponse)
|
||||
async def get_transaction(
|
||||
transaction_id: str, session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
"""Récupère une transaction par son ID"""
|
||||
query = (
|
||||
select(UniversignTransaction)
|
||||
.options(selectinload(UniversignTransaction.signers))
|
||||
.where(UniversignTransaction.transaction_id == transaction_id)
|
||||
.options(selectinload(UniversignTransaction.signers))
|
||||
)
|
||||
|
||||
result = await session.execute(query)
|
||||
tx = result.scalar_one_or_none()
|
||||
|
||||
if not tx:
|
||||
raise HTTPException(404, "Transaction introuvable")
|
||||
|
||||
return TransactionResponse(
|
||||
id=tx.id,
|
||||
transaction_id=tx.transaction_id,
|
||||
sage_document_id=tx.sage_document_id,
|
||||
sage_document_type=tx.sage_document_type.name,
|
||||
universign_status=tx.universign_status.value,
|
||||
local_status=tx.local_status.value,
|
||||
local_status_label=get_status_message(tx.local_status.value),
|
||||
signer_url=tx.signer_url,
|
||||
document_url=tx.document_url,
|
||||
created_at=tx.created_at,
|
||||
sent_at=tx.sent_at,
|
||||
signed_at=tx.signed_at,
|
||||
last_synced_at=tx.last_synced_at,
|
||||
needs_sync=tx.needs_sync,
|
||||
signers=[
|
||||
{
|
||||
"email": s.email,
|
||||
"name": s.name,
|
||||
"status": s.status.value,
|
||||
"signed_at": s.signed_at.isoformat() if s.signed_at else None,
|
||||
}
|
||||
for s in tx.signers
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@router.post("/transactions/{transaction_id}/sync")
|
||||
async def sync_single_transaction(
|
||||
transaction_id: str,
|
||||
force: bool = Query(False),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""Force la synchronisation d'une transaction"""
|
||||
query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.transaction_id == transaction_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
transaction = result.scalar_one_or_none()
|
||||
|
||||
if not transaction:
|
||||
logger.warning(f"Transaction {transaction_id} inconnue")
|
||||
return {"status": "error", "message": "Transaction inconnue"}, 404
|
||||
raise HTTPException(404, "Transaction introuvable")
|
||||
|
||||
transaction.webhook_received = True
|
||||
|
||||
# Récupérer l'état complet depuis Universign
|
||||
import requests
|
||||
|
||||
resp = requests.get(
|
||||
f"{settings.universign_api_url}/transactions/{transaction_id}",
|
||||
auth=(settings.universign_api_key, ""),
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.error(f"Erreur récupération transaction: {resp.status_code}")
|
||||
return {"status": "error", "message": "Erreur API Universign"}, 500
|
||||
|
||||
universign_data = resp.json()
|
||||
universign_status_raw = universign_data.get("state", "")
|
||||
|
||||
previous_status = transaction.local_status.value
|
||||
|
||||
# Déterminer le nouveau statut
|
||||
from utils.universign_status_mapping import map_universign_to_local
|
||||
|
||||
new_status = map_universign_to_local(universign_status_raw)
|
||||
|
||||
# Mettre à jour la transaction
|
||||
transaction.universign_status = (
|
||||
UniversignTransactionStatus(universign_status_raw)
|
||||
if universign_status_raw in [s.value for s in UniversignTransactionStatus]
|
||||
else transaction.universign_status
|
||||
)
|
||||
transaction.local_status = LocalDocumentStatus(new_status)
|
||||
transaction.universign_status_updated_at = datetime.now()
|
||||
transaction.last_synced_at = datetime.now()
|
||||
|
||||
if new_status == "SIGNE" and not transaction.signed_at:
|
||||
transaction.signed_at = datetime.now()
|
||||
|
||||
await session.commit()
|
||||
|
||||
# Si statut = SIGNE (completed/closed), gérer la complétion
|
||||
if new_status == "SIGNE" and previous_status != "SIGNE":
|
||||
logger.info(f"🎯 Signature complétée détectée via webhook")
|
||||
|
||||
success, error = await universign_sync.handle_signature_completed(
|
||||
session=session,
|
||||
transaction=transaction,
|
||||
universign_data=universign_data,
|
||||
success, error = await sync_service.sync_transaction(
|
||||
session, transaction, force=force
|
||||
)
|
||||
|
||||
if not success:
|
||||
logger.error(f"Erreur handle_signature_completed: {error}")
|
||||
return {
|
||||
"status": "partial_success",
|
||||
"message": "Webhook traité mais erreur téléchargement",
|
||||
"error": error,
|
||||
}, 200
|
||||
|
||||
logger.info(f"✅ Webhook traité: {previous_status} → {new_status}")
|
||||
raise HTTPException(500, error or "Échec synchronisation")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"event": event_type,
|
||||
"success": True,
|
||||
"transaction_id": transaction_id,
|
||||
"previous_status": previous_status,
|
||||
"new_status": new_status,
|
||||
"new_status": transaction.local_status.value,
|
||||
"synced_at": transaction.last_synced_at.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/sync/all")
|
||||
async def sync_all_transactions(
|
||||
max_transactions: int = Query(50, le=500),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""Synchronise toutes les transactions en attente"""
|
||||
stats = await sync_service.sync_all_pending(session, max_transactions)
|
||||
|
||||
return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()}
|
||||
|
||||
|
||||
@router.post("/webhook")
|
||||
@router.post("/webhook/")
|
||||
async def webhook_universign(
|
||||
request: Request, session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
try:
|
||||
payload = await request.json()
|
||||
|
||||
logger.info(
|
||||
f"Webhook reçu: {payload.get('event')} - {payload.get('transaction_id')}"
|
||||
)
|
||||
|
||||
success, error = await sync_service.process_webhook(session, payload)
|
||||
|
||||
if not success:
|
||||
logger.error(f"Erreur traitement webhook: {error}")
|
||||
return {"status": "error", "message": error}, 500
|
||||
|
||||
return {
|
||||
"status": "processed",
|
||||
"event": payload.get("event"),
|
||||
"transaction_id": payload.get("transaction_id"),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -440,122 +465,96 @@ async def webhook_universign_enhanced(
|
|||
return {"status": "error", "message": str(e)}, 500
|
||||
|
||||
|
||||
@router.get("/documents/{transaction_local_id}/download")
|
||||
async def download_signed_document(
|
||||
transaction_local_id: str = Path(..., description="ID local de la transaction"),
|
||||
@router.get("/stats", response_model=SyncStatsResponse)
|
||||
async def get_sync_stats(session: AsyncSession = Depends(get_session)):
|
||||
"""Statistiques globales de synchronisation"""
|
||||
|
||||
# Total
|
||||
total_query = select(func.count(UniversignTransaction.id))
|
||||
total = (await session.execute(total_query)).scalar()
|
||||
|
||||
# En attente de sync
|
||||
pending_query = select(func.count(UniversignTransaction.id)).where(
|
||||
UniversignTransaction.needs_sync
|
||||
)
|
||||
pending = (await session.execute(pending_query)).scalar()
|
||||
|
||||
# Par statut
|
||||
signed_query = select(func.count(UniversignTransaction.id)).where(
|
||||
UniversignTransaction.local_status == LocalDocumentStatus.SIGNED
|
||||
)
|
||||
signed = (await session.execute(signed_query)).scalar()
|
||||
|
||||
in_progress_query = select(func.count(UniversignTransaction.id)).where(
|
||||
UniversignTransaction.local_status == LocalDocumentStatus.IN_PROGRESS
|
||||
)
|
||||
in_progress = (await session.execute(in_progress_query)).scalar()
|
||||
|
||||
refused_query = select(func.count(UniversignTransaction.id)).where(
|
||||
UniversignTransaction.local_status == LocalDocumentStatus.REJECTED
|
||||
)
|
||||
refused = (await session.execute(refused_query)).scalar()
|
||||
|
||||
expired_query = select(func.count(UniversignTransaction.id)).where(
|
||||
UniversignTransaction.local_status == LocalDocumentStatus.EXPIRED
|
||||
)
|
||||
expired = (await session.execute(expired_query)).scalar()
|
||||
|
||||
# Dernière sync
|
||||
last_sync_query = select(func.max(UniversignTransaction.last_synced_at))
|
||||
last_sync = (await session.execute(last_sync_query)).scalar()
|
||||
|
||||
return SyncStatsResponse(
|
||||
total_transactions=total,
|
||||
pending_sync=pending,
|
||||
signed=signed,
|
||||
in_progress=in_progress,
|
||||
refused=refused,
|
||||
expired=expired,
|
||||
last_sync_at=last_sync,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/transactions/{transaction_id}/logs")
|
||||
async def get_transaction_logs(
|
||||
transaction_id: str,
|
||||
limit: int = Query(50, le=500),
|
||||
session: AsyncSession = Depends(get_session),
|
||||
):
|
||||
"""
|
||||
Téléchargement sécurisé du document signé
|
||||
|
||||
**Sécurité**:
|
||||
- Vérifier que le document existe
|
||||
- Vérifier l'intégrité du fichier
|
||||
- Retourner 404 si non trouvé
|
||||
"""
|
||||
try:
|
||||
# Récupérer la transaction
|
||||
query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.id == transaction_local_id
|
||||
# Trouver la transaction
|
||||
tx_query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.transaction_id == transaction_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
transaction = result.scalar_one_or_none()
|
||||
tx_result = await session.execute(tx_query)
|
||||
tx = tx_result.scalar_one_or_none()
|
||||
|
||||
if not transaction:
|
||||
if not tx:
|
||||
raise HTTPException(404, "Transaction introuvable")
|
||||
|
||||
if transaction.local_status != LocalDocumentStatus.SIGNED:
|
||||
raise HTTPException(
|
||||
400, f"Document non signé (statut: {transaction.local_status.value})"
|
||||
# Logs
|
||||
logs_query = (
|
||||
select(UniversignSyncLog)
|
||||
.where(UniversignSyncLog.transaction_id == tx.id)
|
||||
.order_by(UniversignSyncLog.sync_timestamp.desc())
|
||||
.limit(limit)
|
||||
)
|
||||
|
||||
# Récupérer le chemin du document
|
||||
file_path = signed_documents.get_document_path(transaction)
|
||||
|
||||
if not file_path:
|
||||
raise HTTPException(404, "Document signé non disponible")
|
||||
|
||||
# Vérifier l'intégrité
|
||||
if not signed_documents.verify_document_integrity(file_path):
|
||||
logger.error(f"Document corrompu: {file_path}")
|
||||
raise HTTPException(500, "Document signé corrompu")
|
||||
|
||||
# Nom du fichier à télécharger
|
||||
filename = f"{transaction.sage_document_id}_signe.pdf"
|
||||
|
||||
logger.info(
|
||||
f"📥 Téléchargement: {filename} par transaction {transaction_local_id}"
|
||||
)
|
||||
|
||||
return FileResponse(
|
||||
path=file_path,
|
||||
media_type="application/pdf",
|
||||
filename=filename,
|
||||
headers={
|
||||
"Content-Disposition": f'attachment; filename="{filename}"',
|
||||
"X-Transaction-ID": transaction.transaction_id,
|
||||
"X-Signed-At": transaction.signed_at.isoformat()
|
||||
if transaction.signed_at
|
||||
else "",
|
||||
},
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur téléchargement document: {e}", exc_info=True)
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@router.get("/documents/{transaction_local_id}/info")
|
||||
async def get_signed_document_info(
|
||||
transaction_local_id: str, session: AsyncSession = Depends(get_session)
|
||||
):
|
||||
"""
|
||||
Informations sur le document signé (sans le télécharger)
|
||||
"""
|
||||
try:
|
||||
query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.id == transaction_local_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
transaction = result.scalar_one_or_none()
|
||||
|
||||
if not transaction:
|
||||
raise HTTPException(404, "Transaction introuvable")
|
||||
|
||||
file_path = signed_documents.get_document_path(transaction)
|
||||
|
||||
file_info = None
|
||||
if file_path:
|
||||
file_stat = file_path.stat()
|
||||
file_info = {
|
||||
"exists": True,
|
||||
"size_bytes": file_stat.st_size,
|
||||
"size_mb": round(file_stat.st_size / 1024 / 1024, 2),
|
||||
"created_at": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
|
||||
"integrity_ok": signed_documents.verify_document_integrity(file_path),
|
||||
}
|
||||
logs_result = await session.execute(logs_query)
|
||||
logs = logs_result.scalars().all()
|
||||
|
||||
return {
|
||||
"transaction_id": transaction.transaction_id,
|
||||
"sage_document_id": transaction.sage_document_id,
|
||||
"sage_document_type": transaction.sage_document_type.name,
|
||||
"local_status": transaction.local_status.value,
|
||||
"signed_at": transaction.signed_at.isoformat()
|
||||
if transaction.signed_at
|
||||
else None,
|
||||
"downloaded_at": (
|
||||
transaction.signed_document_downloaded_at.isoformat()
|
||||
if transaction.signed_document_downloaded_at
|
||||
else None
|
||||
),
|
||||
"file_info": file_info,
|
||||
"download_url": f"/universign/documents/{transaction_local_id}/download",
|
||||
"transaction_id": transaction_id,
|
||||
"total_syncs": len(logs),
|
||||
"logs": [
|
||||
{
|
||||
"sync_type": log.sync_type,
|
||||
"timestamp": log.sync_timestamp.isoformat(),
|
||||
"success": log.success,
|
||||
"previous_status": log.previous_status,
|
||||
"new_status": log.new_status,
|
||||
"error_message": log.error_message,
|
||||
"response_time_ms": log.response_time_ms,
|
||||
}
|
||||
for log in logs
|
||||
],
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur récupération info document: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
|
|
|||
|
|
@ -1,188 +0,0 @@
|
|||
import os
|
||||
import requests
|
||||
import hashlib
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
from datetime import datetime
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from database import UniversignTransaction
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SignedDocuments:
|
||||
"""Service de gestion des documents signés"""
|
||||
|
||||
def __init__(self, storage_path: str = "./data/signed_documents"):
|
||||
self.storage_path = Path(storage_path)
|
||||
self.storage_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Créer des sous-répertoires par type de document
|
||||
for doc_type in ["devis", "commandes", "factures", "livraisons", "avoirs"]:
|
||||
(self.storage_path / doc_type).mkdir(exist_ok=True)
|
||||
|
||||
def _get_storage_subdir(self, sage_doc_type: int) -> str:
|
||||
"""Retourne le sous-répertoire selon le type de document Sage"""
|
||||
mapping = {
|
||||
0: "devis",
|
||||
10: "commandes",
|
||||
30: "livraisons",
|
||||
50: "avoirs",
|
||||
60: "factures",
|
||||
}
|
||||
return mapping.get(sage_doc_type, "autres")
|
||||
|
||||
def _generate_filename(
|
||||
self, transaction_id: str, sage_doc_id: str, sage_doc_type: int
|
||||
) -> str:
|
||||
"""Génère un nom de fichier unique et sécurisé"""
|
||||
# Hash du transaction_id pour éviter les collisions
|
||||
hash_suffix = hashlib.md5(transaction_id.encode()).hexdigest()[:8]
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
return f"{sage_doc_id}_{timestamp}_{hash_suffix}_signed.pdf"
|
||||
|
||||
async def download_and_store(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
document_url: str,
|
||||
api_key: str,
|
||||
) -> Tuple[bool, Optional[str], Optional[str]]:
|
||||
"""
|
||||
Télécharge et stocke le document signé
|
||||
|
||||
Returns:
|
||||
(success, file_path, error_message)
|
||||
"""
|
||||
try:
|
||||
# Télécharger le document depuis Universign
|
||||
logger.info(f"Téléchargement document signé: {transaction.transaction_id}")
|
||||
|
||||
response = requests.get(
|
||||
document_url, auth=(api_key, ""), timeout=60, stream=True
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
error = f"Erreur HTTP {response.status_code} lors du téléchargement"
|
||||
logger.error(error)
|
||||
return False, None, error
|
||||
|
||||
# Vérifier que c'est bien un PDF
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
if "pdf" not in content_type.lower():
|
||||
error = f"Type de contenu invalide: {content_type}"
|
||||
logger.warning(error)
|
||||
|
||||
# Générer le nom de fichier et le chemin
|
||||
subdir = self._get_storage_subdir(transaction.sage_document_type.value)
|
||||
filename = self._generate_filename(
|
||||
transaction.transaction_id,
|
||||
transaction.sage_document_id,
|
||||
transaction.sage_document_type.value,
|
||||
)
|
||||
|
||||
file_path = self.storage_path / subdir / filename
|
||||
|
||||
# Écrire le fichier
|
||||
with open(file_path, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
|
||||
file_size = file_path.stat().st_size
|
||||
logger.info(f"Document stocké: {file_path} ({file_size} octets)")
|
||||
|
||||
# Mettre à jour la transaction
|
||||
transaction.signed_document_path = str(file_path)
|
||||
transaction.signed_document_downloaded_at = datetime.now()
|
||||
await session.commit()
|
||||
|
||||
return True, str(file_path), None
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
error = f"Erreur réseau lors du téléchargement: {str(e)}"
|
||||
logger.error(error, exc_info=True)
|
||||
return False, None, error
|
||||
|
||||
except IOError as e:
|
||||
error = f"Erreur d'écriture du fichier: {str(e)}"
|
||||
logger.error(error, exc_info=True)
|
||||
return False, None, error
|
||||
|
||||
except Exception as e:
|
||||
error = f"Erreur inattendue: {str(e)}"
|
||||
logger.error(error, exc_info=True)
|
||||
return False, None, error
|
||||
|
||||
def get_document_path(self, transaction: UniversignTransaction) -> Optional[Path]:
|
||||
"""Retourne le chemin du document signé s'il existe"""
|
||||
if not transaction.signed_document_path:
|
||||
return None
|
||||
|
||||
path = Path(transaction.signed_document_path)
|
||||
if not path.exists():
|
||||
logger.warning(f"Document signé introuvable: {path}")
|
||||
return None
|
||||
|
||||
return path
|
||||
|
||||
def verify_document_integrity(self, file_path: Path) -> bool:
|
||||
"""Vérifie l'intégrité basique du document (taille, extension)"""
|
||||
try:
|
||||
if not file_path.exists():
|
||||
return False
|
||||
|
||||
# Vérifier que le fichier n'est pas vide
|
||||
if file_path.stat().st_size == 0:
|
||||
logger.error(f"Document vide: {file_path}")
|
||||
return False
|
||||
|
||||
# Vérifier l'extension
|
||||
if file_path.suffix.lower() != ".pdf":
|
||||
logger.error(f"Extension invalide: {file_path}")
|
||||
return False
|
||||
|
||||
# Vérifier les premiers octets (signature PDF)
|
||||
with open(file_path, "rb") as f:
|
||||
header = f.read(5)
|
||||
if header != b"%PDF-":
|
||||
logger.error(f"Signature PDF invalide: {file_path}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur vérification intégrité: {e}")
|
||||
return False
|
||||
|
||||
async def cleanup_old_documents(self, days_to_keep: int = 365):
|
||||
"""Nettoie les documents signés de plus de X jours (archivage)"""
|
||||
cutoff_date = datetime.now().timestamp() - (days_to_keep * 86400)
|
||||
deleted_count = 0
|
||||
|
||||
try:
|
||||
for subdir in self.storage_path.iterdir():
|
||||
if not subdir.is_dir():
|
||||
continue
|
||||
|
||||
for file_path in subdir.glob("*.pdf"):
|
||||
if file_path.stat().st_mtime < cutoff_date:
|
||||
logger.info(f"Suppression ancien document: {file_path}")
|
||||
file_path.unlink()
|
||||
deleted_count += 1
|
||||
|
||||
logger.info(f"Nettoyage terminé: {deleted_count} document(s) supprimé(s)")
|
||||
return deleted_count
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur nettoyage documents: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
# Instance globale
|
||||
signed_documents = SignedDocuments(
|
||||
storage_path=os.getenv("SIGNED_DOCS_PATH", "./data/signed_documents")
|
||||
)
|
||||
|
|
@ -1,340 +1,479 @@
|
|||
import uuid
|
||||
import requests
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Optional, Tuple
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, and_, or_
|
||||
|
||||
from database import (
|
||||
UniversignTransaction,
|
||||
EmailLog,
|
||||
StatutEmail,
|
||||
UniversignSigner,
|
||||
UniversignSyncLog,
|
||||
UniversignTransactionStatus,
|
||||
LocalDocumentStatus,
|
||||
UniversignSignerStatus,
|
||||
)
|
||||
from utils.universign_status_mapping import (
|
||||
map_universign_to_local,
|
||||
is_transition_allowed,
|
||||
get_status_actions,
|
||||
is_final_status,
|
||||
resolve_status_conflict,
|
||||
)
|
||||
from data.data import templates_signature_email
|
||||
from services.signed_documents import signed_documents
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UniversignSyncService:
|
||||
"""Service de synchronisation avec logique métier complète"""
|
||||
|
||||
def __init__(self, api_url: str, api_key: str):
|
||||
def __init__(self, api_url: str, api_key: str, timeout: int = 30):
|
||||
self.api_url = api_url.rstrip("/")
|
||||
self.api_key = api_key
|
||||
self.sage_client = None
|
||||
self.email_queue = None
|
||||
self.settings = None
|
||||
self.timeout = timeout
|
||||
self.auth = (api_key, "")
|
||||
|
||||
def configure(self, sage_client, email_queue, settings):
|
||||
"""Configure les dépendances injectées"""
|
||||
self.sage_client = sage_client
|
||||
self.email_queue = email_queue
|
||||
self.settings = settings
|
||||
def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
|
||||
start_time = datetime.now()
|
||||
|
||||
async def handle_signature_completed(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
universign_data: Dict,
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Gère la complétion d'une signature:
|
||||
1. Télécharge et stocke le document signé
|
||||
2. Met à jour le statut Sage à 2 (accepté)
|
||||
3. Envoie la notification avec lien de téléchargement
|
||||
"""
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{self.api_url}/transactions/{transaction_id}",
|
||||
auth=self.auth,
|
||||
timeout=self.timeout,
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
|
||||
response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(
|
||||
f"🎯 Traitement signature complétée: {transaction.transaction_id}"
|
||||
f"✓ Fetch OK: {transaction_id} "
|
||||
f"status={data.get('state')} "
|
||||
f"({response_time_ms}ms)"
|
||||
)
|
||||
return {
|
||||
"transaction": data,
|
||||
"http_status": 200,
|
||||
"response_time_ms": response_time_ms,
|
||||
"fetched_at": datetime.now(),
|
||||
}
|
||||
|
||||
# Étape 1: Télécharger le document signé
|
||||
document_url = self._extract_document_url(universign_data)
|
||||
|
||||
if not document_url:
|
||||
error = "URL du document signé non trouvée dans la réponse Universign"
|
||||
logger.error(error)
|
||||
return False, error
|
||||
|
||||
(
|
||||
success,
|
||||
file_path,
|
||||
error,
|
||||
) = await signed_documents.download_and_store(
|
||||
session=session,
|
||||
transaction=transaction,
|
||||
document_url=document_url,
|
||||
api_key=self.api_key,
|
||||
)
|
||||
|
||||
if not success:
|
||||
return False, f"Échec téléchargement document: {error}"
|
||||
|
||||
logger.info(f"✅ Document signé stocké: {file_path}")
|
||||
|
||||
# Étape 2: Mettre à jour le statut Sage UNIQUEMENT si ≠ 2
|
||||
current_sage_status = await self._get_current_sage_status(transaction)
|
||||
|
||||
if current_sage_status != 2:
|
||||
success_sage = await self._update_sage_to_accepted(transaction)
|
||||
|
||||
if success_sage:
|
||||
logger.info(f"✅ Statut Sage mis à jour: {current_sage_status} → 2")
|
||||
else:
|
||||
elif response.status_code == 404:
|
||||
logger.warning(
|
||||
f"⚠️ Échec mise à jour statut Sage pour {transaction.sage_document_id}"
|
||||
f"Transaction {transaction_id} introuvable sur Universign"
|
||||
)
|
||||
return None
|
||||
|
||||
else:
|
||||
logger.info(f"ℹ️ Statut Sage déjà à 2, pas de mise à jour")
|
||||
|
||||
# Étape 3: Envoyer notification avec lien de téléchargement
|
||||
notification_sent = await self._send_signature_confirmation(
|
||||
session=session,
|
||||
transaction=transaction,
|
||||
download_link=self._generate_download_link(transaction),
|
||||
logger.error(
|
||||
f"Erreur HTTP {response.status_code} "
|
||||
f"pour {transaction_id}: {response.text}"
|
||||
)
|
||||
return None
|
||||
|
||||
if not notification_sent:
|
||||
logger.warning("⚠️ Notification non envoyée (mais document stocké)")
|
||||
|
||||
return True, None
|
||||
except requests.exceptions.Timeout:
|
||||
logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
error = f"Erreur handle_signature_completed: {str(e)}"
|
||||
logger.error(error, exc_info=True)
|
||||
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def sync_transaction(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
force: bool = False,
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
if is_final_status(transaction.local_status.value) and not force:
|
||||
logger.debug(
|
||||
f"Skip {transaction.transaction_id}: "
|
||||
f"statut final {transaction.local_status.value}"
|
||||
)
|
||||
transaction.needs_sync = False
|
||||
await session.commit()
|
||||
return True, None
|
||||
|
||||
# === FETCH UNIVERSIGN ===
|
||||
|
||||
result = self.fetch_transaction_status(transaction.transaction_id)
|
||||
|
||||
if not result:
|
||||
error = "Échec récupération données Universign"
|
||||
await self._log_sync_attempt(session, transaction, "polling", False, error)
|
||||
return False, error
|
||||
|
||||
def _extract_document_url(self, universign_data: Dict) -> Optional[str]:
|
||||
"""Extrait l'URL du document signé depuis la réponse Universign"""
|
||||
try:
|
||||
# Structure: data['documents'][0]['url']
|
||||
documents = universign_data.get("documents", [])
|
||||
if documents and len(documents) > 0:
|
||||
return documents[0].get("url")
|
||||
# === EXTRACTION DONNÉES ===
|
||||
|
||||
# Fallback: vérifier dans les actions
|
||||
actions = universign_data.get("actions", [])
|
||||
for action in actions:
|
||||
if action.get("type") == "download" and action.get("url"):
|
||||
return action["url"]
|
||||
universign_data = result["transaction"]
|
||||
universign_status_raw = universign_data.get("state", "draft")
|
||||
|
||||
return None
|
||||
# === MAPPING STATUT ===
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur extraction URL document: {e}")
|
||||
return None
|
||||
new_local_status = map_universign_to_local(universign_status_raw)
|
||||
previous_local_status = transaction.local_status.value
|
||||
|
||||
async def _get_current_sage_status(self, transaction: UniversignTransaction) -> int:
|
||||
"""Récupère le statut actuel du document dans Sage"""
|
||||
try:
|
||||
if not self.sage_client:
|
||||
logger.warning("sage_client non configuré")
|
||||
return 0
|
||||
# === VALIDATION TRANSITION ===
|
||||
|
||||
doc = self.sage_client.lire_document(
|
||||
transaction.sage_document_id, transaction.sage_document_type.value
|
||||
if not is_transition_allowed(previous_local_status, new_local_status):
|
||||
logger.warning(
|
||||
f"Transition refusée: {previous_local_status} → {new_local_status}"
|
||||
)
|
||||
# En cas de conflit, résoudre par priorité
|
||||
new_local_status = resolve_status_conflict(
|
||||
previous_local_status, new_local_status
|
||||
)
|
||||
|
||||
return doc.get("statut", 0) if doc else 0
|
||||
# === DÉTECTION CHANGEMENT ===
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur lecture statut Sage: {e}")
|
||||
return 0
|
||||
status_changed = previous_local_status != new_local_status
|
||||
|
||||
async def _update_sage_to_accepted(
|
||||
self, transaction: UniversignTransaction
|
||||
) -> bool:
|
||||
"""Met à jour le statut Sage à 2 (accepté)"""
|
||||
try:
|
||||
if not self.sage_client:
|
||||
logger.warning("sage_client non configuré")
|
||||
return False
|
||||
if not status_changed and not force:
|
||||
logger.debug(f"Pas de changement pour {transaction.transaction_id}")
|
||||
transaction.last_synced_at = datetime.now()
|
||||
transaction.needs_sync = False
|
||||
await session.commit()
|
||||
return True, None
|
||||
|
||||
self.sage_client.changer_statut_document(
|
||||
document_type_code=transaction.sage_document_type.value,
|
||||
numero=transaction.sage_document_id,
|
||||
nouveau_statut=2, # Accepté
|
||||
# === MISE À JOUR TRANSACTION ===
|
||||
|
||||
transaction.universign_status = UniversignTransactionStatus(
|
||||
universign_status_raw
|
||||
)
|
||||
transaction.local_status = LocalDocumentStatus(new_local_status)
|
||||
transaction.universign_status_updated_at = datetime.now()
|
||||
|
||||
return True
|
||||
# === DATES SPÉCIFIQUES ===
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur mise à jour Sage: {e}")
|
||||
return False
|
||||
if new_local_status == "EN_COURS" and not transaction.sent_at:
|
||||
transaction.sent_at = datetime.now()
|
||||
|
||||
def _generate_download_link(self, transaction: UniversignTransaction) -> str:
|
||||
"""Génère le lien de téléchargement sécurisé"""
|
||||
base_url = (
|
||||
self.settings.api_base_url if self.settings else "http://localhost:8000"
|
||||
)
|
||||
return f"{base_url}/universign/documents/{transaction.id}/download"
|
||||
if new_local_status == "SIGNE" and not transaction.signed_at:
|
||||
transaction.signed_at = datetime.now()
|
||||
|
||||
async def _send_signature_confirmation(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
download_link: str,
|
||||
) -> bool:
|
||||
"""Envoie l'email de confirmation avec lien de téléchargement"""
|
||||
try:
|
||||
if not self.email_queue or not self.settings:
|
||||
logger.warning("email_queue ou settings non configuré")
|
||||
return False
|
||||
if new_local_status == "REFUSE" and not transaction.refused_at:
|
||||
transaction.refused_at = datetime.now()
|
||||
|
||||
template = templates_signature_email["signature_confirmee"]
|
||||
if new_local_status == "EXPIRE" and not transaction.expired_at:
|
||||
transaction.expired_at = datetime.now()
|
||||
|
||||
type_labels = {
|
||||
0: "Devis",
|
||||
10: "Commande",
|
||||
30: "Bon de Livraison",
|
||||
60: "Facture",
|
||||
50: "Avoir",
|
||||
}
|
||||
# === URLS ===
|
||||
|
||||
variables = {
|
||||
"NOM_SIGNATAIRE": transaction.requester_name or "Client",
|
||||
"TYPE_DOC": type_labels.get(
|
||||
transaction.sage_document_type.value, "Document"
|
||||
),
|
||||
"NUMERO": transaction.sage_document_id,
|
||||
"DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M")
|
||||
if transaction.signed_at
|
||||
else datetime.now().strftime("%d/%m/%Y à %H:%M"),
|
||||
"TRANSACTION_ID": transaction.transaction_id,
|
||||
"CONTACT_EMAIL": self.settings.smtp_from,
|
||||
"DOWNLOAD_LINK": download_link, # Nouvelle variable
|
||||
}
|
||||
if "signers" in universign_data and len(universign_data["signers"]) > 0:
|
||||
first_signer = universign_data["signers"][0]
|
||||
if "url" in first_signer:
|
||||
transaction.signer_url = first_signer["url"]
|
||||
|
||||
sujet = template["sujet"]
|
||||
if "documents" in universign_data and len(universign_data["documents"]) > 0:
|
||||
first_doc = universign_data["documents"][0]
|
||||
if "url" in first_doc:
|
||||
transaction.document_url = first_doc["url"]
|
||||
|
||||
# Corps modifié pour inclure le lien de téléchargement
|
||||
corps = template["corps_html"].replace(
|
||||
"</td>\n </tr>\n \n <!-- Footer -->",
|
||||
f"""</td>
|
||||
</tr>
|
||||
# === SIGNATAIRES ===
|
||||
|
||||
<!-- Download Section -->
|
||||
<tr>
|
||||
<td style="padding: 20px 30px; background-color: #f0f9ff; border: 1px solid #90cdf4; border-radius: 4px; margin: 20px 0;">
|
||||
<p style="color: #2c5282; font-size: 14px; line-height: 1.6; margin: 0 0 15px;">
|
||||
📄 <strong>Télécharger le document signé :</strong>
|
||||
</p>
|
||||
<table width="100%" cellpadding="0" cellspacing="0">
|
||||
<tr>
|
||||
<td align="center">
|
||||
<a href="{download_link}" style="display: inline-block; background: #48bb78; color: #ffffff; text-decoration: none; padding: 12px 30px; border-radius: 6px; font-size: 14px; font-weight: 600;">
|
||||
⬇️ Télécharger le PDF signé
|
||||
</a>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
<p style="color: #718096; font-size: 12px; margin: 15px 0 0; text-align: center;">
|
||||
Ce lien est valable pendant 1 an
|
||||
</p>
|
||||
</td>
|
||||
</tr>
|
||||
await self._sync_signers(session, transaction, universign_data)
|
||||
|
||||
<!-- Footer -->""",
|
||||
)
|
||||
# === FLAGS ===
|
||||
|
||||
for var, valeur in variables.items():
|
||||
sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur))
|
||||
corps = corps.replace(f"{{{{{var}}}}}", str(valeur))
|
||||
transaction.last_synced_at = datetime.now()
|
||||
transaction.sync_attempts += 1
|
||||
transaction.needs_sync = not is_final_status(new_local_status)
|
||||
transaction.sync_error = None
|
||||
|
||||
email_log = EmailLog(
|
||||
id=str(uuid.uuid4()),
|
||||
destinataire=transaction.requester_email,
|
||||
sujet=sujet,
|
||||
corps_html=corps,
|
||||
document_ids=transaction.sage_document_id,
|
||||
type_document=transaction.sage_document_type.value,
|
||||
statut=StatutEmail.EN_ATTENTE,
|
||||
date_creation=datetime.now(),
|
||||
nb_tentatives=0,
|
||||
)
|
||||
# === LOG ===
|
||||
|
||||
session.add(email_log)
|
||||
await session.flush()
|
||||
|
||||
self.email_queue.enqueue(email_log.id)
|
||||
|
||||
logger.info(f"📧 Email confirmation envoyé à {transaction.requester_email}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur envoi notification: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
async def handle_status_transition(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
previous_status: str,
|
||||
new_status: str,
|
||||
universign_data: Dict,
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Gère les transitions de statut avec logique métier
|
||||
"""
|
||||
logger.info(
|
||||
f"🔄 Transition: {transaction.transaction_id} "
|
||||
f"{previous_status} → {new_status}"
|
||||
)
|
||||
|
||||
# Si passage à SIGNE (completed)
|
||||
if new_status == "SIGNE" and previous_status != "SIGNE":
|
||||
return await self.handle_signature_completed(
|
||||
await self._log_sync_attempt(
|
||||
session=session,
|
||||
transaction=transaction,
|
||||
universign_data=universign_data,
|
||||
sync_type="polling",
|
||||
success=True,
|
||||
error_message=None,
|
||||
previous_status=previous_local_status,
|
||||
new_status=new_local_status,
|
||||
changes=json.dumps(
|
||||
{
|
||||
"status_changed": status_changed,
|
||||
"universign_raw": universign_status_raw,
|
||||
"response_time_ms": result.get("response_time_ms"),
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
# Si passage à REFUSE
|
||||
elif new_status == "REFUSE" and previous_status != "REFUSE":
|
||||
await self._update_sage_to_refused(transaction)
|
||||
await session.commit()
|
||||
|
||||
# Si passage à EXPIRE
|
||||
elif new_status == "EXPIRE" and previous_status != "EXPIRE":
|
||||
await self._update_sage_to_expired(transaction)
|
||||
# === ACTIONS MÉTIER ===
|
||||
|
||||
if status_changed:
|
||||
await self._execute_status_actions(session, transaction, new_local_status)
|
||||
|
||||
logger.info(
|
||||
f"✓ Sync OK: {transaction.transaction_id} "
|
||||
f"{previous_local_status} → {new_local_status}"
|
||||
)
|
||||
|
||||
return True, None
|
||||
|
||||
async def _update_sage_to_refused(self, transaction: UniversignTransaction):
|
||||
"""Met à jour Sage quand signature refusée"""
|
||||
try:
|
||||
if not self.sage_client:
|
||||
return
|
||||
async def sync_all_pending(
|
||||
self, session: AsyncSession, max_transactions: int = 50
|
||||
) -> Dict[str, int]:
|
||||
"""
|
||||
Synchronise toutes les transactions en attente
|
||||
"""
|
||||
from sqlalchemy.orm import selectinload # Si pas déjà importé en haut
|
||||
|
||||
# Statut 3 = Perdu/Refusé (selon config Sage)
|
||||
self.sage_client.changer_statut_document(
|
||||
document_type_code=transaction.sage_document_type.value,
|
||||
numero=transaction.sage_document_id,
|
||||
nouveau_statut=3,
|
||||
query = (
|
||||
select(UniversignTransaction)
|
||||
.options(selectinload(UniversignTransaction.signers)) # AJOUTER CETTE LIGNE
|
||||
.where(
|
||||
and_(
|
||||
UniversignTransaction.needs_sync,
|
||||
or_(
|
||||
~UniversignTransaction.local_status.in_(
|
||||
[
|
||||
LocalDocumentStatus.SIGNED,
|
||||
LocalDocumentStatus.REJECTED,
|
||||
LocalDocumentStatus.EXPIRED,
|
||||
]
|
||||
),
|
||||
UniversignTransaction.last_synced_at
|
||||
< (datetime.now() - timedelta(hours=1)),
|
||||
UniversignTransaction.last_synced_at.is_(None),
|
||||
),
|
||||
)
|
||||
)
|
||||
.order_by(UniversignTransaction.created_at.asc())
|
||||
.limit(max_transactions)
|
||||
)
|
||||
|
||||
result = await session.execute(query)
|
||||
transactions = result.scalars().all()
|
||||
|
||||
stats = {
|
||||
"total_found": len(transactions),
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"skipped": 0,
|
||||
"status_changes": 0,
|
||||
}
|
||||
|
||||
for transaction in transactions:
|
||||
try:
|
||||
previous_status = transaction.local_status.value
|
||||
|
||||
success, error = await self.sync_transaction(
|
||||
session, transaction, force=False
|
||||
)
|
||||
|
||||
if success:
|
||||
stats["success"] += 1
|
||||
|
||||
if transaction.local_status.value != previous_status:
|
||||
stats["status_changes"] += 1
|
||||
else:
|
||||
stats["failed"] += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True
|
||||
)
|
||||
stats["failed"] += 1
|
||||
|
||||
logger.info(
|
||||
f"📛 Statut Sage → 3 (Refusé) pour {transaction.sage_document_id}"
|
||||
f"Polling terminé: {stats['success']}/{stats['total_found']} OK, "
|
||||
f"{stats['status_changes']} changements détectés"
|
||||
)
|
||||
|
||||
return stats
|
||||
|
||||
async def process_webhook(
|
||||
self, session: AsyncSession, payload: Dict
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
try:
|
||||
event_type = payload.get("event")
|
||||
transaction_id = payload.get("transaction_id") or payload.get("id")
|
||||
|
||||
if not transaction_id:
|
||||
return False, "Pas de transaction_id dans le webhook"
|
||||
|
||||
query = select(UniversignTransaction).where(
|
||||
UniversignTransaction.transaction_id == transaction_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
transaction = result.scalar_one_or_none()
|
||||
|
||||
if not transaction:
|
||||
logger.warning(
|
||||
f"Webhook reçu pour transaction inconnue: {transaction_id}"
|
||||
)
|
||||
return False, "Transaction inconnue"
|
||||
|
||||
transaction.webhook_received = True
|
||||
|
||||
success, error = await self.sync_transaction(
|
||||
session, transaction, force=True
|
||||
)
|
||||
|
||||
await self._log_sync_attempt(
|
||||
session=session,
|
||||
transaction=transaction,
|
||||
sync_type=f"webhook:{event_type}",
|
||||
success=success,
|
||||
error_message=error,
|
||||
changes=json.dumps(payload),
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
logger.info(
|
||||
f"✓ Webhook traité: {transaction_id} "
|
||||
f"event={event_type} success={success}"
|
||||
)
|
||||
|
||||
return success, error
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur traitement webhook: {e}", exc_info=True)
|
||||
return False, str(e)
|
||||
|
||||
async def _sync_signers(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
universign_data: Dict,
|
||||
):
|
||||
"""Synchronise les signataires"""
|
||||
signers_data = universign_data.get("signers", [])
|
||||
|
||||
# Supprimer les anciens signataires
|
||||
for signer in transaction.signers:
|
||||
await session.delete(signer)
|
||||
|
||||
# Créer les nouveaux
|
||||
for idx, signer_data in enumerate(signers_data):
|
||||
signer = UniversignSigner(
|
||||
id=f"{transaction.id}_signer_{idx}",
|
||||
transaction_id=transaction.id,
|
||||
email=signer_data.get("email", ""),
|
||||
name=signer_data.get("name"),
|
||||
status=UniversignSignerStatus(signer_data.get("status", "waiting")),
|
||||
order_index=idx,
|
||||
viewed_at=self._parse_date(signer_data.get("viewed_at")),
|
||||
signed_at=self._parse_date(signer_data.get("signed_at")),
|
||||
refused_at=self._parse_date(signer_data.get("refused_at")),
|
||||
)
|
||||
session.add(signer)
|
||||
|
||||
async def _log_sync_attempt(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
transaction: UniversignTransaction,
|
||||
sync_type: str,
|
||||
success: bool,
|
||||
error_message: Optional[str] = None,
|
||||
previous_status: Optional[str] = None,
|
||||
new_status: Optional[str] = None,
|
||||
changes: Optional[str] = None,
|
||||
):
|
||||
"""Enregistre une tentative de sync dans les logs"""
|
||||
log = UniversignSyncLog(
|
||||
transaction_id=transaction.id,
|
||||
sync_type=sync_type,
|
||||
sync_timestamp=datetime.now(),
|
||||
previous_status=previous_status,
|
||||
new_status=new_status,
|
||||
changes_detected=changes,
|
||||
success=success,
|
||||
error_message=error_message,
|
||||
)
|
||||
session.add(log)
|
||||
|
||||
async def _execute_status_actions(
|
||||
self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
|
||||
):
|
||||
"""Exécute les actions métier associées au statut"""
|
||||
actions = get_status_actions(new_status)
|
||||
|
||||
if not actions:
|
||||
return
|
||||
|
||||
# Mise à jour Sage
|
||||
if actions.get("update_sage_status"):
|
||||
await self._update_sage_status(transaction, new_status)
|
||||
|
||||
# Déclencher workflow
|
||||
if actions.get("trigger_workflow"):
|
||||
await self._trigger_workflow(transaction)
|
||||
|
||||
# Notifications
|
||||
if actions.get("send_notification"):
|
||||
await self._send_notification(transaction, new_status)
|
||||
|
||||
# Archive
|
||||
if actions.get("archive_document"):
|
||||
await self._archive_signed_document(transaction)
|
||||
|
||||
async def _update_sage_status(self, transaction, status):
|
||||
"""Met à jour le statut dans Sage"""
|
||||
# TODO: Appeler sage_client.mettre_a_jour_champ_libre()
|
||||
logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}")
|
||||
|
||||
async def _trigger_workflow(self, transaction):
|
||||
"""Déclenche un workflow (ex: devis→commande)"""
|
||||
logger.info(f"TODO: Workflow pour {transaction.sage_document_id}")
|
||||
|
||||
async def _send_notification(self, transaction, status):
|
||||
"""Envoie une notification email"""
|
||||
logger.info(f"TODO: Notif pour {transaction.sage_document_id}")
|
||||
|
||||
async def _archive_signed_document(self, transaction):
|
||||
"""Archive le document signé"""
|
||||
logger.info(f"TODO: Archivage pour {transaction.sage_document_id}")
|
||||
|
||||
@staticmethod
|
||||
def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
|
||||
"""Parse une date ISO 8601"""
|
||||
if not date_str:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class UniversignSyncScheduler:
|
||||
def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
|
||||
self.sync_service = sync_service
|
||||
self.interval_minutes = interval_minutes
|
||||
self.is_running = False
|
||||
|
||||
async def start(self, session_factory):
|
||||
"""Démarre le polling automatique"""
|
||||
import asyncio
|
||||
|
||||
self.is_running = True
|
||||
|
||||
logger.info(
|
||||
f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
|
||||
)
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
async with session_factory() as session:
|
||||
stats = await self.sync_service.sync_all_pending(session)
|
||||
|
||||
logger.info(
|
||||
f"Polling: {stats['success']} transactions synchronisées, "
|
||||
f"{stats['status_changes']} changements"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur mise à jour Sage (refusé): {e}")
|
||||
logger.error(f"Erreur polling: {e}", exc_info=True)
|
||||
|
||||
async def _update_sage_to_expired(self, transaction: UniversignTransaction):
|
||||
"""Met à jour Sage quand signature expirée"""
|
||||
try:
|
||||
if not self.sage_client:
|
||||
return
|
||||
# Attendre avant le prochain cycle
|
||||
await asyncio.sleep(self.interval_minutes * 60)
|
||||
|
||||
# Statut 4 = Expiré/Archivé (selon config Sage)
|
||||
self.sage_client.changer_statut_document(
|
||||
document_type_code=transaction.sage_document_type.value,
|
||||
numero=transaction.sage_document_id,
|
||||
nouveau_statut=4,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"⏰ Statut Sage → 4 (Expiré) pour {transaction.sage_document_id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur mise à jour Sage (expiré): {e}")
|
||||
def stop(self):
|
||||
"""Arrête le polling"""
|
||||
self.is_running = False
|
||||
logger.info("Arrêt polling Universign")
|
||||
|
|
|
|||
Loading…
Reference in a new issue