import asyncio
import functools
import logging
import uuid
from datetime import datetime
from typing import Any, Callable, Dict

import requests
from sqlalchemy.ext.asyncio import AsyncSession

from config.config import settings
from data.data import templates_signature_email
from database import EmailLog, StatutEmail as StatutEmailEnum

logger = logging.getLogger(__name__)

# HTTP status codes accepted as success for every POST step of the workflow.
_STATUTS_OK = (200, 201)

# Labels used in the notification email, keyed by doc_data["type_doc"].
_TYPE_LABELS = {
    0: "Devis",
    10: "Commande",
    30: "Bon de Livraison",
    60: "Facture",
    50: "Avoir",
}

# Mapping from the remote transaction "state" to the status returned to callers.
_STATUT_MAP = {
    "draft": "EN_ATTENTE",
    "started": "EN_ATTENTE",
    "completed": "SIGNE",
    "refused": "REFUSE",
    "expired": "EXPIRE",
    "canceled": "REFUSE",
}


async def _appel_bloquant(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking callable in the default executor and return its result.

    ``requests`` is synchronous; calling it directly inside a coroutine would
    freeze the event loop for the whole duration of the HTTP round trip.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def _verifier_reponse(response: Any, libelle_log: str, libelle_erreur: str) -> None:
    """Log the response body and raise if the HTTP status is not a success."""
    if response.status_code not in _STATUTS_OK:
        logger.error(f"{libelle_log}: {response.text}")
        raise Exception(f"{libelle_erreur}: {response.status_code}")


def _extraire_id(response: Any, objet: str) -> str:
    """Return the ``id`` field of a JSON response, raising if it is missing."""
    identifiant = response.json().get("id")
    if not identifiant:
        # Without this guard the next step would call ".../None/..." URLs.
        raise Exception(f"Identifiant {objet} absent de la réponse Universign")
    return identifiant


def _trouver_url_signature(final_data: Dict, email: str) -> str:
    """Return the signing URL found in the "start transaction" response.

    The first action carrying a non-empty ``url`` wins; otherwise the URL of
    the first signer whose ``email`` matches is used. Returns a falsy value
    when nothing is found.
    """
    for action in final_data.get("actions") or []:
        if action.get("url"):
            return action["url"]

    for signer in final_data.get("signers") or []:
        if signer.get("email") == email:
            return signer.get("url", "")

    return ""


async def universign_envoyer(
    doc_id: str,
    pdf_bytes: bytes,
    email: str,
    nom: str,
    doc_data: Dict,
    session: AsyncSession,
) -> Dict:
    """Send a PDF for electronic signature and queue the notification email.

    The workflow creates a transaction, uploads the PDF, attaches it to the
    transaction, creates a signature field, binds the signer to that field and
    starts the transaction. The signing URL is then injected into the
    ``demande_signature`` email template, an ``EmailLog`` row is added to
    ``session`` (flushed, not committed) and its id is pushed to the email
    queue.

    Args:
        doc_id: Document identifier, used in names, logs and the email.
        pdf_bytes: Content of the PDF to sign; must not be empty.
        email: Signer email address (also the email recipient).
        nom: Signer name inserted into the email template.
        doc_data: Document metadata; the keys read are ``type_label``,
            ``type_doc``, ``date`` and ``montant_ttc``.
        session: Async SQLAlchemy session used to persist the email log.

    Returns:
        On success, a dict with ``transaction_id``, ``signer_url``,
        ``statut`` ("ENVOYE"), ``email_log_id`` and ``email_sent`` (True).
        On failure, a dict with ``error``, ``statut`` ("ERREUR") and
        ``email_sent`` (False). Exceptions raised inside the workflow are
        caught and reported through that dict rather than propagated.
    """
    # Kept as a function-level import, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from email_queue import email_queue

    try:
        api_key = settings.universign_api_key
        api_url = settings.universign_api_url
        auth = (api_key, "")
        type_label = doc_data.get("type_label", "Document")

        logger.info(f" Démarrage processus Universign pour {email}")
        logger.info(f"Document: {doc_id} ({doc_data.get('type_label')})")

        if not pdf_bytes:
            raise Exception("Le PDF généré est vide")
        logger.info(f"PDF valide : {len(pdf_bytes)} octets")

        logger.info("ÉTAPE 1/6 : Création transaction")
        response = await _appel_bloquant(
            requests.post,
            f"{api_url}/transactions",
            auth=auth,
            json={
                "name": f"{type_label} {doc_id}",
                "language": "fr",
            },
            timeout=30,
        )
        # Accepts 201 as well as 200, consistently with every other step.
        _verifier_reponse(
            response, "Erreur création transaction", "Erreur création transaction"
        )
        transaction_id = _extraire_id(response, "transaction")
        logger.info(f"Transaction créée: {transaction_id}")

        logger.info("ÉTAPE 2/6 : Upload PDF")
        files = {
            "file": (
                f"{type_label}_{doc_id}.pdf",
                pdf_bytes,
                "application/pdf",
            )
        }
        response = await _appel_bloquant(
            requests.post,
            f"{api_url}/files",
            auth=auth,
            files=files,
            timeout=60,
        )
        _verifier_reponse(response, "Erreur upload", "Erreur upload fichier")
        file_id = _extraire_id(response, "fichier")
        logger.info(f"Fichier uploadé: {file_id}")

        logger.info("ÉTAPE 3/6 : Ajout document à transaction")
        response = await _appel_bloquant(
            requests.post,
            f"{api_url}/transactions/{transaction_id}/documents",
            auth=auth,
            data={"document": file_id},
            timeout=30,
        )
        _verifier_reponse(response, "Erreur ajout document", "Erreur ajout document")
        document_id = _extraire_id(response, "document")
        logger.info(f"Document ajouté: {document_id}")

        logger.info("ÉTAPE 4/6 : Création champ signature")
        response = await _appel_bloquant(
            requests.post,
            f"{api_url}/transactions/{transaction_id}/documents/{document_id}/fields",
            auth=auth,
            data={
                "type": "signature",
            },
            timeout=30,
        )
        _verifier_reponse(response, "Erreur création champ", "Erreur création champ")
        field_id = _extraire_id(response, "champ")
        logger.info(f"Champ créé: {field_id}")

        logger.info(" ÉTAPE 5/6 : Liaison signataire au champ")
        response = await _appel_bloquant(
            requests.post,
            # The endpoint is /signatures, not /signers.
            f"{api_url}/transactions/{transaction_id}/signatures",
            auth=auth,
            data={
                "signer": email,
                "field": field_id,
            },
            timeout=30,
        )
        _verifier_reponse(
            response, "Erreur liaison signataire", "Erreur liaison signataire"
        )
        logger.info(f"Signataire lié: {email}")

        logger.info("ÉTAPE 6/6 : Démarrage transaction")
        response = await _appel_bloquant(
            requests.post,
            f"{api_url}/transactions/{transaction_id}/start",
            auth=auth,
            timeout=30,
        )
        _verifier_reponse(response, "Erreur démarrage", "Erreur démarrage")
        final_data = response.json()
        logger.info("Transaction démarrée")

        logger.info("Récupération URL de signature")
        signer_url = _trouver_url_signature(final_data, email)
        if not signer_url:
            logger.error(f"URL introuvable dans: {final_data}")
            raise ValueError("URL de signature non retournée par Universign")
        logger.info("URL récupérée")

        logger.info(" Préparation email")
        template = templates_signature_email["demande_signature"]

        # An explicit None amount would make the ":.2f" format raise after the
        # transaction has already been started remotely; fall back to 0.
        montant_ttc = doc_data.get("montant_ttc")
        if montant_ttc is None:
            montant_ttc = 0

        variables = {
            "NOM_SIGNATAIRE": nom,
            "TYPE_DOC": _TYPE_LABELS.get(doc_data.get("type_doc", 0), "Document"),
            "NUMERO": doc_id,
            "DATE": doc_data.get("date", datetime.now().strftime("%d/%m/%Y")),
            "MONTANT_TTC": f"{montant_ttc:.2f}",
            "SIGNER_URL": signer_url,
            "CONTACT_EMAIL": settings.smtp_from,
        }

        sujet = template["sujet"]
        corps = template["corps_html"]
        for var, valeur in variables.items():
            # Placeholders in the templates are written as {{NAME}}.
            marqueur = "{{" + var + "}}"
            sujet = sujet.replace(marqueur, str(valeur))
            corps = corps.replace(marqueur, str(valeur))

        email_log = EmailLog(
            id=str(uuid.uuid4()),
            destinataire=email,
            sujet=sujet,
            corps_html=corps,
            document_ids=doc_id,
            type_document=doc_data.get("type_doc"),
            statut=StatutEmailEnum.EN_ATTENTE,
            date_creation=datetime.now(),
            nb_tentatives=0,
        )
        session.add(email_log)
        # NOTE(review): the row is only flushed here, not committed; presumably
        # the caller commits the session -- confirm the queue consumer cannot
        # look the row up before that commit happens.
        await session.flush()

        email_queue.enqueue(email_log.id)
        logger.info(f"Email mis en file pour {email}")
        logger.info("🎉 Processus terminé avec succès")

        return {
            "transaction_id": transaction_id,
            "signer_url": signer_url,
            "statut": "ENVOYE",
            "email_log_id": email_log.id,
            "email_sent": True,
        }

    except Exception as e:
        # Boundary handler: callers receive an error dict, never an exception.
        logger.error(f"Erreur Universign: {e}", exc_info=True)
        return {
            "error": str(e),
            "statut": "ERREUR",
            "email_sent": False,
        }


async def universign_statut(transaction_id: str) -> Dict:
    """Fetch the signature status of a transaction.

    Args:
        transaction_id: Identifier of the transaction to query.

    Returns:
        On HTTP 200, a dict with ``statut`` (the remote ``state`` mapped to
        EN_ATTENTE / SIGNE / REFUSE / EXPIRE, defaulting to EN_ATTENTE for an
        unknown state) and ``date_signature`` (the remote ``completed_at``
        value). On any other HTTP status, ``{"statut": "ERREUR"}``. If an
        exception occurs, ``{"statut": "ERREUR", "error": <message>}``.
    """
    try:
        response = await _appel_bloquant(
            requests.get,
            f"{settings.universign_api_url}/transactions/{transaction_id}",
            auth=(settings.universign_api_key, ""),
            timeout=10,
        )

        if response.status_code != 200:
            # Previously silent; log so failed polls can be diagnosed.
            logger.error(f"Erreur statut Universign: HTTP {response.status_code}")
            return {"statut": "ERREUR"}

        data = response.json()
        return {
            "statut": _STATUT_MAP.get(data.get("state"), "EN_ATTENTE"),
            "date_signature": data.get("completed_at"),
        }

    except Exception as e:
        logger.error(f"Erreur statut Universign: {e}")
        return {"statut": "ERREUR", "error": str(e)}