Sage100-vps/email_queue.py
2026-01-13 17:37:23 +03:00

1112 lines
35 KiB
Python

import threading
import queue
import asyncio
from datetime import datetime, timedelta
import smtplib
import ssl
import socket
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config.config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from io import BytesIO
from reportlab.lib.units import mm
from reportlab.lib.colors import HexColor, Color
from PIL import Image
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Preferred asset locations; each one can be overridden through the
# environment variable named in the lookup.
FONT_PATH = os.environ.get("SAGE_FONT_PATH", "/sage/pdfs/Sage_Text-Medium.ttf")
LOGO_PATH = os.environ.get("SAGE_LOGO_PATH", "/sage/pdfs/logo.png")

# Locations probed, in this order, when the preferred font path does not exist.
FONT_FALLBACK_PATHS = [
    "/sage/pdfs/Sage_Text-Medium.ttf",
    "./sage/pdfs/Sage_Text-Medium.ttf",
    "/sage/pdfs/Sage_Text-Bold.ttf",
    "./sage/pdfs/Sage_Text-Bold.ttf",
]

# Locations probed, in this order, when the preferred logo path does not exist.
LOGO_FALLBACK_PATHS = [
    "/sage/pdfs/logo.png",
    "./sage/pdfs/logo.png",
]

# Set to True by _register_sage_font() once the Sage faces are registered,
# so the registration is attempted successfully at most once per process.
_sage_font_registered = False
def _find_file(primary_path: str, fallback_paths: list) -> str | None:
"""Recherche un fichier dans les chemins possibles."""
all_paths = [primary_path] + fallback_paths
for path in all_paths:
if path and os.path.exists(path):
return path
return None
def _register_sage_font():
    """Register the Sage TrueType faces with ReportLab, if a font file exists.

    Two face names are registered: "SageText" and "SageText-Bold". The
    regular face uses the first existing file among FONT_PATH and
    FONT_FALLBACK_PATHS. The bold face uses a dedicated bold file when one
    can be found (a "Bold" sibling of the regular file, or any configured
    candidate whose file name contains "bold"); otherwise it falls back to
    the regular file so the face name is always usable.

    Returns:
        True when the faces are registered (now or by an earlier call),
        False when the caller should fall back to Helvetica.
    """
    global _sage_font_registered
    if _sage_font_registered:
        return True

    regular_path = _find_file(FONT_PATH, FONT_FALLBACK_PATHS)
    if regular_path:
        # Look for a real bold file: registering the regular file under the
        # bold name would make "bold" text render exactly like normal text.
        bold_candidates = []
        sibling = regular_path.replace("Medium", "Bold")
        if sibling != regular_path:
            bold_candidates.append(sibling)
        bold_candidates.extend(
            path
            for path in [FONT_PATH] + FONT_FALLBACK_PATHS
            if path and "bold" in os.path.basename(path).lower()
        )
        bold_path = _find_file("", bold_candidates) or regular_path

        try:
            pdfmetrics.registerFont(TTFont("SageText", regular_path))
            pdfmetrics.registerFont(TTFont("SageText-Bold", bold_path))
            _sage_font_registered = True
            logger.info(f"Police Sage enregistrée: {regular_path}")
            return True
        except Exception as e:
            # Best effort: an unreadable font must not prevent PDF generation.
            logger.warning(f"Impossible d'enregistrer la police Sage: {e}")

    logger.info("Utilisation de Helvetica comme police de fallback")
    return False
async def execute_with_retry(
    session,
    operation,
    max_retries: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
):
    """Await ``operation()`` and retry it while SQLite reports a locked database.

    Only ``sqlalchemy.exc.OperationalError`` whose message contains
    "database is locked" is retried; any other exception propagates
    immediately. The wait between attempts doubles each time starting at
    ``base_delay``, is capped at ``max_delay``, and gets up to 10% random
    jitter so that concurrent workers do not retry in lockstep.

    Args:
        session: Accepted for interface compatibility; this function does not
            use it (the operation is expected to close over its session).
        operation: Zero-argument callable returning an awaitable.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds before the second attempt.
        max_delay: Upper bound in seconds for the delay before jitter.

    Returns:
        Whatever ``operation()`` returns.

    Raises:
        ValueError: If ``max_retries`` is lower than 1.
        OperationalError: The last "database is locked" error once every
            attempt has failed, or any other OperationalError right away.

    NOTE(review): the session is not rolled back between attempts, so an
    operation that mutates ORM objects before committing is re-applied on
    retry — confirm this is what callers expect.
    """
    # Validate first: with no attempt at all there would be no exception to
    # re-raise at the end.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    import random

    from sqlalchemy.exc import OperationalError

    last_exception = None
    for attempt in range(max_retries):
        try:
            return await operation()
        except OperationalError as e:
            # Any OperationalError other than a SQLite lock is not retried.
            if "database is locked" not in str(e).lower():
                raise
            last_exception = e
            if attempt == max_retries - 1:
                # Last attempt: waiting again would only delay the failure.
                break
            delay = min(base_delay * (2**attempt), max_delay)
            # Random jitter to avoid collisions between workers.
            delay += random.uniform(0, delay * 0.1)
            logger.warning(
                f"SQLite locked (tentative {attempt + 1}/{max_retries}), "
                f"retry dans {delay:.2f}s"
            )
            await asyncio.sleep(delay)

    # Every attempt failed on a locked database.
    logger.error(f"Échec après {max_retries} tentatives: {last_exception}")
    raise last_exception
class EmailQueue:
def __init__(self):
self.queue = queue.Queue()
self.workers = []
self.running = False
self.session_factory = None
self.sage_client = None
# Lock pour synchroniser les accès DB dans le worker
self._db_lock = asyncio.Lock()
def start(self, num_workers: int = 2): # Réduire le nombre de workers pour SQLite
if self.running:
return
self.running = True
for i in range(num_workers):
worker = threading.Thread(
target=self._worker, name=f"EmailWorker-{i}", daemon=True
)
worker.start()
self.workers.append(worker)
logger.info(f"Queue email démarrée avec {num_workers} worker(s)")
def stop(self):
self.running = False
try:
self.queue.join()
except Exception:
pass
def enqueue(self, email_log_id: str, delay_seconds: float = 0):
if delay_seconds > 0:
timer = threading.Timer(delay_seconds, lambda: self.queue.put(email_log_id))
timer.daemon = True
timer.start()
logger.debug(f"Email {email_log_id} planifié dans {delay_seconds}s")
else:
self.queue.put(email_log_id)
def _worker(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while self.running:
try:
email_log_id = self.queue.get(timeout=1)
loop.run_until_complete(self._process_email(email_log_id))
self.queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Erreur worker: {e}", exc_info=True)
try:
self.queue.task_done()
except Exception:
pass
finally:
loop.close()
async def _process_email(self, email_log_id: str):
from database import EmailLog, StatutEmail
from sqlalchemy import select
from sqlalchemy.exc import OperationalError
if not self.session_factory:
logger.error("session_factory non configuré")
return
max_db_retries = 5
for db_attempt in range(max_db_retries):
try:
async with self.session_factory() as session:
# Lecture de l'email log avec retry
async def fetch_email():
result = await session.execute(
select(EmailLog).where(EmailLog.id == email_log_id)
)
return result.scalar_one_or_none()
email_log = await execute_with_retry(session, fetch_email)
if not email_log:
logger.error(f"Email log {email_log_id} introuvable")
return
# Mise à jour du statut avec retry
async def update_status_en_cours():
email_log.statut = StatutEmail.EN_COURS
email_log.nb_tentatives += 1
await session.commit()
await execute_with_retry(session, update_status_en_cours)
try:
# Envoi de l'email (pas de DB ici)
await self._send_with_retry(email_log)
# Mise à jour succès avec retry
async def update_status_success():
email_log.statut = StatutEmail.ENVOYE
email_log.date_envoi = datetime.now()
email_log.derniere_erreur = None
await session.commit()
await execute_with_retry(session, update_status_success)
logger.info(f"✅ Email envoyé: {email_log.destinataire}")
except Exception as e:
error_msg = str(e)
logger.error(f"Erreur envoi email: {error_msg}")
# Mise à jour erreur avec retry
async def update_status_error():
email_log.statut = StatutEmail.ERREUR
email_log.derniere_erreur = error_msg[:1000]
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
)
email_log.prochain_retry = datetime.now() + timedelta(
seconds=delay
)
await session.commit()
await execute_with_retry(session, update_status_error)
# Replanifier si tentatives restantes
if email_log.nb_tentatives < settings.max_retry_attempts:
delay = settings.retry_delay_seconds * (
2 ** (email_log.nb_tentatives - 1)
)
self.enqueue(email_log_id, delay_seconds=delay)
logger.info(
f"Email {email_log_id} replanifié dans {delay}s"
)
# Sortir de la boucle de retry si tout s'est bien passé
return
except OperationalError as e:
if "database is locked" in str(e).lower():
delay = 0.5 * (2**db_attempt)
logger.warning(
f"DB locked lors du traitement email {email_log_id}, "
f"tentative {db_attempt + 1}/{max_db_retries}, "
f"retry dans {delay:.1f}s"
)
await asyncio.sleep(delay)
else:
logger.error(f"Erreur DB non récupérable: {e}")
raise
except Exception as e:
logger.error(f"Erreur inattendue traitement email: {e}", exc_info=True)
raise
# Si on arrive ici, toutes les tentatives ont échoué
logger.error(
f"Échec définitif traitement email {email_log_id} après {max_db_retries} tentatives DB"
)
# Replanifier l'email pour plus tard
self.enqueue(email_log_id, delay_seconds=30)
async def _send_with_retry(self, email_log):
msg = MIMEMultipart()
msg["From"] = settings.smtp_from
msg["To"] = email_log.destinataire
msg["Subject"] = email_log.sujet
msg.attach(MIMEText(email_log.corps_html, "html"))
# Attachement des PDFs
if email_log.document_ids:
document_ids = email_log.document_ids.split(",")
type_doc = email_log.type_document
for doc_id in document_ids:
doc_id = doc_id.strip()
if not doc_id:
continue
try:
pdf_bytes = await asyncio.to_thread(
self._generate_pdf, doc_id, type_doc
)
if pdf_bytes:
part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
part["Content-Disposition"] = (
f'attachment; filename="{doc_id}.pdf"'
)
msg.attach(part)
except Exception as e:
logger.error(f"Erreur génération PDF {doc_id}: {e}")
# Envoi SMTP
await asyncio.to_thread(self._send_smtp, msg)
def _send_smtp(self, msg):
server = None
try:
# Résolution DNS
socket.getaddrinfo(settings.smtp_host, settings.smtp_port)
# Connexion
server = smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30)
# EHLO
server.ehlo()
# STARTTLS
if settings.smtp_use_tls:
if server.has_extn("STARTTLS"):
context = ssl.create_default_context()
server.starttls(context=context)
server.ehlo()
# Authentification
if settings.smtp_user and settings.smtp_password:
server.login(settings.smtp_user, settings.smtp_password)
# Envoi
refused = server.send_message(msg)
if refused:
raise Exception(f"Destinataires refusés: {refused}")
# Fermeture
server.quit()
logger.info(f"SMTP: Email envoyé à {msg['To']}")
except Exception as e:
if server:
try:
server.quit()
except Exception:
pass
raise Exception(f"Erreur SMTP: {str(e)}")
def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
if not self.sage_client:
raise Exception("sage_client non disponible")
try:
doc = self.sage_client.lire_document(doc_id, type_doc)
except Exception as e:
raise Exception(f"Document {doc_id} inaccessible : {e}")
if not doc:
raise Exception(f"Document {doc_id} introuvable")
# Récupérer les informations de la société émettrice
societe_info = None
try:
societe_info = self.sage_client.lire_informations_societe()
logger.debug(
f"Infos société récupérées: {societe_info.raison_sociale if societe_info else 'N/A'}"
)
except Exception as e:
logger.warning(f"Impossible de récupérer les infos société: {e}")
# Générer le PDF avec le nouveau générateur
generator = SagePDFGenerator(doc, type_doc, societe_info=societe_info)
return generator.generate()
class SagePDFGenerator:
    """Render one Sage document (quote, order, invoice, ...) as an A4 PDF.

    The document is a dict read with ``.get``; company information is an
    optional object read with ``getattr``. Call ``generate()`` to obtain the
    PDF bytes. Drawing uses a ReportLab canvas with the Sage fonts when they
    could be registered and Helvetica otherwise.
    """

    # Sage colours
    SAGE_GREEN = HexColor("#00D639")
    SAGE_GREEN_DARK = HexColor("#00B830")
    GRAY_50 = HexColor("#F9FAFB")
    GRAY_100 = HexColor("#F3F4F6")
    GRAY_200 = HexColor("#E5E7EB")
    GRAY_300 = HexColor("#D1D5DB")
    GRAY_400 = HexColor("#9CA3AF")
    GRAY_500 = HexColor("#6B7280")
    GRAY_600 = HexColor("#4B5563")
    GRAY_700 = HexColor("#374151")
    GRAY_800 = HexColor("#1F2937")
    GRAY_900 = HexColor("#111827")

    # Default values (used when company information is unavailable)
    DEFAULT_COMPANY_NAME = "Entreprise"
    DEFAULT_COMPANY_ADDRESS = ""
    DEFAULT_COMPANY_CITY = ""
    DEFAULT_COMPANY_EMAIL = ""

    # Labels of the document types
    TYPE_LABELS = {
        0: "Devis",
        10: "Commande",
        30: "Bon de Livraison",
        50: "Avoir",
        60: "Facture",
    }

    def __init__(self, doc_data: dict, type_doc: int, societe_info=None):
        """Store the document data and set up the page geometry.

        Args:
            doc_data: Document fields, including the optional "lignes" list.
            type_doc: Document type code, a key of TYPE_LABELS.
            societe_info: Optional object holding the issuing company fields.
        """
        self.doc = doc_data
        self.type_doc = type_doc
        self.type_label = self.TYPE_LABELS.get(type_doc, "Document")
        self.societe_info = societe_info

        # Page configuration
        self.page_width, self.page_height = A4
        self.margin = 15 * mm
        self.content_width = self.page_width - 2 * self.margin

        # Generator state
        self.buffer = BytesIO()
        self.pdf = None
        self.current_y = 0
        self.page_number = 1
        self.total_pages = 1  # computed by generate()

        # Fonts: the registration result tells whether the Sage faces exist.
        self.use_sage_font = _register_sage_font()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return ``text`` shortened to ``limit`` characters, ending in "..."."""
        if len(text) > limit:
            return text[: limit - 3] + "..."
        return text

    @staticmethod
    def _first_value(mapping, *keys, default=None):
        """Return the first value under ``keys`` that is neither None nor "".

        Unlike an ``or`` chain, a legitimate 0 (e.g. a 0 % tax rate) is kept.
        """
        for key in keys:
            value = mapping.get(key)
            if value is not None and value != "":
                return value
        return default

    def _column_positions(self):
        """Return the x positions of the six table columns, left to right."""
        return (
            self.margin,
            self.margin + 95 * mm,
            self.margin + 115 * mm,
            self.margin + 140 * mm,
            self.margin + 157 * mm,
            self.page_width - self.margin,
        )

    # ------------------------------------------------------------------
    # Company information accessors
    # ------------------------------------------------------------------

    def _get_societe_field(self, field: str, default: str = "") -> str:
        """Return a company field, or ``default`` when missing or empty."""
        if self.societe_info is None:
            return default
        return getattr(self.societe_info, field, default) or default

    def _get_societe_name(self) -> str:
        """Return the company name."""
        return self._get_societe_field("raison_sociale", self.DEFAULT_COMPANY_NAME)

    def _get_societe_address_line1(self) -> str:
        """Return the first address line (address and complement)."""
        adresse = self._get_societe_field("adresse", "")
        complement = self._get_societe_field("complement_adresse", "")
        if adresse and complement:
            return f"{adresse}, {complement}"
        return adresse or complement or self.DEFAULT_COMPANY_ADDRESS

    def _get_societe_address_line2(self) -> str:
        """Return the second address line (postal code, city, country).

        The country is appended only when it is not France.
        """
        cp = self._get_societe_field("code_postal", "")
        ville = self._get_societe_field("ville", "")
        pays = self._get_societe_field("pays", "")
        city_line = " ".join(part for part in (cp, ville) if part)
        if pays and pays.lower() not in ["france", "fr"]:
            city_line = f"{city_line}, {pays}" if city_line else pays
        return city_line or self.DEFAULT_COMPANY_CITY

    def _get_societe_email(self) -> str:
        """Return the company email; ``email_societe`` wins over ``email``."""
        email = self._get_societe_field("email_societe", "")
        if not email:
            email = self._get_societe_field("email", "")
        return email or self.DEFAULT_COMPANY_EMAIL

    def _get_societe_phone(self) -> str:
        """Return the company phone number."""
        return self._get_societe_field("telephone", "")

    def _get_societe_siret(self) -> str:
        """Return the company SIRET."""
        return self._get_societe_field("siret", "")

    def _get_societe_tva(self) -> str:
        """Return the company VAT number."""
        return self._get_societe_field("numero_tva", "")

    # ------------------------------------------------------------------
    # Drawing primitives
    # ------------------------------------------------------------------

    def _get_font(self, bold: bool = False) -> str:
        """Return the name of the font face to use."""
        if self.use_sage_font:
            return "SageText-Bold" if bold else "SageText"
        return "Helvetica-Bold" if bold else "Helvetica"

    def _draw_text(
        self,
        x: float,
        y: float,
        text: str,
        font_size: int = 9,
        bold: bool = False,
        color: Color | None = None,
        align: str = "left",
    ):
        """Draw ``text`` at (x, y).

        Args:
            x: Anchor abscissa; its meaning depends on ``align``.
            y: Baseline ordinate.
            text: Value to draw (converted with ``str``).
            font_size: Font size in points.
            bold: Use the bold face.
            color: Fill colour; when None the current fill colour is kept.
            align: "left", "right" or "center"; anything else means left.
        """
        self.pdf.setFont(self._get_font(bold), font_size)
        if color:
            self.pdf.setFillColor(color)
        if align == "right":
            self.pdf.drawRightString(x, y, str(text))
        elif align == "center":
            self.pdf.drawCentredString(x, y, str(text))
        else:
            self.pdf.drawString(x, y, str(text))

    def _draw_logo(
        self,
        x: float,
        y: float,
        max_width: float = 50 * mm,
        max_height: float = 20 * mm,
    ):
        """Draw the logo with its top-left corner at (x, y).

        The image is scaled to fit in ``max_width`` x ``max_height`` while
        keeping its aspect ratio. When no logo file is found or it cannot be
        loaded, the text "Sage" is drawn instead.

        Returns:
            The height taken by what was drawn.
        """
        logo_path = _find_file(LOGO_PATH, LOGO_FALLBACK_PATHS)
        if logo_path:
            try:
                with Image.open(logo_path) as img:
                    img_width, img_height = img.size
                ratio = min(max_width / img_width, max_height / img_height)
                draw_width = img_width * ratio
                draw_height = img_height * ratio
                self.pdf.drawImage(
                    logo_path,
                    x,
                    y - draw_height,
                    width=draw_width,
                    height=draw_height,
                    preserveAspectRatio=True,
                    mask="auto",
                )
                return draw_height
            except Exception as e:
                logger.warning(f"Impossible de charger le logo: {e}")
        self._draw_text(
            x, y - 8 * mm, "Sage", font_size=24, bold=True, color=self.SAGE_GREEN
        )
        return 10 * mm

    def _new_page(self):
        """Finish the current page and move to the top of a new one."""
        if self.pdf:
            self.pdf.showPage()
        self.page_number += 1
        self.current_y = self.page_height - self.margin

    # ------------------------------------------------------------------
    # Page sections
    # ------------------------------------------------------------------

    def _draw_header(self):
        """Draw the logo and the document number, dates and reference.

        Returns:
            The y position below the header.
        """
        y = self.page_height - self.margin
        logo_height = self._draw_logo(
            self.margin, y, max_width=55 * mm, max_height=22 * mm
        )
        right_x = self.page_width - self.margin

        numero = self.doc.get("numero") or "BROUILLON"
        self._draw_text(
            right_x,
            y - 6 * mm,
            numero.upper(),
            font_size=20,
            bold=True,
            color=self.GRAY_800,
            align="right",
        )

        date_str = self.doc.get("date") or datetime.now().strftime("%d/%m/%Y")
        date_validite = (
            self.doc.get("date_livraison") or self.doc.get("date_echeance") or date_str
        )
        reference = self.doc.get("reference") or ""
        detail_lines = (
            (14 * mm, f"Date : {date_str}"),
            (20 * mm, f"Validité : {date_validite}"),
            (26 * mm, f"Réf : {reference}"),
        )
        for offset, label in detail_lines:
            self._draw_text(
                right_x,
                y - offset,
                label,
                font_size=9,
                color=self.GRAY_600,
                align="right",
            )
        return y - max(logo_height + 10 * mm, 35 * mm)

    def _draw_addresses(self, y: float) -> float:
        """Draw the issuer block (left) and the recipient box (right).

        Args:
            y: Top of the section.

        Returns:
            The y position below the section.
        """
        col1_x = self.margin
        col2_x = self.margin + self.content_width / 2 + 8 * mm
        col_width = self.content_width / 2 - 8 * mm

        # === ISSUER (company information) ===
        self._draw_text(
            col1_x, y, "ÉMETTEUR", font_size=8, bold=True, color=self.GRAY_400
        )
        y_emetteur = y - 6 * mm
        self._draw_text(
            col1_x,
            y_emetteur,
            self._get_societe_name(),
            font_size=10,
            bold=True,
            color=self.GRAY_800,
        )

        address_line1 = self._get_societe_address_line1()
        if address_line1:
            y_emetteur -= 5 * mm
            self._draw_text(
                col1_x,
                y_emetteur,
                self._truncate(address_line1, 45),
                font_size=9,
                color=self.GRAY_600,
            )

        address_line2 = self._get_societe_address_line2()
        if address_line2:
            y_emetteur -= 4 * mm
            self._draw_text(
                col1_x, y_emetteur, address_line2, font_size=9, color=self.GRAY_600
            )

        societe_email = self._get_societe_email()
        if societe_email:
            y_emetteur -= 5 * mm
            self._draw_text(
                col1_x, y_emetteur, societe_email, font_size=9, color=self.GRAY_600
            )

        societe_phone = self._get_societe_phone()
        if societe_phone:
            y_emetteur -= 4 * mm
            self._draw_text(
                col1_x,
                y_emetteur,
                f"Tél: {societe_phone}",
                font_size=8,
                color=self.GRAY_500,
            )

        # === RECIPIENT ===
        box_padding = 4 * mm
        box_height = 26 * mm
        box_y = y - 3 * mm
        # Rounded grey background, drawn before the text it sits behind.
        self.pdf.setFillColor(self.GRAY_50)
        self.pdf.roundRect(
            col2_x - box_padding,
            box_y - box_height,
            col_width + 2 * box_padding,
            box_height + box_padding,
            3 * mm,
            fill=1,
            stroke=0,
        )
        self._draw_text(
            col2_x, y, "DESTINATAIRE", font_size=8, bold=True, color=self.GRAY_400
        )
        y_dest = y - 6 * mm
        client_name = (
            self.doc.get("client_intitule") or self.doc.get("client_nom") or "Client"
        )
        self._draw_text(
            col2_x, y_dest, client_name, font_size=10, bold=True, color=self.GRAY_800
        )
        y_dest -= 5 * mm

        client_adresse = (
            self.doc.get("client_adresse") or self.doc.get("adresse_livraison") or ""
        )
        if client_adresse:
            self._draw_text(
                col2_x,
                y_dest,
                self._truncate(client_adresse, 40),
                font_size=9,
                color=self.GRAY_600,
            )
            y_dest -= 4 * mm

        client_cp = self.doc.get("client_cp") or ""
        client_ville = self.doc.get("client_ville") or ""
        if client_cp or client_ville:
            ville_complete = f"{client_cp} {client_ville}".strip()
            self._draw_text(
                col2_x, y_dest, ville_complete, font_size=9, color=self.GRAY_600
            )

        return min(y_emetteur, box_y - box_height) - 15 * mm

    def _draw_table_header(self, y: float) -> float:
        """Draw the column titles of the line table and the rule below them.

        Returns:
            The y position of the first table row.
        """
        col_des, col_qte, col_pu, col_rem, col_tva, col_mt = self._column_positions()

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(
            self.margin, y - 3 * mm, self.page_width - self.margin, y - 3 * mm
        )

        titles = (
            (col_des, "Désignation", "left"),
            (col_qte, "Qté", "right"),
            (col_pu, "Prix Unit. HT", "right"),
            (col_rem, "Remise", "right"),
            (col_tva, "TVA", "right"),
            (col_mt, "Montant HT", "right"),
        )
        for x, title, align in titles:
            self._draw_text(
                x,
                y,
                title,
                font_size=9,
                bold=True,
                color=self.GRAY_700,
                align=align,
            )
        return y - 8 * mm

    def _draw_line_item(self, y: float, ligne: dict, alternate: bool = False) -> float:
        """Draw one document line, with an optional description underneath.

        Args:
            y: Baseline of the row.
            ligne: Line fields (designation, quantity, price, discount, ...).
            alternate: Draw the light grey row background.

        Returns:
            The y position of the next row.
        """
        col_des, col_qte, col_pu, col_rem, col_tva, col_mt = self._column_positions()

        if alternate:
            self.pdf.setFillColor(self.GRAY_50)
            self.pdf.rect(
                self.margin - 2 * mm,
                y - 5 * mm,
                self.content_width + 4 * mm,
                12 * mm,
                fill=1,
                stroke=0,
            )

        designation = (
            ligne.get("designation")
            or ligne.get("designation_article")
            or ligne.get("article_designation")
            or ""
        )
        designation = self._truncate(designation, 50)
        self._draw_text(
            col_des, y, designation, font_size=9, bold=True, color=self.GRAY_800
        )

        # The description goes on a second text line; the numeric columns
        # stay on the row baseline.
        description = ligne.get("description", "")
        line_height = 6 * mm
        bottom_y = y
        if description and description != designation:
            bottom_y = y - 4 * mm
            self._draw_text(
                col_des,
                bottom_y,
                self._truncate(description, 60),
                font_size=8,
                color=self.GRAY_500,
            )
            line_height += 4 * mm

        quantite = ligne.get("quantite") or 0
        self._draw_text(
            col_qte, y, str(quantite), font_size=9, color=self.GRAY_800, align="right"
        )

        prix_unit = ligne.get("prix_unitaire_ht") or ligne.get("prix_unitaire") or 0
        self._draw_text(
            col_pu,
            y,
            f"{prix_unit:.2f}",
            font_size=9,
            color=self.GRAY_800,
            align="right",
        )

        remise = ligne.get("remise_pourcentage") or ligne.get("remise") or 0
        has_remise = remise > 0
        self._draw_text(
            col_rem,
            y,
            f"{remise:.0f}%" if has_remise else "",
            font_size=9,
            color=self.SAGE_GREEN_DARK if has_remise else self.GRAY_400,
            align="right",
        )

        # VAT: 20 is only the default when no rate is given; an explicit
        # 0 % rate must be printed as 0 %.
        taux_tva = self._first_value(ligne, "taux_taxe1", "taux_tva", default=20)
        self._draw_text(
            col_tva,
            y,
            f"{taux_tva:.0f}%",
            font_size=9,
            color=self.GRAY_600,
            align="right",
        )

        montant_ht = ligne.get("montant_ligne_ht") or ligne.get("montant_ht") or 0
        self._draw_text(
            col_mt,
            y,
            f"{montant_ht:.2f}",
            font_size=9,
            bold=True,
            color=self.GRAY_800,
            align="right",
        )
        return bottom_y - line_height - 2 * mm

    def _draw_totals(self, y: float) -> float:
        """Draw the totals block (total excl. tax, discount, VAT, net to pay).

        Returns:
            The y position below the block.
        """
        totals_x = self.page_width - self.margin - 60 * mm
        right_x = self.page_width - self.margin

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(totals_x - 5 * mm, y + 5 * mm, right_x, y + 5 * mm)

        self._draw_text(totals_x, y, "Total HT", font_size=9, color=self.GRAY_600)
        total_ht = self.doc.get("total_ht_net") or self.doc.get("total_ht") or 0
        self._draw_text(
            right_x,
            y,
            f"{total_ht:.2f}",
            font_size=9,
            color=self.GRAY_800,
            align="right",
        )
        y -= 6 * mm

        remise_globale = (
            self.doc.get("remise_globale") or self.doc.get("montant_remise") or 0
        )
        if remise_globale > 0:
            self._draw_text(totals_x, y, "Remise", font_size=9, color=self.GRAY_600)
            self._draw_text(
                right_x,
                y,
                f"-{remise_globale:.2f}",
                font_size=9,
                color=self.SAGE_GREEN_DARK,
                align="right",
            )
            y -= 6 * mm

        # VAT amount is derived from the totals rather than read from the doc.
        total_ttc = self.doc.get("total_ttc") or 0
        tva = total_ttc - total_ht + remise_globale
        # An explicit 0 % main rate must not be replaced by the 20 % default.
        taux_tva_global = self._first_value(
            self.doc, "taux_tva_principal", default=20
        )
        self._draw_text(
            totals_x,
            y,
            f"TVA ({taux_tva_global:.0f}%)",
            font_size=9,
            color=self.GRAY_600,
        )
        self._draw_text(
            right_x, y, f"{tva:.2f}", font_size=9, color=self.GRAY_800, align="right"
        )
        y -= 10 * mm

        self.pdf.setStrokeColor(self.GRAY_300)
        self.pdf.setLineWidth(1)
        self.pdf.line(totals_x - 5 * mm, y + 4 * mm, right_x, y + 4 * mm)
        self._draw_text(
            totals_x, y, "Net à payer", font_size=12, bold=True, color=self.SAGE_GREEN
        )
        self._draw_text(
            right_x,
            y,
            f"{total_ttc:.2f}",
            font_size=12,
            bold=True,
            color=self.SAGE_GREEN,
            align="right",
        )
        return y - 15 * mm

    def _draw_notes(self, y: float) -> float:
        """Draw the notes section, if the document carries any notes.

        Lines longer than 90 characters are truncated, and drawing stops
        once the footer area (below 30 mm) is reached.

        Returns:
            The y position below the notes (``y`` unchanged without notes).
        """
        notes = (
            self.doc.get("notes_publique")
            or self.doc.get("notes")
            or self.doc.get("commentaire")
        )
        if not notes:
            return y

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(
            self.margin, y + 5 * mm, self.page_width - self.margin, y + 5 * mm
        )
        self._draw_text(
            self.margin,
            y,
            "NOTES & CONDITIONS",
            font_size=8,
            bold=True,
            color=self.GRAY_400,
        )
        y -= 5 * mm
        for raw_line in notes.split("\n"):
            if y < 30 * mm:
                break
            line = raw_line.strip()
            if line:
                self._draw_text(
                    self.margin,
                    y,
                    self._truncate(line, 90),
                    font_size=8,
                    color=self.GRAY_600,
                )
                y -= 4 * mm
        return y

    def _draw_footer(self):
        """Draw the legal mentions and the page counter at the page bottom."""
        footer_y = 15 * mm

        # Legal information of the company
        legal_parts = []
        societe_name = self._get_societe_name()
        if societe_name and societe_name != self.DEFAULT_COMPANY_NAME:
            legal_parts.append(societe_name)
        siret = self._get_societe_siret()
        if siret:
            legal_parts.append(f"SIRET: {siret}")
        tva = self._get_societe_tva()
        if tva:
            legal_parts.append(f"TVA: {tva}")

        # Legal form and share capital, when available
        if self.societe_info:
            forme = self._get_societe_field("forme_juridique", "")
            # The attribute may exist but hold None.
            capital = getattr(self.societe_info, "capital", 0) or 0
            if forme and capital > 0:
                # Space as thousands separator, applied to the number only.
                capital_text = f"{capital:,.0f}".replace(",", " ")
                legal_parts.append(f"{forme} au capital de {capital_text}")

        if legal_parts:
            # A visible separator keeps the mentions from running together.
            legal_text = self._truncate(" - ".join(legal_parts), 100)
            self._draw_text(
                self.page_width / 2,
                footer_y + 4 * mm,
                legal_text,
                font_size=7,
                color=self.GRAY_400,
                align="center",
            )

        # Pagination
        page_text = f"Page {self.page_number} / {self.total_pages}"
        self._draw_text(
            self.page_width / 2,
            footer_y,
            page_text,
            font_size=8,
            color=self.GRAY_400,
            align="center",
        )

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def _render(self) -> int:
        """Draw the whole document into a fresh ``self.buffer``.

        Returns:
            The number of pages produced.
        """
        self.buffer = BytesIO()
        self.pdf = canvas.Canvas(self.buffer, pagesize=A4)
        self.page_number = 1

        self.current_y = self._draw_header()
        self.current_y = self._draw_addresses(self.current_y)
        self.current_y = self._draw_table_header(self.current_y)

        lignes = self.doc.get("lignes", [])
        if not lignes:
            self.pdf.setFont(self._get_font(), 9)
            self.pdf.setFillColor(self.GRAY_400)
            self.pdf.drawCentredString(
                self.page_width / 2, self.current_y, "Aucune ligne"
            )
            self.current_y -= 15 * mm
        else:
            for idx, ligne in enumerate(lignes):
                # Not enough room left for a row: continue on a new page.
                if self.current_y < 70 * mm:
                    self._draw_footer()
                    self._new_page()
                    self.current_y = self._draw_table_header(
                        self.page_height - self.margin - 10 * mm
                    )
                self.current_y = self._draw_line_item(
                    self.current_y, ligne, alternate=(idx % 2 == 1)
                )

        # The totals block needs room too.
        if self.current_y < 60 * mm:
            self._draw_footer()
            self._new_page()
            self.current_y = self.page_height - self.margin - 20 * mm

        self.current_y = self._draw_totals(self.current_y)
        self.current_y = self._draw_notes(self.current_y)
        self._draw_footer()
        self.pdf.save()
        return self.page_number

    def generate(self) -> bytes:
        """Render the document and return the PDF file content.

        The page total printed in the footers is only known once everything
        has been laid out, so a document spanning several pages is rendered a
        second time with the correct total. Each call starts from a fresh
        buffer, which makes repeated calls safe.

        Returns:
            The PDF as bytes.
        """
        self.total_pages = 1
        page_count = self._render()
        if page_count != self.total_pages:
            self.total_pages = page_count
            self._render()
        return self.buffer.getvalue()
# Module-level EmailQueue instance, created when this module is imported.
email_queue = EmailQueue()