Sage100-ws/sage_connector.py
2025-11-28 06:30:32 +03:00

1462 lines
59 KiB
Python

import win32com.client
import pythoncom # AJOUT CRITIQUE
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional
import threading
import time
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class SageConnector:
"""
Connecteur Sage 100c avec gestion COM threading correcte
CHANGEMENTS PRODUCTION:
- Initialisation COM par thread (CoInitialize/CoUninitialize)
- Lock robuste pour thread-safety
- Gestion d'erreurs exhaustive
- Logging structuré
- Retry automatique sur erreurs COM
"""
def __init__(self, chemin_base, utilisateur="<Administrateur>", mot_de_passe=""):
self.chemin_base = chemin_base
self.utilisateur = utilisateur
self.mot_de_passe = mot_de_passe
self.cial = None
# Cache
self._cache_clients: List[Dict] = []
self._cache_articles: List[Dict] = []
self._cache_clients_dict: Dict[str, Dict] = {}
self._cache_articles_dict: Dict[str, Dict] = {}
# Métadonnées cache
self._cache_clients_last_update: Optional[datetime] = None
self._cache_articles_last_update: Optional[datetime] = None
self._cache_ttl_minutes = 15
# Thread d'actualisation
self._refresh_thread: Optional[threading.Thread] = None
self._stop_refresh = threading.Event()
# Locks thread-safe
self._lock_clients = threading.RLock()
self._lock_articles = threading.RLock()
self._lock_com = threading.RLock() # Lock pour accès COM
# Thread-local storage pour COM
self._thread_local = threading.local()
# =========================================================================
# GESTION COM THREAD-SAFE
# =========================================================================
@contextmanager
def _com_context(self):
"""
Context manager pour initialiser COM dans chaque thread
CRITIQUE: FastAPI utilise un pool de threads.
Chaque thread doit initialiser COM avant d'utiliser les objets Sage.
"""
# Vérifier si COM est déjà initialisé pour ce thread
if not hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoInitialize()
self._thread_local.com_initialized = True
logger.debug(
f"COM initialisé pour thread {threading.current_thread().name}"
)
except Exception as e:
logger.error(f"Erreur initialisation COM: {e}")
raise
try:
yield
finally:
# Ne pas désinitialiser COM ici car le thread peut être réutilisé
pass
def _cleanup_com_thread(self):
"""Nettoie COM pour le thread actuel (à appeler à la fin)"""
if hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoUninitialize()
delattr(self._thread_local, "com_initialized")
logger.debug(
f"COM nettoyé pour thread {threading.current_thread().name}"
)
except:
pass
# =========================================================================
# CONNEXION
# =========================================================================
def connecter(self):
"""Connexion initiale à Sage"""
try:
with self._com_context():
self.cial = win32com.client.gencache.EnsureDispatch(
"Objets100c.Cial.Stream"
)
self.cial.Name = self.chemin_base
self.cial.Loggable.UserName = self.utilisateur
self.cial.Loggable.UserPwd = self.mot_de_passe
self.cial.Open()
logger.info(f"Connexion Sage réussie: {self.chemin_base}")
# Chargement initial du cache
logger.info("Chargement initial du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
logger.info(
f"Cache initialisé: {len(self._cache_clients)} clients, {len(self._cache_articles)} articles"
)
# Démarrage du thread d'actualisation
self._start_refresh_thread()
return True
except Exception as e:
logger.error(f"Erreur connexion Sage: {e}", exc_info=True)
return False
def deconnecter(self):
"""Déconnexion propre"""
self._stop_refresh.set()
if self._refresh_thread:
self._refresh_thread.join(timeout=5)
if self.cial:
try:
with self._com_context():
self.cial.Close()
logger.info("Connexion Sage fermée")
except:
pass
# =========================================================================
# SYSTÈME DE CACHE
# =========================================================================
def _start_refresh_thread(self):
"""Démarre le thread d'actualisation automatique"""
def refresh_loop():
# Initialiser COM pour ce thread worker
pythoncom.CoInitialize()
try:
while not self._stop_refresh.is_set():
time.sleep(60) # Vérifier toutes les minutes
# Clients
if self._cache_clients_last_update:
age = datetime.now() - self._cache_clients_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
logger.info(
f"Actualisation cache clients (âge: {age.seconds//60}min)"
)
self._refresh_cache_clients()
# Articles
if self._cache_articles_last_update:
age = datetime.now() - self._cache_articles_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
logger.info(
f"Actualisation cache articles (âge: {age.seconds//60}min)"
)
self._refresh_cache_articles()
finally:
# Nettoyer COM en fin de thread
pythoncom.CoUninitialize()
self._refresh_thread = threading.Thread(
target=refresh_loop, daemon=True, name="SageCacheRefresh"
)
self._refresh_thread.start()
def _refresh_cache_clients(self):
"""Actualise le cache des clients"""
if not self.cial:
return
clients = []
clients_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
index = 1
erreurs_consecutives = 0
max_erreurs = 50
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_client(persist)
if obj:
data = self._extraire_client(obj)
clients.append(data)
clients_dict[data["numero"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh clients après {max_erreurs} erreurs"
)
break
with self._lock_clients:
self._cache_clients = clients
self._cache_clients_dict = clients_dict
self._cache_clients_last_update = datetime.now()
logger.info(f" Cache clients actualisé: {len(clients)} clients")
except Exception as e:
logger.error(f" Erreur refresh clients: {e}", exc_info=True)
def _refresh_cache_articles(self):
"""Actualise le cache des articles"""
if not self.cial:
return
articles = []
articles_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryArticle
index = 1
erreurs_consecutives = 0
max_erreurs = 50
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_article(persist)
if obj:
data = self._extraire_article(obj)
articles.append(data)
articles_dict[data["reference"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh articles après {max_erreurs} erreurs"
)
break
with self._lock_articles:
self._cache_articles = articles
self._cache_articles_dict = articles_dict
self._cache_articles_last_update = datetime.now()
logger.info(f" Cache articles actualisé: {len(articles)} articles")
except Exception as e:
logger.error(f" Erreur refresh articles: {e}", exc_info=True)
# =========================================================================
# API PUBLIQUE (ultra-rapide grâce au cache)
# =========================================================================
def lister_tous_clients(self, filtre=""):
"""Retourne les clients depuis le cache (instantané)"""
with self._lock_clients:
if not filtre:
return self._cache_clients.copy()
filtre_lower = filtre.lower()
return [
c
for c in self._cache_clients
if filtre_lower in c["numero"].lower()
or filtre_lower in c["intitule"].lower()
]
def lire_client(self, code_client):
"""Retourne un client depuis le cache (instantané)"""
with self._lock_clients:
return self._cache_clients_dict.get(code_client)
def lister_tous_articles(self, filtre=""):
"""Retourne les articles depuis le cache (instantané)"""
with self._lock_articles:
if not filtre:
return self._cache_articles.copy()
filtre_lower = filtre.lower()
return [
a
for a in self._cache_articles
if filtre_lower in a["reference"].lower()
or filtre_lower in a["designation"].lower()
]
def lire_article(self, reference):
"""Retourne un article depuis le cache (instantané)"""
with self._lock_articles:
return self._cache_articles_dict.get(reference)
def forcer_actualisation_cache(self):
"""Force l'actualisation immédiate du cache (endpoint admin)"""
logger.info("Actualisation forcée du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
logger.info("Cache actualisé")
def get_cache_info(self):
"""Retourne les infos du cache (endpoint monitoring)"""
with self._lock_clients, self._lock_articles:
return {
"clients": {
"count": len(self._cache_clients),
"last_update": (
self._cache_clients_last_update.isoformat()
if self._cache_clients_last_update
else None
),
"age_minutes": (
(
datetime.now() - self._cache_clients_last_update
).total_seconds()
/ 60
if self._cache_clients_last_update
else None
),
},
"articles": {
"count": len(self._cache_articles),
"last_update": (
self._cache_articles_last_update.isoformat()
if self._cache_articles_last_update
else None
),
"age_minutes": (
(
datetime.now() - self._cache_articles_last_update
).total_seconds()
/ 60
if self._cache_articles_last_update
else None
),
},
"ttl_minutes": self._cache_ttl_minutes,
}
# =========================================================================
# CAST HELPERS
# =========================================================================
def _cast_client(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOClient3")
obj.Read()
return obj
except:
return None
def _cast_article(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOArticle3")
obj.Read()
return obj
except:
return None
# =========================================================================
# EXTRACTION
# =========================================================================
def _extraire_client(self, client_obj):
data = {
"numero": getattr(client_obj, "CT_Num", ""),
"intitule": getattr(client_obj, "CT_Intitule", ""),
"type": getattr(client_obj, "CT_Type", 0),
}
try:
adresse = getattr(client_obj, "Adresse", None)
if adresse:
data["adresse"] = getattr(adresse, "Adresse", "")
data["code_postal"] = getattr(adresse, "CodePostal", "")
data["ville"] = getattr(adresse, "Ville", "")
except:
pass
try:
telecom = getattr(client_obj, "Telecom", None)
if telecom:
data["telephone"] = getattr(telecom, "Telephone", "")
data["email"] = getattr(telecom, "EMail", "")
except:
pass
return data
def _extraire_article(self, article_obj):
return {
"reference": getattr(article_obj, "AR_Ref", ""),
"designation": getattr(article_obj, "AR_Design", ""),
"prix_vente": getattr(article_obj, "AR_PrixVen", 0.0),
"prix_achat": getattr(article_obj, "AR_PrixAch", 0.0),
"stock_reel": getattr(article_obj, "AR_Stock", 0.0),
"stock_mini": getattr(article_obj, "AR_StockMini", 0.0),
}
# =========================================================================
# CRÉATION DEVIS (US-A1) - VERSION TRANSACTIONNELLE
# =========================================================================
def creer_devis_enrichi(self, devis_data: dict):
"""
Création de devis avec transaction Sage
✅ SOLUTION FINALE: Utilisation de SetDefaultArticle()
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
logger.info(
f"🚀 Début création devis pour client {devis_data['client']['code']}"
)
try:
with self._com_context(), self._lock_com:
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("✅ Transaction Sage démarrée")
except Exception as e:
logger.warning(f"⚠️ BeginTrans échoué: {e}")
try:
# ===== CRÉATION DOCUMENT =====
process = self.cial.CreateProcess_Document(0) # Type 0 = Devis
doc = process.Document
try:
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
except:
pass
logger.info("📄 Document devis créé")
# ===== DATE =====
import pywintypes
if isinstance(devis_data["date_devis"], str):
try:
date_obj = datetime.fromisoformat(devis_data["date_devis"])
except:
date_obj = datetime.now()
elif isinstance(devis_data["date_devis"], date):
date_obj = datetime.combine(
devis_data["date_devis"], datetime.min.time()
)
else:
date_obj = datetime.now()
doc.DO_Date = pywintypes.Time(date_obj)
logger.info(f"📅 Date définie: {date_obj.date()}")
# ===== CLIENT (CRITIQUE: Doit être défini AVANT les lignes) =====
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(
devis_data["client"]["code"]
)
if not persist_client:
raise ValueError(
f"❌ Client {devis_data['client']['code']} introuvable"
)
client_obj = self._cast_client(persist_client)
if not client_obj:
raise ValueError(
f"❌ Impossible de charger le client {devis_data['client']['code']}"
)
# ✅ CRITIQUE: Associer le client au document
doc.SetDefaultClient(client_obj)
doc.Write()
logger.info(
f"👤 Client {devis_data['client']['code']} associé et document écrit"
)
# ===== LIGNES AVEC SetDefaultArticle() =====
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
logger.info(f"📦 Ajout de {len(devis_data['lignes'])} lignes...")
for idx, ligne_data in enumerate(devis_data["lignes"], 1):
logger.info(
f"--- Ligne {idx}: {ligne_data['article_code']} ---"
)
# 🔍 ÉTAPE 1: Charger l'article RÉEL depuis Sage pour le prix
persist_article = factory_article.ReadReference(
ligne_data["article_code"]
)
if not persist_article:
raise ValueError(
f"❌ Article {ligne_data['article_code']} introuvable dans Sage"
)
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
# 💰 ÉTAPE 2: Récupérer le prix de vente RÉEL
prix_sage = float(getattr(article_obj, "AR_PrixVen", 0.0))
designation_sage = getattr(article_obj, "AR_Design", "")
logger.info(f"💰 Prix Sage: {prix_sage}")
if prix_sage == 0:
logger.warning(
f"⚠️ ATTENTION: Article {ligne_data['article_code']} a un prix de vente = 0€"
)
# 📝 ÉTAPE 3: Créer la ligne de devis
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# ✅✅✅ SOLUTION FINALE: SetDefaultArticleReference avec 2 paramètres ✅✅✅
quantite = float(ligne_data["quantite"])
try:
# Méthode 1: Via référence (plus simple et plus fiable)
ligne_obj.SetDefaultArticleReference(
ligne_data["article_code"], quantite
)
logger.info(
f"✅ Article associé via SetDefaultArticleReference('{ligne_data['article_code']}', {quantite})"
)
except Exception as e:
logger.warning(
f"⚠️ SetDefaultArticleReference échoué: {e}, tentative avec objet article"
)
try:
# Méthode 2: Via objet article
ligne_obj.SetDefaultArticle(article_obj, quantite)
logger.info(
f"✅ Article associé via SetDefaultArticle(obj, {quantite})"
)
except Exception as e2:
logger.error(
f"❌ Toutes les méthodes d'association ont échoué"
)
# Fallback: définir manuellement
ligne_obj.DL_Design = (
designation_sage or ligne_data["designation"]
)
ligne_obj.DL_Qte = quantite
logger.warning("⚠️ Configuration manuelle appliquée")
# ⚙️ ÉTAPE 4: Vérifier le prix automatique chargé
prix_auto = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
logger.info(f"💰 Prix auto chargé: {prix_auto}")
# 💵 ÉTAPE 5: Ajuster le prix si nécessaire
prix_a_utiliser = ligne_data.get("prix_unitaire_ht")
if prix_a_utiliser is not None and prix_a_utiliser > 0:
# Prix personnalisé fourni
ligne_obj.DL_PrixUnitaire = float(prix_a_utiliser)
logger.info(f"💰 Prix personnalisé: {prix_a_utiliser}")
elif prix_auto == 0:
# Pas de prix auto, forcer le prix Sage
if prix_sage == 0:
raise ValueError(
f"Prix nul pour article {ligne_data['article_code']}"
)
ligne_obj.DL_PrixUnitaire = float(prix_sage)
logger.info(f"💰 Prix Sage forcé: {prix_sage}")
else:
# Prix auto correct, on le garde
logger.info(f"💰 Prix auto conservé: {prix_auto}")
prix_final = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
montant_ligne = quantite * prix_final
logger.info(f"{quantite} x {prix_final}€ = {montant_ligne}")
# 🎁 Remise
remise = ligne_data.get("remise_pourcentage", 0)
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = 0
montant_apres_remise = montant_ligne * (
1 - remise / 100
)
logger.info(
f"🎁 Remise {remise}% → {montant_apres_remise}"
)
except Exception as e:
logger.warning(f"⚠️ Remise non appliquée: {e}")
# 💾 ÉTAPE 6: Écrire la ligne
ligne_obj.Write()
logger.info(f"✅ Ligne {idx} écrite")
# 🔍 VÉRIFICATION: Relire la ligne pour confirmer
try:
ligne_obj.Read()
prix_enregistre = float(
getattr(ligne_obj, "DL_PrixUnitaire", 0.0)
)
montant_enregistre = float(
getattr(ligne_obj, "DL_MontantHT", 0.0)
)
logger.info(
f"🔍 Vérif: Prix={prix_enregistre}€, Montant HT={montant_enregistre}"
)
if montant_enregistre == 0:
logger.error(
f"❌ PROBLÈME: Montant enregistré = 0 pour ligne {idx}"
)
else:
logger.info(f"✅ Ligne {idx} OK: {montant_enregistre}")
except Exception as e:
logger.warning(f"⚠️ Impossible de vérifier la ligne: {e}")
# ===== VALIDATION DOCUMENT =====
logger.info("💾 Écriture finale du document...")
doc.Write()
logger.info("🔄 Lancement du traitement (Process)...")
process.Process()
# ===== RÉCUPÉRATION NUMÉRO =====
numero_devis = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(
doc_result, "IBODocumentVente3"
)
doc_result.Read()
numero_devis = getattr(doc_result, "DO_Piece", "")
logger.info(
f"📄 Numéro (via DocumentResult): {numero_devis}"
)
except Exception as e:
logger.warning(f"⚠️ DocumentResult non accessible: {e}")
if not numero_devis:
numero_devis = getattr(doc, "DO_Piece", "")
logger.info(f"📄 Numéro (via Document): {numero_devis}")
if not numero_devis:
raise RuntimeError("❌ Numéro devis vide après création")
# ===== COMMIT TRANSACTION =====
if transaction_active:
self.cial.CptaApplication.CommitTrans()
logger.info("✅ Transaction committée")
# ===== ATTENTE INDEXATION =====
logger.info("⏳ Attente indexation Sage (2s)...")
time.sleep(2)
# ===== RELECTURE COMPLÈTE =====
logger.info("🔍 Relecture complète du document...")
factory_doc = self.cial.FactoryDocumentVente
persist_reread = factory_doc.ReadPiece(0, numero_devis)
if not persist_reread:
logger.error(f"❌ Impossible de relire le devis {numero_devis}")
# Fallback: retourner les totaux calculés
total_calcule = sum(
l.get("montant_ligne_ht", 0) for l in devis_data["lignes"]
)
logger.warning(f"⚠️ Utilisation total calculé: {total_calcule}")
return {
"numero_devis": numero_devis,
"total_ht": total_calcule,
"total_ttc": round(total_calcule * 1.20, 2),
"nb_lignes": len(devis_data["lignes"]),
"client_code": devis_data["client"]["code"],
"date_devis": str(date_obj.date()),
}
doc_final = win32com.client.CastTo(
persist_reread, "IBODocumentVente3"
)
doc_final.Read()
# ===== EXTRACTION TOTAUX =====
total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0))
client_code_final = getattr(doc_final, "CT_Num", "")
date_finale = getattr(doc_final, "DO_Date", None)
logger.info(f"💰 Total HT: {total_ht}")
logger.info(f"💰 Total TTC: {total_ttc}")
# ===== DIAGNOSTIC EN CAS D'ANOMALIE =====
if total_ht == 0 and total_ttc > 0:
logger.warning("⚠️ Anomalie: Total HT = 0 mais Total TTC > 0")
logger.info("🔍 Lecture des lignes pour diagnostic...")
try:
factory_lignes_verif = doc_final.FactoryDocumentLigne
except:
factory_lignes_verif = doc_final.FactoryDocumentVenteLigne
index = 1
total_calcule = 0.0
while index <= 20:
try:
ligne_p = factory_lignes_verif.List(index)
if ligne_p is None:
break
ligne_verif = win32com.client.CastTo(
ligne_p, "IBODocumentLigne3"
)
ligne_verif.Read()
montant = float(
getattr(ligne_verif, "DL_MontantHT", 0.0)
)
logger.info(
f" Ligne {index}: Montant HT = {montant}"
)
total_calcule += montant
index += 1
except:
break
logger.info(f"📊 Total calculé manuellement: {total_calcule}")
if total_calcule > 0:
total_ht = total_calcule
total_ttc = round(total_ht * 1.20, 2)
logger.info(
f"✅ Correction appliquée: HT={total_ht}€, TTC={total_ttc}"
)
logger.info(
f"✅ ✅ ✅ DEVIS CRÉÉ: {numero_devis} - {total_ttc}€ TTC ✅ ✅ ✅"
)
return {
"numero_devis": numero_devis,
"total_ht": total_ht,
"total_ttc": total_ttc,
"nb_lignes": len(devis_data["lignes"]),
"client_code": client_code_final,
"date_devis": (
str(date_finale) if date_finale else str(date_obj.date())
),
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("❌ Transaction annulée (rollback)")
except:
pass
raise
except Exception as e:
logger.error(f"❌ ❌ ❌ ERREUR CRÉATION DEVIS: {e}", exc_info=True)
raise RuntimeError(f"Échec création devis: {str(e)}")
# =========================================================================
# LECTURE DEVIS
# =========================================================================
def lire_devis(self, numero_devis):
"""
Lecture d'un devis (y compris brouillon)
✅ CORRIGÉ: Utilise .Client pour le client et .Article pour les lignes
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
# ✅ PRIORITÉ 1: Essayer ReadPiece (documents validés)
persist = factory.ReadPiece(0, numero_devis)
# ✅ PRIORITÉ 2: Rechercher dans la liste (brouillons)
if not persist:
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(
persist_test, "IBODocumentVente3"
)
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero_devis
):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
logger.warning(f"Devis {numero_devis} introuvable")
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ CHARGEMENT CLIENT VIA .Client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
logger.debug(
f"Client chargé via .Client: {client_code} - {client_intitule}"
)
except Exception as e:
logger.debug(f"Erreur chargement client: {e}")
# Fallback sur cache si disponible
if client_code:
client_obj_cache = self.lire_client(client_code)
if client_obj_cache:
client_intitule = client_obj_cache.get("intitule", "")
devis = {
"numero": getattr(doc, "DO_Piece", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": getattr(doc, "DO_Statut", 0),
"lignes": [],
}
# Lecture des lignes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_persist = factory_lignes.List(index)
if ligne_persist is None:
break
ligne = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
ligne.Read()
# ✅✅✅ CHARGEMENT ARTICLE VIA .Article ✅✅✅
article_ref = ""
try:
# Méthode 1: Essayer AR_Ref direct (parfois disponible)
article_ref = getattr(ligne, "AR_Ref", "").strip()
# Méthode 2: Si vide, utiliser la propriété .Article
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
logger.debug(
f"Article chargé via .Article: {article_ref}"
)
except Exception as e:
logger.debug(
f"Erreur chargement article ligne {index}: {e}"
)
devis["lignes"].append(
{
"article": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"montant_ht": float(
getattr(ligne, "DL_MontantHT", 0.0)
),
}
)
index += 1
except Exception as e:
logger.debug(f"Erreur lecture ligne {index}: {e}")
break
logger.info(
f"✅ Devis {numero_devis} lu: {len(devis['lignes'])} lignes, {devis['total_ttc']:.2f}€, client: {client_intitule}"
)
return devis
except Exception as e:
logger.error(f"❌ Erreur lecture devis {numero_devis}: {e}")
return None
def lire_document(self, numero, type_doc):
"""Lecture générique document (pour PDF)"""
if type_doc == 0:
return self.lire_devis(numero)
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, numero)
if not persist:
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# Lire lignes
lignes = []
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_p = factory_lignes.List(index)
if ligne_p is None:
break
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
ligne.Read()
lignes.append(
{
"designation": getattr(ligne, "DL_Design", ""),
"quantite": getattr(ligne, "DL_Qte", 0.0),
"prix_unitaire": getattr(ligne, "DL_PrixUnitaire", 0.0),
"montant_ht": getattr(ligne, "DL_MontantHT", 0.0),
}
)
index += 1
except:
break
return {
"numero": getattr(doc, "DO_Piece", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": getattr(doc, "CT_Num", ""),
"client_intitule": getattr(doc, "CT_Intitule", ""),
"total_ht": getattr(doc, "DO_TotalHT", 0.0),
"total_ttc": getattr(doc, "DO_TotalTTC", 0.0),
"lignes": lignes,
}
except Exception as e:
logger.error(f" Erreur lecture document: {e}")
return None
# =========================================================================
# TRANSFORMATION (US-A2)
# =========================================================================
def transformer_document(self, numero_source, type_source, type_cible):
"""
Transformation de document avec la méthode NATIVE de Sage
CHANGEMENT MAJEUR:
- Utilise TransformInto() au lieu de CreateProcess_Document()
- Méthode officielle Sage pour les transformations
- Gère automatiquement les numéros, statuts, et lignes
Documentation Sage:
IBODocumentVente3.TransformInto(DO_Type: int) -> IBODocumentVente3
"""
if not self.cial:
raise RuntimeError("Connexion Sage non etablie")
# Convertir en int si enum
type_source = int(type_source)
type_cible = int(type_cible)
logger.info(
f"[TRANSFORM] Demande: {numero_source} "
f"(type {type_source}) -> type {type_cible}"
)
# Validation des types
types_valides = {0, 1, 2, 3, 4, 5}
if type_source not in types_valides or type_cible not in types_valides:
raise ValueError(
f"Types invalides: source={type_source}, cible={type_cible}. "
f"Valeurs valides: {types_valides}"
)
# Matrice de transformations Sage 100c
transformations_autorisees = {
(0, 3): "Devis -> Commande",
(0, 1): "Devis -> Bon de livraison",
(0, 5): "Devis -> Facture", # Peut être supporté selon config
(3, 1): "Commande -> Bon de livraison",
(3, 4): "Commande -> Preparation",
(3, 5): "Commande -> Facture", # Direct si autorisé
(1, 5): "Bon de livraison -> Facture",
(4, 1): "Preparation -> Bon de livraison",
}
if (type_source, type_cible) not in transformations_autorisees:
raise ValueError(
f"Transformation non autorisee par Sage: "
f"{type_source} -> {type_cible}. "
f"Valides: "
+ ", ".join(
f"{k[0]}->{k[1]}" for k in transformations_autorisees.keys()
)
)
try:
with self._com_context(), self._lock_com:
# ===== LECTURE SOURCE =====
factory = self.cial.FactoryDocumentVente
persist_source = factory.ReadPiece(type_source, numero_source)
if not persist_source:
logger.warning(
f"[TRANSFORM] ReadPiece failed, searching in List()..."
)
persist_source = self._find_document_in_list(
numero_source, type_source
)
if not persist_source:
raise ValueError(
f"Document {numero_source} (type {type_source}) introuvable"
)
doc_source = win32com.client.CastTo(persist_source, "IBODocumentVente3")
doc_source.Read()
# VÉRIFICATIONS STATUT
statut_actuel = getattr(doc_source, "DO_Statut", 0)
type_reel = getattr(doc_source, "DO_Type", -1)
logger.info(
f"[TRANSFORM] Document source: type={type_reel}, "
f"statut={statut_actuel}, numero={numero_source}"
)
# Vérifier cohérence type
if type_reel != type_source:
raise ValueError(
f"Incoherence: document {numero_source} est de type {type_reel}, "
f"pas de type {type_source}"
)
# RÈGLES DE STATUT
if statut_actuel == 5:
raise ValueError(
f"Document {numero_source} deja transforme (statut=5). "
f"Impossible de le transformer a nouveau."
)
if statut_actuel == 6:
raise ValueError(
f"Document {numero_source} annule (statut=6). "
f"Impossible de le transformer."
)
if statut_actuel in [3, 4]:
raise ValueError(
f"Document {numero_source} deja realise (statut={statut_actuel}). "
f"Ce document a deja ete transforme partiellement ou totalement."
)
# Forcer statut "Accepté" si brouillon
if type_source == 0 and statut_actuel == 0:
logger.warning(
f"[TRANSFORM] Devis en brouillon (statut=0), "
f"passage a 'Accepte' (statut=2)"
)
try:
doc_source.DO_Statut = 2
doc_source.Write()
logger.info(f"[TRANSFORM] Statut change: 0 -> 2")
# Re-lire
doc_source.Read()
nouveau_statut = getattr(doc_source, "DO_Statut", 0)
if nouveau_statut != 2:
raise RuntimeError(
f"Echec changement statut: toujours a {nouveau_statut}"
)
except Exception as e:
raise RuntimeError(f"Impossible de changer le statut: {e}")
# ===== TRANSACTION =====
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("[TRANSFORM] Transaction demarree")
except AttributeError:
# BeginTrans n'existe pas sur cette version
logger.debug(
"[TRANSFORM] BeginTrans non disponible, continue sans transaction"
)
except Exception as e:
logger.warning(f"[TRANSFORM] BeginTrans echoue: {e}")
try:
# ✅✅✅ MÉTHODE NATIVE SAGE: TransformInto() ✅✅✅
logger.info(f"[TRANSFORM] Appel TransformInto({type_cible})...")
try:
# La méthode TransformInto() retourne le nouveau document
doc_cible = doc_source.TransformInto(type_cible)
if doc_cible is None:
raise RuntimeError(
"TransformInto() a retourne None. "
"Verifiez la configuration Sage et les autorisations."
)
logger.info("[TRANSFORM] TransformInto() execute avec succes")
# Cast vers le bon type
try:
doc_cible = win32com.client.CastTo(
doc_cible, "IBODocumentVente3"
)
except:
pass
# Lire le document cible
doc_cible.Read()
# Récupérer le numéro
numero_cible = getattr(doc_cible, "DO_Piece", "")
if not numero_cible:
raise RuntimeError(
"Numero document cible vide apres transformation"
)
# Compter les lignes
try:
factory_lignes = doc_cible.FactoryDocumentLigne
except:
factory_lignes = doc_cible.FactoryDocumentVenteLigne
nb_lignes = 0
index = 1
while index <= 1000:
try:
ligne_p = factory_lignes.List(index)
if ligne_p is None:
break
nb_lignes += 1
index += 1
except:
break
logger.info(
f"[TRANSFORM] Document cible cree: {numero_cible} avec {nb_lignes} lignes"
)
except AttributeError as e:
# TransformInto() n'existe pas sur cette version de Sage
logger.error(f"[TRANSFORM] TransformInto() non disponible: {e}")
raise RuntimeError(
f"La methode TransformInto() n'est pas disponible sur votre version de Sage 100c. "
f"Vous devez soit: "
f"(1) Mettre a jour Sage, ou "
f"(2) Activer le module de gestion commerciale pour les commandes, ou "
f"(3) Utiliser l'interface Sage manuellement pour les transformations."
)
except Exception as e:
logger.error(f"[TRANSFORM] TransformInto() echoue: {e}")
# Essayer de déterminer la cause
if "Valeur invalide" in str(e):
raise RuntimeError(
f"Sage refuse la transformation vers le type {type_cible}. "
f"Causes possibles:\n"
f"1. Le module 'Commandes' n'est pas active dans votre licence Sage\n"
f"2. L'utilisateur n'a pas les droits sur ce type de document\n"
f"3. La configuration Sage bloque ce type de transformation\n"
f"4. Il manque des parametres obligatoires (depot, tarif, etc.)\n\n"
f"Verifiez dans Sage: Fichier > Autorisations > Gestion Commerciale"
)
elif "Acces refuse" in str(e) or "Access denied" in str(e):
raise RuntimeError(
f"Acces refuse pour creer une commande (type {type_cible}). "
f"Verifiez les droits utilisateur dans Sage: "
f"Fichier > Autorisations > Votre utilisateur"
)
else:
raise RuntimeError(
f"Erreur Sage lors de la transformation: {e}\n"
f"Consultez les logs Sage pour plus de details."
)
# Commit transaction
if transaction_active:
try:
self.cial.CptaApplication.CommitTrans()
logger.info("[TRANSFORM] Transaction committee")
except:
pass
# MAJ statut source -> Transformé
try:
doc_source.Read() # Re-lire au cas où
doc_source.DO_Statut = 5
doc_source.Write()
logger.info(
f"[TRANSFORM] Statut source mis a jour: -> 5 (TRANSFORME)"
)
except Exception as e:
logger.warning(
f"[TRANSFORM] Impossible de MAJ statut source: {e}"
)
logger.info(
f"[TRANSFORM] SUCCES: "
f"{numero_source} ({type_source}) -> "
f"{numero_cible} ({type_cible}) - {nb_lignes} lignes"
)
return {
"success": True,
"document_source": numero_source,
"document_cible": numero_cible,
"nb_lignes": nb_lignes,
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("[TRANSFORM] Transaction annulee")
except:
pass
raise
except Exception as e:
logger.error(f"[TRANSFORM] Erreur: {e}", exc_info=True)
raise RuntimeError(f"Echec transformation: {str(e)}")
def _find_document_in_list(self, numero, type_doc):
"""Cherche un document dans List() si ReadPiece échoue"""
try:
factory = self.cial.FactoryDocumentVente
index = 1
while index < 10000:
try:
persist = factory.List(index)
if persist is None:
break
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
if (
getattr(doc, "DO_Type", -1) == type_doc
and getattr(doc, "DO_Piece", "") == numero
):
logger.info(f"[TRANSFORM] Document trouve a l'index {index}")
return persist
index += 1
except:
index += 1
continue
return None
except Exception as e:
logger.error(f"[TRANSFORM] Erreur recherche document: {e}")
return None
# =========================================================================
# CHAMPS LIBRES (US-A3)
# =========================================================================
def mettre_a_jour_champ_libre(self, doc_id, type_doc, nom_champ, valeur):
"""Mise à jour champ libre pour Universign ID"""
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, doc_id)
if persist:
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
try:
setattr(doc, f"DO_{nom_champ}", valeur)
doc.Write()
logger.debug(f"Champ libre {nom_champ} = {valeur} sur {doc_id}")
return True
except Exception as e:
logger.warning(f"Impossible de mettre à jour {nom_champ}: {e}")
except Exception as e:
logger.error(f"Erreur MAJ champ libre: {e}")
return False
def _lire_client_obj(self, code_client):
"""Retourne l'objet client Sage brut (pour remises)"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
persist = factory.ReadNumero(code_client)
if persist:
return self._cast_client(persist)
except:
pass
return None
# =========================================================================
# US-A6 - LECTURE CONTACTS
# =========================================================================
def lire_contact_principal_client(self, code_client):
"""
NOUVEAU: Lecture contact principal d'un client
Pour US-A6: relance devis via Universign
Récupère l'email du contact principal pour l'envoi
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(code_client)
if not persist_client:
return None
client = self._cast_client(persist_client)
if not client:
return None
# Récupérer infos contact principal
contact_info = {
"client_code": code_client,
"client_intitule": getattr(client, "CT_Intitule", ""),
"email": None,
"nom": None,
"telephone": None,
}
# Email principal depuis Telecom
try:
telecom = getattr(client, "Telecom", None)
if telecom:
contact_info["email"] = getattr(telecom, "EMail", "")
contact_info["telephone"] = getattr(telecom, "Telephone", "")
except:
pass
# Nom du contact
try:
contact_info["nom"] = (
getattr(client, "CT_Contact", "")
or contact_info["client_intitule"]
)
except:
contact_info["nom"] = contact_info["client_intitule"]
return contact_info
except Exception as e:
logger.error(f"Erreur lecture contact client {code_client}: {e}")
return None
# =========================================================================
# US-A7 - MAJ CHAMP DERNIERE RELANCE
# =========================================================================
def mettre_a_jour_derniere_relance(self, doc_id, type_doc):
"""
NOUVEAU: Met à jour le champ libre "Dernière relance"
Pour US-A7: relance facture en un clic
"""
date_relance = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return self.mettre_a_jour_champ_libre(
doc_id, type_doc, "DerniereRelance", date_relance
)