diff --git a/main.py b/main.py index f061cbb..da6e9dc 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,7 @@ import time from config import settings, validate_settings from sage_connector import SageConnector import pyodbc +import os # ===================================================== # LOGGING @@ -1705,10 +1706,15 @@ def generer_pdf_document( raise HTTPException(500, str(e)) -@app.get("/sage/test-API-transformation") -def generer_pdf_transformation_native(numero: str, type_doc: int): +@app.get("/sage/object-exploration") +async def explorer_objets_impression_sage(modele:str="Devis client avec détail projet.bgc"): try: - expliration = sage.generer_pdf_transformation_native(numero, type_doc) + dossier = r"C:\Users\Public\Documents\Sage\Entreprise 100c\fr-FR\Documents standards\Gestion commerciale\Ventes" + chemin = os.path.join(dossier, modele) + + if not os.path.exists(chemin): + return {"error": f"Fichier non trouve: {modele}"} + expliration = sage.analyser_bgc_complet(chemin) if not expliration: raise HTTPException(404, f"ERROR") diff --git a/sage_connector.py b/sage_connector.py index ee9df35..b1bebcb 100644 --- a/sage_connector.py +++ b/sage_connector.py @@ -1,7 +1,7 @@ import win32com.client import pythoncom # AJOUT CRITIQUE from datetime import datetime, timedelta, date -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any import threading import time import logging @@ -11,6 +11,11 @@ from contextlib import contextmanager import pywintypes import os import glob +import tempfile +import logging +from dataclasses import dataclass, field +import zlib +import struct logger = logging.getLogger(__name__) @@ -3116,18 +3121,6 @@ class SageConnector: return datetime.now() def creer_devis_enrichi(self, devis_data: dict, forcer_brouillon: bool = False): - """ - Crée un devis dans Sage avec support de la référence et des dates. - - Args: - devis_data: dict contenant: - - client: {code: str} - - date_devis: str ou date - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - lignes: list[dict] - forcer_brouillon: bool, force le statut brouillon - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -4350,316 +4343,6 @@ class SageConnector: "documents_existants": [], } - def obtenir_chaine_transformation_complete(self, numero_document, type_document): - """ - Obtient toute la chaîne de transformation d'un document (ascendante et descendante). - - Exemple: Pour une commande BC00001 - - Ascendant: Devis DE00123 - - Descendant: BL BL00045, Facture FA00067 - - Returns: - dict: { - "document_actuel": {...}, - "origine": {...}, # Document source (peut être None) - "descendants": [...], # Documents créés à partir de celui-ci - "chaine_complete": [...] # Toute la chaîne du devis à la facture - } - """ - logger.info( - f"[CHAINE] Analyse chaîne pour {numero_document} (type {type_document})" - ) - - try: - with self._get_sql_connection() as conn: - cursor = conn.cursor() - - # ======================================== - # 1. Infos du document actuel - # ======================================== - cursor.execute( - """ - SELECT DO_Piece, DO_Type, DO_Ref, DO_Date, DO_TotalHT, DO_Statut - FROM F_DOCENTETE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (numero_document, type_document), - ) - doc_actuel_row = cursor.fetchone() - - if not doc_actuel_row: - raise ValueError( - f"Document {numero_document} (type {type_document}) introuvable" - ) - - doc_actuel = { - "numero": doc_actuel_row.DO_Piece.strip(), - "type": int(doc_actuel_row.DO_Type), - "type_libelle": self._get_type_libelle(int(doc_actuel_row.DO_Type)), - "ref": ( - doc_actuel_row.DO_Ref.strip() if doc_actuel_row.DO_Ref else "" - ), - "date": doc_actuel_row.DO_Date, - "total_ht": ( - float(doc_actuel_row.DO_TotalHT) - if doc_actuel_row.DO_TotalHT - else 0.0 - ), - "statut": ( - int(doc_actuel_row.DO_Statut) if doc_actuel_row.DO_Statut else 0 - ), - } - - # ======================================== - # 2. Chercher le document source (ascendant) - # ======================================== - origine = None - - # Chercher dans les lignes du document actuel - cursor.execute( - """ - SELECT DISTINCT - DL_PieceDE, DL_DateDE, - DL_PieceBC, DL_DateBC, - DL_PieceBL, DL_DateBL - FROM F_DOCLIGNE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (numero_document, type_document), - ) - lignes = cursor.fetchall() - - for ligne in lignes: - # Vérifier chaque champ de liaison possible - if ligne.DL_PieceDE and ligne.DL_PieceDE.strip(): - piece_source = ligne.DL_PieceDE.strip() - type_source = 0 # Devis - break - elif ligne.DL_PieceBC and ligne.DL_PieceBC.strip(): - piece_source = ligne.DL_PieceBC.strip() - type_source = 1 # Commande - break - elif ligne.DL_PieceBL and ligne.DL_PieceBL.strip(): - piece_source = ligne.DL_PieceBL.strip() - type_source = 3 # BL - break - else: - piece_source = None - - if piece_source: - # Récupérer les infos du document source - cursor.execute( - """ - SELECT DO_Piece, DO_Type, DO_Ref, DO_Date, DO_TotalHT, DO_Statut - FROM F_DOCENTETE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (piece_source, type_source), - ) - source_row = cursor.fetchone() - - if source_row: - origine = { - "numero": source_row.DO_Piece.strip(), - "type": int(source_row.DO_Type), - "type_libelle": self._get_type_libelle( - int(source_row.DO_Type) - ), - "ref": ( - source_row.DO_Ref.strip() if source_row.DO_Ref else "" - ), - "date": source_row.DO_Date, - "total_ht": ( - float(source_row.DO_TotalHT) - if source_row.DO_TotalHT - else 0.0 - ), - "statut": ( - int(source_row.DO_Statut) if source_row.DO_Statut else 0 - ), - } - logger.info( - f"[CHAINE] Origine trouvée: {origine['numero']} ({origine['type_libelle']})" - ) - - # ======================================== - # 3. Chercher les documents descendants - # ======================================== - verif = self.verifier_si_deja_transforme_sql( - numero_document, type_document - ) - descendants = verif["documents_cibles"] - - # Enrichir avec les détails - for desc in descendants: - cursor.execute( - """ - SELECT DO_Ref, DO_Date, DO_TotalHT - FROM F_DOCENTETE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (desc["numero"], desc["type"]), - ) - desc_row = cursor.fetchone() - if desc_row: - desc["ref"] = desc_row.DO_Ref.strip() if desc_row.DO_Ref else "" - desc["date"] = desc_row.DO_Date - desc["total_ht"] = ( - float(desc_row.DO_TotalHT) if desc_row.DO_TotalHT else 0.0 - ) - - # ======================================== - # 4. Construire la chaîne complète - # ======================================== - chaine_complete = [] - - # Remonter récursivement jusqu'au devis - doc_temp = origine - while doc_temp: - chaine_complete.insert(0, doc_temp) - # Chercher l'origine de ce document - verif_temp = self.verifier_si_deja_transforme_sql( - doc_temp["numero"], doc_temp["type"] - ) - # Remonter (chercher dans les lignes) - cursor.execute( - """ - SELECT DISTINCT DL_PieceDE, DL_PieceBC, DL_PieceBL - FROM F_DOCLIGNE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (doc_temp["numero"], doc_temp["type"]), - ) - ligne_temp = cursor.fetchone() - - if ligne_temp: - if ligne_temp.DL_PieceDE and ligne_temp.DL_PieceDE.strip(): - piece_parent = ligne_temp.DL_PieceDE.strip() - type_parent = 0 - elif ligne_temp.DL_PieceBC and ligne_temp.DL_PieceBC.strip(): - piece_parent = ligne_temp.DL_PieceBC.strip() - type_parent = 10 - elif ligne_temp.DL_PieceBL and ligne_temp.DL_PieceBL.strip(): - piece_parent = ligne_temp.DL_PieceBL.strip() - type_parent = 30 - else: - break - - # Récupérer infos parent - cursor.execute( - """ - SELECT DO_Piece, DO_Type, DO_Ref, DO_Date, DO_TotalHT, DO_Statut - FROM F_DOCENTETE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (piece_parent, type_parent), - ) - parent_row = cursor.fetchone() - - if parent_row: - doc_temp = { - "numero": parent_row.DO_Piece.strip(), - "type": int(parent_row.DO_Type), - "type_libelle": self._get_type_libelle( - int(parent_row.DO_Type) - ), - "ref": ( - parent_row.DO_Ref.strip() - if parent_row.DO_Ref - else "" - ), - "date": parent_row.DO_Date, - "total_ht": ( - float(parent_row.DO_TotalHT) - if parent_row.DO_TotalHT - else 0.0 - ), - "statut": ( - int(parent_row.DO_Statut) - if parent_row.DO_Statut - else 0 - ), - } - else: - break - else: - break - - # Ajouter le document actuel - chaine_complete.append(doc_actuel) - - # Ajouter les descendants récursivement - def ajouter_descendants(doc, profondeur=0): - if profondeur > 10: # Sécurité contre boucles infinies - return - verif = self.verifier_si_deja_transforme_sql( - doc["numero"], doc["type"] - ) - for desc in verif["documents_cibles"]: - # Récupérer infos complètes - cursor.execute( - """ - SELECT DO_Piece, DO_Type, DO_Ref, DO_Date, DO_TotalHT, DO_Statut - FROM F_DOCENTETE - WHERE DO_Piece = ? AND DO_Type = ? - """, - (desc["numero"], desc["type"]), - ) - desc_row = cursor.fetchone() - if desc_row: - desc_complet = { - "numero": desc_row.DO_Piece.strip(), - "type": int(desc_row.DO_Type), - "type_libelle": self._get_type_libelle( - int(desc_row.DO_Type) - ), - "ref": ( - desc_row.DO_Ref.strip() if desc_row.DO_Ref else "" - ), - "date": desc_row.DO_Date, - "total_ht": ( - float(desc_row.DO_TotalHT) - if desc_row.DO_TotalHT - else 0.0 - ), - "statut": ( - int(desc_row.DO_Statut) if desc_row.DO_Statut else 0 - ), - } - if desc_complet not in chaine_complete: - chaine_complete.append(desc_complet) - ajouter_descendants(desc_complet, profondeur + 1) - - ajouter_descendants(doc_actuel) - - # ======================================== - # Résultat - # ======================================== - logger.info( - f"[CHAINE] Chaîne complète: {len(chaine_complete)} document(s)" - ) - for i, doc in enumerate(chaine_complete): - logger.info( - f"[CHAINE] {i+1}. {doc['numero']} ({doc['type_libelle']}) - " - f"{doc['total_ht']}€ HT" - ) - - return { - "document_actuel": doc_actuel, - "origine": origine, - "descendants": descendants, - "chaine_complete": chaine_complete, - } - - except Exception as e: - logger.error(f"[CHAINE] Erreur analyse chaîne: {e}", exc_info=True) - return { - "document_actuel": None, - "origine": None, - "descendants": [], - "chaine_complete": [], - } - def _get_type_libelle(self, type_doc: int) -> str: """ Retourne le libellé d'un type de document. @@ -6314,18 +5997,6 @@ class SageConnector: raise RuntimeError(f"Échec création commande: {str(e)}") def modifier_commande(self, numero: str, commande_data: Dict) -> Dict: - """ - Modifie une commande existante dans Sage. - - Args: - numero: Numéro de la commande - commande_data: dict contenant les champs à modifier: - - date_commande: str ou date (optionnel) - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - statut: int (optionnel) - - lignes: list[dict] (optionnel) - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -6836,17 +6507,6 @@ class SageConnector: raise RuntimeError(f"Erreur Sage: {error_message}") def creer_livraison_enrichi(self, livraison_data: dict) -> Dict: - """ - Crée une livraison dans Sage avec support des dates. - - Args: - livraison_data: dict contenant: - - client: {code: str} - - date_livraison: str ou date (date du document) - - date_livraison_prevue: str ou date (optionnel - date prévue livraison client) - - reference: str (optionnel) - - lignes: list[dict] - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -7137,18 +6797,6 @@ class SageConnector: raise RuntimeError(f"Échec création livraison: {str(e)}") def modifier_livraison(self, numero: str, livraison_data: Dict) -> Dict: - """ - Modifie une livraison existante dans Sage. - - Args: - numero: Numéro de la livraison - livraison_data: dict contenant les champs à modifier: - - date_livraison: str ou date (optionnel - date du document) - - date_livraison_prevue: str ou date (optionnel - date prévue livraison client) - - reference: str (optionnel) - - statut: int (optionnel) - - lignes: list[dict] (optionnel) - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -7579,17 +7227,6 @@ class SageConnector: raise RuntimeError(f"Erreur Sage: {error_message}") def creer_avoir_enrichi(self, avoir_data: dict) -> Dict: - """ - Crée un avoir dans Sage avec support des dates. - - Args: - avoir_data: dict contenant: - - client: {code: str} - - date_avoir: str ou date - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - lignes: list[dict] - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -7866,18 +7503,6 @@ class SageConnector: raise RuntimeError(f"Échec création avoir: {str(e)}") def modifier_avoir(self, numero: str, avoir_data: Dict) -> Dict: - """ - Modifie un avoir existant dans Sage. - - Args: - numero: Numéro de l'avoir - avoir_data: dict contenant les champs à modifier: - - date_avoir: str ou date (optionnel) - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - statut: int (optionnel) - - lignes: list[dict] (optionnel) - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -8399,17 +8024,6 @@ class SageConnector: raise RuntimeError(f"Erreur Sage: {error_message}") def creer_facture_enrichi(self, facture_data: dict) -> Dict: - """ - Crée une facture dans Sage avec support des dates. - - Args: - facture_data: dict contenant: - - client: {code: str} - - date_facture: str ou date - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - lignes: list[dict] - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -8748,18 +8362,6 @@ class SageConnector: raise RuntimeError(f"Échec création facture: {str(e)}") def modifier_facture(self, numero: str, facture_data: Dict) -> Dict: - """ - Modifie une facture existante dans Sage. - - Args: - numero: Numéro de la facture - facture_data: dict contenant les champs à modifier: - - date_facture: str ou date (optionnel) - - date_livraison: str ou date (optionnel) - - reference: str (optionnel) - - statut: int (optionnel) - - lignes: list[dict] (optionnel) - """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -10367,28 +9969,6 @@ class SageConnector: raise RuntimeError(f"Erreur technique Sage : {error_message}") def creer_famille(self, famille_data: dict) -> dict: - """ - Crée une nouvelle famille d'articles dans Sage 100c - - **RESTRICTION : Seules les familles de type DÉTAIL peuvent être créées** - Les familles de type Total doivent être créées manuellement dans Sage. - - Args: - famille_data: { - "code": str (obligatoire, max 18 car, ex: "ALIM"), - "intitule": str (obligatoire, max 69 car, ex: "Produits alimentaires"), - "type": int (IGNORÉ - toujours 0=Détail), - "compte_achat": str (optionnel, ex: "607000"), - "compte_vente": str (optionnel, ex: "707000") - } - - Returns: - dict: Famille créée avec tous ses attributs - - Raises: - ValueError: Si la famille existe déjà ou données invalides - RuntimeError: Si erreur technique Sage - """ with self._com_context(), self._lock_com: try: logger.info("[FAMILLE] === CRÉATION FAMILLE (TYPE DÉTAIL) ===") @@ -11868,339 +11448,6 @@ class SageConnector: logger.error(f"[STOCK] Erreur lecture : {e}", exc_info=True) raise ValueError(f"Erreur lecture stock : {str(e)}") - def verifier_stock_apres_mouvement( - self, article_ref: str, numero_mouvement: str - ) -> Dict: - try: - with self._com_context(), self._lock_com: - logger.info( - f"[DEBUG] Vérification mouvement {numero_mouvement} pour {article_ref}" - ) - - diagnostic = { - "article_ref": article_ref.upper(), - "numero_mouvement": numero_mouvement, - "mouvement_trouve": False, - "ar_ref_dans_ligne": None, - "quantite_ligne": 0, - "stock_actuel": 0, - "problemes": [], - } - - # ======================================== - # 1. VÉRIFIER LE DOCUMENT - # ======================================== - factory = self.cial.FactoryDocumentStock - - persist = None - index = 1 - - while index < 10000: - try: - persist_test = factory.List(index) - if persist_test is None: - break - - doc_test = win32com.client.CastTo( - persist_test, "IBODocumentStock3" - ) - doc_test.Read() - - if getattr(doc_test, "DO_Piece", "") == numero_mouvement: - persist = persist_test - diagnostic["mouvement_trouve"] = True - break - - index += 1 - except: - index += 1 - - if not persist: - diagnostic["problemes"].append( - f"Document {numero_mouvement} introuvable" - ) - return diagnostic - - doc = win32com.client.CastTo(persist, "IBODocumentStock3") - doc.Read() - - # ======================================== - # 2. VÉRIFIER LES LIGNES - # ======================================== - try: - factory_lignes = getattr(doc, "FactoryDocumentLigne", None) - if not factory_lignes: - factory_lignes = getattr(doc, "FactoryDocumentStockLigne", None) - - if factory_lignes: - idx = 1 - while idx <= 100: - try: - ligne_p = factory_lignes.List(idx) - if ligne_p is None: - break - - ligne = win32com.client.CastTo( - ligne_p, "IBODocumentLigne3" - ) - ligne.Read() - - ar_ref_ligne = getattr(ligne, "AR_Ref", "").strip() - - if ar_ref_ligne == article_ref.upper(): - diagnostic["ar_ref_dans_ligne"] = ar_ref_ligne - diagnostic["quantite_ligne"] = float( - getattr(ligne, "DL_Qte", 0) - ) - break - - idx += 1 - except: - idx += 1 - except Exception as e: - diagnostic["problemes"].append(f"Erreur lecture lignes : {e}") - - if not diagnostic["ar_ref_dans_ligne"]: - diagnostic["problemes"].append( - f"AR_Ref '{article_ref}' non trouvé dans les lignes du mouvement. " - f"L'article n'a pas été correctement lié." - ) - - # ======================================== - # 3. LIRE LE STOCK ACTUEL - # ======================================== - try: - stock_info = self.lire_stock_article(article_ref) - diagnostic["stock_actuel"] = stock_info["stock_total"] - except: - diagnostic["problemes"].append("Impossible de lire le stock actuel") - - # ======================================== - # 4. ANALYSE - # ======================================== - if diagnostic["ar_ref_dans_ligne"] and diagnostic["stock_actuel"] == 0: - diagnostic["problemes"].append( - "PROBLÈME : L'article est dans la ligne du mouvement, " - "mais le stock n'a pas été mis à jour. Cela indique un problème " - "avec la méthode SetDefaultArticle() ou la configuration Sage." - ) - - return diagnostic - - except Exception as e: - logger.error(f"[DEBUG] Erreur : {e}", exc_info=True) - raise - """ - Lit le stock d'un article - VERSION CORRIGÉE - - CORRECTIONS : - 1. Cherche d'abord via ArticleStock - 2. Puis via DepotStock si disponible - 3. Calcule le total même si aucun dépôt n'est trouvé - """ - try: - with self._com_context(), self._lock_com: - logger.info(f"[STOCK] Lecture stock article : {reference}") - - factory_article = self.cial.FactoryArticle - persist = factory_article.ReadReference(reference.upper()) - - if not persist: - raise ValueError(f"Article {reference} introuvable") - - article = win32com.client.CastTo(persist, "IBOArticle3") - article.Read() - - # Infos de base - ar_suivi = getattr(article, "AR_SuiviStock", 0) - - suivi_libelles = { - 0: "Aucun", - 1: "CMUP (sans lot)", - 2: "FIFO/LIFO (avec lot)", - } - - stock_info = { - "article": getattr(article, "AR_Ref", "").strip(), - "designation": getattr(article, "AR_Design", ""), - "stock_total": 0.0, - "suivi_stock": ar_suivi, - "suivi_libelle": suivi_libelles.get(ar_suivi, "Inconnu"), - "depots": [], - } - - # ======================================== - # MÉTHODE 1 : Via ArticleStock (global) - # ======================================== - stock_global_trouve = False - - try: - # Chercher dans ArticleStock (collection sur l'article) - if hasattr(article, "ArticleStock"): - article_stocks = article.ArticleStock - - if article_stocks: - try: - nb_stocks = article_stocks.Count - logger.info(f" ArticleStock.Count = {nb_stocks}") - - for i in range(1, nb_stocks + 1): - try: - stock_item = article_stocks.Item(i) - - qte = float( - getattr(stock_item, "AS_QteSto", 0.0) - ) - stock_info["stock_total"] += qte - - depot_code = "?" - try: - depot_obj = getattr( - stock_item, "Depot", None - ) - if depot_obj: - depot_obj.Read() - depot_code = getattr( - depot_obj, "DE_Code", "?" - ) - except: - pass - - stock_info["depots"].append( - { - "code": depot_code, - "quantite": qte, - "qte_mini": float( - getattr( - stock_item, "AS_QteMini", 0.0 - ) - ), - "qte_maxi": float( - getattr( - stock_item, "AS_QteMaxi", 0.0 - ) - ), - } - ) - - stock_global_trouve = True - except: - continue - except: - pass - except: - pass - - # ======================================== - # MÉTHODE 2 : Via FactoryDepotStock (si méthode 1 échoue) - # ======================================== - if not stock_global_trouve: - logger.info( - " ArticleStock non disponible, essai FactoryDepotStock..." - ) - - try: - factory_depot = self.cial.FactoryDepot - - # Scanner tous les dépôts - index_depot = 1 - while index_depot <= 100: - try: - persist_depot = factory_depot.List(index_depot) - if persist_depot is None: - break - - depot_obj = win32com.client.CastTo( - persist_depot, "IBODepot3" - ) - depot_obj.Read() - - depot_code = getattr(depot_obj, "DE_Code", "").strip() - - # Chercher le stock dans ce dépôt - try: - factory_depot_stock = getattr( - depot_obj, "FactoryDepotStock", None - ) - - if factory_depot_stock: - index_stock = 1 - while index_stock <= 1000: - try: - stock_persist = ( - factory_depot_stock.List( - index_stock - ) - ) - if stock_persist is None: - break - - stock = win32com.client.CastTo( - stock_persist, "IBODepotStock3" - ) - stock.Read() - - ar_ref_stock = getattr( - stock, "AR_Ref", "" - ).strip() - - if ar_ref_stock == reference.upper(): - qte = float( - getattr(stock, "AS_QteSto", 0.0) - ) - stock_info["stock_total"] += qte - - stock_info["depots"].append( - { - "code": depot_code, - "quantite": qte, - "qte_mini": float( - getattr( - stock, - "AS_QteMini", - 0.0, - ) - ), - "qte_maxi": float( - getattr( - stock, - "AS_QteMaxi", - 0.0, - ) - ), - } - ) - - break - - index_stock += 1 - except: - index_stock += 1 - except: - pass - - index_depot += 1 - except: - index_depot += 1 - except: - pass - - # ======================================== - # RÉSULTAT FINAL - # ======================================== - if not stock_info["depots"]: - logger.warning(f"[STOCK] {reference} : Aucun stock trouvé") - else: - logger.info( - f"[STOCK] {reference} : {stock_info['stock_total']} unités dans {len(stock_info['depots'])} dépôt(s)" - ) - - return stock_info - - except Exception as e: - logger.error(f"[STOCK] Erreur lecture : {e}", exc_info=True) - raise - def creer_sortie_stock(self, sortie_data: Dict) -> Dict: try: with self._com_context(), self._lock_com: @@ -12615,1126 +11862,4 @@ class SageConnector: logger.error(f"Erreur vérification stock: {e}") raise - def lister_modeles_crystal(self) -> Dict: - """ - Liste les modèles en scannant le répertoire Sage - FONCTIONNE MÊME SI FactoryEtat N'EXISTE PAS - """ - try: - logger.info("[MODELES] Scan du répertoire des modèles...") - - # Chemin typique des modèles Sage 100c - # Adapter selon votre installation - chemins_possibles = [ - r"C:\Users\Public\Documents\Sage\Entreprise 100c\fr-FR\Documents standards\Gestion commerciale\Ventes", - ] - - # Essayer de détecter depuis la base Sage - chemin_base = self.chemin_base - if chemin_base: - # Extraire le répertoire Sage - - - dossier_sage = os.path.dirname(os.path.dirname(chemin_base)) - chemins_possibles.insert(0, os.path.join(dossier_sage, "Modeles")) - - modeles_par_type = { - "devis": [], - "commandes": [], - "livraisons": [], - "factures": [], - "avoirs": [], - "autres": [], - } - - for chemin in chemins_possibles: - if not os.path.exists(chemin): - continue - - logger.info(f"[MODELES] Scan: {chemin}") - - # Chercher tous les fichiers .RPT et .BGC - for pattern in ["*.RPT", "*.rpt", "*.BGC", "*.bgc"]: - fichiers = glob.glob(os.path.join(chemin, pattern)) - - for fichier in fichiers: - nom_fichier = os.path.basename(fichier) - - # Déterminer la catégorie - categorie = "autres" - - nom_upper = nom_fichier.upper() - if "DEVIS" in nom_upper or nom_upper.startswith("VT_DE"): - categorie = "devis" - elif ( - "CMDE" in nom_upper - or "COMMANDE" in nom_upper - or nom_upper.startswith("VT_BC") - ): - categorie = "commandes" - elif nom_upper.startswith("VT_BL") or "LIVRAISON" in nom_upper: - categorie = "livraisons" - elif "FACT" in nom_upper or nom_upper.startswith("VT_FA"): - categorie = "factures" - elif "AVOIR" in nom_upper or nom_upper.startswith("VT_AV"): - categorie = "avoirs" - - modeles_par_type[categorie].append( - { - "fichier": nom_fichier, - "nom": nom_fichier.replace(".RPT", "") - .replace(".rpt", "") - .replace(".BGC", "") - .replace(".bgc", ""), - "chemin_complet": fichier, - } - ) - - # Si on a trouvé des fichiers, pas besoin de continuer - if any(len(v) > 0 for v in modeles_par_type.values()): - break - - total = sum(len(v) for v in modeles_par_type.values()) - logger.info(f"[MODELES] {total} modèles trouvés") - - return modeles_par_type - - except Exception as e: - logger.error(f"[MODELES] Erreur: {e}", exc_info=True) - raise RuntimeError(f"Erreur listage modèles: {str(e)}") - - def generer_pdf_document( - self, numero: str, type_doc: int, modele: str = None - ) -> bytes: - """ - Génération PDF Sage 100c - UTILISE LES .BGC DIRECTEMENT - - Args: - numero: Numéro document (ex: "FA00123") - type_doc: Type Sage (0=devis, 60=facture, etc.) - modele: Nom fichier .bgc (optionnel) - - Returns: - bytes: Contenu PDF - """ - if not self.cial: - raise RuntimeError("Connexion Sage non établie") - - try: - with self._com_context(), self._lock_com: - logger.info(f"[PDF] === GÉNÉRATION PDF AVEC .BGC ===") - logger.info(f"[PDF] Document: {numero} (type={type_doc})") - - # ======================================== - # 1. CHARGER LE DOCUMENT SAGE - # ======================================== - factory = self.cial.FactoryDocumentVente - persist = factory.ReadPiece(type_doc, numero) - - if not persist: - persist = self._find_document_in_list(numero, type_doc) - - if not persist: - raise ValueError(f"Document {numero} introuvable") - - doc = win32com.client.CastTo(persist, "IBODocumentVente3") - doc.Read() - - logger.info(f"[PDF] Document chargé") - - # ======================================== - # 2. DÉTERMINER LE MODÈLE .BGC - # ======================================== - chemin_modele = self._determiner_modele(type_doc, modele) - logger.info(f"[PDF] Modèle: {os.path.basename(chemin_modele)}") - logger.info(f"[PDF] 📁 Chemin: {chemin_modele}") - - # ======================================== - # 3. VÉRIFIER QUE LE FICHIER EXISTE - # ======================================== - - - if not os.path.exists(chemin_modele): - raise ValueError(f"Modèle introuvable: {chemin_modele}") - - # ======================================== - # 4. CRÉER FICHIER TEMPORAIRE - # ======================================== - import tempfile - import time - - temp_dir = tempfile.gettempdir() - pdf_path = os.path.join( - temp_dir, f"sage_{numero}_{int(time.time())}.pdf" - ) - - pdf_bytes = None - - # ======================================== - # MÉTHODE 1 : Crystal Reports Runtime (PRIORITAIRE) - # ======================================== - logger.info("[PDF] 🔷 Méthode 1: Crystal Reports Runtime...") - - try: - pdf_bytes = self._generer_pdf_crystal_runtime( - numero, type_doc, chemin_modele, pdf_path - ) - if pdf_bytes: - logger.info("[PDF] Méthode 1 réussie (Crystal Runtime)") - except Exception as e: - logger.warning(f"[PDF] Méthode 1 échouée: {e}") - - # ======================================== - # MÉTHODE 2 : Crystal via DLL Sage - # ======================================== - if not pdf_bytes: - logger.info("[PDF] 🔷 Méthode 2: Crystal via DLL Sage...") - - try: - pdf_bytes = self._generer_pdf_crystal_sage_dll( - numero, type_doc, chemin_modele, pdf_path - ) - if pdf_bytes: - logger.info("[PDF] Méthode 2 réussie (DLL Sage)") - except Exception as e: - logger.warning(f"[PDF] Méthode 2 échouée: {e}") - - # ======================================== - # MÉTHODE 3 : Sage Reports Viewer (si installé) - # ======================================== - if not pdf_bytes: - logger.info("[PDF] 🔷 Méthode 3: Sage Reports Viewer...") - - try: - pdf_bytes = self._generer_pdf_sage_viewer( - numero, type_doc, chemin_modele, pdf_path - ) - if pdf_bytes: - logger.info("[PDF] Méthode 3 réussie (Sage Viewer)") - except Exception as e: - logger.warning(f"[PDF] Méthode 3 échouée: {e}") - - # ======================================== - # MÉTHODE 4 : Python reportlab (FALLBACK) - # ======================================== - if not pdf_bytes: - logger.warning("[PDF] TOUTES LES MÉTHODES .BGC ONT ÉCHOUÉ") - logger.info("[PDF] 🔷 Méthode 4: PDF Custom (fallback)...") - - try: - pdf_bytes = self._generer_pdf_custom(doc, numero, type_doc) - if pdf_bytes: - logger.info("[PDF] Méthode 4 réussie (PDF custom)") - except Exception as e: - logger.error(f"[PDF] Méthode 4 échouée: {e}") - - # ======================================== - # VALIDATION & NETTOYAGE - # ======================================== - try: - if os.path.exists(pdf_path): - os.remove(pdf_path) - except: - pass - - if not pdf_bytes: - raise RuntimeError( - " ÉCHEC GÉNÉRATION PDF\n\n" - " DIAGNOSTIC:\n" - f"- Modèle .bgc trouvé: ({os.path.basename(chemin_modele)})\n" - f"- Crystal Reports installé: NON DÉTECTÉ\n\n" - "💡 SOLUTIONS:\n" - "1. Installer SAP Crystal Reports Runtime (gratuit):\n" - " https://www.sap.com/products/technology-platform/crystal-reports/trial.html\n" - " Choisir: Crystal Reports Runtime (64-bit)\n\n" - "2. OU installer depuis DVD Sage 100c:\n" - " Composants/Crystal Reports Runtime\n\n" - "3. Vérifier que le service 'Crystal Reports' est démarré:\n" - " services.msc → SAP Crystal Reports Processing Server\n\n" - "4. En attendant, utiliser /pdf-custom pour un PDF simple" - ) - - if len(pdf_bytes) < 500: - raise RuntimeError("PDF généré trop petit (probablement corrompu)") - - logger.info(f"[PDF] SUCCÈS: {len(pdf_bytes):,} octets") - return pdf_bytes - - except ValueError as e: - logger.error(f"[PDF] Erreur métier: {e}") - raise - except Exception as e: - logger.error(f"[PDF] Erreur technique: {e}", exc_info=True) - raise RuntimeError(f"Erreur PDF: {str(e)}") - - def _generer_pdf_crystal_runtime(self, numero, type_doc, chemin_modele, pdf_path): - """🔷 Méthode 1: Crystal Reports Runtime API""" - try: - - - # Essayer différentes ProgID Crystal Reports - prog_ids_crystal = [ - "CrystalRuntime.Application.140", # Crystal Reports 2020 - "CrystalRuntime.Application.13", # Crystal Reports 2016 - "CrystalRuntime.Application.12", # Crystal Reports 2013 - "CrystalRuntime.Application.11", # Crystal Reports 2011 - "CrystalRuntime.Application", # Générique - "CrystalDesignRunTime.Application", # Alternative - ] - - crystal = None - prog_id_utilisee = None - - for prog_id in prog_ids_crystal: - try: - crystal = win32com.client.Dispatch(prog_id) - prog_id_utilisee = prog_id - logger.info(f" Crystal trouvé: {prog_id}") - break - except Exception as e: - logger.debug(f" {prog_id}: {e}") - continue - - if not crystal: - logger.info(" Aucune ProgID Crystal trouvée") - return None - - # Ouvrir le rapport .bgc - logger.info(f" Ouverture: {os.path.basename(chemin_modele)}") - rapport = crystal.OpenReport(chemin_modele) - - # Configurer la connexion SQL - logger.info(" 🔌 Configuration connexion SQL...") - - for table in rapport.Database.Tables: - try: - # Méthode 1: SetDataSource - table.SetDataSource(self.sql_server, self.sql_database, "", "") - except: - try: - # Méthode 2: ConnectionProperties - table.ConnectionProperties.Item["Server Name"] = self.sql_server - table.ConnectionProperties.Item["Database Name"] = ( - self.sql_database - ) - table.ConnectionProperties.Item["Integrated Security"] = True - except: - pass - - # Appliquer le filtre Crystal Reports - logger.info(f" Filtre: DO_Piece = '{numero}'") - rapport.RecordSelectionFormula = f"{{F_DOCENTETE.DO_Piece}} = '{numero}'" - - # Exporter en PDF - logger.info(" Export PDF...") - rapport.ExportOptions.DestinationType = 1 # crEDTDiskFile - rapport.ExportOptions.FormatType = 31 # crEFTPortableDocFormat (PDF) - rapport.ExportOptions.DiskFileName = pdf_path - rapport.Export(False) - - # Attendre la création du fichier - import time - - max_wait = 30 - waited = 0 - - while not os.path.exists(pdf_path) and waited < max_wait: - time.sleep(0.5) - waited += 0.5 - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, "rb") as f: - return f.read() - - logger.warning(" Fichier PDF non créé") - return None - - except Exception as e: - logger.debug(f" Crystal Runtime: {e}") - return None - - def _generer_pdf_crystal_sage_dll(self, numero, type_doc, chemin_modele, pdf_path): - """🔷 Méthode 2: Utiliser les DLL Crystal de Sage directement""" - try: - - import ctypes - - # Chercher les DLL Crystal dans le dossier Sage - dossier_sage = os.path.dirname(os.path.dirname(self.chemin_base)) - chemins_dll = [ - os.path.join(dossier_sage, "CrystalReports", "crpe32.dll"), - os.path.join(dossier_sage, "Crystal", "crpe32.dll"), - r"C:\Program Files (x86)\SAP BusinessObjects\Crystal Reports for .NET Framework 4.0\Common\SAP BusinessObjects Enterprise XI 4.0\win64_x64\crpe32.dll", - ] - - dll_trouvee = None - for chemin_dll in chemins_dll: - if os.path.exists(chemin_dll): - dll_trouvee = chemin_dll - break - - if not dll_trouvee: - logger.info(" DLL Crystal Sage non trouvée") - return None - - logger.info(f" DLL trouvée: {dll_trouvee}") - - # Charger la DLL - crpe = ctypes.cdll.LoadLibrary(dll_trouvee) - - # Ouvrir le rapport (API C Crystal Reports) - # Note: Ceci est une approche bas niveau, peut nécessiter des ajustements - job_handle = crpe.PEOpenPrintJob(chemin_modele.encode()) - - if job_handle == 0: - logger.warning(" Impossible d'ouvrir le rapport") - return None - - # Définir les paramètres de connexion - # ... (code simplifié, nécessiterait plus de configuration) - - # Exporter - crpe.PEExportTo(job_handle, pdf_path.encode(), 31) # 31 = PDF - - # Fermer - crpe.PEClosePrintJob(job_handle) - - import time - - time.sleep(2) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, "rb") as f: - return f.read() - - return None - - except Exception as e: - logger.debug(f" DLL Sage: {e}") - return None - - def _generer_pdf_sage_viewer(self, numero, type_doc, chemin_modele, pdf_path): - """🔷 Méthode 3: Sage Reports Viewer (si installé)""" - try: - - - # Chercher l'exécutable Sage Reports - executables_possibles = [ - r"C:\Program Files\Sage\Reports\SageReports.exe", - r"C:\Program Files (x86)\Sage\Reports\SageReports.exe", - os.path.join( - os.path.dirname(os.path.dirname(self.chemin_base)), - "Reports", - "SageReports.exe", - ), - ] - - exe_trouve = None - for exe in executables_possibles: - if os.path.exists(exe): - exe_trouve = exe - break - - if not exe_trouve: - logger.info(" SageReports.exe non trouvé") - return None - - logger.info(f" SageReports trouvé: {exe_trouve}") - - # Lancer en ligne de commande avec paramètres - import subprocess - - cmd = [ - exe_trouve, - "/report", - chemin_modele, - "/export", - pdf_path, - "/format", - "PDF", - "/filter", - f"DO_Piece='{numero}'", - "/silent", - ] - - logger.info(" Lancement SageReports...") - result = subprocess.run(cmd, capture_output=True, timeout=30) - - import time - - time.sleep(2) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, "rb") as f: - return f.read() - - logger.warning(" PDF non généré par SageReports") - return None - - except Exception as e: - logger.debug(f" Sage Viewer: {e}") - return None - - def _generer_pdf_custom(self, doc, numero, type_doc): - """🎨 Génère un PDF simple avec les données du document (FALLBACK)""" - try: - from reportlab.lib.pagesizes import A4 - from reportlab.lib.units import cm - from reportlab.pdfgen import canvas - from reportlab.lib import colors - from io import BytesIO - - buffer = BytesIO() - pdf = canvas.Canvas(buffer, pagesize=A4) - width, height = A4 - - # En-tête - pdf.setFont("Helvetica-Bold", 20) - type_libelles = { - 0: "DEVIS", - 10: "BON DE COMMANDE", - 30: "BON DE LIVRAISON", - 60: "FACTURE", - 50: "AVOIR", - } - type_libelle = type_libelles.get(type_doc, "DOCUMENT") - - pdf.drawString(2 * cm, height - 3 * cm, type_libelle) - - # Numéro - pdf.setFont("Helvetica", 12) - pdf.drawString(2 * cm, height - 4 * cm, f"Numéro: {numero}") - - # Date - date_doc = getattr(doc, "DO_Date", "") - pdf.drawString(2 * cm, height - 4.5 * cm, f"Date: {date_doc}") - - # Client - try: - client = getattr(doc, "Client", None) - if client: - client.Read() - client_nom = getattr(client, "CT_Intitule", "") - pdf.drawString(2 * cm, height - 5.5 * cm, f"Client: {client_nom}") - except: - pass - - # Ligne séparatrice - pdf.line(2 * cm, height - 6 * cm, width - 2 * cm, height - 6 * cm) - - # Lignes du document - y_pos = height - 7 * cm - pdf.setFont("Helvetica-Bold", 10) - pdf.drawString(2 * cm, y_pos, "Article") - pdf.drawString(8 * cm, y_pos, "Qté") - pdf.drawString(11 * cm, y_pos, "Prix U.") - pdf.drawString(15 * cm, y_pos, "Total") - - y_pos -= 0.5 * cm - pdf.line(2 * cm, y_pos, width - 2 * cm, y_pos) - - # Lire lignes - pdf.setFont("Helvetica", 9) - try: - factory_lignes = getattr(doc, "FactoryDocumentLigne", None) - if factory_lignes: - idx = 1 - while idx <= 50 and y_pos > 5 * cm: - try: - ligne_p = factory_lignes.List(idx) - if ligne_p is None: - break - - ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3") - ligne.Read() - - y_pos -= 0.7 * cm - - design = getattr(ligne, "DL_Design", "")[:40] - qte = float(getattr(ligne, "DL_Qte", 0)) - prix = float(getattr(ligne, "DL_PrixUnitaire", 0)) - total = float(getattr(ligne, "DL_MontantHT", 0)) - - pdf.drawString(2 * cm, y_pos, design) - pdf.drawString(8 * cm, y_pos, f"{qte:.2f}") - pdf.drawString(11 * cm, y_pos, f"{prix:.2f}€") - pdf.drawString(15 * cm, y_pos, f"{total:.2f}€") - - idx += 1 - except: - break - except: - pass - - # Totaux - y_pos = 5 * cm - pdf.line(2 * cm, y_pos, width - 2 * cm, y_pos) - - y_pos -= 0.7 * cm - pdf.setFont("Helvetica-Bold", 11) - - total_ht = float(getattr(doc, "DO_TotalHT", 0)) - total_ttc = float(getattr(doc, "DO_TotalTTC", 0)) - - pdf.drawString(13 * cm, y_pos, f"Total HT:") - pdf.drawString(16 * cm, y_pos, f"{total_ht:.2f}€") - - y_pos -= 0.7 * cm - pdf.drawString(13 * cm, y_pos, f"TVA:") - pdf.drawString(16 * cm, y_pos, f"{(total_ttc - total_ht):.2f}€") - - y_pos -= 0.7 * cm - pdf.setFont("Helvetica-Bold", 14) - pdf.drawString(13 * cm, y_pos, f"Total TTC:") - pdf.drawString(16 * cm, y_pos, f"{total_ttc:.2f}€") - - # Pied de page - pdf.setFont("Helvetica", 8) - pdf.drawString( - 2 * cm, 2 * cm, "PDF généré par l'API Sage - Version simplifiée" - ) - pdf.drawString( - 2 * cm, - 1.5 * cm, - "Pour un rendu Crystal Reports complet, installez SAP BusinessObjects", - ) - - pdf.showPage() - pdf.save() - - return buffer.getvalue() - - except Exception as e: - logger.error(f"PDF custom: {e}") - return None - - def _determiner_modele(self, type_doc: int, modele_demande: str = None) -> str: - """ - Détermine le chemin du modèle Crystal Reports à utiliser - - Args: - type_doc: Type Sage (0=devis, 60=facture, etc.) - modele_demande: Nom fichier .bgc spécifique (optionnel) - - Returns: - str: Chemin complet du modèle - """ - if modele_demande: - # Modèle spécifié - modeles_dispo = self.lister_modeles_crystal() - - for categorie, liste in modeles_dispo.items(): - for m in liste: - if m["fichier"].lower() == modele_demande.lower(): - return m["chemin_complet"] - - raise ValueError(f"Modèle '{modele_demande}' introuvable") - - # Modèle par défaut selon type - modeles = self.lister_modeles_crystal() - - mapping = { - 0: "devis", - 10: "commandes", - 30: "livraisons", - 60: "factures", - 50: "avoirs", - } - - categorie = mapping.get(type_doc) - - if not categorie or categorie not in modeles: - raise ValueError(f"Aucun modèle disponible pour type {type_doc}") - - liste = modeles[categorie] - if not liste: - raise ValueError(f"Aucun modèle {categorie} trouvé") - - # Prioriser modèle "standard" (sans FlyDoc, email, etc.) - modele_std = next( - ( - m - for m in liste - if "flydoc" not in m["fichier"].lower() - and "email" not in m["fichier"].lower() - ), - liste[0], - ) - - return modele_std["chemin_complet"] - - - def generer_pdf_via_sage_com(self, numero: str, type_doc: int, modele: str = None) -> bytes: - """ - 🎯 NOUVELLE MÉTHODE : Utilise l'API COM Sage pour exporter en PDF - - Sage 100c expose des méthodes pour imprimer/exporter les documents. - Cette méthode utilise exactement le même processus que l'interface graphique. - """ - if not self.cial: - raise RuntimeError("Connexion Sage non établie") - - import os - import tempfile - import time - - try: - with self._com_context(), self._lock_com: - logger.info(f"[PDF-COM] Génération PDF via API Sage COM") - logger.info(f"[PDF-COM] Document: {numero} (type={type_doc})") - - # 1. Charger le document - factory = self.cial.FactoryDocumentVente - persist = factory.ReadPiece(type_doc, numero) - - if not persist: - persist = self._find_document_in_list(numero, type_doc) - - if not persist: - raise ValueError(f"Document {numero} introuvable") - - doc = win32com.client.CastTo(persist, "IBODocumentVente3") - doc.Read() - - # 2. Déterminer le modèle - chemin_modele = self._determiner_modele(type_doc, modele) - logger.info(f"[PDF-COM] Modèle: {os.path.basename(chemin_modele)}") - - # 3. Créer fichier temporaire - temp_dir = tempfile.gettempdir() - pdf_path = os.path.join(temp_dir, f"sage_{numero}_{int(time.time())}.pdf") - - # ======================================== - # MÉTHODE A : Utiliser BiPrint (si disponible) - # ======================================== - try: - logger.info("[PDF-COM] Tentative avec BiPrint...") - - # BiPrint est l'objet Sage pour l'impression - bi_print = self.cial.BiPrint - - # Configurer l'impression - bi_print.Destination = 4 # 4 = Fichier PDF - bi_print.FileName = pdf_path - bi_print.Model = chemin_modele - - # Lancer l'impression - bi_print.Preview = False - bi_print.Launch(doc) - - # Attendre la création du fichier - max_wait = 30 - waited = 0 - while not os.path.exists(pdf_path) and waited < max_wait: - time.sleep(0.5) - waited += 0.5 - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, 'rb') as f: - pdf_bytes = f.read() - os.remove(pdf_path) - logger.info(f"[PDF-COM] ✅ BiPrint réussi ({len(pdf_bytes)} octets)") - return pdf_bytes - - except Exception as e: - logger.warning(f"[PDF-COM] BiPrint échoué: {e}") - - # ======================================== - # MÉTHODE B : Utiliser Print() direct - # ======================================== - try: - logger.info("[PDF-COM] Tentative avec Print() direct...") - - # Certains objets Sage ont une méthode Print() - if hasattr(doc, 'Print'): - doc.Print( - Model=chemin_modele, - Destination=4, # PDF - FileName=pdf_path, - Preview=False - ) - - time.sleep(2) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, 'rb') as f: - pdf_bytes = f.read() - os.remove(pdf_path) - logger.info(f"[PDF-COM] ✅ Print() réussi ({len(pdf_bytes)} octets)") - return pdf_bytes - - except Exception as e: - logger.warning(f"[PDF-COM] Print() échoué: {e}") - - # ======================================== - # MÉTHODE C : Utiliser CrystalPrint (interface Crystal dans Sage) - # ======================================== - try: - logger.info("[PDF-COM] Tentative avec CrystalPrint...") - - # Sage utilise un wrapper Crystal Reports - crystal_print = self.cial.CrystalPrint - - crystal_print.ReportFileName = chemin_modele - crystal_print.SelectionFormula = f"{{F_DOCENTETE.DO_Piece}} = '{numero}'" - crystal_print.Destination = 4 # PDF - crystal_print.DestinationPath = pdf_path - - crystal_print.Execute() - - time.sleep(2) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, 'rb') as f: - pdf_bytes = f.read() - os.remove(pdf_path) - logger.info(f"[PDF-COM] ✅ CrystalPrint réussi ({len(pdf_bytes)} octets)") - return pdf_bytes - - except Exception as e: - logger.warning(f"[PDF-COM] CrystalPrint échoué: {e}") - - # ======================================== - # MÉTHODE D : Explorer les propriétés de l'objet Sage - # ======================================== - logger.info("[PDF-COM] Exploration des méthodes disponibles...") - - # Lister toutes les méthodes disponibles - for attr_name in dir(doc): - if 'print' in attr_name.lower() or 'export' in attr_name.lower(): - logger.info(f" Méthode trouvée: {attr_name}") - - for attr_name in dir(self.cial): - if 'print' in attr_name.lower() or 'crystal' in attr_name.lower(): - logger.info(f" Objet Cial: {attr_name}") - - raise RuntimeError( - "Aucune méthode COM Sage trouvée pour générer le PDF.\n" - "Le diagnostic a listé les méthodes disponibles ci-dessus." - ) - - except Exception as e: - logger.error(f"[PDF-COM] Erreur: {e}", exc_info=True) - raise - - - def explorer_api_sage(self): - """ - 🔍 Fonction de diagnostic pour explorer l'API COM Sage - """ - logger.info("="*60) - logger.info("🔍 EXPLORATION DE L'API COM SAGE") - logger.info("="*60) - - # Explorer l'objet Cial - logger.info("\n📦 Propriétés de self.cial:") - for attr in dir(self.cial): - if not attr.startswith('_'): - try: - obj = getattr(self.cial, attr) - logger.info(f" {attr}: {type(obj).__name__}") - except: - logger.info(f" {attr}: (inaccessible)") - - # Chercher spécifiquement les objets liés à l'impression - logger.info("\n🖨️ Objets liés à l'impression:") - for attr in dir(self.cial): - if any(keyword in attr.lower() for keyword in ['print', 'export', 'crystal', 'report', 'pdf']): - logger.info(f" ✅ {attr}") - - # Explorer un document - try: - factory = self.cial.FactoryDocumentVente - persist = factory.List(1) # Premier document - - if persist: - doc = win32com.client.CastTo(persist, "IBODocumentVente3") - doc.Read() - - logger.info("\n📄 Méthodes du document:") - for attr in dir(doc): - if any(keyword in attr.lower() for keyword in ['print', 'export', 'pdf']): - logger.info(f" ✅ {attr}") - - except Exception as e: - logger.error(f"Erreur exploration document: {e}") - - - """ - 🎯 GÉNÉRATION PDF via l'API Transformation native de Sage - Cette méthode NE NÉCESSITE PAS Crystal Reports - """ - - def generer_pdf_transformation_native(self, numero: str, type_doc: int) -> bytes: - """ - Génère un PDF en utilisant l'objet Transformation natif de Sage - - Cette méthode utilise l'API interne de Sage qui ne dépend pas de Crystal Reports - - Args: - numero: Numéro document (ex: "FA00123") - type_doc: Type Sage (0=devis, 10=BC, 30=BL, 60=facture, 50=avoir) - - Returns: - bytes: Contenu PDF - """ - if not self.cial: - raise RuntimeError("Connexion Sage non établie") - - try: - with self._com_context(), self._lock_com: - logger.info("="*60) - logger.info("🎯 GÉNÉRATION PDF VIA TRANSFORMATION NATIVE") - logger.info("="*60) - - # ========================================== - # 1. CHARGER LE DOCUMENT - # ========================================== - logger.info(f"📄 Chargement document {numero} (type={type_doc})") - - factory = self.cial.FactoryDocumentVente - persist = factory.ReadPiece(type_doc, numero) - - if not persist: - persist = self._find_document_in_list(numero, type_doc) - - if not persist: - raise ValueError(f"Document {numero} introuvable") - - doc = win32com.client.CastTo(persist, "IBODocumentVente3") - doc.Read() - - logger.info("✅ Document chargé") - - # ========================================== - # 2. ACCÉDER À L'OBJET TRANSFORMATION.VENTE - # ========================================== - logger.info("🔄 Accès objet Transformation.Vente...") - - transformation = self.cial.Transformation.Vente - logger.info(f"✅ Objet TransformationVente: {type(transformation).__name__}") - - # Explorer les méthodes disponibles - logger.info("📋 Méthodes TransformationVente disponibles:") - for attr in dir(transformation): - if not attr.startswith('_') and callable(getattr(transformation, attr, None)): - logger.info(f" - {attr}") - - # ========================================== - # 3. CRÉER FICHIER TEMPORAIRE - # ========================================== - import tempfile - import time - - temp_dir = tempfile.gettempdir() - pdf_path = os.path.join(temp_dir, f"sage_{numero}_{int(time.time())}.pdf") - - logger.info(f"📁 Fichier temporaire: {pdf_path}") - - # ========================================== - # 4. MÉTHODES POSSIBLES DE TRANSFORMATION - # ========================================== - - # Méthode A: TransformToFile - try: - logger.info("🔷 Tentative: TransformToFile...") - - # Paramètres possibles - transformation.TransformToFile( - persist, # Document source - pdf_path, # Fichier destination - 1 # Format (1=PDF, possiblement) - ) - - time.sleep(1) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, 'rb') as f: - pdf_bytes = f.read() - os.remove(pdf_path) - logger.info(f"✅ PDF généré: {len(pdf_bytes):,} octets") - return pdf_bytes - - except AttributeError: - logger.info(" ❌ TransformToFile n'existe pas") - except Exception as e: - logger.info(f" ❌ TransformToFile échoué: {e}") - - # Méthode B: Transform + Export - try: - logger.info("🔷 Tentative: Transform + Export...") - - result = transformation.Transform(persist) - - if hasattr(result, 'ExportToPDF'): - result.ExportToPDF(pdf_path) - elif hasattr(result, 'Export'): - result.Export(pdf_path, 'PDF') - - time.sleep(1) - - if os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0: - with open(pdf_path, 'rb') as f: - pdf_bytes = f.read() - os.remove(pdf_path) - logger.info(f"✅ PDF généré: {len(pdf_bytes):,} octets") - return pdf_bytes - - except AttributeError: - logger.info(" ❌ Transform n'existe pas") - except Exception as e: - logger.info(f" ❌ Transform échoué: {e}") - - # Méthode C: CreatePDF - try: - logger.info("🔷 Tentative: CreatePDF...") - - pdf_bytes = transformation.CreatePDF(persist) - - if pdf_bytes and len(pdf_bytes) > 0: - logger.info(f"✅ PDF généré: {len(pdf_bytes):,} octets") - return pdf_bytes - - except AttributeError: - logger.info(" ❌ CreatePDF n'existe pas") - except Exception as e: - logger.info(f" ❌ CreatePDF échoué: {e}") - - # ========================================== - # 5. FALLBACK: EXPLORER TOUTES LES MÉTHODES - # ========================================== - logger.info("🔍 Exploration exhaustive...") - - for attr in dir(transformation): - if not attr.startswith('_'): - try: - method = getattr(transformation, attr) - if callable(method): - logger.info(f" Essai: {attr}") - - # Essayer d'appeler avec le document - result = method(persist) - - # Si c'est un objet, chercher des méthodes d'export - if result and hasattr(result, '__class__'): - for sub_attr in dir(result): - if 'pdf' in sub_attr.lower() or 'export' in sub_attr.lower(): - logger.info(f" → Trouvé: {sub_attr}") - except: - pass - - # Si on arrive ici, aucune méthode n'a fonctionné - logger.warning("❌ Aucune méthode Transformation disponible") - return None - - except Exception as e: - logger.error(f"Erreur transformation native: {e}", exc_info=True) - return None - - - def explorer_transformation_complete(self): - """ - 🔬 Exploration complète de l'objet Transformation - À lancer pour comprendre comment il fonctionne - """ - try: - with self._com_context(), self._lock_com: - transformation = self.cial.Transformation - - print("\n" + "="*60) - print("🔬 ANALYSE OBJET TRANSFORMATION") - print("="*60) - - print(f"\nType: {type(transformation).__name__}") - print(f"Classe: {transformation.__class__.__name__}") - - # Toutes les propriétés - print("\n📋 PROPRIÉTÉS TRANSFORMATION:") - for attr in dir(transformation): - if not attr.startswith('_'): - try: - obj = getattr(transformation, attr) - obj_type = type(obj).__name__ - - if not callable(obj): - print(f" {attr}: {obj_type}") - except Exception as e: - print(f" {attr}: (inaccessible) - {e}") - - # Toutes les méthodes - print("\n🔧 MÉTHODES TRANSFORMATION:") - for attr in dir(transformation): - if not attr.startswith('_'): - try: - obj = getattr(transformation, attr) - if callable(obj): - print(f" {attr}()") - except: - pass - - # ========================================== - # EXPLORER TRANSFORMATION.VENTE (IMPORTANT!) - # ========================================== - print("\n" + "="*60) - print("🔬 ANALYSE TRANSFORMATION.VENTE (Documents vente)") - print("="*60) - - transfo_vente = transformation.Vente - print(f"\nType: {type(transfo_vente).__name__}") - print(f"Classe: {transfo_vente.__class__.__name__}") - - print("\n📋 PROPRIÉTÉS TRANSFORMATION.VENTE:") - for attr in dir(transfo_vente): - if not attr.startswith('_'): - try: - obj = getattr(transfo_vente, attr) - obj_type = type(obj).__name__ - - if not callable(obj): - print(f" {attr}: {obj_type}") - try: - val = str(obj) - if len(val) < 100: - print(f" = {val}") - except: - pass - except Exception as e: - print(f" {attr}: (inaccessible) - {e}") - - print("\n🔧 MÉTHODES TRANSFORMATION.VENTE:") - for attr in dir(transfo_vente): - if not attr.startswith('_'): - try: - obj = getattr(transfo_vente, attr) - if callable(obj): - print(f" ✅ {attr}()") - except: - pass - - # ========================================== - # EXPLORER TRANSFORMATION.ACHAT - # ========================================== - print("\n" + "="*60) - print("🔬 ANALYSE TRANSFORMATION.ACHAT (Documents achat)") - print("="*60) - - transfo_achat = transformation.Achat - print(f"\nType: {type(transfo_achat).__name__}") - - print("\n🔧 MÉTHODES TRANSFORMATION.ACHAT:") - for attr in dir(transfo_achat): - if not attr.startswith('_'): - try: - obj = getattr(transfo_achat, attr) - if callable(obj): - print(f" ✅ {attr}()") - except: - pass - - print("\n" + "="*60) - - except Exception as e: - logger.error(f"Erreur exploration: {e}", exc_info=True) - - \ No newline at end of file