Sage100-vps/email_queue.py

313 lines
9.7 KiB
Python

import threading
import queue
import asyncio
from datetime import datetime, timedelta
import smtplib
import ssl
import socket
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config.config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from io import BytesIO
logger = logging.getLogger(__name__)
class EmailQueue:
def __init__(self):
self.queue = queue.Queue()
self.workers = []
self.running = False
self.session_factory = None
self.sage_client = None
def start(self, num_workers: int = 3):
if self.running:
return
self.running = True
for i in range(num_workers):
worker = threading.Thread(
target=self._worker, name=f"EmailWorker-{i}", daemon=True
)
worker.start()
self.workers.append(worker)
logger.info(f"Queue email démarrée avec {num_workers} worker(s)")
def stop(self):
self.running = False
try:
self.queue.join()
except Exception:
pass
def enqueue(self, email_log_id: str):
self.queue.put(email_log_id)
def _worker(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while self.running:
try:
email_log_id = self.queue.get(timeout=1)
loop.run_until_complete(self._process_email(email_log_id))
self.queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Erreur worker: {e}")
try:
self.queue.task_done()
except Exception:
pass
finally:
loop.close()
async def _process_email(self, email_log_id: str):
from database import EmailLog, StatutEmail
from sqlalchemy import select
if not self.session_factory:
logger.error("session_factory non configuré")
return
async with self.session_factory() as session:
result = await session.execute(
select(EmailLog).where(EmailLog.id == email_log_id)
)
email_log = result.scalar_one_or_none()
if not email_log:
logger.error(f"Email log {email_log_id} introuvable")
return
email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1
await session.commit()
try:
await self._send_with_retry(email_log)
email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now()
email_log.derniere_erreur = None
except Exception as e:
error_msg = str(e)
email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = error_msg[:1000]
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
)
email_log.prochain_retry = datetime.now() + timedelta(seconds=delay)
timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
timer.daemon = True
timer.start()
await session.commit()
async def _send_with_retry(self, email_log):
msg = MIMEMultipart()
msg["From"] = settings.smtp_from
msg["To"] = email_log.destinataire
msg["Subject"] = email_log.sujet
msg.attach(MIMEText(email_log.corps_html, "html"))
# Attachement des PDFs
if email_log.document_ids:
document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document
for doc_id in document_ids:
doc_id = doc_id.strip()
if not doc_id:
continue
try:
pdf_bytes = await asyncio.to_thread(
self._generate_pdf, doc_id, type_doc
)
if pdf_bytes:
part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
part["Content-Disposition"] = (
f'attachment; filename="{doc_id}.pdf"'
)
msg.attach(part)
except Exception as e:
logger.error(f"Erreur génération PDF {doc_id}: {e}")
# Envoi SMTP
await asyncio.to_thread(self._send_smtp, msg)
def _send_smtp(self, msg):
server = None
try:
# Résolution DNS
socket.getaddrinfo(settings.smtp_host, settings.smtp_port)
# Connexion
server = smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30)
# EHLO
server.ehlo()
# STARTTLS
if settings.smtp_use_tls:
if server.has_extn("STARTTLS"):
context = ssl.create_default_context()
server.starttls(context=context)
server.ehlo()
# Authentification
if settings.smtp_user and settings.smtp_password:
server.login(settings.smtp_user, settings.smtp_password)
# Envoi
refused = server.send_message(msg)
if refused:
raise Exception(f"Destinataires refusés: {refused}")
# Fermeture
server.quit()
except Exception as e:
if server:
try:
server.quit()
except Exception:
pass
raise Exception(f"Erreur SMTP: {str(e)}")
def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
if not self.sage_client:
raise Exception("sage_client non disponible")
try:
doc = self.sage_client.lire_document(doc_id, type_doc)
except Exception as e:
raise Exception(f"Document {doc_id} inaccessible : {e}")
if not doc:
raise Exception(f"Document {doc_id} introuvable")
buffer = BytesIO()
pdf = canvas.Canvas(buffer, pagesize=A4)
width, height = A4
pdf.setFont("Helvetica-Bold", 20)
pdf.drawString(2 * cm, height - 3 * cm, f"Document N° {doc_id}")
type_labels = {
0: "DEVIS",
1: "BON DE LIVRAISON",
2: "BON DE RETOUR",
3: "COMMANDE",
4: "PRÉPARATION",
5: "FACTURE",
}
type_label = type_labels.get(type_doc, "DOCUMENT")
pdf.setFont("Helvetica", 12)
pdf.drawString(2 * cm, height - 4 * cm, f"Type: {type_label}")
y = height - 5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "CLIENT")
y -= 0.8 * cm
pdf.setFont("Helvetica", 11)
pdf.drawString(2 * cm, y, f"Code: {doc.get('client_code') or ''}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Nom: {doc.get('client_intitule') or ''}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Date: {doc.get('date') or ''}")
y -= 1.5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "ARTICLES")
y -= 1 * cm
pdf.setFont("Helvetica-Bold", 10)
pdf.drawString(2 * cm, y, "Désignation")
pdf.drawString(10 * cm, y, "Qté")
pdf.drawString(12 * cm, y, "Prix Unit.")
pdf.drawString(15 * cm, y, "Total HT")
y -= 0.5 * cm
pdf.line(2 * cm, y, width - 2 * cm, y)
y -= 0.7 * cm
pdf.setFont("Helvetica", 9)
for ligne in doc.get("lignes", []):
if y < 3 * cm:
pdf.showPage()
y = height - 3 * cm
pdf.setFont("Helvetica", 9)
designation = (
ligne.get("designation") or ligne.get("designation_article") or ""
)
if designation:
designation = str(designation)[:50]
else:
designation = ""
pdf.drawString(2 * cm, y, designation)
pdf.drawString(10 * cm, y, str(ligne.get("quantite") or 0))
pdf.drawString(
12 * cm,
y,
f"{ligne.get('prix_unitaire_ht') or ligne.get('prix_unitaire', 0):.2f}",
)
pdf.drawString(
15 * cm,
y,
f"{ligne.get('montant_ligne_ht') or ligne.get('montant_ht', 0):.2f}",
)
y -= 0.6 * cm
y -= 1 * cm
pdf.line(12 * cm, y, width - 2 * cm, y)
y -= 0.8 * cm
pdf.setFont("Helvetica-Bold", 11)
pdf.drawString(12 * cm, y, "Total HT:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ht_net') or 0:.2f}")
y -= 0.6 * cm
pdf.drawString(12 * cm, y, "TVA (20%):")
tva = (doc.get("total_ttc") or 0) - (doc.get("total_ht_net") or 0)
pdf.drawString(15 * cm, y, f"{tva:.2f}")
y -= 0.6 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(12 * cm, y, "Total TTC:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ttc') or 0:.2f}")
pdf.setFont("Helvetica", 8)
pdf.drawString(
2 * cm, 2 * cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}"
)
pdf.drawString(2 * cm, 1.5 * cm, "Sage 100c - API Dataven")
pdf.save()
buffer.seek(0)
return buffer.read()
email_queue = EmailQueue()