# Sage100-vps/services/universign_sync.py
# File-viewer export: 2026-01-06 15:41:03 +03:00
#
# 479 lines
# 16 KiB
# Python

import requests
import json
import logging
from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_
from database import (
UniversignTransaction,
UniversignSigner,
UniversignSyncLog,
UniversignTransactionStatus,
LocalDocumentStatus,
UniversignSignerStatus,
)
from utils.universign_status_mapping import (
map_universign_to_local,
is_transition_allowed,
get_status_actions,
is_final_status,
resolve_status_conflict,
)
# Module-level logger shared by the sync service and the scheduler below.
logger = logging.getLogger(__name__)
class UniversignSyncService:
    """Synchronise locally stored Universign transactions with the Universign API.

    The service fetches the remote state of a transaction over HTTP, maps it
    to a local document status, persists the result (transaction fields,
    signers, sync log) through an ``AsyncSession`` and finally runs the
    business actions attached to the new status.
    """

    def __init__(self, api_url: str, api_key: str, timeout: int = 30):
        """Store the API endpoint and credentials.

        Args:
            api_url: Base URL of the Universign API; a trailing ``/`` is removed.
            api_key: API key, sent as the HTTP basic-auth user name.
            timeout: Timeout in seconds passed to every HTTP request.
        """
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        # Basic auth: API key as user name, empty password.
        self.auth = (api_key, "")

    def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]:
        """Fetch one transaction from Universign (blocking HTTP call).

        Args:
            transaction_id: Universign identifier of the transaction.

        Returns:
            A dict with the keys ``transaction`` (decoded JSON body),
            ``http_status``, ``response_time_ms`` and ``fetched_at`` when the
            API answers 200; ``None`` for any other status code, a timeout or
            any other error (all of which are logged, never raised).
        """
        start_time = datetime.now()
        try:
            response = requests.get(
                f"{self.api_url}/transactions/{transaction_id}",
                auth=self.auth,
                timeout=self.timeout,
                headers={"Accept": "application/json"},
            )
            response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000)

            if response.status_code == 200:
                data = response.json()
                logger.info(
                    f"✓ Fetch OK: {transaction_id} "
                    f"status={data.get('state')} "
                    f"({response_time_ms}ms)"
                )
                return {
                    "transaction": data,
                    "http_status": 200,
                    "response_time_ms": response_time_ms,
                    "fetched_at": datetime.now(),
                }

            if response.status_code == 404:
                logger.warning(
                    f"Transaction {transaction_id} introuvable sur Universign"
                )
                return None

            logger.error(
                f"Erreur HTTP {response.status_code} "
                f"pour {transaction_id}: {response.text}"
            )
            return None
        except requests.exceptions.Timeout:
            logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)")
            return None
        except Exception as e:
            # Best-effort fetch: callers treat ``None`` as "sync failed".
            logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True)
            return None

    async def sync_transaction(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        force: bool = False,
    ) -> Tuple[bool, Optional[str]]:
        """Synchronise a single transaction with Universign.

        Args:
            session: Async session the transaction is attached to; it is
                committed by this method.
            transaction: Transaction to refresh. Its ``signers`` relationship
                is read when the transaction is updated, so it should be
                loaded before the call.
            force: When true, sync even if the local status is final or
                unchanged.

        Returns:
            ``(True, None)`` when the transaction is up to date (synced,
            skipped or unchanged), ``(False, error_message)`` when the remote
            data could not be fetched or carries an unknown state.
        """
        import asyncio  # local import: only needed to off-load the blocking fetch

        # Read once so log lines do not touch the ORM object after a commit.
        transaction_ref = transaction.transaction_id

        if is_final_status(transaction.local_status.value) and not force:
            logger.debug(
                f"Skip {transaction_ref}: "
                f"statut final {transaction.local_status.value}"
            )
            transaction.needs_sync = False
            await session.commit()
            return True, None

        # === FETCH UNIVERSIGN ===
        # ``requests`` is blocking; run it in the default executor so the
        # event loop keeps serving other coroutines during the HTTP call.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, self.fetch_transaction_status, transaction_ref
        )
        if not result:
            error = "Échec récupération données Universign"
            await self._record_sync_failure(session, transaction, error)
            return False, error

        # === DATA EXTRACTION ===
        universign_data = result["transaction"]
        universign_status_raw = universign_data.get("state", "draft")

        # === STATUS MAPPING ===
        new_local_status = map_universign_to_local(universign_status_raw)
        previous_local_status = transaction.local_status.value

        # === TRANSITION VALIDATION ===
        if not is_transition_allowed(previous_local_status, new_local_status):
            logger.warning(
                f"Transition refusée: {previous_local_status} → {new_local_status}"
            )
            # On conflict, let the mapping module pick the winning status.
            new_local_status = resolve_status_conflict(
                previous_local_status, new_local_status
            )

        # === CHANGE DETECTION ===
        status_changed = previous_local_status != new_local_status
        now = datetime.now()
        if not status_changed and not force:
            logger.debug(f"Pas de changement pour {transaction_ref}")
            transaction.last_synced_at = now
            # Keep polling while the status is not final (same rule as the
            # update path below); otherwise an in-progress transaction would
            # drop out of the pending query after its first unchanged poll.
            transaction.needs_sync = not is_final_status(new_local_status)
            await session.commit()
            return True, None

        # === TRANSACTION UPDATE ===
        try:
            # Assumes UniversignTransactionStatus is an Enum, i.e. an unknown
            # value raises ValueError. Validate before mutating anything.
            universign_status = UniversignTransactionStatus(universign_status_raw)
        except ValueError:
            error = f"Statut Universign inconnu: {universign_status_raw}"
            logger.error(f"{error} ({transaction_ref})")
            await self._record_sync_failure(session, transaction, error)
            return False, error

        transaction.universign_status = universign_status
        transaction.local_status = LocalDocumentStatus(new_local_status)
        transaction.universign_status_updated_at = now

        # === STATUS-SPECIFIC DATES (only set the first time) ===
        if new_local_status == "EN_COURS" and not transaction.sent_at:
            transaction.sent_at = now
        if new_local_status == "SIGNE" and not transaction.signed_at:
            transaction.signed_at = now
        if new_local_status == "REFUSE" and not transaction.refused_at:
            transaction.refused_at = now
        if new_local_status == "EXPIRE" and not transaction.expired_at:
            transaction.expired_at = now

        # === URLS (taken from the first signer / first document) ===
        signers_payload = universign_data.get("signers") or []
        if signers_payload and "url" in signers_payload[0]:
            transaction.signer_url = signers_payload[0]["url"]
        documents_payload = universign_data.get("documents") or []
        if documents_payload and "url" in documents_payload[0]:
            transaction.document_url = documents_payload[0]["url"]

        # === SIGNERS ===
        await self._sync_signers(session, transaction, universign_data)

        # === FLAGS ===
        transaction.last_synced_at = now
        transaction.sync_attempts = (transaction.sync_attempts or 0) + 1
        transaction.needs_sync = not is_final_status(new_local_status)
        transaction.sync_error = None

        # === LOG ===
        await self._log_sync_attempt(
            session=session,
            transaction=transaction,
            sync_type="polling",
            success=True,
            error_message=None,
            previous_status=previous_local_status,
            new_status=new_local_status,
            changes=json.dumps(
                {
                    "status_changed": status_changed,
                    "universign_raw": universign_status_raw,
                    "response_time_ms": result.get("response_time_ms"),
                }
            ),
        )
        await session.commit()

        # === BUSINESS ACTIONS ===
        if status_changed:
            await self._execute_status_actions(session, transaction, new_local_status)

        logger.info(
            f"✓ Sync OK: {transaction_ref} "
            f"{previous_local_status} → {new_local_status}"
        )
        return True, None

    async def sync_all_pending(
        self, session: AsyncSession, max_transactions: int = 50
    ) -> Dict[str, int]:
        """Synchronise every transaction flagged as needing a sync.

        Selects up to ``max_transactions`` rows with ``needs_sync`` set that
        are either not in a final local status, were last synced more than an
        hour ago, or were never synced; oldest first.

        Args:
            session: Async session used for the query and the updates.
            max_transactions: Maximum number of transactions handled per call.

        Returns:
            Counters ``total_found``, ``success``, ``failed``, ``skipped`` and
            ``status_changes``. ``skipped`` is never incremented here and is
            always 0.
        """
        from sqlalchemy.orm import selectinload

        query = (
            select(UniversignTransaction)
            # Eager-load signers: sync_transaction reads the relationship and
            # lazy loading is not available under AsyncSession.
            .options(selectinload(UniversignTransaction.signers))
            .where(
                and_(
                    UniversignTransaction.needs_sync,
                    or_(
                        ~UniversignTransaction.local_status.in_(
                            [
                                LocalDocumentStatus.SIGNED,
                                LocalDocumentStatus.REJECTED,
                                LocalDocumentStatus.EXPIRED,
                            ]
                        ),
                        UniversignTransaction.last_synced_at
                        < (datetime.now() - timedelta(hours=1)),
                        UniversignTransaction.last_synced_at.is_(None),
                    ),
                )
            )
            .order_by(UniversignTransaction.created_at.asc())
            .limit(max_transactions)
        )
        result = await session.execute(query)
        transactions = result.scalars().all()

        stats = {
            "total_found": len(transactions),
            "success": 0,
            "failed": 0,
            "skipped": 0,
            "status_changes": 0,
        }

        for transaction in transactions:
            try:
                previous_status = transaction.local_status.value
                success, error = await self.sync_transaction(
                    session, transaction, force=False
                )
                if success:
                    stats["success"] += 1
                    if transaction.local_status.value != previous_status:
                        stats["status_changes"] += 1
                else:
                    stats["failed"] += 1
            except Exception as e:
                # One failing transaction must not abort the whole batch.
                logger.error(
                    f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True
                )
                stats["failed"] += 1

        logger.info(
            f"Polling terminé: {stats['success']}/{stats['total_found']} OK, "
            f"{stats['status_changes']} changements détectés"
        )
        return stats

    async def process_webhook(
        self, session: AsyncSession, payload: Dict
    ) -> Tuple[bool, Optional[str]]:
        """Handle a Universign webhook by forcing a sync of its transaction.

        Args:
            session: Async session used to load and update the transaction.
            payload: Decoded webhook body; the transaction id is read from
                ``transaction_id`` or ``id`` and the event name from ``event``.

        Returns:
            ``(success, error_message)``. Any exception is logged and reported
            as ``(False, str(exception))`` instead of being raised.
        """
        from sqlalchemy.orm import selectinload

        try:
            event_type = payload.get("event")
            transaction_id = payload.get("transaction_id") or payload.get("id")
            if not transaction_id:
                return False, "Pas de transaction_id dans le webhook"

            query = (
                select(UniversignTransaction)
                # The forced sync below reads transaction.signers; load it
                # eagerly because AsyncSession cannot lazy-load it.
                .options(selectinload(UniversignTransaction.signers))
                .where(UniversignTransaction.transaction_id == transaction_id)
            )
            result = await session.execute(query)
            transaction = result.scalar_one_or_none()
            if not transaction:
                logger.warning(
                    f"Webhook reçu pour transaction inconnue: {transaction_id}"
                )
                return False, "Transaction inconnue"

            transaction.webhook_received = True
            success, error = await self.sync_transaction(
                session, transaction, force=True
            )

            # Second log entry keeping the raw webhook payload and event name.
            await self._log_sync_attempt(
                session=session,
                transaction=transaction,
                sync_type=f"webhook:{event_type}",
                success=success,
                error_message=error,
                changes=json.dumps(payload),
            )
            await session.commit()

            logger.info(
                f"✓ Webhook traité: {transaction_id} "
                f"event={event_type} success={success}"
            )
            return success, error
        except Exception as e:
            logger.error(f"Erreur traitement webhook: {e}", exc_info=True)
            return False, str(e)

    async def _record_sync_failure(
        self, session: AsyncSession, transaction: UniversignTransaction, error: str
    ) -> None:
        """Persist a failed polling attempt: error on the row, log entry, commit."""
        transaction.sync_error = error
        await self._log_sync_attempt(session, transaction, "polling", False, error)
        # Commit here so the failure is stored even if nothing else commits
        # the session afterwards.
        await session.commit()

    async def _sync_signers(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        universign_data: Dict,
    ):
        """Replace the transaction's signers with those in ``universign_data``.

        Existing signer rows are deleted and one row per entry of
        ``universign_data["signers"]`` is added, with the id
        ``"<transaction.id>_signer_<index>"``. Nothing is committed here.
        """
        signers_data = universign_data.get("signers") or []

        # Drop the previous signers.
        for signer in transaction.signers:
            await session.delete(signer)

        # Create the new ones, keeping the order of the API response.
        for idx, signer_data in enumerate(signers_data):
            signer = UniversignSigner(
                id=f"{transaction.id}_signer_{idx}",
                transaction_id=transaction.id,
                email=signer_data.get("email", ""),
                name=signer_data.get("name"),
                status=UniversignSignerStatus(signer_data.get("status", "waiting")),
                order_index=idx,
                viewed_at=self._parse_date(signer_data.get("viewed_at")),
                signed_at=self._parse_date(signer_data.get("signed_at")),
                refused_at=self._parse_date(signer_data.get("refused_at")),
            )
            session.add(signer)

    async def _log_sync_attempt(
        self,
        session: AsyncSession,
        transaction: UniversignTransaction,
        sync_type: str,
        success: bool,
        error_message: Optional[str] = None,
        previous_status: Optional[str] = None,
        new_status: Optional[str] = None,
        changes: Optional[str] = None,
    ):
        """Add a ``UniversignSyncLog`` row for one sync attempt.

        The row is only added to the session; the caller is responsible for
        committing.
        """
        log = UniversignSyncLog(
            transaction_id=transaction.id,
            sync_type=sync_type,
            sync_timestamp=datetime.now(),
            previous_status=previous_status,
            new_status=new_status,
            changes_detected=changes,
            success=success,
            error_message=error_message,
        )
        session.add(log)

    async def _execute_status_actions(
        self, session: AsyncSession, transaction: UniversignTransaction, new_status: str
    ):
        """Run the business actions configured for ``new_status``.

        The action flags come from ``get_status_actions``; each truthy flag
        triggers the matching private hook.
        """
        actions = get_status_actions(new_status)
        if not actions:
            return

        # Sage status update
        if actions.get("update_sage_status"):
            await self._update_sage_status(transaction, new_status)
        # Workflow trigger
        if actions.get("trigger_workflow"):
            await self._trigger_workflow(transaction)
        # Notifications
        if actions.get("send_notification"):
            await self._send_notification(transaction, new_status)
        # Archiving
        if actions.get("archive_document"):
            await self._archive_signed_document(transaction)

    async def _update_sage_status(self, transaction, status):
        """Update the status in Sage (placeholder: only logs for now)."""
        # TODO: call sage_client.mettre_a_jour_champ_libre()
        logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}")

    async def _trigger_workflow(self, transaction):
        """Trigger a workflow, e.g. quote → order (placeholder: only logs)."""
        logger.info(f"TODO: Workflow pour {transaction.sage_document_id}")

    async def _send_notification(self, transaction, status):
        """Send an e-mail notification (placeholder: only logs)."""
        logger.info(f"TODO: Notif pour {transaction.sage_document_id}")

    async def _archive_signed_document(self, transaction):
        """Archive the signed document (placeholder: only logs)."""
        logger.info(f"TODO: Archivage pour {transaction.sage_document_id}")

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO 8601 date string; return ``None`` if missing or invalid.

        A trailing ``Z`` is rewritten to ``+00:00`` before parsing, so such
        values come back as timezone-aware datetimes.
        """
        if not date_str:
            return None
        try:
            return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string value: treat as "no date".
            return None
class UniversignSyncScheduler:
    """Run ``sync_all_pending`` on the sync service at a fixed interval.

    ``start`` loops until ``stop`` is called (or ``is_running`` is set to
    ``False``). ``stop`` wakes the loop immediately instead of letting it
    finish the current sleep.
    """

    def __init__(self, sync_service: "UniversignSyncService", interval_minutes: int = 5):
        """Store the service to drive and the pause between two polling cycles.

        Args:
            sync_service: Object exposing an async ``sync_all_pending(session)``.
            interval_minutes: Minutes to wait between two cycles.
        """
        self.sync_service = sync_service
        self.interval_minutes = interval_minutes
        self.is_running = False
        # Created in start() so the event belongs to the running event loop.
        self._stop_event = None

    async def start(self, session_factory):
        """Start the automatic polling loop.

        Args:
            session_factory: Callable returning an async context manager that
                yields the session handed to ``sync_all_pending``.

        Errors raised by a cycle are logged and the loop continues with the
        next cycle.
        """
        import asyncio

        self._stop_event = asyncio.Event()
        self.is_running = True
        logger.info(
            f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)"
        )

        while self.is_running:
            try:
                async with session_factory() as session:
                    stats = await self.sync_service.sync_all_pending(session)
                    logger.info(
                        f"Polling: {stats['success']} transactions synchronisées, "
                        f"{stats['status_changes']} changements"
                    )
            except Exception as e:
                logger.error(f"Erreur polling: {e}", exc_info=True)

            # stop() may have been called during the cycle: do not sleep then.
            if not self.is_running:
                break

            # Wait before the next cycle, but wake up at once when stop()
            # sets the event.
            try:
                await asyncio.wait_for(
                    self._stop_event.wait(), timeout=self.interval_minutes * 60
                )
            except asyncio.TimeoutError:
                # Interval elapsed without a stop request: run the next cycle.
                pass

    def stop(self):
        """Stop the polling loop.

        Intended to be called from the event loop thread running ``start``.
        """
        self.is_running = False
        if self._stop_event is not None:
            self._stop_event.set()
        logger.info("Arrêt polling Universign")