Sage100-ws/sage_connector.py

4207 lines
No EOL
180 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import win32com.client
import pythoncom # AJOUT CRITIQUE
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional
import threading
import time
import logging
from contextlib import contextmanager
from config import settings, validate_settings
logger = logging.getLogger(__name__)
class SageConnector:
"""
Connecteur Sage 100c avec gestion COM threading correcte
CHANGEMENTS PRODUCTION:
- Initialisation COM par thread (CoInitialize/CoUninitialize)
- Lock robuste pour thread-safety
- Gestion d'erreurs exhaustive
- Logging structuré
- Retry automatique sur erreurs COM
"""
def __init__(self, chemin_base, utilisateur="<Administrateur>", mot_de_passe=""):
self.chemin_base = chemin_base
self.utilisateur = utilisateur
self.mot_de_passe = mot_de_passe
self.cial = None
# Caches existants
self._cache_clients: List[Dict] = []
self._cache_articles: List[Dict] = []
self._cache_clients_dict: Dict[str, Dict] = {}
self._cache_articles_dict: Dict[str, Dict] = {}
# Métadonnées cache existantes
self._cache_clients_last_update: Optional[datetime] = None
self._cache_articles_last_update: Optional[datetime] = None
self._cache_ttl_minutes = 15
# Thread d'actualisation
self._refresh_thread: Optional[threading.Thread] = None
self._stop_refresh = threading.Event()
# Locks
self._lock_clients = threading.RLock()
self._lock_articles = threading.RLock()
self._lock_com = threading.RLock()
# Thread-local storage pour COM
self._thread_local = threading.local()
# =========================================================================
# GESTION COM THREAD-SAFE
# =========================================================================
@contextmanager
def _com_context(self):
"""
Context manager pour initialiser COM dans chaque thread
CRITIQUE: FastAPI utilise un pool de threads.
Chaque thread doit initialiser COM avant d'utiliser les objets Sage.
"""
# Vérifier si COM est déjà initialisé pour ce thread
if not hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoInitialize()
self._thread_local.com_initialized = True
logger.debug(
f"COM initialisé pour thread {threading.current_thread().name}"
)
except Exception as e:
logger.error(f"Erreur initialisation COM: {e}")
raise
try:
yield
finally:
# Ne pas désinitialiser COM ici car le thread peut être réutilisé
pass
def _cleanup_com_thread(self):
"""Nettoie COM pour le thread actuel (à appeler à la fin)"""
if hasattr(self._thread_local, "com_initialized"):
try:
pythoncom.CoUninitialize()
delattr(self._thread_local, "com_initialized")
logger.debug(
f"COM nettoyé pour thread {threading.current_thread().name}"
)
except:
pass
# =========================================================================
# CONNEXION
# =========================================================================
def connecter(self):
"""Connexion initiale à Sage"""
try:
with self._com_context():
self.cial = win32com.client.gencache.EnsureDispatch("Objets100c.Cial.Stream")
self.cial.Name = self.chemin_base
self.cial.Loggable.UserName = self.utilisateur
self.cial.Loggable.UserPwd = self.mot_de_passe
self.cial.Open()
logger.info(f"✅ Connexion Sage réussie: {self.chemin_base}")
# Chargement initial du cache
logger.info("📦 Chargement initial du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
# Démarrage du thread d'actualisation
self._start_refresh_thread()
return True
except Exception as e:
logger.error(f"❌ Erreur connexion Sage: {e}", exc_info=True)
return False
def deconnecter(self):
"""Déconnexion propre"""
self._stop_refresh.set()
if self._refresh_thread:
self._refresh_thread.join(timeout=5)
if self.cial:
try:
with self._com_context():
self.cial.Close()
logger.info("Connexion Sage fermée")
except:
pass
# =========================================================================
# SYSTÈME DE CACHE
# =========================================================================
def _start_refresh_thread(self):
"""Démarre le thread d'actualisation automatique"""
def refresh_loop():
pythoncom.CoInitialize()
try:
while not self._stop_refresh.is_set():
time.sleep(60)
# Clients
if self._cache_clients_last_update:
age = datetime.now() - self._cache_clients_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
self._refresh_cache_clients()
# Articles
if self._cache_articles_last_update:
age = datetime.now() - self._cache_articles_last_update
if age.total_seconds() > self._cache_ttl_minutes * 60:
self._refresh_cache_articles()
finally:
pythoncom.CoUninitialize()
self._refresh_thread = threading.Thread(
target=refresh_loop, daemon=True, name="SageCacheRefresh"
)
self._refresh_thread.start()
def _refresh_cache_clients(self):
"""
Actualise le cache des clients
Charge TOUS les tiers (CT_Type=0 ET CT_Type=1)
"""
if not self.cial:
return
clients = []
clients_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
index = 1
erreurs_consecutives = 0
max_erreurs = 50
logger.info("🔄 Actualisation cache clients et prospects...")
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_client(persist)
if obj:
data = self._extraire_client(obj)
# ✅ INCLURE TOUS LES TYPES (clients, prospects)
clients.append(data)
clients_dict[data["numero"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh clients après {max_erreurs} erreurs"
)
break
with self._lock_clients:
self._cache_clients = clients
self._cache_clients_dict = clients_dict
self._cache_clients_last_update = datetime.now()
# 📊 Statistiques détaillées
nb_clients = sum(1 for c in clients if c.get("type") == 0 and not c.get("est_prospect"))
nb_prospects = sum(1 for c in clients if c.get("type") == 0 and c.get("est_prospect"))
logger.info(
f"✅ Cache actualisé: {len(clients)} tiers "
f"({nb_clients} clients, {nb_prospects} prospects)"
)
except Exception as e:
logger.error(f"❌ Erreur refresh clients: {e}", exc_info=True)
def _refresh_cache_articles(self):
"""Actualise le cache des articles"""
if not self.cial:
return
articles = []
articles_dict = {}
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryArticle
index = 1
erreurs_consecutives = 0
max_erreurs = 50
while index < 10000 and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
break
obj = self._cast_article(persist)
if obj:
data = self._extraire_article(obj)
articles.append(data)
articles_dict[data["reference"]] = data
erreurs_consecutives = 0
index += 1
except Exception as e:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(
f"Arrêt refresh articles après {max_erreurs} erreurs"
)
break
with self._lock_articles:
self._cache_articles = articles
self._cache_articles_dict = articles_dict
self._cache_articles_last_update = datetime.now()
logger.info(f" Cache articles actualisé: {len(articles)} articles")
except Exception as e:
logger.error(f" Erreur refresh articles: {e}", exc_info=True)
def lister_tous_fournisseurs(self, filtre=""):
"""
✅ CORRECTION FINALE : Liste fournisseurs SANS passer par _extraire_client()
BYPASS TOTAL de _extraire_client() car :
- Les objets fournisseurs n'ont pas les mêmes champs que les clients
- _extraire_client() plante sur "CT_Qualite" (n'existe pas sur fournisseurs)
- Le diagnostic fournisseurs-analyse-complete fonctionne SANS _extraire_client()
→ On fait EXACTEMENT comme le diagnostic qui marche
"""
if not self.cial:
logger.error("❌ self.cial est None")
return []
fournisseurs = []
try:
with self._com_context(), self._lock_com:
logger.info(f"🔍 Lecture fournisseurs via FactoryFournisseur (filtre='{filtre}')")
factory = self.cial.CptaApplication.FactoryFournisseur
index = 1
max_iterations = 10000
erreurs_consecutives = 0
max_erreurs = 50
filtre_lower = filtre.lower() if filtre else ""
while index < max_iterations and erreurs_consecutives < max_erreurs:
try:
persist = factory.List(index)
if persist is None:
logger.debug(f"Fin de liste à l'index {index}")
break
# Cast
fourn = self._cast_client(persist)
if fourn:
# ✅✅✅ EXTRACTION DIRECTE (pas de _extraire_client) ✅✅✅
try:
numero = getattr(fourn, "CT_Num", "").strip()
intitule = getattr(fourn, "CT_Intitule", "").strip()
if not numero:
logger.debug(f"Index {index}: CT_Num vide, skip")
erreurs_consecutives += 1
index += 1
continue
# Construction objet minimal
data = {
"numero": numero,
"intitule": intitule,
"type": 1, # Fournisseur
"est_fournisseur": True
}
# Champs optionnels (avec gestion d'erreur)
try:
adresse_obj = getattr(fourn, "Adresse", None)
if adresse_obj:
data["adresse"] = getattr(adresse_obj, "Adresse", "").strip()
data["code_postal"] = getattr(adresse_obj, "CodePostal", "").strip()
data["ville"] = getattr(adresse_obj, "Ville", "").strip()
except:
data["adresse"] = ""
data["code_postal"] = ""
data["ville"] = ""
try:
telecom_obj = getattr(fourn, "Telecom", None)
if telecom_obj:
data["telephone"] = getattr(telecom_obj, "Telephone", "").strip()
data["email"] = getattr(telecom_obj, "EMail", "").strip()
except:
data["telephone"] = ""
data["email"] = ""
# Filtrer si nécessaire
if not filtre_lower or \
filtre_lower in numero.lower() or \
filtre_lower in intitule.lower():
fournisseurs.append(data)
logger.debug(f"✅ Fournisseur ajouté: {numero} - {intitule}")
erreurs_consecutives = 0
except Exception as e:
logger.debug(f"⚠️ Erreur extraction index {index}: {e}")
erreurs_consecutives += 1
else:
erreurs_consecutives += 1
index += 1
except Exception as e:
logger.debug(f"⚠️ Erreur index {index}: {e}")
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
logger.warning(f"⚠️ Arrêt après {max_erreurs} erreurs consécutives")
break
logger.info(f"{len(fournisseurs)} fournisseurs retournés")
return fournisseurs
except Exception as e:
logger.error(f"❌ Erreur liste fournisseurs: {e}", exc_info=True)
return []
def creer_fournisseur(self, fournisseur_data: Dict) -> Dict:
"""
✅ Crée un nouveau fournisseur dans Sage 100c via FactoryFournisseur
IMPORTANT: Utilise FactoryFournisseur.Create() et NON FactoryClient.Create()
car les fournisseurs sont gérés séparément dans Sage.
Args:
fournisseur_data: Dictionnaire contenant:
- intitule (obligatoire): Raison sociale
- compte_collectif (défaut: "401000"): Compte général
- num (optionnel): Code fournisseur personnalisé
- adresse, code_postal, ville, pays
- email, telephone
- siret, tva_intra
Returns:
Dict contenant le fournisseur créé avec son numéro définitif
Raises:
ValueError: Si le fournisseur existe déjà ou données invalides
RuntimeError: Si erreur technique Sage
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 0 : VALIDATION & NETTOYAGE
# ========================================
logger.info("🔍 === VALIDATION DONNÉES FOURNISSEUR ===")
if not fournisseur_data.get("intitule"):
raise ValueError("Le champ 'intitule' est obligatoire")
# Nettoyage et troncature (longueurs max Sage)
intitule = str(fournisseur_data["intitule"])[:69].strip()
num_prop = str(fournisseur_data.get("num", "")).upper()[:17].strip() if fournisseur_data.get("num") else ""
compte = str(fournisseur_data.get("compte_collectif", "401000"))[:13].strip()
adresse = str(fournisseur_data.get("adresse", ""))[:35].strip()
code_postal = str(fournisseur_data.get("code_postal", ""))[:9].strip()
ville = str(fournisseur_data.get("ville", ""))[:35].strip()
pays = str(fournisseur_data.get("pays", ""))[:35].strip()
telephone = str(fournisseur_data.get("telephone", ""))[:21].strip()
email = str(fournisseur_data.get("email", ""))[:69].strip()
siret = str(fournisseur_data.get("siret", ""))[:14].strip()
tva_intra = str(fournisseur_data.get("tva_intra", ""))[:25].strip()
logger.info(f" intitule: '{intitule}' (len={len(intitule)})")
logger.info(f" num: '{num_prop or 'AUTO'}' (len={len(num_prop)})")
logger.info(f" compte: '{compte}' (len={len(compte)})")
# ========================================
# ÉTAPE 1 : CRÉATION OBJET FOURNISSEUR
# ========================================
# 🔑 CRITIQUE: Utiliser FactoryFournisseur, PAS FactoryClient !
factory_fournisseur = self.cial.CptaApplication.FactoryFournisseur
persist = factory_fournisseur.Create()
fournisseur = win32com.client.CastTo(persist, "IBOFournisseur3")
# 🔑 CRITIQUE : Initialiser l'objet
fournisseur.SetDefault()
logger.info("✅ Objet fournisseur créé et initialisé")
# ========================================
# ÉTAPE 2 : CHAMPS OBLIGATOIRES
# ========================================
logger.info("📝 Définition des champs obligatoires...")
# 1. Intitulé (OBLIGATOIRE)
fournisseur.CT_Intitule = intitule
logger.debug(f" ✅ CT_Intitule: '{intitule}'")
# 2. Type = Fournisseur (1)
# ⚠️ NOTE: Sur certaines versions Sage, CT_Type n'existe pas
# et le type est automatiquement défini par la factory utilisée
try:
fournisseur.CT_Type = 1 # 1 = Fournisseur
logger.debug(" ✅ CT_Type: 1 (Fournisseur)")
except:
logger.debug(" ⚠️ CT_Type non défini (géré par FactoryFournisseur)")
# 3. Qualité (pour versions récentes Sage)
try:
fournisseur.CT_Qualite = "FOU"
logger.debug(" ✅ CT_Qualite: 'FOU'")
except:
logger.debug(" ⚠️ CT_Qualite non défini (pas critique)")
# 4. Compte général principal (OBLIGATOIRE)
try:
factory_compte = self.cial.CptaApplication.FactoryCompteG
persist_compte = factory_compte.ReadNumero(compte)
if persist_compte:
compte_obj = win32com.client.CastTo(persist_compte, "IBOCompteG3")
compte_obj.Read()
# Assigner l'objet CompteG
fournisseur.CompteGPrinc = compte_obj
logger.debug(f" ✅ CompteGPrinc: objet '{compte}' assigné")
else:
logger.warning(f" ⚠️ Compte {compte} introuvable - utilisation défaut")
except Exception as e:
logger.warning(f" ⚠️ Erreur CompteGPrinc: {e}")
# 5. Numéro fournisseur (OBLIGATOIRE - générer si vide)
if num_prop:
fournisseur.CT_Num = num_prop
logger.debug(f" ✅ CT_Num fourni: '{num_prop}'")
else:
# 🔑 CRITIQUE : Générer le numéro automatiquement
try:
# Méthode 1 : SetDefaultNumPiece (si disponible)
if hasattr(fournisseur, 'SetDefaultNumPiece'):
fournisseur.SetDefaultNumPiece()
num_genere = getattr(fournisseur, "CT_Num", "")
logger.debug(f" ✅ CT_Num auto-généré: '{num_genere}'")
else:
# Méthode 2 : GetNextNumero depuis la factory
num_genere = factory_fournisseur.GetNextNumero()
if num_genere:
fournisseur.CT_Num = num_genere
logger.debug(f" ✅ CT_Num auto (GetNextNumero): '{num_genere}'")
else:
# Méthode 3 : Fallback - timestamp
import time
num_genere = f"FOUR{int(time.time()) % 1000000}"
fournisseur.CT_Num = num_genere
logger.warning(f" ⚠️ CT_Num fallback: '{num_genere}'")
except Exception as e:
logger.error(f" ❌ Impossible de générer CT_Num: {e}")
raise ValueError("Impossible de générer le numéro fournisseur automatiquement")
# 6. Catégories (valeurs par défaut)
try:
if hasattr(fournisseur, 'N_CatTarif'):
fournisseur.N_CatTarif = 1
if hasattr(fournisseur, 'N_CatCompta'):
fournisseur.N_CatCompta = 1
if hasattr(fournisseur, 'N_Period'):
fournisseur.N_Period = 1
logger.debug(" ✅ Catégories (N_*) initialisées")
except Exception as e:
logger.warning(f" ⚠️ Catégories: {e}")
# ========================================
# ÉTAPE 3 : CHAMPS OPTIONNELS
# ========================================
logger.info("📝 Définition champs optionnels...")
# Adresse (objet IAdresse)
if any([adresse, code_postal, ville, pays]):
try:
adresse_obj = fournisseur.Adresse
if adresse:
adresse_obj.Adresse = adresse
if code_postal:
adresse_obj.CodePostal = code_postal
if ville:
adresse_obj.Ville = ville
if pays:
adresse_obj.Pays = pays
logger.debug(" ✅ Adresse définie")
except Exception as e:
logger.warning(f" ⚠️ Adresse: {e}")
# Télécom (objet ITelecom)
if telephone or email:
try:
telecom_obj = fournisseur.Telecom
if telephone:
telecom_obj.Telephone = telephone
if email:
telecom_obj.EMail = email
logger.debug(" ✅ Télécom défini")
except Exception as e:
logger.warning(f" ⚠️ Télécom: {e}")
# Identifiants fiscaux
if siret:
try:
fournisseur.CT_Siret = siret
logger.debug(f" ✅ SIRET: '{siret}'")
except Exception as e:
logger.warning(f" ⚠️ SIRET: {e}")
if tva_intra:
try:
fournisseur.CT_Identifiant = tva_intra
logger.debug(f" ✅ TVA intra: '{tva_intra}'")
except Exception as e:
logger.warning(f" ⚠️ TVA: {e}")
# Options par défaut
try:
if hasattr(fournisseur, 'CT_Lettrage'):
fournisseur.CT_Lettrage = True
if hasattr(fournisseur, 'CT_Sommeil'):
fournisseur.CT_Sommeil = False
logger.debug(" ✅ Options par défaut définies")
except Exception as e:
logger.debug(f" ⚠️ Options: {e}")
# ========================================
# ÉTAPE 4 : VÉRIFICATION PRÉ-WRITE
# ========================================
logger.info("🔍 === DIAGNOSTIC PRÉ-WRITE ===")
num_avant_write = getattr(fournisseur, "CT_Num", "")
if not num_avant_write:
logger.error("❌ CRITIQUE: CT_Num toujours vide !")
raise ValueError("Le numéro fournisseur (CT_Num) est obligatoire")
logger.info(f"✅ CT_Num confirmé: '{num_avant_write}'")
# ========================================
# ÉTAPE 5 : ÉCRITURE EN BASE
# ========================================
logger.info("💾 Écriture du fournisseur dans Sage...")
try:
fournisseur.Write()
logger.info("✅ Write() réussi !")
except Exception as e:
error_detail = str(e)
# Récupérer l'erreur Sage détaillée
try:
sage_error = self.cial.CptaApplication.LastError
if sage_error:
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
logger.error(f"❌ Erreur Sage: {error_detail}")
except:
pass
# Analyser l'erreur
if "doublon" in error_detail.lower() or "existe" in error_detail.lower():
raise ValueError(f"Ce fournisseur existe déjà : {error_detail}")
raise RuntimeError(f"Échec Write(): {error_detail}")
# ========================================
# ÉTAPE 6 : RELECTURE & FINALISATION
# ========================================
try:
fournisseur.Read()
except Exception as e:
logger.warning(f"⚠️ Impossible de relire: {e}")
num_final = getattr(fournisseur, "CT_Num", "")
if not num_final:
raise RuntimeError("CT_Num vide après Write()")
logger.info(f"✅✅✅ FOURNISSEUR CRÉÉ: {num_final} - {intitule} ✅✅✅")
# ========================================
# ÉTAPE 7 : CONSTRUCTION RÉPONSE
# ========================================
resultat = {
"numero": num_final,
"intitule": intitule,
"compte_collectif": compte,
"type": 1, # Fournisseur
"est_fournisseur": True,
"adresse": adresse or None,
"code_postal": code_postal or None,
"ville": ville or None,
"pays": pays or None,
"email": email or None,
"telephone": telephone or None,
"siret": siret or None,
"tva_intra": tva_intra or None
}
# ⚠️ PAS DE REFRESH CACHE ICI
# Car lister_tous_fournisseurs() utilise FactoryFournisseur.List()
# qui lit directement depuis Sage (pas de cache)
return resultat
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur création fournisseur: {e}", exc_info=True)
error_message = str(e)
if self.cial:
try:
err = self.cial.CptaApplication.LastError
if err:
error_message = f"Erreur Sage: {err.Description}"
except:
pass
raise RuntimeError(f"Erreur technique Sage: {error_message}")
def modifier_fournisseur(self, code: str, fournisseur_data: Dict) -> Dict:
"""
✏️ Modification d'un fournisseur existant dans Sage 100c
IMPORTANT: Utilise FactoryFournisseur.ReadNumero() pour charger le fournisseur
Args:
code: Code du fournisseur à modifier
fournisseur_data: Dictionnaire avec les champs à mettre à jour
Returns:
Fournisseur modifié
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : CHARGER LE FOURNISSEUR EXISTANT
# ========================================
logger.info(f"🔍 Recherche fournisseur {code}...")
factory_fournisseur = self.cial.CptaApplication.FactoryFournisseur
persist = factory_fournisseur.ReadNumero(code)
if not persist:
raise ValueError(f"Fournisseur {code} introuvable")
fournisseur = self._cast_client(persist) # ✅ Réutiliser _cast_client
if not fournisseur:
raise ValueError(f"Impossible de charger le fournisseur {code}")
logger.info(f"✅ Fournisseur {code} trouvé: {getattr(fournisseur, 'CT_Intitule', '')}")
# ========================================
# ÉTAPE 2 : METTRE À JOUR LES CHAMPS FOURNIS
# ========================================
logger.info("📝 Mise à jour des champs...")
champs_modifies = []
# Intitulé
if "intitule" in fournisseur_data:
intitule = str(fournisseur_data["intitule"])[:69].strip()
fournisseur.CT_Intitule = intitule
champs_modifies.append(f"intitule='{intitule}'")
# Adresse
if any(k in fournisseur_data for k in ["adresse", "code_postal", "ville", "pays"]):
try:
adresse_obj = fournisseur.Adresse
if "adresse" in fournisseur_data:
adresse = str(fournisseur_data["adresse"])[:35].strip()
adresse_obj.Adresse = adresse
champs_modifies.append("adresse")
if "code_postal" in fournisseur_data:
cp = str(fournisseur_data["code_postal"])[:9].strip()
adresse_obj.CodePostal = cp
champs_modifies.append("code_postal")
if "ville" in fournisseur_data:
ville = str(fournisseur_data["ville"])[:35].strip()
adresse_obj.Ville = ville
champs_modifies.append("ville")
if "pays" in fournisseur_data:
pays = str(fournisseur_data["pays"])[:35].strip()
adresse_obj.Pays = pays
champs_modifies.append("pays")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour adresse: {e}")
# Télécom
if "email" in fournisseur_data or "telephone" in fournisseur_data:
try:
telecom_obj = fournisseur.Telecom
if "email" in fournisseur_data:
email = str(fournisseur_data["email"])[:69].strip()
telecom_obj.EMail = email
champs_modifies.append("email")
if "telephone" in fournisseur_data:
tel = str(fournisseur_data["telephone"])[:21].strip()
telecom_obj.Telephone = tel
champs_modifies.append("telephone")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour télécom: {e}")
# SIRET
if "siret" in fournisseur_data:
try:
siret = str(fournisseur_data["siret"])[:14].strip()
fournisseur.CT_Siret = siret
champs_modifies.append("siret")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour SIRET: {e}")
# TVA Intracommunautaire
if "tva_intra" in fournisseur_data:
try:
tva = str(fournisseur_data["tva_intra"])[:25].strip()
fournisseur.CT_Identifiant = tva
champs_modifies.append("tva_intra")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour TVA: {e}")
if not champs_modifies:
logger.warning("⚠️ Aucun champ à modifier")
# Retourner les données actuelles via extraction directe
return {
"numero": getattr(fournisseur, "CT_Num", "").strip(),
"intitule": getattr(fournisseur, "CT_Intitule", "").strip(),
"type": 1,
"est_fournisseur": True
}
logger.info(f"📝 Champs à modifier: {', '.join(champs_modifies)}")
# ========================================
# ÉTAPE 3 : ÉCRIRE LES MODIFICATIONS
# ========================================
logger.info("💾 Écriture des modifications...")
try:
fournisseur.Write()
logger.info("✅ Write() réussi !")
except Exception as e:
error_detail = str(e)
try:
sage_error = self.cial.CptaApplication.LastError
if sage_error:
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
except:
pass
logger.error(f"❌ Erreur Write(): {error_detail}")
raise RuntimeError(f"Échec modification: {error_detail}")
# ========================================
# ÉTAPE 4 : RELIRE ET RETOURNER
# ========================================
fournisseur.Read()
logger.info(f"✅✅✅ FOURNISSEUR MODIFIÉ: {code} ({len(champs_modifies)} champs) ✅✅✅")
# Extraction directe (comme lire_fournisseur)
numero = getattr(fournisseur, "CT_Num", "").strip()
intitule = getattr(fournisseur, "CT_Intitule", "").strip()
data = {
"numero": numero,
"intitule": intitule,
"type": 1,
"est_fournisseur": True
}
# Adresse
try:
adresse_obj = getattr(fournisseur, "Adresse", None)
if adresse_obj:
data["adresse"] = getattr(adresse_obj, "Adresse", "").strip()
data["code_postal"] = getattr(adresse_obj, "CodePostal", "").strip()
data["ville"] = getattr(adresse_obj, "Ville", "").strip()
except:
data["adresse"] = ""
data["code_postal"] = ""
data["ville"] = ""
# Télécom
try:
telecom_obj = getattr(fournisseur, "Telecom", None)
if telecom_obj:
data["telephone"] = getattr(telecom_obj, "Telephone", "").strip()
data["email"] = getattr(telecom_obj, "EMail", "").strip()
except:
data["telephone"] = ""
data["email"] = ""
return data
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur modification fournisseur: {e}", exc_info=True)
error_message = str(e)
if self.cial:
try:
err = self.cial.CptaApplication.LastError
if err:
error_message = f"Erreur Sage: {err.Description}"
except:
pass
raise RuntimeError(f"Erreur technique Sage: {error_message}")
def lire_fournisseur(self, code):
"""
✅ NOUVEAU : Lecture d'un fournisseur par code
Utilise FactoryFournisseur.ReadNumero() directement
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryFournisseur
persist = factory.ReadNumero(code)
if not persist:
logger.warning(f"Fournisseur {code} introuvable")
return None
fourn = self._cast_client(persist)
if not fourn:
return None
# Extraction directe (même logique que lister_tous_fournisseurs)
numero = getattr(fourn, "CT_Num", "").strip()
intitule = getattr(fourn, "CT_Intitule", "").strip()
data = {
"numero": numero,
"intitule": intitule,
"type": 1,
"est_fournisseur": True
}
# Adresse
try:
adresse_obj = getattr(fourn, "Adresse", None)
if adresse_obj:
data["adresse"] = getattr(adresse_obj, "Adresse", "").strip()
data["code_postal"] = getattr(adresse_obj, "CodePostal", "").strip()
data["ville"] = getattr(adresse_obj, "Ville", "").strip()
except:
data["adresse"] = ""
data["code_postal"] = ""
data["ville"] = ""
# Télécom
try:
telecom_obj = getattr(fourn, "Telecom", None)
if telecom_obj:
data["telephone"] = getattr(telecom_obj, "Telephone", "").strip()
data["email"] = getattr(telecom_obj, "EMail", "").strip()
except:
data["telephone"] = ""
data["email"] = ""
logger.info(f"✅ Fournisseur {code} lu: {intitule}")
return data
except Exception as e:
logger.error(f"❌ Erreur lecture fournisseur {code}: {e}")
return None
# =========================================================================
# API PUBLIQUE (ultra-rapide grâce au cache)
# =========================================================================
def lister_tous_clients(self, filtre=""):
"""Retourne les clients depuis le cache (instantané)"""
with self._lock_clients:
if not filtre:
return self._cache_clients.copy()
filtre_lower = filtre.lower()
return [
c
for c in self._cache_clients
if filtre_lower in c["numero"].lower()
or filtre_lower in c["intitule"].lower()
]
def lire_client(self, code_client):
"""Retourne un client depuis le cache (instantané)"""
with self._lock_clients:
return self._cache_clients_dict.get(code_client)
def lister_tous_articles(self, filtre=""):
"""Retourne les articles depuis le cache (instantané)"""
with self._lock_articles:
if not filtre:
return self._cache_articles.copy()
filtre_lower = filtre.lower()
return [
a
for a in self._cache_articles
if filtre_lower in a["reference"].lower()
or filtre_lower in a["designation"].lower()
]
def lire_article(self, reference):
"""Retourne un article depuis le cache (instantané)"""
with self._lock_articles:
return self._cache_articles_dict.get(reference)
def forcer_actualisation_cache(self):
"""Force l'actualisation immédiate du cache (endpoint admin)"""
logger.info("🔄 Actualisation forcée du cache...")
self._refresh_cache_clients()
self._refresh_cache_articles()
logger.info("✅ Cache actualisé")
logger.info("Cache actualisé")
def get_cache_info(self):
"""Retourne les infos du cache (endpoint monitoring)"""
with self._lock_clients:
info = {
"clients": {
"count": len(self._cache_clients),
"last_update": (
self._cache_clients_last_update.isoformat()
if self._cache_clients_last_update
else None
),
"age_minutes": (
(datetime.now() - self._cache_clients_last_update).total_seconds() / 60
if self._cache_clients_last_update
else None
),
},
"articles": {
"count": len(self._cache_articles),
"last_update": (
self._cache_articles_last_update.isoformat()
if self._cache_articles_last_update
else None
),
"age_minutes": (
(datetime.now() - self._cache_articles_last_update).total_seconds() / 60
if self._cache_articles_last_update
else None
),
}
}
info["ttl_minutes"] = self._cache_ttl_minutes
return info
# =========================================================================
# CAST HELPERS
# =========================================================================
def _cast_client(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOClient3")
obj.Read()
return obj
except Exception as e:
logger.debug(f"❌ _cast_client échoue: {e}") # ✅ AJOUTER CE LOG
return None
def _cast_article(self, persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOArticle3")
obj.Read()
return obj
except:
return None
# =========================================================================
# EXTRACTION
# =========================================================================
def _extraire_client(self, client_obj):
"""
✅ CORRECTION : Extraction ULTRA-ROBUSTE pour clients ET fournisseurs
Gère tous les cas où des champs peuvent être manquants
"""
try:
# === 1. CHAMPS OBLIGATOIRES ===
try:
numero = getattr(client_obj, "CT_Num", "").strip()
if not numero:
logger.debug("⚠️ Objet sans CT_Num, skip")
return None
except Exception as e:
logger.debug(f"❌ Erreur lecture CT_Num: {e}")
return None
try:
intitule = getattr(client_obj, "CT_Intitule", "").strip()
if not intitule:
logger.debug(f"⚠️ {numero} sans CT_Intitule")
except Exception as e:
logger.debug(f"⚠️ Erreur CT_Intitule sur {numero}: {e}")
intitule = ""
# === 2. CONSTRUCTION OBJET MINIMAL ===
data = {
"numero": numero,
"intitule": intitule,
}
# === 3. CHAMPS OPTIONNELS (avec try-except individuels) ===
# Type
try:
data["type"] = getattr(client_obj, "CT_Type", 0)
except:
data["type"] = 0
# Qualité
try:
qualite = getattr(client_obj, "CT_Qualite", None)
data["qualite"] = qualite
data["est_fournisseur"] = qualite in [2, 3] if qualite is not None else False
except:
data["qualite"] = None
data["est_fournisseur"] = False
# Prospect
try:
data["est_prospect"] = getattr(client_obj, "CT_Prospect", 0) == 1
except:
data["est_prospect"] = False
# === 4. ADRESSE (non critique) ===
try:
adresse = getattr(client_obj, "Adresse", None)
if adresse:
try:
data["adresse"] = getattr(adresse, "Adresse", "").strip()
except:
data["adresse"] = ""
try:
data["code_postal"] = getattr(adresse, "CodePostal", "").strip()
except:
data["code_postal"] = ""
try:
data["ville"] = getattr(adresse, "Ville", "").strip()
except:
data["ville"] = ""
except Exception as e:
logger.debug(f"⚠️ Erreur adresse sur {numero}: {e}")
data["adresse"] = ""
data["code_postal"] = ""
data["ville"] = ""
# === 5. TELECOM (non critique) ===
try:
telecom = getattr(client_obj, "Telecom", None)
if telecom:
try:
data["telephone"] = getattr(telecom, "Telephone", "").strip()
except:
data["telephone"] = ""
try:
data["email"] = getattr(telecom, "EMail", "").strip()
except:
data["email"] = ""
except Exception as e:
logger.debug(f"⚠️ Erreur telecom sur {numero}: {e}")
data["telephone"] = ""
data["email"] = ""
return data
except Exception as e:
logger.error(f"❌ ERREUR GLOBALE _extraire_client: {e}", exc_info=True)
return None
def _extraire_article(self, article_obj):
return {
"reference": getattr(article_obj, "AR_Ref", ""),
"designation": getattr(article_obj, "AR_Design", ""),
"prix_vente": getattr(article_obj, "AR_PrixVen", 0.0),
"prix_achat": getattr(article_obj, "AR_PrixAch", 0.0),
"stock_reel": getattr(article_obj, "AR_Stock", 0.0),
"stock_mini": getattr(article_obj, "AR_StockMini", 0.0),
}
# =========================================================================
# CRÉATION DEVIS (US-A1) - VERSION TRANSACTIONNELLE
# =========================================================================
def creer_devis_enrichi(self, devis_data: dict, valider: bool = False):
"""
Création de devis avec option brouillon/validé
Args:
devis_data: Données du devis
valider: Si True, valide le devis (statut 2). Si False, reste en brouillon (statut 0)
✅ CORRECTION: Par défaut, crée un BROUILLON (statut 0)
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
logger.info(
f"🚀 Début création devis pour client {devis_data['client']['code']} "
f"(valider={valider})"
)
try:
with self._com_context(), self._lock_com:
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("✅ Transaction Sage démarrée")
except Exception as e:
logger.warning(f"⚠️ BeginTrans échoué: {e}")
try:
# ===== CRÉATION DOCUMENT =====
process = self.cial.CreateProcess_Document(0) # Type 0 = Devis
doc = process.Document
try:
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
except:
pass
logger.info("📄 Document devis créé")
# ===== DATE =====
import pywintypes
if isinstance(devis_data["date_devis"], str):
try:
date_obj = datetime.fromisoformat(devis_data["date_devis"])
except:
date_obj = datetime.now()
elif isinstance(devis_data["date_devis"], date):
date_obj = datetime.combine(
devis_data["date_devis"], datetime.min.time()
)
else:
date_obj = datetime.now()
doc.DO_Date = pywintypes.Time(date_obj)
logger.info(f"📅 Date définie: {date_obj.date()}")
# ===== CLIENT =====
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(
devis_data["client"]["code"]
)
if not persist_client:
raise ValueError(
f"❌ Client {devis_data['client']['code']} introuvable"
)
client_obj = self._cast_client(persist_client)
if not client_obj:
raise ValueError(
f"❌ Impossible de charger le client {devis_data['client']['code']}"
)
doc.SetDefaultClient(client_obj)
doc.Write()
logger.info(
f"👤 Client {devis_data['client']['code']} associé et document écrit"
)
# ===== LIGNES =====
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
logger.info(f"📦 Ajout de {len(devis_data['lignes'])} lignes...")
for idx, ligne_data in enumerate(devis_data["lignes"], 1):
logger.info(
f"--- Ligne {idx}: {ligne_data['article_code']} ---"
)
# Charger l'article
persist_article = factory_article.ReadReference(
ligne_data["article_code"]
)
if not persist_article:
raise ValueError(
f"❌ Article {ligne_data['article_code']} introuvable dans Sage"
)
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
prix_sage = float(getattr(article_obj, "AR_PrixVen", 0.0))
designation_sage = getattr(article_obj, "AR_Design", "")
# Créer la ligne
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# Associer article
quantite = float(ligne_data["quantite"])
try:
ligne_obj.SetDefaultArticleReference(
ligne_data["article_code"], quantite
)
except:
try:
ligne_obj.SetDefaultArticle(article_obj, quantite)
except:
ligne_obj.DL_Design = (
designation_sage or ligne_data.get("designation", "")
)
ligne_obj.DL_Qte = quantite
# Prix
prix_a_utiliser = ligne_data.get("prix_unitaire_ht")
if prix_a_utiliser:
ligne_obj.DL_PrixUnitaire = float(prix_a_utiliser)
elif prix_sage > 0:
ligne_obj.DL_PrixUnitaire = float(prix_sage)
# Remise
remise = ligne_data.get("remise_pourcentage", 0)
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = 0
except:
pass
ligne_obj.Write()
logger.info(f"✅ Ligne {idx} écrite")
# ===== STATUT ET VALIDATION =====
doc.Write()
# 🔑 DIFFÉRENCE CRITIQUE ICI
if valider:
# Option 1: VALIDER le devis (statut 2)
logger.info("🔄 Validation du devis (Process)...")
process.Process()
# Le Process() met généralement le statut à 2
doc.Read()
statut_final = getattr(doc, "DO_Statut", 0)
logger.info(f"✅ Devis validé, statut: {statut_final}")
else:
# Option 2: BROUILLON (statut 0)
logger.info("📝 Devis créé en BROUILLON (pas de Process)...")
# Ne PAS appeler Process() pour garder en brouillon
# Forcer le statut à 0
doc.DO_Statut = 0
doc.Write()
statut_final = 0
logger.info("✅ Devis en brouillon (statut 0)")
# ===== COMMIT =====
if transaction_active:
self.cial.CptaApplication.CommitTrans()
logger.info("✅ Transaction committée")
# ===== RÉCUPÉRATION NUMÉRO =====
time.sleep(2)
numero_devis = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(
doc_result, "IBODocumentVente3"
)
doc_result.Read()
numero_devis = getattr(doc_result, "DO_Piece", "")
except:
pass
if not numero_devis:
doc.Read()
numero_devis = getattr(doc, "DO_Piece", "")
if not numero_devis:
raise RuntimeError("❌ Numéro devis vide après création")
# ===== RELECTURE COMPLÈTE =====
logger.info("🔍 Relecture complète du document...")
factory_doc = self.cial.FactoryDocumentVente
persist_reread = factory_doc.ReadPiece(0, numero_devis)
if persist_reread:
doc_final = win32com.client.CastTo(
persist_reread, "IBODocumentVente3"
)
doc_final.Read()
total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0))
statut_final = getattr(doc_final, "DO_Statut", 0)
else:
total_ht = 0.0
total_ttc = 0.0
logger.info(
f"✅✅✅ DEVIS CRÉÉ: {numero_devis} - {total_ttc}€ TTC "
f"(statut={statut_final}) ✅✅✅"
)
return {
"numero_devis": numero_devis,
"total_ht": total_ht,
"total_ttc": total_ttc,
"nb_lignes": len(devis_data["lignes"]),
"client_code": devis_data["client"]["code"],
"date_devis": str(date_obj.date()),
"statut": statut_final, # ✅ AJOUT
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("❌ Transaction annulée (rollback)")
except:
pass
raise
except Exception as e:
logger.error(f"❌❌❌ ERREUR CRÉATION DEVIS: {e}", exc_info=True)
raise RuntimeError(f"Échec création devis: {str(e)}")
# =========================================================================
# LECTURE DEVIS
# =========================================================================
def lire_devis(self, numero_devis):
"""
Lecture d'un devis (y compris brouillon)
✅ CORRIGÉ: Utilise .Client pour le client et .Article pour les lignes
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
# ✅ PRIORITÉ 1: Essayer ReadPiece (documents validés)
persist = factory.ReadPiece(0, numero_devis)
# ✅ PRIORITÉ 2: Rechercher dans la liste (brouillons)
if not persist:
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(
persist_test, "IBODocumentVente3"
)
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero_devis
):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
logger.warning(f"Devis {numero_devis} introuvable")
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ CHARGEMENT CLIENT VIA .Client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
logger.debug(
f"Client chargé via .Client: {client_code} - {client_intitule}"
)
except Exception as e:
logger.debug(f"Erreur chargement client: {e}")
# Fallback sur cache si disponible
if client_code:
client_obj_cache = self.lire_client(client_code)
if client_obj_cache:
client_intitule = client_obj_cache.get("intitule", "")
devis = {
"numero": getattr(doc, "DO_Piece", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": getattr(doc, "DO_Statut", 0),
"lignes": [],
}
# Lecture des lignes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_persist = factory_lignes.List(index)
if ligne_persist is None:
break
ligne = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
ligne.Read()
# ✅✅✅ CHARGEMENT ARTICLE VIA .Article ✅✅✅
article_ref = ""
try:
# Méthode 1: Essayer AR_Ref direct (parfois disponible)
article_ref = getattr(ligne, "AR_Ref", "").strip()
# Méthode 2: Si vide, utiliser la propriété .Article
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
logger.debug(
f"Article chargé via .Article: {article_ref}"
)
except Exception as e:
logger.debug(
f"Erreur chargement article ligne {index}: {e}"
)
devis["lignes"].append(
{
"article": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"montant_ht": float(
getattr(ligne, "DL_MontantHT", 0.0)
),
}
)
index += 1
except Exception as e:
logger.debug(f"Erreur lecture ligne {index}: {e}")
break
logger.info(
f"✅ Devis {numero_devis} lu: {len(devis['lignes'])} lignes, {devis['total_ttc']:.2f}€, client: {client_intitule}"
)
return devis
except Exception as e:
logger.error(f"❌ Erreur lecture devis {numero_devis}: {e}")
return None
def lire_document(self, numero, type_doc):
"""Lecture générique document (pour PDF et lecture commandes/factures)"""
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, numero)
if not persist:
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ Charger client via .Client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
except Exception as e:
logger.debug(f"Erreur chargement client: {e}")
# Lire lignes
lignes = []
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
index = 1
while True:
try:
ligne_p = factory_lignes.List(index)
if ligne_p is None:
break
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
ligne.Read()
# ✅ Charger article via .Article
article_ref = ""
try:
article_ref = getattr(ligne, "AR_Ref", "").strip()
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(article_obj, "AR_Ref", "").strip()
except:
pass
lignes.append(
{
"article": article_ref, # ✅ Ajout référence article
"designation": getattr(ligne, "DL_Design", ""),
"quantite": getattr(ligne, "DL_Qte", 0.0),
"prix_unitaire": getattr(ligne, "DL_PrixUnitaire", 0.0),
"montant_ht": getattr(ligne, "DL_MontantHT", 0.0),
}
)
index += 1
except:
break
return {
"numero": getattr(doc, "DO_Piece", ""),
"reference": getattr(doc, "DO_Ref", ""), # ✅ Ajout référence
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": getattr(doc, "DO_TotalHT", 0.0),
"total_ttc": getattr(doc, "DO_TotalTTC", 0.0),
"statut": getattr(doc, "DO_Statut", 0), # ✅ Ajout statut
"lignes": lignes,
}
except Exception as e:
logger.error(f"❌ Erreur lecture document: {e}")
return None
# =========================================================================
# TRANSFORMATION (US-A2)
# =========================================================================
def transformer_document(self, numero_source, type_source, type_cible):
"""
🔧 Transformation de document - MÉTHODE MANUELLE
⚠️ TransformInto() n'est pas disponible sur cette installation Sage
→ On crée manuellement le document cible en copiant les données
"""
if not self.cial:
raise RuntimeError("Connexion Sage non etablie")
type_source = int(type_source)
type_cible = int(type_cible)
logger.info(
f"[TRANSFORM] Demande MANUELLE: {numero_source} "
f"(type {type_source}) -> type {type_cible}"
)
# ✅ Matrice de transformations
transformations_autorisees = {
(0, 10): "Devis -> Commande",
(10, 30): "Commande -> Bon de livraison",
(10, 60): "Commande -> Facture",
(30, 60): "Bon de livraison -> Facture",
(0, 60): "Devis -> Facture",
}
if (type_source, type_cible) not in transformations_autorisees:
raise ValueError(
f"Transformation non autorisee: {type_source} -> {type_cible}"
)
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : LIRE LE DOCUMENT SOURCE
# ========================================
factory = self.cial.FactoryDocumentVente
persist_source = factory.ReadPiece(type_source, numero_source)
if not persist_source:
persist_source = self._find_document_in_list(
numero_source, type_source
)
if not persist_source:
raise ValueError(
f"Document {numero_source} (type {type_source}) introuvable"
)
doc_source = win32com.client.CastTo(persist_source, "IBODocumentVente3")
doc_source.Read()
# Vérifications statut
statut_actuel = getattr(doc_source, "DO_Statut", 0)
type_reel = getattr(doc_source, "DO_Type", -1)
logger.info(
f"[TRANSFORM] Source: type={type_reel}, statut={statut_actuel}"
)
if type_reel != type_source:
raise ValueError(
f"Incoherence: document est de type {type_reel}, pas {type_source}"
)
if statut_actuel == 5:
raise ValueError("Document deja transforme (statut=5)")
if statut_actuel == 6:
raise ValueError("Document annule (statut=6)")
if statut_actuel in [3, 4]:
raise ValueError(f"Document deja realise (statut={statut_actuel})")
# Forcer statut "Accepté" si devis brouillon
if type_source == 0 and statut_actuel == 0:
logger.info("[TRANSFORM] Passage devis a statut Accepte (2)")
doc_source.DO_Statut = 2
doc_source.Write()
doc_source.Read()
# ========================================
# ÉTAPE 2 : EXTRAIRE LES DONNÉES SOURCE
# ========================================
logger.info("[TRANSFORM] Extraction donnees source...")
# Client
client_code = ""
client_obj = None
try:
client_obj = getattr(doc_source, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
except Exception as e:
logger.error(f"Erreur lecture client: {e}")
raise ValueError(f"Impossible de lire le client du document source")
if not client_code:
raise ValueError("Client introuvable dans document source")
logger.info(f"[TRANSFORM] Client: {client_code}")
# Date
date_source = getattr(doc_source, "DO_Date", None)
# Lignes
lignes_source = []
try:
factory_lignes_source = getattr(
doc_source, "FactoryDocumentLigne", None
)
if not factory_lignes_source:
factory_lignes_source = getattr(
doc_source, "FactoryDocumentVenteLigne", None
)
if factory_lignes_source:
index = 1
while index <= 1000:
try:
ligne_p = factory_lignes_source.List(index)
if ligne_p is None:
break
ligne = win32com.client.CastTo(
ligne_p, "IBODocumentLigne3"
)
ligne.Read()
# Récupérer référence article
article_ref = ""
try:
article_ref = getattr(ligne, "AR_Ref", "").strip()
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
except:
pass
lignes_source.append(
{
"article_ref": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(
getattr(ligne, "DL_Qte", 0.0)
),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"remise": float(
getattr(ligne, "DL_Remise01REM_Valeur", 0.0)
),
"type_remise": int(
getattr(ligne, "DL_Remise01REM_Type", 0)
),
}
)
index += 1
except Exception as e:
logger.debug(f"Erreur ligne {index}: {e}")
break
except Exception as e:
logger.error(f"Erreur extraction lignes: {e}")
raise ValueError(
"Impossible d'extraire les lignes du document source"
)
nb_lignes = len(lignes_source)
logger.info(f"[TRANSFORM] {nb_lignes} lignes extraites")
if nb_lignes == 0:
raise ValueError("Document source vide (aucune ligne)")
# ========================================
# ÉTAPE 3 : TRANSACTION
# ========================================
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("[TRANSFORM] Transaction demarree")
except:
logger.debug("[TRANSFORM] BeginTrans non disponible")
try:
# ========================================
# ÉTAPE 4 : CRÉER LE DOCUMENT CIBLE
# ========================================
logger.info(f"[TRANSFORM] Creation document type {type_cible}...")
process = self.cial.CreateProcess_Document(type_cible)
if not process:
raise RuntimeError(
f"CreateProcess_Document({type_cible}) a retourne None"
)
doc_cible = process.Document
try:
doc_cible = win32com.client.CastTo(
doc_cible, "IBODocumentVente3"
)
except:
pass
logger.info("[TRANSFORM] Document cible cree")
# ========================================
# ÉTAPE 5 : DÉFINIR LA DATE
# ========================================
import pywintypes
if date_source:
try:
doc_cible.DO_Date = date_source
logger.info(f"[TRANSFORM] Date copiee: {date_source}")
except Exception as e:
logger.warning(f"Impossible de copier date: {e}")
doc_cible.DO_Date = pywintypes.Time(datetime.now())
else:
doc_cible.DO_Date = pywintypes.Time(datetime.now())
# ========================================
# ÉTAPE 6 : ASSOCIER LE CLIENT
# ========================================
logger.info(f"[TRANSFORM] Association client {client_code}...")
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(client_code)
if not persist_client:
raise ValueError(f"Client {client_code} introuvable")
client_obj_cible = self._cast_client(persist_client)
if not client_obj_cible:
raise ValueError(f"Impossible de charger client {client_code}")
# ✅ Associer le client
try:
doc_cible.SetClient(client_obj_cible)
logger.info(
f"[TRANSFORM] SetClient() appele pour {client_code}"
)
except Exception as e:
logger.warning(
f"[TRANSFORM] SetClient() echoue: {e}, tentative SetDefaultClient()"
)
doc_cible.SetDefaultClient(client_obj_cible)
doc_cible.Write()
# Vérifier que le client est bien attaché
doc_cible.Read()
client_verifie = getattr(doc_cible, "CT_Num", None)
if not client_verifie:
# Dernière tentative : récupérer via la propriété Client
try:
client_test = getattr(doc_cible, "Client", None)
if client_test:
client_test.Read()
client_verifie = getattr(client_test, "CT_Num", None)
except:
pass
if not client_verifie:
raise ValueError(
f"Echec association client {client_code} - CT_Num reste vide apres Write()"
)
logger.info(
f"[TRANSFORM] Client {client_code} associe et verifie (CT_Num={client_verifie})"
)
# 🔒 GARDER UNE RÉFÉRENCE À L'OBJET CLIENT POUR RÉASSOCIATION
client_obj_sauvegarde = client_obj_cible
# ========================================
# ÉTAPE 7 : COPIER LES LIGNES
# ========================================
logger.info(f"[TRANSFORM] Copie de {nb_lignes} lignes...")
try:
factory_lignes_cible = doc_cible.FactoryDocumentLigne
except:
factory_lignes_cible = doc_cible.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
for idx, ligne_data in enumerate(lignes_source, 1):
logger.debug(f"[TRANSFORM] Ligne {idx}/{nb_lignes}")
# Charger article
article_ref = ligne_data["article_ref"]
if not article_ref:
logger.warning(
f"Ligne {idx}: pas de reference article, skip"
)
continue
persist_article = factory_article.ReadReference(article_ref)
if not persist_article:
logger.warning(
f"Ligne {idx}: article {article_ref} introuvable, skip"
)
continue
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
# Créer ligne
ligne_persist = factory_lignes_cible.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# Associer article avec quantité
quantite = ligne_data["quantite"]
try:
ligne_obj.SetDefaultArticleReference(article_ref, quantite)
except:
try:
ligne_obj.SetDefaultArticle(article_obj, quantite)
except:
# Fallback manuel
ligne_obj.DL_Design = ligne_data["designation"]
ligne_obj.DL_Qte = quantite
# Définir prix
prix = ligne_data["prix_unitaire"]
if prix > 0:
ligne_obj.DL_PrixUnitaire = float(prix)
# Copier remise
remise = ligne_data["remise"]
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = ligne_data[
"type_remise"
]
except:
pass
# Écrire ligne
ligne_obj.Write()
logger.info(f"[TRANSFORM] {nb_lignes} lignes copiees")
# ========================================
# ÉTAPE 8 : COMPLÉTER LES CHAMPS OBLIGATOIRES POUR FACTURE
# ========================================
if type_cible == 60: # Facture
logger.info(
"[TRANSFORM] Completion champs obligatoires facture..."
)
# 1. Code journal
try:
journal = None
try:
journal = getattr(doc_source, "DO_CodeJournal", None)
if journal:
logger.info(
f"[TRANSFORM] Journal source: {journal}"
)
except:
pass
if not journal:
journal = "VTE"
logger.info("[TRANSFORM] Journal par defaut: VTE")
if hasattr(doc_cible, "DO_CodeJournal"):
doc_cible.DO_CodeJournal = journal
logger.info(
f"[TRANSFORM] Code journal defini: {journal}"
)
else:
logger.warning(
"[TRANSFORM] DO_CodeJournal inexistant sur ce document"
)
except Exception as e:
logger.warning(
f"[TRANSFORM] Impossible de definir code journal: {e}"
)
# 2. Souche
try:
souche = getattr(doc_source, "DO_Souche", 0)
if hasattr(doc_cible, "DO_Souche"):
doc_cible.DO_Souche = souche
logger.info(f"[TRANSFORM] Souche: {souche}")
except Exception as e:
logger.debug(f"[TRANSFORM] Souche non definie: {e}")
# 3. Régime de TVA
try:
regime = getattr(doc_source, "DO_Regime", None)
if regime is not None and hasattr(doc_cible, "DO_Regime"):
doc_cible.DO_Regime = regime
logger.info(f"[TRANSFORM] Regime TVA: {regime}")
except Exception as e:
logger.debug(f"[TRANSFORM] Regime TVA non defini: {e}")
# 4. Type de transaction
try:
transaction = getattr(doc_source, "DO_Transaction", None)
if transaction is not None and hasattr(
doc_cible, "DO_Transaction"
):
doc_cible.DO_Transaction = transaction
logger.info(f"[TRANSFORM] Transaction: {transaction}")
except Exception as e:
logger.debug(f"[TRANSFORM] Transaction non definie: {e}")
# 5. Domaine (Vente = 0)
try:
if hasattr(doc_cible, "DO_Domaine"):
doc_cible.DO_Domaine = 0
logger.info("[TRANSFORM] Domaine: 0 (Vente)")
except Exception as e:
logger.debug(f"[TRANSFORM] Domaine non defini: {e}")
# ========================================
# 🔒 RÉASSOCIER LE CLIENT AVANT VALIDATION
# ========================================
logger.info("[TRANSFORM] Reassociation client avant validation...")
try:
doc_cible.SetClient(client_obj_sauvegarde)
except:
doc_cible.SetDefaultClient(client_obj_sauvegarde)
# Écriture finale avec tous les champs complétés
logger.info("[TRANSFORM] Ecriture document finale...")
doc_cible.Write()
# ========================================
# ÉTAPE 9 : VALIDER LE DOCUMENT
# ========================================
logger.info("[TRANSFORM] Validation document cible...")
# Relire pour vérifier
doc_cible.Read()
# Diagnostic pré-validation
logger.info("[TRANSFORM] === PRE-VALIDATION CHECK ===")
champs_a_verifier = [
"DO_Type",
"CT_Num",
"DO_Date",
"DO_Souche",
"DO_Statut",
"DO_Regime",
"DO_Transaction",
]
for champ in champs_a_verifier:
try:
if hasattr(doc_cible, champ):
valeur = getattr(doc_cible, champ, "?")
logger.info(f" {champ}: {valeur}")
except:
pass
# ✅ VÉRIFICATION CLIENT AMÉLIORÉE
client_final = getattr(doc_cible, "CT_Num", None)
if not client_final:
try:
client_obj_test = getattr(doc_cible, "Client", None)
if client_obj_test:
client_obj_test.Read()
client_final = getattr(client_obj_test, "CT_Num", None)
logger.info(
f"[TRANSFORM] Client recupere via .Client: {client_final}"
)
except:
pass
# Si toujours pas de client, dernière réassociation forcée
if not client_final:
logger.warning(
"[TRANSFORM] Client perdu ! Tentative reassociation d'urgence..."
)
try:
doc_cible.SetClient(client_obj_sauvegarde)
except:
doc_cible.SetDefaultClient(client_obj_sauvegarde)
doc_cible.Write()
doc_cible.Read()
client_final = getattr(doc_cible, "CT_Num", None)
if not client_final:
try:
client_obj_test = getattr(doc_cible, "Client", None)
if client_obj_test:
client_obj_test.Read()
client_final = getattr(
client_obj_test, "CT_Num", None
)
except:
pass
if not client_final:
logger.error(
"[TRANSFORM] IMPOSSIBLE d'associer le client malgre toutes les tentatives"
)
raise ValueError(
f"Client {client_code} impossible a associer au document"
)
logger.info(
f"[TRANSFORM] ✅ Client confirme avant validation: {client_final}"
)
# Lancer le processus
try:
logger.info("[TRANSFORM] Appel Process()...")
process.Process()
logger.info("[TRANSFORM] Document cible valide avec succes")
except Exception as e:
logger.error(f"[TRANSFORM] ERREUR Process(): {e}")
logger.error("[TRANSFORM] === DIAGNOSTIC COMPLET ===")
try:
attributs_doc = [
attr
for attr in dir(doc_cible)
if (attr.startswith("DO_") or attr.startswith("CT_"))
and not callable(getattr(doc_cible, attr, None))
]
for attr in sorted(attributs_doc):
try:
valeur = getattr(doc_cible, attr, "N/A")
logger.error(f" {attr}: {valeur}")
except:
pass
except:
pass
raise
# ========================================
# ÉTAPE 10 : RÉCUPÉRER LE NUMÉRO
# ========================================
numero_cible = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(
doc_result, "IBODocumentVente3"
)
doc_result.Read()
numero_cible = getattr(doc_result, "DO_Piece", "")
except:
pass
if not numero_cible:
numero_cible = getattr(doc_cible, "DO_Piece", "")
if not numero_cible:
raise RuntimeError("Numero document cible vide")
logger.info(f"[TRANSFORM] Document cible cree: {numero_cible}")
# ========================================
# ÉTAPE 11 : COMMIT & MAJ STATUT SOURCE
# ========================================
if transaction_active:
try:
self.cial.CptaApplication.CommitTrans()
logger.info("[TRANSFORM] Transaction committee")
except:
pass
# Attente indexation
time.sleep(1)
# Marquer source comme "Transformé"
try:
doc_source.Read()
doc_source.DO_Statut = 5
doc_source.Write()
logger.info("[TRANSFORM] Statut source -> 5 (TRANSFORME)")
except Exception as e:
logger.warning(f"Impossible MAJ statut source: {e}")
logger.info(
f"[TRANSFORM] ✅ SUCCES: {numero_source} ({type_source}) -> "
f"{numero_cible} ({type_cible}) - {nb_lignes} lignes"
)
return {
"success": True,
"document_source": numero_source,
"document_cible": numero_cible,
"nb_lignes": nb_lignes,
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
logger.error("[TRANSFORM] Transaction annulee (rollback)")
except:
pass
raise
except Exception as e:
logger.error(f"[TRANSFORM] ❌ ERREUR: {e}", exc_info=True)
raise RuntimeError(f"Echec transformation: {str(e)}")
def _find_document_in_list(self, numero, type_doc):
"""Cherche un document dans List() si ReadPiece échoue"""
try:
factory = self.cial.FactoryDocumentVente
index = 1
while index < 10000:
try:
persist = factory.List(index)
if persist is None:
break
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
if (
getattr(doc, "DO_Type", -1) == type_doc
and getattr(doc, "DO_Piece", "") == numero
):
logger.info(f"[TRANSFORM] Document trouve a l'index {index}")
return persist
index += 1
except:
index += 1
continue
return None
except Exception as e:
logger.error(f"[TRANSFORM] Erreur recherche document: {e}")
return None
# =========================================================================
# CHAMPS LIBRES (US-A3)
# =========================================================================
def mettre_a_jour_champ_libre(self, doc_id, type_doc, nom_champ, valeur):
"""Mise à jour champ libre pour Universign ID"""
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(type_doc, doc_id)
if persist:
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
try:
setattr(doc, f"DO_{nom_champ}", valeur)
doc.Write()
logger.debug(f"Champ libre {nom_champ} = {valeur} sur {doc_id}")
return True
except Exception as e:
logger.warning(f"Impossible de mettre à jour {nom_champ}: {e}")
except Exception as e:
logger.error(f"Erreur MAJ champ libre: {e}")
return False
def _lire_client_obj(self, code_client):
"""Retourne l'objet client Sage brut (pour remises)"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.CptaApplication.FactoryClient
persist = factory.ReadNumero(code_client)
if persist:
return self._cast_client(persist)
except:
pass
return None
# =========================================================================
# US-A6 - LECTURE CONTACTS
# =========================================================================
def lire_contact_principal_client(self, code_client):
"""
NOUVEAU: Lecture contact principal d'un client
Pour US-A6: relance devis via Universign
Récupère l'email du contact principal pour l'envoi
"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(code_client)
if not persist_client:
return None
client = self._cast_client(persist_client)
if not client:
return None
# Récupérer infos contact principal
contact_info = {
"client_code": code_client,
"client_intitule": getattr(client, "CT_Intitule", ""),
"email": None,
"nom": None,
"telephone": None,
}
# Email principal depuis Telecom
try:
telecom = getattr(client, "Telecom", None)
if telecom:
contact_info["email"] = getattr(telecom, "EMail", "")
contact_info["telephone"] = getattr(telecom, "Telephone", "")
except:
pass
# Nom du contact
try:
contact_info["nom"] = (
getattr(client, "CT_Contact", "")
or contact_info["client_intitule"]
)
except:
contact_info["nom"] = contact_info["client_intitule"]
return contact_info
except Exception as e:
logger.error(f"Erreur lecture contact client {code_client}: {e}")
return None
# =========================================================================
# US-A7 - MAJ CHAMP DERNIERE RELANCE
# =========================================================================
def mettre_a_jour_derniere_relance(self, doc_id, type_doc):
"""
NOUVEAU: Met à jour le champ libre "Dernière relance"
Pour US-A7: relance facture en un clic
"""
date_relance = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return self.mettre_a_jour_champ_libre(
doc_id, type_doc, "DerniereRelance", date_relance
)
# =========================================================================
# PROSPECTS (CT_Type = 0 AND CT_Prospect = 1)
# =========================================================================
def lister_tous_prospects(self, filtre=""):
"""Liste tous les prospects depuis le cache"""
with self._lock_clients:
if not filtre:
return [
c
for c in self._cache_clients
if c.get("type") == 0 and c.get("est_prospect")
]
filtre_lower = filtre.lower()
return [
c
for c in self._cache_clients
if c.get("type") == 0
and c.get("est_prospect")
and (
filtre_lower in c["numero"].lower()
or filtre_lower in c["intitule"].lower()
)
]
def lire_prospect(self, code_prospect):
"""Retourne un prospect depuis le cache"""
with self._lock_clients:
prospect = self._cache_clients_dict.get(code_prospect)
if prospect and prospect.get("type") == 0 and prospect.get("est_prospect"):
return prospect
return None
# =========================================================================
# EXTRACTION CLIENTS (Mise à jour pour inclure prospects)
# =========================================================================
def _extraire_client(self, client_obj):
"""MISE À JOUR : Extraction avec détection prospect"""
data = {
"numero": getattr(client_obj, "CT_Num", ""),
"intitule": getattr(client_obj, "CT_Intitule", ""),
"type": getattr(
client_obj, "CT_Type", 0
), # 0=Client/Prospect, 1=Fournisseur
"est_prospect": getattr(client_obj, "CT_Prospect", 0) == 1, # ✅ NOUVEAU
}
try:
adresse = getattr(client_obj, "Adresse", None)
if adresse:
data["adresse"] = getattr(adresse, "Adresse", "")
data["code_postal"] = getattr(adresse, "CodePostal", "")
data["ville"] = getattr(adresse, "Ville", "")
except:
pass
try:
telecom = getattr(client_obj, "Telecom", None)
if telecom:
data["telephone"] = getattr(telecom, "Telephone", "")
data["email"] = getattr(telecom, "EMail", "")
except:
pass
return data
# =========================================================================
# AVOIRS (DO_Domaine = 0 AND DO_Type = 5)
# =========================================================================
def lister_avoirs(self, limit=100, statut=None):
"""Liste tous les avoirs de vente"""
if not self.cial:
return []
avoirs = []
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
index = 1
max_iterations = limit * 3
erreurs_consecutives = 0
max_erreurs = 50
while (
len(avoirs) < limit
and index < max_iterations
and erreurs_consecutives < max_erreurs
):
try:
persist = factory.List(index)
if persist is None:
break
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ Filtrer : DO_Domaine = 0 (Vente) AND DO_Type = 5 (Avoir)
doc_type = getattr(doc, "DO_Type", -1)
doc_domaine = getattr(doc, "DO_Domaine", -1)
if doc_domaine != 0 or doc_type != settings.SAGE_TYPE_BON_AVOIR:
index += 1
continue
doc_statut = getattr(doc, "DO_Statut", 0)
# Filtre statut optionnel
if statut is not None and doc_statut != statut:
index += 1
continue
# Charger client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(
client_obj, "CT_Intitule", ""
).strip()
except:
pass
avoirs.append(
{
"numero": getattr(doc, "DO_Piece", ""),
"reference": getattr(doc, "DO_Ref", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": doc_statut,
}
)
erreurs_consecutives = 0
index += 1
except:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
break
logger.info(f"{len(avoirs)} avoirs retournés")
return avoirs
except Exception as e:
logger.error(f"❌ Erreur liste avoirs: {e}", exc_info=True)
return []
def lire_avoir(self, numero):
"""Lecture d'un avoir avec ses lignes"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
# Essayer ReadPiece
persist = factory.ReadPiece(settings.SAGE_TYPE_BON_AVOIR, numero)
if not persist:
# Chercher dans List()
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(
persist_test, "IBODocumentVente3"
)
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1)
== settings.SAGE_TYPE_BON_AVOIR
and getattr(doc_test, "DO_Piece", "") == numero
):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# Charger client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
except:
pass
avoir = {
"numero": getattr(doc, "DO_Piece", ""),
"reference": getattr(doc, "DO_Ref", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": getattr(doc, "DO_Statut", 0),
"lignes": [],
}
# Charger lignes
try:
factory_lignes = getattr(
doc, "FactoryDocumentLigne", None
) or getattr(doc, "FactoryDocumentVenteLigne", None)
if factory_lignes:
index = 1
while index <= 100:
try:
ligne_persist = factory_lignes.List(index)
if ligne_persist is None:
break
ligne = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
ligne.Read()
article_ref = ""
try:
article_ref = getattr(ligne, "AR_Ref", "").strip()
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
except:
pass
avoir["lignes"].append(
{
"article": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(
getattr(ligne, "DL_Qte", 0.0)
),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"montant_ht": float(
getattr(ligne, "DL_MontantHT", 0.0)
),
}
)
index += 1
except:
break
except:
pass
logger.info(f"✅ Avoir {numero} lu: {len(avoir['lignes'])} lignes")
return avoir
except Exception as e:
logger.error(f"❌ Erreur lecture avoir {numero}: {e}")
return None
# =========================================================================
# LIVRAISONS (DO_Domaine = 0 AND DO_Type = 3)
# =========================================================================
def lister_livraisons(self, limit=100, statut=None):
"""Liste tous les bons de livraison"""
if not self.cial:
return []
livraisons = []
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
index = 1
max_iterations = limit * 3
erreurs_consecutives = 0
max_erreurs = 50
while (
len(livraisons) < limit
and index < max_iterations
and erreurs_consecutives < max_erreurs
):
try:
persist = factory.List(index)
if persist is None:
break
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# ✅ Filtrer : DO_Domaine = 0 (Vente) AND DO_Type = 30 (Livraison)
doc_type = getattr(doc, "DO_Type", -1)
doc_domaine = getattr(doc, "DO_Domaine", -1)
if (
doc_domaine != 0
or doc_type != settings.SAGE_TYPE_BON_LIVRAISON
):
index += 1
continue
doc_statut = getattr(doc, "DO_Statut", 0)
if statut is not None and doc_statut != statut:
index += 1
continue
# Charger client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(
client_obj, "CT_Intitule", ""
).strip()
except:
pass
livraisons.append(
{
"numero": getattr(doc, "DO_Piece", ""),
"reference": getattr(doc, "DO_Ref", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": doc_statut,
}
)
erreurs_consecutives = 0
index += 1
except:
erreurs_consecutives += 1
index += 1
if erreurs_consecutives >= max_erreurs:
break
logger.info(f"{len(livraisons)} livraisons retournées")
return livraisons
except Exception as e:
logger.error(f"❌ Erreur liste livraisons: {e}", exc_info=True)
return []
def lire_livraison(self, numero):
"""Lecture d'une livraison avec ses lignes"""
if not self.cial:
return None
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
# Essayer ReadPiece
persist = factory.ReadPiece(settings.SAGE_TYPE_BON_LIVRAISON, numero)
if not persist:
# Chercher dans List()
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(
persist_test, "IBODocumentVente3"
)
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1)
== settings.SAGE_TYPE_BON_LIVRAISON
and getattr(doc_test, "DO_Piece", "") == numero
):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
return None
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
# Charger client
client_code = ""
client_intitule = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
except:
pass
livraison = {
"numero": getattr(doc, "DO_Piece", ""),
"reference": getattr(doc, "DO_Ref", ""),
"date": str(getattr(doc, "DO_Date", "")),
"client_code": client_code,
"client_intitule": client_intitule,
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
"statut": getattr(doc, "DO_Statut", 0),
"lignes": [],
}
# Charger lignes
try:
factory_lignes = getattr(
doc, "FactoryDocumentLigne", None
) or getattr(doc, "FactoryDocumentVenteLigne", None)
if factory_lignes:
index = 1
while index <= 100:
try:
ligne_persist = factory_lignes.List(index)
if ligne_persist is None:
break
ligne = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
ligne.Read()
article_ref = ""
try:
article_ref = getattr(ligne, "AR_Ref", "").strip()
if not article_ref:
article_obj = getattr(ligne, "Article", None)
if article_obj:
article_obj.Read()
article_ref = getattr(
article_obj, "AR_Ref", ""
).strip()
except:
pass
livraison["lignes"].append(
{
"article": article_ref,
"designation": getattr(ligne, "DL_Design", ""),
"quantite": float(
getattr(ligne, "DL_Qte", 0.0)
),
"prix_unitaire": float(
getattr(ligne, "DL_PrixUnitaire", 0.0)
),
"montant_ht": float(
getattr(ligne, "DL_MontantHT", 0.0)
),
}
)
index += 1
except:
break
except:
pass
logger.info(
f"✅ Livraison {numero} lue: {len(livraison['lignes'])} lignes"
)
return livraison
except Exception as e:
logger.error(f"❌ Erreur lecture livraison {numero}: {e}")
return None
# =========================================================================
# CREATION CLIENT (US-A8 ?)
# =========================================================================
def creer_client(self, client_data: Dict) -> Dict:
"""
Crée un nouveau client dans Sage 100c via l'API COM.
✅ VERSION CORRIGÉE : CT_Type supprimé (n'existe pas dans cette version)
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 0 : VALIDATION & NETTOYAGE
# ========================================
logger.info("🔍 === VALIDATION DES DONNÉES ===")
if not client_data.get("intitule"):
raise ValueError("Le champ 'intitule' est obligatoire")
# Nettoyage et troncature
intitule = str(client_data["intitule"])[:69].strip()
num_prop = str(client_data.get("num", "")).upper()[:17].strip() if client_data.get("num") else ""
compte = str(client_data.get("compte_collectif", "411000"))[:13].strip()
adresse = str(client_data.get("adresse", ""))[:35].strip()
code_postal = str(client_data.get("code_postal", ""))[:9].strip()
ville = str(client_data.get("ville", ""))[:35].strip()
pays = str(client_data.get("pays", ""))[:35].strip()
telephone = str(client_data.get("telephone", ""))[:21].strip()
email = str(client_data.get("email", ""))[:69].strip()
siret = str(client_data.get("siret", ""))[:14].strip()
tva_intra = str(client_data.get("tva_intra", ""))[:25].strip()
logger.info(f" intitule: '{intitule}' (len={len(intitule)})")
logger.info(f" num: '{num_prop or 'AUTO'}' (len={len(num_prop)})")
logger.info(f" compte: '{compte}' (len={len(compte)})")
# ========================================
# ÉTAPE 1 : CRÉATION OBJET CLIENT
# ========================================
factory_client = self.cial.CptaApplication.FactoryClient
persist = factory_client.Create()
client = win32com.client.CastTo(persist, "IBOClient3")
# 🔑 CRITIQUE : Initialiser l'objet
client.SetDefault()
logger.info("✅ Objet client créé et initialisé")
# ========================================
# ÉTAPE 2 : CHAMPS OBLIGATOIRES (dans l'ordre !)
# ========================================
logger.info("📝 Définition des champs obligatoires...")
# 1. Intitulé (OBLIGATOIRE)
client.CT_Intitule = intitule
logger.debug(f" ✅ CT_Intitule: '{intitule}'")
# ❌ CT_Type SUPPRIMÉ (n'existe pas dans cette version)
# client.CT_Type = 0 # <-- LIGNE SUPPRIMÉE
# 2. Qualité (important pour filtrage Client/Fournisseur)
try:
client.CT_Qualite = "CLI"
logger.debug(" ✅ CT_Qualite: 'CLI'")
except:
logger.debug(" ⚠️ CT_Qualite non défini (pas critique)")
# 3. Compte général principal (OBLIGATOIRE)
try:
factory_compte = self.cial.CptaApplication.FactoryCompteG
persist_compte = factory_compte.ReadNumero(compte)
if persist_compte:
compte_obj = win32com.client.CastTo(persist_compte, "IBOCompteG3")
compte_obj.Read()
# Assigner l'objet CompteG
client.CompteGPrinc = compte_obj
logger.debug(f" ✅ CompteGPrinc: objet '{compte}' assigné")
else:
logger.warning(f" ⚠️ Compte {compte} introuvable - utilisation du compte par défaut")
except Exception as e:
logger.warning(f" ⚠️ Erreur CompteGPrinc: {e}")
# 4. Numéro client (OBLIGATOIRE - générer si vide)
if num_prop:
client.CT_Num = num_prop
logger.debug(f" ✅ CT_Num fourni: '{num_prop}'")
else:
# 🔑 CRITIQUE : Générer le numéro automatiquement
try:
# Méthode 1 : Utiliser SetDefaultNumPiece (si disponible)
if hasattr(client, 'SetDefaultNumPiece'):
client.SetDefaultNumPiece()
num_genere = getattr(client, "CT_Num", "")
logger.debug(f" ✅ CT_Num auto-généré (SetDefaultNumPiece): '{num_genere}'")
else:
# Méthode 2 : Lire le prochain numéro depuis la souche
factory_client = self.cial.CptaApplication.FactoryClient
num_genere = factory_client.GetNextNumero()
if num_genere:
client.CT_Num = num_genere
logger.debug(f" ✅ CT_Num auto-généré (GetNextNumero): '{num_genere}'")
else:
# Méthode 3 : Fallback - utiliser un timestamp
import time
num_genere = f"CLI{int(time.time()) % 1000000}"
client.CT_Num = num_genere
logger.warning(f" ⚠️ CT_Num fallback temporaire: '{num_genere}'")
except Exception as e:
logger.error(f" ❌ Impossible de générer CT_Num: {e}")
raise ValueError("Impossible de générer le numéro client automatiquement. Veuillez fournir un numéro manuellement.")
# 5. Catégories tarifaires (valeurs par défaut)
try:
# Catégorie tarifaire (obligatoire)
if hasattr(client, 'N_CatTarif'):
client.N_CatTarif = 1
# Catégorie comptable (obligatoire)
if hasattr(client, 'N_CatCompta'):
client.N_CatCompta = 1
# Autres catégories
if hasattr(client, 'N_Period'):
client.N_Period = 1
if hasattr(client, 'N_Expedition'):
client.N_Expedition = 1
if hasattr(client, 'N_Condition'):
client.N_Condition = 1
if hasattr(client, 'N_Risque'):
client.N_Risque = 1
logger.debug(" ✅ Catégories (N_*) initialisées")
except Exception as e:
logger.warning(f" ⚠️ Catégories: {e}")
# ========================================
# ÉTAPE 3 : CHAMPS OPTIONNELS MAIS RECOMMANDÉS
# ========================================
logger.info("📝 Définition champs optionnels...")
# Adresse (objet IAdresse)
if any([adresse, code_postal, ville, pays]):
try:
adresse_obj = client.Adresse
if adresse:
adresse_obj.Adresse = adresse
if code_postal:
adresse_obj.CodePostal = code_postal
if ville:
adresse_obj.Ville = ville
if pays:
adresse_obj.Pays = pays
logger.debug(" ✅ Adresse définie")
except Exception as e:
logger.warning(f" ⚠️ Adresse: {e}")
# Télécom (objet ITelecom)
if telephone or email:
try:
telecom_obj = client.Telecom
if telephone:
telecom_obj.Telephone = telephone
if email:
telecom_obj.EMail = email
logger.debug(" ✅ Télécom défini")
except Exception as e:
logger.warning(f" ⚠️ Télécom: {e}")
# Identifiants fiscaux
if siret:
try:
client.CT_Siret = siret
logger.debug(f" ✅ SIRET: '{siret}'")
except Exception as e:
logger.warning(f" ⚠️ SIRET: {e}")
if tva_intra:
try:
client.CT_Identifiant = tva_intra
logger.debug(f" ✅ TVA intracommunautaire: '{tva_intra}'")
except Exception as e:
logger.warning(f" ⚠️ TVA: {e}")
# Autres champs utiles (valeurs par défaut intelligentes)
try:
# Type de facturation (1 = facture normale)
if hasattr(client, 'CT_Facture'):
client.CT_Facture = 1
# Lettrage automatique activé
if hasattr(client, 'CT_Lettrage'):
client.CT_Lettrage = True
# Pas de prospect
if hasattr(client, 'CT_Prospect'):
client.CT_Prospect = False
# Client actif (pas en sommeil)
if hasattr(client, 'CT_Sommeil'):
client.CT_Sommeil = False
logger.debug(" ✅ Options par défaut définies")
except Exception as e:
logger.debug(f" ⚠️ Options: {e}")
# ========================================
# ÉTAPE 4 : DIAGNOSTIC PRÉ-WRITE (pour debug)
# ========================================
logger.info("🔍 === DIAGNOSTIC PRÉ-WRITE ===")
champs_critiques = [
("CT_Intitule", "str"),
("CT_Num", "str"),
("CompteGPrinc", "object"),
("N_CatTarif", "int"),
("N_CatCompta", "int"),
]
for champ, type_attendu in champs_critiques:
try:
val = getattr(client, champ, None)
if type_attendu == "object":
status = "✅ Objet défini" if val else "❌ NULL"
else:
if type_attendu == "str":
status = f"'{val}' (len={len(val)})" if val else "❌ Vide"
else:
status = f"{val}"
logger.info(f" {champ}: {status}")
except Exception as e:
logger.error(f" {champ}: ❌ Erreur - {e}")
# ========================================
# ÉTAPE 5 : VÉRIFICATION FINALE CT_Num
# ========================================
num_avant_write = getattr(client, "CT_Num", "")
if not num_avant_write:
logger.error("❌ CRITIQUE: CT_Num toujours vide après toutes les tentatives !")
raise ValueError(
"Le numéro client (CT_Num) est obligatoire mais n'a pas pu être généré. "
"Veuillez fournir un numéro manuellement via le paramètre 'num'."
)
logger.info(f"✅ CT_Num confirmé avant Write(): '{num_avant_write}'")
# ========================================
# ÉTAPE 6 : ÉCRITURE EN BASE
# ========================================
logger.info("💾 Écriture du client dans Sage...")
try:
client.Write()
logger.info("✅ Write() réussi !")
except Exception as e:
error_detail = str(e)
# Récupérer l'erreur Sage détaillée
try:
sage_error = self.cial.CptaApplication.LastError
if sage_error:
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
logger.error(f"❌ Erreur Sage: {error_detail}")
except:
pass
# Analyser l'erreur spécifique
if "longueur invalide" in error_detail.lower():
logger.error("❌ ERREUR 'longueur invalide' - Dump des champs:")
for attr in dir(client):
if attr.startswith("CT_") or attr.startswith("N_"):
try:
val = getattr(client, attr, None)
if isinstance(val, str):
logger.error(f" {attr}: '{val}' (len={len(val)})")
elif val is not None and not callable(val):
logger.error(f" {attr}: {val} (type={type(val).__name__})")
except:
pass
if "doublon" in error_detail.lower() or "existe" in error_detail.lower():
raise ValueError(f"Ce client existe déjà : {error_detail}")
raise RuntimeError(f"Échec Write(): {error_detail}")
# ========================================
# ÉTAPE 7 : RELECTURE & FINALISATION
# ========================================
try:
client.Read()
except Exception as e:
logger.warning(f"⚠️ Impossible de relire: {e}")
num_final = getattr(client, "CT_Num", "")
if not num_final:
raise RuntimeError("CT_Num vide après Write()")
logger.info(f"✅✅✅ CLIENT CRÉÉ: {num_final} - {intitule} ✅✅✅")
# ========================================
# ÉTAPE 8 : REFRESH CACHE
# ========================================
self._refresh_cache_clients()
return {
"numero": num_final,
"intitule": intitule,
"compte_collectif": compte,
"type": 0, # Par défaut client
"adresse": adresse or None,
"code_postal": code_postal or None,
"ville": ville or None,
"pays": pays or None,
"email": email or None,
"telephone": telephone or None,
"siret": siret or None,
"tva_intra": tva_intra or None
}
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur création client: {e}", exc_info=True)
error_message = str(e)
if self.cial:
try:
err = self.cial.CptaApplication.LastError
if err:
error_message = f"Erreur Sage: {err.Description}"
except:
pass
raise RuntimeError(f"Erreur technique Sage: {error_message}")
def modifier_client(self, code: str, client_data: Dict) -> Dict:
"""
✏️ Modification d'un client existant dans Sage 100c
Args:
code: Code du client à modifier
client_data: Dictionnaire avec les champs à mettre à jour
Returns:
Client modifié
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : CHARGER LE CLIENT EXISTANT
# ========================================
logger.info(f"🔍 Recherche client {code}...")
factory_client = self.cial.CptaApplication.FactoryClient
persist = factory_client.ReadNumero(code)
if not persist:
raise ValueError(f"Client {code} introuvable")
client = win32com.client.CastTo(persist, "IBOClient3")
client.Read()
logger.info(f"✅ Client {code} trouvé: {getattr(client, 'CT_Intitule', '')}")
# ========================================
# ÉTAPE 2 : METTRE À JOUR LES CHAMPS FOURNIS
# ========================================
logger.info("📝 Mise à jour des champs...")
champs_modifies = []
# Intitulé
if "intitule" in client_data:
intitule = str(client_data["intitule"])[:69].strip()
client.CT_Intitule = intitule
champs_modifies.append(f"intitule='{intitule}'")
# Adresse
if any(k in client_data for k in ["adresse", "code_postal", "ville", "pays"]):
try:
adresse_obj = client.Adresse
if "adresse" in client_data:
adresse = str(client_data["adresse"])[:35].strip()
adresse_obj.Adresse = adresse
champs_modifies.append("adresse")
if "code_postal" in client_data:
cp = str(client_data["code_postal"])[:9].strip()
adresse_obj.CodePostal = cp
champs_modifies.append("code_postal")
if "ville" in client_data:
ville = str(client_data["ville"])[:35].strip()
adresse_obj.Ville = ville
champs_modifies.append("ville")
if "pays" in client_data:
pays = str(client_data["pays"])[:35].strip()
adresse_obj.Pays = pays
champs_modifies.append("pays")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour adresse: {e}")
# Télécom
if "email" in client_data or "telephone" in client_data:
try:
telecom_obj = client.Telecom
if "email" in client_data:
email = str(client_data["email"])[:69].strip()
telecom_obj.EMail = email
champs_modifies.append("email")
if "telephone" in client_data:
tel = str(client_data["telephone"])[:21].strip()
telecom_obj.Telephone = tel
champs_modifies.append("telephone")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour télécom: {e}")
# SIRET
if "siret" in client_data:
try:
siret = str(client_data["siret"])[:14].strip()
client.CT_Siret = siret
champs_modifies.append("siret")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour SIRET: {e}")
# TVA Intracommunautaire
if "tva_intra" in client_data:
try:
tva = str(client_data["tva_intra"])[:25].strip()
client.CT_Identifiant = tva
champs_modifies.append("tva_intra")
except Exception as e:
logger.warning(f"⚠️ Erreur mise à jour TVA: {e}")
if not champs_modifies:
logger.warning("⚠️ Aucun champ à modifier")
return self._extraire_client(client)
logger.info(f"📝 Champs à modifier: {', '.join(champs_modifies)}")
# ========================================
# ÉTAPE 3 : ÉCRIRE LES MODIFICATIONS
# ========================================
logger.info("💾 Écriture des modifications...")
try:
client.Write()
logger.info("✅ Write() réussi !")
except Exception as e:
error_detail = str(e)
try:
sage_error = self.cial.CptaApplication.LastError
if sage_error:
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
except:
pass
logger.error(f"❌ Erreur Write(): {error_detail}")
raise RuntimeError(f"Échec modification: {error_detail}")
# ========================================
# ÉTAPE 4 : RELIRE ET RETOURNER
# ========================================
client.Read()
logger.info(f"✅✅✅ CLIENT MODIFIÉ: {code} ({len(champs_modifies)} champs) ✅✅✅")
# Refresh cache
self._refresh_cache_clients()
return self._extraire_client(client)
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur modification client: {e}", exc_info=True)
error_message = str(e)
if self.cial:
try:
err = self.cial.CptaApplication.LastError
if err:
error_message = f"Erreur Sage: {err.Description}"
except:
pass
raise RuntimeError(f"Erreur technique Sage: {error_message}")
def modifier_devis(self, numero: str, devis_data: Dict) -> Dict:
"""
✏️ Modification d'un devis existant dans Sage
Permet de modifier la date, les lignes et le statut.
Si les lignes sont modifiées, elles remplacent TOUTES les lignes existantes.
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : CHARGER LE DEVIS EXISTANT
# ========================================
logger.info(f"🔍 Recherche devis {numero}...")
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(0, numero)
# Si ReadPiece échoue, chercher dans List()
if not persist:
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(persist_test, "IBODocumentVente3")
doc_test.Read()
if (getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
raise ValueError(f"Devis {numero} introuvable")
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
logger.info(f"✅ Devis {numero} trouvé")
# Vérifier le statut (ne pas modifier si déjà transformé)
statut_actuel = getattr(doc, "DO_Statut", 0)
if statut_actuel == 5:
raise ValueError(f"Le devis {numero} a déjà été transformé")
# ========================================
# ÉTAPE 2 : METTRE À JOUR LES CHAMPS
# ========================================
champs_modifies = []
# Mise à jour de la date
if "date_devis" in devis_data:
import pywintypes
date_str = devis_data["date_devis"]
if isinstance(date_str, str):
date_obj = datetime.fromisoformat(date_str)
else:
date_obj = date_str
doc.DO_Date = pywintypes.Time(date_obj)
champs_modifies.append("date")
logger.info(f"📅 Date modifiée: {date_obj.date()}")
# Mise à jour du statut
if "statut" in devis_data:
nouveau_statut = devis_data["statut"]
doc.DO_Statut = nouveau_statut
champs_modifies.append("statut")
logger.info(f"📊 Statut modifié: {statut_actuel}{nouveau_statut}")
# Écriture des modifications de base
if champs_modifies:
doc.Write()
# ========================================
# ÉTAPE 3 : REMPLACEMENT DES LIGNES (si demandé)
# ========================================
if "lignes" in devis_data and devis_data["lignes"] is not None:
logger.info(f"🔄 Remplacement des lignes...")
# Supprimer TOUTES les lignes existantes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
# Compter et supprimer les lignes existantes
index_ligne = 1
while index_ligne <= 100:
try:
ligne_p = factory_lignes.List(index_ligne)
if ligne_p is None:
break
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
ligne.Read()
ligne.Delete()
index_ligne += 1
except:
break
logger.info(f"🗑️ {index_ligne - 1} ligne(s) supprimée(s)")
# Ajouter les nouvelles lignes
factory_article = self.cial.FactoryArticle
for idx, ligne_data in enumerate(devis_data["lignes"], 1):
logger.info(f" Ajout ligne {idx}: {ligne_data['article_code']}")
# Charger l'article
persist_article = factory_article.ReadReference(ligne_data["article_code"])
if not persist_article:
raise ValueError(f"Article {ligne_data['article_code']} introuvable")
article_obj = win32com.client.CastTo(persist_article, "IBOArticle3")
article_obj.Read()
# Créer la nouvelle ligne
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentLigne3")
except:
ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentVenteLigne3")
# Associer article
quantite = float(ligne_data["quantite"])
try:
ligne_obj.SetDefaultArticleReference(ligne_data["article_code"], quantite)
except:
try:
ligne_obj.SetDefaultArticle(article_obj, quantite)
except:
ligne_obj.DL_Design = ligne_data.get("designation", "")
ligne_obj.DL_Qte = quantite
# Définir le prix (si fourni)
if ligne_data.get("prix_unitaire_ht"):
ligne_obj.DL_PrixUnitaire = float(ligne_data["prix_unitaire_ht"])
# Définir la remise (si fournie)
if ligne_data.get("remise_pourcentage", 0) > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(ligne_data["remise_pourcentage"])
ligne_obj.DL_Remise01REM_Type = 0
except:
pass
ligne_obj.Write()
logger.info(f"{len(devis_data['lignes'])} nouvelle(s) ligne(s) ajoutée(s)")
champs_modifies.append("lignes")
# ========================================
# ÉTAPE 4 : VALIDATION FINALE
# ========================================
doc.Write()
# Attente indexation
time.sleep(1)
# Relecture pour récupérer les totaux
doc.Read()
total_ht = float(getattr(doc, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
logger.info(f"✅✅✅ DEVIS MODIFIÉ: {numero} ({', '.join(champs_modifies)}) ✅✅✅")
logger.info(f"💰 Nouveaux totaux: {total_ht}€ HT / {total_ttc}€ TTC")
return {
"numero": numero,
"total_ht": total_ht,
"total_ttc": total_ttc,
"champs_modifies": champs_modifies,
"statut": getattr(doc, "DO_Statut", 0)
}
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur modification devis: {e}", exc_info=True)
raise RuntimeError(f"Erreur technique Sage: {str(e)}")
def creer_commande_enrichi(self, commande_data: dict) -> Dict:
"""
Création d'une commande (type 10 = Bon de commande)
Similaire à creer_devis_enrichi mais pour les commandes.
Utilise CreateProcess_Document(10) au lieu de (0).
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
logger.info(f"🚀 Début création commande pour client {commande_data['client']['code']}")
try:
with self._com_context(), self._lock_com:
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
logger.debug("✅ Transaction Sage démarrée")
except:
pass
try:
# Création document COMMANDE (type 10)
process = self.cial.CreateProcess_Document(settings.SAGE_TYPE_BON_COMMANDE)
doc = process.Document
try:
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
except:
pass
logger.info("📄 Document commande créé")
# Date
import pywintypes
if isinstance(commande_data["date_commande"], str):
date_obj = datetime.fromisoformat(commande_data["date_commande"])
elif isinstance(commande_data["date_commande"], date):
date_obj = datetime.combine(commande_data["date_commande"], datetime.min.time())
else:
date_obj = datetime.now()
doc.DO_Date = pywintypes.Time(date_obj)
logger.info(f"📅 Date définie: {date_obj.date()}")
# Client (CRITIQUE)
factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(commande_data["client"]["code"])
if not persist_client:
raise ValueError(f"Client {commande_data['client']['code']} introuvable")
client_obj = self._cast_client(persist_client)
if not client_obj:
raise ValueError(f"Impossible de charger le client")
doc.SetDefaultClient(client_obj)
doc.Write()
logger.info(f"👤 Client {commande_data['client']['code']} associé")
# Référence externe (optionnelle)
if commande_data.get("reference"):
try:
doc.DO_Ref = commande_data["reference"]
logger.info(f"🔖 Référence: {commande_data['reference']}")
except:
pass
# Lignes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
factory_article = self.cial.FactoryArticle
logger.info(f"📦 Ajout de {len(commande_data['lignes'])} lignes...")
for idx, ligne_data in enumerate(commande_data["lignes"], 1):
logger.info(
f"--- Ligne {idx}: {ligne_data['article_code']} ---"
)
# 🔍 ÉTAPE 1: Charger l'article RÉEL depuis Sage pour le prix
persist_article = factory_article.ReadReference(
ligne_data["article_code"]
)
if not persist_article:
raise ValueError(
f"❌ Article {ligne_data['article_code']} introuvable dans Sage"
)
article_obj = win32com.client.CastTo(
persist_article, "IBOArticle3"
)
article_obj.Read()
# 💰 ÉTAPE 2: Récupérer le prix de vente RÉEL
prix_sage = float(getattr(article_obj, "AR_PrixVen", 0.0))
designation_sage = getattr(article_obj, "AR_Design", "")
logger.info(f"💰 Prix Sage: {prix_sage}")
if prix_sage == 0:
logger.warning(
f"⚠️ ATTENTION: Article {ligne_data['article_code']} a un prix de vente = 0€"
)
# 📝 ÉTAPE 3: Créer la ligne de devis
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentLigne3"
)
except:
ligne_obj = win32com.client.CastTo(
ligne_persist, "IBODocumentVenteLigne3"
)
# ✅✅✅ SOLUTION FINALE: SetDefaultArticleReference avec 2 paramètres ✅✅✅
quantite = float(ligne_data["quantite"])
try:
# Méthode 1: Via référence (plus simple et plus fiable)
ligne_obj.SetDefaultArticleReference(
ligne_data["article_code"], quantite
)
logger.info(
f"✅ Article associé via SetDefaultArticleReference('{ligne_data['article_code']}', {quantite})"
)
except Exception as e:
logger.warning(
f"⚠️ SetDefaultArticleReference échoué: {e}, tentative avec objet article"
)
try:
# Méthode 2: Via objet article
ligne_obj.SetDefaultArticle(article_obj, quantite)
logger.info(
f"✅ Article associé via SetDefaultArticle(obj, {quantite})"
)
except Exception as e2:
logger.error(
f"❌ Toutes les méthodes d'association ont échoué"
)
# Fallback: définir manuellement
ligne_obj.DL_Design = (
designation_sage or ligne_data["designation"]
)
ligne_obj.DL_Qte = quantite
logger.warning("⚠️ Configuration manuelle appliquée")
# ⚙️ ÉTAPE 4: Vérifier le prix automatique chargé
prix_auto = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
logger.info(f"💰 Prix auto chargé: {prix_auto}")
# 💵 ÉTAPE 5: Ajuster le prix si nécessaire
prix_a_utiliser = ligne_data.get("prix_unitaire_ht")
if prix_a_utiliser is not None and prix_a_utiliser > 0:
# Prix personnalisé fourni
ligne_obj.DL_PrixUnitaire = float(prix_a_utiliser)
logger.info(f"💰 Prix personnalisé: {prix_a_utiliser}")
elif prix_auto == 0:
# Pas de prix auto, forcer le prix Sage
if prix_sage == 0:
raise ValueError(
f"Prix nul pour article {ligne_data['article_code']}"
)
ligne_obj.DL_PrixUnitaire = float(prix_sage)
logger.info(f"💰 Prix Sage forcé: {prix_sage}")
else:
# Prix auto correct, on le garde
logger.info(f"💰 Prix auto conservé: {prix_auto}")
prix_final = float(getattr(ligne_obj, "DL_PrixUnitaire", 0.0))
montant_ligne = quantite * prix_final
logger.info(f"{quantite} x {prix_final}€ = {montant_ligne}")
# 🎁 Remise
remise = ligne_data.get("remise_pourcentage", 0)
if remise > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(remise)
ligne_obj.DL_Remise01REM_Type = 0
montant_apres_remise = montant_ligne * (
1 - remise / 100
)
logger.info(
f"🎁 Remise {remise}% → {montant_apres_remise}"
)
except Exception as e:
logger.warning(f"⚠️ Remise non appliquée: {e}")
# 💾 ÉTAPE 6: Écrire la ligne
ligne_obj.Write()
logger.info(f"✅ Ligne {idx} écrite")
# 🔍 VÉRIFICATION: Relire la ligne pour confirmer
try:
ligne_obj.Read()
prix_enregistre = float(
getattr(ligne_obj, "DL_PrixUnitaire", 0.0)
)
montant_enregistre = float(
getattr(ligne_obj, "DL_MontantHT", 0.0)
)
logger.info(
f"🔍 Vérif: Prix={prix_enregistre}€, Montant HT={montant_enregistre}"
)
if montant_enregistre == 0:
logger.error(
f"❌ PROBLÈME: Montant enregistré = 0 pour ligne {idx}"
)
else:
logger.info(f"✅ Ligne {idx} OK: {montant_enregistre}")
except Exception as e:
logger.warning(f"⚠️ Impossible de vérifier la ligne: {e}")
# Validation
doc.Write()
process.Process()
if transaction_active:
self.cial.CptaApplication.CommitTrans()
# Récupération numéro
time.sleep(2)
numero_commande = None
try:
doc_result = process.DocumentResult
if doc_result:
doc_result = win32com.client.CastTo(doc_result, "IBODocumentVente3")
doc_result.Read()
numero_commande = getattr(doc_result, "DO_Piece", "")
except:
pass
if not numero_commande:
numero_commande = getattr(doc, "DO_Piece", "")
# Relecture
factory_doc = self.cial.FactoryDocumentVente
persist_reread = factory_doc.ReadPiece(settings.SAGE_TYPE_BON_COMMANDE, numero_commande)
if persist_reread:
doc_final = win32com.client.CastTo(persist_reread, "IBODocumentVente3")
doc_final.Read()
total_ht = float(getattr(doc_final, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc_final, "DO_TotalTTC", 0.0))
else:
total_ht = 0.0
total_ttc = 0.0
logger.info(f"✅✅✅ COMMANDE CRÉÉE: {numero_commande} - {total_ttc}€ TTC ✅✅✅")
return {
"numero_commande": numero_commande,
"total_ht": total_ht,
"total_ttc": total_ttc,
"nb_lignes": len(commande_data["lignes"]),
"client_code": commande_data["client"]["code"],
"date_commande": str(date_obj.date()),
}
except Exception as e:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
except:
pass
raise
except Exception as e:
logger.error(f"❌ Erreur création commande: {e}", exc_info=True)
raise RuntimeError(f"Échec création commande: {str(e)}")
def modifier_commande(self, numero: str, commande_data: Dict) -> Dict:
"""
✏️ Modification d'une commande existante
Code similaire à modifier_devis mais pour type 10 (Bon de commande)
"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
try:
with self._com_context(), self._lock_com:
# ========================================
# ÉTAPE 1 : CHARGER LE DEVIS EXISTANT
# ========================================
logger.info(f"🔍 Recherche devis {numero}...")
factory = self.cial.FactoryDocumentVente
persist = factory.ReadPiece(10, numero)
# Si ReadPiece échoue, chercher dans List()
if not persist:
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(persist_test, "IBODocumentVente3")
doc_test.Read()
if (getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero):
persist = persist_test
break
index += 1
except:
index += 1
if not persist:
raise ValueError(f"Devis {numero} introuvable")
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
logger.info(f"✅ Devis {numero} trouvé")
# Vérifier le statut (ne pas modifier si déjà transformé)
statut_actuel = getattr(doc, "DO_Statut", 0)
if statut_actuel == 5:
raise ValueError(f"Le devis {numero} a déjà été transformé")
# ========================================
# ÉTAPE 2 : METTRE À JOUR LES CHAMPS
# ========================================
champs_modifies = []
# Mise à jour de la date
if "date_devis" in commande_data:
import pywintypes
date_str = commande_data["date_devis"]
if isinstance(date_str, str):
date_obj = datetime.fromisoformat(date_str)
else:
date_obj = date_str
doc.DO_Date = pywintypes.Time(date_obj)
champs_modifies.append("date")
logger.info(f"📅 Date modifiée: {date_obj.date()}")
# Mise à jour du statut
if "statut" in commande_data:
nouveau_statut = commande_data["statut"]
doc.DO_Statut = nouveau_statut
champs_modifies.append("statut")
logger.info(f"📊 Statut modifié: {statut_actuel}{nouveau_statut}")
# Écriture des modifications de base
if champs_modifies:
doc.Write()
# ========================================
# ÉTAPE 3 : REMPLACEMENT DES LIGNES (si demandé)
# ========================================
if "lignes" in commande_data and commande_data["lignes"] is not None:
logger.info(f"🔄 Remplacement des lignes...")
# Supprimer TOUTES les lignes existantes
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
# Compter et supprimer les lignes existantes
index_ligne = 1
while index_ligne <= 100:
try:
ligne_p = factory_lignes.List(index_ligne)
if ligne_p is None:
break
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
ligne.Read()
ligne.Delete()
index_ligne += 1
except:
break
logger.info(f"🗑️ {index_ligne - 1} ligne(s) supprimée(s)")
# Ajouter les nouvelles lignes
factory_article = self.cial.FactoryArticle
for idx, ligne_data in enumerate(commande_data["lignes"], 1):
logger.info(f" Ajout ligne {idx}: {ligne_data['article_code']}")
# Charger l'article
persist_article = factory_article.ReadReference(ligne_data["article_code"])
if not persist_article:
raise ValueError(f"Article {ligne_data['article_code']} introuvable")
article_obj = win32com.client.CastTo(persist_article, "IBOArticle3")
article_obj.Read()
# Créer la nouvelle ligne
ligne_persist = factory_lignes.Create()
try:
ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentLigne3")
except:
ligne_obj = win32com.client.CastTo(ligne_persist, "IBODocumentVenteLigne3")
# Associer article
quantite = float(ligne_data["quantite"])
try:
ligne_obj.SetDefaultArticleReference(ligne_data["article_code"], quantite)
except:
try:
ligne_obj.SetDefaultArticle(article_obj, quantite)
except:
ligne_obj.DL_Design = ligne_data.get("designation", "")
ligne_obj.DL_Qte = quantite
# Définir le prix (si fourni)
if ligne_data.get("prix_unitaire_ht"):
ligne_obj.DL_PrixUnitaire = float(ligne_data["prix_unitaire_ht"])
# Définir la remise (si fournie)
if ligne_data.get("remise_pourcentage", 0) > 0:
try:
ligne_obj.DL_Remise01REM_Valeur = float(ligne_data["remise_pourcentage"])
ligne_obj.DL_Remise01REM_Type = 0
except:
pass
ligne_obj.Write()
logger.info(f"{len(commande_data['lignes'])} nouvelle(s) ligne(s) ajoutée(s)")
champs_modifies.append("lignes")
# ========================================
# ÉTAPE 4 : VALIDATION FINALE
# ========================================
doc.Write()
# Attente indexation
time.sleep(1)
# Relecture pour récupérer les totaux
doc.Read()
total_ht = float(getattr(doc, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
logger.info(f"✅✅✅ DEVIS MODIFIÉ: {numero} ({', '.join(champs_modifies)}) ✅✅✅")
logger.info(f"💰 Nouveaux totaux: {total_ht}€ HT / {total_ttc}€ TTC")
return {
"numero": numero,
"total_ht": total_ht,
"total_ttc": total_ttc,
"champs_modifies": champs_modifies,
"statut": getattr(doc, "DO_Statut", 0)
}
except ValueError as e:
logger.error(f"❌ Erreur métier: {e}")
raise
except Exception as e:
logger.error(f"❌ Erreur modification devis: {e}", exc_info=True)
raise RuntimeError(f"Erreur technique Sage: {str(e)}")