refactor(email_queue): remove debug logging and simplify email processing

This commit is contained in:
Fanilo-Nantenaina 2026-01-06 12:06:55 +03:00
parent 410d4553d5
commit 677cd826d7
2 changed files with 29 additions and 287 deletions

View file

@ -1,6 +1,5 @@
import threading import threading
import queue import queue
import time
import asyncio import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta
import smtplib import smtplib
@ -18,22 +17,6 @@ from io import BytesIO
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ULTRA_DEBUG = True
def debug_log(message: str, level: str = "INFO"):
if ULTRA_DEBUG:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
prefix = {
"INFO": "[INFO]",
"SUCCESS": "[SUCCESS]",
"ERROR": "[ERROR]",
"WARN": "[WARN]",
"STEP": "[STEP]",
"DATA": "[DATA]",
}.get(level, "")
logger.info(f"{prefix} [{timestamp}] {message}")
class EmailQueue: class EmailQueue:
def __init__(self): def __init__(self):
@ -45,7 +28,6 @@ class EmailQueue:
def start(self, num_workers: int = 3): def start(self, num_workers: int = 3):
if self.running: if self.running:
logger.warning("Queue déjà démarrée")
return return
self.running = True self.running = True
@ -56,63 +38,45 @@ class EmailQueue:
worker.start() worker.start()
self.workers.append(worker) self.workers.append(worker)
logger.info(f" Queue email démarrée avec {num_workers} worker(s)") logger.info(f"Queue email démarrée avec {num_workers} worker(s)")
def stop(self): def stop(self):
logger.info("Arrêt de la queue email...")
self.running = False self.running = False
try: try:
self.queue.join() self.queue.join()
logger.info(" Queue email arrêtée proprement")
except Exception: except Exception:
logger.warning(" Timeout lors de l'arrêt de la queue") pass
def enqueue(self, email_log_id: str): def enqueue(self, email_log_id: str):
self.queue.put(email_log_id) self.queue.put(email_log_id)
debug_log(
f"Email {email_log_id} ajouté à la queue (taille: {self.queue.qsize()})"
)
def _worker(self): def _worker(self):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
worker_name = threading.current_thread().name
debug_log(f"Worker {worker_name} démarré", "SUCCESS")
try: try:
while self.running: while self.running:
try: try:
email_log_id = self.queue.get(timeout=1) email_log_id = self.queue.get(timeout=1)
debug_log(
f"[{worker_name}] Traitement email {email_log_id}", "STEP"
)
loop.run_until_complete(self._process_email(email_log_id)) loop.run_until_complete(self._process_email(email_log_id))
self.queue.task_done() self.queue.task_done()
except queue.Empty: except queue.Empty:
continue continue
except Exception as e: except Exception as e:
logger.error(f" Erreur worker {worker_name}: {e}", exc_info=True) logger.error(f"Erreur worker: {e}")
try: try:
self.queue.task_done() self.queue.task_done()
except Exception: except Exception:
pass pass
finally: finally:
loop.close() loop.close()
debug_log(f"Worker {worker_name} arrêté", "WARN")
async def _process_email(self, email_log_id: str): async def _process_email(self, email_log_id: str):
from database import EmailLog, StatutEmail from database import EmailLog, StatutEmail
from sqlalchemy import select from sqlalchemy import select
debug_log(f"═══ DÉBUT TRAITEMENT EMAIL {email_log_id} ═══", "STEP")
if not self.session_factory: if not self.session_factory:
logger.error(" session_factory non configuré") logger.error("session_factory non configuré")
return return
async with self.session_factory() as session: async with self.session_factory() as session:
@ -122,33 +86,21 @@ class EmailQueue:
email_log = result.scalar_one_or_none() email_log = result.scalar_one_or_none()
if not email_log: if not email_log:
logger.error(f" Email log {email_log_id} introuvable en DB") logger.error(f"Email log {email_log_id} introuvable")
return return
debug_log("Email trouvé en DB:", "DATA")
debug_log(f" → Destinataire: {email_log.destinataire}")
debug_log(f" → Sujet: {email_log.sujet[:50]}...")
debug_log(f" → Tentative: {email_log.nb_tentatives + 1}")
debug_log(f" → Documents: {email_log.document_ids}")
email_log.statut = StatutEmail.EN_COURS email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1 email_log.nb_tentatives += 1
await session.commit() await session.commit()
try: try:
await self._send_with_retry(email_log) await self._send_with_retry(email_log)
email_log.statut = StatutEmail.ENVOYE email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now() email_log.date_envoi = datetime.now()
email_log.derniere_erreur = None email_log.derniere_erreur = None
debug_log(
f"Email envoyé avec succès: {email_log.destinataire}", "SUCCESS"
)
except Exception as e: except Exception as e:
error_msg = str(e) error_msg = str(e)
debug_log(f"ÉCHEC ENVOI: {error_msg}", "ERROR")
email_log.statut = StatutEmail.ERREUR email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = error_msg[:1000] email_log.derniere_erreur = error_msg[:1000]
@ -162,49 +114,26 @@ class EmailQueue:
timer.daemon = True timer.daemon = True
timer.start() timer.start()
debug_log(
f"Retry #{email_log.nb_tentatives + 1} planifié dans {delay}s",
"WARN",
)
else:
debug_log(
f"ÉCHEC DÉFINITIF après {email_log.nb_tentatives} tentatives",
"ERROR",
)
await session.commit() await session.commit()
debug_log(f"═══ FIN TRAITEMENT EMAIL {email_log_id} ═══", "STEP")
async def _send_with_retry(self, email_log): async def _send_with_retry(self, email_log):
debug_log("Construction du message MIME...", "STEP")
msg = MIMEMultipart() msg = MIMEMultipart()
msg["From"] = settings.smtp_from msg["From"] = settings.smtp_from
msg["To"] = email_log.destinataire msg["To"] = email_log.destinataire
msg["Subject"] = email_log.sujet msg["Subject"] = email_log.sujet
debug_log("Headers configurés:", "DATA")
debug_log(f" → From: {settings.smtp_from}")
debug_log(f" → To: {email_log.destinataire}")
debug_log(f" → Subject: {email_log.sujet}")
msg.attach(MIMEText(email_log.corps_html, "html")) msg.attach(MIMEText(email_log.corps_html, "html"))
debug_log(f"Corps HTML attaché ({len(email_log.corps_html)} chars)")
# Attachement des PDFs # Attachement des PDFs
if email_log.document_ids: if email_log.document_ids:
document_ids = email_log.document_ids.split(",") document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document type_doc = email_log.type_document
debug_log(f"Documents à attacher: {document_ids}")
for doc_id in document_ids: for doc_id in document_ids:
doc_id = doc_id.strip() doc_id = doc_id.strip()
if not doc_id: if not doc_id:
continue continue
try: try:
debug_log(f"Génération PDF pour {doc_id}...")
pdf_bytes = await asyncio.to_thread( pdf_bytes = await asyncio.to_thread(
self._generate_pdf, doc_id, type_doc self._generate_pdf, doc_id, type_doc
) )
@ -215,246 +144,61 @@ class EmailQueue:
f'attachment; filename="{doc_id}.pdf"' f'attachment; filename="{doc_id}.pdf"'
) )
msg.attach(part) msg.attach(part)
debug_log(
f"PDF attaché: {doc_id}.pdf ({len(pdf_bytes)} bytes)",
"SUCCESS",
)
except Exception as e: except Exception as e:
debug_log(f"Erreur génération PDF {doc_id}: {e}", "ERROR") logger.error(f"Erreur génération PDF {doc_id}: {e}")
# Envoi SMTP # Envoi SMTP
debug_log("Lancement envoi SMTP...", "STEP")
await asyncio.to_thread(self._send_smtp, msg) await asyncio.to_thread(self._send_smtp, msg)
def _send_smtp(self, msg): def _send_smtp(self, msg):
debug_log("═══════════════════════════════════════════", "STEP")
debug_log(" DÉBUT ENVOI SMTP ULTRA DEBUG", "STEP")
debug_log("═══════════════════════════════════════════", "STEP")
# ═══ CONFIGURATION ═══
debug_log("CONFIGURATION SMTP:", "DATA")
debug_log(f" → Host: {settings.smtp_host}")
debug_log(f" → Port: {settings.smtp_port}")
debug_log(f" → User: {settings.smtp_user}")
debug_log(
f" → Password: {'*' * len(settings.smtp_password) if settings.smtp_password else 'NON DÉFINI'}"
)
debug_log(f" → From: {settings.smtp_from}")
debug_log(f" → TLS: {settings.smtp_use_tls}")
debug_log(f" → To: {msg['To']}")
server = None server = None
try: try:
# ═══ ÉTAPE 1: RÉSOLUTION DNS ═══ # Résolution DNS
debug_log("ÉTAPE 1/7: Résolution DNS...", "STEP") socket.getaddrinfo(settings.smtp_host, settings.smtp_port)
try:
ip_addresses = socket.getaddrinfo(
settings.smtp_host, settings.smtp_port
)
debug_log(f" → DNS résolu: {ip_addresses[0][4]}", "SUCCESS")
except socket.gaierror as e:
debug_log(f" → ÉCHEC DNS: {e}", "ERROR")
raise Exception(
f"Résolution DNS échouée pour {settings.smtp_host}: {e}"
)
# ═══ ÉTAPE 2: CONNEXION TCP ═══ # Connexion
debug_log("ÉTAPE 2/7: Connexion TCP...", "STEP") server = smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30)
start_time = time.time()
try: # EHLO
server = smtplib.SMTP(
settings.smtp_host, settings.smtp_port, timeout=30
)
server.set_debuglevel(
2 if ULTRA_DEBUG else 0
) # Active le debug SMTP natif
connect_time = time.time() - start_time
debug_log(f" → Connexion établie en {connect_time:.2f}s", "SUCCESS")
except socket.timeout:
debug_log(" → TIMEOUT connexion (>30s)", "ERROR")
raise Exception(
f"Timeout connexion TCP vers {settings.smtp_host}:{settings.smtp_port}"
)
except ConnectionRefusedError:
debug_log(" → CONNEXION REFUSÉE", "ERROR")
raise Exception(
f"Connexion refusée par {settings.smtp_host}:{settings.smtp_port}"
)
except Exception as e:
debug_log(f" → ERREUR CONNEXION: {type(e).__name__}: {e}", "ERROR")
raise
# ═══ ÉTAPE 3: EHLO ═══
debug_log("ÉTAPE 3/7: Envoi EHLO...", "STEP")
try:
ehlo_code, ehlo_msg = server.ehlo()
debug_log(f" → EHLO Response: {ehlo_code}", "SUCCESS")
debug_log(f" → Capabilities: {ehlo_msg.decode()[:200]}...")
except Exception as e:
debug_log(f" → ÉCHEC EHLO: {e}", "ERROR")
raise
# ═══ ÉTAPE 4: STARTTLS ═══
if settings.smtp_use_tls:
debug_log("ÉTAPE 4/7: Négociation STARTTLS...", "STEP")
try:
# Vérifier si le serveur supporte STARTTLS
if server.has_extn("STARTTLS"):
debug_log(" → Serveur supporte STARTTLS", "SUCCESS")
# Créer un contexte SSL
context = ssl.create_default_context()
debug_log(
f" → Contexte SSL créé (protocole: {context.protocol})"
)
tls_code, tls_msg = server.starttls(context=context)
debug_log(
f" → STARTTLS Response: {tls_code} - {tls_msg}", "SUCCESS"
)
# Re-EHLO après STARTTLS
server.ehlo() server.ehlo()
debug_log(" → Re-EHLO après TLS: OK", "SUCCESS")
else:
debug_log(" → Serveur ne supporte PAS STARTTLS!", "WARN")
except smtplib.SMTPNotSupportedError: # STARTTLS
debug_log(" → STARTTLS non supporté par le serveur", "WARN") if settings.smtp_use_tls:
except ssl.SSLError as e: if server.has_extn("STARTTLS"):
debug_log(f" → ERREUR SSL: {e}", "ERROR") context = ssl.create_default_context()
raise Exception(f"Erreur SSL/TLS: {e}") server.starttls(context=context)
except Exception as e: server.ehlo()
debug_log(f" → ÉCHEC STARTTLS: {type(e).__name__}: {e}", "ERROR")
raise
else:
debug_log("ÉTAPE 4/7: STARTTLS désactivé (smtp_use_tls=False)", "WARN")
# ═══ ÉTAPE 5: AUTHENTIFICATION ═══ # Authentification
debug_log("ÉTAPE 5/7: Authentification...", "STEP")
if settings.smtp_user and settings.smtp_password: if settings.smtp_user and settings.smtp_password:
debug_log(f" → Tentative login avec: {settings.smtp_user}")
try:
# Lister les méthodes d'auth supportées
if server.has_extn("AUTH"):
auth_methods = server.esmtp_features.get("auth", "")
debug_log(f" → Méthodes AUTH supportées: {auth_methods}")
server.login(settings.smtp_user, settings.smtp_password) server.login(settings.smtp_user, settings.smtp_password)
debug_log(" → Authentification RÉUSSIE", "SUCCESS")
except smtplib.SMTPAuthenticationError as e: # Envoi
debug_log(
f" → ÉCHEC AUTHENTIFICATION: {e.smtp_code} - {e.smtp_error}",
"ERROR",
)
debug_log(f" → Code: {e.smtp_code}", "ERROR")
debug_log(
f" → Message: {e.smtp_error.decode() if isinstance(e.smtp_error, bytes) else e.smtp_error}",
"ERROR",
)
# Diagnostic spécifique selon le code d'erreur
if e.smtp_code == 535:
debug_log(
" → 535 = Identifiants incorrects ou app password requis",
"ERROR",
)
elif e.smtp_code == 534:
debug_log(
" → 534 = 2FA requis, utiliser un App Password", "ERROR"
)
elif e.smtp_code == 530:
debug_log(
" → 530 = Authentification requise mais échouée", "ERROR"
)
raise Exception(
f"Authentification SMTP échouée: {e.smtp_code} - {e.smtp_error}"
)
except smtplib.SMTPException as e:
debug_log(f" → ERREUR SMTP AUTH: {e}", "ERROR")
raise
else:
debug_log(" → Pas d'authentification configurée", "WARN")
# ═══ ÉTAPE 6: ENVOI DU MESSAGE ═══
debug_log("ÉTAPE 6/7: Envoi du message...", "STEP")
debug_log(f" → From: {msg['From']}")
debug_log(f" → To: {msg['To']}")
debug_log(f" → Subject: {msg['Subject']}")
debug_log(f" → Taille message: {len(msg.as_string())} bytes")
try:
# send_message retourne un dict des destinataires refusés
refused = server.send_message(msg) refused = server.send_message(msg)
if refused: if refused:
debug_log(f" → DESTINATAIRES REFUSÉS: {refused}", "ERROR")
raise Exception(f"Destinataires refusés: {refused}") raise Exception(f"Destinataires refusés: {refused}")
else:
debug_log(" → Message envoyé avec succès!", "SUCCESS")
except smtplib.SMTPRecipientsRefused as e: # Fermeture
debug_log(f" → DESTINATAIRE REFUSÉ: {e.recipients}", "ERROR")
raise Exception(f"Destinataire refusé: {e.recipients}")
except smtplib.SMTPSenderRefused as e:
debug_log(
f" → EXPÉDITEUR REFUSÉ: {e.smtp_code} - {e.smtp_error}", "ERROR"
)
debug_log(
f" → L'adresse '{msg['From']}' n'est pas autorisée à envoyer",
"ERROR",
)
raise Exception(f"Expéditeur refusé: {e.smtp_code} - {e.smtp_error}")
except smtplib.SMTPDataError as e:
debug_log(f" → ERREUR DATA: {e.smtp_code} - {e.smtp_error}", "ERROR")
raise Exception(f"Erreur DATA SMTP: {e.smtp_code} - {e.smtp_error}")
# ═══ ÉTAPE 7: QUIT ═══
debug_log("ÉTAPE 7/7: Fermeture connexion...", "STEP")
try:
server.quit() server.quit()
debug_log(" → Connexion fermée proprement", "SUCCESS")
except Exception:
pass
debug_log("═══════════════════════════════════════════", "SUCCESS")
debug_log(" ENVOI SMTP TERMINÉ AVEC SUCCÈS", "SUCCESS")
debug_log("═══════════════════════════════════════════", "SUCCESS")
except Exception as e: except Exception as e:
debug_log("═══════════════════════════════════════════", "ERROR")
debug_log(f" ÉCHEC ENVOI SMTP: {type(e).__name__}", "ERROR")
debug_log(f" Message: {str(e)}", "ERROR")
debug_log("═══════════════════════════════════════════", "ERROR")
# Fermer la connexion si elle existe
if server: if server:
try: try:
server.quit() server.quit()
except Exception: except Exception:
pass pass
raise Exception(f"Erreur SMTP: {str(e)}") raise Exception(f"Erreur SMTP: {str(e)}")
def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes: def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
if not self.sage_client: if not self.sage_client:
logger.error(" sage_client non configuré")
raise Exception("sage_client non disponible") raise Exception("sage_client non disponible")
try: try:
doc = self.sage_client.lire_document(doc_id, type_doc) doc = self.sage_client.lire_document(doc_id, type_doc)
except Exception as e: except Exception as e:
logger.error(f" Erreur récupération document {doc_id}: {e}") raise Exception(f"Document {doc_id} inaccessible : {e}")
raise Exception(f"Document {doc_id} inaccessible")
if not doc: if not doc:
raise Exception(f"Document {doc_id} introuvable") raise Exception(f"Document {doc_id} introuvable")
@ -514,7 +258,6 @@ class EmailQueue:
y = height - 3 * cm y = height - 3 * cm
pdf.setFont("Helvetica", 9) pdf.setFont("Helvetica", 9)
# FIX: Gérer les valeurs None correctement
designation = ( designation = (
ligne.get("designation") or ligne.get("designation_article") or "" ligne.get("designation") or ligne.get("designation_article") or ""
) )
@ -564,7 +307,6 @@ class EmailQueue:
pdf.save() pdf.save()
buffer.seek(0) buffer.seek(0)
logger.info(f" PDF généré: {doc_id}.pdf")
return buffer.read() return buffer.read()

View file

@ -8,7 +8,7 @@ from pydantic import BaseModel, EmailStr
import logging import logging
from data.data import templates_signature_email from data.data import templates_signature_email
from email_queue import email_queue from email_queue import email_queue
from database import get_session from database import UniversignSignerStatus, UniversignTransactionStatus, get_session
from database import ( from database import (
UniversignTransaction, UniversignTransaction,
UniversignSigner, UniversignSigner,
@ -184,7 +184,7 @@ async def create_signature(
transaction_id=universign_tx_id, transaction_id=universign_tx_id,
sage_document_id=request.sage_document_id, sage_document_id=request.sage_document_id,
sage_document_type=request.sage_document_type, sage_document_type=request.sage_document_type,
universign_status="started", universign_status=UniversignTransactionStatus.STARTED,
local_status=LocalDocumentStatus.IN_PROGRESS, local_status=LocalDocumentStatus.IN_PROGRESS,
signer_url=signer_url, signer_url=signer_url,
requester_email=request.signer_email, requester_email=request.signer_email,
@ -203,7 +203,7 @@ async def create_signature(
transaction_id=local_id, transaction_id=local_id,
email=request.signer_email, email=request.signer_email,
name=request.signer_name, name=request.signer_name,
status="waiting", status=UniversignSignerStatus.WAITING,
order_index=0, order_index=0,
) )