import threading
import queue
import asyncio
from datetime import datetime, timedelta
import smtplib
import ssl
import socket
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from config.config import settings
import logging
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from io import BytesIO
from reportlab.lib.units import mm
from reportlab.lib.colors import HexColor, Color
from PIL import Image

logger = logging.getLogger(__name__)

# Preferred asset locations; both can be overridden through the environment.
FONT_PATH = os.getenv("SAGE_FONT_PATH", "/sage/pdfs/Sage_Text-Medium.ttf")
LOGO_PATH = os.getenv("SAGE_LOGO_PATH", "/sage/pdfs/logo.png")

# Locations probed, in order, when the preferred path does not exist.
FONT_FALLBACK_PATHS = [
    "/sage/pdfs/Sage_Text-Medium.ttf",
    "./sage/pdfs/Sage_Text-Medium.ttf",
    "/sage/pdfs/Sage_Text-Bold.ttf",
    "./sage/pdfs/Sage_Text-Bold.ttf",
]

LOGO_FALLBACK_PATHS = [
    "/sage/pdfs/logo.png",
    "./sage/pdfs/logo.png",
]

# Set to True once the "SageText" / "SageText-Bold" fonts are registered.
_sage_font_registered = False


def _find_file(primary_path: str, fallback_paths: list) -> str | None:
    """Return the first candidate path that is an existing regular file.

    Args:
        primary_path: Path tried first; skipped when empty.
        fallback_paths: Paths tried afterwards, in order.

    Returns:
        The first path that points to a file, or ``None`` when none does.
    """
    for path in [primary_path, *fallback_paths]:
        # isfile (not exists): a directory at that location is not a usable
        # font or image and must not shadow a valid fallback.
        if path and os.path.isfile(path):
            return path
    return None


def _find_bold_variant(regular_path: str) -> str:
    """Return the ``-Bold`` sibling of a ``-Medium`` font file when it exists.

    Falls back to ``regular_path`` itself when the file name carries no
    ``-Medium`` marker or when the sibling file is missing.
    """
    directory, filename = os.path.split(regular_path)
    if "-Medium" in filename:
        candidate = os.path.join(directory, filename.replace("-Medium", "-Bold"))
        if os.path.isfile(candidate):
            return candidate
    return regular_path


def _register_sage_font():
    """Register the Sage font with ReportLab if a font file is available.

    Registers two font names, ``SageText`` and ``SageText-Bold``. The bold
    name uses the ``-Bold`` sibling of the regular file when present, and the
    regular file otherwise.

    Returns:
        True when the fonts are (or already were) registered, False when the
        caller should fall back to Helvetica.
    """
    global _sage_font_registered

    if _sage_font_registered:
        return True

    font_path = _find_file(FONT_PATH, FONT_FALLBACK_PATHS)
    if font_path:
        bold_path = _find_bold_variant(font_path)
        try:
            pdfmetrics.registerFont(TTFont("SageText", font_path))
            try:
                pdfmetrics.registerFont(TTFont("SageText-Bold", bold_path))
            except Exception as e:
                if bold_path == font_path:
                    raise
                # An unusable bold file must not cost us the regular font:
                # alias the bold name to the regular file instead.
                logger.warning(
                    f"Police Sage grasse inutilisable ({bold_path}): {e}"
                )
                pdfmetrics.registerFont(TTFont("SageText-Bold", font_path))
            _sage_font_registered = True
            logger.info(f"Police Sage enregistrée: {font_path}")
            return True
        except Exception as e:
            logger.warning(f"Impossible d'enregistrer la police Sage: {e}")

    logger.info("Utilisation de Helvetica comme police de fallback")
    return False
# ============================================================================
# HELPERS FOR SQLite LOCK HANDLING
# ============================================================================


async def execute_with_retry(
    session,
    operation,
    max_retries: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
):
    """Run an async operation, retrying while SQLite reports a locked database.

    Only ``OperationalError`` whose message contains "database is locked" is
    retried; every other exception propagates immediately.

    Args:
        session: Async SQLAlchemy session (accepted for API compatibility;
            not used by this helper).
        operation: Zero-argument callable returning an awaitable.
        max_retries: Maximum number of attempts (must be >= 1).
        base_delay: Initial delay between attempts, in seconds; doubled after
            each failed attempt.
        max_delay: Upper bound for the delay before jitter, in seconds.

    Returns:
        Whatever ``operation`` returns.

    Raises:
        ValueError: If ``max_retries`` is lower than 1.
        OperationalError: The last lock error once all attempts are used up,
            or any non-lock operational error straight away.
    """
    import random

    from sqlalchemy.exc import OperationalError

    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    last_exception = None

    for attempt in range(max_retries):
        try:
            return await operation()
        except OperationalError as e:
            # Anything other than a SQLite lock is not retryable.
            if "database is locked" not in str(e).lower():
                raise
            last_exception = e

            # No point in sleeping when no further attempt will follow.
            if attempt == max_retries - 1:
                break

            delay = min(base_delay * (2**attempt), max_delay)
            # Random jitter so concurrent workers do not retry in lockstep.
            delay += random.uniform(0, delay * 0.1)
            logger.warning(
                f"SQLite locked (tentative {attempt + 1}/{max_retries}), "
                f"retry dans {delay:.2f}s"
            )
            await asyncio.sleep(delay)

    # Every attempt hit a lock.
    logger.error(f"Échec après {max_retries} tentatives: {last_exception}")
    raise last_exception


class EmailQueue:
    """In-process email queue served by background worker threads.

    Each worker owns its own asyncio event loop and processes email-log ids
    pulled from a thread-safe queue. ``session_factory`` and ``sage_client``
    must be assigned by the application before emails are processed.
    """

    def __init__(self):
        self.queue = queue.Queue()
        self.workers = []
        self.running = False
        self.session_factory = None
        self.sage_client = None
        # Lock meant to serialise DB access inside the worker.
        self._db_lock = asyncio.Lock()

    def start(self, num_workers: int = 2):
        """Start the worker threads; does nothing if already running.

        Args:
            num_workers: Number of worker threads (kept low for SQLite).
        """
        if self.running:
            return

        self.running = True
        for i in range(num_workers):
            worker = threading.Thread(
                target=self._worker, name=f"EmailWorker-{i}", daemon=True
            )
            worker.start()
            self.workers.append(worker)

        logger.info(f"Queue email démarrée avec {num_workers} worker(s)")

    def stop(self, timeout: float | None = None):
        """Ask the workers to stop and wait for them to finish.

        Workers complete the email they are currently processing and then
        exit; items still queued are left unprocessed. Waiting on the worker
        threads (rather than on ``queue.join()``) avoids blocking forever
        when such items remain.

        Args:
            timeout: Maximum time, in seconds, to wait for each worker;
                ``None`` waits until the worker has exited.
        """
        self.running = False
        current = threading.current_thread()
        for worker in self.workers:
            # A thread cannot join itself.
            if worker is not current and worker.is_alive():
                worker.join(timeout)
        # Forget finished threads so a later start() begins from a clean list.
        self.workers = [worker for worker in self.workers if worker.is_alive()]

    def enqueue(self, email_log_id: str, delay_seconds: float = 0):
        """Add an email to the queue, optionally after a delay.

        Args:
            email_log_id: Id of the email log entry to process.
            delay_seconds: Delay before the id is queued (used to avoid
                conflicts and to schedule retries).
        """
        if delay_seconds > 0:
            timer = threading.Timer(
                delay_seconds, self.queue.put, args=(email_log_id,)
            )
            timer.daemon = True
            timer.start()
            logger.debug(f"Email {email_log_id} planifié dans {delay_seconds}s")
        else:
            self.queue.put(email_log_id)

    def _worker(self):
        """Worker thread body: process queued ids until ``running`` is False."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            while self.running:
                try:
                    # Short timeout so the running flag is re-checked regularly.
                    email_log_id = self.queue.get(timeout=1)
                except queue.Empty:
                    continue

                try:
                    loop.run_until_complete(self._process_email(email_log_id))
                except Exception as e:
                    logger.error(f"Erreur worker: {e}", exc_info=True)
                finally:
                    # Exactly one task_done() per successful get().
                    self.queue.task_done()
        finally:
            loop.close()

    async def _process_email(self, email_log_id: str):
        """Load an email log, send the email and record the outcome.

        The whole sequence is retried up to five times when the database is
        locked; after that the email is re-queued with a 30 second delay.
        A failed send marks the log as in error and, while attempts remain,
        re-queues the email with an exponentially growing delay.

        Args:
            email_log_id: Id of the ``EmailLog`` row to process.
        """
        from database import EmailLog, StatutEmail
        from sqlalchemy import select
        from sqlalchemy.exc import OperationalError

        if not self.session_factory:
            logger.error("session_factory non configuré")
            return

        max_db_retries = 5

        for db_attempt in range(max_db_retries):
            try:
                async with self.session_factory() as session:

                    async def fetch_email():
                        result = await session.execute(
                            select(EmailLog).where(EmailLog.id == email_log_id)
                        )
                        return result.scalar_one_or_none()

                    email_log = await execute_with_retry(session, fetch_email)

                    if not email_log:
                        logger.error(f"Email log {email_log_id} introuvable")
                        return

                    # Computed once so a retried commit does not count the
                    # same attempt twice.
                    attempt_number = email_log.nb_tentatives + 1

                    async def update_status_en_cours():
                        email_log.statut = StatutEmail.EN_COURS
                        email_log.nb_tentatives = attempt_number
                        await session.commit()

                    await execute_with_retry(session, update_status_en_cours)

                    # NOTE(review): the success-status commit sits inside this
                    # try, so a failure there (after the email was actually
                    # sent) is handled as a send failure and may lead to a
                    # duplicate send — confirm whether that is acceptable.
                    try:
                        # Actual send (no DB access here).
                        await self._send_with_retry(email_log)

                        async def update_status_success():
                            email_log.statut = StatutEmail.ENVOYE
                            email_log.date_envoi = datetime.now()
                            email_log.derniere_erreur = None
                            await session.commit()

                        await execute_with_retry(session, update_status_success)

                        logger.info(f"✅ Email envoyé: {email_log.destinataire}")

                    except Exception as e:
                        error_msg = str(e)
                        logger.error(f"Erreur envoi email: {error_msg}")

                        attempts_left = (
                            email_log.nb_tentatives < settings.max_retry_attempts
                        )
                        retry_delay = None
                        if attempts_left:
                            # Exponential backoff based on the attempt count.
                            retry_delay = settings.retry_delay_seconds * (
                                2 ** (email_log.nb_tentatives - 1)
                            )

                        async def update_status_error():
                            email_log.statut = StatutEmail.ERREUR
                            email_log.derniere_erreur = error_msg[:1000]
                            if attempts_left:
                                email_log.prochain_retry = datetime.now() + timedelta(
                                    seconds=retry_delay
                                )
                            await session.commit()

                        await execute_with_retry(session, update_status_error)

                        # Reschedule while attempts remain.
                        if attempts_left:
                            self.enqueue(email_log_id, delay_seconds=retry_delay)
                            logger.info(
                                f"Email {email_log_id} replanifié dans {retry_delay}s"
                            )

                    # Processing completed (sent or recorded as failed):
                    # leave the DB retry loop.
                    return

            except OperationalError as e:
                if "database is locked" in str(e).lower():
                    delay = 0.5 * (2**db_attempt)
                    logger.warning(
                        f"DB locked lors du traitement email {email_log_id}, "
                        f"tentative {db_attempt + 1}/{max_db_retries}, "
                        f"retry dans {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)
                else:
                    logger.error(f"Erreur DB non récupérable: {e}")
                    raise
            except Exception as e:
                logger.error(f"Erreur inattendue traitement email: {e}", exc_info=True)
                raise

        # Reaching this point means every DB attempt hit a lock.
        logger.error(
            f"Échec définitif traitement email {email_log_id} après {max_db_retries} tentatives DB"
        )
        # Put the email back in the queue for a later try.
        self.enqueue(email_log_id, delay_seconds=30)

    async def _send_with_retry(self, email_log):
        """Build the MIME message (HTML body plus PDF attachments) and send it.

        PDF generation errors are logged and the corresponding attachment is
        skipped; SMTP errors propagate to the caller.

        Args:
            email_log: Object exposing ``destinataire``, ``sujet``,
                ``corps_html``, ``document_ids`` (comma-separated) and
                ``type_document``.
        """
        msg = MIMEMultipart()
        msg["From"] = settings.smtp_from
        msg["To"] = email_log.destinataire
        msg["Subject"] = email_log.sujet
        msg.attach(MIMEText(email_log.corps_html, "html"))

        # Attach one PDF per referenced document.
        if email_log.document_ids:
            type_doc = email_log.type_document

            for doc_id in email_log.document_ids.split(","):
                doc_id = doc_id.strip()
                if not doc_id:
                    continue

                try:
                    # PDF rendering is blocking: run it off the event loop.
                    pdf_bytes = await asyncio.to_thread(
                        self._generate_pdf, doc_id, type_doc
                    )
                    if pdf_bytes:
                        part = MIMEApplication(pdf_bytes, Name=f"{doc_id}.pdf")
                        part["Content-Disposition"] = (
                            f'attachment; filename="{doc_id}.pdf"'
                        )
                        msg.attach(part)
                except Exception as e:
                    logger.error(f"Erreur génération PDF {doc_id}: {e}")

        # SMTP delivery is blocking as well.
        await asyncio.to_thread(self._send_smtp, msg)

    def _send_smtp(self, msg):
        """Send a prepared message through the configured SMTP server.

        Args:
            msg: The email message to send.

        Raises:
            Exception: With an "Erreur SMTP: ..." message when resolution,
                connection, authentication or delivery fails, or when some
                recipients are refused. The original error is chained.
        """
        server = None

        try:
            # Fail early on DNS resolution problems.
            socket.getaddrinfo(settings.smtp_host, settings.smtp_port)

            server = smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30)
            server.ehlo()

            # Upgrade to TLS only when requested and advertised by the server.
            if settings.smtp_use_tls and server.has_extn("STARTTLS"):
                context = ssl.create_default_context()
                server.starttls(context=context)
                server.ehlo()

            if settings.smtp_user and settings.smtp_password:
                server.login(settings.smtp_user, settings.smtp_password)

            refused = server.send_message(msg)
            if refused:
                raise Exception(f"Destinataires refusés: {refused}")

            logger.info(f"SMTP: Email envoyé à {msg['To']}")

        except Exception as e:
            raise Exception(f"Erreur SMTP: {str(e)}") from e
        finally:
            # Closing the connection is best effort: a failing QUIT after a
            # successful delivery must not mark the email as failed.
            if server is not None:
                try:
                    server.quit()
                except (smtplib.SMTPException, OSError):
                    pass

    def _generate_pdf(self, doc_id: str, type_doc: int) -> bytes:
        """Read a document through ``sage_client`` and render it as a PDF.

        Args:
            doc_id: Identifier of the document to read.
            type_doc: Document type code passed to the client and generator.

        Returns:
            The PDF content as bytes.

        Raises:
            Exception: If no client is configured, or the document cannot be
                read or is not found.
        """
        if not self.sage_client:
            raise Exception("sage_client non disponible")

        try:
            doc = self.sage_client.lire_document(doc_id, type_doc)
        except Exception as e:
            raise Exception(f"Document {doc_id} inaccessible : {e}") from e

        if not doc:
            raise Exception(f"Document {doc_id} introuvable")

        # Issuing-company details are optional: the PDF falls back to defaults.
        societe_info = None
        try:
            societe_info = self.sage_client.lire_informations_societe()
            if societe_info:
                # The client may return either a dict or an object.
                if isinstance(societe_info, dict):
                    raison = societe_info.get("raison_sociale", "N/A")
                else:
                    raison = getattr(societe_info, "raison_sociale", "N/A")
                logger.debug(f"Infos société récupérées: {raison}")
        except Exception as e:
            logger.warning(f"Impossible de récupérer les infos société: {e}")

        generator = SagePDFGenerator(doc, type_doc, societe_info=societe_info)
        return generator.generate()


class SagePDFGenerator:
    """Render a document dict (quote, order, invoice, ...) as an A4 PDF."""

    # Sage colours
    SAGE_GREEN = HexColor("#00D639")
    SAGE_GREEN_DARK = HexColor("#00B830")
    GRAY_50 = HexColor("#F9FAFB")
    GRAY_100 = HexColor("#F3F4F6")
    GRAY_200 = HexColor("#E5E7EB")
    GRAY_300 = HexColor("#D1D5DB")
    GRAY_400 = HexColor("#9CA3AF")
    GRAY_500 = HexColor("#6B7280")
    GRAY_600 = HexColor("#4B5563")
    GRAY_700 = HexColor("#374151")
    GRAY_800 = HexColor("#1F2937")
    GRAY_900 = HexColor("#111827")

    # Defaults used when company information is unavailable
    DEFAULT_COMPANY_NAME = "Entreprise"
    DEFAULT_COMPANY_ADDRESS = ""
    DEFAULT_COMPANY_CITY = ""
    DEFAULT_COMPANY_EMAIL = ""

    # VAT rate (percent) displayed when the data carries none
    DEFAULT_TAX_RATE = 20

    # Labels by document type code
    TYPE_LABELS = {
        0: "Devis",
        10: "Commande",
        30: "Bon de Livraison",
        50: "Avoir",
        60: "Facture",
    }

    def __init__(self, doc_data: dict, type_doc: int, societe_info=None):
        """Prepare a generator for one document.

        Args:
            doc_data: Document data (read through ``.get``).
            type_doc: Document type code (see ``TYPE_LABELS``).
            societe_info: Optional issuing-company data, dict or object.
        """
        self.doc = doc_data
        self.type_doc = type_doc
        self.type_label = self.TYPE_LABELS.get(type_doc, "Document")
        self.societe_info = societe_info

        # Page geometry
        self.page_width, self.page_height = A4
        self.margin = 15 * mm
        self.content_width = self.page_width - 2 * self.margin

        # Generator state
        self.buffer = BytesIO()
        self.pdf = None
        self.current_y = 0
        self.page_number = 1
        self.total_pages = 1  # Updated by generate() for multi-page output

        # Fonts
        _register_sage_font()
        self.use_sage_font = _sage_font_registered

    @staticmethod
    def _resolve_tax_rate(*candidates, default=DEFAULT_TAX_RATE):
        """Pick the tax rate to display from several optional values.

        The first truthy candidate wins. When none is truthy but one is an
        explicit zero, zero is returned so a 0 % rate is not shown as the
        default rate. ``default`` is used only when no rate is present.
        """
        explicit_zero = None
        for candidate in candidates:
            if candidate:
                return candidate
            if explicit_zero is None and candidate is not None and candidate == 0:
                explicit_zero = candidate
        return default if explicit_zero is None else explicit_zero

    def _get_societe_field(self, field: str, default: str = "") -> str:
        """Return a company field, supporting both dict and object sources.

        Returns ``default`` when no company info is set or the field is
        missing or ``None``.
        """
        if self.societe_info is None:
            return default

        if isinstance(self.societe_info, dict):
            value = self.societe_info.get(field)
        else:
            value = getattr(self.societe_info, field, None)

        return value if value is not None else default

    def _get_societe_name(self) -> str:
        """Return the company name (``raison_sociale``)."""
        return self._get_societe_field("raison_sociale", self.DEFAULT_COMPANY_NAME)

    def _get_societe_address_line1(self) -> str:
        """Return the first address line (address and complement)."""
        adresse = self._get_societe_field("adresse", "")
        complement = self._get_societe_field("complement_adresse", "")

        if adresse and complement:
            return f"{adresse}, {complement}"
        return adresse or complement or self.DEFAULT_COMPANY_ADDRESS

    def _get_societe_address_line2(self) -> str:
        """Return the second address line (postal code, city, country).

        The country is appended only when it is not "france" or "fr"
        (case-insensitive).
        """
        cp = self._get_societe_field("code_postal", "")
        ville = self._get_societe_field("ville", "")
        pays = self._get_societe_field("pays", "")

        city_line = " ".join(part for part in (cp, ville) if part)

        if pays and pays.lower() not in ["france", "fr"]:
            city_line = f"{city_line}, {pays}" if city_line else pays

        return city_line or self.DEFAULT_COMPANY_CITY

    def _get_societe_email(self) -> str:
        """Return the company email (``email_societe`` first, then ``email``)."""
        email = self._get_societe_field("email_societe", "")
        if not email:
            email = self._get_societe_field("email", "")
        return email or self.DEFAULT_COMPANY_EMAIL

    def _get_societe_phone(self) -> str:
        """Return the company phone number."""
        return self._get_societe_field("telephone", "")

    def _get_societe_siret(self) -> str:
        """Return the company SIRET number."""
        return self._get_societe_field("siret", "")

    def _get_societe_tva(self) -> str:
        """Return the company VAT number."""
        return self._get_societe_field("numero_tva", "")

    def _get_font(self, bold: bool = False) -> str:
        """Return the font name to use, Sage font or Helvetica fallback."""
        if self.use_sage_font:
            return "SageText-Bold" if bold else "SageText"
        return "Helvetica-Bold" if bold else "Helvetica"

    def _draw_text(
        self,
        x: float,
        y: float,
        text: str,
        font_size: int = 9,
        bold: bool = False,
        color: Color = None,
        align: str = "left",
    ):
        """Draw one line of text on the current canvas.

        Args:
            x: Anchor x position (left edge, right edge or centre depending
                on ``align``).
            y: Baseline y position.
            text: Value to draw; converted with ``str``.
            font_size: Font size in points.
            bold: Use the bold font.
            color: Fill colour; when omitted the canvas colour is unchanged.
            align: "left" (default), "right" or "center".
        """
        self.pdf.setFont(self._get_font(bold), font_size)

        if color:
            self.pdf.setFillColor(color)

        if align == "right":
            self.pdf.drawRightString(x, y, str(text))
        elif align == "center":
            self.pdf.drawCentredString(x, y, str(text))
        else:
            self.pdf.drawString(x, y, str(text))

    def _draw_logo(
        self,
        x: float,
        y: float,
        max_width: float = 50 * mm,
        max_height: float = 20 * mm,
    ):
        """Draw the logo image, or the text "Sage" when no logo can be loaded.

        Args:
            x: Left edge of the logo.
            y: Top edge of the logo.
            max_width: Maximum drawn width.
            max_height: Maximum drawn height.

        Returns:
            The height taken by what was drawn.
        """
        logo_path = _find_file(LOGO_PATH, LOGO_FALLBACK_PATHS)

        if logo_path:
            try:
                with Image.open(logo_path) as img:
                    img_width, img_height = img.size
                    # Scale to fit the box while keeping the aspect ratio.
                    ratio = min(max_width / img_width, max_height / img_height)
                    draw_width = img_width * ratio
                    draw_height = img_height * ratio

                    self.pdf.drawImage(
                        logo_path,
                        x,
                        y - draw_height,
                        width=draw_width,
                        height=draw_height,
                        preserveAspectRatio=True,
                        mask="auto",
                    )
                    return draw_height
            except Exception as e:
                logger.warning(f"Impossible de charger le logo: {e}")

        self._draw_text(
            x, y - 8 * mm, "Sage", font_size=24, bold=True, color=self.SAGE_GREEN
        )
        return 10 * mm

    def _new_page(self):
        """Finish the current page and move the cursor to the top of a new one."""
        if self.pdf is not None:
            self.pdf.showPage()
        self.page_number += 1
        self.current_y = self.page_height - self.margin

    def _draw_header(self):
        """Draw the logo and the document number, date, validity and reference.

        Returns:
            The y position below the header.
        """
        y = self.page_height - self.margin

        logo_height = self._draw_logo(
            self.margin, y, max_width=55 * mm, max_height=22 * mm
        )

        right_x = self.page_width - self.margin

        numero = self.doc.get("numero") or "BROUILLON"
        self._draw_text(
            right_x,
            y - 6 * mm,
            numero.upper(),
            font_size=20,
            bold=True,
            color=self.GRAY_800,
            align="right",
        )

        date_str = self.doc.get("date") or datetime.now().strftime("%d/%m/%Y")
        self._draw_text(
            right_x,
            y - 14 * mm,
            f"Date : {date_str}",
            font_size=9,
            color=self.GRAY_600,
            align="right",
        )

        date_validite = (
            self.doc.get("date_livraison") or self.doc.get("date_echeance") or date_str
        )
        self._draw_text(
            right_x,
            y - 20 * mm,
            f"Validité : {date_validite}",
            font_size=9,
            color=self.GRAY_600,
            align="right",
        )

        reference = self.doc.get("reference") or "—"
        self._draw_text(
            right_x,
            y - 26 * mm,
            f"Réf : {reference}",
            font_size=9,
            color=self.GRAY_600,
            align="right",
        )

        return y - max(logo_height + 10 * mm, 35 * mm)

    def _draw_addresses(self, y: float) -> float:
        """Draw the issuer column and the recipient box side by side.

        Args:
            y: Top y position of the section.

        Returns:
            The y position below the section.
        """
        col1_x = self.margin
        col2_x = self.margin + self.content_width / 2 + 8 * mm
        col_width = self.content_width / 2 - 8 * mm

        self._draw_text(
            col1_x, y, "ÉMETTEUR", font_size=8, bold=True, color=self.GRAY_400
        )

        # === Issuer (company information) ===
        y_emetteur = y - 6 * mm
        self._draw_text(
            col1_x,
            y_emetteur,
            self._get_societe_name(),
            font_size=10,
            bold=True,
            color=self.GRAY_800,
        )

        address_line1 = self._get_societe_address_line1()
        if address_line1:
            y_emetteur -= 5 * mm
            # Truncate overly long addresses.
            if len(address_line1) > 45:
                address_line1 = address_line1[:42] + "..."
            self._draw_text(
                col1_x, y_emetteur, address_line1, font_size=9, color=self.GRAY_600
            )

        address_line2 = self._get_societe_address_line2()
        if address_line2:
            y_emetteur -= 4 * mm
            self._draw_text(
                col1_x, y_emetteur, address_line2, font_size=9, color=self.GRAY_600
            )

        societe_email = self._get_societe_email()
        if societe_email:
            y_emetteur -= 5 * mm
            self._draw_text(
                col1_x, y_emetteur, societe_email, font_size=9, color=self.GRAY_600
            )

        # Phone is optional.
        societe_phone = self._get_societe_phone()
        if societe_phone:
            y_emetteur -= 4 * mm
            self._draw_text(
                col1_x,
                y_emetteur,
                f"Tél: {societe_phone}",
                font_size=8,
                color=self.GRAY_500,
            )

        # === Recipient ===
        box_padding = 4 * mm
        box_height = 26 * mm
        box_y = y - 3 * mm

        # Rounded grey background
        self.pdf.setFillColor(self.GRAY_50)
        self.pdf.roundRect(
            col2_x - box_padding,
            box_y - box_height,
            col_width + 2 * box_padding,
            box_height + box_padding,
            3 * mm,
            fill=1,
            stroke=0,
        )

        self._draw_text(
            col2_x, y, "DESTINATAIRE", font_size=8, bold=True, color=self.GRAY_400
        )

        y_dest = y - 6 * mm
        client_name = (
            self.doc.get("client_intitule") or self.doc.get("client_nom") or "Client"
        )
        self._draw_text(
            col2_x, y_dest, client_name, font_size=10, bold=True, color=self.GRAY_800
        )

        y_dest -= 5 * mm
        client_adresse = (
            self.doc.get("client_adresse") or self.doc.get("adresse_livraison") or ""
        )
        if client_adresse:
            if len(client_adresse) > 40:
                client_adresse = client_adresse[:37] + "..."
            self._draw_text(
                col2_x, y_dest, client_adresse, font_size=9, color=self.GRAY_600
            )
            y_dest -= 4 * mm

        client_cp = self.doc.get("client_cp") or ""
        client_ville = self.doc.get("client_ville") or ""
        if client_cp or client_ville:
            ville_complete = f"{client_cp} {client_ville}".strip()
            self._draw_text(
                col2_x, y_dest, ville_complete, font_size=9, color=self.GRAY_600
            )

        return min(y_emetteur, box_y - box_height) - 15 * mm

    def _draw_table_header(self, y: float) -> float:
        """Draw the column titles of the line-item table.

        Args:
            y: Baseline of the header row.

        Returns:
            The y position of the first item row.
        """
        col_des = self.margin
        col_qte = self.margin + 95 * mm
        col_pu = self.margin + 115 * mm
        col_rem = self.margin + 140 * mm
        col_tva = self.margin + 157 * mm
        col_mt = self.page_width - self.margin

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(
            self.margin, y - 3 * mm, self.page_width - self.margin, y - 3 * mm
        )

        self._draw_text(
            col_des, y, "Désignation", font_size=9, bold=True, color=self.GRAY_700
        )
        # All numeric columns are right-aligned on their x position.
        for col_x, title in (
            (col_qte, "Qté"),
            (col_pu, "Prix Unit. HT"),
            (col_rem, "Remise"),
            (col_tva, "TVA"),
            (col_mt, "Montant HT"),
        ):
            self._draw_text(
                col_x,
                y,
                title,
                font_size=9,
                bold=True,
                color=self.GRAY_700,
                align="right",
            )

        return y - 8 * mm

    def _draw_line_item(self, y: float, ligne: dict, alternate: bool = False) -> float:
        """Draw one row of the line-item table.

        Args:
            y: Baseline of the row.
            ligne: Line data (read through ``.get``).
            alternate: Draw the light grey background used for every other row.

        Returns:
            The y position of the next row.
        """
        col_des = self.margin
        col_qte = self.margin + 95 * mm
        col_pu = self.margin + 115 * mm
        col_rem = self.margin + 140 * mm
        col_tva = self.margin + 157 * mm
        col_mt = self.page_width - self.margin

        if alternate:
            self.pdf.setFillColor(self.GRAY_50)
            self.pdf.rect(
                self.margin - 2 * mm,
                y - 5 * mm,
                self.content_width + 4 * mm,
                12 * mm,
                fill=1,
                stroke=0,
            )

        designation = (
            ligne.get("designation")
            or ligne.get("designation_article")
            or ligne.get("article_designation")
            or ""
        )
        if len(designation) > 50:
            designation = designation[:47] + "..."
        self._draw_text(
            col_des, y, designation, font_size=9, bold=True, color=self.GRAY_800
        )

        # The figures stay on the designation baseline; an optional
        # description goes on a second line underneath.
        row_y = y
        next_y = y - 8 * mm

        description = ligne.get("description", "")
        if description and description != designation:
            if len(description) > 60:
                description = description[:57] + "..."
            self._draw_text(
                col_des, y - 4 * mm, description, font_size=8, color=self.GRAY_500
            )
            next_y = y - 16 * mm

        quantite = ligne.get("quantite") or 0
        self._draw_text(
            col_qte,
            row_y,
            str(quantite),
            font_size=9,
            color=self.GRAY_800,
            align="right",
        )

        prix_unit = ligne.get("prix_unitaire_ht") or ligne.get("prix_unitaire") or 0
        self._draw_text(
            col_pu,
            row_y,
            f"{prix_unit:.2f} €",
            font_size=9,
            color=self.GRAY_800,
            align="right",
        )

        remise = ligne.get("remise_pourcentage") or ligne.get("remise") or 0
        remise_text = f"{remise:.0f}%" if remise > 0 else "—"
        remise_color = self.SAGE_GREEN_DARK if remise > 0 else self.GRAY_400
        self._draw_text(
            col_rem,
            row_y,
            remise_text,
            font_size=9,
            color=remise_color,
            align="right",
        )

        # An explicit 0 % rate is displayed as 0 %, not as the default rate.
        taux_tva = self._resolve_tax_rate(
            ligne.get("taux_taxe1"), ligne.get("taux_tva")
        )
        self._draw_text(
            col_tva,
            row_y,
            f"{taux_tva:.0f}%",
            font_size=9,
            color=self.GRAY_600,
            align="right",
        )

        montant_ht = ligne.get("montant_ligne_ht") or ligne.get("montant_ht") or 0
        self._draw_text(
            col_mt,
            row_y,
            f"{montant_ht:.2f} €",
            font_size=9,
            bold=True,
            color=self.GRAY_800,
            align="right",
        )

        return next_y

    def _draw_totals(self, y: float) -> float:
        """Draw the totals block (total excl. tax, discount, VAT, net to pay).

        Args:
            y: Baseline of the first totals row.

        Returns:
            The y position below the totals.
        """
        totals_x = self.page_width - self.margin - 60 * mm
        right_x = self.page_width - self.margin

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(totals_x - 5 * mm, y + 5 * mm, right_x, y + 5 * mm)

        self._draw_text(totals_x, y, "Total HT", font_size=9, color=self.GRAY_600)
        total_ht = self.doc.get("total_ht_net") or self.doc.get("total_ht") or 0
        self._draw_text(
            right_x,
            y,
            f"{total_ht:.2f} €",
            font_size=9,
            color=self.GRAY_800,
            align="right",
        )
        y -= 6 * mm

        remise_globale = (
            self.doc.get("remise_globale") or self.doc.get("montant_remise") or 0
        )
        if remise_globale > 0:
            self._draw_text(totals_x, y, "Remise", font_size=9, color=self.GRAY_600)
            self._draw_text(
                right_x,
                y,
                f"-{remise_globale:.2f} €",
                font_size=9,
                color=self.SAGE_GREEN_DARK,
                align="right",
            )
            y -= 6 * mm

        # VAT amount is derived from the totals rather than read from the data.
        total_ttc = self.doc.get("total_ttc") or 0
        tva = total_ttc - total_ht + remise_globale
        taux_tva_global = self._resolve_tax_rate(self.doc.get("taux_tva_principal"))
        self._draw_text(
            totals_x,
            y,
            f"TVA ({taux_tva_global:.0f}%)",
            font_size=9,
            color=self.GRAY_600,
        )
        self._draw_text(
            right_x, y, f"{tva:.2f} €", font_size=9, color=self.GRAY_800, align="right"
        )
        y -= 10 * mm

        self.pdf.setStrokeColor(self.GRAY_300)
        self.pdf.setLineWidth(1)
        self.pdf.line(totals_x - 5 * mm, y + 4 * mm, right_x, y + 4 * mm)

        self._draw_text(
            totals_x, y, "Net à payer", font_size=12, bold=True, color=self.SAGE_GREEN
        )
        self._draw_text(
            right_x,
            y,
            f"{total_ttc:.2f} €",
            font_size=12,
            bold=True,
            color=self.SAGE_GREEN,
            align="right",
        )

        return y - 15 * mm

    def _draw_notes(self, y: float) -> float:
        """Draw the notes section when the document carries notes.

        Lines longer than 90 characters are truncated and drawing stops once
        the cursor gets within 30 mm of the bottom of the page.

        Args:
            y: Baseline of the section title.

        Returns:
            The y position below the notes (``y`` unchanged without notes).
        """
        notes = (
            self.doc.get("notes_publique")
            or self.doc.get("notes")
            or self.doc.get("commentaire")
        )

        if not notes:
            return y

        self.pdf.setStrokeColor(self.GRAY_200)
        self.pdf.setLineWidth(0.5)
        self.pdf.line(
            self.margin, y + 5 * mm, self.page_width - self.margin, y + 5 * mm
        )

        self._draw_text(
            self.margin,
            y,
            "NOTES & CONDITIONS",
            font_size=8,
            bold=True,
            color=self.GRAY_400,
        )
        y -= 5 * mm

        for line in notes.split("\n"):
            # Keep clear of the footer area.
            if y < 30 * mm:
                break
            line = line.strip()
            if line:
                if len(line) > 90:
                    line = line[:87] + "..."
                self._draw_text(self.margin, y, line, font_size=8, color=self.GRAY_600)
                y -= 4 * mm

        return y

    def _draw_footer(self):
        """Draw the legal information line and the page number."""
        footer_y = 15 * mm

        # Legal information about the company
        legal_parts = []

        societe_name = self._get_societe_name()
        if societe_name and societe_name != self.DEFAULT_COMPANY_NAME:
            legal_parts.append(societe_name)

        siret = self._get_societe_siret()
        if siret:
            legal_parts.append(f"SIRET: {siret}")

        tva = self._get_societe_tva()
        if tva:
            legal_parts.append(f"TVA: {tva}")

        # Legal form and share capital, when available
        if self.societe_info:
            forme = self._get_societe_field("forme_juridique", "")
            capital = self._get_societe_field("capital", 0)

            # The capital may come in as a string.
            if isinstance(capital, str):
                try:
                    capital = float(capital)
                except (ValueError, TypeError):
                    capital = 0

            if forme and capital and float(capital) > 0:
                # Space as thousands separator, applied to the number only so
                # commas inside the legal form are left untouched.
                capital_text = f"{float(capital):,.0f}".replace(",", " ")
                legal_parts.append(f"{forme} au capital de {capital_text} €")

        if legal_parts:
            legal_text = " • ".join(legal_parts)
            # Truncate overly long text.
            if len(legal_text) > 100:
                legal_text = legal_text[:97] + "..."
            self._draw_text(
                self.page_width / 2,
                footer_y + 4 * mm,
                legal_text,
                font_size=7,
                color=self.GRAY_400,
                align="center",
            )

        # Pagination
        page_text = f"Page {self.page_number} / {self.total_pages}"
        self._draw_text(
            self.page_width / 2,
            footer_y,
            page_text,
            font_size=8,
            color=self.GRAY_400,
            align="center",
        )

    def _render(self, buffer) -> None:
        """Lay out the whole document into ``buffer`` using ``total_pages``."""
        self.page_number = 1
        self.pdf = canvas.Canvas(buffer, pagesize=A4)

        self.current_y = self._draw_header()
        self.current_y = self._draw_addresses(self.current_y)
        self.current_y = self._draw_table_header(self.current_y)

        lignes = self.doc.get("lignes", [])

        if not lignes:
            self._draw_text(
                self.page_width / 2,
                self.current_y,
                "Aucune ligne",
                font_size=9,
                color=self.GRAY_400,
                align="center",
            )
            self.current_y -= 15 * mm
        else:
            for idx, ligne in enumerate(lignes):
                # Start a new page (repeating the table header) when the row
                # would run into the bottom area.
                if self.current_y < 70 * mm:
                    self._draw_footer()
                    self._new_page()
                    self.current_y = self._draw_table_header(
                        self.page_height - self.margin - 10 * mm
                    )

                self.current_y = self._draw_line_item(
                    self.current_y, ligne, alternate=(idx % 2 == 1)
                )

        # Keep the totals block on one page.
        if self.current_y < 60 * mm:
            self._draw_footer()
            self._new_page()
            self.current_y = self.page_height - self.margin - 20 * mm

        self.current_y = self._draw_totals(self.current_y)
        self.current_y = self._draw_notes(self.current_y)

        self._draw_footer()
        self.pdf.save()

    def generate(self) -> bytes:
        """Render the document and return the PDF bytes.

        The page count is only known once the layout is done, so a document
        that spans several pages is rendered a second time with the real
        total, making the footer read "Page N / total" on every page.
        """
        self.total_pages = 1
        self.buffer = BytesIO()
        self._render(self.buffer)

        if self.page_number != self.total_pages:
            self.total_pages = self.page_number
            self.buffer = BytesIO()
            self._render(self.buffer)

        return self.buffer.getvalue()


email_queue = EmailQueue()