From 42048d11ee215b9a726db6860b9e91556eb05558 Mon Sep 17 00:00:00 2001 From: mickael Date: Fri, 19 Dec 2025 18:55:15 +0100 Subject: [PATCH] =?UTF-8?q?creer=20et=20modifier=20devis=20mis=20=C3=A0=20?= =?UTF-8?q?jour?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sage_connector.py | 1004 ++++++++++++++++++++++++++++----------------- 1 file changed, 618 insertions(+), 386 deletions(-) diff --git a/sage_connector.py b/sage_connector.py index 980303b..b57e079 100644 --- a/sage_connector.py +++ b/sage_connector.py @@ -3400,6 +3400,17 @@ class SageConnector: } def creer_devis_enrichi(self, devis_data: dict, forcer_brouillon: bool = False): + """ + Crée un devis dans Sage avec support de la référence. + + Args: + devis_data: dict contenant: + - client: {code: str} + - date_devis: str ou date + - reference: str (optionnel) + - lignes: list[dict] + forcer_brouillon: bool, force le statut brouillon + """ if not self.cial: raise RuntimeError("Connexion Sage non établie") @@ -3466,14 +3477,25 @@ class SageConnector: ) doc.SetDefaultClient(client_obj) + logger.info(f"👤 Client {devis_data['client']['code']} associé") - # ✅ STATUT: Définir SEULEMENT si brouillon demandé - doc.DO_Statut = 2 - logger.info("📊 Statut forcé: 0 (Brouillon)") - # Sinon, laisser Sage décider (généralement 2 = Accepté) + # ===== RÉFÉRENCE ===== + if "reference" in devis_data and devis_data["reference"]: + try: + doc.DO_Ref = str(devis_data["reference"]) + logger.info(f"🔖 Référence définie: {devis_data['reference']}") + except Exception as e: + logger.warning(f"⚠️ Impossible de définir la référence: {e}") + + # ===== STATUT ===== + if forcer_brouillon: + doc.DO_Statut = 0 + logger.info("📊 Statut forcé: 0 (Brouillon)") + else: + doc.DO_Statut = 2 + logger.info("📊 Statut défini: 2 (Accepté)") doc.Write() - logger.info(f"👤 Client {devis_data['client']['code']} associé") # ===== LIGNES ===== try: @@ -3570,12 +3592,11 @@ class SageConnector: # ===== VALIDATION ===== doc.Write() - # ✅ PROCESS() uniquement si pas en brouillon + # ===== PROCESS ===== if not forcer_brouillon: logger.info("🔄 Lancement Process()...") process.Process() else: - # En brouillon, Process() peut échouer, on essaie mais on ignore l'erreur try: process.Process() logger.info("✅ Process() appelé (brouillon)") @@ -3583,33 +3604,7 @@ class SageConnector: logger.debug("⚠️ Process() ignoré pour brouillon") # ===== RÉCUPÉRATION NUMÉRO ===== - numero_devis = None - - # Méthode 1: DocumentResult - try: - doc_result = process.DocumentResult - if doc_result: - doc_result = win32com.client.CastTo( - doc_result, "IBODocumentVente3" - ) - doc_result.Read() - numero_devis = getattr(doc_result, "DO_Piece", "") - except: - pass - - # Méthode 2: Document direct - if not numero_devis: - numero_devis = getattr(doc, "DO_Piece", "") - - # Méthode 3: SetDefaultNumPiece - if not numero_devis: - try: - doc.SetDefaultNumPiece() - doc.Write() - doc.Read() - numero_devis = getattr(doc, "DO_Piece", "") - except: - pass + numero_devis = self._recuperer_numero_devis(process, doc) if not numero_devis: raise RuntimeError("❌ Numéro devis vide après création") @@ -3624,77 +3619,17 @@ class SageConnector: except: pass - # ===== RELECTURE SIMPLIFIÉE (pas d'attente excessive) ===== - # Attendre juste 500ms pour l'indexation + # ===== RELECTURE ===== + import time time.sleep(0.5) - factory_doc = self.cial.FactoryDocumentVente - persist_reread = factory_doc.ReadPiece(0, numero_devis) - - if not persist_reread: - # Si ReadPiece échoue, chercher dans les 100 premiers - logger.debug("ReadPiece échoué, recherche dans List()...") - index = 1 - while index < 100: - try: - persist_test = factory_doc.List(index) - if persist_test is None: - break - - doc_test = win32com.client.CastTo( - persist_test, "IBODocumentVente3" - ) - doc_test.Read() - - if ( - getattr(doc_test, "DO_Type", -1) == 0 - and getattr(doc_test, "DO_Piece", "") - == numero_devis - ): - persist_reread = persist_test - logger.info(f"✅ Document trouvé à l'index {index}") - break - - index += 1 - except: - index += 1 - - # Extraction des totaux - if persist_reread: - doc_final = win32com.client.CastTo( - persist_reread, "IBODocumentVente3" - ) - doc_final.Read() - - total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0)) - total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0)) - statut_final = getattr(doc_final, "DO_Statut", 0) - else: - # Fallback: calculer manuellement - total_calcule = sum( - l.get("montant_ligne_ht", 0) for l in devis_data["lignes"] - ) - total_ht = total_calcule - total_ttc = round(total_calcule * 1.20, 2) - statut_final = 0 if forcer_brouillon else 2 - - logger.info(f"💰 Total HT: {total_ht}€") - logger.info(f"💰 Total TTC: {total_ttc}€") - logger.info(f"📊 Statut final: {statut_final}") + doc_final_data = self._relire_devis(numero_devis, devis_data, forcer_brouillon) logger.info( - f"✅ ✅ ✅ DEVIS CRÉÉ: {numero_devis} - {total_ttc}€ TTC ✅ ✅ ✅" + f"✅ ✅ ✅ DEVIS CRÉÉ: {numero_devis} - {doc_final_data['total_ttc']}€ TTC ✅ ✅ ✅" ) - return { - "numero_devis": numero_devis, - "total_ht": total_ht, - "total_ttc": total_ttc, - "nb_lignes": len(devis_data["lignes"]), - "client_code": devis_data["client"]["code"], - "date_devis": str(date_obj.date()), - "statut": statut_final, - } + return doc_final_data except Exception as e: if transaction_active: @@ -3709,6 +3644,590 @@ class SageConnector: logger.error(f"❌ ERREUR CRÉATION DEVIS: {e}", exc_info=True) raise RuntimeError(f"Échec création devis: {str(e)}") + + def _recuperer_numero_devis(self, process, doc): + """Récupère le numéro du devis créé via plusieurs méthodes.""" + numero_devis = None + + # Méthode 1: DocumentResult + try: + doc_result = process.DocumentResult + if doc_result: + doc_result = win32com.client.CastTo( + doc_result, "IBODocumentVente3" + ) + doc_result.Read() + numero_devis = getattr(doc_result, "DO_Piece", "") + except: + pass + + # Méthode 2: Document direct + if not numero_devis: + numero_devis = getattr(doc, "DO_Piece", "") + + # Méthode 3: SetDefaultNumPiece + if not numero_devis: + try: + doc.SetDefaultNumPiece() + doc.Write() + doc.Read() + numero_devis = getattr(doc, "DO_Piece", "") + except: + pass + + return numero_devis + + + def _relire_devis(self, numero_devis, devis_data, forcer_brouillon): + """Relit le devis créé et extrait les informations finales.""" + factory_doc = self.cial.FactoryDocumentVente + persist_reread = factory_doc.ReadPiece(0, numero_devis) + + if not persist_reread: + logger.debug("ReadPiece échoué, recherche dans List()...") + persist_reread = self._rechercher_devis_dans_liste(numero_devis, factory_doc) + + # Extraction des totaux + if persist_reread: + doc_final = win32com.client.CastTo( + persist_reread, "IBODocumentVente3" + ) + doc_final.Read() + + total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0)) + total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0)) + statut_final = getattr(doc_final, "DO_Statut", 0) + reference_final = getattr(doc_final, "DO_Ref", "") + else: + # Fallback: calculer manuellement + total_calcule = sum( + l.get("montant_ligne_ht", 0) for l in devis_data["lignes"] + ) + total_ht = total_calcule + total_ttc = round(total_calcule * 1.20, 2) + statut_final = 0 if forcer_brouillon else 2 + reference_final = devis_data.get("reference", "") + + logger.info(f"💰 Total HT: {total_ht}€") + logger.info(f"💰 Total TTC: {total_ttc}€") + logger.info(f"📊 Statut final: {statut_final}") + if reference_final: + logger.info(f"🔖 Référence: {reference_final}") + + return { + "numero_devis": numero_devis, + "total_ht": total_ht, + "total_ttc": total_ttc, + "nb_lignes": len(devis_data["lignes"]), + "client_code": devis_data["client"]["code"], + "date_devis": str(devis_data.get("date_devis", "")), + "reference": reference_final, + "statut": statut_final, + } + + + def _rechercher_devis_dans_liste(self, numero_devis, factory_doc): + """Recherche un devis dans les 100 premiers éléments de la liste.""" + index = 1 + while index < 100: + try: + persist_test = factory_doc.List(index) + if persist_test is None: + break + + doc_test = win32com.client.CastTo( + persist_test, "IBODocumentVente3" + ) + doc_test.Read() + + if ( + getattr(doc_test, "DO_Type", -1) == 0 + and getattr(doc_test, "DO_Piece", "") == numero_devis + ): + logger.info(f"✅ Document trouvé à l'index {index}") + return persist_test + + index += 1 + except: + index += 1 + + return None + + + def modifier_devis(self, numero: str, devis_data: Dict) -> Dict: + """ + Modifie un devis existant dans Sage. + + Args: + numero: Numéro du devis à modifier + devis_data: dict contenant les champs à modifier: + - date_devis: str ou date (optionnel) + - reference: str (optionnel) + - statut: int (optionnel) + - lignes: list[dict] (optionnel) + + Returns: + dict contenant les informations du devis modifié + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + # ===== ÉTAPE 1 : CHARGER LE DEVIS ===== + logger.info(f"🔍 Recherche devis {numero}...") + doc = self._charger_devis(numero) + logger.info(f"✅ Devis {numero} trouvé") + + # ===== VÉRIFICATION ===== + self._verifier_devis_non_transforme(numero, doc) + + champs_modifies = [] + + # ===== ÉTAPE 2 : MODIFIER LES CHAMPS SIMPLES (sauf statut si lignes à modifier) ===== + # Créer une copie des données sans le statut si on modifie les lignes + devis_data_champs = devis_data.copy() + statut_a_modifier = None + + if "lignes" in devis_data and devis_data["lignes"] is not None: + # Si on modifie les lignes, on garde le statut pour la fin + if "statut" in devis_data_champs: + statut_a_modifier = devis_data_champs.pop("statut") + logger.info("📊 Modification du statut reportée après les lignes") + + champs_modifies = self._modifier_champs_simples(doc, devis_data_champs) + + # ===== ÉTAPE 3 : MODIFICATION DES LIGNES ===== + if "lignes" in devis_data and devis_data["lignes"] is not None: + self._modifier_lignes_devis(doc, devis_data["lignes"]) + champs_modifies.append("lignes") + + # Validation après modification des lignes + logger.info("💾 Sauvegarde après modification des lignes...") + doc.Write() + + import time + time.sleep(0.5) + + doc.Read() + + # ===== ÉTAPE 4 : MODIFIER LE STATUT (si nécessaire et après les lignes) ===== + if statut_a_modifier is not None: + try: + statut_actuel = getattr(doc, "DO_Statut", 0) + nouveau_statut = int(statut_a_modifier) + + if nouveau_statut != statut_actuel and nouveau_statut in [0, 1, 2, 3]: + doc.DO_Statut = nouveau_statut + doc.Write() + + import time + time.sleep(0.5) + + doc.Read() + + champs_modifies.append("statut") + logger.info(f"📊 Statut modifié: {statut_actuel} → {nouveau_statut}") + except Exception as e: + logger.warning(f"⚠️ Impossible de modifier le statut: {e}") + + # ===== VALIDATION FINALE ===== + logger.info("💾 Validation finale...") + try: + doc.Write() + except: + pass # Peut échouer si déjà sauvegardé + + import time + time.sleep(0.5) + + doc.Read() + + # ===== RÉSULTAT ===== + resultat = self._extraire_infos_devis(doc, numero, champs_modifies) + + logger.info(f"✅✅✅ DEVIS MODIFIÉ: {numero} ✅✅✅") + logger.info(f"💰 Totaux: {resultat['total_ht']}€ HT / {resultat['total_ttc']}€ TTC") + + return resultat + + except ValueError as e: + logger.error(f"❌ Erreur métier: {e}") + raise + + except Exception as e: + logger.error(f"❌ Erreur technique: {e}", exc_info=True) + raise RuntimeError(f"Erreur technique Sage: {str(e)}") + + + def _charger_devis(self, numero: str): + """Charge un devis depuis Sage.""" + factory = self.cial.FactoryDocumentVente + persist = factory.ReadPiece(0, numero) + + if not persist: + # Recherche dans la liste si ReadPiece échoue + persist = self._rechercher_devis_par_numero(numero, factory) + + if not persist: + raise ValueError(f"Devis {numero} introuvable") + + doc = win32com.client.CastTo(persist, "IBODocumentVente3") + doc.Read() + + return doc + + + def _rechercher_devis_par_numero(self, numero: str, factory): + """Recherche un devis par son numéro dans la liste.""" + index = 1 + while index < 10000: + try: + persist_test = factory.List(index) + if persist_test is None: + break + + doc_test = win32com.client.CastTo( + persist_test, "IBODocumentVente3" + ) + doc_test.Read() + + if ( + getattr(doc_test, "DO_Type", -1) == 0 + and getattr(doc_test, "DO_Piece", "") == numero + ): + return persist_test + + index += 1 + except: + index += 1 + + return None + + + def _verifier_devis_non_transforme(self, numero: str, doc): + """Vérifie que le devis n'a pas déjà été transformé.""" + verification = self.verifier_si_deja_transforme_sql(numero, 0) + + if verification["deja_transforme"]: + docs_cibles = verification["documents_cibles"] + nums = [d["numero"] for d in docs_cibles] + raise ValueError( + f"❌ Devis {numero} déjà transformé en {len(docs_cibles)} document(s): {', '.join(nums)}" + ) + + statut_actuel = getattr(doc, "DO_Statut", 0) + if statut_actuel == 5: + raise ValueError(f"Devis {numero} déjà transformé (statut=5)") + + + def _modifier_champs_simples(self, doc, devis_data: Dict) -> list: + """Modifie les champs simples du devis (date, référence, statut).""" + champs_modifies = [] + + # IMPORTANT: Relire le document pour s'assurer qu'il est à jour + try: + doc.Read() + except: + pass + + # DATE - Modifier et sauvegarder immédiatement + if "date_devis" in devis_data: + try: + import pywintypes + date_str = devis_data["date_devis"] + date_obj = ( + datetime.fromisoformat(date_str) + if isinstance(date_str, str) + else date_str + ) + + doc.DO_Date = pywintypes.Time(date_obj) + doc.Write() # Sauvegarder immédiatement + doc.Read() # Relire après sauvegarde + + champs_modifies.append("date") + logger.info(f"📅 Date modifiée: {date_obj.date()}") + except Exception as e: + logger.warning(f"⚠️ Impossible de modifier la date: {e}") + + # RÉFÉRENCE - Modifier et sauvegarder immédiatement + if "reference" in devis_data: + try: + nouvelle_reference = devis_data["reference"] + ancienne_reference = getattr(doc, "DO_Ref", "") + + doc.DO_Ref = str(nouvelle_reference) if nouvelle_reference else "" + doc.Write() # Sauvegarder immédiatement + doc.Read() # Relire après sauvegarde + + champs_modifies.append("reference") + logger.info(f"🔖 Référence: '{ancienne_reference}' → '{nouvelle_reference}'") + except Exception as e: + logger.warning(f"⚠️ Impossible de modifier la référence: {e}") + + # STATUT - Modifier avec précaution (à la fin seulement) + # Le statut ne doit être modifié QUE si aucune autre modification n'est en cours + if "statut" in devis_data and "lignes" not in devis_data: + try: + statut_actuel = getattr(doc, "DO_Statut", 0) + nouveau_statut = int(devis_data["statut"]) + + # Vérifier que le changement de statut est valide + if nouveau_statut != statut_actuel: + # Statuts valides: 0=Brouillon, 1=Refusé, 2=Accepté, 3=Confirmé, 5=Transformé + if nouveau_statut in [0, 1, 2, 3]: + doc.DO_Statut = nouveau_statut + doc.Write() + doc.Read() + + champs_modifies.append("statut") + logger.info(f"📊 Statut modifié: {statut_actuel} → {nouveau_statut}") + else: + logger.warning(f"⚠️ Statut {nouveau_statut} invalide ou non modifiable") + except Exception as e: + logger.warning(f"⚠️ Impossible de modifier le statut: {e}") + + return champs_modifies + + def _modifier_lignes_devis(self, doc, nouvelles_lignes: list): + """Modifie intelligemment les lignes du devis.""" + logger.info(f"🔄 Modification intelligente des lignes...") + + # Relire le document pour s'assurer qu'il est à jour + try: + doc.Read() + except: + pass + + nb_nouvelles = len(nouvelles_lignes) + + try: + factory_lignes = doc.FactoryDocumentLigne + except: + factory_lignes = doc.FactoryDocumentVenteLigne + + factory_article = self.cial.FactoryArticle + + # Compter les lignes existantes + nb_existantes = self._compter_lignes_existantes(factory_lignes) + + logger.info(f"📊 {nb_existantes} existantes → {nb_nouvelles} nouvelles") + + # STRATÉGIE : Supprimer d'abord, puis recréer + # C'est plus sûr que de modifier en place + if nb_nouvelles != nb_existantes: + logger.info("🔄 Stratégie: Suppression puis recréation des lignes") + + # Supprimer toutes les lignes existantes + self._supprimer_toutes_les_lignes(factory_lignes, nb_existantes) + + # Sauvegarder après suppression + doc.Write() + doc.Read() + + # Recréer toutes les lignes + for idx, ligne_data in enumerate(nouvelles_lignes, 1): + self._ajouter_nouvelle_ligne(factory_lignes, factory_article, ligne_data, idx) + + else: + # STRATÉGIE : Modification en place si même nombre de lignes + logger.info("🔄 Stratégie: Modification en place") + + for idx in range(1, nb_nouvelles + 1): + self._modifier_ligne_existante( + factory_lignes, factory_article, idx, nouvelles_lignes[idx - 1] + ) + + + def _supprimer_toutes_les_lignes(self, factory_lignes, nb_existantes: int): + """Supprime toutes les lignes du devis.""" + logger.info(f"🗑️ Suppression de {nb_existantes} lignes...") + + # Supprimer en partant de la fin pour éviter les problèmes d'index + for idx in range(nb_existantes, 0, -1): + try: + ligne_p = factory_lignes.List(idx) + if ligne_p: + try: + ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3") + except: + ligne = win32com.client.CastTo(ligne_p, "IBODocumentVenteLigne3") + + ligne.Read() + + try: + ligne.Remove() + logger.debug(f" ✅ Ligne {idx} supprimée") + except AttributeError: + # Si Remove() n'existe pas, essayer WriteDefault() + try: + ligne.WriteDefault() + logger.debug(f" ⚠️ Ligne {idx} réinitialisée (Remove indisponible)") + except: + logger.warning(f" ⚠️ Impossible de supprimer la ligne {idx}") + except Exception as e: + logger.warning(f" ⚠️ Erreur suppression ligne {idx}: {e}") + except Exception as e: + logger.debug(f" ⚠️ Ligne {idx} non accessible: {e}") + + + def _modifier_ligne_existante(self, factory_lignes, factory_article, idx: int, ligne_data: dict): + """Modifie une ligne existante du devis.""" + try: + ligne_p = factory_lignes.List(idx) + if not ligne_p: + raise ValueError(f"Ligne {idx} introuvable") + + try: + ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3") + except: + ligne = win32com.client.CastTo(ligne_p, "IBODocumentVenteLigne3") + + ligne.Read() + + persist_article = factory_article.ReadReference(ligne_data["article_code"]) + if not persist_article: + raise ValueError(f"Article {ligne_data['article_code']} introuvable") + + article_obj = win32com.client.CastTo(persist_article, "IBOArticle3") + article_obj.Read() + + # Réinitialiser la ligne + try: + ligne.WriteDefault() + except: + pass + + quantite = float(ligne_data["quantite"]) + + # Définir l'article + try: + ligne.SetDefaultArticleReference(ligne_data["article_code"], quantite) + except: + try: + ligne.SetDefaultArticle(article_obj, quantite) + except: + ligne.DL_Design = ligne_data.get("designation", "") + ligne.DL_Qte = quantite + + # Prix + if ligne_data.get("prix_unitaire_ht"): + ligne.DL_PrixUnitaire = float(ligne_data["prix_unitaire_ht"]) + + # Remise + if ligne_data.get("remise_pourcentage", 0) > 0: + try: + ligne.DL_Remise01REM_Valeur = float(ligne_data["remise_pourcentage"]) + ligne.DL_Remise01REM_Type = 0 + except: + pass + + ligne.Write() + logger.debug(f" ✅ Ligne {idx} modifiée: {ligne_data['article_code']}") + + except Exception as e: + logger.error(f" ❌ Erreur modification ligne {idx}: {e}") + raise + + + def _compter_lignes_existantes(self, factory_lignes) -> int: + """Compte le nombre de lignes existantes dans le document.""" + nb_existantes = 0 + index = 1 + while index <= 100: + try: + ligne_p = factory_lignes.List(index) + if ligne_p is None: + break + nb_existantes += 1 + index += 1 + except: + break + + return nb_existantes + + + def _ajouter_nouvelle_ligne(self, factory_lignes, factory_article, ligne_data: dict, idx: int): + """Ajoute une nouvelle ligne au devis.""" + persist_article = factory_article.ReadReference(ligne_data["article_code"]) + if not persist_article: + raise ValueError(f"Article {ligne_data['article_code']} introuvable") + + article_obj = win32com.client.CastTo(persist_article, "IBOArticle3") + article_obj.Read() + + ligne_persist = factory_lignes.Create() + + try: + ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentLigne3") + except: + ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentVenteLigne3") + + quantite = float(ligne_data["quantite"]) + + # Définir l'article + try: + ligne_obj.SetDefaultArticleReference(ligne_data["article_code"], quantite) + except: + try: + ligne_obj.SetDefaultArticle(article_obj, quantite) + except: + ligne_obj.DL_Design = ligne_data.get("designation", "") + ligne_obj.DL_Qte = quantite + + # Prix + if ligne_data.get("prix_unitaire_ht"): + ligne_obj.DL_PrixUnitaire = float(ligne_data["prix_unitaire_ht"]) + + # Remise + if ligne_data.get("remise_pourcentage", 0) > 0: + try: + ligne_obj.DL_Remise01REM_Valeur = float(ligne_data["remise_pourcentage"]) + ligne_obj.DL_Remise01REM_Type = 0 + except: + pass + + ligne_obj.Write() + logger.debug(f" ✅ Ligne {idx} ajoutée") + + + def _supprimer_lignes_en_trop(self, factory_lignes, nb_existantes: int, nb_nouvelles: int): + """Supprime les lignes en trop du devis.""" + for idx in range(nb_existantes, nb_nouvelles, -1): + try: + ligne_p = factory_lignes.List(idx) + if ligne_p: + ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3") + ligne.Read() + + try: + ligne.Remove() + except AttributeError: + ligne.WriteDefault() + except: + pass + except: + pass + + + def _extraire_infos_devis(self, doc, numero: str, champs_modifies: list) -> Dict: + """Extrait les informations du devis modifié.""" + total_ht = float(getattr(doc, "DO_TotalHT", 0.0)) + total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0)) + statut = getattr(doc, "DO_Statut", 0) + reference = getattr(doc, "DO_Ref", "") + + return { + "numero": numero, + "total_ht": total_ht, + "total_ttc": total_ttc, + "reference": reference, + "champs_modifies": champs_modifies, + "statut": statut, + } + + def lire_devis(self, numero_devis): try: # Lire le devis via SQL @@ -5524,293 +6043,6 @@ class SageConnector: raise RuntimeError(f"Erreur technique Sage: {error_message}") - def modifier_devis(self, numero: str, devis_data: Dict) -> Dict: - if not self.cial: - raise RuntimeError("Connexion Sage non établie") - - try: - with self._com_context(), self._lock_com: - # ÉTAPE 1 : CHARGER LE DEVIS - logger.info(f"🔍 Recherche devis {numero}...") - - factory = self.cial.FactoryDocumentVente - persist = factory.ReadPiece(0, numero) - - if not persist: - index = 1 - while index < 10000: - try: - persist_test = factory.List(index) - if persist_test is None: - break - - doc_test = win32com.client.CastTo( - persist_test, "IBODocumentVente3" - ) - doc_test.Read() - - if ( - getattr(doc_test, "DO_Type", -1) == 0 - and getattr(doc_test, "DO_Piece", "") == numero - ): - persist = persist_test - break - - index += 1 - except: - index += 1 - - if not persist: - raise ValueError(f"Devis {numero} introuvable") - - doc = win32com.client.CastTo(persist, "IBODocumentVente3") - doc.Read() - - logger.info(f"✅ Devis {numero} trouvé") - - # Vérifier transformation - verification = self.verifier_si_deja_transforme_sql(numero, 0) - - if verification["deja_transforme"]: - docs_cibles = verification["documents_cibles"] - nums = [d["numero"] for d in docs_cibles] - raise ValueError( - f"❌ Devis {numero} déjà transformé en {len(docs_cibles)} document(s): {', '.join(nums)}" - ) - - statut_actuel = getattr(doc, "DO_Statut", 0) - if statut_actuel == 5: - raise ValueError(f"Devis {numero} déjà transformé (statut=5)") - - # ÉTAPE 2 : CHAMPS SIMPLES - champs_modifies = [] - - if "date_devis" in devis_data: - import pywintypes - - date_str = devis_data["date_devis"] - date_obj = ( - datetime.fromisoformat(date_str) - if isinstance(date_str, str) - else date_str - ) - doc.DO_Date = pywintypes.Time(date_obj) - champs_modifies.append("date") - logger.info(f"📅 Date: {date_obj.date()}") - - if "statut" in devis_data: - nouveau_statut = devis_data["statut"] - doc.DO_Statut = nouveau_statut - champs_modifies.append("statut") - logger.info(f"📊 Statut: {statut_actuel} → {nouveau_statut}") - - if champs_modifies: - doc.Write() - - # ÉTAPE 3 : MODIFICATION INTELLIGENTE DES LIGNES - if "lignes" in devis_data and devis_data["lignes"] is not None: - logger.info(f"🔄 Modification intelligente des lignes...") - - nouvelles_lignes = devis_data["lignes"] - nb_nouvelles = len(nouvelles_lignes) - - try: - factory_lignes = doc.FactoryDocumentLigne - except: - factory_lignes = doc.FactoryDocumentVenteLigne - - factory_article = self.cial.FactoryArticle - - # Compter existantes - nb_existantes = 0 - index = 1 - while index <= 100: - try: - ligne_p = factory_lignes.List(index) - if ligne_p is None: - break - nb_existantes += 1 - index += 1 - except: - break - - logger.info( - f"📊 {nb_existantes} existantes → {nb_nouvelles} nouvelles" - ) - - # MODIFIER EXISTANTES - nb_a_modifier = min(nb_existantes, nb_nouvelles) - - for idx in range(1, nb_a_modifier + 1): - ligne_data = nouvelles_lignes[idx - 1] - - ligne_p = factory_lignes.List(idx) - ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3") - ligne.Read() - - persist_article = factory_article.ReadReference( - ligne_data["article_code"] - ) - if not persist_article: - raise ValueError( - f"Article {ligne_data['article_code']} introuvable" - ) - - article_obj = win32com.client.CastTo( - persist_article, "IBOArticle3" - ) - article_obj.Read() - - try: - ligne.WriteDefault() - except: - pass - - quantite = float(ligne_data["quantite"]) - - try: - ligne.SetDefaultArticleReference( - ligne_data["article_code"], quantite - ) - except: - try: - ligne.SetDefaultArticle(article_obj, quantite) - except: - ligne.DL_Design = ligne_data.get("designation", "") - ligne.DL_Qte = quantite - - if ligne_data.get("prix_unitaire_ht"): - ligne.DL_PrixUnitaire = float( - ligne_data["prix_unitaire_ht"] - ) - - if ligne_data.get("remise_pourcentage", 0) > 0: - try: - ligne.DL_Remise01REM_Valeur = float( - ligne_data["remise_pourcentage"] - ) - ligne.DL_Remise01REM_Type = 0 - except: - pass - - ligne.Write() - logger.debug(f" ✅ Ligne {idx} modifiée") - - # AJOUTER MANQUANTES - if nb_nouvelles > nb_existantes: - for idx in range(nb_existantes, nb_nouvelles): - ligne_data = nouvelles_lignes[idx] - - persist_article = factory_article.ReadReference( - ligne_data["article_code"] - ) - if not persist_article: - raise ValueError( - f"Article {ligne_data['article_code']} introuvable" - ) - - article_obj = win32com.client.CastTo( - persist_article, "IBOArticle3" - ) - article_obj.Read() - - ligne_persist = factory_lignes.Create() - - try: - ligne_obj = win32com.client.CastTo( - ligne_persist, "IBODocumentLigne3" - ) - except: - ligne_obj = win32com.client.CastTo( - ligne_persist, "IBODocumentVenteLigne3" - ) - - quantite = float(ligne_data["quantite"]) - - try: - ligne_obj.SetDefaultArticleReference( - ligne_data["article_code"], quantite - ) - except: - try: - ligne_obj.SetDefaultArticle(article_obj, quantite) - except: - ligne_obj.DL_Design = ligne_data.get( - "designation", "" - ) - ligne_obj.DL_Qte = quantite - - if ligne_data.get("prix_unitaire_ht"): - ligne_obj.DL_PrixUnitaire = float( - ligne_data["prix_unitaire_ht"] - ) - - if ligne_data.get("remise_pourcentage", 0) > 0: - try: - ligne_obj.DL_Remise01REM_Valeur = float( - ligne_data["remise_pourcentage"] - ) - ligne_obj.DL_Remise01REM_Type = 0 - except: - pass - - ligne_obj.Write() - logger.debug(f" ✅ Ligne {idx + 1} ajoutée") - - # SUPPRIMER EN TROP - elif nb_nouvelles < nb_existantes: - for idx in range(nb_existantes, nb_nouvelles, -1): - try: - ligne_p = factory_lignes.List(idx) - if ligne_p: - ligne = win32com.client.CastTo( - ligne_p, "IBODocumentLigne3" - ) - ligne.Read() - - try: - ligne.Remove() - except AttributeError: - ligne.WriteDefault() - except: - pass - except: - pass - - champs_modifies.append("lignes") - - # VALIDATION - logger.info("💾 Validation finale...") - doc.Write() - - import time - - time.sleep(1) - - doc.Read() - - total_ht = float(getattr(doc, "DO_TotalHT", 0.0)) - total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0)) - - logger.info(f"✅✅✅ DEVIS MODIFIÉ: {numero} ✅✅✅") - logger.info(f"💰 Totaux: {total_ht}€ HT / {total_ttc}€ TTC") - - return { - "numero": numero, - "total_ht": total_ht, - "total_ttc": total_ttc, - "champs_modifies": champs_modifies, - "statut": getattr(doc, "DO_Statut", 0), - } - - except ValueError as e: - logger.error(f"❌ Erreur métier: {e}") - raise - - except Exception as e: - logger.error(f"❌ Erreur technique: {e}", exc_info=True) - raise RuntimeError(f"Erreur technique Sage: {str(e)}") - def creer_commande_enrichi(self, commande_data: dict) -> Dict: if not self.cial: raise RuntimeError("Connexion Sage non établie")