"""
|
|
Service de synchronisation Universign
|
|
Architecture : polling + webhooks avec retry et gestion d'erreurs
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, and_, or_
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from database.models.universign_models import (
|
|
UniversignTransaction,
|
|
UniversignSigner,
|
|
UniversignSyncLog,
|
|
UniversignTransactionStatus,
|
|
LocalDocumentStatus,
|
|
UniversignSignerStatus,
|
|
)
|
|
from status_mapping import (
|
|
map_universign_to_local,
|
|
get_sage_status_code,
|
|
is_transition_allowed,
|
|
get_status_actions,
|
|
is_final_status,
|
|
resolve_status_conflict,
|
|
)
|
|
|
|
# Module-level logger, named after this module so handlers can filter hierarchically.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class UniversignSyncService:
    """Centralized synchronization service for Universign transactions."""

    def __init__(self, api_url: str, api_key: str, timeout: int = 30):
        # Normalize the base URL so endpoint paths can be appended safely.
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        # Universign uses HTTP Basic auth: API key as username, empty password.
        self.auth = (api_key, "")
|
|
|
|
# ========================================
|
|
# 1. RÉCUPÉRATION DEPUIS UNIVERSIGN
|
|
# ========================================
|
|
|
|
def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
|
|
"""
|
|
Récupère le statut actuel d'une transaction depuis Universign
|
|
|
|
Args:
|
|
transaction_id: ID Universign (ex: tr_abc123)
|
|
|
|
Returns:
|
|
Dictionnaire avec les données Universign ou None
|
|
"""
|
|
start_time = datetime.now()
|
|
|
|
try:
|
|
response = requests.get(
|
|
f"{self.api_url}/transactions/{transaction_id}",
|
|
auth=self.auth,
|
|
timeout=self.timeout,
|
|
headers={"Accept": "application/json"},
|
|
)
|
|
|
|
response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
logger.info(
|
|
f"✓ Fetch OK: {transaction_id} "
|
|
f"status={data.get('state')} "
|
|
f"({response_time_ms}ms)"
|
|
)
|
|
return {
|
|
"transaction": data,
|
|
"http_status": 200,
|
|
"response_time_ms": response_time_ms,
|
|
"fetched_at": datetime.now(),
|
|
}
|
|
|
|
elif response.status_code == 404:
|
|
logger.warning(
|
|
f"Transaction {transaction_id} introuvable sur Universign"
|
|
)
|
|
return None
|
|
|
|
else:
|
|
logger.error(
|
|
f"Erreur HTTP {response.status_code} "
|
|
f"pour {transaction_id}: {response.text}"
|
|
)
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
|
|
return None
|
|
|
|
# ========================================
|
|
# 2. SYNCHRONISATION UNITAIRE
|
|
# ========================================
|
|
|
|
    async def sync_transaction(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        force: bool = False,
    ) -> Tuple[bool, Optional[str]]:
        """Synchronize ONE transaction with Universign.

        Args:
            session: Database session.
            transaction: Transaction to synchronize.
            force: Sync even when the local status is already final.

        Returns:
            Tuple ``(success, error_message)``.
        """
        # --- Preliminary checks ---

        # Final local status and no force: nothing to fetch, just clear the flag.
        if is_final_status(transaction.local_status.value) and not force:
            logger.debug(
                f"Skip {transaction.transaction_id}: "
                f"statut final {transaction.local_status.value}"
            )
            transaction.needs_sync = False
            await session.commit()
            return True, None

        # --- Fetch from Universign ---

        result = self.fetch_transaction_status(transaction.transaction_id)

        if not result:
            # Fetch failure is logged for audit, then reported to the caller.
            error = "Échec récupération données Universign"
            await self._log_sync_attempt(session, transaction, "polling", False, error)
            return False, error

        # --- Extract remote data ---

        universign_data = result["transaction"]
        # "draft" is used as the fallback when Universign omits "state".
        universign_status_raw = universign_data.get("state", "draft")

        # --- Map remote status to the local status vocabulary ---

        new_local_status = map_universign_to_local(universign_status_raw)
        previous_local_status = transaction.local_status.value

        # --- Validate the state transition ---

        if not is_transition_allowed(previous_local_status, new_local_status):
            logger.warning(
                f"Transition refusée: {previous_local_status} → {new_local_status}"
            )
            # On conflict, resolve by priority rather than rejecting the sync.
            new_local_status = resolve_status_conflict(
                previous_local_status, new_local_status
            )

        # --- Detect whether anything actually changed ---

        status_changed = previous_local_status != new_local_status

        if not status_changed and not force:
            # No change: only refresh the sync bookkeeping fields.
            logger.debug(f"Pas de changement pour {transaction.transaction_id}")
            transaction.last_synced_at = datetime.now()
            transaction.needs_sync = False
            await session.commit()
            return True, None

        # --- Update the transaction row ---

        # NOTE(review): an unknown raw status raises ValueError here and
        # aborts the sync without a log entry — confirm whether Universign
        # can emit states outside UniversignTransactionStatus.
        transaction.universign_status = UniversignTransactionStatus(
            universign_status_raw
        )
        transaction.local_status = LocalDocumentStatus(new_local_status)
        transaction.universign_status_updated_at = datetime.now()

        # --- Milestone timestamps (set once, never overwritten) ---

        if new_local_status == "EN_COURS" and not transaction.sent_at:
            transaction.sent_at = datetime.now()

        if new_local_status == "SIGNE" and not transaction.signed_at:
            transaction.signed_at = datetime.now()

        if new_local_status == "REFUSE" and not transaction.refused_at:
            transaction.refused_at = datetime.now()

        if new_local_status == "EXPIRE" and not transaction.expired_at:
            transaction.expired_at = datetime.now()

        # --- URLs: only the FIRST signer/document URL is kept ---

        if "signers" in universign_data and len(universign_data["signers"]) > 0:
            first_signer = universign_data["signers"][0]
            if "url" in first_signer:
                transaction.signer_url = first_signer["url"]

        if "documents" in universign_data and len(universign_data["documents"]) > 0:
            first_doc = universign_data["documents"][0]
            if "url" in first_doc:
                transaction.document_url = first_doc["url"]

        # --- Signers: replaced wholesale from the remote payload ---

        await self._sync_signers(session, transaction, universign_data)

        # --- Sync bookkeeping flags ---

        transaction.last_synced_at = datetime.now()
        transaction.sync_attempts += 1
        # Keep polling this transaction until it reaches a final status.
        transaction.needs_sync = not is_final_status(new_local_status)
        transaction.sync_error = None

        # --- Audit log entry ---

        await self._log_sync_attempt(
            session=session,
            transaction=transaction,
            sync_type="polling",
            success=True,
            error_message=None,
            previous_status=previous_local_status,
            new_status=new_local_status,
            changes=json.dumps(
                {
                    "status_changed": status_changed,
                    "universign_raw": universign_status_raw,
                    "response_time_ms": result.get("response_time_ms"),
                }
            ),
        )

        await session.commit()

        # --- Business actions (after commit, so the new status is persisted) ---

        if status_changed:
            await self._execute_status_actions(session, transaction, new_local_status)

        logger.info(
            f"✓ Sync OK: {transaction.transaction_id} "
            f"{previous_local_status} → {new_local_status}"
        )

        return True, None
|
|
|
|
# ========================================
|
|
# 3. SYNCHRONISATION DE MASSE (POLLING)
|
|
# ========================================
|
|
|
|
    async def sync_all_pending(
        self, session: AsyncSession, max_transactions: int = 50
    ) -> Dict[str, int]:
        """Synchronize every transaction pending a sync (polling pass).

        Args:
            session: Database session.
            max_transactions: Upper bound on transactions processed per run.

        Returns:
            Statistics dict: ``total_found`` / ``success`` / ``failed`` /
            ``skipped`` / ``status_changes``.
        """
        # --- Select the transactions to synchronize ---
        # needs_sync is set AND the transaction is either non-final,
        # stale (last sync > 1h ago), or has never been synced.
        query = (
            select(UniversignTransaction)
            .where(
                and_(
                    UniversignTransaction.needs_sync == True,
                    or_(
                        # Non-final transactions
                        ~UniversignTransaction.local_status.in_(
                            [
                                LocalDocumentStatus.SIGNE,
                                LocalDocumentStatus.REFUSE,
                                LocalDocumentStatus.EXPIRE,
                            ]
                        ),
                        # OR last sync older than 1h (final re-check)
                        UniversignTransaction.last_synced_at
                        < (datetime.now() - timedelta(hours=1)),
                        # OR never synchronized at all
                        UniversignTransaction.last_synced_at.is_(None),
                    ),
                )
            )
            .order_by(UniversignTransaction.created_at.asc())  # oldest first
            .limit(max_transactions)
        )

        result = await session.execute(query)
        transactions = result.scalars().all()

        # --- Statistics ---
        # NOTE(review): "skipped" is never incremented below — confirm
        # whether any caller still reads it.
        stats = {
            "total_found": len(transactions),
            "success": 0,
            "failed": 0,
            "skipped": 0,
            "status_changes": 0,
        }

        # --- Process each transaction independently ---

        for transaction in transactions:
            try:
                previous_status = transaction.local_status.value

                success, error = await self.sync_transaction(
                    session, transaction, force=False
                )

                if success:
                    stats["success"] += 1

                    # Count actual status transitions separately.
                    if transaction.local_status.value != previous_status:
                        stats["status_changes"] += 1
                else:
                    stats["failed"] += 1

            except Exception as e:
                # A single failing transaction must not abort the whole batch.
                logger.error(
                    f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True
                )
                stats["failed"] += 1

        logger.info(
            f"Polling terminé: {stats['success']}/{stats['total_found']} OK, "
            f"{stats['status_changes']} changements détectés"
        )

        return stats
|
|
|
|
# ========================================
|
|
# 4. WEBHOOK HANDLER
|
|
# ========================================
|
|
|
|
    async def process_webhook(
        self, session: AsyncSession, payload: Dict
    ) -> Tuple[bool, Optional[str]]:
        """Process an incoming Universign webhook.

        Args:
            session: Database session.
            payload: Webhook request body.

        Returns:
            Tuple ``(success, error_message)``.
        """
        try:
            # --- Extract payload fields ---

            event_type = payload.get("event")
            # The transaction id may arrive under either key — presumably
            # depends on the Universign payload version; TODO confirm.
            transaction_id = payload.get("transaction_id") or payload.get("id")

            if not transaction_id:
                return False, "Pas de transaction_id dans le webhook"

            # --- Look up the local transaction ---

            query = select(UniversignTransaction).where(
                UniversignTransaction.transaction_id == transaction_id
            )
            result = await session.execute(query)
            transaction = result.scalar_one_or_none()

            if not transaction:
                logger.warning(
                    f"Webhook reçu pour transaction inconnue: {transaction_id}"
                )
                return False, "Transaction inconnue"

            # --- Remember that a webhook was received for this transaction ---

            transaction.webhook_received = True

            # --- Synchronize (force=True bypasses the final-status skip) ---

            success, error = await self.sync_transaction(
                session, transaction, force=True
            )

            # --- Audit log of the webhook itself (raw payload stored) ---

            await self._log_sync_attempt(
                session=session,
                transaction=transaction,
                sync_type=f"webhook:{event_type}",
                success=success,
                error_message=error,
                changes=json.dumps(payload),
            )

            await session.commit()

            logger.info(
                f"✓ Webhook traité: {transaction_id} "
                f"event={event_type} success={success}"
            )

            return success, error

        except Exception as e:
            # Boundary handler: a webhook must never crash the endpoint;
            # log with traceback and report the error to the caller.
            logger.error(f"Erreur traitement webhook: {e}", exc_info=True)
            return False, str(e)
|
|
|
|
# ========================================
|
|
# 5. HELPERS PRIVÉS
|
|
# ========================================
|
|
|
|
    async def _sync_signers(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        universign_data: Dict,
    ):
        """Replace the transaction's signer rows with the Universign payload.

        Delete-and-recreate strategy: simpler than diffing, at the cost of
        discarding any local-only signer data.
        """
        signers_data = universign_data.get("signers", [])

        # Drop the previous signer rows.
        # NOTE(review): assumes transaction.signers is already loaded —
        # a lazy load here would fail in an async context; confirm callers
        # eager-load the relationship.
        for signer in transaction.signers:
            await session.delete(signer)

        # Recreate one row per signer, preserving the signing order.
        for idx, signer_data in enumerate(signers_data):
            signer = UniversignSigner(
                # Deterministic composite id: same signer index maps to the
                # same primary key across re-syncs.
                id=f"{transaction.id}_signer_{idx}",
                transaction_id=transaction.id,
                email=signer_data.get("email", ""),
                name=signer_data.get("name"),
                # NOTE(review): an unknown status string raises ValueError
                # here and aborts the whole sync — confirm the enum covers
                # every state Universign can emit.
                status=UniversignSignerStatus(signer_data.get("status", "waiting")),
                order_index=idx,
                viewed_at=self._parse_date(signer_data.get("viewed_at")),
                signed_at=self._parse_date(signer_data.get("signed_at")),
                refused_at=self._parse_date(signer_data.get("refused_at")),
            )
            session.add(signer)
|
|
|
|
async def _log_sync_attempt(
|
|
self,
|
|
session: AsyncSession,
|
|
transaction: UniversignTransaction,
|
|
sync_type: str,
|
|
success: bool,
|
|
error_message: Optional[str] = None,
|
|
previous_status: Optional[str] = None,
|
|
new_status: Optional[str] = None,
|
|
changes: Optional[str] = None,
|
|
):
|
|
"""Enregistre une tentative de sync dans les logs"""
|
|
log = UniversignSyncLog(
|
|
transaction_id=transaction.id,
|
|
sync_type=sync_type,
|
|
sync_timestamp=datetime.now(),
|
|
previous_status=previous_status,
|
|
new_status=new_status,
|
|
changes_detected=changes,
|
|
success=success,
|
|
error_message=error_message,
|
|
)
|
|
session.add(log)
|
|
|
|
async def _execute_status_actions(
|
|
self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
|
|
):
|
|
"""Exécute les actions métier associées au statut"""
|
|
actions = get_status_actions(new_status)
|
|
|
|
if not actions:
|
|
return
|
|
|
|
# Mise à jour Sage
|
|
if actions.get("update_sage_status"):
|
|
await self._update_sage_status(transaction, new_status)
|
|
|
|
# Déclencher workflow
|
|
if actions.get("trigger_workflow"):
|
|
await self._trigger_workflow(transaction)
|
|
|
|
# Notifications
|
|
if actions.get("send_notification"):
|
|
await self._send_notification(transaction, new_status)
|
|
|
|
# Archive
|
|
if actions.get("archive_document"):
|
|
await self._archive_signed_document(transaction)
|
|
|
|
    async def _update_sage_status(self, transaction, status):
        """Update the document status in Sage (stub — not yet implemented)."""
        # TODO: call sage_client.mettre_a_jour_champ_libre()
        logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}")

    async def _trigger_workflow(self, transaction):
        """Trigger a follow-up workflow, e.g. quote -> order (stub)."""
        logger.info(f"TODO: Workflow pour {transaction.sage_document_id}")

    async def _send_notification(self, transaction, status):
        """Send an email notification (stub — not yet implemented)."""
        logger.info(f"TODO: Notif pour {transaction.sage_document_id}")

    async def _archive_signed_document(self, transaction):
        """Archive the signed document (stub — not yet implemented)."""
        logger.info(f"TODO: Archivage pour {transaction.sage_document_id}")
|
|
|
|
@staticmethod
|
|
def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
|
|
"""Parse une date ISO 8601"""
|
|
if not date_str:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
|
except:
|
|
return None
|
|
|
|
|
|
# ========================================
|
|
# 6. TÂCHE PLANIFIÉE (BACKGROUND)
|
|
# ========================================
|
|
|
|
|
|
class UniversignSyncScheduler:
    """Background scheduler that polls Universign at a fixed interval."""

    def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
        self.sync_service = sync_service
        self.interval_minutes = interval_minutes
        # Loop flag: start() runs while True, stop() flips it to False.
        self.is_running = False

    async def start(self, session_factory):
        """Run the polling loop until stop() is called."""
        import asyncio

        self.is_running = True

        logger.info(
            f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
        )

        while self.is_running:
            try:
                async with session_factory() as session:
                    stats = await self.sync_service.sync_all_pending(session)

                    logger.info(
                        f"Polling: {stats['success']} transactions synchronisées, "
                        f"{stats['status_changes']} changements"
                    )

            except Exception as exc:
                # A failed cycle is logged and the loop keeps going.
                logger.error(f"Erreur polling: {exc}", exc_info=True)

            # Sleep until the next cycle (stop() takes effect afterwards).
            await asyncio.sleep(self.interval_minutes * 60)

    def stop(self):
        """Request the polling loop to end after the current cycle."""
        self.is_running = False
        logger.info("Arrêt polling Universign")
|