diff --git a/api.py b/api.py index db1a6dd..a78fcbc 100644 --- a/api.py +++ b/api.py @@ -129,14 +129,19 @@ async def lifespan(app: FastAPI): api_url=settings.universign_api_url, api_key=settings.universign_api_key ) + # Configuration du service avec les dépendances + sync_service.configure( + sage_client=sage_client, email_queue=email_queue, settings=settings + ) + scheduler = UniversignSyncScheduler( sync_service=sync_service, - interval_minutes=5, # Synchronisation toutes les 5 minutes + interval_minutes=5, ) sync_task = asyncio.create_task(scheduler.start(async_session_factory)) - logger.info("✓ Synchronisation Universign démarrée (5min)") + logger.info("Synchronisation Universign démarrée (5min)") yield diff --git a/create_admin.py b/create_admin.py index 8513574..d3cb786 100644 --- a/create_admin.py +++ b/create_admin.py @@ -1,12 +1,3 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Script de création du premier utilisateur administrateur - -Usage: - python create_admin.py -""" - import asyncio import sys from pathlib import Path diff --git a/routes/universign.py b/routes/universign.py index 582047d..d379962 100644 --- a/routes/universign.py +++ b/routes/universign.py @@ -80,6 +80,29 @@ async def create_signature( request: CreateSignatureRequest, session: AsyncSession = Depends(get_session) ): try: + # === VÉRIFICATION DOUBLON === + existing_query = select(UniversignTransaction).where( + UniversignTransaction.sage_document_id == request.sage_document_id, + UniversignTransaction.sage_document_type == request.sage_document_type, + ~UniversignTransaction.local_status.in_( + [ + LocalDocumentStatus.SIGNED, + LocalDocumentStatus.REJECTED, + LocalDocumentStatus.EXPIRED, + LocalDocumentStatus.ERROR, + ] + ), + ) + existing_result = await session.execute(existing_query) + existing_tx = existing_result.scalar_one_or_none() + + if existing_tx: + raise HTTPException( + 400, + f"Une demande de signature est déjà en cours pour {request.sage_document_id} " + f"(transaction: {existing_tx.transaction_id}, statut: {existing_tx.local_status.value})", + ) + pdf_bytes = email_queue._generate_pdf( request.sage_document_id, normaliser_type_doc(request.sage_document_type) ) @@ -265,6 +288,21 @@ async def create_signature( email_queue.enqueue(email_log.id) + # === MISE À JOUR STATUT SAGE (Confirmé = 1) === + try: + from sage_client import sage_client + + sage_client.changer_statut_document( + document_type_code=request.sage_document_type.value, + numero=request.sage_document_id, + nouveau_statut=1, + ) + logger.info( + f"Statut Sage mis à jour: {request.sage_document_id} → Confirmé (1)" + ) + except Exception as e: + logger.warning(f"Impossible de mettre à jour le statut Sage: {e}") + # === RÉPONSE === return TransactionResponse( id=transaction.id, @@ -558,3 +596,203 @@ async def get_transaction_logs( for log in logs ], } + + +# Ajouter ces routes dans universign.py + + +@router.get("/documents/{sage_document_id}/signatures") +async def get_signatures_for_document( + sage_document_id: str, + session: AsyncSession = Depends(get_session), +): + """Liste toutes les transactions de signature pour un document Sage""" + query = ( + select(UniversignTransaction) + .options(selectinload(UniversignTransaction.signers)) + .where(UniversignTransaction.sage_document_id == sage_document_id) + .order_by(UniversignTransaction.created_at.desc()) + ) + + result = await session.execute(query) + transactions = result.scalars().all() + + return [ + { + "id": tx.id, + "transaction_id": tx.transaction_id, + "local_status": tx.local_status.value, + "universign_status": tx.universign_status.value + if tx.universign_status + else None, + "created_at": tx.created_at.isoformat(), + "signed_at": tx.signed_at.isoformat() if tx.signed_at else None, + "signer_url": tx.signer_url, + "signers_count": len(tx.signers), + } + for tx in transactions + ] + + +@router.delete("/documents/{sage_document_id}/duplicates") +async def cleanup_duplicate_signatures( + sage_document_id: str, + keep_latest: bool = Query( + True, description="Garder la plus récente (True) ou la plus ancienne (False)" + ), + session: AsyncSession = Depends(get_session), +): + """ + Supprime les doublons de signatures pour un document. + Garde une seule transaction (la plus récente ou ancienne selon le paramètre). + """ + query = ( + select(UniversignTransaction) + .where(UniversignTransaction.sage_document_id == sage_document_id) + .order_by( + UniversignTransaction.created_at.desc() + if keep_latest + else UniversignTransaction.created_at.asc() + ) + ) + + result = await session.execute(query) + transactions = result.scalars().all() + + if len(transactions) <= 1: + return { + "success": True, + "message": "Aucun doublon trouvé", + "kept": transactions[0].transaction_id if transactions else None, + "deleted_count": 0, + } + + # Garder la première (selon l'ordre), supprimer les autres + to_keep = transactions[0] + to_delete = transactions[1:] + + deleted_ids = [] + for tx in to_delete: + deleted_ids.append(tx.transaction_id) + await session.delete(tx) + + await session.commit() + + logger.info( + f"Nettoyage doublons {sage_document_id}: gardé {to_keep.transaction_id}, supprimé {deleted_ids}" + ) + + return { + "success": True, + "document_id": sage_document_id, + "kept": { + "id": to_keep.id, + "transaction_id": to_keep.transaction_id, + "status": to_keep.local_status.value, + "created_at": to_keep.created_at.isoformat(), + }, + "deleted_count": len(deleted_ids), + "deleted_transaction_ids": deleted_ids, + } + + +@router.delete("/transactions/{transaction_id}") +async def delete_transaction( + transaction_id: str, + session: AsyncSession = Depends(get_session), +): + """Supprime une transaction spécifique par son ID Universign""" + query = select(UniversignTransaction).where( + UniversignTransaction.transaction_id == transaction_id + ) + result = await session.execute(query) + tx = result.scalar_one_or_none() + + if not tx: + raise HTTPException(404, f"Transaction {transaction_id} introuvable") + + await session.delete(tx) + await session.commit() + + logger.info(f"Transaction {transaction_id} supprimée") + + return { + "success": True, + "deleted_transaction_id": transaction_id, + "document_id": tx.sage_document_id, + } + + +@router.post("/cleanup/all-duplicates") +async def cleanup_all_duplicates( + session: AsyncSession = Depends(get_session), +): + """ + Nettoie tous les doublons dans la base. + Pour chaque document avec plusieurs transactions, garde la plus récente non-erreur ou la plus récente. + """ + from sqlalchemy import func + + # Trouver les documents avec plusieurs transactions + subquery = ( + select( + UniversignTransaction.sage_document_id, + func.count(UniversignTransaction.id).label("count"), + ) + .group_by(UniversignTransaction.sage_document_id) + .having(func.count(UniversignTransaction.id) > 1) + ).subquery() + + duplicates_query = select(subquery.c.sage_document_id) + duplicates_result = await session.execute(duplicates_query) + duplicate_docs = [row[0] for row in duplicates_result.fetchall()] + + total_deleted = 0 + cleanup_details = [] + + for doc_id in duplicate_docs: + # Récupérer toutes les transactions pour ce document + tx_query = ( + select(UniversignTransaction) + .where(UniversignTransaction.sage_document_id == doc_id) + .order_by(UniversignTransaction.created_at.desc()) + ) + tx_result = await session.execute(tx_query) + transactions = tx_result.scalars().all() + + # Priorité: SIGNE > EN_COURS > EN_ATTENTE > autres + priority = {"SIGNE": 0, "EN_COURS": 1, "EN_ATTENTE": 2} + + def sort_key(tx): + status_priority = priority.get(tx.local_status.value, 99) + return (status_priority, -tx.created_at.timestamp()) + + sorted_txs = sorted(transactions, key=sort_key) + to_keep = sorted_txs[0] + to_delete = sorted_txs[1:] + + for tx in to_delete: + await session.delete(tx) + total_deleted += 1 + + cleanup_details.append( + { + "document_id": doc_id, + "kept": to_keep.transaction_id, + "kept_status": to_keep.local_status.value, + "deleted_count": len(to_delete), + } + ) + + await session.commit() + + logger.info( + f"Nettoyage global: {total_deleted} doublons supprimés sur {len(duplicate_docs)} documents" + ) + + return { + "success": True, + "documents_processed": len(duplicate_docs), + "total_deleted": total_deleted, + "details": cleanup_details, + } diff --git a/services/universign_sync.py b/services/universign_sync.py index f4f288c..c987c68 100644 --- a/services/universign_sync.py +++ b/services/universign_sync.py @@ -1,10 +1,13 @@ + import requests import json import logging +import uuid from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ +from sqlalchemy.orm import selectinload from database import ( UniversignTransaction, @@ -13,7 +16,10 @@ from database import ( UniversignTransactionStatus, LocalDocumentStatus, UniversignSignerStatus, + EmailLog, + StatutEmail, ) +from data.data import templates_signature_email from utils.universign_status_mapping import ( map_universign_to_local, is_transition_allowed, @@ -31,6 +37,14 @@ class UniversignSyncService: self.api_key = api_key self.timeout = timeout self.auth = (api_key, "") + self.sage_client = None + self.email_queue = None + self.settings = None + + def configure(self, sage_client, email_queue, settings): + self.sage_client = sage_client + self.email_queue = email_queue + self.settings = settings def fetch_transaction_status(self, transaction_id: str) -> Optional[Dict]: start_time = datetime.now() @@ -48,9 +62,7 @@ class UniversignSyncService: if response.status_code == 200: data = response.json() logger.info( - f"✓ Fetch OK: {transaction_id} " - f"status={data.get('state')} " - f"({response_time_ms}ms)" + f"Fetch OK: {transaction_id} status={data.get('state')} ({response_time_ms}ms)" ) return { "transaction": data, @@ -67,8 +79,7 @@ class UniversignSyncService: else: logger.error( - f"Erreur HTTP {response.status_code} " - f"pour {transaction_id}: {response.text}" + f"Erreur HTTP {response.status_code} pour {transaction_id}: {response.text}" ) return None @@ -88,15 +99,12 @@ class UniversignSyncService: ) -> Tuple[bool, Optional[str]]: if is_final_status(transaction.local_status.value) and not force: logger.debug( - f"Skip {transaction.transaction_id}: " - f"statut final {transaction.local_status.value}" + f"Skip {transaction.transaction_id}: statut final {transaction.local_status.value}" ) transaction.needs_sync = False await session.commit() return True, None - # === FETCH UNIVERSIGN === - result = self.fetch_transaction_status(transaction.transaction_id) if not result: @@ -104,29 +112,20 @@ class UniversignSyncService: await self._log_sync_attempt(session, transaction, "polling", False, error) return False, error - # === EXTRACTION DONNÉES === - universign_data = result["transaction"] universign_status_raw = universign_data.get("state", "draft") - # === MAPPING STATUT === - new_local_status = map_universign_to_local(universign_status_raw) previous_local_status = transaction.local_status.value - # === VALIDATION TRANSITION === - if not is_transition_allowed(previous_local_status, new_local_status): logger.warning( f"Transition refusée: {previous_local_status} → {new_local_status}" ) - # En cas de conflit, résoudre par priorité new_local_status = resolve_status_conflict( previous_local_status, new_local_status ) - # === DÉTECTION CHANGEMENT === - status_changed = previous_local_status != new_local_status if not status_changed and not force: @@ -136,16 +135,20 @@ class UniversignSyncService: await session.commit() return True, None - # === MISE À JOUR TRANSACTION === + try: + transaction.universign_status = UniversignTransactionStatus( + universign_status_raw + ) + except ValueError: + transaction.universign_status = ( + UniversignTransactionStatus.COMPLETED + if new_local_status == "SIGNE" + else UniversignTransactionStatus.FAILED + ) - transaction.universign_status = UniversignTransactionStatus( - universign_status_raw - ) transaction.local_status = LocalDocumentStatus(new_local_status) transaction.universign_status_updated_at = datetime.now() - # === DATES SPÉCIFIQUES === - if new_local_status == "EN_COURS" and not transaction.sent_at: transaction.sent_at = datetime.now() @@ -158,31 +161,18 @@ class UniversignSyncService: if new_local_status == "EXPIRE" and not transaction.expired_at: transaction.expired_at = datetime.now() - # === URLS === - - if "signers" in universign_data and len(universign_data["signers"]) > 0: - first_signer = universign_data["signers"][0] - if "url" in first_signer: - transaction.signer_url = first_signer["url"] - - if "documents" in universign_data and len(universign_data["documents"]) > 0: + if universign_data.get("documents") and len(universign_data["documents"]) > 0: first_doc = universign_data["documents"][0] - if "url" in first_doc: + if first_doc.get("url"): transaction.document_url = first_doc["url"] - # === SIGNATAIRES === - await self._sync_signers(session, transaction, universign_data) - # === FLAGS === - transaction.last_synced_at = datetime.now() transaction.sync_attempts += 1 transaction.needs_sync = not is_final_status(new_local_status) transaction.sync_error = None - # === LOG === - await self._log_sync_attempt( session=session, transaction=transaction, @@ -202,14 +192,11 @@ class UniversignSyncService: await session.commit() - # === ACTIONS MÉTIER === - if status_changed: await self._execute_status_actions(session, transaction, new_local_status) logger.info( - f"✓ Sync OK: {transaction.transaction_id} " - f"{previous_local_status} → {new_local_status}" + f"Sync OK: {transaction.transaction_id} {previous_local_status} → {new_local_status}" ) return True, None @@ -217,14 +204,9 @@ class UniversignSyncService: async def sync_all_pending( self, session: AsyncSession, max_transactions: int = 50 ) -> Dict[str, int]: - """ - Synchronise toutes les transactions en attente - """ - from sqlalchemy.orm import selectinload # Si pas déjà importé en haut - query = ( select(UniversignTransaction) - .options(selectinload(UniversignTransaction.signers)) # AJOUTER CETTE LIGNE + .options(selectinload(UniversignTransaction.signers)) .where( and_( UniversignTransaction.needs_sync, @@ -267,7 +249,6 @@ class UniversignSyncService: if success: stats["success"] += 1 - if transaction.local_status.value != previous_status: stats["status_changes"] += 1 else: @@ -280,8 +261,7 @@ class UniversignSyncService: stats["failed"] += 1 logger.info( - f"Polling terminé: {stats['success']}/{stats['total_found']} OK, " - f"{stats['status_changes']} changements détectés" + f"Polling terminé: {stats['success']}/{stats['total_found']} OK, {stats['status_changes']} changements détectés" ) return stats @@ -296,8 +276,10 @@ class UniversignSyncService: if not transaction_id: return False, "Pas de transaction_id dans le webhook" - query = select(UniversignTransaction).where( - UniversignTransaction.transaction_id == transaction_id + query = ( + select(UniversignTransaction) + .options(selectinload(UniversignTransaction.signers)) + .where(UniversignTransaction.transaction_id == transaction_id) ) result = await session.execute(query) transaction = result.scalar_one_or_none() @@ -326,8 +308,7 @@ class UniversignSyncService: await session.commit() logger.info( - f"✓ Webhook traité: {transaction_id} " - f"event={event_type} success={success}" + f"Webhook traité: {transaction_id} event={event_type} success={success}" ) return success, error @@ -342,27 +323,47 @@ class UniversignSyncService: transaction: UniversignTransaction, universign_data: Dict, ): - """Synchronise les signataires""" signers_data = universign_data.get("signers", []) - # Supprimer les anciens signataires - for signer in transaction.signers: - await session.delete(signer) + # Ne pas toucher aux signers existants si Universign n'en retourne pas + if not signers_data: + return + + # Mettre à jour les signers existants ou en créer de nouveaux + existing_signers = {s.email: s for s in transaction.signers} - # Créer les nouveaux for idx, signer_data in enumerate(signers_data): - signer = UniversignSigner( - id=f"{transaction.id}_signer_{idx}", - transaction_id=transaction.id, - email=signer_data.get("email", ""), - name=signer_data.get("name"), - status=UniversignSignerStatus(signer_data.get("status", "waiting")), - order_index=idx, - viewed_at=self._parse_date(signer_data.get("viewed_at")), - signed_at=self._parse_date(signer_data.get("signed_at")), - refused_at=self._parse_date(signer_data.get("refused_at")), - ) - session.add(signer) + email = signer_data.get("email", "") + + if email in existing_signers: + # Mise à jour du signer existant + signer = existing_signers[email] + signer.status = UniversignSignerStatus( + signer_data.get("status", "waiting") + ) + signer.viewed_at = ( + self._parse_date(signer_data.get("viewed_at")) or signer.viewed_at + ) + signer.signed_at = ( + self._parse_date(signer_data.get("signed_at")) or signer.signed_at + ) + signer.refused_at = ( + self._parse_date(signer_data.get("refused_at")) or signer.refused_at + ) + else: + # Nouveau signer + signer = UniversignSigner( + id=f"{transaction.id}_signer_{idx}_{int(datetime.now().timestamp())}", + transaction_id=transaction.id, + email=email, + name=signer_data.get("name"), + status=UniversignSignerStatus(signer_data.get("status", "waiting")), + order_index=idx, + viewed_at=self._parse_date(signer_data.get("viewed_at")), + signed_at=self._parse_date(signer_data.get("signed_at")), + refused_at=self._parse_date(signer_data.get("refused_at")), + ) + session.add(signer) async def _log_sync_attempt( self, @@ -375,7 +376,6 @@ class UniversignSyncService: new_status: Optional[str] = None, changes: Optional[str] = None, ): - """Enregistre une tentative de sync dans les logs""" log = UniversignSyncLog( transaction_id=transaction.id, sync_type=sync_type, @@ -391,48 +391,112 @@ class UniversignSyncService: async def _execute_status_actions( self, session: AsyncSession, transaction: UniversignTransaction, new_status: str ): - """Exécute les actions métier associées au statut""" actions = get_status_actions(new_status) if not actions: return - # Mise à jour Sage if actions.get("update_sage_status"): await self._update_sage_status(transaction, new_status) - # Déclencher workflow - if actions.get("trigger_workflow"): - await self._trigger_workflow(transaction) - - # Notifications if actions.get("send_notification"): - await self._send_notification(transaction, new_status) + await self._send_notification(session, transaction, new_status) - # Archive - if actions.get("archive_document"): - await self._archive_signed_document(transaction) + async def _update_sage_status( + self, transaction: UniversignTransaction, status: str + ): + if not self.sage_client: + logger.warning("sage_client non configuré pour mise à jour Sage") + return - async def _update_sage_status(self, transaction, status): - """Met à jour le statut dans Sage""" - # TODO: Appeler sage_client.mettre_a_jour_champ_libre() - logger.info(f"TODO: Mettre à jour Sage pour {transaction.sage_document_id}") + try: + type_doc = transaction.sage_document_type.value + doc_id = transaction.sage_document_id - async def _trigger_workflow(self, transaction): - """Déclenche un workflow (ex: devis→commande)""" - logger.info(f"TODO: Workflow pour {transaction.sage_document_id}") + if status == "SIGNE": + self.sage_client.changer_statut_document( + document_type_code=type_doc, numero=doc_id, nouveau_statut=2 + ) + logger.info(f"Statut Sage mis à jour: {doc_id} → Accepté (2)") - async def _send_notification(self, transaction, status): - """Envoie une notification email""" - logger.info(f"TODO: Notif pour {transaction.sage_document_id}") + elif status == "EN_COURS": + self.sage_client.changer_statut_document( + document_type_code=type_doc, numero=doc_id, nouveau_statut=1 + ) + logger.info(f"Statut Sage mis à jour: {doc_id} → Confirmé (1)") - async def _archive_signed_document(self, transaction): - """Archive le document signé""" - logger.info(f"TODO: Archivage pour {transaction.sage_document_id}") + except Exception as e: + logger.error( + f"Erreur mise à jour Sage pour {transaction.sage_document_id}: {e}" + ) + + async def _send_notification( + self, session: AsyncSession, transaction: UniversignTransaction, status: str + ): + if not self.email_queue or not self.settings: + logger.warning("email_queue ou settings non configuré") + return + + try: + if status == "SIGNE": + template = templates_signature_email["signature_confirmee"] + + type_labels = { + 0: "Devis", + 10: "Commande", + 30: "Bon de Livraison", + 60: "Facture", + 50: "Avoir", + } + + variables = { + "NOM_SIGNATAIRE": transaction.requester_name or "Client", + "TYPE_DOC": type_labels.get( + transaction.sage_document_type.value, "Document" + ), + "NUMERO": transaction.sage_document_id, + "DATE_SIGNATURE": transaction.signed_at.strftime("%d/%m/%Y à %H:%M") + if transaction.signed_at + else datetime.now().strftime("%d/%m/%Y à %H:%M"), + "TRANSACTION_ID": transaction.transaction_id, + "CONTACT_EMAIL": self.settings.smtp_from, + } + + sujet = template["sujet"] + corps = template["corps_html"] + + for var, valeur in variables.items(): + sujet = sujet.replace(f"{{{{{var}}}}}", str(valeur)) + corps = corps.replace(f"{{{{{var}}}}}", str(valeur)) + + email_log = EmailLog( + id=str(uuid.uuid4()), + destinataire=transaction.requester_email, + sujet=sujet, + corps_html=corps, + document_ids=transaction.sage_document_id, + type_document=transaction.sage_document_type.value, + statut=StatutEmail.EN_ATTENTE, + date_creation=datetime.now(), + nb_tentatives=0, + ) + + session.add(email_log) + await session.flush() + + self.email_queue.enqueue(email_log.id) + + logger.info( + f"Email confirmation signature envoyé à {transaction.requester_email}" + ) + + except Exception as e: + logger.error( + f"Erreur envoi notification pour {transaction.transaction_id}: {e}" + ) @staticmethod def _parse_date(date_str: Optional[str]) -> Optional[datetime]: - """Parse une date ISO 8601""" if not date_str: return None try: @@ -448,7 +512,6 @@ class UniversignSyncScheduler: self.is_running = False async def start(self, session_factory): - """Démarre le polling automatique""" import asyncio self.is_running = True @@ -470,10 +533,8 @@ class UniversignSyncScheduler: except Exception as e: logger.error(f"Erreur polling: {e}", exc_info=True) - # Attendre avant le prochain cycle await asyncio.sleep(self.interval_minutes * 60) def stop(self): - """Arrête le polling""" self.is_running = False logger.info("Arrêt polling Universign") diff --git a/utils/generic_functions.py b/utils/generic_functions.py index 753e7d2..f09ee5f 100644 --- a/utils/generic_functions.py +++ b/utils/generic_functions.py @@ -297,6 +297,7 @@ UNIVERSIGN_TO_LOCAL: Dict[str, str] = { "started": "EN_COURS", # États finaux (succès) "completed": "SIGNE", + "closed": "SIGNE", # États finaux (échec) "refused": "REFUSE", "expired": "EXPIRE",