Sage100-vps/email_queue.py
2025-12-30 09:03:16 +03:00

519 lines
No EOL
21 KiB
Python

import threading
import queue
import time
import asyncio
from datetime import datetime, timedelta
import smtplib
import ssl
import socket
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from io import BytesIO
logger = logging.getLogger(__name__)
ULTRA_DEBUG = True
def debug_log(message: str, level: str = "INFO"):
if ULTRA_DEBUG:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
prefix = {
"INFO": "🔍",
"SUCCESS": "",
"ERROR": "",
"WARN": "⚠️",
"STEP": "📍",
"DATA": "📦"
}.get(level, "")
logger.info(f"{prefix} [{timestamp}] {message}")
class EmailQueue:
def __init__(self):
self.queue = queue.Queue()
self.workers = []
self.running = False
self.session_factory = None
self.sage_client = None
def start(self, num_workers: int = 3):
if self.running:
logger.warning("Queue déjà démarrée")
return
self.running = True
for i in range(num_workers):
worker = threading.Thread(
target=self._worker, name=f"EmailWorker-{i}", daemon=True
)
worker.start()
self.workers.append(worker)
logger.info(f"✅ Queue email démarrée avec {num_workers} worker(s)")
def stop(self):
logger.info("🛑 Arrêt de la queue email...")
self.running = False
try:
self.queue.join()
logger.info("✅ Queue email arrêtée proprement")
except:
logger.warning("⚠️ Timeout lors de l'arrêt de la queue")
def enqueue(self, email_log_id: str):
"""Ajoute un email dans la queue"""
self.queue.put(email_log_id)
debug_log(f"Email {email_log_id} ajouté à la queue (taille: {self.queue.qsize()})")
def _worker(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
worker_name = threading.current_thread().name
debug_log(f"Worker {worker_name} démarré", "SUCCESS")
try:
while self.running:
try:
email_log_id = self.queue.get(timeout=1)
debug_log(f"[{worker_name}] Traitement email {email_log_id}", "STEP")
loop.run_until_complete(self._process_email(email_log_id))
self.queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"❌ Erreur worker {worker_name}: {e}", exc_info=True)
try:
self.queue.task_done()
except:
pass
finally:
loop.close()
debug_log(f"Worker {worker_name} arrêté", "WARN")
async def _process_email(self, email_log_id: str):
from database import EmailLog, StatutEmail
from sqlalchemy import select
debug_log(f"═══ DÉBUT TRAITEMENT EMAIL {email_log_id} ═══", "STEP")
if not self.session_factory:
logger.error("❌ session_factory non configuré")
return
async with self.session_factory() as session:
result = await session.execute(
select(EmailLog).where(EmailLog.id == email_log_id)
)
email_log = result.scalar_one_or_none()
if not email_log:
logger.error(f"❌ Email log {email_log_id} introuvable en DB")
return
debug_log(f"Email trouvé en DB:", "DATA")
debug_log(f" → Destinataire: {email_log.destinataire}")
debug_log(f" → Sujet: {email_log.sujet[:50]}...")
debug_log(f" → Tentative: {email_log.nb_tentatives + 1}")
debug_log(f" → Documents: {email_log.document_ids}")
email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1
await session.commit()
try:
await self._send_with_retry(email_log)
email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now()
email_log.derniere_erreur = None
debug_log(f"Email envoyé avec succès: {email_log.destinataire}", "SUCCESS")
except Exception as e:
error_msg = str(e)
debug_log(f"ÉCHEC ENVOI: {error_msg}", "ERROR")
email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = error_msg[:1000]
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
)
email_log.prochain_retry = datetime.now() + timedelta(seconds=delay)
timer = threading.Timer(delay, self.enqueue, args=[email_log_id])
timer.daemon = True
timer.start()
debug_log(f"Retry #{email_log.nb_tentatives + 1} planifié dans {delay}s", "WARN")
else:
debug_log(f"ÉCHEC DÉFINITIF après {email_log.nb_tentatives} tentatives", "ERROR")
await session.commit()
debug_log(f"═══ FIN TRAITEMENT EMAIL {email_log_id} ═══", "STEP")
async def _send_with_retry(self, email_log):
debug_log("Construction du message MIME...", "STEP")
msg = MIMEMultipart()
msg["From"] = settings.smtp_from
msg["To"] = email_log.destinataire
msg["Subject"] = email_log.sujet
debug_log(f"Headers configurés:", "DATA")
debug_log(f" → From: {settings.smtp_from}")
debug_log(f" → To: {email_log.destinataire}")
debug_log(f" → Subject: {email_log.sujet}")
msg.attach(MIMEText(email_log.corps_html, "html"))
debug_log(f"Corps HTML attaché ({len(email_log.corps_html)} chars)")
# Attachement des PDFs
if email_log.document_ids:
document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document
debug_log(f"Documents à attacher: {document_ids}")
for doc_id in document_ids:
doc_id = doc_id.strip()
if not doc_id:
continue
try:
debug_log(f"Génération PDF pour {doc_id}...")
pdf_bytes = await asyncio.to_thread(
self._generate_pdf, doc_id, type_doc
)
if pdf_bytes:
part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
part["Content-Disposition"] = (
f'attachment; filename="{doc_id}.pdf"'
)
msg.attach(part)
debug_log(f"PDF attaché: {doc_id}.pdf ({len(pdf_bytes)} bytes)", "SUCCESS")
except Exception as e:
debug_log(f"Erreur génération PDF {doc_id}: {e}", "ERROR")
# Envoi SMTP
debug_log("Lancement envoi SMTP...", "STEP")
await asyncio.to_thread(self._send_smtp, msg)
def _send_smtp(self, msg):
debug_log("═══════════════════════════════════════════", "STEP")
debug_log(" DÉBUT ENVOI SMTP ULTRA DEBUG", "STEP")
debug_log("═══════════════════════════════════════════", "STEP")
# ═══ CONFIGURATION ═══
debug_log("CONFIGURATION SMTP:", "DATA")
debug_log(f" → Host: {settings.smtp_host}")
debug_log(f" → Port: {settings.smtp_port}")
debug_log(f" → User: {settings.smtp_user}")
debug_log(f" → Password: {'*' * len(settings.smtp_password) if settings.smtp_password else 'NON DÉFINI'}")
debug_log(f" → From: {settings.smtp_from}")
debug_log(f" → TLS: {settings.smtp_use_tls}")
debug_log(f" → To: {msg['To']}")
server = None
try:
# ═══ ÉTAPE 1: RÉSOLUTION DNS ═══
debug_log("ÉTAPE 1/7: Résolution DNS...", "STEP")
try:
ip_addresses = socket.getaddrinfo(settings.smtp_host, settings.smtp_port)
debug_log(f" → DNS résolu: {ip_addresses[0][4]}", "SUCCESS")
except socket.gaierror as e:
debug_log(f" → ÉCHEC DNS: {e}", "ERROR")
raise Exception(f"Résolution DNS échouée pour {settings.smtp_host}: {e}")
# ═══ ÉTAPE 2: CONNEXION TCP ═══
debug_log("ÉTAPE 2/7: Connexion TCP...", "STEP")
start_time = time.time()
try:
server = smtplib.SMTP(
settings.smtp_host,
settings.smtp_port,
timeout=30
)
server.set_debuglevel(2 if ULTRA_DEBUG else 0) # Active le debug SMTP natif
connect_time = time.time() - start_time
debug_log(f" → Connexion établie en {connect_time:.2f}s", "SUCCESS")
except socket.timeout:
debug_log(f" → TIMEOUT connexion (>30s)", "ERROR")
raise Exception(f"Timeout connexion TCP vers {settings.smtp_host}:{settings.smtp_port}")
except ConnectionRefusedError:
debug_log(f" → CONNEXION REFUSÉE", "ERROR")
raise Exception(f"Connexion refusée par {settings.smtp_host}:{settings.smtp_port}")
except Exception as e:
debug_log(f" → ERREUR CONNEXION: {type(e).__name__}: {e}", "ERROR")
raise
# ═══ ÉTAPE 3: EHLO ═══
debug_log("ÉTAPE 3/7: Envoi EHLO...", "STEP")
try:
ehlo_code, ehlo_msg = server.ehlo()
debug_log(f" → EHLO Response: {ehlo_code}", "SUCCESS")
debug_log(f" → Capabilities: {ehlo_msg.decode()[:200]}...")
except Exception as e:
debug_log(f" → ÉCHEC EHLO: {e}", "ERROR")
raise
# ═══ ÉTAPE 4: STARTTLS ═══
if settings.smtp_use_tls:
debug_log("ÉTAPE 4/7: Négociation STARTTLS...", "STEP")
try:
# Vérifier si le serveur supporte STARTTLS
if server.has_extn('STARTTLS'):
debug_log(" → Serveur supporte STARTTLS", "SUCCESS")
# Créer un contexte SSL
context = ssl.create_default_context()
debug_log(f" → Contexte SSL créé (protocole: {context.protocol})")
tls_code, tls_msg = server.starttls(context=context)
debug_log(f" → STARTTLS Response: {tls_code} - {tls_msg}", "SUCCESS")
# Re-EHLO après STARTTLS
server.ehlo()
debug_log(" → Re-EHLO après TLS: OK", "SUCCESS")
else:
debug_log(" → Serveur ne supporte PAS STARTTLS!", "WARN")
except smtplib.SMTPNotSupportedError:
debug_log(" → STARTTLS non supporté par le serveur", "WARN")
except ssl.SSLError as e:
debug_log(f" → ERREUR SSL: {e}", "ERROR")
raise Exception(f"Erreur SSL/TLS: {e}")
except Exception as e:
debug_log(f" → ÉCHEC STARTTLS: {type(e).__name__}: {e}", "ERROR")
raise
else:
debug_log("ÉTAPE 4/7: STARTTLS désactivé (smtp_use_tls=False)", "WARN")
# ═══ ÉTAPE 5: AUTHENTIFICATION ═══
debug_log("ÉTAPE 5/7: Authentification...", "STEP")
if settings.smtp_user and settings.smtp_password:
debug_log(f" → Tentative login avec: {settings.smtp_user}")
try:
# Lister les méthodes d'auth supportées
if server.has_extn('AUTH'):
auth_methods = server.esmtp_features.get('auth', '')
debug_log(f" → Méthodes AUTH supportées: {auth_methods}")
server.login(settings.smtp_user, settings.smtp_password)
debug_log(" → Authentification RÉUSSIE", "SUCCESS")
except smtplib.SMTPAuthenticationError as e:
debug_log(f" → ÉCHEC AUTHENTIFICATION: {e.smtp_code} - {e.smtp_error}", "ERROR")
debug_log(f" → Code: {e.smtp_code}", "ERROR")
debug_log(f" → Message: {e.smtp_error.decode() if isinstance(e.smtp_error, bytes) else e.smtp_error}", "ERROR")
# Diagnostic spécifique selon le code d'erreur
if e.smtp_code == 535:
debug_log(" → 535 = Identifiants incorrects ou app password requis", "ERROR")
elif e.smtp_code == 534:
debug_log(" → 534 = 2FA requis, utiliser un App Password", "ERROR")
elif e.smtp_code == 530:
debug_log(" → 530 = Authentification requise mais échouée", "ERROR")
raise Exception(f"Authentification SMTP échouée: {e.smtp_code} - {e.smtp_error}")
except smtplib.SMTPException as e:
debug_log(f" → ERREUR SMTP AUTH: {e}", "ERROR")
raise
else:
debug_log(" → Pas d'authentification configurée", "WARN")
# ═══ ÉTAPE 6: ENVOI DU MESSAGE ═══
debug_log("ÉTAPE 6/7: Envoi du message...", "STEP")
debug_log(f" → From: {msg['From']}")
debug_log(f" → To: {msg['To']}")
debug_log(f" → Subject: {msg['Subject']}")
debug_log(f" → Taille message: {len(msg.as_string())} bytes")
try:
# send_message retourne un dict des destinataires refusés
refused = server.send_message(msg)
if refused:
debug_log(f" → DESTINATAIRES REFUSÉS: {refused}", "ERROR")
raise Exception(f"Destinataires refusés: {refused}")
else:
debug_log(" → Message envoyé avec succès!", "SUCCESS")
except smtplib.SMTPRecipientsRefused as e:
debug_log(f" → DESTINATAIRE REFUSÉ: {e.recipients}", "ERROR")
raise Exception(f"Destinataire refusé: {e.recipients}")
except smtplib.SMTPSenderRefused as e:
debug_log(f" → EXPÉDITEUR REFUSÉ: {e.smtp_code} - {e.smtp_error}", "ERROR")
debug_log(f" → L'adresse '{msg['From']}' n'est pas autorisée à envoyer", "ERROR")
raise Exception(f"Expéditeur refusé: {e.smtp_code} - {e.smtp_error}")
except smtplib.SMTPDataError as e:
debug_log(f" → ERREUR DATA: {e.smtp_code} - {e.smtp_error}", "ERROR")
raise Exception(f"Erreur DATA SMTP: {e.smtp_code} - {e.smtp_error}")
# ═══ ÉTAPE 7: QUIT ═══
debug_log("ÉTAPE 7/7: Fermeture connexion...", "STEP")
try:
server.quit()
debug_log(" → Connexion fermée proprement", "SUCCESS")
except:
pass
debug_log("═══════════════════════════════════════════", "SUCCESS")
debug_log(" ENVOI SMTP TERMINÉ AVEC SUCCÈS", "SUCCESS")
debug_log("═══════════════════════════════════════════", "SUCCESS")
except Exception as e:
debug_log("═══════════════════════════════════════════", "ERROR")
debug_log(f" ÉCHEC ENVOI SMTP: {type(e).__name__}", "ERROR")
debug_log(f" Message: {str(e)}", "ERROR")
debug_log("═══════════════════════════════════════════", "ERROR")
# Fermer la connexion si elle existe
if server:
try:
server.quit()
except:
pass
raise Exception(f"Erreur SMTP: {str(e)}")
def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
if not self.sage_client:
logger.error("❌ sage_client non configuré")
raise Exception("sage_client non disponible")
try:
doc = self.sage_client.lire_document(doc_id, type_doc)
except Exception as e:
logger.error(f"❌ Erreur récupération document {doc_id}: {e}")
raise Exception(f"Document {doc_id} inaccessible")
if not doc:
raise Exception(f"Document {doc_id} introuvable")
buffer = BytesIO()
pdf = canvas.Canvas(buffer, pagesize=A4)
width, height = A4
pdf.setFont("Helvetica-Bold", 20)
pdf.drawString(2 * cm, height - 3 * cm, f"Document N° {doc_id}")
type_labels = {
0: "DEVIS",
1: "BON DE LIVRAISON",
2: "BON DE RETOUR",
3: "COMMANDE",
4: "PRÉPARATION",
5: "FACTURE",
}
type_label = type_labels.get(type_doc, "DOCUMENT")
pdf.setFont("Helvetica", 12)
pdf.drawString(2 * cm, height - 4 * cm, f"Type: {type_label}")
y = height - 5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "CLIENT")
y -= 0.8 * cm
pdf.setFont("Helvetica", 11)
pdf.drawString(2 * cm, y, f"Code: {doc.get('client_code') or ''}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Nom: {doc.get('client_intitule') or ''}")
y -= 0.6 * cm
pdf.drawString(2 * cm, y, f"Date: {doc.get('date') or ''}")
y -= 1.5 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(2 * cm, y, "ARTICLES")
y -= 1 * cm
pdf.setFont("Helvetica-Bold", 10)
pdf.drawString(2 * cm, y, "Désignation")
pdf.drawString(10 * cm, y, "Qté")
pdf.drawString(12 * cm, y, "Prix Unit.")
pdf.drawString(15 * cm, y, "Total HT")
y -= 0.5 * cm
pdf.line(2 * cm, y, width - 2 * cm, y)
y -= 0.7 * cm
pdf.setFont("Helvetica", 9)
for ligne in doc.get("lignes", []):
if y < 3 * cm:
pdf.showPage()
y = height - 3 * cm
pdf.setFont("Helvetica", 9)
# ✅ FIX: Gérer les valeurs None correctement
designation = (
ligne.get("designation")
or ligne.get("designation_article")
or ""
)
if designation:
designation = str(designation)[:50]
else:
designation = ""
pdf.drawString(2 * cm, y, designation)
pdf.drawString(10 * cm, y, str(ligne.get("quantite") or 0))
pdf.drawString(12 * cm, y, f"{ligne.get('prix_unitaire_ht') or ligne.get('prix_unitaire', 0):.2f}")
pdf.drawString(15 * cm, y, f"{ligne.get('montant_ligne_ht') or ligne.get('montant_ht', 0):.2f}")
y -= 0.6 * cm
y -= 1 * cm
pdf.line(12 * cm, y, width - 2 * cm, y)
y -= 0.8 * cm
pdf.setFont("Helvetica-Bold", 11)
pdf.drawString(12 * cm, y, "Total HT:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ht') or 0:.2f}")
y -= 0.6 * cm
pdf.drawString(12 * cm, y, "TVA (20%):")
tva = (doc.get("total_ttc") or 0) - (doc.get("total_ht") or 0)
pdf.drawString(15 * cm, y, f"{tva:.2f}")
y -= 0.6 * cm
pdf.setFont("Helvetica-Bold", 14)
pdf.drawString(12 * cm, y, "Total TTC:")
pdf.drawString(15 * cm, y, f"{doc.get('total_ttc') or 0:.2f}")
pdf.setFont("Helvetica", 8)
pdf.drawString(
2 * cm, 2 * cm, f"Généré le {datetime.now().strftime('%d/%m/%Y %H:%M')}"
)
pdf.drawString(2 * cm, 1.5 * cm, "Sage 100c - API Dataven")
pdf.save()
buffer.seek(0)
logger.info(f"✅ PDF généré: {doc_id}.pdf")
return buffer.read()
email_queue = EmailQueue()