Better usage and exploitation of contact_to_dict

This commit is contained in:
fanilo 2025-12-28 21:40:27 +01:00
parent 09e3589132
commit 6bb1253a1a

View file

@ -74,13 +74,6 @@ class SageConnector:
if conn: if conn:
conn.close() conn.close()
# def _safe_strip(self, value):
# """Strip sécurisé pour valeurs SQL"""
# if value is None:
# return None
# if isinstance(value, str):
# return value.strip()
# return value
def _cleanup_com_thread(self): def _cleanup_com_thread(self):
"""Nettoie COM pour le thread actuel (à appeler à la fin)""" """Nettoie COM pour le thread actuel (à appeler à la fin)"""
@ -937,7 +930,6 @@ class SageConnector:
cursor.execute(query, [numero]) cursor.execute(query, [numero])
rows = cursor.fetchall() rows = cursor.fetchall()
# Récupérer le contact par défaut du client
query_client = """ query_client = """
SELECT CT_Contact SELECT CT_Contact
FROM F_COMPTET FROM F_COMPTET
@ -954,7 +946,6 @@ class SageConnector:
for row in rows: for row in rows:
contact = self._row_to_contact_dict(row) contact = self._row_to_contact_dict(row)
# Vérifier si c'est le contact par défaut
if nom_contact_defaut: if nom_contact_defaut:
nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip() nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip()
contact["est_defaut"] = ( contact["est_defaut"] = (
@ -6666,7 +6657,7 @@ class SageConnector:
def creer_contact(self, contact_data: Dict) -> Dict: def creer_contact(self, contact_data: Dict) -> Dict:
""" """
Crée un nouveau contact dans F_CONTACTT via COM Crée un nouveau contact dans F_CONTACTT via COM
VERSION CORRIGÉE - Utilise IBOTiersContact3 VERSION FINALE COMPLÈTE
""" """
if not self.cial: if not self.cial:
raise RuntimeError("Connexion Sage non établie") raise RuntimeError("Connexion Sage non établie")
@ -6717,7 +6708,7 @@ class SageConnector:
persist = factory_contact.Create() persist = factory_contact.Create()
logger.info(f" Objet cree: {type(persist).__name__}") logger.info(f" Objet cree: {type(persist).__name__}")
# Cast vers IBOTiersContact3 (qui a Nom, Prenom, etc.) # Cast vers IBOTiersContact3
contact = None contact = None
interfaces_a_tester = [ interfaces_a_tester = [
"IBOTiersContact3", "IBOTiersContact3",
@ -6730,12 +6721,11 @@ class SageConnector:
try: try:
temp = win32com.client.CastTo(persist, interface_name) temp = win32com.client.CastTo(persist, interface_name)
# Vérifier si Nom existe (pas CT_Num !) # Vérifier si Nom existe
if hasattr(temp, '_prop_map_put_'): if hasattr(temp, '_prop_map_put_'):
props = list(temp._prop_map_put_.keys()) props = list(temp._prop_map_put_.keys())
logger.info(f" Test {interface_name}: props={props[:15]}") logger.info(f" Test {interface_name}: props={props[:15]}")
# Chercher Nom ou CT_Nom
if 'Nom' in props or 'CT_Nom' in props: if 'Nom' in props or 'CT_Nom' in props:
contact = temp contact = temp
logger.info(f" OK Cast reussi vers {interface_name}") logger.info(f" OK Cast reussi vers {interface_name}")
@ -6930,24 +6920,22 @@ class SageConnector:
logger.info(f" CT_No={contact_no}") logger.info(f" CT_No={contact_no}")
logger.info("=" * 80) logger.info("=" * 80)
# Retour # Utiliser _contact_to_dict
contact_dict = { contact_dict = self._contact_to_dict(
"numero": numero_client, contact,
"n_contact": n_contact or contact_no, numero_client=numero_client,
"civilite": contact_data.get("civilite"), contact_numero=contact_no,
"nom": nom, n_contact=n_contact
"prenom": prenom, )
"fonction": contact_data.get("fonction"), contact_dict["est_defaut"] = est_defaut
"service_code": contact_data.get("service_code"),
"telephone": contact_data.get("telephone"), logger.info("=" * 80)
"portable": contact_data.get("portable"), logger.info("[DEBUG RETOUR]")
"telecopie": contact_data.get("telecopie"), logger.info(f" numero_client = {numero_client}")
"email": contact_data.get("email"), logger.info(f" contact_no = {contact_no}")
"facebook": contact_data.get("facebook"), logger.info(f" n_contact = {n_contact}")
"linkedin": contact_data.get("linkedin"), logger.info(f" contact_dict COMPLET = {contact_dict}")
"skype": contact_data.get("skype"), logger.info("=" * 80)
"est_defaut": est_defaut,
}
return contact_dict return contact_dict
@ -6958,11 +6946,10 @@ class SageConnector:
logger.error(f"[ERREUR] {e}", exc_info=True) logger.error(f"[ERREUR] {e}", exc_info=True)
raise RuntimeError(f"Erreur technique: {e}") raise RuntimeError(f"Erreur technique: {e}")
def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict: def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict:
""" """
Modifie un contact existant via COM Modifie un contact existant via COM
VERSION REFACTORISÉE - Utilise FactoryTiersContact VERSION COMPLÈTE REFACTORISÉE
""" """
if not self.cial: if not self.cial:
raise RuntimeError("Connexion Sage non établie") raise RuntimeError("Connexion Sage non établie")
@ -6987,58 +6974,44 @@ class SageConnector:
except Exception as e: except Exception as e:
raise ValueError(f"Client {numero} introuvable: {e}") raise ValueError(f"Client {numero} introuvable: {e}")
# Charger le contact via FactoryTiersContact # Charger le contact via SQL puis DossierContact
logger.info("[2] Chargement du contact") logger.info("[2] Chargement du contact")
if not hasattr(client_obj, 'FactoryTiersContact'): nom_recherche = None
raise RuntimeError("FactoryTiersContact non trouvee sur le client") prenom_recherche = None
factory_contact = client_obj.FactoryTiersContact
try: try:
# Chercher le contact par son CT_No # Récupérer nom/prénom via SQL
# Il faut lister et trouver celui avec le bon CT_No with self._get_sql_connection() as conn:
# ou utiliser une autre méthode de lecture cursor = conn.cursor()
cursor.execute(
# STRATÉGIE : Lister tous les contacts et trouver le bon "SELECT CT_Nom, CT_Prenom FROM F_CONTACTT WHERE CT_Num = ? AND CT_No = ?",
contacts_collection = None [numero, contact_numero]
all_contacts = [] )
row = cursor.fetchone()
# Essayer de lister via SQL d'abord (plus fiable)
try: if not row:
with self._get_sql_connection() as conn: raise ValueError(f"Contact CT_No={contact_numero} non trouve")
cursor = conn.cursor()
cursor.execute( nom_recherche = row.CT_Nom.strip() if row.CT_Nom else ""
"SELECT CT_Nom, CT_Prenom FROM F_CONTACTT WHERE CT_Num = ? AND CT_No = ?", prenom_recherche = row.CT_Prenom.strip() if row.CT_Prenom else ""
[numero, contact_numero]
) logger.info(f" Contact trouve en SQL: {prenom_recherche} {nom_recherche}")
row = cursor.fetchone()
if not row:
raise ValueError(f"Contact CT_No={contact_numero} non trouve")
nom_recherche = row.CT_Nom.strip() if row.CT_Nom else ""
prenom_recherche = row.CT_Prenom.strip() if row.CT_Prenom else ""
logger.info(f" Contact trouve en SQL: {prenom_recherche} {nom_recherche}")
except Exception as e:
raise ValueError(f"Contact introuvable: {e}")
# Charger via ReadNomPrenom
factory_dossier = self.cial.CptaApplication.FactoryDossierContact
persist = factory_dossier.ReadNomPrenom(nom_recherche, prenom_recherche)
if not persist:
raise ValueError(f"Contact non trouvable via ReadNomPrenom")
contact = win32com.client.CastTo(persist, "IBOTiersContact3")
contact.Read()
logger.info(f" OK Contact charge: {contact.Nom}")
except Exception as e: except Exception as e:
raise ValueError(f"Contact introuvable: {e}") raise ValueError(f"Contact introuvable: {e}")
# Charger via FactoryDossierContact
factory_dossier = self.cial.CptaApplication.FactoryDossierContact
persist = factory_dossier.ReadNomPrenom(nom_recherche, prenom_recherche)
if not persist:
raise ValueError(f"Contact non trouvable via ReadNomPrenom")
contact = win32com.client.CastTo(persist, "IBOTiersContact3")
contact.Read()
logger.info(f" OK Contact charge: {contact.Nom}")
# Appliquer les modifications # Appliquer les modifications
logger.info("[3] Application des modifications") logger.info("[3] Application des modifications")
modifications_appliquees = [] modifications_appliquees = []
@ -7207,7 +7180,13 @@ class SageConnector:
logger.info(f"[SUCCES] Contact modifie: CT_No={contact_numero}") logger.info(f"[SUCCES] Contact modifie: CT_No={contact_numero}")
logger.info("=" * 80) logger.info("=" * 80)
contact_dict = self._contact_to_dict(contact) # Utiliser _contact_to_dict
contact_dict = self._contact_to_dict(
contact,
numero_client=numero,
contact_numero=contact_numero,
n_contact=None
)
contact_dict["est_defaut"] = est_actuellement_defaut contact_dict["est_defaut"] = est_actuellement_defaut
return contact_dict return contact_dict
@ -7218,12 +7197,12 @@ class SageConnector:
except Exception as e: except Exception as e:
logger.error(f"[ERREUR] {e}", exc_info=True) logger.error(f"[ERREUR] {e}", exc_info=True)
raise RuntimeError(f"Erreur technique: {e}") raise RuntimeError(f"Erreur technique: {e}")
def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict: def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict:
""" """
Définit un contact comme contact par défaut du client Définit un contact comme contact par défaut du client
VERSION REFACTORISÉE VERSION COMPLÈTE REFACTORISÉE
""" """
if not self.cial: if not self.cial:
raise RuntimeError("Connexion Sage non établie") raise RuntimeError("Connexion Sage non établie")
@ -7325,7 +7304,7 @@ class SageConnector:
except Exception as e: except Exception as e:
logger.error(f"[ERREUR] {e}", exc_info=True) logger.error(f"[ERREUR] {e}", exc_info=True)
raise RuntimeError(f"Erreur technique: {e}") raise RuntimeError(f"Erreur technique: {e}")
def lister_contacts(self, numero: str) -> List[Dict]: def lister_contacts(self, numero: str) -> List[Dict]:
""" """
@ -7383,7 +7362,6 @@ class SageConnector:
try: try:
with self._com_context(), self._lock_com: with self._com_context(), self._lock_com:
# Charger le client
factory_client = self.cial.CptaApplication.FactoryClient factory_client = self.cial.CptaApplication.FactoryClient
persist_client = factory_client.ReadNumero(numero) persist_client = factory_client.ReadNumero(numero)
@ -7393,7 +7371,6 @@ class SageConnector:
client = win32com.client.CastTo(persist_client, "IBOClient3") client = win32com.client.CastTo(persist_client, "IBOClient3")
client.Read() client.Read()
# Méthode 1: Via CT_NoContact (si disponible)
ct_no_defaut = None ct_no_defaut = None
try: try:
ct_no_defaut = getattr(client, "CT_NoContact", None) ct_no_defaut = getattr(client, "CT_NoContact", None)
@ -7402,7 +7379,6 @@ class SageConnector:
except: except:
pass pass
# Méthode 2: Via CT_Contact (nom)
nom_contact_defaut = None nom_contact_defaut = None
try: try:
nom_contact_defaut = getattr(client, "CT_Contact", None) nom_contact_defaut = getattr(client, "CT_Contact", None)
@ -7411,11 +7387,9 @@ class SageConnector:
except: except:
pass pass
# Si on a le CT_No, on retourne le contact complet
if ct_no_defaut: if ct_no_defaut:
return self.obtenir_contact(numero, ct_no_defaut) return self.obtenir_contact(numero, ct_no_defaut)
# Sinon, chercher par nom dans la liste des contacts
if nom_contact_defaut: if nom_contact_defaut:
contacts = self.lister_contacts(numero) contacts = self.lister_contacts(numero)
for contact in contacts: for contact in contacts:
@ -7433,7 +7407,7 @@ class SageConnector:
def supprimer_contact(self, numero: str, contact_numero: int) -> Dict: def supprimer_contact(self, numero: str, contact_numero: int) -> Dict:
""" """
Supprime un contact via COM Supprime un contact via COM
VERSION REFACTORISÉE VERSION COMPLÈTE REFACTORISÉE
""" """
if not self.cial: if not self.cial:
raise RuntimeError("Connexion Sage non établie") raise RuntimeError("Connexion Sage non établie")
@ -7521,15 +7495,19 @@ class SageConnector:
except Exception as e: except Exception as e:
logger.error(f"[ERREUR] {e}", exc_info=True) logger.error(f"[ERREUR] {e}", exc_info=True)
raise RuntimeError(f"Erreur technique: {e}") raise RuntimeError(f"Erreur technique: {e}")
def _contact_to_dict(self, contact) -> Dict: def _contact_to_dict(self, contact, numero_client=None, contact_numero=None, n_contact=None) -> Dict:
""" """
Convertit un objet COM Contact (IBOTiersContact3) en dictionnaire Convertit un objet COM Contact (IBOTiersContact3) en dictionnaire
VERSION REFACTORISÉE
Args:
contact: Objet COM contact
numero_client: Code du client (optionnel)
contact_numero: CT_No du contact (optionnel)
n_contact: N_Contact du contact (optionnel)
""" """
try: try:
# IBOTiersContact3 utilise Nom/Prenom (sans préfixe CT_) # Civilité
civilite_code = getattr(contact, "Civilite", None) civilite_code = getattr(contact, "Civilite", None)
civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"} civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"}
civilite = civilite_map.get(civilite_code) if civilite_code is not None else None civilite = civilite_map.get(civilite_code) if civilite_code is not None else None
@ -7550,19 +7528,10 @@ class SageConnector:
except: except:
pass pass
# Récupérer CT_No et N_Contact via SQL si possible
# Car IBOTiersContact3 du DossierContact ne les a pas
contact_numero = None
n_contact = None
numero = None
# Ces infos doivent venir de F_CONTACTT
# On les passe plutôt en paramètre ou on les récupère différemment
return { return {
"numero": numero, # À passer en paramètre "numero": numero_client,
"contact_numero": contact_numero, # À passer en paramètre "contact_numero": contact_numero,
"n_contact": n_contact, # À passer en paramètre "n_contact": n_contact or contact_numero,
"civilite": civilite, "civilite": civilite,
"nom": self._safe_strip(getattr(contact, "Nom", None)), "nom": self._safe_strip(getattr(contact, "Nom", None)),
"prenom": self._safe_strip(getattr(contact, "Prenom", None)), "prenom": self._safe_strip(getattr(contact, "Prenom", None)),
@ -7579,7 +7548,8 @@ class SageConnector:
except Exception as e: except Exception as e:
logger.warning(f"Erreur conversion contact: {e}") logger.warning(f"Erreur conversion contact: {e}")
return {} return {}
def _row_to_contact_dict(self, row) -> Dict: def _row_to_contact_dict(self, row) -> Dict:
"""Convertit une ligne SQL en dictionnaire contact""" """Convertit une ligne SQL en dictionnaire contact"""
civilite_code = row.CT_Civilite civilite_code = row.CT_Civilite