import requests import json import logging from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ from database import ( UniversignTransaction, UniversignSigner, UniversignSyncLog, UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, ) from utils.universign_status_mapping import ( map_universign_to_local, is_transition_allowed, get_status_actions, is_final_status, resolve_status_conflict, ) logger = logging.getLogger(__name__) class UniversignSyncService: def __init__(self, api_url: str, api_key: str, timeout: int = 30): self.api_url = api_url.rstrip("/") self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: start_time = datetime.now() try: response = requests.get( f"{self.api_url}/transactions/{transaction_id}", auth=self.auth, timeout=self.timeout, headers={"Accept": "application/json"}, ) response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000) if response.status_code == 200: data = response.json() logger.info( f"✓ Fetch OK: {transaction_id} " f"status={data.get('state')} " f"({response_time_ms}ms)" ) return { "transaction": data, "http_status": 200, "response_time_ms": response_time_ms, "fetched_at": datetime.now(), } elif response.status_code == 404: logger.warning( f"Transaction {transaction_id} introuvable sur Universign" ) return None else: logger.error( f"Erreur HTTP {response.status_code} " f"pour {transaction_id}: {response.text}" ) return None except requests.exceptions.Timeout: logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)") return None except Exception as e: logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) return None async def sync_transaction( self, session: AsyncSession, transaction: UniversignTransaction, force: bool = False, ) -> Tuple[bool, Optional[str]]: if is_final_status(transaction.local_status.value) and not force: logger.debug( f"Skip {transaction.transaction_id}: " f"statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None # === FETCH UNIVERSIGN === result = self.fetch_transaction_status(transaction.transaction_id) if not result: error = "Échec récupération données Universign" await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error # === EXTRACTION DONNÉES === universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") # === MAPPING STATUT === new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value # === VALIDATION TRANSITION === if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) # En cas de conflit, résoudre par priorité new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) # === DÉTECTION CHANGEMENT === status_changed = previous_local_status != new_local_status if not status_changed and not force: logger.debug(f"Pas de changement pour {transaction.transaction_id}") transaction.last_synced_at = datetime.now() transaction.needs_sync = False await session.commit() return True, None # === MISE À JOUR TRANSACTION === transaction.universign_status = UniversignTransactionStatus( universign_status_raw ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() # === DATES SPÉCIFIQUES === if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() if new_local_status == "REFUSE" and not transaction.refused_at: transaction.refused_at = datetime.now() if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() # === URLS === if "signers" in universign_data and len(universign_data["signers"]) > 0: first_signer = universign_data["signers"][0] if "url" in first_signer: transaction.signer_url = first_signer["url"] if "documents" in universign_data and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] if "url" in first_doc: transaction.document_url = first_doc["url"] # === SIGNATAIRES === await self._sync_signers(session, transaction, universign_data) # === FLAGS === transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None # === LOG === await self._log_sync_attempt( session=session, transaction=transaction, sync_type="polling", success=True, error_message=None, previous_status=previous_local_status, new_status=new_local_status, changes=json.dumps( { "status_changed": status_changed, "universign_raw": universign_status_raw, "response_time_ms": result.get("response_time_ms"), } ), ) await session.commit() # === ACTIONS MÉTIER === if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( f"✓ Sync OK: {transaction.transaction_id} " f"{previous_local_status} → {new_local_status}" ) return True, None async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: """ Synchronise toutes les transactions en attente """ from sqlalchemy.orm import selectinload # Si pas déjà importé en haut query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) # AJOUTER CETTE LIGNE .where( and_( UniversignTransaction.needs_sync, or_( ~UniversignTransaction.local_status.in_( [ LocalDocumentStatus.SIGNED, LocalDocumentStatus.REJECTED, LocalDocumentStatus.EXPIRED, ] ), UniversignTransaction.last_synced_at < (datetime.now() - timedelta(hours=1)), UniversignTransaction.last_synced_at.is_(None), ), ) ) .order_by(UniversignTransaction.created_at.asc()) .limit(max_transactions) ) result = await session.execute(query) transactions = result.scalars().all() stats = { "total_found": len(transactions), "success": 0, "failed": 0, "skipped": 0, "status_changes": 0, } for transaction in transactions: try: previous_status = transaction.local_status.value success, error = await self.sync_transaction( session, transaction, force=False ) if success: stats["success"] += 1 if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: stats["failed"] += 1 except Exception as e: logger.error( f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True ) stats["failed"] += 1 logger.info( f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " f"{stats['status_changes']} changements détectés" ) return stats async def process_webhook( self, session: AsyncSession, payload: Dict ) -> Tuple[bool, Optional[str]]: try: event_type = payload.get("event") transaction_id = payload.get("transaction_id") or payload.get("id") if not transaction_id: return False, "Pas de transaction_id dans le webhook" query = select(UniversignTransaction).where( UniversignTransaction.transaction_id == transaction_id ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: logger.warning( f"Webhook reçu pour transaction inconnue: {transaction_id}" ) return False, "Transaction inconnue" transaction.webhook_received = True success, error = await self.sync_transaction( session, transaction, force=True ) await self._log_sync_attempt( session=session, transaction=transaction, sync_type=f"webhook:{event_type}", success=success, error_message=error, changes=json.dumps(payload), ) await session.commit() logger.info( f"✓ Webhook traité: {transaction_id} " f"event={event_type} success={success}" ) return success, error except Exception as e: logger.error(f"Erreur traitement webhook: {e}", exc_info=True) return False, str(e) async def _sync_signers( self, session: AsyncSession, transaction: UniversignTransaction, universign_data: Dict, ): """Synchronise les signataires""" signers_data = universign_data.get("signers", []) # Supprimer les anciens signataires for signer in transaction.signers: await session.delete(signer) # Créer les nouveaux for idx, signer_data in enumerate(signers_data): signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}", transaction_id=transaction.id, email=signer_data.get("email", ""), name=signer_data.get("name"), status=UniversignSignerStatus(signer_data.get("status", "waiting")), order_index=idx, viewed_at=self._parse_date(signer_data.get("viewed_at")), signed_at=self._parse_date(signer_data.get("signed_at")), refused_at=self._parse_date(signer_data.get("refused_at")), ) session.add(signer) async def _log_sync_attempt( self, session: AsyncSession, transaction: UniversignTransaction, sync_type: str, success: bool, error_message: Optional[str] = None, previous_status: Optional[str] = None, new_status: Optional[str] = None, changes: Optional[str] = None, ): """Enregistre une tentative de sync dans les logs""" log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, sync_timestamp=datetime.now(), previous_status=previous_status, new_status=new_status, changes_detected=changes, success=success, error_message=error_message, ) session.add(log) async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): """Exécute les actions métier associées au statut""" actions = get_status_actions(new_status) if not actions: return # Mise à jour Sage if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) # Déclencher workflow if actions.get("trigger_workflow"): await self._trigger_workflow(transaction) # Notifications if actions.get("send_notification"): await self._send_notification(transaction, new_status) # Archive if actions.get("archive_document"): await self._archive_signed_document(transaction) async def _update_sage_status(self, transaction, status): """Met à jour le statut dans Sage""" # TODO: Appeler sage_client.mettre_a_jour_champ_libre() logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") async def _trigger_workflow(self, transaction): """Déclenche un workflow (ex: devis→commande)""" logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") async def _send_notification(self, transaction, status): """Envoie une notification email""" logger.info(f"TODO: Notif pour {transaction.sage_document_id}") async def _archive_signed_document(self, transaction): """Archive le document signé""" logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: """Parse une date ISO 8601""" if not date_str: return None try: return datetime.fromisoformat(date_str.replace("Z", "+00:00")) except Exception: return None class UniversignSyncScheduler: def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5): self.sync_service = sync_service self.interval_minutes = interval_minutes self.is_running = False async def start(self, session_factory): """Démarre le polling automatique""" import asyncio self.is_running = True logger.info( f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)" ) while self.is_running: try: async with session_factory() as session: stats = await self.sync_service.sync_all_pending(session) logger.info( f"Polling: {stats['success']} transactions synchronisées, " f"{stats['status_changes']} changements" ) except Exception as e: logger.error(f"Erreur polling: {e}", exc_info=True) # Attendre avant le prochain cycle await asyncio.sleep(self.interval_minutes * 60) def stop(self): """Arrête le polling""" self.is_running = False logger.info("Arrêt polling Universign")