Sage100-ws/sage_connector.py

1582 lines
64 KiB
Python

import win32com.client
import pythoncom # AJOUT CRITIQUE
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional
import threading
import time
import logging
from contextlib import contextmanager
from config import settings, validate_settings
logger = logging.getLogger(__name__)
class SageConnector:
"""
Connecteur Sage 100c avec gestion COM threading correcte
CHANGEMENTS PRODUCTION:
- Initialisation COM par thread (CoInitialize/CoUninitialize)
- Lock robuste pour thread-safety
- Gestion d'erreurs exhaustive
- Logging structuré
- Retry automatique sur erreurs COM
"""
def __init__(self, chemin_base, utilisateur="<Administrateur>", mot_de_passe=""):
self.chemin_base = chemin_base
self.utilisateur = utilisateur
self.mot_de_passe = mot_de_passe
self.cial = None
# Cache
self._cache_clients: List[Dict] = []
self._cache_articles: List[Dict] = []
self._cache_clients_dict: Dict[str, Dict] = {}
self._cache_articles_dict: Dict[str, Dict] = {}
# Métadonnées cache
self._cache_clients_last_update: Optional[datetime] = None
self._cache_articles_last_update: Optional[datetime] = None
self._cache_ttl_minutes = 15
# Thread d'actualisation
self._refresh_thread: Optional[threading.Thread] = None
self._stop_refresh = threading.Event()
# Locks thread-safe
self._lock_clients = threading.RLock()
self._lock_articles = threading.RLock()
self._lock_com = threading.RLock() # Lock pour accès COM
# Thread-local storage pour COM
self._thread_local = threading.local()
# =========================================================================
# GESTION COM THREAD-SAFE
# =========================================================================
@contextmanager
def _com_context(self):
"""
Context manager pour initialiser COM dans chaque thread
CRITIQUE: FastAPI utilise un pool de threads.
Chaque thread doit initialiser COM avant d'utiliser les objets Sage.
"""
# Vérifier si COM est déjà initialisé pour ce thread
if not hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoInitialize()
self._thread_local.com_initialized = True
logger.debug(
f"COM initialisé pour thread {threading.current_thread().name}"
)
except Exception as e:
logger.error(f"Erreur initialisation COM: {e}")
raise
try:
yield
finally:
# Ne pas désinitialiser COM ici car le thread peut être réutilisé
pass
def _cleanup_com_thread(self):
"""Nettoie COM pour le thread actuel (à appeler à la fin)"""
if hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoUninitialize()
delattr(self._thread_local, "com_initialized")
logger.debug(
f"COM nettoyé pour thread {threading.current_thread().name}"
)
except:
pass
# =========================================================================
# CONNEXION
# =========================================================================
def connecter(self):
"""Connexion initiale à Sage"""
try:
with self._com_context():
self.cial = win32com.client.gencache.EnsureDispatch(
"Objets100c.Cial.Stream"
)
self.cial.Name = self.chemin_base
self.cial.Loggable.UserName = self.utilisateur
self.cial.Loggable.UserPwd = self.mot_de_passe
self.cial.Open()
logger.info(f"Connexion Sage réussie: {self.chemin_base}")
# Chargement initial du cache
logger.info("Chargement initial du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
logger.info(
f"Cache initialisé: {len(self._cache_clients)} clients, {len(self._cache_articles)} articles"
)
# Démarrage du thread d'actualisation
self._start_refresh_thread()
return True
except Exception as e:
logger.error(f"Erreur connexion Sage: {e}", exc_info=True)
return False
def deconnecter(self):
"""Déconnexion propre"""
self._stop_refresh.set()
if self._refresh_thread:
self._refresh_thread.join(timeout=5)
if self.cial:
try:
with self._com_context():
self.cial.Close()
logger.info("Connexion Sage fermée")
except:
pass
# =========================================================================
# SYSTÈME DE CACHE
# =========================================================================
def _start_refresh_thread(self):
"""Démarre le thread d'actualisation automatique"""
def refresh_loop():
# Initialiser COM pour ce thread worker
pythoncom.CoInitialize()
try:
while not self._stop_refresh.is_set():
time.sleep(60) # Vérifier toutes les minutes
# Clients
if self._cache_clients_last_update:
age = datetime.now() - self._cache_clients_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
logger.info(
f"Actualisation cache clients (âge: {age.seconds//60}min)"
)
self._refresh_cache_clients()
# Articles
if self._cache_articles_last_update:
age = datetime.now() - self._cache_articles_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
logger.info(
f"Actualisation cache articles (âge: {age.seconds//60}min)"
)
self._refresh_cache_articles()
finally:
# Nettoyer COM en fin de thread
pythoncom.CoUninitialize()
self._refresh_thread = threading.Thread(
target=refresh_loop, daemon=True, name="SageCacheRefresh"
)
self._refresh_thread.start()
def _refresh_cache_clients(self):
"""Actualise le cache des clients"""
if not self.cial:
return
clients = []
clients_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
index = 1
erreurs_consecutives = 0
max_erreurs = 50
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_client(persist)
if obj:
data = self._extraire_client(obj)
clients.append(data)
clients_dict[data["numero"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh clients après {max_erreurs} erreurs"
)
break
with self._lock_clients:
self._cache_clients = clients
self._cache_clients_dict = clients_dict
self._cache_clients_last_update = datetime.now()
logger.info(f" Cache clients actualisé: {len(clients)} clients")
except Exception as e:
logger.error(f" Erreur refresh clients: {e}", exc_info=True)
def _refresh_cache_articles(self):
"""Actualise le cache des articles"""
if not self.cial:
return
articles = []
articles_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryArticle
index = 1
erreurs_consecutives = 0
max_erreurs = 50
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_article(persist)
if obj:
data = self._extraire_article(obj)
articles.append(data)
articles_dict[data["reference"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh articles après {max_erreurs} erreurs"
)
break
with self._lock_articles:
self._cache_articles = articles
self._cache_articles_dict = articles_dict
self._cache_articles_last_update = datetime.now()
logger.info(f" Cache articles actualisé: {len(articles)} articles")
except Exception as e:
logger.error(f" Erreur refresh articles: {e}", exc_info=True)
# =========================================================================
# API PUBLIQUE (ultra-rapide grâce au cache)
# =========================================================================
def lister_tous_clients(self, filtre=""):
"""Retourne les clients depuis le cache (instantané)"""
with self._lock_clients:
if not filtre:
return self._cache_clients.copy()
filtre_lower = filtre.lower()
return [
c
for c in self._cache_clients
if filtre_lower in c["numero"].lower()
or filtre_lower in c["intitule"].lower()
]
def lire_client(self, code_client):
"""Retourne un client depuis le cache (instantané)"""
with self._lock_clients:
return self._cache_clients_dict.get(code_client)
def lister_tous_articles(self, filtre=""):
"""Retourne les articles depuis le cache (instantané)"""
with self._lock_articles:
if not filtre:
return self._cache_articles.copy()
filtre_lower = filtre.lower()
return [
a
for a in self._cache_articles
if filtre_lower in a["reference"].lower()
or filtre_lower in a["designation"].lower()
]
def lire_article(self, reference):
"""Retourne un article depuis le cache (instantané)"""
with self._lock_articles:
return self._cache_articles_dict.get(reference)
def forcer_actualisation_cache(self):
"""Force l'actualisation immédiate du cache (endpoint admin)"""
logger.info("Actualisation forcée du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
logger.info("Cache actualisé")
def get_cache_info(self):
"""Retourne les infos du cache (endpoint monitoring)"""
with self._lock_clients, self._lock_articles:
return {
"clients": {
"count": len(self._cache_clients),
"last_update": (
self._cache_clients_last_update.isoformat()
if self._cache_clients_last_update
else None
),
"age_minutes": (
(
datetime.now() - self._cache_clients_last_update
).total_seconds()
/ 60
if self._cache_clients_last_update
else None
),
},
"articles": {
"count": len(self._cache_articles),
"last_update": (
self._cache_articles_last_update.isoformat()
if self._cache_articles_last_update
else None
),
"age_minutes": (
(
datetime.now() - self._cache_articles_last_update
).total_seconds()
/ 60
if self._cache_articles_last_update
else None
),
},
"ttl_minutes": self._cache_ttl_minutes,
}
# =========================================================================
# CAST HELPERS
# =========================================================================
def _cast_client(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOClient3")
obj.Read()
return obj
except:
return None
def _cast_article(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOArticle3")
obj.Read()
return obj
except:
return None
# =========================================================================
# EXTRACTION
# =========================================================================
def _extraire_client(self, client_obj):
data = {
"numero": getattr(client_obj, "CT_Num", ""),
"intitule": getattr(client_obj, "CT_Intitule", ""),
"type": getattr(client_obj, "CT_Type", 0),
}
try:
adresse = getattr(client_obj, "Adresse", None)
if adresse:
data["adresse"] = getattr(adresse, "Adresse", "")
data["code_postal"] = getattr(adresse, "CodePostal", "")
data["ville"] = getattr(adresse, "Ville", "")
except:
pass
try:
telecom = getattr(client_obj, "Telecom", None)
if telecom:
data["telephone"] = getattr(telecom, "Telephone", "")
data["email"] = getattr(telecom, "EMail", "")
except:
pass
return data
def _extraire_article(self, article_obj):
return {
"reference": getattr(article_obj, "AR_Ref", ""),
"designation": getattr(article_obj, "AR_Design", ""),
"prix_vente": getattr(article_obj, "AR_PrixVen", 0.0),
"prix_achat": getattr(article_obj, "AR_PrixAch", 0.0),
"stock_reel": getattr(article_obj, "AR_Stock", 0.0),
"stock_mini": getattr(article_obj, "AR_StockMini", 0.0),
}
# =========================================================================
# CRÉATION DEVIS (US-A1) - VERSION TRANSACTIONNELLE
# =========================================================================
def creer_devis_enrichi(self, devis_data: dict):
"""
Création de devis avec transaction Sage
✅ SOLUTION FINALE: Utilisation de SetDefaultArticle()
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
logger.info(
f"🚀 Début création devis pour client {devis_data['client']['code']}"
)
try:
with self._com_context(), self._lock_com:
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("✅ Transaction Sage démarrée")
except Exception as e:
logger.warning(f"⚠️ BeginTrans échoué: {e}")
try:
# ===== CRÉATION DOCUMENT =====
process = self.cial.CreateProcess_Document(0) # Type 0 = Devis
doc = process.Document
try:
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
except:
pass
logger.info("📄 Document devis créé")
# ===== DATE =====
import pywintypes
if isinstance(devis_data["date_devis"], str):
try:
date_obj = datetime.fromisoformat(devis_data["date_devis"])
except:
date_obj = datetime.now()
elif isinstance(devis_data["date_devis"], date):
date_obj = datetime.combine(
devis_data["date_devis"], datetime.min.time()
)
else:
date_obj = datetime.now()
doc.DO_Date = pywintypes.Time(date_obj)
logger.info(f"📅 Date définie: {date_obj.date()}")
# ===== CLIENT (CRITIQUE: Doit être défini AVANT les lignes) =====
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(
devis_data["client"]["code"]
)
if not persist_client:
raise ValueError(
f"❌ Client {devis_data['client']['code']} introuvable"
)
client_obj = self._cast_client(persist_client)
if not client_obj:
raise ValueError(
f"❌ Impossible de charger le client {devis_data['client']['code']}"
)
# ✅ CRITIQUE: Associer le client au document
doc.SetDefaultClient(client_obj)
doc.Write()
logger.info(
f"👤 Client {devis_data['client']['code']} associé et document écrit"
)
# ===== LIGNES AVEC SetDefaultArticle() =====
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
logger.info(f"📦 Ajout de {len(devis_data['lignes'])} lignes...")
for idx, ligne_data in enumerate(devis_data["lignes"], 1):
logger.info(
f"--- Ligne {idx}: {ligne_data['article_code']} ---"
)
# 🔍 ÉTAPE 1: Charger l'article RÉEL depuis Sage pour le prix
persist_article = factory_article.ReadReference(
ligne_data["article_code"]
)
if not persist_article:
raise ValueError(
f"❌ Article {ligne_data['article_code']} introuvable dans Sage"
)
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
# 💰 ÉTAPE 2: Récupérer le prix de vente RÉEL
prix_sage = float(getattr(article_obj, "AR_PrixVen", 0.0))
designation_sage = getattr(article_obj, "AR_Design", "")
logger.info(f"💰 Prix Sage: {prix_sage}")
if prix_sage == 0:
logger.warning(
f"⚠️ ATTENTION: Article {ligne_data['article_code']} a un prix de vente = 0€"
)
# 📝 ÉTAPE 3: Créer la ligne de devis
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# ✅✅✅ SOLUTION FINALE: SetDefaultArticleReference avec 2 paramètres ✅✅✅
quantite = float(ligne_data["quantite"])
try:
# Méthode 1: Via référence (plus simple et plus fiable)
ligne_obj.SetDefaultArticleReference(
ligne_data["article_code"], quantite
)
logger.info(
f"✅ Article associé via SetDefaultArticleReference('{ligne_data['article_code']}', {quantite})"
)
except Exception as e:
logger.warning(
f"⚠️ SetDefaultArticleReference échoué: {e}, tentative avec objet article"
)
try:
# Méthode 2: Via objet article
ligne_obj.SetDefaultArticle(article_obj, quantite)
logger.info(
f"✅ Article associé via SetDefaultArticle(obj, {quantite})"
)
except Exception as e2:
logger.error(
f"❌ Toutes les méthodes d'association ont échoué"
)
# Fallback: définir manuellement
ligne_obj.DL_Design = (
designation_sage or ligne_data["designation"]
)
ligne_obj.DL_Qte = quantite
logger.warning("⚠️ Configuration manuelle appliquée")
# ⚙️ ÉTAPE 4: Vérifier le prix automatique chargé
prix_auto = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
logger.info(f"💰 Prix auto chargé: {prix_auto}")
# 💵 ÉTAPE 5: Ajuster le prix si nécessaire
prix_a_utiliser = ligne_data.get("prix_unitaire_ht")
if prix_a_utiliser is not None and prix_a_utiliser > 0:
# Prix personnalisé fourni
ligne_obj.DL_PrixUnitaire = float(prix_a_utiliser)
logger.info(f"💰 Prix personnalisé: {prix_a_utiliser}")
elif prix_auto == 0:
# Pas de prix auto, forcer le prix Sage
if prix_sage == 0:
raise ValueError(
f"Prix nul pour article {ligne_data['article_code']}"
)
ligne_obj.DL_PrixUnitaire = float(prix_sage)
logger.info(f"💰 Prix Sage forcé: {prix_sage}")
else:
# Prix auto correct, on le garde
logger.info(f"💰 Prix auto conservé: {prix_auto}")
prix_final = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
montant_ligne = quantite * prix_final
logger.info(f"{quantite} x {prix_final}€ = {montant_ligne}")
# 🎁 Remise
remise = ligne_data.get("remise_pourcentage", 0)
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = 0
montant_apres_remise = montant_ligne * (
1 - remise / 100
)
logger.info(
f"🎁 Remise {remise}% → {montant_apres_remise}"
)
except Exception as e:
logger.warning(f"⚠️ Remise non appliquée: {e}")
# 💾 ÉTAPE 6: Écrire la ligne
ligne_obj.Write()
logger.info(f"✅ Ligne {idx} écrite")
# 🔍 VÉRIFICATION: Relire la ligne pour confirmer
try:
ligne_obj.Read()
prix_enregistre = float(
getattr(ligne_obj, "DL_PrixUnitaire", 0.0)
)
montant_enregistre = float(
getattr(ligne_obj, "DL_MontantHT", 0.0)
)
logger.info(
f"🔍 Vérif: Prix={prix_enregistre}€, Montant HT={montant_enregistre}"
)
if montant_enregistre == 0:
logger.error(
f"❌ PROBLÈME: Montant enregistré = 0 pour ligne {idx}"
)
else:
logger.info(f"✅ Ligne {idx} OK: {montant_enregistre}")
except Exception as e:
logger.warning(f"⚠️ Impossible de vérifier la ligne: {e}")
# ===== VALIDATION DOCUMENT =====
logger.info("💾 Écriture finale du document...")
doc.Write()
logger.info("🔄 Lancement du traitement (Process)...")
process.Process()
# ===== RÉCUPÉRATION NUMÉRO =====
numero_devis = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(
doc_result, "IBODocumentVente3"
)
doc_result.Read()
numero_devis = getattr(doc_result, "DO_Piece", "")
logger.info(
f"📄 Numéro (via DocumentResult): {numero_devis}"
)
except Exception as e:
logger.warning(f"⚠️ DocumentResult non accessible: {e}")
if not numero_devis:
numero_devis = getattr(doc, "DO_Piece", "")
logger.info(f"📄 Numéro (via Document): {numero_devis}")
if not numero_devis:
raise RuntimeError("❌ Numéro devis vide après création")
# ===== COMMIT TRANSACTION =====
if transaction_active:
self.cial.CptaApplication.CommitTrans()
logger.info("✅ Transaction committée")
# ===== ATTENTE INDEXATION =====
logger.info("⏳ Attente indexation Sage (2s)...")
time.sleep(2)
# ===== RELECTURE COMPLÈTE =====
logger.info("🔍 Relecture complète du document...")
factory_doc = self.cial.FactoryDocumentVente
persist_reread = factory_doc.ReadPiece(0, numero_devis)
if not persist_reread:
logger.error(f"❌ Impossible de relire le devis {numero_devis}")
# Fallback: retourner les totaux calculés
total_calcule = sum(
l.get("montant_ligne_ht", 0) for l in devis_data["lignes"]
)
logger.warning(f"⚠️ Utilisation total calculé: {total_calcule}")
return {
"numero_devis": numero_devis,
"total_ht": total_calcule,
"total_ttc": round(total_calcule * 1.20, 2),
"nb_lignes": len(devis_data["lignes"]),
"client_code": devis_data["client"]["code"],
"date_devis": str(date_obj.date()),
}
doc_final = win32com.client.CastTo(
persist_reread, "IBODocumentVente3"
)
doc_final.Read()
# ===== EXTRACTION TOTAUX =====
total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0))
client_code_final = getattr(doc_final, "CT_Num", "")
date_finale = getattr(doc_final, "DO_Date", None)
logger.info(f"💰 Total HT: {total_ht}")
logger.info(f"💰 Total TTC: {total_ttc}")
# ===== DIAGNOSTIC EN CAS D'ANOMALIE =====
if total_ht == 0 and total_ttc > 0:
logger.warning("⚠️ Anomalie: Total HT = 0 mais Total TTC > 0")
logger.info("🔍 Lecture des lignes pour diagnostic...")
try:
factory_lignes_verif = doc_final.FactoryDocumentLigne
except:
factory_lignes_verif = doc_final.FactoryDocumentVenteLigne
index = 1
total_calcule = 0.0
while index <= 20:
try:
ligne_p = factory_lignes_verif.List(index)
if ligne_p is None:
break
ligne_verif = win32com.client.CastTo(
ligne_p, "IBODocumentLigne3"
)
ligne_verif.Read()
montant = float(
getattr(ligne_verif, "DL_MontantHT", 0.0)
)
logger.info(
f" Ligne {index}: Montant HT = {montant}"
)
total_calcule += montant
index += 1
except:
break
logger.info(f"📊 Total calculé manuellement: {total_calcule}")
if total_calcule > 0:
total_ht = total_calcule
total_ttc = round(total_ht * 1.20, 2)
logger.info(
f"✅ Correction appliquée: HT={total_ht}€, TTC={total_ttc}"
)
logger.info(
f"✅ ✅ ✅ DEVIS CRÉÉ: {numero_devis} - {total_ttc}€ TTC ✅ ✅ ✅"
)
return {
"numero_devis": numero_devis,
"total_ht": total_ht,
"total_ttc": total_ttc,
"nb_lignes": len(devis_data["lignes"]),
"client_code": client_code_final,
"date_devis": (
str(date_finale) if date_finale else str(date_obj.date())
),
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("❌ Transaction annulée (rollback)")
except:
pass
raise
except Exception as e:
logger.error(f"❌ ❌ ❌ ERREUR CRÉATION DEVIS: {e}", exc_info=True)
raise RuntimeError(f"Échec création devis: {str(e)}")
# =========================================================================
# LECTURE DEVIS
# =========================================================================
def lire_devis(self, numero_devis):
"""
Lecture d'un devis (y compris brouillon)
✅ CORRIGÉ: Utilise .Client pour le client et .Article pour les lignes
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
# ✅ PRIORITÉ 1: Essayer ReadPiece (documents validés)
persist = factory.ReadPiece(0, numero_devis)
# ✅ PRIORITÉ 2: Rechercher dans la liste (brouillons)
if not persist:
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(
persist_test, "IBODocumentVente3"
)
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero_devis
):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
logger.warning(f"Devis {numero_devis} introuvable")
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ CHARGEMENT CLIENT VIA .Client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
logger.debug(
f"Client chargé via .Client: {client_code} - {client_intitule}"
)
except Exception as e:
logger.debug(f"Erreur chargement client: {e}")
# Fallback sur cache si disponible
if client_code:
client_obj_cache = self.lire_client(client_code)
if client_obj_cache:
client_intitule = client_obj_cache.get("intitule", "")
devis = {
"numero": getattr(doc, "DO_Piece", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": getattr(doc, "DO_Statut", 0),
"lignes": [],
}
# Lecture des lignes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_persist = factory_lignes.List(index)
if ligne_persist is None:
break
ligne = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
ligne.Read()
# ✅✅✅ CHARGEMENT ARTICLE VIA .Article ✅✅✅
article_ref = ""
try:
# Méthode 1: Essayer AR_Ref direct (parfois disponible)
article_ref = getattr(ligne, "AR_Ref", "").strip()
# Méthode 2: Si vide, utiliser la propriété .Article
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
logger.debug(
f"Article chargé via .Article: {article_ref}"
)
except Exception as e:
logger.debug(
f"Erreur chargement article ligne {index}: {e}"
)
devis["lignes"].append(
{
"article": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"montant_ht": float(
getattr(ligne, "DL_MontantHT", 0.0)
),
}
)
index += 1
except Exception as e:
logger.debug(f"Erreur lecture ligne {index}: {e}")
break
logger.info(
f"✅ Devis {numero_devis} lu: {len(devis['lignes'])} lignes, {devis['total_ttc']:.2f}€, client: {client_intitule}"
)
return devis
except Exception as e:
logger.error(f"❌ Erreur lecture devis {numero_devis}: {e}")
return None
def lire_document(self, numero, type_doc):
"""Lecture générique document (pour PDF)"""
if type_doc == 0:
return self.lire_devis(numero)
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, numero)
if not persist:
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# Lire lignes
lignes = []
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_p = factory_lignes.List(index)
if ligne_p is None:
break
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
ligne.Read()
lignes.append(
{
"designation": getattr(ligne, "DL_Design", ""),
"quantite": getattr(ligne, "DL_Qte", 0.0),
"prix_unitaire": getattr(ligne, "DL_PrixUnitaire", 0.0),
"montant_ht": getattr(ligne, "DL_MontantHT", 0.0),
}
)
index += 1
except:
break
return {
"numero": getattr(doc, "DO_Piece", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": getattr(doc, "CT_Num", ""),
"client_intitule": getattr(doc, "CT_Intitule", ""),
"total_ht": getattr(doc, "DO_TotalHT", 0.0),
"total_ttc": getattr(doc, "DO_TotalTTC", 0.0),
"lignes": lignes,
}
except Exception as e:
logger.error(f" Erreur lecture document: {e}")
return None
# =========================================================================
# TRANSFORMATION (US-A2)
# =========================================================================
def transformer_document(self, numero_source, type_source, type_cible):
"""
🔧 Transformation de document - MÉTHODE MANUELLE
⚠️ TransformInto() n'est pas disponible sur cette installation Sage
→ On crée manuellement le document cible en copiant les données
"""
if not self.cial:
raise RuntimeError("Connexion Sage non etablie")
type_source = int(type_source)
type_cible = int(type_cible)
logger.info(
f"[TRANSFORM] Demande MANUELLE: {numero_source} "
f"(type {type_source}) -> type {type_cible}"
)
# ✅ Matrice de transformations
transformations_autorisees = {
(0, 10): "Devis -> Commande",
(10, 30): "Commande -> Bon de livraison",
(10, 60): "Commande -> Facture",
(30, 60): "Bon de livraison -> Facture",
(0, 60): "Devis -> Facture",
}
if (type_source, type_cible) not in transformations_autorisees:
raise ValueError(
f"Transformation non autorisee: {type_source} -> {type_cible}"
)
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : LIRE LE DOCUMENT SOURCE
# ========================================
factory = self.cial.FactoryDocumentVente
persist_source = factory.ReadPiece(type_source, numero_source)
if not persist_source:
persist_source = self._find_document_in_list(
numero_source, type_source
)
if not persist_source:
raise ValueError(
f"Document {numero_source} (type {type_source}) introuvable"
)
doc_source = win32com.client.CastTo(persist_source, "IBODocumentVente3")
doc_source.Read()
# Vérifications statut
statut_actuel = getattr(doc_source, "DO_Statut", 0)
type_reel = getattr(doc_source, "DO_Type", -1)
logger.info(
f"[TRANSFORM] Source: type={type_reel}, statut={statut_actuel}"
)
if type_reel != type_source:
raise ValueError(
f"Incoherence: document est de type {type_reel}, pas {type_source}"
)
if statut_actuel == 5:
raise ValueError("Document deja transforme (statut=5)")
if statut_actuel == 6:
raise ValueError("Document annule (statut=6)")
if statut_actuel in [3, 4]:
raise ValueError(f"Document deja realise (statut={statut_actuel})")
# Forcer statut "Accepté" si devis brouillon
if type_source == 0 and statut_actuel == 0:
logger.info("[TRANSFORM] Passage devis a statut Accepte (2)")
doc_source.DO_Statut = 2
doc_source.Write()
doc_source.Read()
# ========================================
# ÉTAPE 2 : EXTRAIRE LES DONNÉES SOURCE
# ========================================
logger.info("[TRANSFORM] Extraction donnees source...")
# Client
client_code = ""
client_obj = None
try:
client_obj = getattr(doc_source, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
except Exception as e:
logger.error(f"Erreur lecture client: {e}")
raise ValueError(f"Impossible de lire le client du document source")
if not client_code:
raise ValueError("Client introuvable dans document source")
logger.info(f"[TRANSFORM] Client: {client_code}")
# Date
date_source = getattr(doc_source, "DO_Date", None)
# Lignes
lignes_source = []
try:
factory_lignes_source = getattr(
doc_source, "FactoryDocumentLigne", None
)
if not factory_lignes_source:
factory_lignes_source = getattr(
doc_source, "FactoryDocumentVenteLigne", None
)
if factory_lignes_source:
index = 1
while index <= 1000:
try:
ligne_p = factory_lignes_source.List(index)
if ligne_p is None:
break
ligne = win32com.client.CastTo(
ligne_p, "IBODocumentLigne3"
)
ligne.Read()
# Récupérer référence article
article_ref = ""
try:
article_ref = getattr(ligne, "AR_Ref", "").strip()
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
except:
pass
lignes_source.append(
{
"article_ref": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(
getattr(ligne, "DL_Qte", 0.0)
),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"remise": float(
getattr(ligne, "DL_Remise01REM_Valeur", 0.0)
),
"type_remise": int(
getattr(ligne, "DL_Remise01REM_Type", 0)
),
}
)
index += 1
except Exception as e:
logger.debug(f"Erreur ligne {index}: {e}")
break
except Exception as e:
logger.error(f"Erreur extraction lignes: {e}")
raise ValueError(
"Impossible d'extraire les lignes du document source"
)
nb_lignes = len(lignes_source)
logger.info(f"[TRANSFORM] {nb_lignes} lignes extraites")
if nb_lignes == 0:
raise ValueError("Document source vide (aucune ligne)")
# ========================================
# ÉTAPE 3 : TRANSACTION
# ========================================
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("[TRANSFORM] Transaction demarree")
except:
logger.debug("[TRANSFORM] BeginTrans non disponible")
try:
# ========================================
# ÉTAPE 4 : CRÉER LE DOCUMENT CIBLE
# ========================================
logger.info(f"[TRANSFORM] Creation document type {type_cible}...")
process = self.cial.CreateProcess_Document(type_cible)
if not process:
raise RuntimeError(
f"CreateProcess_Document({type_cible}) a retourne None"
)
doc_cible = process.Document
try:
doc_cible = win32com.client.CastTo(
doc_cible, "IBODocumentVente3"
)
except:
pass
logger.info("[TRANSFORM] Document cible cree")
# ========================================
# ÉTAPE 5 : DÉFINIR LA DATE
# ========================================
import pywintypes
if date_source:
try:
doc_cible.DO_Date = date_source
logger.info(f"[TRANSFORM] Date copiee: {date_source}")
except Exception as e:
logger.warning(f"Impossible de copier date: {e}")
doc_cible.DO_Date = pywintypes.Time(datetime.now())
else:
doc_cible.DO_Date = pywintypes.Time(datetime.now())
# ========================================
# ÉTAPE 6 : ASSOCIER LE CLIENT
# ========================================
logger.info(f"[TRANSFORM] Association client {client_code}...")
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(client_code)
if not persist_client:
raise ValueError(f"Client {client_code} introuvable")
client_obj_cible = self._cast_client(persist_client)
if not client_obj_cible:
raise ValueError(f"Impossible de charger client {client_code}")
doc_cible.SetDefaultClient(client_obj_cible)
doc_cible.Write()
logger.info(f"[TRANSFORM] Client {client_code} associe")
# ========================================
# ÉTAPE 7 : COPIER LES LIGNES
# ========================================
logger.info(f"[TRANSFORM] Copie de {nb_lignes} lignes...")
try:
factory_lignes_cible = doc_cible.FactoryDocumentLigne
except:
factory_lignes_cible = doc_cible.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
for idx, ligne_data in enumerate(lignes_source, 1):
logger.debug(f"[TRANSFORM] Ligne {idx}/{nb_lignes}")
# Charger article
article_ref = ligne_data["article_ref"]
if not article_ref:
logger.warning(
f"Ligne {idx}: pas de reference article, skip"
)
continue
persist_article = factory_article.ReadReference(article_ref)
if not persist_article:
logger.warning(
f"Ligne {idx}: article {article_ref} introuvable, skip"
)
continue
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
# Créer ligne
ligne_persist = factory_lignes_cible.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# Associer article avec quantité
quantite = ligne_data["quantite"]
try:
ligne_obj.SetDefaultArticleReference(article_ref, quantite)
except:
try:
ligne_obj.SetDefaultArticle(article_obj, quantite)
except:
# Fallback manuel
ligne_obj.DL_Design = ligne_data["designation"]
ligne_obj.DL_Qte = quantite
# Définir prix
prix = ligne_data["prix_unitaire"]
if prix > 0:
ligne_obj.DL_PrixUnitaire = float(prix)
# Copier remise
remise = ligne_data["remise"]
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = ligne_data[
"type_remise"
]
except:
pass
# Écrire ligne
ligne_obj.Write()
logger.info(f"[TRANSFORM] {nb_lignes} lignes copiees")
# ========================================
# ÉTAPE 8 : VALIDER LE DOCUMENT
# ========================================
logger.info("[TRANSFORM] Validation document cible...")
doc_cible.Write()
process.Process()
logger.info("[TRANSFORM] Document cible valide")
# ========================================
# ÉTAPE 9 : RÉCUPÉRER LE NUMÉRO
# ========================================
numero_cible = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(
doc_result, "IBODocumentVente3"
)
doc_result.Read()
numero_cible = getattr(doc_result, "DO_Piece", "")
except:
pass
if not numero_cible:
numero_cible = getattr(doc_cible, "DO_Piece", "")
if not numero_cible:
raise RuntimeError("Numero document cible vide")
logger.info(f"[TRANSFORM] Document cible cree: {numero_cible}")
# ========================================
# ÉTAPE 10 : COMMIT & MAJ STATUT SOURCE
# ========================================
if transaction_active:
try:
self.cial.CptaApplication.CommitTrans()
logger.info("[TRANSFORM] Transaction committee")
except:
pass
# Attente indexation
time.sleep(1)
# Marquer source comme "Transformé"
try:
doc_source.Read()
doc_source.DO_Statut = 5
doc_source.Write()
logger.info("[TRANSFORM] Statut source -> 5 (TRANSFORME)")
except Exception as e:
logger.warning(f"Impossible MAJ statut source: {e}")
logger.info(
f"[TRANSFORM] ✅ SUCCES: {numero_source} ({type_source}) -> "
f"{numero_cible} ({type_cible}) - {nb_lignes} lignes"
)
return {
"success": True,
"document_source": numero_source,
"document_cible": numero_cible,
"nb_lignes": nb_lignes,
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("[TRANSFORM] Transaction annulee (rollback)")
except:
pass
raise
except Exception as e:
logger.error(f"[TRANSFORM] ❌ ERREUR: {e}", exc_info=True)
raise RuntimeError(f"Echec transformation: {str(e)}")
def _find_document_in_list(self, numero, type_doc):
"""Cherche un document dans List() si ReadPiece échoue"""
try:
factory = self.cial.FactoryDocumentVente
index = 1
while index < 10000:
try:
persist = factory.List(index)
if persist is None:
break
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
if (
getattr(doc, "DO_Type", -1) == type_doc
and getattr(doc, "DO_Piece", "") == numero
):
logger.info(f"[TRANSFORM] Document trouve a l'index {index}")
return persist
index += 1
except:
index += 1
continue
return None
except Exception as e:
logger.error(f"[TRANSFORM] Erreur recherche document: {e}")
return None
# =========================================================================
# CHAMPS LIBRES (US-A3)
# =========================================================================
def mettre_a_jour_champ_libre(self, doc_id, type_doc, nom_champ, valeur):
"""Mise à jour champ libre pour Universign ID"""
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, doc_id)
if persist:
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
try:
setattr(doc, f"DO_{nom_champ}", valeur)
doc.Write()
logger.debug(f"Champ libre {nom_champ} = {valeur} sur {doc_id}")
return True
except Exception as e:
logger.warning(f"Impossible de mettre à jour {nom_champ}: {e}")
except Exception as e:
logger.error(f"Erreur MAJ champ libre: {e}")
return False
def _lire_client_obj(self, code_client):
"""Retourne l'objet client Sage brut (pour remises)"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
persist = factory.ReadNumero(code_client)
if persist:
return self._cast_client(persist)
except:
pass
return None
# =========================================================================
# US-A6 - LECTURE CONTACTS
# =========================================================================
def lire_contact_principal_client(self, code_client):
"""
NOUVEAU: Lecture contact principal d'un client
Pour US-A6: relance devis via Universign
Récupère l'email du contact principal pour l'envoi
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(code_client)
if not persist_client:
return None
client = self._cast_client(persist_client)
if not client:
return None
# Récupérer infos contact principal
contact_info = {
"client_code": code_client,
"client_intitule": getattr(client, "CT_Intitule", ""),
"email": None,
"nom": None,
"telephone": None,
}
# Email principal depuis Telecom
try:
telecom = getattr(client, "Telecom", None)
if telecom:
contact_info["email"] = getattr(telecom, "EMail", "")
contact_info["telephone"] = getattr(telecom, "Telephone", "")
except:
pass
# Nom du contact
try:
contact_info["nom"] = (
getattr(client, "CT_Contact", "")
or contact_info["client_intitule"]
)
except:
contact_info["nom"] = contact_info["client_intitule"]
return contact_info
except Exception as e:
logger.error(f"Erreur lecture contact client {code_client}: {e}")
return None
# =========================================================================
# US-A7 - MAJ CHAMP DERNIERE RELANCE
# =========================================================================
def mettre_a_jour_derniere_relance(self, doc_id, type_doc):
"""
NOUVEAU: Met à jour le champ libre "Dernière relance"
Pour US-A7: relance facture en un clic
"""
date_relance = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return self.mettre_a_jour_champ_libre(
doc_id, type_doc, "DerniereRelance", date_relance
)