import asyncio
import logging
import queue
import smtplib
import threading
import time
from datetime import datetime, timedelta
from decimal import Decimal
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from io import BytesIO
from typing import Optional

from reportlab.lib.pagesizes import A4
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas
from tenacity import retry, stop_after_attempt, wait_exponential

from config import settings

logger = logging.getLogger(__name__)


class EmailQueue:
    """In-process email queue served by daemon worker threads.

    Callers push ``EmailLog`` identifiers with :meth:`enqueue`. Each worker
    thread owns a private asyncio event loop, loads the corresponding row
    through ``session_factory``, builds the message (optionally attaching one
    generated PDF per document id) and sends it over SMTP.

    Two attributes are ``None`` after construction and are expected to be
    assigned before :meth:`start` is called (presumably by the application
    bootstrap -- confirm against the caller):

    * ``session_factory`` -- callable used as ``async with session_factory()``.
    * ``sage_client`` -- object exposing ``lire_document(doc_id, type_doc)``.
    """

    def __init__(self):
        self.queue = queue.Queue()
        self.workers = []
        self.running = False
        self.session_factory = None
        self.sage_client = None

    def start(self, num_workers: int = 3):
        """Start ``num_workers`` daemon worker threads.

        Does nothing (besides logging a warning) when the queue is already
        running.
        """
        if self.running:
            logger.warning("Queue déjà démarrée")
            return

        # Forget threads left over from a previous start/stop cycle.
        self.workers = [w for w in self.workers if w.is_alive()]
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker, name=f"EmailWorker-{i}", daemon=True
            )
            worker.start()
            self.workers.append(worker)

        logger.info(f" Queue email démarrée avec {num_workers} worker(s)")

    def stop(self, timeout: float = 10.0):
        """Ask the workers to stop and wait for them, up to ``timeout`` seconds.

        Workers finish the email they are currently processing and then exit.
        Identifiers still waiting in the queue are left there; they are not
        processed by this call.

        The wait is performed on the worker threads rather than with
        ``queue.join()``: once ``running`` is false nobody consumes the
        remaining items, so ``queue.join()`` could block forever.

        Args:
            timeout: Total number of seconds to wait for all workers.
        """
        logger.info(" Arrêt de la queue email...")
        self.running = False

        deadline = time.monotonic() + timeout
        for worker in self.workers:
            worker.join(timeout=max(0.0, deadline - time.monotonic()))

        self.workers = [w for w in self.workers if w.is_alive()]
        if self.workers:
            logger.warning(" Timeout lors de l'arrêt de la queue")
        else:
            logger.info(" Queue email arrêtée proprement")

    def enqueue(self, email_log_id: str):
        """Add an email log identifier to the queue."""
        self.queue.put(email_log_id)
        logger.debug(f" Email {email_log_id} ajouté à la queue")

    def _worker(self):
        """Worker thread body: consume identifiers until ``running`` is false.

        Each worker creates and owns its own asyncio event loop so that the
        coroutine-based processing can run inside a plain thread.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            while self.running:
                try:
                    # Short timeout so the ``running`` flag is re-checked
                    # regularly even when the queue is idle.
                    email_log_id = self.queue.get(timeout=1)
                except queue.Empty:
                    continue

                try:
                    loop.run_until_complete(self._process_email(email_log_id))
                except Exception as e:
                    logger.error(f" Erreur worker: {e}", exc_info=True)
                finally:
                    # Exactly one task_done() per successful get().
                    self.queue.task_done()
        finally:
            loop.close()

    def _schedule_retry(self, email_log_id: str, delay: float) -> None:
        """Re-enqueue ``email_log_id`` after ``delay`` seconds (daemon timer)."""
        timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
        timer.daemon = True
        timer.start()

    async def _process_email(self, email_log_id: str):
        """Send one logged email and record the outcome on its ``EmailLog`` row.

        The row is marked ``EN_COURS`` and its attempt counter incremented
        before sending. On success it becomes ``ENVOYE``; on failure it
        becomes ``ERREUR`` and, while the attempt counter is below
        ``settings.max_retry_attempts``, a retry is scheduled with an
        exponentially growing delay.
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import with the database module -- confirm).
        from database import EmailLog, StatutEmail
        from sqlalchemy import select

        if not self.session_factory:
            logger.error(" session_factory non configuré")
            return

        retry_delay: Optional[float] = None

        async with self.session_factory() as session:
            result = await session.execute(
                select(EmailLog).where(EmailLog.id == email_log_id)
            )
            email_log = result.scalar_one_or_none()
            if not email_log:
                logger.error(f" Email log {email_log_id} introuvable")
                return

            email_log.statut = StatutEmail.EN_COURS
            # Tolerate a NULL counter on rows created without a default.
            email_log.nb_tentatives = (email_log.nb_tentatives or 0) + 1
            await session.commit()

            # NOTE(review): attributes of email_log are read after commit();
            # with an async session this presumably requires the factory to
            # be configured with expire_on_commit=False -- confirm.
            try:
                await self._send_with_retry(email_log)
                email_log.statut = StatutEmail.ENVOYE
                email_log.date_envoi = datetime.now()
                email_log.derniere_erreur = None
                logger.info(f" Email envoyé: {email_log.destinataire}")
            except Exception as e:
                email_log.statut = StatutEmail.ERREUR
                email_log.derniere_erreur = str(e)[:1000]  # keep the column small

                if email_log.nb_tentatives < settings.max_retry_attempts:
                    retry_delay = settings.retry_delay_seconds * (
                        2 ** (email_log.nb_tentatives - 1)
                    )
                    email_log.prochain_retry = datetime.now() + timedelta(
                        seconds=retry_delay
                    )
                    logger.warning(
                        f" Retry prévu dans {retry_delay}s pour {email_log.destinataire}"
                    )
                else:
                    logger.error(f" Échec définitif: {email_log.destinataire} - {e}")

            try:
                await session.commit()
            finally:
                # Start the timer only once the commit has been attempted so
                # a short delay cannot re-enqueue the email before its state
                # is persisted; still schedule it if the commit itself fails.
                if retry_delay is not None:
                    self._schedule_retry(email_log_id, retry_delay)

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    )
    async def _send_with_retry(self, email_log):
        """Build the MIME message (with PDF attachments) and send it via SMTP.

        The whole method is retried by tenacity up to 3 times. ``reraise=True``
        makes the final failure surface as the original exception instead of
        a ``tenacity.RetryError``, so the caller can store a meaningful error
        message.

        PDF generation is best-effort: a failure for one document is logged
        and the email is still sent without that attachment.
        """
        msg = MIMEMultipart()
        msg["From"] = settings.smtp_from
        msg["To"] = email_log.destinataire
        msg["Subject"] = email_log.sujet
        msg.attach(MIMEText(email_log.corps_html, "html"))

        if email_log.document_ids:
            type_doc = email_log.type_document
            # document_ids is a comma-separated string of document numbers.
            for raw_id in email_log.document_ids.split(","):
                doc_id = raw_id.strip()
                if not doc_id:
                    continue
                try:
                    pdf_bytes = await asyncio.to_thread(
                        self._generate_pdf, doc_id, type_doc
                    )
                    if pdf_bytes:
                        part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
                        part["Content-Disposition"] = (
                            f'attachment; filename="{doc_id}.pdf"'
                        )
                        msg.attach(part)
                        logger.info(f"📎 PDF attaché: {doc_id}.pdf")
                except Exception as e:
                    logger.error(f" Erreur génération PDF {doc_id}: {e}")

        # smtplib is blocking; run it off the event loop.
        await asyncio.to_thread(self._send_smtp, msg)

    @staticmethod
    def _to_number(value):
        """Return ``value`` as something that supports ``:.2f`` formatting.

        Numeric values (int, float, Decimal) are returned unchanged so their
        own rounding rules apply; other values are parsed with ``float()``;
        missing or unparsable values become ``0``.
        """
        if not value:
            return 0
        if isinstance(value, (int, float, Decimal)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0

    def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
        """Render a simple A4 PDF summary of a Sage document.

        Args:
            doc_id: Document number passed to ``sage_client.lire_document``.
            type_doc: Numeric document type, used for the lookup and for the
                label printed in the header.

        Returns:
            The PDF file content.

        Raises:
            Exception: If ``sage_client`` is not configured, or the document
                cannot be read or does not exist.
        """
        if not self.sage_client:
            logger.error("❌ sage_client non configuré")
            raise Exception("sage_client non disponible")

        try:
            doc = self.sage_client.lire_document(doc_id, type_doc)
        except Exception as e:
            logger.error(f"❌ Erreur récupération document {doc_id}: {e}")
            raise Exception(f"Document {doc_id} inaccessible") from e

        if not doc:
            raise Exception(f"Document {doc_id} introuvable")

        buffer = BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=A4)
        width, height = A4

        # --- Header -------------------------------------------------------
        pdf.setFont("Helvetica-Bold", 20)
        pdf.drawString(2 * cm, height - 3 * cm, f"Document N° {doc_id}")

        type_labels = {
            0: "DEVIS",
            1: "BON DE LIVRAISON",
            2: "BON DE RETOUR",
            3: "COMMANDE",
            4: "PRÉPARATION",
            5: "FACTURE",
        }
        type_label = type_labels.get(type_doc, "DOCUMENT")

        pdf.setFont("Helvetica", 12)
        pdf.drawString(2 * cm, height - 4 * cm, f"Type: {type_label}")

        # --- Customer -----------------------------------------------------
        y = height - 5 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(2 * cm, y, "CLIENT")
        y -= 0.8 * cm
        pdf.setFont("Helvetica", 11)
        pdf.drawString(2 * cm, y, f"Code: {doc.get('client_code') or ''}")
        y -= 0.6 * cm
        pdf.drawString(2 * cm, y, f"Nom: {doc.get('client_intitule') or ''}")
        y -= 0.6 * cm
        pdf.drawString(2 * cm, y, f"Date: {doc.get('date') or ''}")

        # --- Line items ---------------------------------------------------
        y -= 1.5 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(2 * cm, y, "ARTICLES")

        y -= 1 * cm
        pdf.setFont("Helvetica-Bold", 10)
        pdf.drawString(2 * cm, y, "Désignation")
        pdf.drawString(10 * cm, y, "Qté")
        pdf.drawString(12 * cm, y, "Prix Unit.")
        pdf.drawString(15 * cm, y, "Total HT")

        y -= 0.5 * cm
        pdf.line(2 * cm, y, width - 2 * cm, y)

        y -= 0.7 * cm
        pdf.setFont("Helvetica", 9)

        # "lignes" may be absent or explicitly None.
        for ligne in doc.get("lignes") or []:
            if y < 3 * cm:
                pdf.showPage()
                y = height - 3 * cm
                # showPage() resets the graphics state, including the font.
                pdf.setFont("Helvetica", 9)

            designation = str(
                ligne.get("designation") or ligne.get("designation_article") or ""
            )[:50]
            unit_price = self._to_number(
                ligne.get("prix_unitaire_ht") or ligne.get("prix_unitaire")
            )
            line_total = self._to_number(
                ligne.get("montant_ligne_ht") or ligne.get("montant_ht")
            )

            pdf.drawString(2 * cm, y, designation)
            pdf.drawString(10 * cm, y, str(ligne.get("quantite") or 0))
            pdf.drawString(12 * cm, y, f"{unit_price:.2f}€")
            pdf.drawString(15 * cm, y, f"{line_total:.2f}€")
            y -= 0.6 * cm

        # --- Totals -------------------------------------------------------
        # The totals block needs about 3 cm below the current position; move
        # to a new page when that would collide with the footer at 2 cm.
        if y < 6 * cm:
            pdf.showPage()
            y = height - 3 * cm

        total_ht = self._to_number(doc.get("total_ht"))
        total_ttc = self._to_number(doc.get("total_ttc"))
        try:
            tva = total_ttc - total_ht
        except TypeError:
            # Mixed numeric types (e.g. Decimal and float).
            tva = float(total_ttc) - float(total_ht)

        y -= 1 * cm
        pdf.line(12 * cm, y, width - 2 * cm, y)

        y -= 0.8 * cm
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(12 * cm, y, "Total HT:")
        pdf.drawString(15 * cm, y, f"{total_ht:.2f}€")

        y -= 0.6 * cm
        pdf.drawString(12 * cm, y, "TVA (20%):")
        pdf.drawString(15 * cm, y, f"{tva:.2f}€")

        y -= 0.6 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(12 * cm, y, "Total TTC:")
        pdf.drawString(15 * cm, y, f"{total_ttc:.2f}€")

        # --- Footer (last page only) ---------------------------------------
        pdf.setFont("Helvetica", 8)
        pdf.drawString(
            2 * cm, 2 * cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}"
        )
        pdf.drawString(2 * cm, 1.5 * cm, "Sage 100c - API Dataven")

        pdf.save()

        logger.info(f"✅ PDF généré: {doc_id}.pdf")
        return buffer.getvalue()

    def _send_smtp(self, msg):
        """Send ``msg`` through the SMTP server described by ``settings``.

        Uses STARTTLS when ``settings.smtp_use_tls`` is set and authenticates
        when both ``settings.smtp_user`` and ``settings.smtp_password`` are
        set.

        Raises:
            Exception: Wrapping any SMTP or connection error; the original
                exception is kept as ``__cause__``.
        """
        try:
            with smtplib.SMTP(
                settings.smtp_host, settings.smtp_port, timeout=30
            ) as server:
                if settings.smtp_use_tls:
                    server.starttls()

                if settings.smtp_user and settings.smtp_password:
                    server.login(settings.smtp_user, settings.smtp_password)

                server.send_message(msg)

        except smtplib.SMTPException as e:
            raise Exception(f"Erreur SMTP: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Erreur envoi: {str(e)}") from e


# Module-level singleton shared by the application.
email_queue = EmailQueue()