Restrucred and reorganized files for more consistency

This commit is contained in:
fanilo 2025-12-29 13:32:40 +01:00
parent 6bb1253a1a
commit 2819578ca2
20 changed files with 3436 additions and 3355 deletions

View file

@ -6,7 +6,7 @@ def supprimer_commentaires_ligne(fichier: str) -> None:
lignes_filtrees = [] lignes_filtrees = []
with path.open("r", encoding="utf-8") as f: with path.open("r", encoding="utf-8") as f:
for ligne in f: for ligne in f:
if ligne.lstrip().startswith("#"): if ligne.lstrip().startswith("logger"):
continue continue
lignes_filtrees.append(ligne) lignes_filtrees.append(ligne)

View file

@ -44,7 +44,7 @@ settings = Settings()
def validate_settings(): def validate_settings():
"""Validation au démarrage""" """Validation au démarrage"""
if not settings.chemin_base or not settings.mot_de_passe: if not settings.chemin_base or not settings.mot_de_passe:
raise ValueError(" CHEMIN_BASE et MOT_DE_PASSE requis dans .env") raise ValueError(" CHEMIN_BASE et MOT_DE_PASSE requis dans .env")
if not settings.sage_gateway_token: if not settings.sage_gateway_token:
raise ValueError(" SAGE_GATEWAY_TOKEN requis (doit être identique sur Linux)") raise ValueError(" SAGE_GATEWAY_TOKEN requis (doit être identique sur Linux)")
return True return True

44
main.py
View file

@ -13,6 +13,7 @@ from config import settings, validate_settings
from sage_connector import SageConnector from sage_connector import SageConnector
import pyodbc import pyodbc
import os import os
from utils.tiers import TiersListRequest
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -496,7 +497,7 @@ class ClientCreateRequest(BaseModel):
def to_sage_dict(self) -> dict: def to_sage_dict(self) -> dict:
""" """
Convertit le modèle en dictionnaire compatible avec creer_client() Convertit le modèle en dictionnaire compatible avec creer_client()
Mapping 1:1 avec les paramètres réels de la fonction Mapping 1:1 avec les paramètres réels de la fonction
""" """
stat01 = self.statistique01 or self.secteur stat01 = self.statistique01 or self.secteur
@ -1378,7 +1379,7 @@ def create_fournisseur_endpoint(req: FournisseurCreateRequest):
return {"success": True, "data": resultat} return {"success": True, "data": resultat}
except ValueError as e: except ValueError as e:
logger.warning(f"⚠️ Erreur métier création fournisseur: {e}") logger.warning(f" Erreur métier création fournisseur: {e}")
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
except Exception as e: except Exception as e:
@ -1450,7 +1451,7 @@ def avoir_get(req: CodeRequest):
logger.info(f" Avoir {req.code} retourné depuis le cache") logger.info(f" Avoir {req.code} retourné depuis le cache")
return {"success": True, "data": avoir, "source": "cache"} return {"success": True, "data": avoir, "source": "cache"}
logger.info(f"⚠️ Avoir {req.code} absent du cache, lecture depuis Sage...") logger.info(f" Avoir {req.code} absent du cache, lecture depuis Sage...")
avoir = sage.lire_avoir(req.code) avoir = sage.lire_avoir(req.code)
if not avoir: if not avoir:
@ -1497,7 +1498,7 @@ def livraison_get(req: CodeRequest):
logger.info(f" Livraison {req.code} retournée depuis le cache") logger.info(f" Livraison {req.code} retournée depuis le cache")
return {"success": True, "data": livraison, "source": "cache"} return {"success": True, "data": livraison, "source": "cache"}
logger.info(f"⚠️ Livraison {req.code} absente du cache, lecture depuis Sage...") logger.info(f" Livraison {req.code} absente du cache, lecture depuis Sage...")
livraison = sage.lire_livraison(req.code) livraison = sage.lire_livraison(req.code)
if not livraison: if not livraison:
@ -1922,7 +1923,7 @@ def lister_depots():
pass pass
if not code: if not code:
logger.warning(f" ⚠️ Dépôt à l'index {index} sans code") logger.warning(f" Dépôt à l'index {index} sans code")
index += 1 index += 1
continue continue
@ -2029,7 +2030,7 @@ def creer_entree_stock(req: EntreeStockRequest):
return {"success": True, "data": resultat} return {"success": True, "data": resultat}
except ValueError as e: except ValueError as e:
logger.warning(f"⚠️ Erreur métier entrée stock : {e}") logger.warning(f" Erreur métier entrée stock : {e}")
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
except Exception as e: except Exception as e:
@ -2059,7 +2060,7 @@ def creer_sortie_stock(req: SortieStockRequest):
return {"success": True, "data": resultat} return {"success": True, "data": resultat}
except ValueError as e: except ValueError as e:
logger.warning(f"⚠️ Erreur métier sortie stock : {e}") logger.warning(f" Erreur métier sortie stock : {e}")
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
except Exception as e: except Exception as e:
@ -2160,6 +2161,35 @@ def contacts_set_default(req: ContactGetRequest):
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@app.post("/sage/tiers/list", dependencies=[Depends(verify_token)])
def tiers_list(req: TiersListRequest):
"""Liste des tiers avec filtres optionnels"""
try:
tiers = sage.lister_tous_tiers(
type_tiers=req.type_tiers,
filtre=req.filtre
)
return {"success": True, "data": tiers}
except Exception as e:
logger.error(f" Erreur liste tiers: {e}")
raise HTTPException(500, str(e))
@app.post("/sage/tiers/get", dependencies=[Depends(verify_token)])
def tiers_get(req: CodeRequest):
"""Lecture d'un tiers par code"""
try:
tiers = sage.lire_tiers(req.code)
if not tiers:
raise HTTPException(404, f"Tiers {req.code} non trouvé")
return {"success": True, "data": tiers}
except HTTPException:
raise
except Exception as e:
logger.error(f" Erreur lecture tiers: {e}")
raise HTTPException(500, str(e))
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run( uvicorn.run(
"main:app", "main:app",

View file

@ -237,7 +237,7 @@ class ArticleUpdateGatewayRequest(BaseModel):
def verify_token(x_sage_token: str = Header(...)): def verify_token(x_sage_token: str = Header(...)):
"""Vérification du token d'authentification""" """Vérification du token d'authentification"""
if x_sage_token != settings.sage_gateway_token: if x_sage_token != settings.sage_gateway_token:
logger.warning(f" Token invalide reçu: {x_sage_token[:20]}...") logger.warning(f" Token invalide reçu: {x_sage_token[:20]}...")
raise HTTPException(401, "Token invalide") raise HTTPException(401, "Token invalide")
return True return True
@ -274,9 +274,9 @@ def startup():
# Validation config # Validation config
try: try:
validate_settings() validate_settings()
logger.info(" Configuration validée") logger.info(" Configuration validée")
except ValueError as e: except ValueError as e:
logger.error(f" Configuration invalide: {e}") logger.error(f" Configuration invalide: {e}")
raise raise
# Connexion Sage # Connexion Sage
@ -285,9 +285,9 @@ def startup():
) )
if not sage.connecter(): if not sage.connecter():
raise RuntimeError(" Impossible de se connecter à Sage 100c") raise RuntimeError(" Impossible de se connecter à Sage 100c")
logger.info(" Sage Gateway démarré et connecté") logger.info(" Sage Gateway démarré et connecté")
@app.on_event("shutdown") @app.on_event("shutdown")
@ -430,11 +430,11 @@ def lire_devis(req: CodeRequest):
""" """
📄 Lecture d'un devis AVEC ses lignes (lecture Sage directe) 📄 Lecture d'un devis AVEC ses lignes (lecture Sage directe)
Plus lent que /list car charge les lignes depuis Sage Plus lent que /list car charge les lignes depuis Sage
💡 Utiliser /list pour afficher une table rapide 💡 Utiliser /list pour afficher une table rapide
""" """
try: try:
# Lecture complète depuis Sage (avec lignes) # Lecture complète depuis Sage (avec lignes)
devis = sage.lire_devis(req.code) devis = sage.lire_devis(req.code)
if not devis: if not devis:
raise HTTPException(404, f"Devis {req.code} non trouvé") raise HTTPException(404, f"Devis {req.code} non trouvé")
@ -459,7 +459,7 @@ def devis_list(
💡 Pour les détails avec lignes, utiliser GET /sage/devis/get 💡 Pour les détails avec lignes, utiliser GET /sage/devis/get
""" """
try: try:
# Récupération depuis le cache (instantané) # Récupération depuis le cache (instantané)
devis_list = sage.lister_tous_devis_cache(filtre) devis_list = sage.lister_tous_devis_cache(filtre)
# Filtrer par statut si demandé # Filtrer par statut si demandé
@ -469,12 +469,12 @@ def devis_list(
# Limiter le nombre de résultats # Limiter le nombre de résultats
devis_list = devis_list[:limit] devis_list = devis_list[:limit]
logger.info(f" {len(devis_list)} devis retournés depuis le cache") logger.info(f" {len(devis_list)} devis retournés depuis le cache")
return {"success": True, "data": devis_list} return {"success": True, "data": devis_list}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste devis: {e}", exc_info=True) logger.error(f" Erreur liste devis: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -496,7 +496,7 @@ def changer_statut_devis_endpoint(numero: str, nouveau_statut: int):
doc.DO_Statut = nouveau_statut doc.DO_Statut = nouveau_statut
doc.Write() doc.Write()
logger.info(f" Statut devis {numero}: {statut_actuel}{nouveau_statut}") logger.info(f" Statut devis {numero}: {statut_actuel}{nouveau_statut}")
return { return {
"success": True, "success": True,
@ -540,7 +540,7 @@ def transformer_document(
""" """
🔧 Transformation de document 🔧 Transformation de document
CORRECTION : Utilise les VRAIS types Sage Dataven CORRECTION : Utilise les VRAIS types Sage Dataven
Types valides : Types valides :
- 0: Devis - 0: Devis
@ -563,7 +563,7 @@ def transformer_document(
f"(type {type_source}) → type {type_cible}" f"(type {type_source}) → type {type_cible}"
) )
# Matrice des transformations valides pour VOTRE Sage # Matrice des transformations valides pour VOTRE Sage
transformations_valides = { transformations_valides = {
(0, 10), # Devis → Commande (0, 10), # Devis → Commande
(10, 30), # Commande → Bon de livraison (10, 30), # Commande → Bon de livraison
@ -574,7 +574,7 @@ def transformer_document(
if (type_source, type_cible) not in transformations_valides: if (type_source, type_cible) not in transformations_valides:
logger.error( logger.error(
f" Transformation non autorisée: {type_source}{type_cible}" f" Transformation non autorisée: {type_source}{type_cible}"
) )
raise HTTPException( raise HTTPException(
400, 400,
@ -586,7 +586,7 @@ def transformer_document(
resultat = sage.transformer_document(numero_source, type_source, type_cible) resultat = sage.transformer_document(numero_source, type_source, type_cible)
logger.info( logger.info(
f" Transformation réussie: {numero_source}" f" Transformation réussie: {numero_source}"
f"{resultat.get('document_cible', '?')} " f"{resultat.get('document_cible', '?')} "
f"({resultat.get('nb_lignes', 0)} lignes)" f"({resultat.get('nb_lignes', 0)} lignes)"
) )
@ -596,10 +596,10 @@ def transformer_document(
except HTTPException: except HTTPException:
raise raise
except ValueError as e: except ValueError as e:
logger.error(f" Erreur métier transformation: {e}") logger.error(f" Erreur métier transformation: {e}")
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
except Exception as e: except Exception as e:
logger.error(f" Erreur technique transformation: {e}", exc_info=True) logger.error(f" Erreur technique transformation: {e}", exc_info=True)
raise HTTPException(500, f"Erreur transformation: {str(e)}") raise HTTPException(500, f"Erreur transformation: {str(e)}")
@ -664,12 +664,12 @@ def commandes_list(
commandes = commandes[:limit] commandes = commandes[:limit]
logger.info(f" {len(commandes)} commandes retournées depuis le cache") logger.info(f" {len(commandes)} commandes retournées depuis le cache")
return {"success": True, "data": commandes} return {"success": True, "data": commandes}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste commandes: {e}", exc_info=True) logger.error(f" Erreur liste commandes: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -693,12 +693,12 @@ def factures_list(
factures = factures[:limit] factures = factures[:limit]
logger.info(f" {len(factures)} factures retournées depuis le cache") logger.info(f" {len(factures)} factures retournées depuis le cache")
return {"success": True, "data": factures} return {"success": True, "data": factures}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste factures: {e}", exc_info=True) logger.error(f" Erreur liste factures: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -718,7 +718,7 @@ def lire_remise_max_client(code: str):
except: except:
pass pass
logger.info(f" Remise max client {code}: {remise_max}%") logger.info(f" Remise max client {code}: {remise_max}%")
return { return {
"success": True, "success": True,
@ -1777,7 +1777,7 @@ def diagnostiquer_statuts_globaux():
matrice_complete[type_doc] = analyse_type matrice_complete[type_doc] = analyse_type
logger.info( logger.info(
f"[DIAG] Type {type_doc}: {analyse_type['nb_documents_total']} docs, " f"[DIAG] Type {type_doc}: {analyse_type['nb_documents_total']} docs, "
f"{len(analyse_type['statuts_observes'])} statuts différents" f"{len(analyse_type['statuts_observes'])} statuts différents"
) )
@ -2045,7 +2045,7 @@ def diagnostiquer_statuts_permis(numero: str):
resultat_test["autorise"] = True resultat_test["autorise"] = True
resultat_test["note"] = "Changement de statut réussi" resultat_test["note"] = "Changement de statut réussi"
logger.info(f"[DIAG] Statut {statut_test} AUTORISÉ") logger.info(f"[DIAG] Statut {statut_test} AUTORISÉ")
# Restaurer le statut d'origine immédiatement # Restaurer le statut d'origine immédiatement
doc.Read() doc.Read()
@ -2058,7 +2058,7 @@ def diagnostiquer_statuts_permis(numero: str):
resultat_test["erreur"] = erreur_str resultat_test["erreur"] = erreur_str
logger.debug( logger.debug(
f"[DIAG] Statut {statut_test} REFUSÉ: {erreur_str[:100]}" f"[DIAG] Statut {statut_test} REFUSÉ: {erreur_str[:100]}"
) )
# Restaurer en cas d'erreur # Restaurer en cas d'erreur
@ -2092,20 +2092,20 @@ def diagnostiquer_statuts_permis(numero: str):
if 2 in statuts_autorises and statut_actuel == 0: if 2 in statuts_autorises and statut_actuel == 0:
recommendations.append( recommendations.append(
" Vous pouvez passer ce document de 'Brouillon' (0) à 'Accepté' (2)" " Vous pouvez passer ce document de 'Brouillon' (0) à 'Accepté' (2)"
) )
if 5 in statuts_autorises: if 5 in statuts_autorises:
recommendations.append( recommendations.append(
" Le statut 'Transformé' (5) est disponible - utilisé après transformation" " Le statut 'Transformé' (5) est disponible - utilisé après transformation"
) )
if 6 in statuts_autorises: if 6 in statuts_autorises:
recommendations.append(" Vous pouvez annuler ce document (statut 6)") recommendations.append(" Vous pouvez annuler ce document (statut 6)")
if not any(s in statuts_autorises for s in [2, 3, 4]): if not any(s in statuts_autorises for s in [2, 3, 4]):
recommendations.append( recommendations.append(
"⚠️ Aucun statut de validation (2/3/4) n'est disponible - " " Aucun statut de validation (2/3/4) n'est disponible - "
"le document a peut-être déjà été traité" "le document a peut-être déjà été traité"
) )
@ -2365,11 +2365,11 @@ def diagnostiquer_erreur_transformation(
if nb_bloquants == 0: if nb_bloquants == 0:
diagnostic["suggestions"].append( diagnostic["suggestions"].append(
" Aucun problème bloquant détecté. La transformation devrait fonctionner." " Aucun problème bloquant détecté. La transformation devrait fonctionner."
) )
else: else:
diagnostic["suggestions"].append( diagnostic["suggestions"].append(
f" {nb_bloquants} problème(s) bloquant(s) doivent être résolus avant la transformation." f" {nb_bloquants} problème(s) bloquant(s) doivent être résolus avant la transformation."
) )
return {"success": True, "diagnostic": diagnostic} return {"success": True, "diagnostic": diagnostic}
@ -2418,19 +2418,19 @@ def fournisseurs_list(req: FiltreRequest):
""" """
Liste rapide des fournisseurs depuis le CACHE Liste rapide des fournisseurs depuis le CACHE
Utilise le cache mémoire pour une réponse instantanée Utilise le cache mémoire pour une réponse instantanée
🔄 Cache actualisé automatiquement toutes les 15 minutes 🔄 Cache actualisé automatiquement toutes les 15 minutes
""" """
try: try:
# Utiliser le cache au lieu de la lecture directe # Utiliser le cache au lieu de la lecture directe
fournisseurs = sage.lister_tous_fournisseurs_cache(req.filtre) fournisseurs = sage.lister_tous_fournisseurs_cache(req.filtre)
logger.info(f" {len(fournisseurs)} fournisseurs retournés depuis le cache") logger.info(f" {len(fournisseurs)} fournisseurs retournés depuis le cache")
return {"success": True, "data": fournisseurs} return {"success": True, "data": fournisseurs}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste fournisseurs: {e}", exc_info=True) logger.error(f" Erreur liste fournisseurs: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -2439,24 +2439,24 @@ def create_fournisseur_endpoint(req: FournisseurCreateRequest):
""" """
Création d'un fournisseur dans Sage Création d'un fournisseur dans Sage
Utilise FactoryFournisseur.Create() directement Utilise FactoryFournisseur.Create() directement
""" """
try: try:
# Appel au connecteur Sage # Appel au connecteur Sage
resultat = sage.creer_fournisseur(req.dict()) resultat = sage.creer_fournisseur(req.dict())
logger.info(f" Fournisseur créé: {resultat.get('numero')}") logger.info(f" Fournisseur créé: {resultat.get('numero')}")
return {"success": True, "data": resultat} return {"success": True, "data": resultat}
except ValueError as e: except ValueError as e:
# Erreur métier (ex: doublon) # Erreur métier (ex: doublon)
logger.warning(f"⚠️ Erreur métier création fournisseur: {e}") logger.warning(f" Erreur métier création fournisseur: {e}")
raise HTTPException(400, str(e)) raise HTTPException(400, str(e))
except Exception as e: except Exception as e:
# Erreur technique (ex: COM) # Erreur technique (ex: COM)
logger.error(f" Erreur technique création fournisseur: {e}") logger.error(f" Erreur technique création fournisseur: {e}")
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -2480,7 +2480,7 @@ def modifier_fournisseur_endpoint(req: FournisseurUpdateGatewayRequest):
@app.post("/sage/fournisseurs/get", dependencies=[Depends(verify_token)]) @app.post("/sage/fournisseurs/get", dependencies=[Depends(verify_token)])
def fournisseur_get(req: CodeRequest): def fournisseur_get(req: CodeRequest):
""" """
NOUVEAU : Lecture d'un fournisseur par code NOUVEAU : Lecture d'un fournisseur par code
""" """
try: try:
fournisseur = sage.lire_fournisseur(req.code) fournisseur = sage.lire_fournisseur(req.code)
@ -2507,11 +2507,11 @@ def avoirs_list(
📋 Liste rapide des avoirs depuis le CACHE (avec lignes) 📋 Liste rapide des avoirs depuis le CACHE (avec lignes)
ULTRA-RAPIDE: Utilise le cache mémoire ULTRA-RAPIDE: Utilise le cache mémoire
LIGNES INCLUSES: Contrairement aux anciennes méthodes LIGNES INCLUSES: Contrairement aux anciennes méthodes
💡 Pour forcer une relecture depuis Sage, utiliser /sage/avoirs/get 💡 Pour forcer une relecture depuis Sage, utiliser /sage/avoirs/get
""" """
try: try:
# Récupération depuis le cache (instantané) # Récupération depuis le cache (instantané)
avoirs = sage.lister_tous_avoirs_cache(filtre) avoirs = sage.lister_tous_avoirs_cache(filtre)
# Filtrer par statut si demandé # Filtrer par statut si demandé
@ -2521,12 +2521,12 @@ def avoirs_list(
# Limiter le nombre de résultats # Limiter le nombre de résultats
avoirs = avoirs[:limit] avoirs = avoirs[:limit]
logger.info(f" {len(avoirs)} avoirs retournés depuis le cache") logger.info(f" {len(avoirs)} avoirs retournés depuis le cache")
return {"success": True, "data": avoirs} return {"success": True, "data": avoirs}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste avoirs: {e}", exc_info=True) logger.error(f" Erreur liste avoirs: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -2539,15 +2539,15 @@ def avoir_get(req: CodeRequest):
🔄 Si introuvable, force une relecture depuis Sage 🔄 Si introuvable, force une relecture depuis Sage
""" """
try: try:
# Essayer le cache d'abord # Essayer le cache d'abord
avoir = sage.lire_avoir_cache(req.code) avoir = sage.lire_avoir_cache(req.code)
if avoir: if avoir:
logger.info(f" Avoir {req.code} retourné depuis le cache") logger.info(f" Avoir {req.code} retourné depuis le cache")
return {"success": True, "data": avoir, "source": "cache"} return {"success": True, "data": avoir, "source": "cache"}
# Pas dans le cache → Lecture directe depuis Sage # Pas dans le cache → Lecture directe depuis Sage
logger.info(f"⚠️ Avoir {req.code} absent du cache, lecture depuis Sage...") logger.info(f" Avoir {req.code} absent du cache, lecture depuis Sage...")
avoir = sage.lire_avoir(req.code) avoir = sage.lire_avoir(req.code)
if not avoir: if not avoir:
@ -2575,11 +2575,11 @@ def livraisons_list(
📋 Liste rapide des livraisons depuis le CACHE (avec lignes) 📋 Liste rapide des livraisons depuis le CACHE (avec lignes)
ULTRA-RAPIDE: Utilise le cache mémoire ULTRA-RAPIDE: Utilise le cache mémoire
LIGNES INCLUSES: Contrairement aux anciennes méthodes LIGNES INCLUSES: Contrairement aux anciennes méthodes
💡 Pour forcer une relecture depuis Sage, utiliser /sage/livraisons/get 💡 Pour forcer une relecture depuis Sage, utiliser /sage/livraisons/get
""" """
try: try:
# Récupération depuis le cache (instantané) # Récupération depuis le cache (instantané)
livraisons = sage.lister_toutes_livraisons_cache(filtre) livraisons = sage.lister_toutes_livraisons_cache(filtre)
# Filtrer par statut si demandé # Filtrer par statut si demandé
@ -2589,12 +2589,12 @@ def livraisons_list(
# Limiter le nombre de résultats # Limiter le nombre de résultats
livraisons = livraisons[:limit] livraisons = livraisons[:limit]
logger.info(f" {len(livraisons)} livraisons retournées depuis le cache") logger.info(f" {len(livraisons)} livraisons retournées depuis le cache")
return {"success": True, "data": livraisons} return {"success": True, "data": livraisons}
except Exception as e: except Exception as e:
logger.error(f" Erreur liste livraisons: {e}", exc_info=True) logger.error(f" Erreur liste livraisons: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -2607,15 +2607,15 @@ def livraison_get(req: CodeRequest):
🔄 Si introuvable, force une relecture depuis Sage 🔄 Si introuvable, force une relecture depuis Sage
""" """
try: try:
# Essayer le cache d'abord # Essayer le cache d'abord
livraison = sage.lire_livraison_cache(req.code) livraison = sage.lire_livraison_cache(req.code)
if livraison: if livraison:
logger.info(f" Livraison {req.code} retournée depuis le cache") logger.info(f" Livraison {req.code} retournée depuis le cache")
return {"success": True, "data": livraison, "source": "cache"} return {"success": True, "data": livraison, "source": "cache"}
# Pas dans le cache → Lecture directe depuis Sage # Pas dans le cache → Lecture directe depuis Sage
logger.info(f"⚠️ Livraison {req.code} absente du cache, lecture depuis Sage...") logger.info(f" Livraison {req.code} absente du cache, lecture depuis Sage...")
livraison = sage.lire_livraison(req.code) livraison = sage.lire_livraison(req.code)
if not livraison: if not livraison:
@ -2804,7 +2804,7 @@ def creer_facture_endpoint(req: FactureCreateGatewayRequest):
""" """
Création d'une facture dans Sage Création d'une facture dans Sage
NOTE: Les factures peuvent avoir des champs obligatoires supplémentaires NOTE: Les factures peuvent avoir des champs obligatoires supplémentaires
selon la configuration Sage (DO_CodeJournal, DO_Souche, etc.) selon la configuration Sage (DO_CodeJournal, DO_Souche, etc.)
""" """
try: try:
@ -2837,7 +2837,7 @@ def modifier_facture_endpoint(req: FactureUpdateGatewayRequest):
""" """
Modification d'une facture dans Sage Modification d'une facture dans Sage
ATTENTION: Les factures comptabilisées peuvent être verrouillées ATTENTION: Les factures comptabilisées peuvent être verrouillées
""" """
try: try:
resultat = sage.modifier_facture(req.numero, req.facture_data) resultat = sage.modifier_facture(req.numero, req.facture_data)
@ -2946,7 +2946,7 @@ def generer_pdf_document(req: PDFGenerationRequest):
pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8") pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8")
logger.info(f" PDF généré: {len(pdf_bytes)} octets") logger.info(f" PDF généré: {len(pdf_bytes)} octets")
return { return {
"success": True, "success": True,
@ -2961,7 +2961,7 @@ def generer_pdf_document(req: PDFGenerationRequest):
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
logger.error(f" Erreur génération PDF: {e}", exc_info=True) logger.error(f" Erreur génération PDF: {e}", exc_info=True)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@ -2970,7 +2970,7 @@ def nettoyer_verrous_sage():
""" """
🧹 Nettoyage des verrous Sage (cbRegFile) 🧹 Nettoyage des verrous Sage (cbRegFile)
À utiliser uniquement si l'API est bloquée À utiliser uniquement si l'API est bloquée
""" """
try: try:
if not sage or not sage.cial: if not sage or not sage.cial:
@ -2981,7 +2981,7 @@ def nettoyer_verrous_sage():
for _ in range(10): for _ in range(10):
try: try:
sage.cial.CptaApplication.RollbackTrans() sage.cial.CptaApplication.RollbackTrans()
logger.info(" Rollback effectué") logger.info(" Rollback effectué")
except: except:
break break
@ -4207,7 +4207,7 @@ def diagnostic_article_complet(
"tous_champs_remplis": champs_remplis, "tous_champs_remplis": champs_remplis,
"champs_vides": champs_vides, "champs_vides": champs_vides,
"conseil": ( "conseil": (
f" {len(attributs_unite)} attribut(s) d'unité trouvé(s). " f" {len(attributs_unite)} attribut(s) d'unité trouvé(s). "
f"Utilisez mode='create' avec ce reference_modele pour tester la création." f"Utilisez mode='create' avec ce reference_modele pour tester la création."
) )
} }
@ -4230,11 +4230,11 @@ def diagnostic_article_complet(
logs_copie = [] logs_copie = []
article_modele_obj = None article_modele_obj = None
# CHAMPS À EXCLURE (doivent être uniques ou auto-générés) # CHAMPS À EXCLURE (doivent être uniques ou auto-générés)
champs_exclus = { champs_exclus = {
"AR_Ref", # Référence (on met la nôtre) "AR_Ref", # Référence (on met la nôtre)
"AR_Raccourci", # Doit être unique ! "AR_Raccourci", # Doit être unique !
"AR_CodeBarre", # Code-barres doit être unique ! ⚠️ CRITIQUE "AR_CodeBarre", # Code-barres doit être unique ! CRITIQUE
"AR_Photo", # Chemin photo spécifique "AR_Photo", # Chemin photo spécifique
"cbMarq", # ID interne Sage "cbMarq", # ID interne Sage
"cbCreateur", # Créateur "cbCreateur", # Créateur
@ -4251,7 +4251,7 @@ def diagnostic_article_complet(
if attr.startswith('_') or attr[0].islower(): if attr.startswith('_') or attr[0].islower():
continue continue
# Exclure les champs problématiques # Exclure les champs problématiques
if attr in champs_exclus: if attr in champs_exclus:
logs_copie.append(f"⏭️ {attr} EXCLU (doit être unique)") logs_copie.append(f"⏭️ {attr} EXCLU (doit être unique)")
continue continue
@ -4266,14 +4266,14 @@ def diagnostic_article_complet(
# Garder les valeurs non-None et non-vides # Garder les valeurs non-None et non-vides
if val is not None and str(val) not in ['None', '']: if val is not None and str(val) not in ['None', '']:
champs_modele[attr] = val champs_modele[attr] = val
logs_copie.append(f" {attr} = {val}") logs_copie.append(f" {attr} = {val}")
except: except:
continue continue
logger.info(f"📋 Modèle: {len(champs_modele)} champs extraits") logger.info(f"📋 Modèle: {len(champs_modele)} champs extraits")
except Exception as e: except Exception as e:
logger.warning(f"⚠️ Modèle '{reference_modele}' non chargé: {e}") logger.warning(f" Modèle '{reference_modele}' non chargé: {e}")
# 🆕 Étape 2: Créer le nouvel article # 🆕 Étape 2: Créer le nouvel article
persist = factory.Create() persist = factory.Create()
@ -4289,9 +4289,9 @@ def diagnostic_article_complet(
for champ, valeur in champs_modele.items(): for champ, valeur in champs_modele.items():
try: try:
setattr(article, champ, valeur) setattr(article, champ, valeur)
logs_application.append(f" {champ} = {valeur}") logs_application.append(f" {champ} = {valeur}")
except Exception as e: except Exception as e:
logs_application.append(f" {champ}: {str(e)[:50]}") logs_application.append(f" {champ}: {str(e)[:50]}")
else: else:
# Défauts minimaux si pas de modèle # Défauts minimaux si pas de modèle
article.AR_Design = f"Test {reference}" article.AR_Design = f"Test {reference}"
@ -4310,10 +4310,10 @@ def diagnostic_article_complet(
famille_obj = getattr(article_modele_obj, "Famille", None) famille_obj = getattr(article_modele_obj, "Famille", None)
if famille_obj is not None: if famille_obj is not None:
article.Famille = famille_obj article.Famille = famille_obj
fallbacks_appliques.append(f" Famille copiée depuis {reference_modele}") fallbacks_appliques.append(f" Famille copiée depuis {reference_modele}")
logger.info("[DIAGNOSTIC] Famille copiée avec succès") logger.info("[DIAGNOSTIC] Famille copiée avec succès")
else: else:
fallbacks_echecs.append("⚠️ Famille: objet NULL dans modèle") fallbacks_echecs.append(" Famille: objet NULL dans modèle")
logger.warning("[DIAGNOSTIC] Famille NULL dans modèle") logger.warning("[DIAGNOSTIC] Famille NULL dans modèle")
except Exception as e: except Exception as e:
fallbacks_echecs.append(f"Famille: {str(e)[:80]}") fallbacks_echecs.append(f"Famille: {str(e)[:80]}")
@ -4324,7 +4324,7 @@ def diagnostic_article_complet(
taxe_obj = getattr(article_modele_obj, "Taxe1", None) taxe_obj = getattr(article_modele_obj, "Taxe1", None)
if taxe_obj is not None: if taxe_obj is not None:
article.Taxe1 = taxe_obj article.Taxe1 = taxe_obj
fallbacks_appliques.append(f" Taxe1 copiée depuis {reference_modele}") fallbacks_appliques.append(f" Taxe1 copiée depuis {reference_modele}")
logger.info("[DIAGNOSTIC] Taxe1 copiée avec succès") logger.info("[DIAGNOSTIC] Taxe1 copiée avec succès")
else: else:
# Taxe NULL - essayer de charger une taxe par défaut # Taxe NULL - essayer de charger une taxe par défaut
@ -4337,7 +4337,7 @@ def diagnostic_article_complet(
taxe_defaut = factory_taxe.ReadIntitule(code_taxe) taxe_defaut = factory_taxe.ReadIntitule(code_taxe)
if taxe_defaut: if taxe_defaut:
article.Taxe1 = taxe_defaut article.Taxe1 = taxe_defaut
fallbacks_appliques.append(f" Taxe par défaut '{code_taxe}' chargée") fallbacks_appliques.append(f" Taxe par défaut '{code_taxe}' chargée")
logger.info(f"[DIAGNOSTIC] Taxe '{code_taxe}' chargée") logger.info(f"[DIAGNOSTIC] Taxe '{code_taxe}' chargée")
taxe_trouvee = True taxe_trouvee = True
break break
@ -4346,7 +4346,7 @@ def diagnostic_article_complet(
continue continue
if not taxe_trouvee: if not taxe_trouvee:
fallbacks_echecs.append("⚠️ Taxe: aucune taxe par défaut trouvée") fallbacks_echecs.append(" Taxe: aucune taxe par défaut trouvée")
except Exception as e: except Exception as e:
fallbacks_echecs.append(f"Taxe: {str(e)[:80]}") fallbacks_echecs.append(f"Taxe: {str(e)[:80]}")
logger.error(f"[DIAGNOSTIC] Erreur copie Taxe: {e}") logger.error(f"[DIAGNOSTIC] Erreur copie Taxe: {e}")
@ -4356,7 +4356,7 @@ def diagnostic_article_complet(
unite_obj = getattr(article_modele_obj, "UniteVente", None) unite_obj = getattr(article_modele_obj, "UniteVente", None)
if unite_obj is not None: if unite_obj is not None:
article.UniteVente = unite_obj article.UniteVente = unite_obj
fallbacks_appliques.append(f" UniteVente copiée depuis {reference_modele}") fallbacks_appliques.append(f" UniteVente copiée depuis {reference_modele}")
logger.info("[DIAGNOSTIC] UniteVente copiée avec succès") logger.info("[DIAGNOSTIC] UniteVente copiée avec succès")
else: else:
# Unité NULL - essayer de charger une unité par défaut # Unité NULL - essayer de charger une unité par défaut
@ -4369,7 +4369,7 @@ def diagnostic_article_complet(
unite_defaut = factory_unite.ReadIntitule(code_unite) unite_defaut = factory_unite.ReadIntitule(code_unite)
if unite_defaut: if unite_defaut:
article.UniteVente = unite_defaut article.UniteVente = unite_defaut
fallbacks_appliques.append(f" Unité par défaut '{code_unite}' chargée") fallbacks_appliques.append(f" Unité par défaut '{code_unite}' chargée")
logger.info(f"[DIAGNOSTIC] Unité '{code_unite}' chargée") logger.info(f"[DIAGNOSTIC] Unité '{code_unite}' chargée")
unite_trouvee = True unite_trouvee = True
break break
@ -4378,13 +4378,13 @@ def diagnostic_article_complet(
continue continue
if not unite_trouvee: if not unite_trouvee:
fallbacks_echecs.append("⚠️ Unité: aucune unité par défaut trouvée") fallbacks_echecs.append(" Unité: aucune unité par défaut trouvée")
except Exception as e: except Exception as e:
fallbacks_echecs.append(f"Unité: {str(e)[:80]}") fallbacks_echecs.append(f"Unité: {str(e)[:80]}")
logger.error(f"[DIAGNOSTIC] Erreur copie Unité: {e}") logger.error(f"[DIAGNOSTIC] Erreur copie Unité: {e}")
else: else:
fallbacks_echecs.append("⚠️ Aucun modèle fourni - objets COM non définis") fallbacks_echecs.append(" Aucun modèle fourni - objets COM non définis")
# 📊 Étape 5: Scanner l'état final avant Write # 📊 Étape 5: Scanner l'état final avant Write
etat_final = {} etat_final = {}
@ -4409,12 +4409,12 @@ def diagnostic_article_complet(
try: try:
article.Write() article.Write()
write_success = True write_success = True
erreur_write = " SUCCESS - Article créé !" erreur_write = " SUCCESS - Article créé !"
logger.info(f"[DIAGNOSTIC] Article {reference} créé avec succès") logger.info(f"[DIAGNOSTIC] Article {reference} créé avec succès")
except Exception as e: except Exception as e:
erreur_write = str(e) erreur_write = str(e)
logger.error(f"[DIAGNOSTIC] Échec Write(): {e}") logger.error(f"[DIAGNOSTIC] Échec Write(): {e}")
# Extraire erreurs Sage # Extraire erreurs Sage
try: try:
@ -4473,7 +4473,7 @@ def diagnostic_article_complet(
if write_success: if write_success:
diagnostic["message"] = "🎉 Article créé avec succès (rollback effectué)" diagnostic["message"] = "🎉 Article créé avec succès (rollback effectué)"
else: else:
diagnostic["message"] = " Échec de création" diagnostic["message"] = " Échec de création"
# Analyser quels champs posent problème # Analyser quels champs posent problème
champs_none = [k for k, v in etat_final.items() if v in ['None', 'N/A']] champs_none = [k for k, v in etat_final.items() if v in ['None', 'N/A']]

File diff suppressed because it is too large Load diff

44
test.py
View file

@ -35,7 +35,7 @@ def diagnostic_complet_crystal():
for chemin in chemins_installation: for chemin in chemins_installation:
if os.path.exists(chemin): if os.path.exists(chemin):
print(f" Dossier trouvé : {chemin}") print(f" Dossier trouvé : {chemin}")
crystal_trouve = True crystal_trouve = True
chemin_crystal = chemin chemin_crystal = chemin
@ -54,16 +54,16 @@ def diagnostic_complet_crystal():
print(f" Taille : {size_mb:.1f} MB") print(f" Taille : {size_mb:.1f} MB")
if size_mb < 100: if size_mb < 100:
print(f" ⚠️ Taille suspecte (attendu: 300-800 MB)") print(f" Taille suspecte (attendu: 300-800 MB)")
problemes.append("Installation incomplète (taille trop petite)") problemes.append("Installation incomplète (taille trop petite)")
except Exception as e: except Exception as e:
print(f" ⚠️ Impossible de calculer taille : {e}") print(f" Impossible de calculer taille : {e}")
else: else:
print(f" Absent : {chemin}") print(f" Absent : {chemin}")
if not crystal_trouve: if not crystal_trouve:
print("\n PROBLÈME MAJEUR : Crystal Reports n'est pas installé") print("\n PROBLÈME MAJEUR : Crystal Reports n'est pas installé")
problemes.append("Crystal Reports non installé") problemes.append("Crystal Reports non installé")
solutions.append("Télécharger et installer SAP Crystal Reports Runtime") solutions.append("Télécharger et installer SAP Crystal Reports Runtime")
return {"problemes": problemes, "solutions": solutions, "installe": False} return {"problemes": problemes, "solutions": solutions, "installe": False}
@ -89,18 +89,18 @@ def diagnostic_complet_crystal():
if dll_nom.lower() in [f.lower() for f in files]: if dll_nom.lower() in [f.lower() for f in files]:
dll_path = os.path.join(root, dll_nom) dll_path = os.path.join(root, dll_nom)
dll_trouvees[dll_nom] = dll_path dll_trouvees[dll_nom] = dll_path
print(f" {dll_nom}") print(f" {dll_nom}")
print(f" {dll_path}") print(f" {dll_path}")
trouve = True trouve = True
break break
if not trouve: if not trouve:
print(f" {dll_nom} - {description}") print(f" {dll_nom} - {description}")
if "CRITIQUE" in description: if "CRITIQUE" in description:
problemes.append(f"{dll_nom} manquante") problemes.append(f"{dll_nom} manquante")
if len(dll_trouvees) < 2: if len(dll_trouvees) < 2:
print("\n ⚠️ Trop peu de DLL trouvées - Installation corrompue") print("\n Trop peu de DLL trouvées - Installation corrompue")
problemes.append("DLL manquantes - Installation corrompue") problemes.append("DLL manquantes - Installation corrompue")
solutions.append("Réinstaller Crystal Reports Runtime") solutions.append("Réinstaller Crystal Reports Runtime")
@ -123,7 +123,7 @@ def diagnostic_complet_crystal():
try: try:
# Vérifier existence # Vérifier existence
key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, prog_id) key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, prog_id)
print(f" {prog_id}") print(f" {prog_id}")
# Lire le CLSID # Lire le CLSID
try: try:
@ -144,28 +144,28 @@ def diagnostic_complet_crystal():
# Vérifier que la DLL existe # Vérifier que la DLL existe
if not os.path.exists(dll_path): if not os.path.exists(dll_path):
print(f" DLL INTROUVABLE: {dll_path}") print(f" DLL INTROUVABLE: {dll_path}")
problemes.append(f"{prog_id}: DLL manquante ({dll_path})") problemes.append(f"{prog_id}: DLL manquante ({dll_path})")
else: else:
prog_ids_trouves.append(prog_id) prog_ids_trouves.append(prog_id)
except: except:
print(f" ⚠️ InprocServer32 non trouvé") print(f" InprocServer32 non trouvé")
except: except:
print(f" CLSID {clsid} non trouvé dans registre") print(f" CLSID {clsid} non trouvé dans registre")
problemes.append(f"{prog_id}: CLSID cassé") problemes.append(f"{prog_id}: CLSID cassé")
except: except:
print(f" ⚠️ Pas de CLSID") print(f" Pas de CLSID")
winreg.CloseKey(key) winreg.CloseKey(key)
except: except:
print(f" {prog_id}") print(f" {prog_id}")
if not prog_ids_trouves: if not prog_ids_trouves:
print("\n ⚠️ Aucun ProgID valide - Enregistrement COM échoué") print("\n Aucun ProgID valide - Enregistrement COM échoué")
problemes.append("ProgID non enregistrés correctement") problemes.append("ProgID non enregistrés correctement")
solutions.append("Réenregistrer les DLL Crystal avec regsvr32") solutions.append("Réenregistrer les DLL Crystal avec regsvr32")
@ -196,7 +196,7 @@ def diagnostic_complet_crystal():
print(f" Crystal : {crystal_arch}") print(f" Crystal : {crystal_arch}")
if python_arch != crystal_arch: if python_arch != crystal_arch:
print(f"\n INCOMPATIBILITÉ ARCHITECTURE") print(f"\n INCOMPATIBILITÉ ARCHITECTURE")
print(f" Python {python_arch} ne peut pas utiliser Crystal {crystal_arch}") print(f" Python {python_arch} ne peut pas utiliser Crystal {crystal_arch}")
problemes.append(f"Incompatibilité: Python {python_arch} vs Crystal {crystal_arch}") problemes.append(f"Incompatibilité: Python {python_arch} vs Crystal {crystal_arch}")
solutions.append(f"Réinstaller Crystal en version {python_arch}") solutions.append(f"Réinstaller Crystal en version {python_arch}")
@ -224,14 +224,14 @@ def diagnostic_complet_crystal():
for service in services_crystal_attendus: for service in services_crystal_attendus:
if service.lower() in result.stdout.lower(): if service.lower() in result.stdout.lower():
services_trouves.append(service) services_trouves.append(service)
print(f" Service trouvé: {service}") print(f" Service trouvé: {service}")
if not services_trouves: if not services_trouves:
print(f" ⚠️ Aucun service Crystal trouvé") print(f" Aucun service Crystal trouvé")
print(f" (Normal pour Runtime léger)") print(f" (Normal pour Runtime léger)")
except Exception as e: except Exception as e:
print(f" ⚠️ Impossible de vérifier services: {e}") print(f" Impossible de vérifier services: {e}")
# ========================================== # ==========================================
# 6. TEST INSTANCIATION COM DÉTAILLÉ # 6. TEST INSTANCIATION COM DÉTAILLÉ
@ -244,7 +244,7 @@ def diagnostic_complet_crystal():
print(f"\n Test: {prog_id}") print(f"\n Test: {prog_id}")
try: try:
obj = win32com.client.Dispatch(prog_id) obj = win32com.client.Dispatch(prog_id)
print(f" Instanciation RÉUSSIE") print(f" Instanciation RÉUSSIE")
# Lister méthodes disponibles # Lister méthodes disponibles
print(f" Méthodes disponibles:") print(f" Méthodes disponibles:")
@ -260,7 +260,7 @@ def diagnostic_complet_crystal():
} }
except Exception as e: except Exception as e:
print(f" Échec: {e}") print(f" Échec: {e}")
print(f" Type erreur: {type(e).__name__}") print(f" Type erreur: {type(e).__name__}")
print(f" Code: {e.args if hasattr(e, 'args') else 'N/A'}") print(f" Code: {e.args if hasattr(e, 'args') else 'N/A'}")
@ -277,7 +277,7 @@ def diagnostic_complet_crystal():
print(f"🔧 Architecture: Python {python_arch}, Crystal {crystal_arch or 'INCONNUE'}") print(f"🔧 Architecture: Python {python_arch}, Crystal {crystal_arch or 'INCONNUE'}")
if problemes: if problemes:
print(f"\n PROBLÈMES DÉTECTÉS ({len(problemes)}):") print(f"\n PROBLÈMES DÉTECTÉS ({len(problemes)}):")
for i, pb in enumerate(problemes, 1): for i, pb in enumerate(problemes, 1):
print(f" {i}. {pb}") print(f" {i}. {pb}")

View file

@ -0,0 +1,244 @@
import logging
logger = logging.getLogger(__name__)
def _extraire_article(article_obj):
try:
data = {
"reference": getattr(article_obj, "AR_Ref", "").strip(),
"designation": getattr(article_obj, "AR_Design", "").strip(),
}
data["code_ean"] = ""
data["code_barre"] = ""
try:
code_barre = getattr(article_obj, "AR_CodeBarre", "").strip()
if code_barre:
data["code_ean"] = code_barre
data["code_barre"] = code_barre
if not data["code_ean"]:
code_barre1 = getattr(article_obj, "AR_CodeBarre1", "").strip()
if code_barre1:
data["code_ean"] = code_barre1
data["code_barre"] = code_barre1
except:
pass
try:
data["prix_vente"] = float(getattr(article_obj, "AR_PrixVen", 0.0))
except:
data["prix_vente"] = 0.0
try:
data["prix_achat"] = float(getattr(article_obj, "AR_PrixAch", 0.0))
except:
data["prix_achat"] = 0.0
try:
data["prix_revient"] = float(
getattr(article_obj, "AR_PrixRevient", 0.0)
)
except:
data["prix_revient"] = 0.0
try:
data["stock_reel"] = float(getattr(article_obj, "AR_Stock", 0.0))
except:
data["stock_reel"] = 0.0
try:
data["stock_mini"] = float(getattr(article_obj, "AR_StockMini", 0.0))
except:
data["stock_mini"] = 0.0
try:
data["stock_maxi"] = float(getattr(article_obj, "AR_StockMaxi", 0.0))
except:
data["stock_maxi"] = 0.0
try:
data["stock_reserve"] = float(getattr(article_obj, "AR_QteCom", 0.0))
except:
data["stock_reserve"] = 0.0
try:
data["stock_commande"] = float(
getattr(article_obj, "AR_QteComFou", 0.0)
)
except:
data["stock_commande"] = 0.0
try:
data["stock_disponible"] = data["stock_reel"] - data["stock_reserve"]
except:
data["stock_disponible"] = data["stock_reel"]
try:
commentaire = getattr(article_obj, "AR_Commentaire", "").strip()
data["description"] = commentaire
except:
data["description"] = ""
try:
design2 = getattr(article_obj, "AR_Design2", "").strip()
data["designation_complementaire"] = design2
except:
data["designation_complementaire"] = ""
try:
type_art = getattr(article_obj, "AR_Type", 0)
data["type_article"] = type_art
data["type_article_libelle"] = {
0: "Article",
1: "Prestation",
2: "Divers",
}.get(type_art, "Inconnu")
except:
data["type_article"] = 0
data["type_article_libelle"] = "Article"
try:
famille_code = getattr(article_obj, "FA_CodeFamille", "").strip()
data["famille_code"] = famille_code
if famille_code:
try:
famille_obj = getattr(article_obj, "Famille", None)
if famille_obj:
famille_obj.Read()
data["famille_libelle"] = getattr(
famille_obj, "FA_Intitule", ""
).strip()
else:
data["famille_libelle"] = ""
except:
data["famille_libelle"] = ""
else:
data["famille_libelle"] = ""
except:
data["famille_code"] = ""
data["famille_libelle"] = ""
try:
fournisseur_code = getattr(article_obj, "CT_Num", "").strip()
data["fournisseur_principal"] = fournisseur_code
if fournisseur_code:
try:
fourn_obj = getattr(article_obj, "Fournisseur", None)
if fourn_obj:
fourn_obj.Read()
data["fournisseur_nom"] = getattr(
fourn_obj, "CT_Intitule", ""
).strip()
else:
data["fournisseur_nom"] = ""
except:
data["fournisseur_nom"] = ""
else:
data["fournisseur_nom"] = ""
except:
data["fournisseur_principal"] = ""
data["fournisseur_nom"] = ""
try:
data["unite_vente"] = getattr(article_obj, "AR_UniteVen", "").strip()
except:
data["unite_vente"] = ""
try:
data["unite_achat"] = getattr(article_obj, "AR_UniteAch", "").strip()
except:
data["unite_achat"] = ""
try:
data["poids"] = float(getattr(article_obj, "AR_Poids", 0.0))
except:
data["poids"] = 0.0
try:
data["volume"] = float(getattr(article_obj, "AR_Volume", 0.0))
except:
data["volume"] = 0.0
try:
sommeil = getattr(article_obj, "AR_Sommeil", 0)
data["est_actif"] = sommeil == 0
data["en_sommeil"] = sommeil == 1
except:
data["est_actif"] = True
data["en_sommeil"] = False
try:
tva_code = getattr(article_obj, "TA_Code", "").strip()
data["tva_code"] = tva_code
try:
tva_obj = getattr(article_obj, "Taxe1", None)
if tva_obj:
tva_obj.Read()
data["tva_taux"] = float(getattr(tva_obj, "TA_Taux", 20.0))
else:
data["tva_taux"] = 20.0
except:
data["tva_taux"] = 20.0
except:
data["tva_code"] = ""
data["tva_taux"] = 20.0
try:
date_creation = getattr(article_obj, "AR_DateCreate", None)
data["date_creation"] = str(date_creation) if date_creation else ""
except:
data["date_creation"] = ""
try:
date_modif = getattr(article_obj, "AR_DateModif", None)
data["date_modification"] = str(date_modif) if date_modif else ""
except:
data["date_modification"] = ""
return data
except Exception as e:
logger.error(f" Erreur extraction article: {e}", exc_info=True)
return {
"reference": getattr(article_obj, "AR_Ref", "").strip(),
"designation": getattr(article_obj, "AR_Design", "").strip(),
"prix_vente": 0.0,
"stock_reel": 0.0,
"code_ean": "",
"description": "",
"designation_complementaire": "",
"prix_achat": 0.0,
"prix_revient": 0.0,
"stock_mini": 0.0,
"stock_maxi": 0.0,
"stock_reserve": 0.0,
"stock_commande": 0.0,
"stock_disponible": 0.0,
"code_barre": "",
"type_article": 0,
"type_article_libelle": "Article",
"famille_code": "",
"famille_libelle": "",
"fournisseur_principal": "",
"fournisseur_nom": "",
"unite_vente": "",
"unite_achat": "",
"poids": 0.0,
"volume": 0.0,
"est_actif": True,
"en_sommeil": False,
"tva_code": "",
"tva_taux": 20.0,
"date_creation": "",
"date_modification": "",
}
__all__ = [
"_extraire_article",
]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,41 @@
import logging
logger = logging.getLogger(__name__)
def verifier_stock_suffisant(article_ref, quantite, cursor, depot=None):
"""Version thread-safe avec lock SQL"""
try:
cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
cursor.execute("BEGIN TRANSACTION")
try:
cursor.execute(
"""
SELECT SUM(AS_QteSto)
FROM F_ARTSTOCK WITH (UPDLOCK, ROWLOCK)
WHERE AR_Ref = ?
""",
(article_ref.upper(),),
)
row = cursor.fetchone()
stock_dispo = float(row[0]) if row and row[0] else 0.0
suffisant = stock_dispo >= quantite
cursor.execute("COMMIT")
return {
"suffisant": suffisant,
"stock_disponible": stock_dispo,
"quantite_demandee": quantite,
}
except:
cursor.execute("ROLLBACK")
raise
except Exception as e:
logger.error(f"Erreur vérification stock: {e}")
raise

View file

@ -0,0 +1,89 @@
import win32com.client
from typing import Optional
import logging
logger = logging.getLogger(__name__)
def _afficher_etat_document(doc, titre: str):
"""Affiche l'état complet d'un document."""
logger.info("-" * 80)
logger.info(titre)
logger.info("-" * 80)
try:
logger.info(f" DO_Piece: {getattr(doc, 'DO_Piece', 'N/A')}")
logger.info(f" DO_Ref: '{getattr(doc, 'DO_Ref', 'N/A')}'")
logger.info(f" DO_Statut: {getattr(doc, 'DO_Statut', 'N/A')}")
date_doc = getattr(doc, 'DO_Date', None)
date_str = date_doc.strftime('%Y-%m-%d') if date_doc else 'None'
logger.info(f" DO_Date: {date_str}")
date_livr = getattr(doc, 'DO_DateLivr', None)
date_livr_str = date_livr.strftime('%Y-%m-%d') if date_livr else 'None'
logger.info(f" DO_DateLivr: {date_livr_str}")
logger.info(f" DO_TotalHT: {getattr(doc, 'DO_TotalHT', 0)}")
logger.info(f" DO_TotalTTC: {getattr(doc, 'DO_TotalTTC', 0)}")
except Exception as e:
logger.error(f" Erreur affichage état: {e}")
logger.info("-" * 80)
def _compter_lignes_document(doc) -> int:
"""Compte les lignes d'un document."""
try:
try:
factory_lignes = doc.FactoryDocumentLigne
except:
factory_lignes = doc.FactoryDocumentVenteLigne
count = 0
index = 1
while index <= 100:
try:
ligne_p = factory_lignes.List(index)
if ligne_p is None:
break
count += 1
index += 1
except:
break
return count
except Exception as e:
logger.warning(f" Erreur comptage lignes: {e}")
return 0
def _rechercher_devis_par_numero(numero: str, factory):
"""Recherche un devis par numéro dans la liste."""
logger.info(f" Recherche de {numero} dans la liste...")
index = 1
while index < 10000:
try:
persist_test = factory.List(index)
if persist_test is None:
break
doc_test = win32com.client.CastTo(persist_test, "IBODocumentVente3")
doc_test.Read()
if (
getattr(doc_test, "DO_Type", -1) == 0
and getattr(doc_test, "DO_Piece", "") == numero
):
logger.info(f" Trouvé à l'index {index}")
return persist_test
index += 1
except:
index += 1
logger.error(f" Devis {numero} non trouvé dans la liste")
return None
__all__ = [
"_afficher_etat_document",
"_compter_lignes_document",
"_rechercher_devis_par_numero"
]

View file

@ -0,0 +1,49 @@
from typing import Dict, List, Optional, Any
def _extraire_infos_devis(doc, numero: str, champs_modifies: list) -> Dict:
"""Extrait les informations complètes du devis."""
total_ht = float(getattr(doc, "DO_TotalHT", 0.0))
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
statut = getattr(doc, "DO_Statut", 0)
reference = getattr(doc, "DO_Ref", "")
date_devis = None
try:
date_doc = getattr(doc, "DO_Date", None)
if date_doc:
date_devis = date_doc.strftime("%Y-%m-%d")
except:
pass
date_livraison = None
try:
date_livr = getattr(doc, "DO_DateLivr", None)
if date_livr:
date_livraison = date_livr.strftime("%Y-%m-%d")
except:
pass
client_code = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "")
except:
pass
return {
"numero": numero,
"total_ht": total_ht,
"total_ttc": total_ttc,
"reference": reference,
"date_devis": date_devis,
"date_livraison": date_livraison,
"champs_modifies": champs_modifies,
"statut": statut,
"client_code": client_code,
}
__all__ = [
"_extraire_infos_devis",
]

View file

View file

@ -0,0 +1,118 @@
from typing import Optional
import logging
logger = logging.getLogger(__name__)
def _clean_str(value, max_len: int) -> str:
"""Nettoie et tronque une chaîne"""
if value is None or str(value).lower() in ('none', 'null', ''):
return ""
return str(value)[:max_len].strip()
def _safe_strip(value) -> Optional[str]:
"""Nettoie une valeur string en toute sécurité"""
if value is None:
return None
if isinstance(value, str):
stripped = value.strip()
return stripped if stripped else None
return str(value).strip() or None
def _safe_int(value, default=None):
"""Conversion sécurisée en entier"""
if value is None:
return default
try:
return int(value)
except (ValueError, TypeError):
return default
def _try_set_attribute(obj, attr_name, value, variants=None):
"""Essaie de définir un attribut avec plusieurs variantes"""
if variants is None:
variants = [attr_name]
else:
variants = [attr_name] + variants
for variant in variants:
try:
if hasattr(obj, variant):
setattr(obj, variant, value)
return True
except Exception as e:
logger.debug(f" {variant} échec: {str(e)[:50]}")
return False
def _get_type_libelle(type_doc: int) -> str:
types_officiels = {
0: "Devis",
10: "Bon de commande",
20: "Préparation",
30: "Bon de livraison",
40: "Bon de retour",
50: "Bon d'avoir",
60: "Facture",
}
types_alternatifs = {
1: "Bon de commande",
2: "Préparation",
3: "Bon de livraison",
4: "Bon de retour",
5: "Bon d'avoir",
6: "Facture",
}
if type_doc in types_officiels:
return types_officiels[type_doc]
if type_doc in types_alternatifs:
return types_alternatifs[type_doc]
return f"Type {type_doc}"
def _convertir_type_pour_sql(self, type_doc: int) -> int:
"""COM → SQL : 0, 10, 20, 30... → 0, 1, 2, 3..."""
mapping = {0: 0, 10: 1, 20: 2, 30: 3, 40: 4, 50: 5, 60: 6}
return mapping.get(type_doc, type_doc)
def _convertir_type_depuis_sql(self, type_sql: int) -> int:
"""SQL → COM : 0, 1, 2, 3... → 0, 10, 20, 30..."""
mapping = {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60}
return mapping.get(type_sql, type_sql)
def _normaliser_type_document(type_doc: int) -> int:
logger.info(f"[INFO] TYPE RECU{type_doc}")
if type_doc in [0, 10, 20, 30, 40, 50, 60]:
return type_doc
mapping_normalisation = {
1: 10, # Commande
2: 20, # Préparation
3: 30, # BL
4: 40, # Retour
5: 50, # Avoir
6: 60, # Facture
}
return mapping_normalisation.get(type_doc, type_doc)
__all__ = [
"_clean_str",
"_safe_strip",
"_safe_int",
"_try_set_attribute",
"_get_type_libelle",
"_normaliser_type_document",
"_convertir_type_depuis_sql",
"_convertir_type_pour_sql"
]

View file

@ -0,0 +1,162 @@
from typing import Dict, List, Optional, Any
import logging
from utils.functions.functions import _safe_strip
logger = logging.getLogger(__name__)
def _contact_to_dict(contact, numero_client=None, contact_numero=None, n_contact=None) -> Dict:
try:
civilite_code = getattr(contact, "Civilite", None)
civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"}
civilite = civilite_map.get(civilite_code) if civilite_code is not None else None
telephone = None
portable = None
telecopie = None
email = None
if hasattr(contact, 'Telecom'):
try:
telecom = contact.Telecom
telephone = _safe_strip(getattr(telecom, "Telephone", None))
portable = _safe_strip(getattr(telecom, "Portable", None))
telecopie = _safe_strip(getattr(telecom, "Telecopie", None))
email = _safe_strip(getattr(telecom, "EMail", None))
except:
pass
return {
"numero": numero_client,
"contact_numero": contact_numero,
"n_contact": n_contact or contact_numero,
"civilite": civilite,
"nom": _safe_strip(getattr(contact, "Nom", None)),
"prenom": _safe_strip(getattr(contact, "Prenom", None)),
"fonction": _safe_strip(getattr(contact, "Fonction", None)),
"service_code": getattr(contact, "ServiceContact", None),
"telephone": telephone,
"portable": portable,
"telecopie": telecopie,
"email": email,
"facebook": _safe_strip(getattr(contact, "Facebook", None)),
"linkedin": _safe_strip(getattr(contact, "LinkedIn", None)),
"skype": _safe_strip(getattr(contact, "Skype", None)),
}
except Exception as e:
logger.warning(f"Erreur conversion contact: {e}")
return {}
def _row_to_contact_dict(row) -> Dict:
"""Convertit une ligne SQL en dictionnaire contact"""
civilite_code = row.CT_Civilite
civilite_map = {0: "M.", 1: "Mme", 2: "Mlle", 3: "Société"}
return {
"numero": _safe_strip(row.CT_Num),
"contact_numero": row.CT_No,
"n_contact": row.N_Contact,
"civilite": civilite_map.get(civilite_code) if civilite_code is not None else None,
"nom": _safe_strip(row.CT_Nom),
"prenom": _safe_strip(row.CT_Prenom),
"fonction": _safe_strip(row.CT_Fonction),
"service_code": row.N_Service,
"telephone": _safe_strip(row.CT_Telephone),
"portable": _safe_strip(row.CT_TelPortable),
"telecopie": _safe_strip(row.CT_Telecopie),
"email": _safe_strip(row.CT_EMail),
"facebook": _safe_strip(row.CT_Facebook),
"linkedin": _safe_strip(row.CT_LinkedIn),
"skype": _safe_strip(row.CT_Skype),
}
def _row_to_tiers_dict(row) -> Dict:
"""Convertit une ligne SQL en dictionnaire tiers (factorisation DRY)"""
return {
"numero": _safe_strip(row.CT_Num),
"intitule": _safe_strip(row.CT_Intitule),
"type_tiers": row.CT_Type,
"qualite": _safe_strip(row.CT_Qualite),
"classement": _safe_strip(row.CT_Classement),
"raccourci": _safe_strip(row.CT_Raccourci),
"siret": _safe_strip(row.CT_Siret),
"tva_intra": _safe_strip(row.CT_Identifiant),
"code_naf": _safe_strip(row.CT_Ape),
"contact": _safe_strip(row.CT_Contact),
"adresse": _safe_strip(row.CT_Adresse),
"complement": _safe_strip(row.CT_Complement),
"code_postal": _safe_strip(row.CT_CodePostal),
"ville": _safe_strip(row.CT_Ville),
"region": _safe_strip(row.CT_CodeRegion),
"pays": _safe_strip(row.CT_Pays),
"telephone": _safe_strip(row.CT_Telephone),
"telecopie": _safe_strip(row.CT_Telecopie),
"email": _safe_strip(row.CT_EMail),
"site_web": _safe_strip(row.CT_Site),
"facebook": _safe_strip(row.CT_Facebook),
"linkedin": _safe_strip(row.CT_LinkedIn),
"taux01": row.CT_Taux01,
"taux02": row.CT_Taux02,
"taux03": row.CT_Taux03,
"taux04": row.CT_Taux04,
"statistique01": _safe_strip(row.CT_Statistique01),
"statistique02": _safe_strip(row.CT_Statistique02),
"statistique03": _safe_strip(row.CT_Statistique03),
"statistique04": _safe_strip(row.CT_Statistique04),
"statistique05": _safe_strip(row.CT_Statistique05),
"statistique06": _safe_strip(row.CT_Statistique06),
"statistique07": _safe_strip(row.CT_Statistique07),
"statistique08": _safe_strip(row.CT_Statistique08),
"statistique09": _safe_strip(row.CT_Statistique09),
"statistique10": _safe_strip(row.CT_Statistique10),
"encours_autorise": row.CT_Encours,
"assurance_credit": row.CT_Assurance,
"langue": row.CT_Langue,
"commercial_code": row.CO_No,
"lettrage_auto": (row.CT_Lettrage == 1),
"est_actif": (row.CT_Sommeil == 0),
"type_facture": row.CT_Facture,
"est_prospect": (row.CT_Prospect == 1),
"bl_en_facture": row.CT_BLFact,
"saut_page": row.CT_Saut,
"validation_echeance": row.CT_ValidEch,
"controle_encours": row.CT_ControlEnc,
"exclure_relance": (row.CT_NotRappel == 1),
"exclure_penalites": (row.CT_NotPenal == 1),
"bon_a_payer": row.CT_BonAPayer,
"priorite_livraison": row.CT_PrioriteLivr,
"livraison_partielle": row.CT_LivrPartielle,
"delai_transport": row.CT_DelaiTransport,
"delai_appro": row.CT_DelaiAppro,
"commentaire": _safe_strip(row.CT_Commentaire),
"section_analytique": _safe_strip(row.CA_Num),
"mode_reglement_code": row.MR_No,
"surveillance_active": (row.CT_Surveillance == 1),
"coface": _safe_strip(row.CT_Coface),
"forme_juridique": _safe_strip(row.CT_SvFormeJuri),
"effectif": _safe_strip(row.CT_SvEffectif),
"sv_regularite": _safe_strip(row.CT_SvRegul),
"sv_cotation": _safe_strip(row.CT_SvCotation),
"sv_objet_maj": _safe_strip(row.CT_SvObjetMaj),
"sv_chiffre_affaires": row.CT_SvCA,
"sv_resultat": row.CT_SvResultat,
"compte_general": _safe_strip(row.CG_NumPrinc),
"categorie_tarif": row.N_CatTarif,
"categorie_compta": row.N_CatCompta,
}
__all__ = [
"_contact_to_dict",
"_row_to_contact_dict",
"_row_to_tiers_dict",
]

View file

@ -0,0 +1,234 @@
import logging
from utils.functions.functions import (
_convertir_type_depuis_sql,
_convertir_type_pour_sql,
_normaliser_type_document,
_get_type_libelle
)
logger = logging.getLogger(__name__)
def _verifier_devis_non_transforme(numero: str, doc, cursor):
"""Vérifie que le devis n'est pas transformé."""
verification = verifier_si_deja_transforme_sql(numero, cursor, 0)
if verification["deja_transforme"]:
docs_cibles = verification["documents_cibles"]
nums = [d["numero"] for d in docs_cibles]
raise ValueError(
f" Devis {numero} déjà transformé en {len(docs_cibles)} document(s): {', '.join(nums)}"
)
statut_actuel = getattr(doc, "DO_Statut", 0)
if statut_actuel == 5:
raise ValueError(f" Devis {numero} déjà transformé (statut=5)")
def verifier_si_deja_transforme_sql(numero_source, cursor, type_source):
"""Version corrigée avec normalisation des types"""
logger.info(
f"[VERIF] Vérification transformations de {numero_source} (type {type_source})"
)
logger.info(
f"[VERIF] Vérification transformations de {numero_source} (type {type_source})"
)
logger.info(f"[DEBUG] Type source brut: {type_source}")
logger.info(
f"[DEBUG] Type source après normalisation: {_normaliser_type_document(type_source)}"
)
logger.info(
f"[DEBUG] Type source après normalisation SQL: {_convertir_type_pour_sql(type_source)}"
)
type_source = _convertir_type_pour_sql(type_source)
champ_liaison_mapping = {
0: "DL_PieceDE",
1: "DL_PieceBC",
3: "DL_PieceBL",
}
champ_liaison = champ_liaison_mapping.get(type_source)
if not champ_liaison:
logger.warning(f"[VERIF] Type source {type_source} non géré")
return {"deja_transforme": False, "documents_cibles": []}
try:
query = f"""
SELECT DISTINCT
dc.DO_Piece,
dc.DO_Type,
dc.DO_Statut,
(SELECT COUNT(*) FROM F_DOCLIGNE
WHERE DO_Piece = dc.DO_Piece AND DO_Type = dc.DO_Type) as NbLignes
FROM F_DOCENTETE dc
INNER JOIN F_DOCLIGNE dl ON dc.DO_Piece = dl.DO_Piece AND dc.DO_Type = dl.DO_Type
WHERE dl.{champ_liaison} = ?
ORDER BY dc.DO_Type, dc.DO_Piece
"""
cursor.execute(query, (numero_source,))
resultats = cursor.fetchall()
documents_cibles = []
for row in resultats:
type_brut = int(row.DO_Type)
type_normalise = _convertir_type_depuis_sql(type_brut)
doc = {
"numero": row.DO_Piece.strip() if row.DO_Piece else "",
"type": type_normalise, # ← TYPE NORMALISÉ
"type_brut": type_brut, # Garder aussi le type original
"type_libelle": _get_type_libelle(type_brut),
"statut": int(row.DO_Statut) if row.DO_Statut else 0,
"nb_lignes": int(row.NbLignes) if row.NbLignes else 0,
}
documents_cibles.append(doc)
logger.info(
f"[VERIF] Trouvé: {doc['numero']} "
f"(type {type_brut}{type_normalise} - {doc['type_libelle']}) "
f"- {doc['nb_lignes']} lignes"
)
deja_transforme = len(documents_cibles) > 0
if deja_transforme:
logger.info(
f"[VERIF] Document {numero_source} a {len(documents_cibles)} transformation(s)"
)
else:
logger.info(
f"[VERIF] Document {numero_source} pas encore transformé"
)
return {
"deja_transforme": deja_transforme,
"documents_cibles": documents_cibles,
}
except Exception as e:
logger.error(f"[VERIF] Erreur vérification: {e}")
return {"deja_transforme": False, "documents_cibles": []}
def peut_etre_transforme(numero_source, type_source, type_cible):
"""Version corrigée avec normalisation"""
type_source = _normaliser_type_document(type_source)
type_cible = _normaliser_type_document(type_cible)
logger.info(
f"[VERIF_TRANSFO] {numero_source} "
f"(type {type_source}) → type {type_cible}"
)
verif = verifier_si_deja_transforme_sql(numero_source, type_source)
docs_meme_type = [
d for d in verif["documents_cibles"] if d["type"] == type_cible
]
if docs_meme_type:
nums = [d["numero"] for d in docs_meme_type]
return {
"possible": False,
"raison": f"Document déjà transformé en {_get_type_libelle(type_cible)}",
"documents_existants": docs_meme_type,
"message_detaille": f"Document(s) existant(s): {', '.join(nums)}",
}
return {
"possible": True,
"raison": "Transformation possible",
"documents_existants": [],
}
def lire_erreurs_sage(obj, nom_obj=""):
erreurs = []
try:
if not hasattr(obj, "Errors") or obj.Errors is None:
return erreurs
nb_erreurs = 0
try:
nb_erreurs = obj.Errors.Count
except:
return erreurs
if nb_erreurs == 0:
return erreurs
for i in range(1, nb_erreurs + 1):
try:
err = None
try:
err = obj.Errors.Item(i)
except:
try:
err = obj.Errors(i)
except:
try:
err = obj.Errors.Item(i - 1)
except:
pass
if err is not None:
description = ""
field = ""
number = ""
for attr in ["Description", "Descr", "Message", "Text"]:
try:
val = getattr(err, attr, None)
if val:
description = str(val)
break
except:
pass
for attr in ["Field", "FieldName", "Champ", "Property"]:
try:
val = getattr(err, attr, None)
if val:
field = str(val)
break
except:
pass
for attr in ["Number", "Code", "ErrorCode", "Numero"]:
try:
val = getattr(err, attr, None)
if val is not None:
number = str(val)
break
except:
pass
if description or field or number:
erreurs.append(
{
"source": nom_obj,
"index": i,
"description": description or "Erreur inconnue",
"field": field or "?",
"number": number or "?",
}
)
except Exception as e:
logger.debug(f"Erreur lecture erreur {i}: {e}")
continue
except Exception as e:
logger.debug(f"Erreur globale lecture erreurs {nom_obj}: {e}")
return erreurs
__all__ = [
"_verifier_devis_non_transforme",
"verifier_si_deja_transforme_sql",
"peut_etre_transforme",
"lire_erreurs_sage"
]

5
utils/tiers/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from utils.tiers.tiers import (TiersListRequest,)
__all__ = [
"TiersListRequest",
]

View file

@ -0,0 +1,332 @@
import win32com.client
import logging
logger = logging.getLogger(__name__)
def _cast_client(persist_obj):
try:
obj = win32com.client.CastTo(persist_obj, "IBOClient3")
obj.Read()
return obj
except Exception as e:
logger.debug(f" _cast_client échoue: {e}")
return None
def _extraire_client(client_obj):
try:
try:
numero = getattr(client_obj, "CT_Num", "").strip()
if not numero:
logger.debug("Objet sans CT_Num, skip")
return None
except Exception as e:
logger.debug(f" Erreur lecture CT_Num: {e}")
return None
try:
intitule = getattr(client_obj, "CT_Intitule", "").strip()
if not intitule:
logger.debug(f"{numero} sans CT_Intitule")
except Exception as e:
logger.debug(f"Erreur CT_Intitule sur {numero}: {e}")
intitule = ""
data = {
"numero": numero,
"intitule": intitule,
}
try:
qualite_code = getattr(client_obj, "CT_Type", None)
qualite_map = {
0: "CLI", # Client
1: "FOU", # Fournisseur
2: "CLIFOU", # Client + Fournisseur
3: "SAL", # Salarié
4: "PRO", # Prospect
}
data["qualite"] = qualite_map.get(qualite_code, "CLI")
data["est_fournisseur"] = qualite_code in [1, 2]
except:
data["qualite"] = "CLI"
data["est_fournisseur"] = False
try:
data["est_prospect"] = getattr(client_obj, "CT_Prospect", 0) == 1
except:
data["est_prospect"] = False
if data["est_prospect"]:
data["type_tiers"] = "prospect"
elif data["est_fournisseur"] and data["qualite"] != "CLIFOU":
data["type_tiers"] = "fournisseur"
elif data["qualite"] == "CLIFOU":
data["type_tiers"] = "client_fournisseur"
else:
data["type_tiers"] = "client"
try:
sommeil = getattr(client_obj, "CT_Sommeil", 0)
data["est_actif"] = sommeil == 0
data["est_en_sommeil"] = sommeil == 1
except:
data["est_actif"] = True
data["est_en_sommeil"] = False
try:
forme_juridique = getattr(client_obj, "CT_FormeJuridique", "").strip()
data["forme_juridique"] = forme_juridique
data["est_entreprise"] = bool(forme_juridique)
data["est_particulier"] = not bool(forme_juridique)
except:
data["forme_juridique"] = ""
data["est_entreprise"] = False
data["est_particulier"] = True
try:
data["civilite"] = getattr(client_obj, "CT_Civilite", "").strip()
except:
data["civilite"] = ""
try:
data["nom"] = getattr(client_obj, "CT_Nom", "").strip()
except:
data["nom"] = ""
try:
data["prenom"] = getattr(client_obj, "CT_Prenom", "").strip()
except:
data["prenom"] = ""
if data.get("nom") or data.get("prenom"):
parts = []
if data.get("civilite"):
parts.append(data["civilite"])
if data.get("prenom"):
parts.append(data["prenom"])
if data.get("nom"):
parts.append(data["nom"])
data["nom_complet"] = " ".join(parts)
else:
data["nom_complet"] = ""
try:
data["contact"] = getattr(client_obj, "CT_Contact", "").strip()
except:
data["contact"] = ""
try:
adresse_obj = getattr(client_obj, "Adresse", None)
if adresse_obj:
try:
data["adresse"] = getattr(adresse_obj, "Adresse", "").strip()
except:
data["adresse"] = ""
try:
data["complement"] = getattr(
adresse_obj, "Complement", ""
).strip()
except:
data["complement"] = ""
try:
data["code_postal"] = getattr(
adresse_obj, "CodePostal", ""
).strip()
except:
data["code_postal"] = ""
try:
data["ville"] = getattr(adresse_obj, "Ville", "").strip()
except:
data["ville"] = ""
try:
data["region"] = getattr(adresse_obj, "Region", "").strip()
except:
data["region"] = ""
try:
data["pays"] = getattr(adresse_obj, "Pays", "").strip()
except:
data["pays"] = ""
else:
data["adresse"] = ""
data["complement"] = ""
data["code_postal"] = ""
data["ville"] = ""
data["region"] = ""
data["pays"] = ""
except Exception as e:
logger.debug(f"Erreur adresse sur {numero}: {e}")
data["adresse"] = ""
data["complement"] = ""
data["code_postal"] = ""
data["ville"] = ""
data["region"] = ""
data["pays"] = ""
try:
telecom = getattr(client_obj, "Telecom", None)
if telecom:
try:
data["telephone"] = getattr(telecom, "Telephone", "").strip()
except:
data["telephone"] = ""
try:
data["portable"] = getattr(telecom, "Portable", "").strip()
except:
data["portable"] = ""
try:
data["telecopie"] = getattr(telecom, "Telecopie", "").strip()
except:
data["telecopie"] = ""
try:
data["email"] = getattr(telecom, "EMail", "").strip()
except:
data["email"] = ""
try:
site = (
getattr(telecom, "Site", None)
or getattr(telecom, "Web", None)
or getattr(telecom, "SiteWeb", "")
)
data["site_web"] = str(site).strip() if site else ""
except:
data["site_web"] = ""
else:
data["telephone"] = ""
data["portable"] = ""
data["telecopie"] = ""
data["email"] = ""
data["site_web"] = ""
except Exception as e:
logger.debug(f"Erreur telecom sur {numero}: {e}")
data["telephone"] = ""
data["portable"] = ""
data["telecopie"] = ""
data["email"] = ""
data["site_web"] = ""
try:
data["siret"] = getattr(client_obj, "CT_Siret", "").strip()
except:
data["siret"] = ""
try:
data["siren"] = getattr(client_obj, "CT_Siren", "").strip()
except:
data["siren"] = ""
try:
data["tva_intra"] = getattr(client_obj, "CT_Identifiant", "").strip()
except:
data["tva_intra"] = ""
try:
data["code_naf"] = (
getattr(client_obj, "CT_CodeNAF", "").strip()
or getattr(client_obj, "CT_APE", "").strip()
)
except:
data["code_naf"] = ""
try:
data["secteur"] = getattr(client_obj, "CT_Secteur", "").strip()
except:
data["secteur"] = ""
try:
effectif = getattr(client_obj, "CT_Effectif", None)
data["effectif"] = int(effectif) if effectif is not None else None
except:
data["effectif"] = None
try:
ca = getattr(client_obj, "CT_ChiffreAffaire", None)
data["ca_annuel"] = float(ca) if ca is not None else None
except:
data["ca_annuel"] = None
try:
data["commercial_code"] = getattr(client_obj, "CO_No", "").strip()
except:
try:
data["commercial_code"] = getattr(
client_obj, "CT_Commercial", ""
).strip()
except:
data["commercial_code"] = ""
if data.get("commercial_code"):
try:
commercial_obj = getattr(client_obj, "Commercial", None)
if commercial_obj:
commercial_obj.Read()
data["commercial_nom"] = getattr(
commercial_obj, "CO_Nom", ""
).strip()
else:
data["commercial_nom"] = ""
except:
data["commercial_nom"] = ""
else:
data["commercial_nom"] = ""
try:
data["categorie_tarifaire"] = getattr(client_obj, "N_CatTarif", None)
except:
data["categorie_tarifaire"] = None
try:
data["categorie_comptable"] = getattr(client_obj, "N_CatCompta", None)
except:
data["categorie_comptable"] = None
try:
data["encours_autorise"] = float(getattr(client_obj, "CT_Encours", 0.0))
except:
data["encours_autorise"] = 0.0
try:
data["assurance_credit"] = float(
getattr(client_obj, "CT_Assurance", 0.0)
)
except:
data["assurance_credit"] = 0.0
try:
data["compte_general"] = getattr(client_obj, "CG_Num", "").strip()
except:
data["compte_general"] = ""
try:
date_creation = getattr(client_obj, "CT_DateCreate", None)
data["date_creation"] = str(date_creation) if date_creation else ""
except:
data["date_creation"] = ""
try:
date_modif = getattr(client_obj, "CT_DateModif", None)
data["date_modification"] = str(date_modif) if date_modif else ""
except:
data["date_modification"] = ""
return data
except Exception as e:
logger.error(f" ERREUR GLOBALE _extraire_client: {e}", exc_info=True)
return None
__all__ = [
"_extraire_client",
"_cast_client"
]

View file

@ -0,0 +1,37 @@
from typing import Dict, List, Optional, Any
from utils.functions.items_to_dict import _row_to_contact_dict
import logging
logger = logging.getLogger(__name__)
def obtenir_contact(self, numero: str, contact_numero: int) -> Optional[Dict]:
"""
Récupère un contact spécifique par son CT_No
"""
try:
with self._get_sql_connection() as conn:
cursor = conn.cursor()
query = """
SELECT
CT_Num, CT_No, N_Contact,
CT_Civilite, CT_Nom, CT_Prenom, CT_Fonction,
N_Service,
CT_Telephone, CT_TelPortable, CT_Telecopie, CT_EMail,
CT_Facebook, CT_LinkedIn, CT_Skype
FROM F_CONTACTT
WHERE CT_Num = ? AND CT_No = ?
"""
cursor.execute(query, [numero, contact_numero])
row = cursor.fetchone()
if not row:
return None
return _row_to_contact_dict(row)
except Exception as e:
logger.error(f"Erreur obtention contact: {e}")
raise RuntimeError(f"Erreur lecture contact: {str(e)}")

13
utils/tiers/tiers.py Normal file
View file

@ -0,0 +1,13 @@
from pydantic import BaseModel, Field, validator, EmailStr, field_validator
from typing import Optional, List, Dict
class TiersListRequest(BaseModel):
"""Requête de listage des tiers"""
type_tiers: Optional[str] = Field(
None,
description="Type: client, fournisseur, prospect, all"
)
filtre: str = Field(
"",
description="Filtre sur code ou intitulé"
)

View file

@ -0,0 +1,55 @@
def _build_tiers_select_query() -> str:
"""Construit la requête SELECT pour les tiers (factorisation)"""
return """
SELECT
-- IDENTIFICATION (9)
CT_Num, CT_Intitule, CT_Type, CT_Qualite,
CT_Classement, CT_Raccourci, CT_Siret, CT_Identifiant,
CT_Ape,
-- ADRESSE (7)
CT_Contact, CT_Adresse, CT_Complement,
CT_CodePostal, CT_Ville, CT_CodeRegion, CT_Pays,
-- TELECOM (6)
CT_Telephone, CT_Telecopie, CT_EMail, CT_Site,
CT_Facebook, CT_LinkedIn,
-- TAUX (4)
CT_Taux01, CT_Taux02, CT_Taux03, CT_Taux04,
-- STATISTIQUES (10)
CT_Statistique01, CT_Statistique02, CT_Statistique03,
CT_Statistique04, CT_Statistique05, CT_Statistique06,
CT_Statistique07, CT_Statistique08, CT_Statistique09,
CT_Statistique10,
-- COMMERCIAL (4)
CT_Encours, CT_Assurance, CT_Langue, CO_No,
-- FACTURATION (11)
CT_Lettrage, CT_Sommeil, CT_Facture, CT_Prospect,
CT_BLFact, CT_Saut, CT_ValidEch, CT_ControlEnc,
CT_NotRappel, CT_NotPenal, CT_BonAPayer,
-- LOGISTIQUE (4)
CT_PrioriteLivr, CT_LivrPartielle,
CT_DelaiTransport, CT_DelaiAppro,
-- COMMENTAIRE (1)
CT_Commentaire,
-- ANALYTIQUE (1)
CA_Num,
-- ORGANISATION / SURVEILLANCE (10)
MR_No, CT_Surveillance, CT_Coface,
CT_SvFormeJuri, CT_SvEffectif, CT_SvRegul,
CT_SvCotation, CT_SvObjetMaj, CT_SvCA, CT_SvResultat,
-- COMPTE GENERAL ET CATEGORIES (3)
CG_NumPrinc, N_CatTarif, N_CatCompta
"""
__all__ = [
"_build_tiers_select_query"
]