import requests import json import logging import uuid from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ from sqlalchemy.orm import selectinload from database import ( UniversignTransaction, UniversignSigner, UniversignSyncLog, UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, EmailLog, StatutEmail, ) from data.data import templates_signature_email from utils.universign_status_mapping import ( map_universign_to_local, is_transition_allowed, get_status_actions, is_final_status, resolve_status_conflict, ) logger = logging.getLogger(__name__) class UniversignSyncService: def __init__(self, api_url: str, api_key: str, timeout: int = 30): self.api_url = api_url.rstrip("/") self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") self.sage_client = None self.email_queue = None self.settings = None def configure(self, sage_client, email_queue, settings): self.sage_client = sage_client self.email_queue = email_queue self.settings = settings def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: start_time = datetime.now() try: response = requests.get( f"{self.api_url}/transactions/{transaction_id}", auth=self.auth, timeout=self.timeout, headers={"Accept": "application/json"}, ) response_time_ms = int((datetime.now() - start_time).total_seconds() * 1000) if response.status_code == 200: data = response.json() logger.info( f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)" ) return { "transaction": data, "http_status": 200, "response_time_ms": response_time_ms, "fetched_at": datetime.now(), } elif response.status_code == 404: logger.warning( f"Transaction {transaction_id} introuvable sur Universign" ) return None else: logger.error( f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}" ) return None except requests.exceptions.Timeout: logger.error(f"Timeout récupération {transaction_id} (>{self.timeout}s)") return None except Exception as e: logger.error(f"Erreur fetch {transaction_id}: {e}", exc_info=True) return None async def sync_transaction( self, session: AsyncSession, transaction: UniversignTransaction, force: bool = False, ) -> Tuple[bool, Optional[str]]: if is_final_status(transaction.local_status.value) and not force: logger.debug( f"Skip {transaction.transaction_id}: statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None result = self.fetch_transaction_status(transaction.transaction_id) if not result: error = "Échec récupération données Universign" await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) status_changed = previous_local_status != new_local_status if not status_changed and not force: logger.debug(f"Pas de changement pour {transaction.transaction_id}") transaction.last_synced_at = datetime.now() transaction.needs_sync = False await session.commit() return True, None try: transaction.universign_status = UniversignTransactionStatus( universign_status_raw ) except ValueError: transaction.universign_status = ( UniversignTransactionStatus.COMPLETED if new_local_status == "SIGNE" else UniversignTransactionStatus.FAILED ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() if new_local_status == "SIGNE" and not transaction.signed_at: transaction.signed_at = datetime.now() if new_local_status == "REFUSE" and not transaction.refused_at: transaction.refused_at = datetime.now() if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() if universign_data.get("documents") and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] if first_doc.get("url"): transaction.document_url = first_doc["url"] await self._sync_signers(session, transaction, universign_data) transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None await self._log_sync_attempt( session=session, transaction=transaction, sync_type="polling", success=True, error_message=None, previous_status=previous_local_status, new_status=new_local_status, changes=json.dumps( { "status_changed": status_changed, "universign_raw": universign_status_raw, "response_time_ms": result.get("response_time_ms"), } ), ) await session.commit() if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( f"Sync OK: {transaction.transaction_id} {previous_local_status} → {new_local_status}" ) return True, None async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .where( and_( UniversignTransaction.needs_sync, or_( ~UniversignTransaction.local_status.in_( [ LocalDocumentStatus.SIGNED, LocalDocumentStatus.REJECTED, LocalDocumentStatus.EXPIRED, ] ), UniversignTransaction.last_synced_at < (datetime.now() - timedelta(hours=1)), UniversignTransaction.last_synced_at.is_(None), ), ) ) .order_by(UniversignTransaction.created_at.asc()) .limit(max_transactions) ) result = await session.execute(query) transactions = result.scalars().all() stats = { "total_found": len(transactions), "success": 0, "failed": 0, "skipped": 0, "status_changes": 0, } for transaction in transactions: try: previous_status = transaction.local_status.value success, error = await self.sync_transaction( session, transaction, force=False ) if success: stats["success"] += 1 if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: stats["failed"] += 1 except Exception as e: logger.error( f"Erreur sync {transaction.transaction_id}: {e}", exc_info=True ) stats["failed"] += 1 logger.info( f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés" ) return stats async def process_webhook( self, session: AsyncSession, payload: Dict ) -> Tuple[bool, Optional[str]]: try: event_type = payload.get("event") transaction_id = payload.get("transaction_id") or payload.get("id") if not transaction_id: return False, "Pas de transaction_id dans le webhook" query = ( select(UniversignTransaction) .options(selectinload(UniversignTransaction.signers)) .where(UniversignTransaction.transaction_id == transaction_id) ) result = await session.execute(query) transaction = result.scalar_one_or_none() if not transaction: logger.warning( f"Webhook reçu pour transaction inconnue: {transaction_id}" ) return False, "Transaction inconnue" transaction.webhook_received = True success, error = await self.sync_transaction( session, transaction, force=True ) await self._log_sync_attempt( session=session, transaction=transaction, sync_type=f"webhook:{event_type}", success=success, error_message=error, changes=json.dumps(payload), ) await session.commit() logger.info( f"Webhook traité: {transaction_id} event={event_type} success={success}" ) return success, error except Exception as e: logger.error(f"Erreur traitement webhook: {e}", exc_info=True) return False, str(e) async def _sync_signers( self, session: AsyncSession, transaction: UniversignTransaction, universign_data: Dict, ): signers_data = universign_data.get("signers", []) # Ne pas toucher aux signers existants si Universign n'en retourne pas if not signers_data: return # Mettre à jour les signers existants ou en créer de nouveaux existing_signers = {s.email: s for s in transaction.signers} for idx, signer_data in enumerate(signers_data): email = signer_data.get("email", "") if email in existing_signers: # Mise à jour du signer existant signer = existing_signers[email] signer.status = UniversignSignerStatus( signer_data.get("status", "waiting") ) signer.viewed_at = ( self._parse_date(signer_data.get("viewed_at")) or signer.viewed_at ) signer.signed_at = ( self._parse_date(signer_data.get("signed_at")) or signer.signed_at ) signer.refused_at = ( self._parse_date(signer_data.get("refused_at")) or signer.refused_at ) else: # Nouveau signer signer = UniversignSigner( id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}", transaction_id=transaction.id, email=email, name=signer_data.get("name"), status=UniversignSignerStatus(signer_data.get("status", "waiting")), order_index=idx, viewed_at=self._parse_date(signer_data.get("viewed_at")), signed_at=self._parse_date(signer_data.get("signed_at")), refused_at=self._parse_date(signer_data.get("refused_at")), ) session.add(signer) async def _log_sync_attempt( self, session: AsyncSession, transaction: UniversignTransaction, sync_type: str, success: bool, error_message: Optional[str] = None, previous_status: Optional[str] = None, new_status: Optional[str] = None, changes: Optional[str] = None, ): log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, sync_timestamp=datetime.now(), previous_status=previous_status, new_status=new_status, changes_detected=changes, success=success, error_message=error_message, ) session.add(log) async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): actions = get_status_actions(new_status) if not actions: return if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) if actions.get("send_notification"): await self._send_notification(session, transaction, new_status) async def _update_sage_status( self, transaction: UniversignTransaction, status: str ): if not self.sage_client: logger.warning("sage_client non configuré pour mise à jour Sage") return try: type_doc = transaction.sage_document_type.value doc_id = transaction.sage_document_id if status == "SIGNE": self.sage_client.changer_statut_document( document_type_code=type_doc, numero=doc_id, nouveau_statut=2 ) logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)") elif status == "EN_COURS": self.sage_client.changer_statut_document( document_type_code=type_doc, numero=doc_id, nouveau_statut=1 ) logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)") except Exception as e: logger.error( f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}" ) async def _send_notification( self, session: AsyncSession, transaction: UniversignTransaction, status: str ): if not self.email_queue or not self.settings: logger.warning("email_queue ou settings non configuré") return try: if status == "SIGNE": template = templates_signature_email["signature_confirmee"] type_labels = { 0: "Devis", 10: "Commande", 30: "Bon de Livraison", 60: "Facture", 50: "Avoir", } variables = { "NOM_SIGNATAIRE": transaction.requester_name or "Client", "TYPE_DOC": type_labels.get( transaction.sage_document_type.value, "Document" ), "NUMERO": transaction.sage_document_id, "DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M") if transaction.signed_at else datetime.now().strftime("%d/%m/%Y à %H:%M"), "TRANSACTION_ID": transaction.transaction_id, "CONTACT_EMAIL": self.settings.smtp_from, } sujet = template["sujet"] corps = template["corps_html"] for var, valeur in variables.items(): sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) email_log = EmailLog( id=str(uuid.uuid4()), destinataire=transaction.requester_email, sujet=sujet, corps_html=corps, document_ids=transaction.sage_document_id, type_document=transaction.sage_document_type.value, statut=StatutEmail.EN_ATTENTE, date_creation=datetime.now(), nb_tentatives=0, ) session.add(email_log) await session.flush() self.email_queue.enqueue(email_log.id) logger.info( f"Email confirmation signature envoyé à {transaction.requester_email}" ) except Exception as e: logger.error( f"Erreur envoi notification pour {transaction.transaction_id}: {e}" ) @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: if not date_str: return None try: return datetime.fromisoformat(date_str.replace("Z", "+00:00")) except Exception: return None class UniversignSyncScheduler: def __init__(self, sync_service: UniversignSyncService, interval_minutes: int = 5): self.sync_service = sync_service self.interval_minutes = interval_minutes self.is_running = False async def start(self, session_factory): import asyncio self.is_running = True logger.info( f"Démarrage polling Universign (intervalle: {self.interval_minutes}min)" ) while self.is_running: try: async with session_factory() as session: stats = await self.sync_service.sync_all_pending(session) logger.info( f"Polling: {stats['success']} transactions synchronisées, " f"{stats['status_changes']} changements" ) except Exception as e: logger.error(f"Erreur polling: {e}", exc_info=True) await asyncio.sleep(self.interval_minutes * 60) def stop(self): self.is_running = False logger.info("Arrêt polling Universign")