Sage100-vps/email_queue.py
2025-12-29 11:20:55 +03:00

306 lines
10 KiB
Python

import threading
import queue
import time
import asyncio
from datetime import datetime, timedelta
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from io import BytesIO
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EmailQueue:
    """Thread-backed queue that sends logged emails over SMTP.

    Email log ids are pushed with :meth:`enqueue`; daemon worker threads pop
    them, load the matching ``EmailLog`` row, build the message (optionally
    attaching generated PDFs) and send it.  Failures are retried at two
    levels: Tenacity retries the send itself, and a failed email is
    re-enqueued later with an exponentially growing delay.

    Attributes:
        queue: ``queue.Queue`` holding pending email log ids.
        workers: worker threads started by :meth:`start`.
        running: flag polled by the workers; ``False`` makes them exit.
        session_factory: callable returning an async context manager that
            yields a database session; ``None`` until assigned from outside.
        sage_client: object exposing ``lire_document(doc_id, type_doc)``;
            ``None`` until assigned from outside.
    """

    # Human-readable labels for the numeric document type codes.
    _TYPE_LABELS = {
        0: "DEVIS",
        1: "BON DE LIVRAISON",
        2: "BON DE RETOUR",
        3: "COMMANDE",
        4: "PRÉPARATION",
        5: "FACTURE",
    }

    def __init__(self):
        """Create an idle queue; no thread is started until start()."""
        self.queue = queue.Queue()
        self.workers = []
        self.running = False
        # Both collaborators must be assigned before emails are processed:
        # _process_email and _generate_pdf check them and bail out otherwise.
        self.session_factory = None
        self.sage_client = None

    def start(self, num_workers: int = 3):
        """Start the worker threads.

        Does nothing (besides logging a warning) when already running.

        Args:
            num_workers: number of daemon worker threads to spawn.
        """
        if self.running:
            logger.warning("Queue déjà démarrée")
            return
        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker, name=f"EmailWorker-{i}", daemon=True
            )
            worker.start()
            self.workers.append(worker)
        logger.info(f" Queue email démarrée avec {num_workers} worker(s)")

    def stop(self, timeout: float = 10.0):
        """Stop the workers, waiting at most ``timeout`` seconds overall.

        The workers exit their loop once ``running`` is False, so items still
        sitting in the queue are never marked done; waiting on
        ``queue.join()`` here could therefore block forever.  The worker
        threads are joined against a shared deadline instead.

        Args:
            timeout: maximum total time, in seconds, to wait for the workers
                to finish the email they are currently processing.
        """
        logger.info(" Arrêt de la queue email...")
        self.running = False

        deadline = time.monotonic() + timeout
        for worker in self.workers:
            worker.join(timeout=max(0.0, deadline - time.monotonic()))

        # Keep only the threads that did not terminate in time, so a later
        # start() does not accumulate references to dead threads.
        self.workers = [w for w in self.workers if w.is_alive()]
        pending = self.queue.qsize()

        if self.workers:
            logger.warning(" Timeout lors de l'arrêt de la queue")
        elif pending:
            logger.warning(f" {pending} email(s) non traité(s) restent dans la queue")
        else:
            logger.info(" Queue email arrêtée proprement")

    def enqueue(self, email_log_id: str):
        """Add an email log id to the queue for processing by a worker."""
        self.queue.put(email_log_id)
        logger.debug(f" Email {email_log_id} ajouté à la queue")

    def _worker(self):
        """Worker thread body: pop ids and process them on a private loop.

        Each worker owns its own asyncio event loop because
        ``_process_email`` is a coroutine and the worker runs in a plain
        thread.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            while self.running:
                try:
                    # Short timeout so the ``running`` flag is re-checked.
                    email_log_id = self.queue.get(timeout=1)
                except queue.Empty:
                    continue
                try:
                    loop.run_until_complete(self._process_email(email_log_id))
                except Exception as e:
                    # A failing email must not kill the worker thread.
                    logger.error(f" Erreur worker: {e}", exc_info=True)
                finally:
                    # Exactly one task_done() per successful get().
                    self.queue.task_done()
        finally:
            loop.close()

    async def _process_email(self, email_log_id: str):
        """Send one logged email and record the outcome in the database.

        Marks the row as in progress, increments its attempt counter, tries
        to send, then stores either success or the error.  On failure, while
        the attempt counter is below ``settings.max_retry_attempts``, the id
        is re-enqueued after ``retry_delay_seconds * 2 ** (attempts - 1)``
        seconds.

        Args:
            email_log_id: primary key of the ``EmailLog`` row to process.
        """
        # Imported here rather than at module top, as in the original code
        # (presumably to avoid an import cycle with ``database`` — TODO confirm).
        from database import EmailLog, StatutEmail
        from sqlalchemy import select

        if not self.session_factory:
            logger.error(" session_factory non configuré")
            return

        async with self.session_factory() as session:
            result = await session.execute(
                select(EmailLog).where(EmailLog.id == email_log_id)
            )
            email_log = result.scalar_one_or_none()
            if not email_log:
                logger.error(f" Email log {email_log_id} introuvable")
                return

            email_log.statut = StatutEmail.EN_COURS
            email_log.nb_tentatives += 1
            await session.commit()

            try:
                await self._send_with_retry(email_log)
                email_log.statut = StatutEmail.ENVOYE
                email_log.date_envoi = datetime.now()
                email_log.derniere_erreur = None
                logger.info(f" Email envoyé: {email_log.destinataire}")
            except Exception as e:
                email_log.statut = StatutEmail.ERREUR
                email_log.derniere_erreur = str(e)[:1000]  # cap stored size
                if email_log.nb_tentatives < settings.max_retry_attempts:
                    # Exponential backoff: base delay doubled per attempt.
                    delay = settings.retry_delay_seconds * (
                        2 ** (email_log.nb_tentatives - 1)
                    )
                    email_log.prochain_retry = datetime.now() + timedelta(
                        seconds=delay
                    )
                    timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
                    timer.daemon = True
                    timer.start()
                    logger.warning(
                        f" Retry prévu dans {delay}s pour {email_log.destinataire}"
                    )
                else:
                    logger.error(f" Échec définitif: {email_log.destinataire} - {e}")
            await session.commit()

    # reraise=True makes the last real exception propagate once the attempts
    # are exhausted; without it Tenacity raises RetryError and the error text
    # stored by _process_email would not describe the actual failure.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,
    )
    async def _send_with_retry(self, email_log):
        """Build the MIME message, attach generated PDFs, and send it.

        Retried by Tenacity up to 3 attempts with exponential waits bounded
        to 4-10 seconds.  PDF generation is best-effort: a document that
        cannot be rendered is logged and skipped, and the email is still
        sent.

        Args:
            email_log: object providing ``destinataire``, ``sujet``,
                ``corps_html``, ``document_ids`` (comma-separated string or
                falsy) and ``type_document``.

        Raises:
            Exception: the error from the last failed send attempt.
        """
        msg = MIMEMultipart()
        msg["From"] = settings.smtp_from
        msg["To"] = email_log.destinataire
        msg["Subject"] = email_log.sujet
        msg.attach(MIMEText(email_log.corps_html, "html"))

        if email_log.document_ids:
            type_doc = email_log.type_document
            for raw_id in email_log.document_ids.split(","):
                doc_id = raw_id.strip()
                if not doc_id:
                    continue
                try:
                    # PDF rendering is blocking; keep it off the event loop.
                    pdf_bytes = await asyncio.to_thread(
                        self._generate_pdf, doc_id, type_doc
                    )
                    if pdf_bytes:
                        part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
                        # add_header quotes/encodes the filename as needed.
                        part.add_header(
                            "Content-Disposition",
                            "attachment",
                            filename=f"{doc_id}.pdf",
                        )
                        msg.attach(part)
                        logger.info(f"📎 PDF attaché: {doc_id}.pdf")
                except Exception as e:
                    logger.error(f" Erreur génération PDF {doc_id}: {e}")

        await asyncio.to_thread(self._send_smtp, msg)

    @staticmethod
    def _format_amount(value) -> str:
        """Format a numeric amount with two decimals; ``None`` counts as 0."""
        return f"{(0 if value is None else value):.2f}"

    def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
        """Render a simple A4 PDF summary of a document.

        Args:
            doc_id: identifier passed to ``sage_client.lire_document``.
            type_doc: numeric document type, used for the lookup and for the
                label printed on the page.

        Returns:
            The PDF file content as bytes.

        Raises:
            Exception: if ``sage_client`` is not set, the document cannot be
                read, or the client returns a falsy document.
        """
        if not self.sage_client:
            logger.error(" sage_client non configuré")
            raise Exception("sage_client non disponible")

        try:
            doc = self.sage_client.lire_document(doc_id, type_doc)
        except Exception as e:
            logger.error(f" Erreur récupération document {doc_id}: {e}")
            raise Exception(f"Document {doc_id} inaccessible") from e
        if not doc:
            raise Exception(f"Document {doc_id} introuvable")

        buffer = BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=A4)
        width, height = A4
        left = 2 * cm

        # --- Header -------------------------------------------------------
        pdf.setFont("Helvetica-Bold", 20)
        pdf.drawString(left, height - 3 * cm, f"Document N° {doc_id}")
        type_label = self._TYPE_LABELS.get(type_doc, "DOCUMENT")
        pdf.setFont("Helvetica", 12)
        pdf.drawString(left, height - 4 * cm, f"Type: {type_label}")

        # --- Customer block -----------------------------------------------
        y = height - 5 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(left, y, "CLIENT")
        y -= 0.8 * cm
        pdf.setFont("Helvetica", 11)
        pdf.drawString(left, y, f"Code: {doc.get('client_code', '')}")
        y -= 0.6 * cm
        pdf.drawString(left, y, f"Nom: {doc.get('client_intitule', '')}")
        y -= 0.6 * cm
        pdf.drawString(left, y, f"Date: {doc.get('date', '')}")

        # --- Article table ------------------------------------------------
        y -= 1.5 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(left, y, "ARTICLES")
        y -= 1 * cm
        pdf.setFont("Helvetica-Bold", 10)
        pdf.drawString(left, y, "Désignation")
        pdf.drawString(10 * cm, y, "Qté")
        pdf.drawString(12 * cm, y, "Prix Unit.")
        pdf.drawString(15 * cm, y, "Total HT")
        y -= 0.5 * cm
        pdf.line(left, y, width - 2 * cm, y)
        y -= 0.7 * cm
        pdf.setFont("Helvetica", 9)

        for ligne in doc.get("lignes") or []:
            if y < 3 * cm:
                # The font is set again after each page break, as before.
                pdf.showPage()
                y = height - 3 * cm
                pdf.setFont("Helvetica", 9)
            designation = str(ligne.get("designation") or "")[:50]
            pdf.drawString(left, y, designation)
            pdf.drawString(10 * cm, y, str(ligne.get("quantite", 0)))
            pdf.drawString(
                12 * cm, y, self._format_amount(ligne.get("prix_unitaire", 0))
            )
            pdf.drawString(
                15 * cm, y, self._format_amount(ligne.get("montant_ht", 0))
            )
            y -= 0.6 * cm

        # --- Totals -------------------------------------------------------
        # The totals block spans 3 cm below y and the footer sits at 2 cm;
        # break the page when the block would collide with it or leave the page.
        if y < 6 * cm:
            pdf.showPage()
            y = height - 3 * cm

        total_ht = doc.get("total_ht", 0) or 0
        total_ttc = doc.get("total_ttc", 0) or 0

        y -= 1 * cm
        pdf.line(12 * cm, y, width - 2 * cm, y)
        y -= 0.8 * cm
        pdf.setFont("Helvetica-Bold", 11)
        pdf.drawString(12 * cm, y, "Total HT:")
        pdf.drawString(15 * cm, y, self._format_amount(total_ht))
        y -= 0.6 * cm
        pdf.drawString(12 * cm, y, "TVA (20%):")
        # The tax amount is derived as TTC minus HT, not recomputed from a rate.
        pdf.drawString(15 * cm, y, self._format_amount(total_ttc - total_ht))
        y -= 0.6 * cm
        pdf.setFont("Helvetica-Bold", 14)
        pdf.drawString(12 * cm, y, "Total TTC:")
        pdf.drawString(15 * cm, y, self._format_amount(total_ttc))

        # --- Footer (last page only) --------------------------------------
        pdf.setFont("Helvetica", 8)
        pdf.drawString(
            left, 2 * cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}"
        )
        pdf.drawString(left, 1.5 * cm, "Sage 100c - API Dataven")

        pdf.save()
        logger.info(f" PDF généré: {doc_id}.pdf")
        return buffer.getvalue()

    def _send_smtp(self, msg):
        """Send ``msg`` through the SMTP server described by ``settings``.

        Uses STARTTLS when ``settings.smtp_use_tls`` is set and logs in when
        both user and password are configured.

        Args:
            msg: the email message to send.

        Raises:
            Exception: wrapping the underlying SMTP or connection error,
                which stays available as ``__cause__``.
        """
        try:
            with smtplib.SMTP(
                settings.smtp_host, settings.smtp_port, timeout=30
            ) as server:
                if settings.smtp_use_tls:
                    server.starttls()
                if settings.smtp_user and settings.smtp_password:
                    server.login(settings.smtp_user, settings.smtp_password)
                server.send_message(msg)
        except smtplib.SMTPException as e:
            raise Exception(f"Erreur SMTP: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Erreur envoi: {str(e)}") from e
# Module-level EmailQueue instance, created when this module is imported.
# Its session_factory and sage_client attributes start as None (set in
# EmailQueue.__init__) and no worker runs until start() is called.
email_queue = EmailQueue()