diff --git a/main.py b/main.py index a5376c0..9a09486 100644 --- a/main.py +++ b/main.py @@ -857,6 +857,46 @@ class FamilleCreate(BaseModel): ) +class ContactCreateRequest(BaseModel): + """Requête de création de contact""" + numero: str + civilite: Optional[str] = None + nom: str + prenom: Optional[str] = None + fonction: Optional[str] = None + service_code: Optional[int] = None + telephone: Optional[str] = None + portable: Optional[str] = None + telecopie: Optional[str] = None + email: Optional[str] = None + facebook: Optional[str] = None + linkedin: Optional[str] = None + skype: Optional[str] = None + + +class ContactListRequest(BaseModel): + """Requête de liste des contacts""" + numero: str + + +class ContactGetRequest(BaseModel): + """Requête de récupération d'un contact""" + numero: str + contact_numero: int + + +class ContactUpdateRequest(BaseModel): + """Requête de modification d'un contact""" + numero: str + contact_numero: int + updates: Dict + + +class ContactDeleteRequest(BaseModel): + """Requête de suppression d'un contact""" + numero: str + contact_numero: int + def verify_token(x_sage_token: str = Header(...)): """Vérification du token d'authentification""" if x_sage_token != settings.sage_gateway_token: @@ -2044,6 +2084,82 @@ def lire_mouvement_stock(numero: str): raise HTTPException(500, str(e)) +@app.post("/sage/contacts/create", dependencies=[Depends(verify_token)]) +def contacts_create(req: ContactCreateRequest): + """Crée un nouveau contact""" + try: + contact = sage.creer_contact(req.dict()) + return {"success": True, "data": contact} + except ValueError as e: + logger.error(f"Erreur validation contact: {e}") + raise HTTPException(400, str(e)) + except Exception as e: + logger.error(f"Erreur création contact: {e}") + raise HTTPException(500, str(e)) + + +@app.post("/sage/contacts/list", dependencies=[Depends(verify_token)]) +def contacts_list(req: ContactListRequest): + """Liste les contacts d'un client""" + try: + contacts = sage.lister_contacts(req.numero) + return {"success": True, "data": contacts} + except Exception as e: + logger.error(f"Erreur liste contacts: {e}") + raise HTTPException(500, str(e)) + + +@app.post("/sage/contacts/get", dependencies=[Depends(verify_token)]) +def contacts_get(req: ContactGetRequest): + """Récupère un contact spécifique""" + try: + contact = sage.obtenir_contact(req.numero, req.contact_numero) + if not contact: + raise HTTPException(404, f"Contact {req.contact_numero} non trouvé") + return {"success": True, "data": contact} + except HTTPException: + raise + except Exception as e: + logger.error(f"Erreur récupération contact: {e}") + raise HTTPException(500, str(e)) + + +@app.post("/sage/contacts/update", dependencies=[Depends(verify_token)]) +def contacts_update(req: ContactUpdateRequest): + """Modifie un contact existant""" + try: + contact = sage.modifier_contact(req.numero, req.contact_numero, req.updates) + return {"success": True, "data": contact} + except ValueError as e: + logger.error(f"Erreur validation contact: {e}") + raise HTTPException(400, str(e)) + except Exception as e: + logger.error(f"Erreur modification contact: {e}") + raise HTTPException(500, str(e)) + + +@app.post("/sage/contacts/delete", dependencies=[Depends(verify_token)]) +def contacts_delete(req: ContactDeleteRequest): + """Supprime un contact""" + try: + result = sage.supprimer_contact(req.numero, req.contact_numero) + return {"success": True, "data": result} + except Exception as e: + logger.error(f"Erreur suppression contact: {e}") + raise HTTPException(500, str(e)) + + +@app.post("/sage/contacts/set-default", dependencies=[Depends(verify_token)]) +def contacts_set_default(req: ContactGetRequest): + """Définit un contact comme contact par défaut""" + try: + result = sage.definir_contact_defaut(req.numero, req.contact_numero) + return {"success": True, "data": result} + except Exception as e: + logger.error(f"Erreur définition contact par défaut: {e}") + raise HTTPException(500, str(e)) + + if __name__ == "__main__": uvicorn.run( "main:app", diff --git a/sage_connector.py b/sage_connector.py index d2a20d8..814a8fa 100644 --- a/sage_connector.py +++ b/sage_connector.py @@ -74,13 +74,13 @@ class SageConnector: if conn: conn.close() - def _safe_strip(self, value): - """Strip sécurisé pour valeurs SQL""" - if value is None: - return None - if isinstance(value, str): - return value.strip() - return value +# def _safe_strip(self, value): +# """Strip sécurisé pour valeurs SQL""" +# if value is None: +# return None +# if isinstance(value, str): +# return value.strip() +# return value def _cleanup_com_thread(self): """Nettoie COM pour le thread actuel (à appeler à la fin)""" @@ -915,70 +915,63 @@ class SageConnector: raise RuntimeError(f"Erreur technique Sage: {error_message}") - def _get_contacts_client(self, ct_num: str, conn) -> list: + def _get_contacts_client(self, numero: str, conn) -> list: """ - Récupère tous les contacts d'un client avec TOUS les champs + Récupère tous les contacts d'un client avec indication du contact par défaut """ try: cursor = conn.cursor() query = """ SELECT - -- IDENTIFICATION CT_Num, CT_No, N_Contact, - - -- IDENTITÉ CT_Civilite, CT_Nom, CT_Prenom, CT_Fonction, - - -- ORGANISATION N_Service, - - -- COORDONNÉES CT_Telephone, CT_TelPortable, CT_Telecopie, CT_EMail, - - -- RÉSEAUX SOCIAUX CT_Facebook, CT_LinkedIn, CT_Skype - FROM F_CONTACTT WHERE CT_Num = ? ORDER BY N_Contact, CT_Nom, CT_Prenom """ - cursor.execute(query, [ct_num]) + cursor.execute(query, [numero]) rows = cursor.fetchall() + # Récupérer le contact par défaut du client + query_client = """ + SELECT CT_Contact + FROM F_COMPTET + WHERE CT_Num = ? + """ + cursor.execute(query_client, [numero]) + client_row = cursor.fetchone() + + nom_contact_defaut = None + if client_row: + nom_contact_defaut = self._safe_strip(client_row.CT_Contact) + contacts = [] for row in rows: - contact = { - "ct_num": self._safe_strip(row.CT_Num), - "ct_no": row.CT_No, - "n_contact": row.N_Contact, - - "civilite": self._safe_strip(row.CT_Civilite), - "nom": self._safe_strip(row.CT_Nom), - "prenom": self._safe_strip(row.CT_Prenom), - "fonction": self._safe_strip(row.CT_Fonction), - - "service_code": row.N_Service, - - "telephone": self._safe_strip(row.CT_Telephone), - "portable": self._safe_strip(row.CT_TelPortable), - "telecopie": self._safe_strip(row.CT_Telecopie), - "email": self._safe_strip(row.CT_EMail), - - "facebook": self._safe_strip(row.CT_Facebook), - "linkedin": self._safe_strip(row.CT_LinkedIn), - "skype": self._safe_strip(row.CT_Skype) - } + contact = self._row_to_contact_dict(row) + + # Vérifier si c'est le contact par défaut + if nom_contact_defaut: + nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip() + contact["est_defaut"] = ( + nom_complet == nom_contact_defaut or + contact['nom'] == nom_contact_defaut + ) + else: + contact["est_defaut"] = False + contacts.append(contact) return contacts except Exception as e: - logger.warning(f"⚠️ Impossible de récupérer contacts pour {ct_num}: {e}") + logger.warning(f"⚠️ Impossible de récupérer contacts pour {numero}: {e}") return [] - def lister_tous_clients(self, filtre=""): """ Liste tous les clients avec TOUS les champs gérés par creer_client @@ -1314,7 +1307,6 @@ class SageConnector: return None - def lister_tous_articles(self, filtre=""): try: with self._get_sql_connection() as conn: @@ -6670,6 +6662,815 @@ class SageConnector: """ Lit UNE livraison via SQL (avec lignes)""" return self._lire_document_sql(numero, type_doc=30) + + + def creer_contact(self, contact_data: Dict) -> Dict: + """ + Crée un nouveau contact pour un client via COM + VERSION COMPLÈTE avec gestion du contact par défaut + + Champs Sage Contact (F_CONTACTT): + - CT_Num: Code client parent (obligatoire) + - CT_Nom: Nom (obligatoire, max 35 car) + - CT_Prenom: Prénom (max 35 car) + - CT_Civilite: 0=M., 1=Mme, 2=Mlle, 3=Société + - CT_Fonction: Fonction (max 35 car) + - N_Service: Code service (int) + - CT_Telephone: Tél fixe (max 21 car) + - CT_TelPortable: Mobile (max 21 car) + - CT_Telecopie: Fax (max 21 car) + - CT_EMail: Email (max 69 car) + - CT_Facebook: Facebook (max 69 car) + - CT_LinkedIn: LinkedIn (max 69 car) + - CT_Skype: Skype (max 69 car) + + Nouveau: + - est_defaut: Définir comme contact par défaut (boolean) + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + logger.info("=" * 80) + logger.info("[CRÉATION CONTACT SAGE]") + logger.info("=" * 80) + + # Validation des champs obligatoires + if not contact_data.get("numero"): + raise ValueError("numero (code client) obligatoire") + + if not contact_data.get("nom"): + raise ValueError("nom obligatoire") + + numero = self._clean_str(contact_data["numero"], 17).upper() + nom = self._clean_str(contact_data["nom"], 35) + + # Vérifier que le client existe + logger.info(f"[1] Vérification client: {numero}") + factory_client = self.cial.CptaApplication.FactoryClient + try: + persist_client = factory_client.ReadNumero(numero) + if not persist_client: + raise ValueError(f"Client {numero} non trouvé") + logger.info(f" ✓ Client {numero} existe") + except Exception as e: + raise ValueError(f"Client {numero} introuvable: {e}") + + # Créer l'objet contact + logger.info("[2] Création objet Contact") + factory_contact = self.cial.CptaApplication.FactoryContactT + persist = factory_contact.Create() + contact = win32com.client.CastTo(persist, "IBOContactT3") + logger.info(" ✓ Objet IBOContactT3 créé") + + # Configuration obligatoire + logger.info("[3] Configuration obligatoire") + contact.CT_Num = numero + contact.CT_Nom = nom + logger.info(f" CT_Num = {numero}") + logger.info(f" CT_Nom = {nom}") + + # SetDefault pour initialiser les valeurs par défaut + contact.SetDefault() + logger.info(" ✓ SetDefault() appliqué") + + # Civilité + logger.info("[4] Identité") + civilite_input = contact_data.get("civilite") + if civilite_input: + civilite_map = { + "M.": 0, + "Mme": 1, + "Mlle": 2, + "Société": 3 + } + civilite_code = civilite_map.get(civilite_input) + if civilite_code is not None: + self._try_set_attribute(contact, "CT_Civilite", civilite_code) + logger.info(f" CT_Civilite = {civilite_code} ({civilite_input})") + + # Prénom + if contact_data.get("prenom"): + prenom = self._clean_str(contact_data["prenom"], 35) + self._try_set_attribute(contact, "CT_Prenom", prenom) + logger.info(f" CT_Prenom = {prenom}") + + # Fonction + if contact_data.get("fonction"): + fonction = self._clean_str(contact_data["fonction"], 35) + self._try_set_attribute(contact, "CT_Fonction", fonction) + logger.info(f" CT_Fonction = {fonction}") + + # Service + logger.info("[5] Organisation") + if contact_data.get("service_code") is not None: + service = self._safe_int(contact_data["service_code"]) + if service is not None: + self._try_set_attribute(contact, "N_Service", service) + logger.info(f" N_Service = {service}") + + # Coordonnées + logger.info("[6] Coordonnées") + + if contact_data.get("telephone"): + telephone = self._clean_str(contact_data["telephone"], 21) + self._try_set_attribute(contact, "CT_Telephone", telephone) + logger.info(f" CT_Telephone = {telephone}") + + if contact_data.get("portable"): + portable = self._clean_str(contact_data["portable"], 21) + self._try_set_attribute(contact, "CT_TelPortable", portable) + logger.info(f" CT_TelPortable = {portable}") + + if contact_data.get("telecopie"): + fax = self._clean_str(contact_data["telecopie"], 21) + self._try_set_attribute(contact, "CT_Telecopie", fax) + logger.info(f" CT_Telecopie = {fax}") + + if contact_data.get("email"): + email = self._clean_str(contact_data["email"], 69) + self._try_set_attribute(contact, "CT_EMail", email) + logger.info(f" CT_EMail = {email}") + + # Réseaux sociaux + logger.info("[7] Réseaux sociaux") + + if contact_data.get("facebook"): + facebook = self._clean_str(contact_data["facebook"], 69) + self._try_set_attribute(contact, "CT_Facebook", facebook) + logger.info(f" CT_Facebook = {facebook}") + + if contact_data.get("linkedin"): + linkedin = self._clean_str(contact_data["linkedin"], 69) + self._try_set_attribute(contact, "CT_LinkedIn", linkedin) + logger.info(f" CT_LinkedIn = {linkedin}") + + if contact_data.get("skype"): + skype = self._clean_str(contact_data["skype"], 69) + self._try_set_attribute(contact, "CT_Skype", skype) + logger.info(f" CT_Skype = {skype}") + + # Enregistrement du contact + logger.info("[8] WRITE CONTACT") + try: + contact.Write() + contact.Read() + logger.info(" ✓ Write() réussi") + except Exception as e: + error_detail = str(e) + try: + sage_error = self.cial.CptaApplication.LastError + if sage_error: + error_detail = f"{sage_error.Description} (Code: {sage_error.Number})" + except: + pass + logger.error(f" ✗ Erreur Write: {error_detail}") + raise RuntimeError(f"Échec création contact: {error_detail}") + + # Récupération des données finales + contact_numero = getattr(contact, "CT_No", None) + n_contact = getattr(contact, "N_Contact", None) + + logger.info(f" Contact créé: CT_No={contact_numero}, N_Contact={n_contact}") + + # Gestion du contact par défaut + est_defaut = contact_data.get("est_defaut", False) + + if est_defaut: + logger.info("[9] Définition comme contact par défaut") + try: + if contact_numero: + # Construire le nom complet pour CT_Contact + prenom = self._clean_str(contact_data.get("prenom", ""), 35) + nom_complet = f"{prenom} {nom}".strip() if prenom else nom + + # Charger le client pour mise à jour + persist_client = factory_client.ReadNumero(numero) + client_obj = win32com.client.CastTo(persist_client, "IBOClient3") + client_obj.Read() + + # Définir CT_Contact + ancien_contact = getattr(client_obj, "CT_Contact", "") + client_obj.CT_Contact = nom_complet + logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'") + + # Essayer CT_NoContact si disponible + if self._try_set_attribute(client_obj, "CT_NoContact", contact_numero): + logger.info(f" CT_NoContact = {contact_numero}") + + # Enregistrer le client + client_obj.Write() + client_obj.Read() + logger.info(f" ✓ Contact défini comme par défaut") + + except Exception as e: + logger.warning(f" ⚠ Échec définition par défaut: {e}") + # On ne fait pas échouer la création pour autant + est_defaut = False # On indique que ça n'a pas marché + + logger.info("=" * 80) + logger.info(f"[SUCCÈS] Contact créé: CT_No={contact_numero}, N_Contact={n_contact}") + if est_defaut: + logger.info(f" Défini comme contact par défaut") + logger.info("=" * 80) + + # Retourner les données complètes + contact_dict = self._contact_to_dict(contact) + contact_dict["est_defaut"] = est_defaut + + return contact_dict + + except ValueError as e: + logger.error(f"[ERREUR VALIDATION] {e}") + raise + except Exception as e: + logger.error(f"[ERREUR] {e}", exc_info=True) + raise RuntimeError(f"Erreur technique: {e}") + + + def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict: + """ + Modifie un contact existant via COM + VERSION COMPLÈTE avec gestion du contact par défaut + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + logger.info("=" * 80) + logger.info(f"[MODIFICATION CONTACT] CT_No={contact_numero}") + logger.info("=" * 80) + + # Lire le contact existant + logger.info("[1] Lecture contact existant") + factory_contact = self.cial.CptaApplication.FactoryContactT + + try: + persist = factory_contact.ReadNumero(numero, contact_numero) + if not persist: + raise ValueError(f"Contact CT_No={contact_numero} non trouvé pour client {numero}") + + contact = win32com.client.CastTo(persist, "IBOContactT3") + contact.Read() + logger.info(f" ✓ Contact chargé: {contact.CT_Nom}") + + except Exception as e: + raise ValueError(f"Contact introuvable: {e}") + + # Appliquer les modifications + logger.info("[2] Application des modifications") + modifications_appliquees = [] + + # Identité + if "civilite" in updates: + civilite_map = {"M.": 0, "Mme": 1, "Mlle": 2, "Société": 3} + civilite_code = civilite_map.get(updates["civilite"]) + if civilite_code is not None: + self._try_set_attribute(contact, "CT_Civilite", civilite_code) + logger.info(f" CT_Civilite = {civilite_code}") + modifications_appliquees.append("civilite") + + if "nom" in updates: + nom = self._clean_str(updates["nom"], 35) + if nom: # Ne pas autoriser nom vide + self._try_set_attribute(contact, "CT_Nom", nom) + logger.info(f" CT_Nom = {nom}") + modifications_appliquees.append("nom") + + if "prenom" in updates: + prenom = self._clean_str(updates["prenom"], 35) + self._try_set_attribute(contact, "CT_Prenom", prenom) + logger.info(f" CT_Prenom = {prenom}") + modifications_appliquees.append("prenom") + + if "fonction" in updates: + fonction = self._clean_str(updates["fonction"], 35) + self._try_set_attribute(contact, "CT_Fonction", fonction) + logger.info(f" CT_Fonction = {fonction}") + modifications_appliquees.append("fonction") + + # Service + if "service_code" in updates: + service = self._safe_int(updates["service_code"]) + if service is not None: + self._try_set_attribute(contact, "N_Service", service) + logger.info(f" N_Service = {service}") + modifications_appliquees.append("service_code") + + # Coordonnées + coord_fields = { + "telephone": ("CT_Telephone", 21), + "portable": ("CT_TelPortable", 21), + "telecopie": ("CT_Telecopie", 21), + "email": ("CT_EMail", 69), + } + + for key, (attr, max_len) in coord_fields.items(): + if key in updates: + value = self._clean_str(updates[key], max_len) + self._try_set_attribute(contact, attr, value) + logger.info(f" {attr} = {value}") + modifications_appliquees.append(key) + + # Réseaux sociaux + social_fields = { + "facebook": ("CT_Facebook", 69), + "linkedin": ("CT_LinkedIn", 69), + "skype": ("CT_Skype", 69), + } + + for key, (attr, max_len) in social_fields.items(): + if key in updates: + value = self._clean_str(updates[key], max_len) + self._try_set_attribute(contact, attr, value) + logger.info(f" {attr} = {value}") + modifications_appliquees.append(key) + + # Enregistrement du contact + logger.info("[3] WRITE CONTACT") + try: + contact.Write() + contact.Read() + logger.info(" ✓ Write() réussi") + except Exception as e: + error_detail = str(e) + try: + sage_error = self.cial.CptaApplication.LastError + if sage_error: + error_detail = f"{sage_error.Description} (Code: {sage_error.Number})" + except: + pass + logger.error(f" ✗ Erreur Write: {error_detail}") + raise RuntimeError(f"Échec modification contact: {error_detail}") + + logger.info(f" Modifications appliquées: {', '.join(modifications_appliquees)}") + + # Gestion du contact par défaut + est_defaut_demande = updates.get("est_defaut") + est_actuellement_defaut = False + + if est_defaut_demande is not None: + logger.info("[4] Gestion contact par défaut") + + if est_defaut_demande: + # Définir comme contact par défaut + try: + # Construire le nom complet + nom_final = getattr(contact, "CT_Nom", "") + prenom_final = getattr(contact, "CT_Prenom", "") + nom_complet = f"{prenom_final} {nom_final}".strip() if prenom_final else nom_final + + # Charger le client + factory_client = self.cial.CptaApplication.FactoryClient + persist_client = factory_client.ReadNumero(numero) + client_obj = win32com.client.CastTo(persist_client, "IBOClient3") + client_obj.Read() + + # Mettre à jour CT_Contact + ancien_contact = getattr(client_obj, "CT_Contact", "") + client_obj.CT_Contact = nom_complet + logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'") + + # Essayer CT_NoContact si disponible + if self._try_set_attribute(client_obj, "CT_NoContact", contact_numero): + logger.info(f" CT_NoContact = {contact_numero}") + + # Enregistrer le client + client_obj.Write() + client_obj.Read() + logger.info(" ✓ Contact défini comme par défaut") + est_actuellement_defaut = True + + except Exception as e: + logger.warning(f" ⚠ Échec définition par défaut: {e}") + est_actuellement_defaut = False + else: + # Retirer le statut par défaut + # Note: On ne fait rien car Sage gère via CT_Contact du client + # Si on veut vraiment retirer, il faut définir un autre contact comme par défaut + logger.info(" ⓘ Pour retirer le statut par défaut, définir un autre contact") + else: + # Vérifier si c'est déjà le contact par défaut + try: + factory_client = self.cial.CptaApplication.FactoryClient + persist_client = factory_client.ReadNumero(numero) + client_obj = win32com.client.CastTo(persist_client, "IBOClient3") + client_obj.Read() + + ct_contact = getattr(client_obj, "CT_Contact", "") + nom_final = getattr(contact, "CT_Nom", "") + prenom_final = getattr(contact, "CT_Prenom", "") + nom_complet = f"{prenom_final} {nom_final}".strip() if prenom_final else nom_final + + est_actuellement_defaut = (ct_contact == nom_complet) + + except Exception as e: + logger.debug(f"Impossible de vérifier statut par défaut: {e}") + + logger.info("=" * 80) + logger.info(f"[SUCCÈS] Contact modifié: CT_No={contact_numero}") + if est_actuellement_defaut: + logger.info(f" Contact par défaut") + logger.info("=" * 80) + + # Retourner les données mises à jour + contact_dict = self._contact_to_dict(contact) + contact_dict["est_defaut"] = est_actuellement_defaut + + return contact_dict + + except ValueError as e: + logger.error(f"[ERREUR VALIDATION] {e}") + raise + except Exception as e: + logger.error(f"[ERREUR] {e}", exc_info=True) + raise RuntimeError(f"Erreur technique: {e}") + + + def lister_contacts(self, numero: str) -> List[Dict]: + """ + Liste tous les contacts d'un client + """ + try: + with self._get_sql_connection() as conn: + return self._get_contacts_client(numero, conn) + except Exception as e: + logger.error(f"Erreur liste contacts: {e}") + raise RuntimeError(f"Erreur lecture contacts: {str(e)}") + + + def obtenir_contact(self, numero: str, contact_numero: int) -> Optional[Dict]: + """ + Récupère un contact spécifique par son CT_No + """ + try: + with self._get_sql_connection() as conn: + cursor = conn.cursor() + + query = """ + SELECT + CT_Num, CT_No, N_Contact, + CT_Civilite, CT_Nom, CT_Prenom, CT_Fonction, + N_Service, + CT_Telephone, CT_TelPortable, CT_Telecopie, CT_EMail, + CT_Facebook, CT_LinkedIn, CT_Skype + FROM F_CONTACTT + WHERE CT_Num = ? AND CT_No = ? + """ + + cursor.execute(query, [numero, contact_numero]) + row = cursor.fetchone() + + if not row: + return None + + return self._row_to_contact_dict(row) + + except Exception as e: + logger.error(f"Erreur obtention contact: {e}") + raise RuntimeError(f"Erreur lecture contact: {str(e)}") + + + def obtenir_contact_defaut(self, numero: str) -> Optional[Dict]: + """ + Récupère le contact par défaut d'un client + + Returns: + Dictionnaire avec les infos du contact par défaut, ou None si non défini + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + # Charger le client + factory_client = self.cial.CptaApplication.FactoryClient + persist_client = factory_client.ReadNumero(numero) + + if not persist_client: + raise ValueError(f"Client {numero} non trouvé") + + client = win32com.client.CastTo(persist_client, "IBOClient3") + client.Read() + + # Méthode 1: Via CT_NoContact (si disponible) + ct_no_defaut = None + try: + ct_no_defaut = getattr(client, "CT_NoContact", None) + if ct_no_defaut: + logger.info(f"Contact par défaut via CT_NoContact: {ct_no_defaut}") + except: + pass + + # Méthode 2: Via CT_Contact (nom) + nom_contact_defaut = None + try: + nom_contact_defaut = getattr(client, "CT_Contact", None) + if nom_contact_defaut: + logger.info(f"Contact par défaut via CT_Contact: {nom_contact_defaut}") + except: + pass + + # Si on a le CT_No, on retourne le contact complet + if ct_no_defaut: + return self.obtenir_contact(numero, ct_no_defaut) + + # Sinon, chercher par nom dans la liste des contacts + if nom_contact_defaut: + contacts = self.lister_contacts(numero) + for contact in contacts: + nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip() + if nom_complet == nom_contact_defaut or contact['nom'] == nom_contact_defaut: + return {**contact, "est_defaut": True} + + return None + + except Exception as e: + logger.error(f"Erreur obtention contact par défaut: {e}") + return None + + + def supprimer_contact(self, numero: str, contact_numero: int) -> Dict: + """ + Supprime un contact via COM + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + logger.info("=" * 80) + logger.info(f"[SUPPRESSION CONTACT] CT_No={contact_numero}") + logger.info("=" * 80) + + # Lire le contact + factory_contact = self.cial.CptaApplication.FactoryContactT + + try: + persist = factory_contact.ReadNumero(numero, contact_numero) + if not persist: + raise ValueError(f"Contact CT_No={contact_numero} non trouvé") + + contact = win32com.client.CastTo(persist, "IBOContactT3") + contact.Read() + nom_contact = contact.CT_Nom + logger.info(f" ✓ Contact trouvé: {nom_contact}") + + except Exception as e: + raise ValueError(f"Contact introuvable: {e}") + + # Supprimer + logger.info("[SUPPRESSION]") + try: + contact.Remove() + logger.info(" ✓ Remove() réussi") + except Exception as e: + error_detail = str(e) + try: + sage_error = self.cial.CptaApplication.LastError + if sage_error: + error_detail = f"{sage_error.Description} (Code: {sage_error.Number})" + except: + pass + logger.error(f" ✗ Erreur Remove: {error_detail}") + raise RuntimeError(f"Échec suppression contact: {error_detail}") + + logger.info("=" * 80) + logger.info(f"[SUCCÈS] Contact supprimé: {nom_contact}") + logger.info("=" * 80) + + return { + "numero": numero, + "contact_numero": contact_numero, + "nom": nom_contact, + "supprime": True, + "date_suppression": datetime.now().isoformat() + } + + except ValueError as e: + logger.error(f"[ERREUR VALIDATION] {e}") + raise + except Exception as e: + logger.error(f"[ERREUR] {e}", exc_info=True) + raise RuntimeError(f"Erreur technique: {e}") + + + def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict: + """ + Définit un contact comme contact par défaut du client + + Sage gère le contact par défaut via : + 1. CT_Contact dans F_COMPTET : Nom du contact principal (35 car) + 2. CT_NoContact dans F_COMPTET : Numéro du contact par défaut (si disponible) + + Cette méthode : + - Vérifie que le contact existe + - Met à jour le client (F_COMPTET) pour référencer ce contact + - Retourne les infos du client et contact mis à jour + """ + if not self.cial: + raise RuntimeError("Connexion Sage non établie") + + try: + with self._com_context(), self._lock_com: + logger.info("=" * 80) + logger.info(f"[DÉFINIR CONTACT PAR DÉFAUT] Client={numero}, Contact={contact_numero}") + logger.info("=" * 80) + + # Étape 1: Charger le contact + logger.info("[1] Chargement du contact") + factory_contact = self.cial.CptaApplication.FactoryContactT + + try: + persist_contact = factory_contact.ReadNumero(numero, contact_numero) + if not persist_contact: + raise ValueError(f"Contact CT_No={contact_numero} non trouvé pour client {numero}") + + contact = win32com.client.CastTo(persist_contact, "IBOContactT3") + contact.Read() + + nom_contact = getattr(contact, "CT_Nom", "") + prenom_contact = getattr(contact, "CT_Prenom", "") + + # Construire le nom complet + nom_complet = f"{prenom_contact} {nom_contact}".strip() if prenom_contact else nom_contact + + logger.info(f" ✓ Contact trouvé: {nom_complet}") + + except Exception as e: + raise ValueError(f"Contact introuvable: {e}") + + # Étape 2: Charger le client + logger.info("[2] Chargement du client") + factory_client = self.cial.CptaApplication.FactoryClient + + try: + persist_client = factory_client.ReadNumero(numero) + if not persist_client: + raise ValueError(f"Client {numero} non trouvé") + + client = win32com.client.CastTo(persist_client, "IBOClient3") + client.Read() + + logger.info(f" ✓ Client chargé: {client.CT_Intitule}") + + except Exception as e: + raise ValueError(f"Client introuvable: {e}") + + # Étape 3: Définir le contact par défaut + logger.info("[3] Définition du contact par défaut") + + # Méthode 1: Via CT_Contact (nom du contact) + try: + ancien_contact = getattr(client, "CT_Contact", "") + client.CT_Contact = nom_complet + logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'") + except Exception as e: + logger.warning(f" Impossible de définir CT_Contact: {e}") + + # Méthode 2: Via CT_NoContact (numéro du contact) - si disponible + # Ce champ n'existe pas dans toutes les versions de Sage + if self._try_set_attribute(client, "CT_NoContact", contact_numero): + logger.info(f" CT_NoContact = {contact_numero}") + else: + logger.info(f" CT_NoContact non disponible (normal dans certaines versions)") + + # Méthode 3: Via l'objet Contact (relation) - si disponible + try: + if hasattr(client, "Contact") or hasattr(client, "ContactPrincipal"): + client.Contact = contact + logger.info(f" Contact (objet) défini") + except Exception as e: + logger.debug(f" Contact (objet) non disponible: {e}") + + # Étape 4: Enregistrement + logger.info("[4] WRITE") + try: + client.Write() + client.Read() + logger.info(" ✓ Client mis à jour") + except Exception as e: + error_detail = str(e) + try: + sage_error = self.cial.CptaApplication.LastError + if sage_error: + error_detail = f"{sage_error.Description} (Code: {sage_error.Number})" + except: + pass + logger.error(f" ✗ Erreur Write: {error_detail}") + raise RuntimeError(f"Échec mise à jour client: {error_detail}") + + logger.info("=" * 80) + logger.info(f"[SUCCÈS] Contact par défaut défini: {nom_complet}") + logger.info("=" * 80) + + return { + "numero": numero, + "contact_numero": contact_numero, + "contact_nom": nom_complet, + "client_intitule": client.CT_Intitule, + "est_defaut": True, + "date_modification": datetime.now().isoformat() + } + + except ValueError as e: + logger.error(f"[ERREUR VALIDATION] {e}") + raise + except Exception as e: + logger.error(f"[ERREUR] {e}", exc_info=True) + raise RuntimeError(f"Erreur technique: {e}") + + + def _contact_to_dict(self, contact) -> Dict: + """Convertit un objet COM Contact en dictionnaire""" + try: + civilite_code = getattr(contact, "CT_Civilite", None) + civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"} + civilite = civilite_map.get(civilite_code) if civilite_code is not None else None + + return { + "numero": self._safe_strip(getattr(contact, "CT_Num", None)), + "contact_numero": getattr(contact, "CT_No", None), + "n_contact": getattr(contact, "N_Contact", None), + "civilite": civilite, + "nom": self._safe_strip(getattr(contact, "CT_Nom", None)), + "prenom": self._safe_strip(getattr(contact, "CT_Prenom", None)), + "fonction": self._safe_strip(getattr(contact, "CT_Fonction", None)), + "service_code": getattr(contact, "N_Service", None), + "telephone": self._safe_strip(getattr(contact, "CT_Telephone", None)), + "portable": self._safe_strip(getattr(contact, "CT_TelPortable", None)), + "telecopie": self._safe_strip(getattr(contact, "CT_Telecopie", None)), + "email": self._safe_strip(getattr(contact, "CT_EMail", None)), + "facebook": self._safe_strip(getattr(contact, "CT_Facebook", None)), + "linkedin": self._safe_strip(getattr(contact, "CT_LinkedIn", None)), + "skype": self._safe_strip(getattr(contact, "CT_Skype", None)), + } + except Exception as e: + logger.warning(f"Erreur conversion contact: {e}") + return {} + + + def _row_to_contact_dict(self, row) -> Dict: + """Convertit une ligne SQL en dictionnaire contact""" + civilite_code = row.CT_Civilite + civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"} + + return { + "numero": self._safe_strip(row.CT_Num), + "contact_numero": row.CT_No, + "n_contact": row.N_Contact, + "civilite": civilite_map.get(civilite_code) if civilite_code is not None else None, + "nom": self._safe_strip(row.CT_Nom), + "prenom": self._safe_strip(row.CT_Prenom), + "fonction": self._safe_strip(row.CT_Fonction), + "service_code": row.N_Service, + "telephone": self._safe_strip(row.CT_Telephone), + "portable": self._safe_strip(row.CT_TelPortable), + "telecopie": self._safe_strip(row.CT_Telecopie), + "email": self._safe_strip(row.CT_EMail), + "facebook": self._safe_strip(row.CT_Facebook), + "linkedin": self._safe_strip(row.CT_LinkedIn), + "skype": self._safe_strip(row.CT_Skype), + } + + + def _clean_str(self, value, max_len: int) -> str: + """Nettoie et tronque une chaîne""" + if value is None or str(value).lower() in ('none', 'null', ''): + return "" + return str(value)[:max_len].strip() + + + def _safe_int(self, value, default=None): + """Conversion sécurisée en entier""" + if value is None: + return default + try: + return int(value) + except (ValueError, TypeError): + return default + + + def _try_set_attribute(self, obj, attr_name, value, variants=None): + """Essaie de définir un attribut avec plusieurs variantes""" + if variants is None: + variants = [attr_name] + else: + variants = [attr_name] + variants + + for variant in variants: + try: + if hasattr(obj, variant): + setattr(obj, variant, value) + return True + except Exception as e: + logger.debug(f" {variant} échec: {str(e)[:50]}") + + return False + + def creer_client(self, client_data: Dict) -> Dict: """ Creation client Sage - Version corrigée pour erreur cohérence