Added contact handling
This commit is contained in:
parent
d9506337ff
commit
08686a7b2f
2 changed files with 961 additions and 44 deletions
116
main.py
116
main.py
|
|
@ -857,6 +857,46 @@ class FamilleCreate(BaseModel):
|
|||
)
|
||||
|
||||
|
||||
class ContactCreateRequest(BaseModel):
|
||||
"""Requête de création de contact"""
|
||||
numero: str
|
||||
civilite: Optional[str] = None
|
||||
nom: str
|
||||
prenom: Optional[str] = None
|
||||
fonction: Optional[str] = None
|
||||
service_code: Optional[int] = None
|
||||
telephone: Optional[str] = None
|
||||
portable: Optional[str] = None
|
||||
telecopie: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
facebook: Optional[str] = None
|
||||
linkedin: Optional[str] = None
|
||||
skype: Optional[str] = None
|
||||
|
||||
|
||||
class ContactListRequest(BaseModel):
|
||||
"""Requête de liste des contacts"""
|
||||
numero: str
|
||||
|
||||
|
||||
class ContactGetRequest(BaseModel):
|
||||
"""Requête de récupération d'un contact"""
|
||||
numero: str
|
||||
contact_numero: int
|
||||
|
||||
|
||||
class ContactUpdateRequest(BaseModel):
|
||||
"""Requête de modification d'un contact"""
|
||||
numero: str
|
||||
contact_numero: int
|
||||
updates: Dict
|
||||
|
||||
|
||||
class ContactDeleteRequest(BaseModel):
|
||||
"""Requête de suppression d'un contact"""
|
||||
numero: str
|
||||
contact_numero: int
|
||||
|
||||
def verify_token(x_sage_token: str = Header(...)):
|
||||
"""Vérification du token d'authentification"""
|
||||
if x_sage_token != settings.sage_gateway_token:
|
||||
|
|
@ -2044,6 +2084,82 @@ def lire_mouvement_stock(numero: str):
|
|||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/create", dependencies=[Depends(verify_token)])
|
||||
def contacts_create(req: ContactCreateRequest):
|
||||
"""Crée un nouveau contact"""
|
||||
try:
|
||||
contact = sage.creer_contact(req.dict())
|
||||
return {"success": True, "data": contact}
|
||||
except ValueError as e:
|
||||
logger.error(f"Erreur validation contact: {e}")
|
||||
raise HTTPException(400, str(e))
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur création contact: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/list", dependencies=[Depends(verify_token)])
|
||||
def contacts_list(req: ContactListRequest):
|
||||
"""Liste les contacts d'un client"""
|
||||
try:
|
||||
contacts = sage.lister_contacts(req.numero)
|
||||
return {"success": True, "data": contacts}
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur liste contacts: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/get", dependencies=[Depends(verify_token)])
|
||||
def contacts_get(req: ContactGetRequest):
|
||||
"""Récupère un contact spécifique"""
|
||||
try:
|
||||
contact = sage.obtenir_contact(req.numero, req.contact_numero)
|
||||
if not contact:
|
||||
raise HTTPException(404, f"Contact {req.contact_numero} non trouvé")
|
||||
return {"success": True, "data": contact}
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur récupération contact: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/update", dependencies=[Depends(verify_token)])
|
||||
def contacts_update(req: ContactUpdateRequest):
|
||||
"""Modifie un contact existant"""
|
||||
try:
|
||||
contact = sage.modifier_contact(req.numero, req.contact_numero, req.updates)
|
||||
return {"success": True, "data": contact}
|
||||
except ValueError as e:
|
||||
logger.error(f"Erreur validation contact: {e}")
|
||||
raise HTTPException(400, str(e))
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur modification contact: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/delete", dependencies=[Depends(verify_token)])
|
||||
def contacts_delete(req: ContactDeleteRequest):
|
||||
"""Supprime un contact"""
|
||||
try:
|
||||
result = sage.supprimer_contact(req.numero, req.contact_numero)
|
||||
return {"success": True, "data": result}
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur suppression contact: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
@app.post("/sage/contacts/set-default", dependencies=[Depends(verify_token)])
|
||||
def contacts_set_default(req: ContactGetRequest):
|
||||
"""Définit un contact comme contact par défaut"""
|
||||
try:
|
||||
result = sage.definir_contact_defaut(req.numero, req.contact_numero)
|
||||
return {"success": True, "data": result}
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur définition contact par défaut: {e}")
|
||||
raise HTTPException(500, str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(
|
||||
"main:app",
|
||||
|
|
|
|||
|
|
@ -74,13 +74,13 @@ class SageConnector:
|
|||
if conn:
|
||||
conn.close()
|
||||
|
||||
def _safe_strip(self, value):
|
||||
"""Strip sécurisé pour valeurs SQL"""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
return value.strip()
|
||||
return value
|
||||
# def _safe_strip(self, value):
|
||||
# """Strip sécurisé pour valeurs SQL"""
|
||||
# if value is None:
|
||||
# return None
|
||||
# if isinstance(value, str):
|
||||
# return value.strip()
|
||||
# return value
|
||||
|
||||
def _cleanup_com_thread(self):
|
||||
"""Nettoie COM pour le thread actuel (à appeler à la fin)"""
|
||||
|
|
@ -915,70 +915,63 @@ class SageConnector:
|
|||
raise RuntimeError(f"Erreur technique Sage: {error_message}")
|
||||
|
||||
|
||||
def _get_contacts_client(self, ct_num: str, conn) -> list:
|
||||
def _get_contacts_client(self, numero: str, conn) -> list:
|
||||
"""
|
||||
Récupère tous les contacts d'un client avec TOUS les champs
|
||||
Récupère tous les contacts d'un client avec indication du contact par défaut
|
||||
"""
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = """
|
||||
SELECT
|
||||
-- IDENTIFICATION
|
||||
CT_Num, CT_No, N_Contact,
|
||||
|
||||
-- IDENTITÉ
|
||||
CT_Civilite, CT_Nom, CT_Prenom, CT_Fonction,
|
||||
|
||||
-- ORGANISATION
|
||||
N_Service,
|
||||
|
||||
-- COORDONNÉES
|
||||
CT_Telephone, CT_TelPortable, CT_Telecopie, CT_EMail,
|
||||
|
||||
-- RÉSEAUX SOCIAUX
|
||||
CT_Facebook, CT_LinkedIn, CT_Skype
|
||||
|
||||
FROM F_CONTACTT
|
||||
WHERE CT_Num = ?
|
||||
ORDER BY N_Contact, CT_Nom, CT_Prenom
|
||||
"""
|
||||
|
||||
cursor.execute(query, [ct_num])
|
||||
cursor.execute(query, [numero])
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Récupérer le contact par défaut du client
|
||||
query_client = """
|
||||
SELECT CT_Contact
|
||||
FROM F_COMPTET
|
||||
WHERE CT_Num = ?
|
||||
"""
|
||||
cursor.execute(query_client, [numero])
|
||||
client_row = cursor.fetchone()
|
||||
|
||||
nom_contact_defaut = None
|
||||
if client_row:
|
||||
nom_contact_defaut = self._safe_strip(client_row.CT_Contact)
|
||||
|
||||
contacts = []
|
||||
for row in rows:
|
||||
contact = {
|
||||
"ct_num": self._safe_strip(row.CT_Num),
|
||||
"ct_no": row.CT_No,
|
||||
"n_contact": row.N_Contact,
|
||||
|
||||
"civilite": self._safe_strip(row.CT_Civilite),
|
||||
"nom": self._safe_strip(row.CT_Nom),
|
||||
"prenom": self._safe_strip(row.CT_Prenom),
|
||||
"fonction": self._safe_strip(row.CT_Fonction),
|
||||
|
||||
"service_code": row.N_Service,
|
||||
|
||||
"telephone": self._safe_strip(row.CT_Telephone),
|
||||
"portable": self._safe_strip(row.CT_TelPortable),
|
||||
"telecopie": self._safe_strip(row.CT_Telecopie),
|
||||
"email": self._safe_strip(row.CT_EMail),
|
||||
|
||||
"facebook": self._safe_strip(row.CT_Facebook),
|
||||
"linkedin": self._safe_strip(row.CT_LinkedIn),
|
||||
"skype": self._safe_strip(row.CT_Skype)
|
||||
}
|
||||
contact = self._row_to_contact_dict(row)
|
||||
|
||||
# Vérifier si c'est le contact par défaut
|
||||
if nom_contact_defaut:
|
||||
nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip()
|
||||
contact["est_defaut"] = (
|
||||
nom_complet == nom_contact_defaut or
|
||||
contact['nom'] == nom_contact_defaut
|
||||
)
|
||||
else:
|
||||
contact["est_defaut"] = False
|
||||
|
||||
contacts.append(contact)
|
||||
|
||||
return contacts
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Impossible de récupérer contacts pour {ct_num}: {e}")
|
||||
logger.warning(f"⚠️ Impossible de récupérer contacts pour {numero}: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def lister_tous_clients(self, filtre=""):
|
||||
"""
|
||||
Liste tous les clients avec TOUS les champs gérés par creer_client
|
||||
|
|
@ -1314,7 +1307,6 @@ class SageConnector:
|
|||
return None
|
||||
|
||||
|
||||
|
||||
def lister_tous_articles(self, filtre=""):
|
||||
try:
|
||||
with self._get_sql_connection() as conn:
|
||||
|
|
@ -6670,6 +6662,815 @@ class SageConnector:
|
|||
""" Lit UNE livraison via SQL (avec lignes)"""
|
||||
return self._lire_document_sql(numero, type_doc=30)
|
||||
|
||||
|
||||
|
||||
def creer_contact(self, contact_data: Dict) -> Dict:
|
||||
"""
|
||||
Crée un nouveau contact pour un client via COM
|
||||
VERSION COMPLÈTE avec gestion du contact par défaut
|
||||
|
||||
Champs Sage Contact (F_CONTACTT):
|
||||
- CT_Num: Code client parent (obligatoire)
|
||||
- CT_Nom: Nom (obligatoire, max 35 car)
|
||||
- CT_Prenom: Prénom (max 35 car)
|
||||
- CT_Civilite: 0=M., 1=Mme, 2=Mlle, 3=Société
|
||||
- CT_Fonction: Fonction (max 35 car)
|
||||
- N_Service: Code service (int)
|
||||
- CT_Telephone: Tél fixe (max 21 car)
|
||||
- CT_TelPortable: Mobile (max 21 car)
|
||||
- CT_Telecopie: Fax (max 21 car)
|
||||
- CT_EMail: Email (max 69 car)
|
||||
- CT_Facebook: Facebook (max 69 car)
|
||||
- CT_LinkedIn: LinkedIn (max 69 car)
|
||||
- CT_Skype: Skype (max 69 car)
|
||||
|
||||
Nouveau:
|
||||
- est_defaut: Définir comme contact par défaut (boolean)
|
||||
"""
|
||||
if not self.cial:
|
||||
raise RuntimeError("Connexion Sage non établie")
|
||||
|
||||
try:
|
||||
with self._com_context(), self._lock_com:
|
||||
logger.info("=" * 80)
|
||||
logger.info("[CRÉATION CONTACT SAGE]")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Validation des champs obligatoires
|
||||
if not contact_data.get("numero"):
|
||||
raise ValueError("numero (code client) obligatoire")
|
||||
|
||||
if not contact_data.get("nom"):
|
||||
raise ValueError("nom obligatoire")
|
||||
|
||||
numero = self._clean_str(contact_data["numero"], 17).upper()
|
||||
nom = self._clean_str(contact_data["nom"], 35)
|
||||
|
||||
# Vérifier que le client existe
|
||||
logger.info(f"[1] Vérification client: {numero}")
|
||||
factory_client = self.cial.CptaApplication.FactoryClient
|
||||
try:
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
if not persist_client:
|
||||
raise ValueError(f"Client {numero} non trouvé")
|
||||
logger.info(f" ✓ Client {numero} existe")
|
||||
except Exception as e:
|
||||
raise ValueError(f"Client {numero} introuvable: {e}")
|
||||
|
||||
# Créer l'objet contact
|
||||
logger.info("[2] Création objet Contact")
|
||||
factory_contact = self.cial.CptaApplication.FactoryContactT
|
||||
persist = factory_contact.Create()
|
||||
contact = win32com.client.CastTo(persist, "IBOContactT3")
|
||||
logger.info(" ✓ Objet IBOContactT3 créé")
|
||||
|
||||
# Configuration obligatoire
|
||||
logger.info("[3] Configuration obligatoire")
|
||||
contact.CT_Num = numero
|
||||
contact.CT_Nom = nom
|
||||
logger.info(f" CT_Num = {numero}")
|
||||
logger.info(f" CT_Nom = {nom}")
|
||||
|
||||
# SetDefault pour initialiser les valeurs par défaut
|
||||
contact.SetDefault()
|
||||
logger.info(" ✓ SetDefault() appliqué")
|
||||
|
||||
# Civilité
|
||||
logger.info("[4] Identité")
|
||||
civilite_input = contact_data.get("civilite")
|
||||
if civilite_input:
|
||||
civilite_map = {
|
||||
"M.": 0,
|
||||
"Mme": 1,
|
||||
"Mlle": 2,
|
||||
"Société": 3
|
||||
}
|
||||
civilite_code = civilite_map.get(civilite_input)
|
||||
if civilite_code is not None:
|
||||
self._try_set_attribute(contact, "CT_Civilite", civilite_code)
|
||||
logger.info(f" CT_Civilite = {civilite_code} ({civilite_input})")
|
||||
|
||||
# Prénom
|
||||
if contact_data.get("prenom"):
|
||||
prenom = self._clean_str(contact_data["prenom"], 35)
|
||||
self._try_set_attribute(contact, "CT_Prenom", prenom)
|
||||
logger.info(f" CT_Prenom = {prenom}")
|
||||
|
||||
# Fonction
|
||||
if contact_data.get("fonction"):
|
||||
fonction = self._clean_str(contact_data["fonction"], 35)
|
||||
self._try_set_attribute(contact, "CT_Fonction", fonction)
|
||||
logger.info(f" CT_Fonction = {fonction}")
|
||||
|
||||
# Service
|
||||
logger.info("[5] Organisation")
|
||||
if contact_data.get("service_code") is not None:
|
||||
service = self._safe_int(contact_data["service_code"])
|
||||
if service is not None:
|
||||
self._try_set_attribute(contact, "N_Service", service)
|
||||
logger.info(f" N_Service = {service}")
|
||||
|
||||
# Coordonnées
|
||||
logger.info("[6] Coordonnées")
|
||||
|
||||
if contact_data.get("telephone"):
|
||||
telephone = self._clean_str(contact_data["telephone"], 21)
|
||||
self._try_set_attribute(contact, "CT_Telephone", telephone)
|
||||
logger.info(f" CT_Telephone = {telephone}")
|
||||
|
||||
if contact_data.get("portable"):
|
||||
portable = self._clean_str(contact_data["portable"], 21)
|
||||
self._try_set_attribute(contact, "CT_TelPortable", portable)
|
||||
logger.info(f" CT_TelPortable = {portable}")
|
||||
|
||||
if contact_data.get("telecopie"):
|
||||
fax = self._clean_str(contact_data["telecopie"], 21)
|
||||
self._try_set_attribute(contact, "CT_Telecopie", fax)
|
||||
logger.info(f" CT_Telecopie = {fax}")
|
||||
|
||||
if contact_data.get("email"):
|
||||
email = self._clean_str(contact_data["email"], 69)
|
||||
self._try_set_attribute(contact, "CT_EMail", email)
|
||||
logger.info(f" CT_EMail = {email}")
|
||||
|
||||
# Réseaux sociaux
|
||||
logger.info("[7] Réseaux sociaux")
|
||||
|
||||
if contact_data.get("facebook"):
|
||||
facebook = self._clean_str(contact_data["facebook"], 69)
|
||||
self._try_set_attribute(contact, "CT_Facebook", facebook)
|
||||
logger.info(f" CT_Facebook = {facebook}")
|
||||
|
||||
if contact_data.get("linkedin"):
|
||||
linkedin = self._clean_str(contact_data["linkedin"], 69)
|
||||
self._try_set_attribute(contact, "CT_LinkedIn", linkedin)
|
||||
logger.info(f" CT_LinkedIn = {linkedin}")
|
||||
|
||||
if contact_data.get("skype"):
|
||||
skype = self._clean_str(contact_data["skype"], 69)
|
||||
self._try_set_attribute(contact, "CT_Skype", skype)
|
||||
logger.info(f" CT_Skype = {skype}")
|
||||
|
||||
# Enregistrement du contact
|
||||
logger.info("[8] WRITE CONTACT")
|
||||
try:
|
||||
contact.Write()
|
||||
contact.Read()
|
||||
logger.info(" ✓ Write() réussi")
|
||||
except Exception as e:
|
||||
error_detail = str(e)
|
||||
try:
|
||||
sage_error = self.cial.CptaApplication.LastError
|
||||
if sage_error:
|
||||
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
|
||||
except:
|
||||
pass
|
||||
logger.error(f" ✗ Erreur Write: {error_detail}")
|
||||
raise RuntimeError(f"Échec création contact: {error_detail}")
|
||||
|
||||
# Récupération des données finales
|
||||
contact_numero = getattr(contact, "CT_No", None)
|
||||
n_contact = getattr(contact, "N_Contact", None)
|
||||
|
||||
logger.info(f" Contact créé: CT_No={contact_numero}, N_Contact={n_contact}")
|
||||
|
||||
# Gestion du contact par défaut
|
||||
est_defaut = contact_data.get("est_defaut", False)
|
||||
|
||||
if est_defaut:
|
||||
logger.info("[9] Définition comme contact par défaut")
|
||||
try:
|
||||
if contact_numero:
|
||||
# Construire le nom complet pour CT_Contact
|
||||
prenom = self._clean_str(contact_data.get("prenom", ""), 35)
|
||||
nom_complet = f"{prenom} {nom}".strip() if prenom else nom
|
||||
|
||||
# Charger le client pour mise à jour
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
client_obj = win32com.client.CastTo(persist_client, "IBOClient3")
|
||||
client_obj.Read()
|
||||
|
||||
# Définir CT_Contact
|
||||
ancien_contact = getattr(client_obj, "CT_Contact", "")
|
||||
client_obj.CT_Contact = nom_complet
|
||||
logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'")
|
||||
|
||||
# Essayer CT_NoContact si disponible
|
||||
if self._try_set_attribute(client_obj, "CT_NoContact", contact_numero):
|
||||
logger.info(f" CT_NoContact = {contact_numero}")
|
||||
|
||||
# Enregistrer le client
|
||||
client_obj.Write()
|
||||
client_obj.Read()
|
||||
logger.info(f" ✓ Contact défini comme par défaut")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f" ⚠ Échec définition par défaut: {e}")
|
||||
# On ne fait pas échouer la création pour autant
|
||||
est_defaut = False # On indique que ça n'a pas marché
|
||||
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[SUCCÈS] Contact créé: CT_No={contact_numero}, N_Contact={n_contact}")
|
||||
if est_defaut:
|
||||
logger.info(f" Défini comme contact par défaut")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Retourner les données complètes
|
||||
contact_dict = self._contact_to_dict(contact)
|
||||
contact_dict["est_defaut"] = est_defaut
|
||||
|
||||
return contact_dict
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"[ERREUR VALIDATION] {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"[ERREUR] {e}", exc_info=True)
|
||||
raise RuntimeError(f"Erreur technique: {e}")
|
||||
|
||||
|
||||
def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict:
|
||||
"""
|
||||
Modifie un contact existant via COM
|
||||
VERSION COMPLÈTE avec gestion du contact par défaut
|
||||
"""
|
||||
if not self.cial:
|
||||
raise RuntimeError("Connexion Sage non établie")
|
||||
|
||||
try:
|
||||
with self._com_context(), self._lock_com:
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[MODIFICATION CONTACT] CT_No={contact_numero}")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Lire le contact existant
|
||||
logger.info("[1] Lecture contact existant")
|
||||
factory_contact = self.cial.CptaApplication.FactoryContactT
|
||||
|
||||
try:
|
||||
persist = factory_contact.ReadNumero(numero, contact_numero)
|
||||
if not persist:
|
||||
raise ValueError(f"Contact CT_No={contact_numero} non trouvé pour client {numero}")
|
||||
|
||||
contact = win32com.client.CastTo(persist, "IBOContactT3")
|
||||
contact.Read()
|
||||
logger.info(f" ✓ Contact chargé: {contact.CT_Nom}")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Contact introuvable: {e}")
|
||||
|
||||
# Appliquer les modifications
|
||||
logger.info("[2] Application des modifications")
|
||||
modifications_appliquees = []
|
||||
|
||||
# Identité
|
||||
if "civilite" in updates:
|
||||
civilite_map = {"M.": 0, "Mme": 1, "Mlle": 2, "Société": 3}
|
||||
civilite_code = civilite_map.get(updates["civilite"])
|
||||
if civilite_code is not None:
|
||||
self._try_set_attribute(contact, "CT_Civilite", civilite_code)
|
||||
logger.info(f" CT_Civilite = {civilite_code}")
|
||||
modifications_appliquees.append("civilite")
|
||||
|
||||
if "nom" in updates:
|
||||
nom = self._clean_str(updates["nom"], 35)
|
||||
if nom: # Ne pas autoriser nom vide
|
||||
self._try_set_attribute(contact, "CT_Nom", nom)
|
||||
logger.info(f" CT_Nom = {nom}")
|
||||
modifications_appliquees.append("nom")
|
||||
|
||||
if "prenom" in updates:
|
||||
prenom = self._clean_str(updates["prenom"], 35)
|
||||
self._try_set_attribute(contact, "CT_Prenom", prenom)
|
||||
logger.info(f" CT_Prenom = {prenom}")
|
||||
modifications_appliquees.append("prenom")
|
||||
|
||||
if "fonction" in updates:
|
||||
fonction = self._clean_str(updates["fonction"], 35)
|
||||
self._try_set_attribute(contact, "CT_Fonction", fonction)
|
||||
logger.info(f" CT_Fonction = {fonction}")
|
||||
modifications_appliquees.append("fonction")
|
||||
|
||||
# Service
|
||||
if "service_code" in updates:
|
||||
service = self._safe_int(updates["service_code"])
|
||||
if service is not None:
|
||||
self._try_set_attribute(contact, "N_Service", service)
|
||||
logger.info(f" N_Service = {service}")
|
||||
modifications_appliquees.append("service_code")
|
||||
|
||||
# Coordonnées
|
||||
coord_fields = {
|
||||
"telephone": ("CT_Telephone", 21),
|
||||
"portable": ("CT_TelPortable", 21),
|
||||
"telecopie": ("CT_Telecopie", 21),
|
||||
"email": ("CT_EMail", 69),
|
||||
}
|
||||
|
||||
for key, (attr, max_len) in coord_fields.items():
|
||||
if key in updates:
|
||||
value = self._clean_str(updates[key], max_len)
|
||||
self._try_set_attribute(contact, attr, value)
|
||||
logger.info(f" {attr} = {value}")
|
||||
modifications_appliquees.append(key)
|
||||
|
||||
# Réseaux sociaux
|
||||
social_fields = {
|
||||
"facebook": ("CT_Facebook", 69),
|
||||
"linkedin": ("CT_LinkedIn", 69),
|
||||
"skype": ("CT_Skype", 69),
|
||||
}
|
||||
|
||||
for key, (attr, max_len) in social_fields.items():
|
||||
if key in updates:
|
||||
value = self._clean_str(updates[key], max_len)
|
||||
self._try_set_attribute(contact, attr, value)
|
||||
logger.info(f" {attr} = {value}")
|
||||
modifications_appliquees.append(key)
|
||||
|
||||
# Enregistrement du contact
|
||||
logger.info("[3] WRITE CONTACT")
|
||||
try:
|
||||
contact.Write()
|
||||
contact.Read()
|
||||
logger.info(" ✓ Write() réussi")
|
||||
except Exception as e:
|
||||
error_detail = str(e)
|
||||
try:
|
||||
sage_error = self.cial.CptaApplication.LastError
|
||||
if sage_error:
|
||||
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
|
||||
except:
|
||||
pass
|
||||
logger.error(f" ✗ Erreur Write: {error_detail}")
|
||||
raise RuntimeError(f"Échec modification contact: {error_detail}")
|
||||
|
||||
logger.info(f" Modifications appliquées: {', '.join(modifications_appliquees)}")
|
||||
|
||||
# Gestion du contact par défaut
|
||||
est_defaut_demande = updates.get("est_defaut")
|
||||
est_actuellement_defaut = False
|
||||
|
||||
if est_defaut_demande is not None:
|
||||
logger.info("[4] Gestion contact par défaut")
|
||||
|
||||
if est_defaut_demande:
|
||||
# Définir comme contact par défaut
|
||||
try:
|
||||
# Construire le nom complet
|
||||
nom_final = getattr(contact, "CT_Nom", "")
|
||||
prenom_final = getattr(contact, "CT_Prenom", "")
|
||||
nom_complet = f"{prenom_final} {nom_final}".strip() if prenom_final else nom_final
|
||||
|
||||
# Charger le client
|
||||
factory_client = self.cial.CptaApplication.FactoryClient
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
client_obj = win32com.client.CastTo(persist_client, "IBOClient3")
|
||||
client_obj.Read()
|
||||
|
||||
# Mettre à jour CT_Contact
|
||||
ancien_contact = getattr(client_obj, "CT_Contact", "")
|
||||
client_obj.CT_Contact = nom_complet
|
||||
logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'")
|
||||
|
||||
# Essayer CT_NoContact si disponible
|
||||
if self._try_set_attribute(client_obj, "CT_NoContact", contact_numero):
|
||||
logger.info(f" CT_NoContact = {contact_numero}")
|
||||
|
||||
# Enregistrer le client
|
||||
client_obj.Write()
|
||||
client_obj.Read()
|
||||
logger.info(" ✓ Contact défini comme par défaut")
|
||||
est_actuellement_defaut = True
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f" ⚠ Échec définition par défaut: {e}")
|
||||
est_actuellement_defaut = False
|
||||
else:
|
||||
# Retirer le statut par défaut
|
||||
# Note: On ne fait rien car Sage gère via CT_Contact du client
|
||||
# Si on veut vraiment retirer, il faut définir un autre contact comme par défaut
|
||||
logger.info(" ⓘ Pour retirer le statut par défaut, définir un autre contact")
|
||||
else:
|
||||
# Vérifier si c'est déjà le contact par défaut
|
||||
try:
|
||||
factory_client = self.cial.CptaApplication.FactoryClient
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
client_obj = win32com.client.CastTo(persist_client, "IBOClient3")
|
||||
client_obj.Read()
|
||||
|
||||
ct_contact = getattr(client_obj, "CT_Contact", "")
|
||||
nom_final = getattr(contact, "CT_Nom", "")
|
||||
prenom_final = getattr(contact, "CT_Prenom", "")
|
||||
nom_complet = f"{prenom_final} {nom_final}".strip() if prenom_final else nom_final
|
||||
|
||||
est_actuellement_defaut = (ct_contact == nom_complet)
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Impossible de vérifier statut par défaut: {e}")
|
||||
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[SUCCÈS] Contact modifié: CT_No={contact_numero}")
|
||||
if est_actuellement_defaut:
|
||||
logger.info(f" Contact par défaut")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Retourner les données mises à jour
|
||||
contact_dict = self._contact_to_dict(contact)
|
||||
contact_dict["est_defaut"] = est_actuellement_defaut
|
||||
|
||||
return contact_dict
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"[ERREUR VALIDATION] {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"[ERREUR] {e}", exc_info=True)
|
||||
raise RuntimeError(f"Erreur technique: {e}")
|
||||
|
||||
|
||||
def lister_contacts(self, numero: str) -> List[Dict]:
|
||||
"""
|
||||
Liste tous les contacts d'un client
|
||||
"""
|
||||
try:
|
||||
with self._get_sql_connection() as conn:
|
||||
return self._get_contacts_client(numero, conn)
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur liste contacts: {e}")
|
||||
raise RuntimeError(f"Erreur lecture contacts: {str(e)}")
|
||||
|
||||
|
||||
def obtenir_contact(self, numero: str, contact_numero: int) -> Optional[Dict]:
|
||||
"""
|
||||
Récupère un contact spécifique par son CT_No
|
||||
"""
|
||||
try:
|
||||
with self._get_sql_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = """
|
||||
SELECT
|
||||
CT_Num, CT_No, N_Contact,
|
||||
CT_Civilite, CT_Nom, CT_Prenom, CT_Fonction,
|
||||
N_Service,
|
||||
CT_Telephone, CT_TelPortable, CT_Telecopie, CT_EMail,
|
||||
CT_Facebook, CT_LinkedIn, CT_Skype
|
||||
FROM F_CONTACTT
|
||||
WHERE CT_Num = ? AND CT_No = ?
|
||||
"""
|
||||
|
||||
cursor.execute(query, [numero, contact_numero])
|
||||
row = cursor.fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
return self._row_to_contact_dict(row)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur obtention contact: {e}")
|
||||
raise RuntimeError(f"Erreur lecture contact: {str(e)}")
|
||||
|
||||
|
||||
def obtenir_contact_defaut(self, numero: str) -> Optional[Dict]:
|
||||
"""
|
||||
Récupère le contact par défaut d'un client
|
||||
|
||||
Returns:
|
||||
Dictionnaire avec les infos du contact par défaut, ou None si non défini
|
||||
"""
|
||||
if not self.cial:
|
||||
raise RuntimeError("Connexion Sage non établie")
|
||||
|
||||
try:
|
||||
with self._com_context(), self._lock_com:
|
||||
# Charger le client
|
||||
factory_client = self.cial.CptaApplication.FactoryClient
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
|
||||
if not persist_client:
|
||||
raise ValueError(f"Client {numero} non trouvé")
|
||||
|
||||
client = win32com.client.CastTo(persist_client, "IBOClient3")
|
||||
client.Read()
|
||||
|
||||
# Méthode 1: Via CT_NoContact (si disponible)
|
||||
ct_no_defaut = None
|
||||
try:
|
||||
ct_no_defaut = getattr(client, "CT_NoContact", None)
|
||||
if ct_no_defaut:
|
||||
logger.info(f"Contact par défaut via CT_NoContact: {ct_no_defaut}")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Méthode 2: Via CT_Contact (nom)
|
||||
nom_contact_defaut = None
|
||||
try:
|
||||
nom_contact_defaut = getattr(client, "CT_Contact", None)
|
||||
if nom_contact_defaut:
|
||||
logger.info(f"Contact par défaut via CT_Contact: {nom_contact_defaut}")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Si on a le CT_No, on retourne le contact complet
|
||||
if ct_no_defaut:
|
||||
return self.obtenir_contact(numero, ct_no_defaut)
|
||||
|
||||
# Sinon, chercher par nom dans la liste des contacts
|
||||
if nom_contact_defaut:
|
||||
contacts = self.lister_contacts(numero)
|
||||
for contact in contacts:
|
||||
nom_complet = f"{contact.get('prenom', '')} {contact['nom']}".strip()
|
||||
if nom_complet == nom_contact_defaut or contact['nom'] == nom_contact_defaut:
|
||||
return {**contact, "est_defaut": True}
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Erreur obtention contact par défaut: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def supprimer_contact(self, numero: str, contact_numero: int) -> Dict:
|
||||
"""
|
||||
Supprime un contact via COM
|
||||
"""
|
||||
if not self.cial:
|
||||
raise RuntimeError("Connexion Sage non établie")
|
||||
|
||||
try:
|
||||
with self._com_context(), self._lock_com:
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[SUPPRESSION CONTACT] CT_No={contact_numero}")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Lire le contact
|
||||
factory_contact = self.cial.CptaApplication.FactoryContactT
|
||||
|
||||
try:
|
||||
persist = factory_contact.ReadNumero(numero, contact_numero)
|
||||
if not persist:
|
||||
raise ValueError(f"Contact CT_No={contact_numero} non trouvé")
|
||||
|
||||
contact = win32com.client.CastTo(persist, "IBOContactT3")
|
||||
contact.Read()
|
||||
nom_contact = contact.CT_Nom
|
||||
logger.info(f" ✓ Contact trouvé: {nom_contact}")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Contact introuvable: {e}")
|
||||
|
||||
# Supprimer
|
||||
logger.info("[SUPPRESSION]")
|
||||
try:
|
||||
contact.Remove()
|
||||
logger.info(" ✓ Remove() réussi")
|
||||
except Exception as e:
|
||||
error_detail = str(e)
|
||||
try:
|
||||
sage_error = self.cial.CptaApplication.LastError
|
||||
if sage_error:
|
||||
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
|
||||
except:
|
||||
pass
|
||||
logger.error(f" ✗ Erreur Remove: {error_detail}")
|
||||
raise RuntimeError(f"Échec suppression contact: {error_detail}")
|
||||
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[SUCCÈS] Contact supprimé: {nom_contact}")
|
||||
logger.info("=" * 80)
|
||||
|
||||
return {
|
||||
"numero": numero,
|
||||
"contact_numero": contact_numero,
|
||||
"nom": nom_contact,
|
||||
"supprime": True,
|
||||
"date_suppression": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"[ERREUR VALIDATION] {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"[ERREUR] {e}", exc_info=True)
|
||||
raise RuntimeError(f"Erreur technique: {e}")
|
||||
|
||||
|
||||
def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict:
|
||||
"""
|
||||
Définit un contact comme contact par défaut du client
|
||||
|
||||
Sage gère le contact par défaut via :
|
||||
1. CT_Contact dans F_COMPTET : Nom du contact principal (35 car)
|
||||
2. CT_NoContact dans F_COMPTET : Numéro du contact par défaut (si disponible)
|
||||
|
||||
Cette méthode :
|
||||
- Vérifie que le contact existe
|
||||
- Met à jour le client (F_COMPTET) pour référencer ce contact
|
||||
- Retourne les infos du client et contact mis à jour
|
||||
"""
|
||||
if not self.cial:
|
||||
raise RuntimeError("Connexion Sage non établie")
|
||||
|
||||
try:
|
||||
with self._com_context(), self._lock_com:
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[DÉFINIR CONTACT PAR DÉFAUT] Client={numero}, Contact={contact_numero}")
|
||||
logger.info("=" * 80)
|
||||
|
||||
# Étape 1: Charger le contact
|
||||
logger.info("[1] Chargement du contact")
|
||||
factory_contact = self.cial.CptaApplication.FactoryContactT
|
||||
|
||||
try:
|
||||
persist_contact = factory_contact.ReadNumero(numero, contact_numero)
|
||||
if not persist_contact:
|
||||
raise ValueError(f"Contact CT_No={contact_numero} non trouvé pour client {numero}")
|
||||
|
||||
contact = win32com.client.CastTo(persist_contact, "IBOContactT3")
|
||||
contact.Read()
|
||||
|
||||
nom_contact = getattr(contact, "CT_Nom", "")
|
||||
prenom_contact = getattr(contact, "CT_Prenom", "")
|
||||
|
||||
# Construire le nom complet
|
||||
nom_complet = f"{prenom_contact} {nom_contact}".strip() if prenom_contact else nom_contact
|
||||
|
||||
logger.info(f" ✓ Contact trouvé: {nom_complet}")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Contact introuvable: {e}")
|
||||
|
||||
# Étape 2: Charger le client
|
||||
logger.info("[2] Chargement du client")
|
||||
factory_client = self.cial.CptaApplication.FactoryClient
|
||||
|
||||
try:
|
||||
persist_client = factory_client.ReadNumero(numero)
|
||||
if not persist_client:
|
||||
raise ValueError(f"Client {numero} non trouvé")
|
||||
|
||||
client = win32com.client.CastTo(persist_client, "IBOClient3")
|
||||
client.Read()
|
||||
|
||||
logger.info(f" ✓ Client chargé: {client.CT_Intitule}")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Client introuvable: {e}")
|
||||
|
||||
# Étape 3: Définir le contact par défaut
|
||||
logger.info("[3] Définition du contact par défaut")
|
||||
|
||||
# Méthode 1: Via CT_Contact (nom du contact)
|
||||
try:
|
||||
ancien_contact = getattr(client, "CT_Contact", "")
|
||||
client.CT_Contact = nom_complet
|
||||
logger.info(f" CT_Contact: '{ancien_contact}' → '{nom_complet}'")
|
||||
except Exception as e:
|
||||
logger.warning(f" Impossible de définir CT_Contact: {e}")
|
||||
|
||||
# Méthode 2: Via CT_NoContact (numéro du contact) - si disponible
|
||||
# Ce champ n'existe pas dans toutes les versions de Sage
|
||||
if self._try_set_attribute(client, "CT_NoContact", contact_numero):
|
||||
logger.info(f" CT_NoContact = {contact_numero}")
|
||||
else:
|
||||
logger.info(f" CT_NoContact non disponible (normal dans certaines versions)")
|
||||
|
||||
# Méthode 3: Via l'objet Contact (relation) - si disponible
|
||||
try:
|
||||
if hasattr(client, "Contact") or hasattr(client, "ContactPrincipal"):
|
||||
client.Contact = contact
|
||||
logger.info(f" Contact (objet) défini")
|
||||
except Exception as e:
|
||||
logger.debug(f" Contact (objet) non disponible: {e}")
|
||||
|
||||
# Étape 4: Enregistrement
|
||||
logger.info("[4] WRITE")
|
||||
try:
|
||||
client.Write()
|
||||
client.Read()
|
||||
logger.info(" ✓ Client mis à jour")
|
||||
except Exception as e:
|
||||
error_detail = str(e)
|
||||
try:
|
||||
sage_error = self.cial.CptaApplication.LastError
|
||||
if sage_error:
|
||||
error_detail = f"{sage_error.Description} (Code: {sage_error.Number})"
|
||||
except:
|
||||
pass
|
||||
logger.error(f" ✗ Erreur Write: {error_detail}")
|
||||
raise RuntimeError(f"Échec mise à jour client: {error_detail}")
|
||||
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"[SUCCÈS] Contact par défaut défini: {nom_complet}")
|
||||
logger.info("=" * 80)
|
||||
|
||||
return {
|
||||
"numero": numero,
|
||||
"contact_numero": contact_numero,
|
||||
"contact_nom": nom_complet,
|
||||
"client_intitule": client.CT_Intitule,
|
||||
"est_defaut": True,
|
||||
"date_modification": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"[ERREUR VALIDATION] {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"[ERREUR] {e}", exc_info=True)
|
||||
raise RuntimeError(f"Erreur technique: {e}")
|
||||
|
||||
|
||||
def _contact_to_dict(self, contact) -> Dict:
|
||||
"""Convertit un objet COM Contact en dictionnaire"""
|
||||
try:
|
||||
civilite_code = getattr(contact, "CT_Civilite", None)
|
||||
civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"}
|
||||
civilite = civilite_map.get(civilite_code) if civilite_code is not None else None
|
||||
|
||||
return {
|
||||
"numero": self._safe_strip(getattr(contact, "CT_Num", None)),
|
||||
"contact_numero": getattr(contact, "CT_No", None),
|
||||
"n_contact": getattr(contact, "N_Contact", None),
|
||||
"civilite": civilite,
|
||||
"nom": self._safe_strip(getattr(contact, "CT_Nom", None)),
|
||||
"prenom": self._safe_strip(getattr(contact, "CT_Prenom", None)),
|
||||
"fonction": self._safe_strip(getattr(contact, "CT_Fonction", None)),
|
||||
"service_code": getattr(contact, "N_Service", None),
|
||||
"telephone": self._safe_strip(getattr(contact, "CT_Telephone", None)),
|
||||
"portable": self._safe_strip(getattr(contact, "CT_TelPortable", None)),
|
||||
"telecopie": self._safe_strip(getattr(contact, "CT_Telecopie", None)),
|
||||
"email": self._safe_strip(getattr(contact, "CT_EMail", None)),
|
||||
"facebook": self._safe_strip(getattr(contact, "CT_Facebook", None)),
|
||||
"linkedin": self._safe_strip(getattr(contact, "CT_LinkedIn", None)),
|
||||
"skype": self._safe_strip(getattr(contact, "CT_Skype", None)),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Erreur conversion contact: {e}")
|
||||
return {}
|
||||
|
||||
|
||||
def _row_to_contact_dict(self, row) -> Dict:
|
||||
"""Convertit une ligne SQL en dictionnaire contact"""
|
||||
civilite_code = row.CT_Civilite
|
||||
civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"}
|
||||
|
||||
return {
|
||||
"numero": self._safe_strip(row.CT_Num),
|
||||
"contact_numero": row.CT_No,
|
||||
"n_contact": row.N_Contact,
|
||||
"civilite": civilite_map.get(civilite_code) if civilite_code is not None else None,
|
||||
"nom": self._safe_strip(row.CT_Nom),
|
||||
"prenom": self._safe_strip(row.CT_Prenom),
|
||||
"fonction": self._safe_strip(row.CT_Fonction),
|
||||
"service_code": row.N_Service,
|
||||
"telephone": self._safe_strip(row.CT_Telephone),
|
||||
"portable": self._safe_strip(row.CT_TelPortable),
|
||||
"telecopie": self._safe_strip(row.CT_Telecopie),
|
||||
"email": self._safe_strip(row.CT_EMail),
|
||||
"facebook": self._safe_strip(row.CT_Facebook),
|
||||
"linkedin": self._safe_strip(row.CT_LinkedIn),
|
||||
"skype": self._safe_strip(row.CT_Skype),
|
||||
}
|
||||
|
||||
|
||||
def _clean_str(self, value, max_len: int) -> str:
|
||||
"""Nettoie et tronque une chaîne"""
|
||||
if value is None or str(value).lower() in ('none', 'null', ''):
|
||||
return ""
|
||||
return str(value)[:max_len].strip()
|
||||
|
||||
|
||||
def _safe_int(self, value, default=None):
|
||||
"""Conversion sécurisée en entier"""
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
return int(value)
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
|
||||
def _try_set_attribute(self, obj, attr_name, value, variants=None):
|
||||
"""Essaie de définir un attribut avec plusieurs variantes"""
|
||||
if variants is None:
|
||||
variants = [attr_name]
|
||||
else:
|
||||
variants = [attr_name] + variants
|
||||
|
||||
for variant in variants:
|
||||
try:
|
||||
if hasattr(obj, variant):
|
||||
setattr(obj, variant, value)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug(f" {variant} échec: {str(e)[:50]}")
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def creer_client(self, client_data: Dict) -> Dict:
|
||||
"""
|
||||
Creation client Sage - Version corrigée pour erreur cohérence
|
||||
|
|
|
|||
Loading…
Reference in a new issue