diff --git a/main.py b/main.py index 705e81c..a5376c0 100644 --- a/main.py +++ b/main.py @@ -14,9 +14,6 @@ from sage_connector import SageConnector import pyodbc import os -# ===================================================== -# LOGGING -# ===================================================== logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", @@ -25,9 +22,6 @@ logging.basicConfig( logger = logging.getLogger(__name__) -# ===================================================== -# ENUMS -# ===================================================== class TypeDocument(int, Enum): DEVIS = 0 BON_LIVRAISON = 1 @@ -37,9 +31,6 @@ class TypeDocument(int, Enum): FACTURE = 5 -# ===================================================== -# MODÈLES -# ===================================================== class DocumentGetRequest(BaseModel): @@ -866,9 +857,6 @@ class FamilleCreate(BaseModel): ) -# ===================================================== -# SÉCURITÉ -# ===================================================== def verify_token(x_sage_token: str = Header(...)): """Vérification du token d'authentification""" if x_sage_token != settings.sage_gateway_token: @@ -877,9 +865,6 @@ def verify_token(x_sage_token: str = Header(...)): return True -# ===================================================== -# APPLICATION -# ===================================================== app = FastAPI( title="Sage Gateway - Windows Server", version="1.0.0", @@ -897,16 +882,12 @@ app.add_middleware( sage: Optional[SageConnector] = None -# ===================================================== -# LIFECYCLE -# ===================================================== @app.on_event("startup") def startup(): global sage logger.info("🚀 Démarrage Sage Gateway Windows...") - # Validation config try: validate_settings() logger.info(" Configuration validée") @@ -914,7 +895,6 @@ def startup(): logger.error(f" Configuration invalide: {e}") raise - # Connexion Sage sage = SageConnector( settings.chemin_base, settings.utilisateur, settings.mot_de_passe ) @@ -932,9 +912,6 @@ def shutdown(): logger.info("👋 Sage Gateway arrêté") -# ===================================================== -# ENDPOINTS - SYSTÈME -# ===================================================== @app.get("/health") def health(): """Health check""" @@ -946,9 +923,6 @@ def health(): } -# ===================================================== -# ENDPOINTS - CLIENTS -# ===================================================== @app.post("/sage/clients/list", dependencies=[Depends(verify_token)]) def clients_list(req: FiltreRequest): """Liste des clients avec filtre optionnel""" @@ -989,27 +963,20 @@ def client_get(req: CodeRequest): raise HTTPException(500, str(e)) -# DANS main.py @app.post("/sage/clients/create", dependencies=[Depends(verify_token)]) def create_client_endpoint(req: ClientCreateRequest): """Création d'un client dans Sage""" try: - # L'appel au connecteur est fait ici resultat = sage.creer_client(req.dict()) return {"success": True, "data": resultat} except ValueError as e: logger.warning(f"Erreur métier création client: {e}") - # Erreur métier (ex: doublon) -> 400 Bad Request raise HTTPException(400, str(e)) except Exception as e: logger.error(f"Erreur technique création client: {e}") - # Erreur technique (ex: COM) -> 500 Internal Server Error raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - ARTICLES -# ===================================================== @app.post("/sage/articles/list", dependencies=[Depends(verify_token)]) def articles_list(req: FiltreRequest): """Liste des articles avec filtre optionnel""" @@ -1036,14 +1003,10 @@ def article_get(req: CodeRequest): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - DEVIS -# ===================================================== @app.post("/sage/devis/create", dependencies=[Depends(verify_token)]) def creer_devis(req: DevisRequest): """Création d'un devis""" try: - # Transformer en format attendu par sage_connector devis_data = { "client": {"code": req.client_id, "intitule": ""}, "date_devis": req.date_devis or date.today(), @@ -1062,7 +1025,6 @@ def creer_devis(req: DevisRequest): @app.post("/sage/devis/get", dependencies=[Depends(verify_token)]) def lire_devis(req: CodeRequest): try: - # Lecture complète depuis Sage (avec lignes) devis = sage.lire_devis(req.code) if not devis: raise HTTPException(404, f"Devis {req.code} non trouvé") @@ -1081,14 +1043,11 @@ def devis_list( filtre: str = Query("", description="Filtre texte (numero, client)"), ): try: - # Récupération depuis le cache (instantané) devis_list = sage.lister_tous_devis_cache(filtre) - # Filtrer par statut si demandé if statut is not None: devis_list = [d for d in devis_list if d.get("statut") == statut] - # Limiter le nombre de résultats devis_list = devis_list[:limit] logger.info(f" {len(devis_list)} devis retournés depuis le cache") @@ -1135,9 +1094,6 @@ def changer_statut_devis_endpoint(numero: str, nouveau_statut: int): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - DOCUMENTS -# ===================================================== @app.post("/sage/documents/get", dependencies=[Depends(verify_token)]) def lire_document(req: DocumentGetRequest): """Lecture d'un document (commande, facture, etc.)""" @@ -1165,7 +1121,6 @@ def transformer_document( f"(type {type_source}) → type {type_cible}" ) - # Matrice des transformations valides pour VOTRE Sage transformations_valides = { (0, 10), # Devis → Commande (10, 30), # Commande → Bon de livraison @@ -1184,7 +1139,6 @@ def transformer_document( f"Transformations valides: {transformations_valides}", ) - # Appel au connecteur Sage resultat = sage.transformer_document(numero_source, type_source, type_cible) logger.info( @@ -1227,9 +1181,6 @@ def maj_derniere_relance(doc_id: str, type_doc: int): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - CONTACTS -# ===================================================== @app.post("/sage/contact/read", dependencies=[Depends(verify_token)]) def contact_read(req: CodeRequest): """Lecture du contact principal d'un client""" @@ -1314,9 +1265,6 @@ def lire_remise_max_client(code: str): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - ADMIN -# ===================================================== @app.post("/sage/cache/refresh", dependencies=[Depends(verify_token)]) def refresh_cache(): """Force le rafraîchissement du cache""" @@ -1342,9 +1290,6 @@ def cache_info_get(): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - PROSPECTS -# ===================================================== @app.post("/sage/prospects/list", dependencies=[Depends(verify_token)]) def prospects_list(req: FiltreRequest): try: @@ -1369,13 +1314,9 @@ def prospect_get(req: CodeRequest): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - FOURNISSEURS -# ===================================================== @app.post("/sage/fournisseurs/list", dependencies=[Depends(verify_token)]) def fournisseurs_list(req: FiltreRequest): try: - # Utiliser le cache au lieu de la lecture directe fournisseurs = sage.lister_tous_fournisseurs_cache(req.filtre) logger.info(f" {len(fournisseurs)} fournisseurs retournés depuis le cache") @@ -1390,7 +1331,6 @@ def fournisseurs_list(req: FiltreRequest): @app.post("/sage/fournisseurs/create", dependencies=[Depends(verify_token)]) def create_fournisseur_endpoint(req: FournisseurCreateRequest): try: - # Appel au connecteur Sage resultat = sage.creer_fournisseur(req.dict()) logger.info(f" Fournisseur créé: {resultat.get('numero')}") @@ -1398,12 +1338,10 @@ def create_fournisseur_endpoint(req: FournisseurCreateRequest): return {"success": True, "data": resultat} except ValueError as e: - # Erreur métier (ex: doublon) logger.warning(f"⚠️ Erreur métier création fournisseur: {e}") raise HTTPException(400, str(e)) except Exception as e: - # Erreur technique (ex: COM) logger.error(f" Erreur technique création fournisseur: {e}") raise HTTPException(500, str(e)) @@ -1440,9 +1378,6 @@ def fournisseur_get(req: CodeRequest): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - AVOIRS -# ===================================================== @app.post("/sage/avoirs/list", dependencies=[Depends(verify_token)]) def avoirs_list( limit: int = Query(100, description="Nombre max d'avoirs"), @@ -1450,14 +1385,11 @@ def avoirs_list( filtre: str = Query("", description="Filtre texte"), ): try: - # Récupération depuis le cache (instantané) avoirs = sage.lister_tous_avoirs_cache(filtre) - # Filtrer par statut si demandé if statut is not None: avoirs = [a for a in avoirs if a.get("statut") == statut] - # Limiter le nombre de résultats avoirs = avoirs[:limit] logger.info(f" {len(avoirs)} avoirs retournés depuis le cache") @@ -1472,14 +1404,12 @@ def avoirs_list( @app.post("/sage/avoirs/get", dependencies=[Depends(verify_token)]) def avoir_get(req: CodeRequest): try: - # Essayer le cache d'abord avoir = sage.lire_avoir_cache(req.code) if avoir: logger.info(f" Avoir {req.code} retourné depuis le cache") return {"success": True, "data": avoir, "source": "cache"} - # Pas dans le cache → Lecture directe depuis Sage logger.info(f"⚠️ Avoir {req.code} absent du cache, lecture depuis Sage...") avoir = sage.lire_avoir(req.code) @@ -1495,9 +1425,6 @@ def avoir_get(req: CodeRequest): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - LIVRAISONS -# ===================================================== @app.post("/sage/livraisons/list", dependencies=[Depends(verify_token)]) def livraisons_list( limit: int = Query(100, description="Nombre max de livraisons"), @@ -1505,14 +1432,11 @@ def livraisons_list( filtre: str = Query("", description="Filtre texte"), ): try: - # Récupération depuis le cache (instantané) livraisons = sage.lister_toutes_livraisons_cache(filtre) - # Filtrer par statut si demandé if statut is not None: livraisons = [l for l in livraisons if l.get("statut") == statut] - # Limiter le nombre de résultats livraisons = livraisons[:limit] logger.info(f" {len(livraisons)} livraisons retournées depuis le cache") @@ -1527,14 +1451,12 @@ def livraisons_list( @app.post("/sage/livraisons/get", dependencies=[Depends(verify_token)]) def livraison_get(req: CodeRequest): try: - # Essayer le cache d'abord livraison = sage.lire_livraison_cache(req.code) if livraison: logger.info(f" Livraison {req.code} retournée depuis le cache") return {"success": True, "data": livraison, "source": "cache"} - # Pas dans le cache → Lecture directe depuis Sage logger.info(f"⚠️ Livraison {req.code} absente du cache, lecture depuis Sage...") livraison = sage.lire_livraison(req.code) @@ -1564,15 +1486,11 @@ def modifier_devis_endpoint(req: DevisUpdateGatewayRequest): raise HTTPException(500, str(e)) -# ===================================================== -# ENDPOINTS - CRÉATION ET MODIFICATION COMMANDES -# ===================================================== @app.post("/sage/commandes/create", dependencies=[Depends(verify_token)]) def creer_commande_endpoint(req: CommandeCreateRequest): try: - # Transformer en format attendu par sage_connector commande_data = { "client": {"code": req.client_id, "intitule": ""}, "date_commande": req.date_commande or date.today(), @@ -1609,12 +1527,10 @@ def modifier_commande_endpoint(req: CommandeUpdateGatewayRequest): @app.post("/sage/livraisons/create", dependencies=[Depends(verify_token)]) def creer_livraison_endpoint(req: LivraisonCreateGatewayRequest): try: - # Vérifier que le client existe client = sage.lire_client(req.client_id) if not client: raise HTTPException(404, f"Client {req.client_id} introuvable") - # Préparer les données pour le connecteur livraison_data = { "client": {"code": req.client_id, "intitule": ""}, "date_livraison": req.date_livraison or date.today(), @@ -1651,12 +1567,10 @@ def modifier_livraison_endpoint(req: LivraisonUpdateGatewayRequest): @app.post("/sage/avoirs/create", dependencies=[Depends(verify_token)]) def creer_avoir_endpoint(req: AvoirCreateGatewayRequest): try: - # Vérifier que le client existe client = sage.lire_client(req.client_id) if not client: raise HTTPException(404, f"Client {req.client_id} introuvable") - # Préparer les données pour le connecteur avoir_data = { "client": {"code": req.client_id, "intitule": ""}, "date_avoir": req.date_avoir or date.today(), @@ -1696,12 +1610,10 @@ def modifier_avoir_endpoint(req: AvoirUpdateGatewayRequest): @app.post("/sage/factures/create", dependencies=[Depends(verify_token)]) def creer_facture_endpoint(req: FactureCreateGatewayRequest): try: - # Vérifier que le client existe client = sage.lire_client(req.client_id) if not client: raise HTTPException(404, f"Client {req.client_id} introuvable") - # Préparer les données pour le connecteur facture_data = { "client": {"code": req.client_id, "intitule": ""}, "date_facture": req.date_facture or date.today(), @@ -1788,9 +1700,6 @@ async def creer_famille(famille: FamilleCreate): raise HTTPException(status_code=500, detail=f"Erreur serveur : {str(e)}") -# ======================================== -# ROUTE GET : Lister toutes les familles -# ======================================== @app.get( @@ -1817,9 +1726,6 @@ async def lister_familles(filtre: str = ""): raise HTTPException(status_code=500, detail=f"Erreur serveur : {str(e)}") -# ======================================== -# ROUTE GET : Lire UNE famille par son code -# ======================================== @app.get( @@ -1844,9 +1750,6 @@ async def lire_famille(code: str): raise HTTPException(status_code=500, detail=f"Erreur serveur : {str(e)}") -# ======================================== -# ROUTE GET : Statistiques sur les familles -# ======================================== @app.get("/sage/familles/stats", response_model=dict) @@ -1854,13 +1757,11 @@ async def stats_familles(): try: familles = sage.lister_toutes_familles() - # Calculer les stats nb_total = len(familles) nb_detail = sum(1 for f in familles if f["type"] == 0) nb_total_type = sum(1 for f in familles if f["type"] == 1) nb_statistiques = sum(1 for f in familles if f["est_statistique"]) - # Top 10 familles par intitulé (alphabétique) top_familles = sorted(familles, key=lambda f: f["intitule"])[:10] return { @@ -1894,13 +1795,11 @@ def generer_pdf_document(req: PDFGenerationRequest): try: logger.info(f" Génération PDF: {req.doc_id} (type={req.type_doc})") - # Appel au connecteur Sage pdf_bytes = sage.generer_pdf_document(req.doc_id, req.type_doc) if not pdf_bytes: raise HTTPException(500, "PDF vide généré") - # Encoder en base64 pour le transport JSON import base64 pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8") @@ -1948,7 +1847,6 @@ def lister_depots(): depot = win32com.client.CastTo(persist, "IBODepot3") depot.Read() - # Lire les attributs identifiés code = "" numero = 0 intitule = "" @@ -1963,7 +1861,6 @@ def lister_depots(): try: numero = int(getattr(depot, "Compteur", 0)) except: - # Fallback : convertir DE_Code en int try: numero = int(code) except: @@ -1984,13 +1881,11 @@ def lister_depots(): except: pass - # Validation : un dépôt doit avoir au moins un code if not code: logger.warning(f" ⚠️ Dépôt à l'index {index} sans code") index += 1 continue - # Récupérer adresse (objet COM complexe) adresse_complete = "" try: adresse_obj = getattr(depot, "Adresse", None) @@ -2005,7 +1900,6 @@ def lister_depots(): except: pass - # Déterminer si principal (premier non exclu = principal) principal = False if not exclu and len(depots) == 0: principal = True @@ -2030,7 +1924,6 @@ def lister_depots(): index += 1 except Exception as e: - # ⚠️ CORRECTION : "Accès refusé" = fin de liste dans cette version Sage error_msg = str(e) if "Accès refusé" in error_msg or "-1073741819" in error_msg: logger.info( @@ -2081,7 +1974,6 @@ def creer_entree_stock(req: EntreeStockRequest): f"📦 [ENTREE STOCK] Création bon d'entrée : {len(req.lignes)} ligne(s)" ) - # Préparer les données pour le connecteur entree_data = { "date_mouvement": req.date_entree or date.today(), "reference": req.reference, @@ -2090,7 +1982,6 @@ def creer_entree_stock(req: EntreeStockRequest): "commentaire": req.commentaire, } - # Appel au connecteur resultat = sage.creer_entree_stock(entree_data) logger.info(f" [ENTREE STOCK] Créé : {resultat.get('numero')}") @@ -2113,7 +2004,6 @@ def creer_sortie_stock(req: SortieStockRequest): f"📤 [SORTIE STOCK] Création bon de sortie : {len(req.lignes)} ligne(s)" ) - # Préparer les données pour le connecteur sortie_data = { "date_mouvement": req.date_sortie or date.today(), "reference": req.reference, @@ -2122,7 +2012,6 @@ def creer_sortie_stock(req: SortieStockRequest): "commentaire": req.commentaire, } - # Appel au connecteur resultat = sage.creer_sortie_stock(sortie_data) logger.info(f" [SORTIE STOCK] Créé : {resultat.get('numero')}") @@ -2155,9 +2044,6 @@ def lire_mouvement_stock(numero: str): raise HTTPException(500, str(e)) -# ===================================================== -# LANCEMENT -# ===================================================== if __name__ == "__main__": uvicorn.run( "main:app", diff --git a/sage_connector.py b/sage_connector.py index f21b8a8..a8435f9 100644 --- a/sage_connector.py +++ b/sage_connector.py @@ -10334,84 +10334,345 @@ class SageConnector: raise RuntimeError(f"Erreur technique Sage : {error_message}") def lister_toutes_familles( - self, filtre: str = "", inclure_totaux: bool = False - ) -> List[Dict]: - try: - with self._get_sql_connection() as conn: - cursor = conn.cursor() + self, filtre: str = "", inclure_totaux: bool = True + ) -> List[Dict]: + try: + with self._get_sql_connection() as conn: + cursor = conn.cursor() - logger.info("[SQL] Détection des colonnes de F_FAMILLE...") + logger.info("[SQL] Détection des colonnes de F_FAMILLE...") - cursor.execute("SELECT TOP 1 * FROM F_FAMILLE") - colonnes_disponibles = [column[0] for column in cursor.description] + cursor.execute("SELECT TOP 1 * FROM F_FAMILLE") + colonnes_disponibles = [column[0] for column in cursor.description] - logger.info(f"[SQL] Colonnes trouvées : {len(colonnes_disponibles)}") + logger.info(f"[SQL] Colonnes trouvées : {len(colonnes_disponibles)}") - colonnes_souhaitees = [ - "FA_CodeFamille", - "FA_Intitule", - "FA_Type", - "FA_UniteVen", - "FA_Coef", - "FA_Central", - "FA_Nature", - "CG_NumAch", - "CG_NumVte", - "FA_Stat", - "FA_Raccourci", - ] + # Colonnes organisées par catégorie + colonnes_souhaitees = [ + # Identification + "FA_CodeFamille", + "FA_Intitule", + "FA_Type", + + # Vente et stock + "FA_UniteVen", + "FA_Coef", + "FA_SuiviStock", + "FA_Garantie", + "FA_UnitePoids", + "FA_Delai", + "FA_NbColis", + + # Comptabilité + "CG_NumAch", + "CG_NumVte", + "FA_CodeFiscal", + "FA_Escompte", + + # Organisation et classification + "FA_Central", + "FA_Nature", + "CL_No1", + "CL_No2", + "CL_No3", + "CL_No4", + + # Statistiques + "FA_Stat01", + "FA_Stat02", + "FA_Stat03", + "FA_Stat04", + "FA_Stat05", + "FA_HorsStat", + + # Paramètres commerciaux + "FA_Pays", + "FA_VteDebit", + "FA_NotImp", + "FA_Contremarque", + "FA_FactPoids", + "FA_FactForfait", + "FA_Publie", + + # Références et codes + "FA_RacineRef", + "FA_RacineCB", + "FA_Raccourci", + + # Gestion + "FA_SousTraitance", + "FA_Fictif", + "FA_Criticite" + ] - colonnes_a_lire = [ - col for col in colonnes_souhaitees if col in colonnes_disponibles - ] + colonnes_a_lire = [ + col for col in colonnes_souhaitees if col in colonnes_disponibles + ] - if not colonnes_a_lire: - colonnes_a_lire = colonnes_disponibles + if not colonnes_a_lire: + colonnes_a_lire = colonnes_disponibles - logger.info(f"[SQL] Colonnes sélectionnées : {len(colonnes_a_lire)}") + logger.info(f"[SQL] Colonnes sélectionnées : {len(colonnes_a_lire)}") - colonnes_str = ", ".join(colonnes_a_lire) + colonnes_str = ", ".join([f"f.{col}" for col in colonnes_a_lire]) - query = f""" - SELECT {colonnes_str} - FROM F_FAMILLE - WHERE 1=1 - """ + # Requête avec LEFT JOIN pour compter les articles par famille + query = f""" + SELECT {colonnes_str}, + ISNULL(COUNT(a.AR_Ref), 0) as nb_articles + FROM F_FAMILLE f + LEFT JOIN F_ARTICLE a ON f.FA_CodeFamille = a.FA_CodeFamille + WHERE 1=1 + """ - params = [] + params = [] - if "FA_Type" in colonnes_disponibles: - if not inclure_totaux: - query += " AND FA_Type = 0" # Seulement Détail + # Filtrage par FA_Type (si demandé) + if "FA_Type" in colonnes_disponibles and not inclure_totaux: + query += " AND f.FA_Type = 0" # Seulement Détail logger.info("[SQL] Filtre : FA_Type = 0 (Détail uniquement)") else: logger.info("[SQL] Filtre : TOUS les types (Détail + Total)") - if filtre: - conditions_filtre = [] + # Filtrage par texte + if filtre: + conditions_filtre = [] - if "FA_CodeFamille" in colonnes_a_lire: - conditions_filtre.append("FA_CodeFamille LIKE ?") - params.append(f"%{filtre}%") + if "FA_CodeFamille" in colonnes_a_lire: + conditions_filtre.append("f.FA_CodeFamille LIKE ?") + params.append(f"%{filtre}%") + if "FA_Intitule" in colonnes_a_lire: + conditions_filtre.append("f.FA_Intitule LIKE ?") + params.append(f"%{filtre}%") + + if conditions_filtre: + query += " AND (" + " OR ".join(conditions_filtre) + ")" + + # GROUP BY pour le COUNT + query += f" GROUP BY {colonnes_str}" + + # ORDER BY if "FA_Intitule" in colonnes_a_lire: - conditions_filtre.append("FA_Intitule LIKE ?") - params.append(f"%{filtre}%") + query += " ORDER BY f.FA_Intitule" + elif "FA_CodeFamille" in colonnes_a_lire: + query += " ORDER BY f.FA_CodeFamille" - if conditions_filtre: - query += " AND (" + " OR ".join(conditions_filtre) + ")" + cursor.execute(query, params) + rows = cursor.fetchall() - if "FA_Intitule" in colonnes_a_lire: - query += " ORDER BY FA_Intitule" - elif "FA_CodeFamille" in colonnes_a_lire: - query += " ORDER BY FA_CodeFamille" + familles = [] - cursor.execute(query, params) - rows = cursor.fetchall() + for row in rows: + famille = {} - familles = [] + # Récupération des colonnes (sauf la dernière qui est nb_articles) + for idx, colonne in enumerate(colonnes_a_lire): + valeur = row[idx] - for row in rows: + if isinstance(valeur, str): + valeur = valeur.strip() + + famille[colonne] = valeur + + # Récupération du nb_articles (dernière colonne) + famille["nb_articles"] = row[-1] + + # Champs de base (compatibilité) + if "FA_CodeFamille" in famille: + famille["code"] = famille["FA_CodeFamille"] + + if "FA_Intitule" in famille: + famille["intitule"] = famille["FA_Intitule"] + + if "FA_Type" in famille: + type_val = famille["FA_Type"] + famille["type"] = type_val + famille["type_libelle"] = "Total" if type_val == 1 else "Détail" + famille["est_total"] = type_val == 1 + else: + famille["type"] = 0 + famille["type_libelle"] = "Détail" + famille["est_total"] = False + + # Vente et unités + famille["unite_vente"] = str(famille.get("FA_UniteVen", "")) + famille["unite_poids"] = str(famille.get("FA_UnitePoids", "")) + famille["coef"] = ( + float(famille.get("FA_Coef", 0.0)) + if famille.get("FA_Coef") is not None + else 0.0 + ) + + # Stock et logistique + famille["suivi_stock"] = bool(famille.get("FA_SuiviStock", 0)) + famille["garantie"] = int(famille.get("FA_Garantie", 0)) + famille["delai"] = int(famille.get("FA_Delai", 0)) + famille["nb_colis"] = int(famille.get("FA_NbColis", 0)) + + # Comptabilité + famille["compte_achat"] = famille.get("CG_NumAch", "") + famille["compte_vente"] = famille.get("CG_NumVte", "") + famille["code_fiscal"] = famille.get("FA_CodeFiscal", "") + famille["escompte"] = bool(famille.get("FA_Escompte", 0)) + + # Organisation + famille["est_centrale"] = bool(famille.get("FA_Central", 0)) + famille["nature"] = famille.get("FA_Nature", 0) + famille["pays"] = famille.get("FA_Pays", "") + + # Classifications + famille["categorie_1"] = famille.get("CL_No1", 0) + famille["categorie_2"] = famille.get("CL_No2", 0) + famille["categorie_3"] = famille.get("CL_No3", 0) + famille["categorie_4"] = famille.get("CL_No4", 0) + + # Statistiques + famille["stat_01"] = famille.get("FA_Stat01", "") + famille["stat_02"] = famille.get("FA_Stat02", "") + famille["stat_03"] = famille.get("FA_Stat03", "") + famille["stat_04"] = famille.get("FA_Stat04", "") + famille["stat_05"] = famille.get("FA_Stat05", "") + famille["hors_statistique"] = bool(famille.get("FA_HorsStat", 0)) + + # Paramètres commerciaux + famille["vente_debit"] = bool(famille.get("FA_VteDebit", 0)) + famille["non_imprimable"] = bool(famille.get("FA_NotImp", 0)) + famille["contremarque"] = bool(famille.get("FA_Contremarque", 0)) + famille["fact_poids"] = bool(famille.get("FA_FactPoids", 0)) + famille["fact_forfait"] = bool(famille.get("FA_FactForfait", 0)) + famille["publie"] = bool(famille.get("FA_Publie", 0)) + + # Références + famille["racine_reference"] = famille.get("FA_RacineRef", "") + famille["racine_code_barre"] = famille.get("FA_RacineCB", "") + famille["raccourci"] = famille.get("FA_Raccourci", "") + + # Gestion + famille["sous_traitance"] = bool(famille.get("FA_SousTraitance", 0)) + famille["fictif"] = bool(famille.get("FA_Fictif", 0)) + famille["criticite"] = int(famille.get("FA_Criticite", 0)) + + familles.append(famille) + + type_msg = "DÉTAIL uniquement" if not inclure_totaux else "TOUS types" + logger.info(f"SQL: {len(familles)} familles chargées ({type_msg})") + + return familles + + except Exception as e: + logger.error(f"Erreur SQL familles: {e}", exc_info=True) + raise RuntimeError(f"Erreur lecture familles: {str(e)}") + + + def lire_famille(self, code: str) -> Dict: + """ + Lit une seule famille - même structure que lister_toutes_familles + + Args: + code: Code de la famille à lire + + Returns: + Dict avec la structure identique à lister_toutes_familles + """ + try: + with self._get_sql_connection() as conn: + cursor = conn.cursor() + + logger.info(f"[SQL] Lecture famille : {code}") + + # Détection des colonnes disponibles + cursor.execute("SELECT TOP 1 * FROM F_FAMILLE") + colonnes_disponibles = [column[0] for column in cursor.description] + + logger.info(f"[SQL] Colonnes trouvées : {len(colonnes_disponibles)}") + + # Colonnes organisées par catégorie (IDENTIQUE à lister_toutes_familles) + colonnes_souhaitees = [ + # Identification + "FA_CodeFamille", + "FA_Intitule", + "FA_Type", + + # Vente et stock + "FA_UniteVen", + "FA_Coef", + "FA_SuiviStock", + "FA_Garantie", + "FA_UnitePoids", + "FA_Delai", + "FA_NbColis", + + # Comptabilité + "CG_NumAch", + "CG_NumVte", + "FA_CodeFiscal", + "FA_Escompte", + + # Organisation et classification + "FA_Central", + "FA_Nature", + "CL_No1", + "CL_No2", + "CL_No3", + "CL_No4", + + # Statistiques + "FA_Stat01", + "FA_Stat02", + "FA_Stat03", + "FA_Stat04", + "FA_Stat05", + "FA_HorsStat", + + # Paramètres commerciaux + "FA_Pays", + "FA_VteDebit", + "FA_NotImp", + "FA_Contremarque", + "FA_FactPoids", + "FA_FactForfait", + "FA_Publie", + + # Références et codes + "FA_RacineRef", + "FA_RacineCB", + "FA_Raccourci", + + # Gestion + "FA_SousTraitance", + "FA_Fictif", + "FA_Criticite" + ] + + colonnes_a_lire = [ + col for col in colonnes_souhaitees if col in colonnes_disponibles + ] + + if not colonnes_a_lire: + colonnes_a_lire = colonnes_disponibles + + logger.info(f"[SQL] Colonnes sélectionnées : {len(colonnes_a_lire)}") + + colonnes_str = ", ".join([f"f.{col}" for col in colonnes_a_lire]) + + # Requête avec LEFT JOIN pour compter les articles (IDENTIQUE à lister_toutes_familles) + query = f""" + SELECT {colonnes_str}, + ISNULL(COUNT(a.AR_Ref), 0) as nb_articles + FROM F_FAMILLE f + LEFT JOIN F_ARTICLE a ON f.FA_CodeFamille = a.FA_CodeFamille + WHERE UPPER(f.FA_CodeFamille) = ? + GROUP BY {colonnes_str} + """ + + cursor.execute(query, (code.upper().strip(),)) + row = cursor.fetchone() + + if not row: + raise ValueError(f"Famille '{code}' introuvable dans Sage") + + # Construction du dictionnaire (IDENTIQUE à lister_toutes_familles) famille = {} for idx, colonne in enumerate(colonnes_a_lire): @@ -10422,6 +10683,10 @@ class SageConnector: famille[colonne] = valeur + # Récupération du nb_articles (dernière colonne) + famille["nb_articles"] = row[-1] + + # Champs de base (compatibilité) if "FA_CodeFamille" in famille: famille["code"] = famille["FA_CodeFamille"] @@ -10438,308 +10703,75 @@ class SageConnector: famille["type_libelle"] = "Détail" famille["est_total"] = False + # Vente et unités famille["unite_vente"] = str(famille.get("FA_UniteVen", "")) + famille["unite_poids"] = str(famille.get("FA_UnitePoids", "")) famille["coef"] = ( float(famille.get("FA_Coef", 0.0)) if famille.get("FA_Coef") is not None else 0.0 ) + + # Stock et logistique + famille["suivi_stock"] = bool(famille.get("FA_SuiviStock", 0)) + famille["garantie"] = int(famille.get("FA_Garantie", 0)) + famille["delai"] = int(famille.get("FA_Delai", 0)) + famille["nb_colis"] = int(famille.get("FA_NbColis", 0)) + + # Comptabilité famille["compte_achat"] = famille.get("CG_NumAch", "") famille["compte_vente"] = famille.get("CG_NumVte", "") - famille["est_statistique"] = ( - (famille.get("FA_Stat") == 1) if "FA_Stat" in famille else False - ) - famille["est_centrale"] = ( - (famille.get("FA_Central") == 1) - if "FA_Central" in famille - else False - ) + famille["code_fiscal"] = famille.get("FA_CodeFiscal", "") + famille["escompte"] = bool(famille.get("FA_Escompte", 0)) + + # Organisation + famille["est_centrale"] = bool(famille.get("FA_Central", 0)) famille["nature"] = famille.get("FA_Nature", 0) + famille["pays"] = famille.get("FA_Pays", "") - familles.append(famille) + # Classifications + famille["categorie_1"] = famille.get("CL_No1", 0) + famille["categorie_2"] = famille.get("CL_No2", 0) + famille["categorie_3"] = famille.get("CL_No3", 0) + famille["categorie_4"] = famille.get("CL_No4", 0) - type_msg = "DÉTAIL uniquement" if not inclure_totaux else "TOUS types" - logger.info(f"SQL: {len(familles)} familles chargées ({type_msg})") + # Statistiques + famille["stat_01"] = famille.get("FA_Stat01", "") + famille["stat_02"] = famille.get("FA_Stat02", "") + famille["stat_03"] = famille.get("FA_Stat03", "") + famille["stat_04"] = famille.get("FA_Stat04", "") + famille["stat_05"] = famille.get("FA_Stat05", "") + famille["hors_statistique"] = bool(famille.get("FA_HorsStat", 0)) - return familles + # Paramètres commerciaux + famille["vente_debit"] = bool(famille.get("FA_VteDebit", 0)) + famille["non_imprimable"] = bool(famille.get("FA_NotImp", 0)) + famille["contremarque"] = bool(famille.get("FA_Contremarque", 0)) + famille["fact_poids"] = bool(famille.get("FA_FactPoids", 0)) + famille["fact_forfait"] = bool(famille.get("FA_FactForfait", 0)) + famille["publie"] = bool(famille.get("FA_Publie", 0)) - except Exception as e: - logger.error(f"Erreur SQL familles: {e}", exc_info=True) - raise RuntimeError(f"Erreur lecture familles: {str(e)}") + # Références + famille["racine_reference"] = famille.get("FA_RacineRef", "") + famille["racine_code_barre"] = famille.get("FA_RacineCB", "") + famille["raccourci"] = famille.get("FA_Raccourci", "") - def lire_famille(self, code: str) -> Dict: - try: - with self._com_context(), self._lock_com: - logger.info(f"[FAMILLE] Lecture : {code}") + # Gestion + famille["sous_traitance"] = bool(famille.get("FA_SousTraitance", 0)) + famille["fictif"] = bool(famille.get("FA_Fictif", 0)) + famille["criticite"] = int(famille.get("FA_Criticite", 0)) - code_recherche = code.upper().strip() + logger.info(f"SQL: Famille '{famille['code']}' chargée ({famille['nb_articles']} articles)") - famille_existe_sql = False - famille_code_exact = None - famille_type_sql = None - famille_intitule_sql = None + return famille - try: - with self._get_sql_connection() as conn: - cursor = conn.cursor() + except ValueError as e: + logger.error(f"Erreur famille: {e}") + raise + except Exception as e: + logger.error(f"Erreur SQL famille: {e}", exc_info=True) + raise RuntimeError(f"Erreur lecture famille: {str(e)}") - cursor.execute("SELECT TOP 1 * FROM F_FAMILLE") - colonnes_disponibles = [col[0] for col in cursor.description] - - colonnes_select = ["FA_CodeFamille", "FA_Intitule"] - - if "FA_Type" in colonnes_disponibles: - colonnes_select.append("FA_Type") - - colonnes_str = ", ".join(colonnes_select) - - cursor.execute( - f""" - SELECT {colonnes_str} - FROM F_FAMILLE - WHERE UPPER(FA_CodeFamille) = ? - """, - (code_recherche,), - ) - - row = cursor.fetchone() - - if row: - famille_existe_sql = True - famille_code_exact = self._safe_strip(row.FA_CodeFamille) - famille_intitule_sql = self._safe_strip(row.FA_Intitule) - - if "FA_Type" in colonnes_disponibles and len(row) > 2: - famille_type_sql = row.FA_Type - - logger.info( - f" [SQL] Trouvée : {famille_code_exact} - {famille_intitule_sql} (type={famille_type_sql})" - ) - else: - raise ValueError(f"Famille '{code}' introuvable dans Sage") - - except ValueError: - raise - except Exception as e: - logger.warning(f" [SQL] Erreur : {e}") - - if not famille_code_exact: - famille_code_exact = code_recherche - - logger.info( - f" [COM] Recherche de '{famille_code_exact}' via scanner..." - ) - - factory_famille = self.cial.FactoryFamille - famille_obj = None - index_trouve = None - - try: - index = 1 - max_scan = 2000 # Scanner jusqu'à 2000 familles - - while index <= max_scan: - try: - persist_test = factory_famille.List(index) - if persist_test is None: - break - - fam_test = win32com.client.CastTo( - persist_test, "IBOFamille3" - ) - fam_test.Read() - - code_test = ( - getattr(fam_test, "FA_CodeFamille", "").strip().upper() - ) - - if code_test == famille_code_exact: - famille_obj = fam_test - index_trouve = index - logger.info(f" [OK] Famille trouvée à l'index {index}") - break - - index += 1 - - except Exception as e: - if "Accès refusé" in str(e) or "Access" in str(e): - break - index += 1 - - if not famille_obj: - if famille_existe_sql: - raise ValueError( - f"Famille '{code}' trouvée en SQL mais inaccessible via COM. " - f"Vérifiez les permissions." - ) - else: - raise ValueError(f"Famille '{code}' introuvable") - - except ValueError: - raise - except Exception as e: - logger.error(f" [COM] Erreur scanner : {e}") - raise RuntimeError(f"Erreur chargement famille : {str(e)}") - - logger.info("[FAMILLE] Extraction des informations...") - - famille_obj.Read() - - resultat = { - "code": getattr(famille_obj, "FA_CodeFamille", "").strip(), - "intitule": getattr(famille_obj, "FA_Intitule", "").strip(), - } - - try: - fa_type = getattr(famille_obj, "FA_Type", 0) - resultat["type"] = fa_type - resultat["type_libelle"] = "Total" if fa_type == 1 else "Détail" - resultat["est_total"] = fa_type == 1 - resultat["est_detail"] = fa_type == 0 - - if fa_type == 1: - resultat["avertissement"] = ( - "Cette famille est de type 'Total' (agrégation comptable) " - "et ne peut pas contenir d'articles directement." - ) - logger.warning( - f" [TYPE] Famille Total détectée : {resultat['code']}" - ) - except: - resultat["type"] = 0 - resultat["type_libelle"] = "Détail" - resultat["est_total"] = False - resultat["est_detail"] = True - - try: - resultat["unite_vente"] = getattr( - famille_obj, "FA_UniteVen", "" - ).strip() - except: - resultat["unite_vente"] = "" - - try: - coef = getattr(famille_obj, "FA_Coef", None) - resultat["coef"] = float(coef) if coef is not None else 0.0 - except: - resultat["coef"] = 0.0 - - try: - resultat["nature"] = getattr(famille_obj, "FA_Nature", 0) - except: - resultat["nature"] = 0 - - try: - central = getattr(famille_obj, "FA_Central", None) - resultat["est_centrale"] = ( - (central == 1) if central is not None else False - ) - except: - resultat["est_centrale"] = False - - try: - stat = getattr(famille_obj, "FA_Stat", None) - resultat["est_statistique"] = ( - (stat == 1) if stat is not None else False - ) - except: - resultat["est_statistique"] = False - - try: - resultat["raccourci"] = getattr( - famille_obj, "FA_Raccourci", "" - ).strip() - except: - resultat["raccourci"] = "" - - try: - compte_achat_obj = getattr(famille_obj, "CompteGAchat", None) - if compte_achat_obj: - compte_achat_obj.Read() - resultat["compte_achat"] = getattr( - compte_achat_obj, "CG_Num", "" - ).strip() - else: - resultat["compte_achat"] = "" - except: - resultat["compte_achat"] = "" - - try: - compte_vente_obj = getattr(famille_obj, "CompteGVente", None) - if compte_vente_obj: - compte_vente_obj.Read() - resultat["compte_vente"] = getattr( - compte_vente_obj, "CG_Num", "" - ).strip() - else: - resultat["compte_vente"] = "" - except: - resultat["compte_vente"] = "" - - resultat["index_com"] = index_trouve - - try: - date_creation = getattr(famille_obj, "cbCreation", None) - resultat["date_creation"] = ( - str(date_creation) if date_creation else "" - ) - except: - resultat["date_creation"] = "" - - try: - date_modif = getattr(famille_obj, "cbModification", None) - resultat["date_modification"] = ( - str(date_modif) if date_modif else "" - ) - except: - resultat["date_modification"] = "" - - try: - with self._get_sql_connection() as conn: - cursor = conn.cursor() - - cursor.execute( - """ - SELECT COUNT(*) - FROM F_ARTICLE - WHERE FA_CodeFamille = ? - """, - (resultat["code"],), - ) - - row = cursor.fetchone() - if row: - resultat["nb_articles"] = row[0] - logger.info( - f" [STAT] {resultat['nb_articles']} article(s) dans cette famille" - ) - except Exception as e: - logger.warning(f" [STAT] Impossible de compter les articles : {e}") - resultat["nb_articles"] = None - - logger.info( - f"[FAMILLE] Lecture complète : {resultat['code']} - {resultat['intitule']}" - ) - - return resultat - - except ValueError as e: - logger.error(f"[FAMILLE] Erreur métier : {e}") - raise - - except Exception as e: - logger.error(f"[FAMILLE] Erreur technique : {e}", exc_info=True) - - error_message = str(e) - if self.cial: - try: - err = self.cial.CptaApplication.LastError - if err: - error_message = f"Erreur Sage: {err.Description}" - except: - pass - - raise RuntimeError(f"Erreur technique Sage : {error_message}") def creer_entree_stock(self, entree_data: Dict) -> Dict: try: