Sage100-vps/email_queue.py
2025-12-24 11:41:31 +03:00

340 lines
11 KiB
Python

import threading
import queue
import time
import asyncio
from datetime import datetime, timedelta
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from io import BytesIO
logger = logging.getLogger(__name__)
class EmailQueue:
"""
Queue d'emails avec workers threadés et retry automatique
"""
def __init__(self):
self.queue = queue.Queue()
self.workers = []
self.running = False
self.session_factory = None
self.sage_client = None
def start(self, num_workers: int = 3):
"""Démarre les workers"""
if self.running:
logger.warning("Queue déjà démarrée")
return
self.running = True
for i in range(num_workers):
worker = threading.Thread(
target=self._worker, name=f"EmailWorker-{i}", daemon=True
)
worker.start()
self.workers.append(worker)
logger.info(f"✅ Queue email démarrée avec {num_workers} worker(s)")
def stop(self):
"""Arrête les workers proprement"""
logger.info("🛑 Arrêt de la queue email...")
self.running = False
# Attendre que la queue soit vide (max 30s)
try:
self.queue.join()
logger.info("✅ Queue email arrêtée proprement")
except:
logger.warning("⚠️ Timeout lors de l'arrêt de la queue")
def enqueue(self, email_log_id: str):
"""Ajoute un email dans la queue"""
self.queue.put(email_log_id)
logger.debug(f"📨 Email {email_log_id} ajouté à la queue")
def _worker(self):
"""Worker qui traite les emails dans un thread"""
# Créer une event loop pour ce thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while self.running:
try:
# Récupérer un email de la queue (timeout 1s)
email_log_id = self.queue.get(timeout=1)
# Traiter l'email
loop.run_until_complete(self._process_email(email_log_id))
# Marquer comme traité
self.queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"❌ Erreur worker: {e}", exc_info=True)
try:
self.queue.task_done()
except:
pass
finally:
loop.close()
async def _process_email(self, email_log_id: str):
"""Traite un email avec retry automatique"""
from database import EmailLog, StatutEmail
from sqlalchemy import select
if not self.session_factory:
logger.error("❌ session_factory non configuré")
return
async with self.session_factory() as session:
# Charger l'email log
result = await session.execute(
select(EmailLog).where(EmailLog.id == email_log_id)
)
email_log = result.scalar_one_or_none()
if not email_log:
logger.error(f"❌ Email log {email_log_id} introuvable")
return
# Marquer comme en cours
email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1
await session.commit()
try:
# Envoi avec retry automatique
await self._send_with_retry(email_log)
# Succès
email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now()
email_log.derniere_erreur = None
logger.info(f"✅ Email envoyé: {email_log.destinataire}")
except Exception as e:
# Échec
email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = str(e)[:1000] # Limiter la taille
# Programmer un retry si < max attempts
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
)
email_log.prochain_retry = datetime.now() + timedelta(seconds=delay)
# Programmer le retry
timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
timer.daemon = True
timer.start()
logger.warning(
f"⚠️ Retry prévu dans {delay}s pour {email_log.destinataire}"
)
else:
logger.error(f"❌ Échec définitif: {email_log.destinataire} - {e}")
await session.commit()
@retry(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def _send_with_retry(self, email_log):
"""Envoi SMTP avec retry Tenacity + génération PDF"""
# Préparer le message
msg = MIMEMultipart()
msg["From"] = settings.smtp_from
msg["To"] = email_log.destinataire
msg["Subject"] = email_log.sujet
# Corps HTML
msg.attach(MIMEText(email_log.corps_html, "html"))
# 📎 GÉNÉRATION ET ATTACHEMENT DES PDFs
if email_log.document_ids:
document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document
for doc_id in document_ids:
doc_id = doc_id.strip()
if not doc_id:
continue
try:
# Générer PDF (appel bloquant dans thread séparé)
pdf_bytes = await asyncio.to_thread(
self._generate_pdf, doc_id, type_doc
)
if pdf_bytes:
# Attacher PDF
part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
part["Content-Disposition"] = (
f'attachment; filename="{doc_id}.pdf"'
)
msg.attach(part)
logger.info(f"📎 PDF attaché: {doc_id}.pdf")
except Exception as e:
logger.error(f"❌ Erreur génération PDF {doc_id}: {e}")
# Continuer avec les autres PDFs
# Envoi SMTP (bloquant mais dans thread séparé)
await asyncio.to_thread(self._send_smtp, msg)
def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
if not self.sage_client:
logger.error("❌ sage_client non configuré")
raise Exception("sage_client non disponible")
# 📡 Récupérer document depuis gateway Windows via HTTP
try:
doc = self.sage_client.lire_document(doc_id, type_doc)
except Exception as e:
logger.error(f"❌ Erreur récupération document {doc_id}: {e}")
raise Exception(f"Document {doc_id} inaccessible")
if not doc:
raise Exception(f"Document {doc_id} introuvable")
# 📄 Créer PDF avec ReportLab
buffer = BytesIO()
pdf = canvas.Canvas(buffer, pagesize=A4)
width, height = A4
# === EN-TÊTE ===
pdf.setFont("Helvetica-Bold", 20)
pdf.drawString(2 * cm, height - 3 * cm, f"Document N° {doc_id}")
# Type de document
type_labels = {
0: "DEVIS",
1: "BON DE LIVRAISON",
2: "BON DE RETOUR",
3: "COMMANDE",
4: "PRÉPARATION",
5: "FACTURE",
}
type_label = type_labels.get(type_doc, "DOCUMENT")
pdf.setFont("Helvetica", 12)
pdf.drawString(2 * cm, height - 4 * cm, f"Type: {type_label}")
# === INFORMATIONS CLIENT ===
y = height - 5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "CLIENT")
y -= 0.8 * cm
pdf.setFont("Helvetica", 11)
pdf.drawString(2 * cm, y, f"Code: {doc.get('client_code', '')}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Nom: {doc.get('client_intitule', '')}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Date: {doc.get('date', '')}")
# === LIGNES ===
y -= 1.5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "ARTICLES")
y -= 1 * cm
pdf.setFont("Helvetica-Bold", 10)
pdf.drawString(2 * cm, y, "Désignation")
pdf.drawString(10 * cm, y, "Qté")
pdf.drawString(12 * cm, y, "Prix Unit.")
pdf.drawString(15 * cm, y, "Total HT")
y -= 0.5 * cm
pdf.line(2 * cm, y, width - 2 * cm, y)
y -= 0.7 * cm
pdf.setFont("Helvetica", 9)
for ligne in doc.get("lignes", []):
# Nouvelle page si nécessaire
if y < 3 * cm:
pdf.showPage()
y = height - 3 * cm
pdf.setFont("Helvetica", 9)
designation = ligne.get("designation", "")[:50]
pdf.drawString(2 * cm, y, designation)
pdf.drawString(10 * cm, y, str(ligne.get("quantite", 0)))
pdf.drawString(12 * cm, y, f"{ligne.get('prix_unitaire', 0):.2f}")
pdf.drawString(15 * cm, y, f"{ligne.get('montant_ht', 0):.2f}")
y -= 0.6 * cm
# === TOTAUX ===
y -= 1 * cm
pdf.line(12 * cm, y, width - 2 * cm, y)
y -= 0.8 * cm
pdf.setFont("Helvetica-Bold", 11)
pdf.drawString(12 * cm, y, "Total HT:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ht', 0):.2f}")
y -= 0.6 * cm
pdf.drawString(12 * cm, y, "TVA (20%):")
tva = doc.get("total_ttc", 0) - doc.get("total_ht", 0)
pdf.drawString(15 * cm, y, f"{tva:.2f}")
y -= 0.6 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(12 * cm, y, "Total TTC:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ttc', 0):.2f}")
# === PIED DE PAGE ===
pdf.setFont("Helvetica", 8)
pdf.drawString(
2 * cm, 2 * cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}"
)
pdf.drawString(2 * cm, 1.5 * cm, "Sage 100c - API Dataven")
# Finaliser
pdf.save()
buffer.seek(0)
logger.info(f"✅ PDF généré: {doc_id}.pdf")
return buffer.read()
def _send_smtp(self, msg):
"""Envoi SMTP bloquant (appelé depuis asyncio.to_thread)"""
try:
with smtplib.SMTP(
settings.smtp_host, settings.smtp_port, timeout=30
) as server:
if settings.smtp_use_tls:
server.starttls()
if settings.smtp_user and settings.smtp_password:
server.login(settings.smtp_user, settings.smtp_password)
server.send_message(msg)
except smtplib.SMTPException as e:
raise Exception(f"Erreur SMTP: {str(e)}")
except Exception as e:
raise Exception(f"Erreur envoi: {str(e)}")
# Instance globale
email_queue = EmailQueue()