refactor(universign): clean up code and update status constants

This commit is contained in:
Fanilo-Nantenaina 2026-01-06 11:29:36 +03:00
parent b40c998062
commit 410d4553d5
4 changed files with 24 additions and 214 deletions

3
api.py
View file

@ -1,4 +1,4 @@
from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body, Request from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
@ -90,7 +90,6 @@ from core.sage_context import (
) )
from utils.generic_functions import ( from utils.generic_functions import (
_preparer_lignes_document, _preparer_lignes_document,
normaliser_type_doc,
universign_envoyer, universign_envoyer,
universign_statut, universign_statut,
) )

View file

@ -481,22 +481,22 @@ async def get_sync_stats(session: AsyncSession = Depends(get_session)):
# Par statut # Par statut
signed_query = select(func.count(UniversignTransaction.id)).where( signed_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.SIGNE UniversignTransaction.local_status == LocalDocumentStatus.SIGNED
) )
signed = (await session.execute(signed_query)).scalar() signed = (await session.execute(signed_query)).scalar()
in_progress_query = select(func.count(UniversignTransaction.id)).where( in_progress_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EN_COURS UniversignTransaction.local_status == LocalDocumentStatus.IN_PROGRESS
) )
in_progress = (await session.execute(in_progress_query)).scalar() in_progress = (await session.execute(in_progress_query)).scalar()
refused_query = select(func.count(UniversignTransaction.id)).where( refused_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.REFUSE UniversignTransaction.local_status == LocalDocumentStatus.REJECTED
) )
refused = (await session.execute(refused_query)).scalar() refused = (await session.execute(refused_query)).scalar()
expired_query = select(func.count(UniversignTransaction.id)).where( expired_query = select(func.count(UniversignTransaction.id)).where(
UniversignTransaction.local_status == LocalDocumentStatus.EXPIRE UniversignTransaction.local_status == LocalDocumentStatus.EXPIRED
) )
expired = (await session.execute(expired_query)).scalar() expired = (await session.execute(expired_query)).scalar()

View file

@ -1,16 +1,10 @@
"""
Service de synchronisation Universign
Architecture : polling + webhooks avec retry et gestion d'erreurs
"""
import requests import requests
import json import json
import logging import logging
from typing import Dict, List, Optional, Tuple from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_ from sqlalchemy import select, and_, or_
from sqlalchemy.orm import selectinload
from database import ( from database import (
UniversignTransaction, UniversignTransaction,
@ -22,7 +16,6 @@ from database import (
) )
from utils.universign_status_mapping import ( from utils.universign_status_mapping import (
map_universign_to_local, map_universign_to_local,
get_sage_status_code,
is_transition_allowed, is_transition_allowed,
get_status_actions, get_status_actions,
is_final_status, is_final_status,
@ -33,30 +26,13 @@ logger = logging.getLogger(__name__)
class UniversignSyncService: class UniversignSyncService:
"""
Service centralisé de synchronisation Universign
"""
def __init__(self, api_url: str, api_key: str, timeout: int = 30): def __init__(self, api_url: str, api_key: str, timeout: int = 30):
self.api_url = api_url.rstrip("/") self.api_url = api_url.rstrip("/")
self.api_key = api_key self.api_key = api_key
self.timeout = timeout self.timeout = timeout
self.auth = (api_key, "") self.auth = (api_key, "")
# ========================================
# 1. RÉCUPÉRATION DEPUIS UNIVERSIGN
# ========================================
def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
"""
Récupère le statut actuel d'une transaction depuis Universign
Args:
transaction_id: ID Universign (ex: tr_abc123)
Returns:
Dictionnaire avec les données Universign ou None
"""
start_time = datetime.now() start_time = datetime.now()
try: try:
@ -104,30 +80,12 @@ class UniversignSyncService:
logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
return None return None
# ========================================
# 2. SYNCHRONISATION UNITAIRE
# ========================================
async def sync_transaction( async def sync_transaction(
self, self,
session: AsyncSession, session: AsyncSession,
transaction: UniversignTransaction, transaction: UniversignTransaction,
force: bool = False, force: bool = False,
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
"""
Synchronise UNE transaction avec Universign
Args:
session: Session BDD
transaction: Transaction à synchroniser
force: Forcer même si statut final
Returns:
(success: bool, error_message: Optional[str])
"""
# === VÉRIFICATIONS PRÉALABLES ===
# Si statut final et pas de force, skip
if is_final_status(transaction.local_status.value) and not force: if is_final_status(transaction.local_status.value) and not force:
logger.debug( logger.debug(
f"Skip {transaction.transaction_id}: " f"Skip {transaction.transaction_id}: "
@ -263,18 +221,6 @@ class UniversignSyncService:
async def sync_all_pending( async def sync_all_pending(
self, session: AsyncSession, max_transactions: int = 50 self, session: AsyncSession, max_transactions: int = 50
) -> Dict[str, int]: ) -> Dict[str, int]:
"""
Synchronise toutes les transactions en attente
Args:
session: Session BDD
max_transactions: Nombre max de transactions à traiter
Returns:
Statistiques de synchronisation
"""
# === SÉLECTION TRANSACTIONS À SYNCHRONISER ===
query = ( query = (
select(UniversignTransaction) select(UniversignTransaction)
.where( .where(
@ -304,8 +250,6 @@ class UniversignSyncService:
result = await session.execute(query) result = await session.execute(query)
transactions = result.scalars().all() transactions = result.scalars().all()
# === STATISTIQUES ===
stats = { stats = {
"total_found": len(transactions), "total_found": len(transactions),
"success": 0, "success": 0,
@ -314,8 +258,6 @@ class UniversignSyncService:
"status_changes": 0, "status_changes": 0,
} }
# === TRAITEMENT ===
for transaction in transactions: for transaction in transactions:
try: try:
previous_status = transaction.local_status.value previous_status = transaction.local_status.value
@ -345,34 +287,16 @@ class UniversignSyncService:
return stats return stats
# ========================================
# 4. WEBHOOK HANDLER
# ========================================
async def process_webhook( async def process_webhook(
self, session: AsyncSession, payload: Dict self, session: AsyncSession, payload: Dict
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
"""
Traite un webhook Universign
Args:
session: Session BDD
payload: Corps du webhook
Returns:
(success: bool, error_message: Optional[str])
"""
try: try:
# === EXTRACTION DONNÉES ===
event_type = payload.get("event") event_type = payload.get("event")
transaction_id = payload.get("transaction_id") or payload.get("id") transaction_id = payload.get("transaction_id") or payload.get("id")
if not transaction_id: if not transaction_id:
return False, "Pas de transaction_id dans le webhook" return False, "Pas de transaction_id dans le webhook"
# === RECHERCHE TRANSACTION ===
query = select(UniversignTransaction).where( query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id UniversignTransaction.transaction_id == transaction_id
) )
@ -385,18 +309,12 @@ class UniversignSyncService:
) )
return False, "Transaction inconnue" return False, "Transaction inconnue"
# === MARQUAGE WEBHOOK ===
transaction.webhook_received = True transaction.webhook_received = True
# === SYNCHRONISATION ===
success, error = await self.sync_transaction( success, error = await self.sync_transaction(
session, transaction, force=True session, transaction, force=True
) )
# === LOG WEBHOOK ===
await self._log_sync_attempt( await self._log_sync_attempt(
session=session, session=session,
transaction=transaction, transaction=transaction,
@ -419,10 +337,6 @@ class UniversignSyncService:
logger.error(f"Erreur traitement webhook: {e}", exc_info=True) logger.error(f"Erreur traitement webhook: {e}", exc_info=True)
return False, str(e) return False, str(e)
# ========================================
# 5. HELPERS PRIVÉS
# ========================================
async def _sync_signers( async def _sync_signers(
self, self,
session: AsyncSession, session: AsyncSession,
@ -524,20 +438,11 @@ class UniversignSyncService:
return None return None
try: try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00")) return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except: except Exception:
return None return None
# ========================================
# 6. TÂCHE PLANIFIÉE (BACKGROUND)
# ========================================
class UniversignSyncScheduler: class UniversignSyncScheduler:
"""
Planificateur de synchronisation automatique
"""
def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5): def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5):
self.sync_service = sync_service self.sync_service = sync_service
self.interval_minutes = interval_minutes self.interval_minutes = interval_minutes

View file

@ -1,7 +1,6 @@
from typing import Dict, List, Optional from typing import Dict, List
from config.config import settings from config.config import settings
import logging import logging
from enum import Enum
from datetime import datetime from datetime import datetime
import uuid import uuid
@ -290,47 +289,31 @@ def _preparer_lignes_document(lignes: List) -> List[Dict]:
] ]
# ========================================
# MAPPING UNIVERSIGN → LOCAL
# ========================================
UNIVERSIGN_TO_LOCAL: Dict[str, str] = { UNIVERSIGN_TO_LOCAL: Dict[str, str] = {
# États initiaux # États initiaux
"draft": "EN_ATTENTE", # Transaction créée "draft": "EN_ATTENTE",
"ready": "EN_ATTENTE", # Prête mais pas envoyée "ready": "EN_ATTENTE",
# En cours # En cours
"started": "EN_COURS", # Envoyée, en attente de signature "started": "EN_COURS",
# États finaux (succès) # États finaux (succès)
"completed": "SIGNE", # ✓ Signé avec succès "completed": "SIGNE",
# États finaux (échec) # États finaux (échec)
"refused": "REFUSE", # ✗ Refusé par un signataire "refused": "REFUSE",
"expired": "EXPIRE", # ⏰ Délai expiré "expired": "EXPIRE",
"canceled": "REFUSE", # ✗ Annulé manuellement "canceled": "REFUSE",
"failed": "ERREUR", # ⚠️ Erreur technique "failed": "ERREUR",
} }
# ========================================
# MAPPING LOCAL → SAGE (CHAMP LIBRE)
# ========================================
LOCAL_TO_SAGE_STATUS: Dict[str, int] = { LOCAL_TO_SAGE_STATUS: Dict[str, int] = {
""" "EN_ATTENTE": 0,
À stocker dans un champ libre Sage (ex: CB_STATUT_SIGNATURE) "EN_COURS": 1,
""" "SIGNE": 2,
"EN_ATTENTE": 0, # Non envoyé "REFUSE": 3,
"EN_COURS": 1, # Envoyé, en attente "EXPIRE": 4,
"SIGNE": 2, # ✓ Signé (peut déclencher workflow) "ERREUR": 5,
"REFUSE": 3, # ✗ Refusé
"EXPIRE": 4, # ⏰ Expiré
"ERREUR": 5, # ⚠️ Erreur
} }
# ========================================
# ACTIONS MÉTIER PAR STATUT
# ========================================
STATUS_ACTIONS: Dict[str, Dict[str, any]] = { STATUS_ACTIONS: Dict[str, Dict[str, any]] = {
""" """
Actions automatiques à déclencher selon le statut Actions automatiques à déclencher selon le statut
@ -366,10 +349,6 @@ STATUS_ACTIONS: Dict[str, Dict[str, any]] = {
} }
# ========================================
# RÈGLES DE TRANSITION
# ========================================
ALLOWED_TRANSITIONS: Dict[str, list] = { ALLOWED_TRANSITIONS: Dict[str, list] = {
""" """
Transitions de statuts autorisées (validation) Transitions de statuts autorisées (validation)
@ -383,21 +362,7 @@ ALLOWED_TRANSITIONS: Dict[str, list] = {
} }
# ========================================
# FONCTION DE MAPPING
# ========================================
def map_universign_to_local(universign_status: str) -> str: def map_universign_to_local(universign_status: str) -> str:
"""
Convertit un statut Universign en statut local
Args:
universign_status: Statut brut Universign
Returns:
Statut local normalisé
"""
return UNIVERSIGN_TO_LOCAL.get( return UNIVERSIGN_TO_LOCAL.get(
universign_status.lower(), universign_status.lower(),
"ERREUR", # Fallback si statut inconnu "ERREUR", # Fallback si statut inconnu
@ -405,29 +370,10 @@ def map_universign_to_local(universign_status: str) -> str:
def get_sage_status_code(local_status: str) -> int: def get_sage_status_code(local_status: str) -> int:
"""
Obtient le code numérique pour Sage
Args:
local_status: Statut local (EN_ATTENTE, SIGNE, etc.)
Returns:
Code numérique pour champ libre Sage
"""
return LOCAL_TO_SAGE_STATUS.get(local_status, 5) return LOCAL_TO_SAGE_STATUS.get(local_status, 5)
def is_transition_allowed(from_status: str, to_status: str) -> bool: def is_transition_allowed(from_status: str, to_status: str) -> bool:
"""
Vérifie si une transition de statut est valide
Args:
from_status: Statut actuel
to_status: Nouveau statut
Returns:
True si la transition est autorisée
"""
if from_status == to_status: if from_status == to_status:
return True # Même statut = OK (idempotence) return True # Même statut = OK (idempotence)
@ -436,70 +382,30 @@ def is_transition_allowed(from_status: str, to_status: str) -> bool:
def get_status_actions(local_status: str) -> Dict[str, any]: def get_status_actions(local_status: str) -> Dict[str, any]:
"""
Obtient les actions à exécuter pour un statut
Args:
local_status: Statut local
Returns:
Dictionnaire des actions à exécuter
"""
return STATUS_ACTIONS.get(local_status, {}) return STATUS_ACTIONS.get(local_status, {})
def is_final_status(local_status: str) -> bool: def is_final_status(local_status: str) -> bool:
"""
Détermine si le statut est final (pas de synchronisation nécessaire)
Args:
local_status: Statut local
Returns:
True si statut final
"""
return local_status in ["SIGNE", "REFUSE", "EXPIRE"] return local_status in ["SIGNE", "REFUSE", "EXPIRE"]
# ========================================
# PRIORITÉS DE STATUTS (POUR CONFLITS)
# ========================================
STATUS_PRIORITY: Dict[str, int] = { STATUS_PRIORITY: Dict[str, int] = {
""" "ERREUR": 0,
En cas de conflit de synchronisation, prendre le statut
avec la priorité la plus élevée
"""
"ERREUR": 0, # Priorité la plus basse
"EN_ATTENTE": 1, "EN_ATTENTE": 1,
"EN_COURS": 2, "EN_COURS": 2,
"EXPIRE": 3, "EXPIRE": 3,
"REFUSE": 4, "REFUSE": 4,
"SIGNE": 5, # Priorité la plus élevée (état final souhaité) "SIGNE": 5,
} }
def resolve_status_conflict(status_a: str, status_b: str) -> str: def resolve_status_conflict(status_a: str, status_b: str) -> str:
"""
Résout un conflit entre deux statuts (prend le plus prioritaire)
Args:
status_a: Premier statut
status_b: Second statut
Returns:
Statut prioritaire
"""
priority_a = STATUS_PRIORITY.get(status_a, 0) priority_a = STATUS_PRIORITY.get(status_a, 0)
priority_b = STATUS_PRIORITY.get(status_b, 0) priority_b = STATUS_PRIORITY.get(status_b, 0)
return status_a if priority_a >= priority_b else status_b return status_a if priority_a >= priority_b else status_b
# ========================================
# MESSAGES UTILISATEUR
# ========================================
STATUS_MESSAGES: Dict[str, Dict[str, str]] = { STATUS_MESSAGES: Dict[str, Dict[str, str]] = {
"EN_ATTENTE": { "EN_ATTENTE": {
"fr": "Document en attente d'envoi", "fr": "Document en attente d'envoi",