# -*- coding: utf-8 -*- """ Queue d'envoi d'emails avec threading et génération PDF Version VPS Linux - utilise sage_client pour récupérer les données """ import threading import queue import time import asyncio from datetime import datetime, timedelta from typing import Optional from tenacity import retry, stop_after_attempt, wait_exponential import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.application import MIMEApplication from config import settings import logging logger = logging.getLogger(__name__) class EmailQueue: """ Queue d'emails avec workers threadés et retry automatique """ def __init__(self): self.queue = queue.Queue() self.workers = [] self.running = False self.session_factory = None self.sage_client = None def start(self, num_workers: int = 3): """Démarre les workers""" if self.running: logger.warning("Queue déjà démarrée") return self.running = True for i in range(num_workers): worker = threading.Thread( target=self._worker, name=f"EmailWorker-{i}", daemon=True ) worker.start() self.workers.append(worker) logger.info(f"✅ Queue email démarrée avec {num_workers} worker(s)") def stop(self): """Arrête les workers proprement""" logger.info("🛑 Arrêt de la queue email...") self.running = False # Attendre que la queue soit vide (max 30s) try: self.queue.join() logger.info("✅ Queue email arrêtée proprement") except: logger.warning("⚠️ Timeout lors de l'arrêt de la queue") def enqueue(self, email_log_id: str): """Ajoute un email dans la queue""" self.queue.put(email_log_id) logger.debug(f"📨 Email {email_log_id} ajouté à la queue") def _worker(self): """Worker qui traite les emails dans un thread""" # Créer une event loop pour ce thread loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: while self.running: try: # Récupérer un email de la queue (timeout 1s) email_log_id = self.queue.get(timeout=1) # Traiter l'email loop.run_until_complete(self._process_email(email_log_id)) # Marquer comme traité self.queue.task_done() except queue.Empty: continue except Exception as e: logger.error(f"❌ Erreur worker: {e}", exc_info=True) try: self.queue.task_done() except: pass finally: loop.close() async def _process_email(self, email_log_id: str): """Traite un email avec retry automatique""" from database import EmailLog, StatutEmail from sqlalchemy import select if not self.session_factory: logger.error("❌ session_factory non configuré") return async with self.session_factory() as session: # Charger l'email log result = await session.execute( select(EmailLog).where(EmailLog.id == email_log_id) ) email_log = result.scalar_one_or_none() if not email_log: logger.error(f"❌ Email log {email_log_id} introuvable") return # Marquer comme en cours email_log.statut = StatutEmail.EN_COURS email_log.nb_tentatives += 1 await session.commit() try: # Envoi avec retry automatique await self._send_with_retry(email_log) # Succès email_log.statut = StatutEmail.ENVOYE email_log.date_envoi = datetime.now() email_log.derniere_erreur = None logger.info(f"✅ Email envoyé: {email_log.destinataire}") except Exception as e: # Échec email_log.statut = StatutEmail.ERREUR email_log.derniere_erreur = str(e)[:1000] # Limiter la taille # Programmer un retry si < max attempts if email_log.nb_tentatives < settings.max_retry_attempts: delay = settings.retry_delay_seconds * (2 ** (email_log.nb_tentatives - 1)) email_log.prochain_retry = datetime.now() + timedelta(seconds=delay) # Programmer le retry timer = threading.Timer(delay, self.enqueue, args=[email_log_id]) timer.daemon = True timer.start() logger.warning(f"⚠️ Retry prévu dans {delay}s pour {email_log.destinataire}") else: logger.error(f"❌ Échec définitif: {email_log.destinataire} - {e}") await session.commit() @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) async def _send_with_retry(self, email_log): """Envoi SMTP avec retry Tenacity + génération PDF""" # Préparer le message msg = MIMEMultipart() msg['From'] = settings.smtp_from msg['To'] = email_log.destinataire msg['Subject'] = email_log.sujet # Corps HTML msg.attach(MIMEText(email_log.corps_html, 'html')) # 📎 GÉNÉRATION ET ATTACHEMENT DES PDFs if email_log.document_ids: document_ids = email_log.document_ids.split(',') type_doc = email_log.type_document for doc_id in document_ids: doc_id = doc_id.strip() if not doc_id: continue try: # Générer PDF (appel bloquant dans thread séparé) pdf_bytes = await asyncio.to_thread( self._generate_pdf, doc_id, type_doc ) if pdf_bytes: # Attacher PDF part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf") part['Content-Disposition'] = f'attachment; filename="{doc_id}.pdf"' msg.attach(part) logger.info(f"📎 PDF attaché: {doc_id}.pdf") except Exception as e: logger.error(f"❌ Erreur génération PDF {doc_id}: {e}") # Continuer avec les autres PDFs # Envoi SMTP (bloquant mais dans thread séparé) await asyncio.to_thread(self._send_smtp, msg) def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes: """ Génération PDF via ReportLab + sage_client ⚠️ Cette méthode est appelée depuis un thread worker """ from reportlab.lib.pagesizes import A4 from reportlab.pdfgen import canvas from reportlab.lib.units import cm from io import BytesIO if not self.sage_client: logger.error("❌ sage_client non configuré") raise Exception("sage_client non disponible") # 📡 Récupérer document depuis gateway Windows via HTTP try: doc = self.sage_client.lire_document(doc_id, type_doc) except Exception as e: logger.error(f"❌ Erreur récupération document {doc_id}: {e}") raise Exception(f"Document {doc_id} inaccessible") if not doc: raise Exception(f"Document {doc_id} introuvable") # 📄 Créer PDF avec ReportLab buffer = BytesIO() pdf = canvas.Canvas(buffer, pagesize=A4) width, height = A4 # === EN-TÊTE === pdf.setFont("Helvetica-Bold", 20) pdf.drawString(2*cm, height - 3*cm, f"Document N° {doc_id}") # Type de document type_labels = { 0: "DEVIS", 1: "BON DE LIVRAISON", 2: "BON DE RETOUR", 3: "COMMANDE", 4: "PRÉPARATION", 5: "FACTURE" } type_label = type_labels.get(type_doc, "DOCUMENT") pdf.setFont("Helvetica", 12) pdf.drawString(2*cm, height - 4*cm, f"Type: {type_label}") # === INFORMATIONS CLIENT === y = height - 5*cm pdf.setFont("Helvetica-Bold", 14) pdf.drawString(2*cm, y, "CLIENT") y -= 0.8*cm pdf.setFont("Helvetica", 11) pdf.drawString(2*cm, y, f"Code: {doc.get('client_code', '')}") y -= 0.6*cm pdf.drawString(2*cm, y, f"Nom: {doc.get('client_intitule', '')}") y -= 0.6*cm pdf.drawString(2*cm, y, f"Date: {doc.get('date', '')}") # === LIGNES === y -= 1.5*cm pdf.setFont("Helvetica-Bold", 14) pdf.drawString(2*cm, y, "ARTICLES") y -= 1*cm pdf.setFont("Helvetica-Bold", 10) pdf.drawString(2*cm, y, "Désignation") pdf.drawString(10*cm, y, "Qté") pdf.drawString(12*cm, y, "Prix Unit.") pdf.drawString(15*cm, y, "Total HT") y -= 0.5*cm pdf.line(2*cm, y, width - 2*cm, y) y -= 0.7*cm pdf.setFont("Helvetica", 9) for ligne in doc.get('lignes', []): # Nouvelle page si nécessaire if y < 3*cm: pdf.showPage() y = height - 3*cm pdf.setFont("Helvetica", 9) designation = ligne.get('designation', '')[:50] pdf.drawString(2*cm, y, designation) pdf.drawString(10*cm, y, str(ligne.get('quantite', 0))) pdf.drawString(12*cm, y, f"{ligne.get('prix_unitaire', 0):.2f}€") pdf.drawString(15*cm, y, f"{ligne.get('montant_ht', 0):.2f}€") y -= 0.6*cm # === TOTAUX === y -= 1*cm pdf.line(12*cm, y, width - 2*cm, y) y -= 0.8*cm pdf.setFont("Helvetica-Bold", 11) pdf.drawString(12*cm, y, "Total HT:") pdf.drawString(15*cm, y, f"{doc.get('total_ht', 0):.2f}€") y -= 0.6*cm pdf.drawString(12*cm, y, "TVA (20%):") tva = doc.get('total_ttc', 0) - doc.get('total_ht', 0) pdf.drawString(15*cm, y, f"{tva:.2f}€") y -= 0.6*cm pdf.setFont("Helvetica-Bold", 14) pdf.drawString(12*cm, y, "Total TTC:") pdf.drawString(15*cm, y, f"{doc.get('total_ttc', 0):.2f}€") # === PIED DE PAGE === pdf.setFont("Helvetica", 8) pdf.drawString(2*cm, 2*cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}") pdf.drawString(2*cm, 1.5*cm, "Sage 100c - API Dataven") # Finaliser pdf.save() buffer.seek(0) logger.info(f"✅ PDF généré: {doc_id}.pdf") return buffer.read() def _send_smtp(self, msg): """Envoi SMTP bloquant (appelé depuis asyncio.to_thread)""" try: with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30) as server: if settings.smtp_use_tls: server.starttls() if settings.smtp_user and settings.smtp_password: server.login(settings.smtp_user, settings.smtp_password) server.send_message(msg) except smtplib.SMTPException as e: raise Exception(f"Erreur SMTP: {str(e)}") except Exception as e: raise Exception(f"Erreur envoi: {str(e)}") # Instance globale email_queue = EmailQueue()