2463 lines
89 KiB
Python
2463 lines
89 KiB
Python
from fastapi import FastAPI, HTTPException, Header, Depends, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, List, Dict
|
|
from datetime import datetime, date
|
|
from enum import Enum
|
|
import uvicorn
|
|
import logging
|
|
import win32com.client
|
|
from config import settings, validate_settings
|
|
from sage_connector import SageConnector
|
|
|
|
# =====================================================
|
|
# LOGGING
|
|
# =====================================================
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
handlers=[logging.FileHandler("sage_gateway.log"), logging.StreamHandler()],
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =====================================================
|
|
# ENUMS
|
|
# =====================================================
|
|
class TypeDocument(int, Enum):
|
|
DEVIS = 0
|
|
BON_LIVRAISON = 1
|
|
BON_RETOUR = 2
|
|
COMMANDE = 3
|
|
PREPARATION = 4
|
|
FACTURE = 5
|
|
|
|
|
|
# =====================================================
|
|
# MODÈLES
|
|
# =====================================================
|
|
|
|
class DocumentGetRequest(BaseModel):
|
|
numero: str
|
|
type_doc: int
|
|
|
|
|
|
class FiltreRequest(BaseModel):
|
|
filtre: Optional[str] = ""
|
|
|
|
|
|
class CodeRequest(BaseModel):
|
|
code: str
|
|
|
|
|
|
class ChampLibreRequest(BaseModel):
|
|
doc_id: str
|
|
type_doc: int
|
|
nom_champ: str
|
|
valeur: str
|
|
|
|
|
|
class DevisRequest(BaseModel):
|
|
client_id: str
|
|
date_devis: Optional[date] = None
|
|
lignes: List[
|
|
Dict
|
|
] # Format: {article_code, quantite, prix_unitaire_ht, remise_pourcentage}
|
|
|
|
|
|
class TransformationRequest(BaseModel):
|
|
numero_source: str
|
|
type_source: int
|
|
type_cible: int
|
|
|
|
|
|
class StatutRequest(BaseModel):
|
|
nouveau_statut: int
|
|
|
|
|
|
# =====================================================
|
|
# SÉCURITÉ
|
|
# =====================================================
|
|
def verify_token(x_sage_token: str = Header(...)):
|
|
"""Vérification du token d'authentification"""
|
|
if x_sage_token != settings.sage_gateway_token:
|
|
logger.warning(f"❌ Token invalide reçu: {x_sage_token[:20]}...")
|
|
raise HTTPException(401, "Token invalide")
|
|
return True
|
|
|
|
|
|
# =====================================================
|
|
# APPLICATION
|
|
# =====================================================
|
|
app = FastAPI(
|
|
title="Sage Gateway - Windows Server",
|
|
version="1.0.0",
|
|
description="Passerelle d'accès à Sage 100c pour VPS Linux",
|
|
)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=settings.cors_origins,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
allow_credentials=True,
|
|
)
|
|
|
|
sage: Optional[SageConnector] = None
|
|
|
|
|
|
# =====================================================
|
|
# LIFECYCLE
|
|
# =====================================================
|
|
@app.on_event("startup")
|
|
def startup():
|
|
global sage
|
|
|
|
logger.info("🚀 Démarrage Sage Gateway Windows...")
|
|
|
|
# Validation config
|
|
try:
|
|
validate_settings()
|
|
logger.info("✅ Configuration validée")
|
|
except ValueError as e:
|
|
logger.error(f"❌ Configuration invalide: {e}")
|
|
raise
|
|
|
|
# Connexion Sage
|
|
sage = SageConnector(
|
|
settings.chemin_base, settings.utilisateur, settings.mot_de_passe
|
|
)
|
|
|
|
if not sage.connecter():
|
|
raise RuntimeError("❌ Impossible de se connecter à Sage 100c")
|
|
|
|
logger.info("✅ Sage Gateway démarré et connecté")
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
def shutdown():
|
|
if sage:
|
|
sage.deconnecter()
|
|
logger.info("👋 Sage Gateway arrêté")
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - SYSTÈME
|
|
# =====================================================
|
|
@app.get("/health")
|
|
def health():
|
|
"""Health check"""
|
|
return {
|
|
"status": "ok",
|
|
"sage_connected": sage is not None and sage.cial is not None,
|
|
"cache_info": sage.get_cache_info() if sage else None,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - CLIENTS
|
|
# =====================================================
|
|
@app.post("/sage/clients/list", dependencies=[Depends(verify_token)])
|
|
def clients_list(req: FiltreRequest):
|
|
"""Liste des clients avec filtre optionnel"""
|
|
try:
|
|
clients = sage.lister_tous_clients(req.filtre)
|
|
return {"success": True, "data": clients}
|
|
except Exception as e:
|
|
logger.error(f"Erreur liste clients: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/clients/get", dependencies=[Depends(verify_token)])
|
|
def client_get(req: CodeRequest):
|
|
"""Lecture d'un client par code"""
|
|
try:
|
|
client = sage.lire_client(req.code)
|
|
if not client:
|
|
raise HTTPException(404, f"Client {req.code} non trouvé")
|
|
return {"success": True, "data": client}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture client: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - ARTICLES
|
|
# =====================================================
|
|
@app.post("/sage/articles/list", dependencies=[Depends(verify_token)])
|
|
def articles_list(req: FiltreRequest):
|
|
"""Liste des articles avec filtre optionnel"""
|
|
try:
|
|
articles = sage.lister_tous_articles(req.filtre)
|
|
return {"success": True, "data": articles}
|
|
except Exception as e:
|
|
logger.error(f"Erreur liste articles: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/articles/get", dependencies=[Depends(verify_token)])
|
|
def article_get(req: CodeRequest):
|
|
"""Lecture d'un article par référence"""
|
|
try:
|
|
article = sage.lire_article(req.code)
|
|
if not article:
|
|
raise HTTPException(404, f"Article {req.code} non trouvé")
|
|
return {"success": True, "data": article}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture article: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - DEVIS
|
|
# =====================================================
|
|
@app.post("/sage/devis/create", dependencies=[Depends(verify_token)])
|
|
def creer_devis(req: DevisRequest):
|
|
"""Création d'un devis"""
|
|
try:
|
|
# Transformer en format attendu par sage_connector
|
|
devis_data = {
|
|
"client": {"code": req.client_id, "intitule": ""},
|
|
"date_devis": req.date_devis or date.today(),
|
|
"lignes": req.lignes,
|
|
}
|
|
|
|
resultat = sage.creer_devis_enrichi(devis_data)
|
|
return {"success": True, "data": resultat}
|
|
except Exception as e:
|
|
logger.error(f"Erreur création devis: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/devis/get", dependencies=[Depends(verify_token)])
|
|
def lire_devis(req: CodeRequest):
|
|
"""Lecture d'un devis"""
|
|
try:
|
|
devis = sage.lire_devis(req.code)
|
|
if not devis:
|
|
raise HTTPException(404, f"Devis {req.code} non trouvé")
|
|
return {"success": True, "data": devis}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture devis: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/devis/list", dependencies=[Depends(verify_token)])
|
|
def devis_list(
|
|
limit: int = 100, statut: Optional[int] = None, inclure_lignes: bool = Query(True)
|
|
):
|
|
"""
|
|
📋 Liste tous les devis avec filtres optionnels
|
|
|
|
Args:
|
|
limit: Nombre max de devis à retourner
|
|
statut: Filtre par statut (optionnel)
|
|
inclure_lignes: Si True, charge les lignes de chaque devis (par défaut: True)
|
|
|
|
✅ AMÉLIORATION: Charge maintenant les lignes de chaque devis
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
devis_list = []
|
|
index = 1
|
|
max_iterations = limit * 3
|
|
erreurs_consecutives = 0
|
|
max_erreurs = 50
|
|
|
|
logger.info(
|
|
f"🔍 Recherche devis (limit={limit}, statut={statut}, inclure_lignes={inclure_lignes})"
|
|
)
|
|
|
|
while (
|
|
len(devis_list) < limit
|
|
and index < max_iterations
|
|
and erreurs_consecutives < max_erreurs
|
|
):
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
logger.debug(f"Fin de liste à l'index {index}")
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
# Filtrer uniquement devis (type 0)
|
|
doc_type = getattr(doc, "DO_Type", -1)
|
|
if doc_type != 0:
|
|
index += 1
|
|
continue
|
|
|
|
doc_statut = getattr(doc, "DO_Statut", 0)
|
|
|
|
# Filtre statut
|
|
if statut is not None and doc_statut != statut:
|
|
index += 1
|
|
continue
|
|
|
|
# ✅ Charger client via .Client
|
|
client_code = ""
|
|
client_intitule = ""
|
|
|
|
try:
|
|
client_obj = getattr(doc, "Client", None)
|
|
if client_obj:
|
|
client_obj.Read()
|
|
client_code = getattr(client_obj, "CT_Num", "").strip()
|
|
client_intitule = getattr(
|
|
client_obj, "CT_Intitule", ""
|
|
).strip()
|
|
logger.debug(
|
|
f"✅ Client: {client_code} - {client_intitule}"
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"Erreur chargement client: {e}")
|
|
# Fallback sur cache si code disponible
|
|
if client_code:
|
|
client_cache = sage.lire_client(client_code)
|
|
if client_cache:
|
|
client_intitule = client_cache.get("intitule", "")
|
|
|
|
# ✅✅ NOUVEAU: Charger les lignes si demandé
|
|
lignes = []
|
|
if inclure_lignes:
|
|
try:
|
|
factory_lignes = getattr(doc, "FactoryDocumentLigne", None)
|
|
if not factory_lignes:
|
|
factory_lignes = getattr(
|
|
doc, "FactoryDocumentVenteLigne", None
|
|
)
|
|
|
|
if factory_lignes:
|
|
ligne_index = 1
|
|
while ligne_index <= 100: # Max 100 lignes par devis
|
|
try:
|
|
ligne_persist = factory_lignes.List(ligne_index)
|
|
if ligne_persist is None:
|
|
break
|
|
|
|
ligne = win32com.client.CastTo(
|
|
ligne_persist, "IBODocumentLigne3"
|
|
)
|
|
ligne.Read()
|
|
|
|
# Charger référence article
|
|
article_ref = ""
|
|
try:
|
|
article_ref = getattr(
|
|
ligne, "AR_Ref", ""
|
|
).strip()
|
|
if not article_ref:
|
|
article_obj = getattr(
|
|
ligne, "Article", None
|
|
)
|
|
if article_obj:
|
|
article_obj.Read()
|
|
article_ref = getattr(
|
|
article_obj, "AR_Ref", ""
|
|
).strip()
|
|
except:
|
|
pass
|
|
|
|
lignes.append(
|
|
{
|
|
"article": article_ref,
|
|
"designation": getattr(
|
|
ligne, "DL_Design", ""
|
|
),
|
|
"quantite": float(
|
|
getattr(ligne, "DL_Qte", 0.0)
|
|
),
|
|
"prix_unitaire": float(
|
|
getattr(
|
|
ligne, "DL_PrixUnitaire", 0.0
|
|
)
|
|
),
|
|
"montant_ht": float(
|
|
getattr(ligne, "DL_MontantHT", 0.0)
|
|
),
|
|
}
|
|
)
|
|
|
|
ligne_index += 1
|
|
except Exception as e:
|
|
logger.debug(f"Erreur ligne {ligne_index}: {e}")
|
|
break
|
|
except Exception as e:
|
|
logger.debug(f"Erreur chargement lignes: {e}")
|
|
|
|
devis_list.append(
|
|
{
|
|
"numero": getattr(doc, "DO_Piece", ""),
|
|
"date": str(getattr(doc, "DO_Date", "")),
|
|
"client_code": client_code,
|
|
"client_intitule": client_intitule,
|
|
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
|
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
|
"statut": doc_statut,
|
|
"lignes": lignes, # ✅ Lignes incluses
|
|
}
|
|
)
|
|
|
|
erreurs_consecutives = 0
|
|
index += 1
|
|
|
|
except Exception as e:
|
|
erreurs_consecutives += 1
|
|
logger.debug(f"⚠️ Erreur index {index}: {e}")
|
|
index += 1
|
|
|
|
if erreurs_consecutives >= max_erreurs:
|
|
logger.warning(
|
|
f"⚠️ Arrêt après {max_erreurs} erreurs consécutives"
|
|
)
|
|
break
|
|
|
|
nb_avec_client = sum(1 for d in devis_list if d["client_intitule"])
|
|
nb_avec_lignes = sum(1 for d in devis_list if d.get("lignes"))
|
|
logger.info(
|
|
f"✅ {len(devis_list)} devis retournés "
|
|
f"({nb_avec_client} avec client, {nb_avec_lignes} avec lignes)"
|
|
)
|
|
|
|
return {"success": True, "data": devis_list}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"❌ Erreur liste devis: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/devis/statut", dependencies=[Depends(verify_token)])
|
|
def changer_statut_devis_endpoint(numero: str, nouveau_statut: int):
|
|
"""Change le statut d'un devis"""
|
|
try:
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
persist = factory.ReadPiece(0, numero)
|
|
|
|
if not persist:
|
|
raise HTTPException(404, f"Devis {numero} introuvable")
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
statut_actuel = getattr(doc, "DO_Statut", 0)
|
|
doc.DO_Statut = nouveau_statut
|
|
doc.Write()
|
|
|
|
logger.info(f"✅ Statut devis {numero}: {statut_actuel} → {nouveau_statut}")
|
|
|
|
return {
|
|
"success": True,
|
|
"data": {
|
|
"numero": numero,
|
|
"statut_ancien": statut_actuel,
|
|
"statut_nouveau": nouveau_statut,
|
|
},
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur changement statut: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - DOCUMENTS
|
|
# =====================================================
|
|
@app.post("/sage/documents/get", dependencies=[Depends(verify_token)])
|
|
def lire_document(req: DocumentGetRequest):
|
|
"""Lecture d'un document (commande, facture, etc.)"""
|
|
try:
|
|
doc = sage.lire_document(req.numero, req.type_doc)
|
|
if not doc:
|
|
raise HTTPException(404, f"Document {req.numero} non trouvé")
|
|
return {"success": True, "data": doc}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture document: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/documents/transform", dependencies=[Depends(verify_token)])
|
|
def transformer_document(
|
|
numero_source: str = Query(..., description="Numéro du document source"),
|
|
type_source: int = Query(..., description="Type document source"),
|
|
type_cible: int = Query(..., description="Type document cible"),
|
|
):
|
|
"""
|
|
🔧 Transformation de document
|
|
|
|
✅ CORRECTION : Utilise les VRAIS types Sage Dataven
|
|
|
|
Types valides :
|
|
- 0: Devis
|
|
- 10: Bon de commande
|
|
- 20: Préparation
|
|
- 30: Bon de livraison
|
|
- 40: Bon de retour
|
|
- 50: Bon d'avoir
|
|
- 60: Facture
|
|
|
|
Transformations autorisées :
|
|
- Devis (0) → Commande (10)
|
|
- Commande (10) → Bon livraison (30)
|
|
- Commande (10) → Facture (60)
|
|
- Bon livraison (30) → Facture (60)
|
|
"""
|
|
try:
|
|
logger.info(
|
|
f"🔄 Transformation demandée: {numero_source} "
|
|
f"(type {type_source}) → type {type_cible}"
|
|
)
|
|
|
|
# ✅ Matrice des transformations valides pour VOTRE Sage
|
|
transformations_valides = {
|
|
(0, 10), # Devis → Commande
|
|
(10, 30), # Commande → Bon de livraison
|
|
(10, 60), # Commande → Facture
|
|
(30, 60), # Bon de livraison → Facture
|
|
(0, 60), # Devis → Facture (si autorisé)
|
|
}
|
|
|
|
if (type_source, type_cible) not in transformations_valides:
|
|
logger.error(
|
|
f"❌ Transformation non autorisée: {type_source} → {type_cible}"
|
|
)
|
|
raise HTTPException(
|
|
400,
|
|
f"Transformation non autorisée: type {type_source} → type {type_cible}. "
|
|
f"Transformations valides: {transformations_valides}",
|
|
)
|
|
|
|
# Appel au connecteur Sage
|
|
resultat = sage.transformer_document(numero_source, type_source, type_cible)
|
|
|
|
logger.info(
|
|
f"✅ Transformation réussie: {numero_source} → "
|
|
f"{resultat.get('document_cible', '?')} "
|
|
f"({resultat.get('nb_lignes', 0)} lignes)"
|
|
)
|
|
|
|
return {"success": True, "data": resultat}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except ValueError as e:
|
|
logger.error(f"❌ Erreur métier transformation: {e}")
|
|
raise HTTPException(400, str(e))
|
|
except Exception as e:
|
|
logger.error(f"❌ Erreur technique transformation: {e}", exc_info=True)
|
|
raise HTTPException(500, f"Erreur transformation: {str(e)}")
|
|
|
|
|
|
@app.post("/sage/documents/champ-libre", dependencies=[Depends(verify_token)])
|
|
def maj_champ_libre(req: ChampLibreRequest):
|
|
"""Mise à jour d'un champ libre"""
|
|
try:
|
|
success = sage.mettre_a_jour_champ_libre(
|
|
req.doc_id, req.type_doc, req.nom_champ, req.valeur
|
|
)
|
|
return {"success": success}
|
|
except Exception as e:
|
|
logger.error(f"Erreur MAJ champ libre: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/documents/derniere-relance", dependencies=[Depends(verify_token)])
|
|
def maj_derniere_relance(doc_id: str, type_doc: int):
|
|
"""📅 Met à jour le champ 'Dernière relance' d'un document"""
|
|
try:
|
|
success = sage.mettre_a_jour_derniere_relance(doc_id, type_doc)
|
|
return {"success": success}
|
|
except Exception as e:
|
|
logger.error(f"Erreur MAJ dernière relance: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - CONTACTS
|
|
# =====================================================
|
|
@app.post("/sage/contact/read", dependencies=[Depends(verify_token)])
|
|
def contact_read(req: CodeRequest):
|
|
"""Lecture du contact principal d'un client"""
|
|
try:
|
|
contact = sage.lire_contact_principal_client(req.code)
|
|
if not contact:
|
|
raise HTTPException(404, f"Contact non trouvé pour client {req.code}")
|
|
return {"success": True, "data": contact}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture contact: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/commandes/list", dependencies=[Depends(verify_token)])
|
|
def commandes_list(limit: int = 100, statut: Optional[int] = None):
|
|
"""
|
|
📋 Liste toutes les commandes
|
|
✅ CORRECTION: Filtre sur type 10 (BON_COMMANDE)
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
commandes = []
|
|
index = 1
|
|
max_iterations = limit * 10
|
|
erreurs_consecutives = 0
|
|
max_erreurs = 100
|
|
|
|
logger.info(f"🔍 Recherche commandes (limit={limit}, statut={statut})")
|
|
|
|
while (
|
|
len(commandes) < limit
|
|
and index < max_iterations
|
|
and erreurs_consecutives < max_erreurs
|
|
):
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
doc_type = getattr(doc, "DO_Type", -1)
|
|
|
|
# ✅ CRITIQUE : Filtrer sur type 10 (BON_COMMANDE)
|
|
if doc_type != settings.SAGE_TYPE_BON_COMMANDE:
|
|
index += 1
|
|
continue
|
|
|
|
doc_statut = getattr(doc, "DO_Statut", 0)
|
|
|
|
# Filtre statut
|
|
if statut is not None and doc_statut != statut:
|
|
index += 1
|
|
continue
|
|
|
|
# Charger client
|
|
client_code = ""
|
|
client_intitule = ""
|
|
|
|
try:
|
|
client_obj = getattr(doc, "Client", None)
|
|
if client_obj:
|
|
client_obj.Read()
|
|
client_code = getattr(client_obj, "CT_Num", "").strip()
|
|
client_intitule = getattr(
|
|
client_obj, "CT_Intitule", ""
|
|
).strip()
|
|
except Exception as e:
|
|
logger.debug(f"Erreur chargement client: {e}")
|
|
|
|
commande = {
|
|
"numero": getattr(doc, "DO_Piece", ""),
|
|
"date": str(getattr(doc, "DO_Date", "")),
|
|
"client_code": client_code,
|
|
"client_intitule": client_intitule,
|
|
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
|
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
|
"statut": doc_statut,
|
|
}
|
|
|
|
commandes.append(commande)
|
|
erreurs_consecutives = 0
|
|
index += 1
|
|
|
|
except Exception as e:
|
|
erreurs_consecutives += 1
|
|
index += 1
|
|
|
|
if erreurs_consecutives >= max_erreurs:
|
|
break
|
|
|
|
logger.info(f"✅ {len(commandes)} commandes retournées")
|
|
|
|
return {"success": True, "data": commandes}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"❌ Erreur liste commandes: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/factures/list", dependencies=[Depends(verify_token)])
|
|
def factures_list(limit: int = 100, statut: Optional[int] = None):
|
|
"""
|
|
📋 Liste toutes les factures
|
|
✅ CORRECTION: Filtre sur type 60 (FACTURE)
|
|
"""
|
|
try:
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
factures = []
|
|
index = 1
|
|
max_iterations = limit * 3
|
|
|
|
while len(factures) < limit and index < max_iterations:
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
# ✅ CRITIQUE: Filtrer factures (type 60)
|
|
if getattr(doc, "DO_Type", -1) != settings.SAGE_TYPE_FACTURE:
|
|
index += 1
|
|
continue
|
|
|
|
doc_statut = getattr(doc, "DO_Statut", 0)
|
|
|
|
if statut is None or doc_statut == statut:
|
|
# Charger client
|
|
client_code = ""
|
|
client_intitule = ""
|
|
|
|
try:
|
|
client_obj = getattr(doc, "Client", None)
|
|
if client_obj:
|
|
client_obj.Read()
|
|
client_code = getattr(client_obj, "CT_Num", "").strip()
|
|
client_intitule = getattr(
|
|
client_obj, "CT_Intitule", ""
|
|
).strip()
|
|
except:
|
|
pass
|
|
|
|
factures.append(
|
|
{
|
|
"numero": getattr(doc, "DO_Piece", ""),
|
|
"date": str(getattr(doc, "DO_Date", "")),
|
|
"client_code": client_code,
|
|
"client_intitule": client_intitule,
|
|
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
|
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
|
"statut": doc_statut,
|
|
}
|
|
)
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
continue
|
|
|
|
logger.info(f"✅ {len(factures)} factures retournées")
|
|
return {"success": True, "data": factures}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur liste factures: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/client/remise-max", dependencies=[Depends(verify_token)])
|
|
def lire_remise_max_client(code: str):
|
|
"""Récupère la remise max autorisée pour un client"""
|
|
try:
|
|
client_obj = sage._lire_client_obj(code)
|
|
|
|
if not client_obj:
|
|
raise HTTPException(404, f"Client {code} introuvable")
|
|
|
|
remise_max = 10.0 # Défaut
|
|
|
|
try:
|
|
remise_max = float(getattr(client_obj, "CT_RemiseMax", 10.0))
|
|
except:
|
|
pass
|
|
|
|
logger.info(f"✅ Remise max client {code}: {remise_max}%")
|
|
|
|
return {
|
|
"success": True,
|
|
"data": {"client_code": code, "remise_max": remise_max},
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lecture remise: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# ENDPOINTS - ADMIN
|
|
# =====================================================
|
|
@app.post("/sage/cache/refresh", dependencies=[Depends(verify_token)])
|
|
def refresh_cache():
|
|
"""Force le rafraîchissement du cache"""
|
|
try:
|
|
sage.forcer_actualisation_cache()
|
|
return {
|
|
"success": True,
|
|
"message": "Cache actualisé",
|
|
"info": sage.get_cache_info(),
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Erreur refresh cache: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/cache/info", dependencies=[Depends(verify_token)])
|
|
def cache_info_get():
|
|
"""Informations sur le cache (endpoint GET)"""
|
|
try:
|
|
return {"success": True, "data": sage.get_cache_info()}
|
|
except Exception as e:
|
|
logger.error(f"Erreur info cache: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# Script à ajouter temporairement dans main.py pour diagnostiquer
|
|
|
|
|
|
@app.get("/sage/devis/{numero}/diagnostic", dependencies=[Depends(verify_token)])
|
|
def diagnostiquer_devis(numero: str):
|
|
"""
|
|
ENDPOINT DE DIAGNOSTIC: Affiche TOUTES les infos d'un devis
|
|
|
|
Permet de comprendre pourquoi un devis ne peut pas être transformé
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Essayer ReadPiece
|
|
persist = factory.ReadPiece(0, numero)
|
|
|
|
# Si échec, chercher dans List()
|
|
if not persist:
|
|
logger.info(f"[DIAG] ReadPiece echoue, recherche dans List()...")
|
|
index = 1
|
|
while index < 10000:
|
|
try:
|
|
persist_test = factory.List(index)
|
|
if persist_test is None:
|
|
break
|
|
|
|
doc_test = win32com.client.CastTo(
|
|
persist_test, "IBODocumentVente3"
|
|
)
|
|
doc_test.Read()
|
|
|
|
if (
|
|
getattr(doc_test, "DO_Type", -1) == 0
|
|
and getattr(doc_test, "DO_Piece", "") == numero
|
|
):
|
|
persist = persist_test
|
|
logger.info(f"[DIAG] Trouve a l'index {index}")
|
|
break
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
|
|
if not persist:
|
|
raise HTTPException(404, f"Devis {numero} introuvable")
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
# EXTRACTION COMPLÈTE
|
|
diagnostic = {
|
|
"numero": getattr(doc, "DO_Piece", ""),
|
|
"type": getattr(doc, "DO_Type", -1),
|
|
"statut": getattr(doc, "DO_Statut", -1),
|
|
"statut_libelle": {
|
|
0: "Brouillon",
|
|
1: "Soumis",
|
|
2: "Accepte",
|
|
3: "Realise partiellement",
|
|
4: "Realise totalement",
|
|
5: "Transforme",
|
|
6: "Annule",
|
|
}.get(getattr(doc, "DO_Statut", -1), "Inconnu"),
|
|
"date": str(getattr(doc, "DO_Date", "")),
|
|
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
|
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
|
"est_transformable": False,
|
|
"raison_blocage": None,
|
|
}
|
|
|
|
# Client
|
|
try:
|
|
client_obj = getattr(doc, "Client", None)
|
|
if client_obj:
|
|
client_obj.Read()
|
|
diagnostic["client_code"] = getattr(
|
|
client_obj, "CT_Num", ""
|
|
).strip()
|
|
diagnostic["client_intitule"] = getattr(
|
|
client_obj, "CT_Intitule", ""
|
|
).strip()
|
|
except Exception as e:
|
|
diagnostic["erreur_client"] = str(e)
|
|
|
|
# Lignes
|
|
try:
|
|
factory_lignes = doc.FactoryDocumentLigne
|
|
except:
|
|
factory_lignes = doc.FactoryDocumentVenteLigne
|
|
|
|
lignes = []
|
|
index = 1
|
|
while index <= 100:
|
|
try:
|
|
ligne_p = factory_lignes.List(index)
|
|
if ligne_p is None:
|
|
break
|
|
|
|
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
|
|
ligne.Read()
|
|
|
|
article_ref = ""
|
|
try:
|
|
article_ref = getattr(ligne, "AR_Ref", "").strip()
|
|
if not article_ref:
|
|
article_obj = getattr(ligne, "Article", None)
|
|
if article_obj:
|
|
article_obj.Read()
|
|
article_ref = getattr(article_obj, "AR_Ref", "").strip()
|
|
except:
|
|
pass
|
|
|
|
lignes.append(
|
|
{
|
|
"index": index,
|
|
"article": article_ref,
|
|
"designation": getattr(ligne, "DL_Design", ""),
|
|
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
|
|
"prix_unitaire": float(
|
|
getattr(ligne, "DL_PrixUnitaire", 0.0)
|
|
),
|
|
"montant_ht": float(getattr(ligne, "DL_MontantHT", 0.0)),
|
|
}
|
|
)
|
|
|
|
index += 1
|
|
except:
|
|
break
|
|
|
|
diagnostic["nb_lignes"] = len(lignes)
|
|
diagnostic["lignes"] = lignes
|
|
|
|
# ANALYSE TRANSFORMABILITÉ
|
|
statut = diagnostic["statut"]
|
|
|
|
if statut == 5:
|
|
diagnostic["raison_blocage"] = "Document deja transforme (statut=5)"
|
|
elif statut == 6:
|
|
diagnostic["raison_blocage"] = "Document annule (statut=6)"
|
|
elif statut in [3, 4]:
|
|
diagnostic["raison_blocage"] = (
|
|
f"Document deja realise partiellement ou totalement (statut={statut}). "
|
|
f"Une commande/BL/facture existe probablement deja."
|
|
)
|
|
diagnostic["suggestion"] = (
|
|
"Cherchez les documents lies a ce devis dans Sage. "
|
|
"Il a peut-etre deja ete transforme manuellement."
|
|
)
|
|
elif statut == 0:
|
|
diagnostic["est_transformable"] = True
|
|
diagnostic["action_requise"] = (
|
|
"Statut 'Brouillon'. Le systeme le passera automatiquement a 'Accepte' "
|
|
"avant transformation."
|
|
)
|
|
elif statut == 2:
|
|
diagnostic["est_transformable"] = True
|
|
diagnostic["action_requise"] = (
|
|
"Statut 'Accepte'. Transformation possible."
|
|
)
|
|
else:
|
|
diagnostic["raison_blocage"] = f"Statut inconnu ou non gere: {statut}"
|
|
|
|
# Champs libres (pour Universign, etc.)
|
|
champs_libres = {}
|
|
try:
|
|
for champ in ["UniversignID", "DerniereRelance", "DO_Ref"]:
|
|
try:
|
|
valeur = getattr(doc, f"DO_{champ}", None)
|
|
if valeur:
|
|
champs_libres[champ] = str(valeur)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
if champs_libres:
|
|
diagnostic["champs_libres"] = champs_libres
|
|
|
|
logger.info(
|
|
f"[DIAG] Devis {numero}: statut={statut}, transformable={diagnostic['est_transformable']}"
|
|
)
|
|
|
|
return {"success": True, "diagnostic": diagnostic}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur diagnostic: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/diagnostic/configuration", dependencies=[Depends(verify_token)])
|
|
def diagnostic_configuration():
|
|
"""
|
|
DIAGNOSTIC COMPLET de la configuration Sage
|
|
|
|
Teste:
|
|
- Quelles méthodes COM sont disponibles
|
|
- Quels types de documents sont autorisés
|
|
- Quelles permissions l'utilisateur a
|
|
- Version de Sage
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
diagnostic = {
|
|
"connexion": "OK",
|
|
"chemin_base": sage.chemin_base,
|
|
"utilisateur": sage.utilisateur,
|
|
}
|
|
|
|
# Version Sage
|
|
try:
|
|
version = getattr(sage.cial, "Version", "Inconnue")
|
|
diagnostic["version_sage"] = str(version)
|
|
except:
|
|
diagnostic["version_sage"] = "Non disponible"
|
|
|
|
# Test des méthodes disponibles sur IBSCIALApplication3
|
|
methodes_disponibles = []
|
|
methodes_a_tester = [
|
|
"CreateProcess_Document",
|
|
"FactoryDocumentVente",
|
|
"FactoryArticle",
|
|
"CptaApplication",
|
|
"BeginTrans",
|
|
"CommitTrans",
|
|
"RollbackTrans",
|
|
]
|
|
|
|
for methode in methodes_a_tester:
|
|
try:
|
|
if hasattr(sage.cial, methode):
|
|
methodes_disponibles.append(methode)
|
|
except:
|
|
pass
|
|
|
|
diagnostic["methodes_cial_disponibles"] = methodes_disponibles
|
|
|
|
# Test des types de documents autorisés
|
|
types_autorises = []
|
|
types_bloques = []
|
|
|
|
for type_doc in range(6): # 0-5
|
|
try:
|
|
# Essayer de créer un process (sans le valider)
|
|
process = sage.cial.CreateProcess_Document(type_doc)
|
|
if process:
|
|
types_autorises.append(
|
|
{
|
|
"type": type_doc,
|
|
"libelle": {
|
|
0: "Devis",
|
|
1: "Bon de livraison",
|
|
2: "Bon de retour",
|
|
3: "Commande",
|
|
4: "Preparation",
|
|
5: "Facture",
|
|
}[type_doc],
|
|
}
|
|
)
|
|
# Ne pas valider, juste tester
|
|
del process
|
|
except Exception as e:
|
|
types_bloques.append(
|
|
{
|
|
"type": type_doc,
|
|
"libelle": {
|
|
0: "Devis",
|
|
1: "Bon de livraison",
|
|
2: "Bon de retour",
|
|
3: "Commande",
|
|
4: "Preparation",
|
|
5: "Facture",
|
|
}[type_doc],
|
|
"erreur": str(e)[:200],
|
|
}
|
|
)
|
|
|
|
diagnostic["types_documents_autorises"] = types_autorises
|
|
diagnostic["types_documents_bloques"] = types_bloques
|
|
|
|
# Test TransformInto() sur un devis test
|
|
try:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Chercher n'importe quel devis
|
|
index = 1
|
|
devis_test = None
|
|
|
|
while index < 100:
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
if getattr(doc, "DO_Type", -1) == 0: # Devis
|
|
devis_test = doc
|
|
break
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
|
|
if devis_test:
|
|
# Tester si TransformInto existe
|
|
if hasattr(devis_test, "TransformInto"):
|
|
diagnostic["transforminto_disponible"] = True
|
|
diagnostic["transforminto_test"] = "Methode existe (non testee)"
|
|
else:
|
|
diagnostic["transforminto_disponible"] = False
|
|
diagnostic["transforminto_test"] = (
|
|
"Methode TransformInto() inexistante"
|
|
)
|
|
else:
|
|
diagnostic["transforminto_disponible"] = (
|
|
"Impossible de tester (aucun devis trouve)"
|
|
)
|
|
|
|
except Exception as e:
|
|
diagnostic["transforminto_disponible"] = False
|
|
diagnostic["transforminto_erreur"] = str(e)
|
|
|
|
# Modules Sage actifs
|
|
try:
|
|
# Tester l'accès aux différentes factories
|
|
modules = {}
|
|
|
|
try:
|
|
sage.cial.FactoryDocumentVente
|
|
modules["Ventes"] = "OK"
|
|
except:
|
|
modules["Ventes"] = "INACCESSIBLE"
|
|
|
|
try:
|
|
sage.cial.CptaApplication.FactoryClient
|
|
modules["Clients"] = "OK"
|
|
except:
|
|
modules["Clients"] = "INACCESSIBLE"
|
|
|
|
try:
|
|
sage.cial.FactoryArticle
|
|
modules["Articles"] = "OK"
|
|
except:
|
|
modules["Articles"] = "INACCESSIBLE"
|
|
|
|
diagnostic["modules_actifs"] = modules
|
|
except Exception as e:
|
|
diagnostic["modules_actifs_erreur"] = str(e)
|
|
|
|
# Compter documents existants
|
|
try:
|
|
counts = {}
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
for type_doc in range(6):
|
|
count = 0
|
|
index = 1
|
|
|
|
while index < 1000:
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
if getattr(doc, "DO_Type", -1) == type_doc:
|
|
count += 1
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
break
|
|
|
|
counts[
|
|
{
|
|
0: "Devis",
|
|
1: "Bons_livraison",
|
|
2: "Bons_retour",
|
|
3: "Commandes",
|
|
4: "Preparations",
|
|
5: "Factures",
|
|
}[type_doc]
|
|
] = count
|
|
|
|
diagnostic["documents_existants"] = counts
|
|
except Exception as e:
|
|
diagnostic["documents_existants_erreur"] = str(e)
|
|
|
|
logger.info("[DIAG] Configuration Sage analysee")
|
|
|
|
return {"success": True, "diagnostic": diagnostic}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur diagnostic config: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/diagnostic/types-reels", dependencies=[Depends(verify_token)])
|
|
def decouvrir_types_reels():
|
|
"""
|
|
DIAGNOSTIC CRITIQUE: Découvre les VRAIS types de documents Sage
|
|
|
|
Au lieu de deviner les types (0-5), on va:
|
|
1. Créer manuellement un document de chaque type dans Sage
|
|
2. Les lister ici pour voir leurs vrais numéros de type
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Parcourir TOUS les documents
|
|
documents_par_type = {}
|
|
index = 1
|
|
max_docs = 500 # Limiter pour ne pas bloquer
|
|
|
|
logger.info("[DIAG] Scan de tous les documents...")
|
|
|
|
while index < max_docs:
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
# Récupérer le type ET le sous-type
|
|
type_doc = getattr(doc, "DO_Type", -1)
|
|
piece = getattr(doc, "DO_Piece", "")
|
|
statut = getattr(doc, "DO_Statut", -1)
|
|
|
|
# Essayer de récupérer le domaine (vente/achat)
|
|
domaine = "Inconnu"
|
|
try:
|
|
domaine_val = getattr(doc, "DO_Domaine", -1)
|
|
domaine = {0: "Vente", 1: "Achat"}.get(
|
|
domaine_val, f"Code {domaine_val}"
|
|
)
|
|
except:
|
|
pass
|
|
|
|
# Récupérer la catégorie
|
|
categorie = "Inconnue"
|
|
try:
|
|
cat_val = getattr(doc, "DO_Categorie", -1)
|
|
categorie = str(cat_val)
|
|
except:
|
|
pass
|
|
|
|
# Grouper par type
|
|
if type_doc not in documents_par_type:
|
|
documents_par_type[type_doc] = {
|
|
"count": 0,
|
|
"exemples": [],
|
|
"domaine": domaine,
|
|
"categorie": categorie,
|
|
}
|
|
|
|
documents_par_type[type_doc]["count"] += 1
|
|
|
|
# Garder quelques exemples
|
|
if len(documents_par_type[type_doc]["exemples"]) < 3:
|
|
documents_par_type[type_doc]["exemples"].append(
|
|
{
|
|
"numero": piece,
|
|
"statut": statut,
|
|
"domaine": domaine,
|
|
"categorie": categorie,
|
|
}
|
|
)
|
|
|
|
index += 1
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Erreur index {index}: {e}")
|
|
index += 1
|
|
|
|
# Formater le résultat
|
|
types_trouves = []
|
|
|
|
for type_num, infos in sorted(documents_par_type.items()):
|
|
types_trouves.append(
|
|
{
|
|
"type_code": type_num,
|
|
"nombre_documents": infos["count"],
|
|
"domaine": infos["domaine"],
|
|
"categorie": infos["categorie"],
|
|
"exemples": infos["exemples"],
|
|
"suggestion_libelle": _deviner_libelle_type(
|
|
type_num, infos["exemples"]
|
|
),
|
|
}
|
|
)
|
|
|
|
logger.info(
|
|
f"[DIAG] {len(types_trouves)} types de documents distincts trouves"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"types_documents_reels": types_trouves,
|
|
"instructions": (
|
|
"Pour identifier les types corrects:\n"
|
|
"1. Creez manuellement dans Sage: 1 Bon de commande, 1 BL, 1 Facture\n"
|
|
"2. Appelez de nouveau cet endpoint\n"
|
|
"3. Les nouveaux types apparaitront avec leurs numeros corrects"
|
|
),
|
|
"total_documents_scannes": index - 1,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur decouverte types: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
def _deviner_libelle_type(type_num, exemples):
|
|
"""Devine le libellé d'un type basé sur les numéros de pièce"""
|
|
if not exemples:
|
|
return "Type inconnu"
|
|
|
|
# Analyser les préfixes des numéros
|
|
prefixes = [ex["numero"][:2] for ex in exemples if ex["numero"]]
|
|
prefix_commun = max(set(prefixes), key=prefixes.count) if prefixes else ""
|
|
|
|
# Deviner selon le type_num et les préfixes
|
|
suggestions = {
|
|
0: "Devis (DE)",
|
|
1: "Bon de livraison (BL)",
|
|
2: "Bon de retour (BR)",
|
|
3: "Bon de commande (BC)",
|
|
4: "Preparation de livraison (PL)",
|
|
5: "Facture (FA)",
|
|
6: "Facture d'avoir (AV)",
|
|
7: "Bon d'avoir financier (BA)",
|
|
}
|
|
|
|
libelle_base = suggestions.get(type_num, f"Type {type_num}")
|
|
|
|
if prefix_commun:
|
|
libelle_base += f" - Detecte: prefix '{prefix_commun}'"
|
|
|
|
return libelle_base
|
|
|
|
|
|
@app.post("/sage/test-creation-par-type", dependencies=[Depends(verify_token)])
|
|
def tester_creation_par_type(type_doc: int = Query(..., ge=0, le=20)):
|
|
"""
|
|
TEST: Essaie de créer un document d'un type spécifique
|
|
|
|
Permet de tester tous les types possibles (0-20) pour trouver
|
|
lesquels fonctionnent sur votre installation
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
logger.info(f"[TEST] Tentative creation type {type_doc}...")
|
|
|
|
try:
|
|
# Essayer de créer un process
|
|
process = sage.cial.CreateProcess_Document(type_doc)
|
|
|
|
if not process:
|
|
return {
|
|
"success": False,
|
|
"type": type_doc,
|
|
"resultat": "Process NULL retourne",
|
|
}
|
|
|
|
# Si on arrive ici, le type est valide !
|
|
doc = process.Document
|
|
|
|
try:
|
|
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
|
|
except:
|
|
pass
|
|
|
|
# Récupérer les infos du document créé
|
|
type_reel = getattr(doc, "DO_Type", -1)
|
|
domaine = getattr(doc, "DO_Domaine", -1)
|
|
|
|
# NE PAS VALIDER le document (pas de Write/Process)
|
|
# On veut juste savoir si la création est possible
|
|
|
|
del process
|
|
del doc
|
|
|
|
return {
|
|
"success": True,
|
|
"type_demande": type_doc,
|
|
"type_reel_doc": type_reel,
|
|
"domaine": {0: "Vente", 1: "Achat"}.get(domaine, domaine),
|
|
"resultat": "CREATION POSSIBLE",
|
|
"note": "Document non valide (test uniquement)",
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"type": type_doc,
|
|
"erreur": str(e),
|
|
"resultat": "CREATION IMPOSSIBLE",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TEST] Erreur test type {type_doc}: {e}")
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/diagnostic/facture-requirements", dependencies=[Depends(verify_token)])
|
|
def diagnostiquer_exigences_facture():
|
|
"""
|
|
DIAGNOSTIC: Découvre les champs obligatoires pour créer une facture
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
# Créer un process facture de test
|
|
process = sage.cial.CreateProcess_Document(60)
|
|
doc_test = process.Document
|
|
|
|
try:
|
|
doc_test = win32com.client.CastTo(doc_test, "IBODocumentVente3")
|
|
except:
|
|
pass
|
|
|
|
# Tester tous les champs potentiellement obligatoires
|
|
champs_a_tester = [
|
|
"DO_ModeRegl",
|
|
"DO_CondRegl",
|
|
"DO_CodeJournal",
|
|
"DO_Souche",
|
|
"DO_TypeCalcul",
|
|
"DO_CodeTaxe1",
|
|
"CT_Num",
|
|
"DO_Date",
|
|
"DO_Statut",
|
|
]
|
|
|
|
resultats = {}
|
|
|
|
for champ in champs_a_tester:
|
|
try:
|
|
valeur = getattr(doc_test, champ, None)
|
|
resultats[champ] = {
|
|
"valeur_defaut": str(valeur) if valeur is not None else "None",
|
|
"accessible": True,
|
|
}
|
|
except Exception as e:
|
|
resultats[champ] = {
|
|
"valeur_defaut": "N/A",
|
|
"accessible": False,
|
|
"erreur": str(e)[:100],
|
|
}
|
|
|
|
# Ne pas valider le document de test
|
|
del process
|
|
del doc_test
|
|
|
|
return {
|
|
"success": True,
|
|
"champs_facture": resultats,
|
|
"conseil": "Les champs avec valeur_defaut=None ou 0 sont souvent obligatoires",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.post("/sage/test/creer-facture-vide", dependencies=[Depends(verify_token)])
|
|
def tester_creation_facture_vide():
|
|
"""
|
|
🧪 TEST: Crée une facture vide pour identifier les champs obligatoires
|
|
|
|
Ce test permet de découvrir EXACTEMENT quels champs Sage exige
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
logger.info("[TEST] Creation facture test...")
|
|
|
|
# 1. Créer le process
|
|
process = sage.cial.CreateProcess_Document(60)
|
|
doc = process.Document
|
|
|
|
try:
|
|
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
|
|
except:
|
|
pass
|
|
|
|
# 2. Définir UNIQUEMENT les champs absolument critiques
|
|
import pywintypes
|
|
|
|
# Date (obligatoire)
|
|
doc.DO_Date = pywintypes.Time(datetime.now())
|
|
|
|
# Client (obligatoire) - Utiliser le premier client disponible
|
|
factory_client = sage.cial.CptaApplication.FactoryClient
|
|
persist_client = factory_client.List(1)
|
|
|
|
if persist_client:
|
|
client = sage._cast_client(persist_client)
|
|
if client:
|
|
doc.SetDefaultClient(client)
|
|
client_code = getattr(client, "CT_Num", "?")
|
|
logger.info(f"[TEST] Client test: {client_code}")
|
|
|
|
# 3. Écrire sans Process() pour voir les valeurs par défaut
|
|
doc.Write()
|
|
doc.Read()
|
|
|
|
# 4. Analyser tous les champs
|
|
champs_analyse = {}
|
|
|
|
for attr in dir(doc):
|
|
if attr.startswith("DO_") or attr.startswith("CT_"):
|
|
try:
|
|
valeur = getattr(doc, attr, None)
|
|
if valeur is not None:
|
|
champs_analyse[attr] = {
|
|
"valeur": str(valeur),
|
|
"type": type(valeur).__name__,
|
|
}
|
|
except:
|
|
pass
|
|
|
|
logger.info(f"[TEST] {len(champs_analyse)} champs analyses")
|
|
|
|
# 5. Tester Process() pour voir l'erreur exacte
|
|
erreur_process = None
|
|
try:
|
|
process.Process()
|
|
logger.info("[TEST] Process() reussi (inattendu!)")
|
|
except Exception as e:
|
|
erreur_process = str(e)
|
|
logger.info(f"[TEST] Process() echoue comme prevu: {e}")
|
|
|
|
# Ne pas commit - c'est juste un test
|
|
try:
|
|
sage.cial.CptaApplication.RollbackTrans()
|
|
except:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"champs_definis": champs_analyse,
|
|
"erreur_process": erreur_process,
|
|
"conseil": "Les champs manquants dans l'erreur sont probablement obligatoires",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TEST] Erreur: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/config/parametres-facture", dependencies=[Depends(verify_token)])
|
|
def verifier_parametres_facture():
|
|
"""
|
|
🔍 Vérifie les paramètres Sage pour la création de factures
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
parametres = {}
|
|
|
|
# Paramètres société
|
|
try:
|
|
param_societe = sage.cial.CptaApplication.ParametreSociete
|
|
|
|
parametres["societe"] = {
|
|
"journal_vente_defaut": getattr(
|
|
param_societe, "P_CodeJournalVte", "N/A"
|
|
),
|
|
"mode_reglement_defaut": getattr(
|
|
param_societe, "P_ModeRegl", "N/A"
|
|
),
|
|
"souche_facture": getattr(param_societe, "P_SoucheFacture", "N/A"),
|
|
}
|
|
except Exception as e:
|
|
parametres["erreur_societe"] = str(e)
|
|
|
|
# Tester un client existant
|
|
try:
|
|
factory_client = sage.cial.CptaApplication.FactoryClient
|
|
persist = factory_client.List(1)
|
|
|
|
if persist:
|
|
client = sage._cast_client(persist)
|
|
if client:
|
|
parametres["exemple_client"] = {
|
|
"code": getattr(client, "CT_Num", "?"),
|
|
"mode_reglement": getattr(client, "CT_ModeRegl", "N/A"),
|
|
"conditions_reglement": getattr(
|
|
client, "CT_CondRegl", "N/A"
|
|
),
|
|
}
|
|
except Exception as e:
|
|
parametres["erreur_client"] = str(e)
|
|
|
|
# Journaux disponibles
|
|
try:
|
|
factory_journal = sage.cial.CptaApplication.FactoryJournal
|
|
journaux = []
|
|
|
|
index = 1
|
|
while index <= 20: # Max 20 journaux
|
|
try:
|
|
persist_journal = factory_journal.List(index)
|
|
if persist_journal is None:
|
|
break
|
|
|
|
# Cast en journal
|
|
journal = win32com.client.CastTo(persist_journal, "IBOJournal3")
|
|
journal.Read()
|
|
|
|
journaux.append(
|
|
{
|
|
"code": getattr(journal, "JO_Num", "?"),
|
|
"intitule": getattr(journal, "JO_Intitule", "?"),
|
|
"type": getattr(journal, "JO_Type", "?"),
|
|
}
|
|
)
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
break
|
|
|
|
parametres["journaux_disponibles"] = journaux
|
|
|
|
except Exception as e:
|
|
parametres["erreur_journaux"] = str(e)
|
|
|
|
return {
|
|
"success": True,
|
|
"parametres": parametres,
|
|
"conseil": "Utilisez ces valeurs pour remplir les champs obligatoires des factures",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur verification config: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get("/sage/diagnostic/statuts-globaux", dependencies=[Depends(verify_token)])
|
|
def diagnostiquer_statuts_globaux():
|
|
"""
|
|
📊 MATRICE COMPLÈTE DES STATUTS SAGE
|
|
|
|
Retourne pour CHAQUE type de document :
|
|
- Tous les statuts possibles avec leurs descriptions
|
|
- Les statuts requis pour transformation
|
|
- Les changements de statuts après transformation
|
|
- Les restrictions de changement de statut
|
|
|
|
Cette route analyse la base Sage pour découvrir les règles réelles
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Définition des types de documents
|
|
types_documents = {
|
|
0: "Devis",
|
|
10: "Bon de commande",
|
|
20: "Préparation",
|
|
30: "Bon de livraison",
|
|
40: "Bon de retour",
|
|
50: "Bon d'avoir",
|
|
60: "Facture",
|
|
}
|
|
|
|
# Descriptions standard des statuts Sage
|
|
descriptions_statuts = {
|
|
0: "Brouillon",
|
|
1: "Soumis/En attente",
|
|
2: "Accepté/Validé",
|
|
3: "Réalisé partiellement",
|
|
4: "Réalisé totalement",
|
|
5: "Transformé",
|
|
6: "Annulé",
|
|
}
|
|
|
|
matrice_complete = {}
|
|
|
|
logger.info(
|
|
"[DIAG] 🔍 Analyse des statuts pour tous les types de documents..."
|
|
)
|
|
|
|
# Pour chaque type de document
|
|
for type_doc, libelle_type in types_documents.items():
|
|
logger.info(f"[DIAG] Analyse type {type_doc} ({libelle_type})...")
|
|
|
|
analyse_type = {
|
|
"type": type_doc,
|
|
"libelle": libelle_type,
|
|
"statuts_observes": {},
|
|
"exemples_par_statut": {},
|
|
"nb_documents_total": 0,
|
|
}
|
|
|
|
# Scanner tous les documents de ce type
|
|
index = 1
|
|
max_scan = 1000
|
|
|
|
while index < max_scan:
|
|
try:
|
|
persist = factory.List(index)
|
|
if persist is None:
|
|
break
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
doc_type = getattr(doc, "DO_Type", -1)
|
|
|
|
# Filtrer sur le type qu'on analyse
|
|
if doc_type != type_doc:
|
|
index += 1
|
|
continue
|
|
|
|
analyse_type["nb_documents_total"] += 1
|
|
|
|
# Récupérer le statut
|
|
statut = getattr(doc, "DO_Statut", -1)
|
|
|
|
# Compter les statuts observés
|
|
if statut not in analyse_type["statuts_observes"]:
|
|
analyse_type["statuts_observes"][statut] = {
|
|
"count": 0,
|
|
"description": descriptions_statuts.get(
|
|
statut, f"Statut {statut}"
|
|
),
|
|
"exemples": [],
|
|
}
|
|
|
|
analyse_type["statuts_observes"][statut]["count"] += 1
|
|
|
|
# Garder quelques exemples
|
|
if (
|
|
len(analyse_type["statuts_observes"][statut]["exemples"])
|
|
< 3
|
|
):
|
|
numero = getattr(doc, "DO_Piece", "")
|
|
date = str(getattr(doc, "DO_Date", ""))
|
|
|
|
analyse_type["statuts_observes"][statut]["exemples"].append(
|
|
{
|
|
"numero": numero,
|
|
"date": date,
|
|
"total_ttc": float(
|
|
getattr(doc, "DO_TotalTTC", 0.0)
|
|
),
|
|
}
|
|
)
|
|
|
|
index += 1
|
|
|
|
except Exception as e:
|
|
index += 1
|
|
continue
|
|
|
|
# Trier les statuts par nombre d'occurrences
|
|
analyse_type["statuts_par_frequence"] = sorted(
|
|
[
|
|
{
|
|
"statut": s,
|
|
"description": info["description"],
|
|
"count": info["count"],
|
|
"pourcentage": (
|
|
round(
|
|
info["count"]
|
|
/ analyse_type["nb_documents_total"]
|
|
* 100,
|
|
1,
|
|
)
|
|
if analyse_type["nb_documents_total"] > 0
|
|
else 0
|
|
),
|
|
}
|
|
for s, info in analyse_type["statuts_observes"].items()
|
|
],
|
|
key=lambda x: x["count"],
|
|
reverse=True,
|
|
)
|
|
|
|
matrice_complete[type_doc] = analyse_type
|
|
|
|
logger.info(
|
|
f"[DIAG] ✅ Type {type_doc}: {analyse_type['nb_documents_total']} docs, "
|
|
f"{len(analyse_type['statuts_observes'])} statuts différents"
|
|
)
|
|
|
|
# RÈGLES DE TRANSFORMATION
|
|
regles_transformation = {
|
|
"transformations_valides": [
|
|
{
|
|
"source_type": 0,
|
|
"source_libelle": "Devis",
|
|
"cible_type": 10,
|
|
"cible_libelle": "Bon de commande",
|
|
"statut_source_requis": [2],
|
|
"statut_source_requis_description": ["Accepté/Validé"],
|
|
"statut_source_apres": 5,
|
|
"statut_source_apres_description": "Transformé",
|
|
"statut_cible_initial": 2,
|
|
"statut_cible_initial_description": "Accepté/Validé",
|
|
},
|
|
{
|
|
"source_type": 10,
|
|
"source_libelle": "Bon de commande",
|
|
"cible_type": 30,
|
|
"cible_libelle": "Bon de livraison",
|
|
"statut_source_requis": [2],
|
|
"statut_source_requis_description": ["Accepté/Validé"],
|
|
"statut_source_apres": 5,
|
|
"statut_source_apres_description": "Transformé",
|
|
"statut_cible_initial": 2,
|
|
"statut_cible_initial_description": "Accepté/Validé",
|
|
},
|
|
{
|
|
"source_type": 10,
|
|
"source_libelle": "Bon de commande",
|
|
"cible_type": 60,
|
|
"cible_libelle": "Facture",
|
|
"statut_source_requis": [2],
|
|
"statut_source_requis_description": ["Accepté/Validé"],
|
|
"statut_source_apres": 5,
|
|
"statut_source_apres_description": "Transformé",
|
|
"statut_cible_initial": 2,
|
|
"statut_cible_initial_description": "Accepté/Validé",
|
|
},
|
|
{
|
|
"source_type": 30,
|
|
"source_libelle": "Bon de livraison",
|
|
"cible_type": 60,
|
|
"cible_libelle": "Facture",
|
|
"statut_source_requis": [2],
|
|
"statut_source_requis_description": ["Accepté/Validé"],
|
|
"statut_source_apres": 5,
|
|
"statut_source_apres_description": "Transformé",
|
|
"statut_cible_initial": 2,
|
|
"statut_cible_initial_description": "Accepté/Validé",
|
|
},
|
|
{
|
|
"source_type": 0,
|
|
"source_libelle": "Devis",
|
|
"cible_type": 60,
|
|
"cible_libelle": "Facture",
|
|
"statut_source_requis": [2],
|
|
"statut_source_requis_description": ["Accepté/Validé"],
|
|
"statut_source_apres": 5,
|
|
"statut_source_apres_description": "Transformé",
|
|
"statut_cible_initial": 2,
|
|
"statut_cible_initial_description": "Accepté/Validé",
|
|
},
|
|
],
|
|
"statuts_bloquants_pour_transformation": [
|
|
{
|
|
"statut": 5,
|
|
"description": "Transformé",
|
|
"raison": "Le document a déjà été transformé",
|
|
},
|
|
{
|
|
"statut": 6,
|
|
"description": "Annulé",
|
|
"raison": "Le document est annulé",
|
|
},
|
|
{
|
|
"statut": 3,
|
|
"description": "Réalisé partiellement",
|
|
"raison": "Un document cible existe probablement déjà (transformation partielle effectuée)",
|
|
},
|
|
{
|
|
"statut": 4,
|
|
"description": "Réalisé totalement",
|
|
"raison": "Le document a été entièrement réalisé (transformation déjà effectuée)",
|
|
},
|
|
],
|
|
"changements_statut_autorises": {
|
|
"0_Brouillon": {
|
|
"vers": [2, 6],
|
|
"descriptions": ["Accepté/Validé", "Annulé"],
|
|
"note": "Un brouillon peut être accepté ou annulé",
|
|
},
|
|
"2_Accepte": {
|
|
"vers": [5, 6],
|
|
"descriptions": ["Transformé", "Annulé"],
|
|
"note": "Un document accepté peut être transformé ou annulé",
|
|
},
|
|
"5_Transforme": {
|
|
"vers": [],
|
|
"descriptions": [],
|
|
"note": "Un document transformé ne peut plus changer de statut",
|
|
},
|
|
"6_Annule": {
|
|
"vers": [],
|
|
"descriptions": [],
|
|
"note": "Un document annulé ne peut plus changer de statut",
|
|
},
|
|
},
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"matrice_statuts_par_type": matrice_complete,
|
|
"regles_transformation": regles_transformation,
|
|
"legende_statuts": descriptions_statuts,
|
|
"types_documents": types_documents,
|
|
"date_analyse": datetime.now().isoformat(),
|
|
"note": "Cette matrice est construite à partir des documents réels dans votre base Sage",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur diagnostic global: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get(
|
|
"/sage/diagnostic/statuts-permis/{numero}", dependencies=[Depends(verify_token)]
|
|
)
|
|
def diagnostiquer_statuts_permis(numero: str):
|
|
"""
|
|
🔍 DIAGNOSTIC CRITIQUE: Découvre TOUS les statuts possibles pour un document
|
|
|
|
Teste tous les statuts de 0 à 10 pour identifier lesquels sont valides
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Chercher le document (tous types confondus)
|
|
persist = None
|
|
type_doc_trouve = None
|
|
|
|
# Essayer ReadPiece pour différents types
|
|
for type_test in range(7): # 0-6
|
|
try:
|
|
persist_test = factory.ReadPiece(type_test, numero)
|
|
if persist_test:
|
|
persist = persist_test
|
|
type_doc_trouve = type_test
|
|
logger.info(
|
|
f"[DIAG] Document {numero} trouvé avec ReadPiece(type={type_test})"
|
|
)
|
|
break
|
|
except:
|
|
continue
|
|
|
|
# Si pas trouvé, chercher dans List()
|
|
if not persist:
|
|
index = 1
|
|
while index < 10000:
|
|
try:
|
|
persist_test = factory.List(index)
|
|
if persist_test is None:
|
|
break
|
|
|
|
doc_test = win32com.client.CastTo(
|
|
persist_test, "IBODocumentVente3"
|
|
)
|
|
doc_test.Read()
|
|
|
|
if getattr(doc_test, "DO_Piece", "") == numero:
|
|
persist = persist_test
|
|
type_doc_trouve = getattr(doc_test, "DO_Type", -1)
|
|
logger.info(
|
|
f"[DIAG] Document {numero} trouvé dans List() à l'index {index}"
|
|
)
|
|
break
|
|
|
|
index += 1
|
|
except:
|
|
index += 1
|
|
|
|
if not persist:
|
|
raise HTTPException(404, f"Document {numero} introuvable")
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
# Infos du document
|
|
statut_actuel = getattr(doc, "DO_Statut", -1)
|
|
type_actuel = getattr(doc, "DO_Type", -1)
|
|
|
|
diagnostic = {
|
|
"numero": numero,
|
|
"type_document": type_actuel,
|
|
"type_libelle": {
|
|
0: "Devis",
|
|
10: "Bon de commande",
|
|
20: "Préparation",
|
|
30: "Bon de livraison",
|
|
40: "Bon de retour",
|
|
50: "Bon d'avoir",
|
|
60: "Facture",
|
|
}.get(type_actuel, f"Type {type_actuel}"),
|
|
"statut_actuel": statut_actuel,
|
|
"statut_actuel_libelle": {
|
|
0: "Brouillon",
|
|
1: "Soumis/En attente",
|
|
2: "Accepté/Validé",
|
|
3: "Réalisé partiellement",
|
|
4: "Réalisé totalement",
|
|
5: "Transformé",
|
|
6: "Annulé",
|
|
}.get(statut_actuel, f"Statut {statut_actuel}"),
|
|
"tests_statuts": [],
|
|
}
|
|
|
|
# Tester tous les statuts de 0 à 10
|
|
logger.info(f"[DIAG] Test des statuts pour {numero}...")
|
|
|
|
for statut_test in range(11):
|
|
resultat_test = {
|
|
"statut": statut_test,
|
|
"libelle": {
|
|
0: "Brouillon",
|
|
1: "Soumis/En attente",
|
|
2: "Accepté/Validé",
|
|
3: "Réalisé partiellement",
|
|
4: "Réalisé totalement",
|
|
5: "Transformé",
|
|
6: "Annulé",
|
|
7: "Statut 7",
|
|
8: "Statut 8",
|
|
9: "Statut 9",
|
|
10: "Statut 10",
|
|
}.get(statut_test, f"Statut {statut_test}"),
|
|
"autorise": False,
|
|
"erreur": None,
|
|
"est_statut_actuel": (statut_test == statut_actuel),
|
|
}
|
|
|
|
# Si c'est le statut actuel, on sait qu'il est valide
|
|
if statut_test == statut_actuel:
|
|
resultat_test["autorise"] = True
|
|
resultat_test["note"] = "Statut actuel du document"
|
|
else:
|
|
# Tester le changement de statut
|
|
try:
|
|
# Relire le document
|
|
doc.Read()
|
|
|
|
# Essayer de changer le statut
|
|
doc.DO_Statut = statut_test
|
|
|
|
# Essayer d'écrire
|
|
doc.Write()
|
|
|
|
# Si on arrive ici, le statut est valide !
|
|
resultat_test["autorise"] = True
|
|
resultat_test["note"] = "Changement de statut réussi"
|
|
|
|
logger.info(f"[DIAG] ✅ Statut {statut_test} AUTORISÉ")
|
|
|
|
# Restaurer le statut d'origine immédiatement
|
|
doc.Read()
|
|
doc.DO_Statut = statut_actuel
|
|
doc.Write()
|
|
|
|
except Exception as e:
|
|
erreur_str = str(e)
|
|
resultat_test["autorise"] = False
|
|
resultat_test["erreur"] = erreur_str
|
|
|
|
logger.debug(
|
|
f"[DIAG] ❌ Statut {statut_test} REFUSÉ: {erreur_str[:100]}"
|
|
)
|
|
|
|
# Restaurer en cas d'erreur
|
|
try:
|
|
doc.Read()
|
|
except:
|
|
pass
|
|
|
|
diagnostic["tests_statuts"].append(resultat_test)
|
|
|
|
# Résumé
|
|
statuts_autorises = [
|
|
t["statut"] for t in diagnostic["tests_statuts"] if t["autorise"]
|
|
]
|
|
statuts_refuses = [
|
|
t["statut"] for t in diagnostic["tests_statuts"] if not t["autorise"]
|
|
]
|
|
|
|
diagnostic["resume"] = {
|
|
"nb_statuts_autorises": len(statuts_autorises),
|
|
"statuts_autorises": statuts_autorises,
|
|
"statuts_autorises_libelles": [
|
|
t["libelle"] for t in diagnostic["tests_statuts"] if t["autorise"]
|
|
],
|
|
"nb_statuts_refuses": len(statuts_refuses),
|
|
"statuts_refuses": statuts_refuses,
|
|
}
|
|
|
|
# Recommandations
|
|
recommendations = []
|
|
|
|
if 2 in statuts_autorises and statut_actuel == 0:
|
|
recommendations.append(
|
|
"✅ Vous pouvez passer ce document de 'Brouillon' (0) à 'Accepté' (2)"
|
|
)
|
|
|
|
if 5 in statuts_autorises:
|
|
recommendations.append(
|
|
"✅ Le statut 'Transformé' (5) est disponible - utilisé après transformation"
|
|
)
|
|
|
|
if 6 in statuts_autorises:
|
|
recommendations.append("✅ Vous pouvez annuler ce document (statut 6)")
|
|
|
|
if not any(s in statuts_autorises for s in [2, 3, 4]):
|
|
recommendations.append(
|
|
"⚠️ Aucun statut de validation (2/3/4) n'est disponible - "
|
|
"le document a peut-être déjà été traité"
|
|
)
|
|
|
|
diagnostic["recommendations"] = recommendations
|
|
|
|
logger.info(
|
|
f"[DIAG] Statuts autorisés pour {numero}: "
|
|
f"{statuts_autorises} / Refusés: {statuts_refuses}"
|
|
)
|
|
|
|
return {"success": True, "diagnostic": diagnostic}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur diagnostic statuts: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
@app.get(
|
|
"/sage/diagnostic/erreur-transformation/{numero}",
|
|
dependencies=[Depends(verify_token)],
|
|
)
|
|
def diagnostiquer_erreur_transformation(
|
|
numero: str, type_source: int = Query(...), type_cible: int = Query(...)
|
|
):
|
|
"""
|
|
🔍 DIAGNOSTIC AVANCÉ: Analyse pourquoi une transformation échoue
|
|
|
|
Vérifie:
|
|
- Statut du document source
|
|
- Statuts autorisés
|
|
- Lignes du document
|
|
- Client associé
|
|
- Champs obligatoires manquants
|
|
"""
|
|
try:
|
|
if not sage or not sage.cial:
|
|
raise HTTPException(503, "Service Sage indisponible")
|
|
|
|
with sage._com_context(), sage._lock_com:
|
|
factory = sage.cial.FactoryDocumentVente
|
|
|
|
# Lire le document source
|
|
persist = factory.ReadPiece(type_source, numero)
|
|
|
|
if not persist:
|
|
persist = sage._find_document_in_list(numero, type_source)
|
|
|
|
if not persist:
|
|
raise HTTPException(
|
|
404, f"Document {numero} (type {type_source}) introuvable"
|
|
)
|
|
|
|
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
|
doc.Read()
|
|
|
|
diagnostic = {
|
|
"numero": numero,
|
|
"type_source": type_source,
|
|
"type_cible": type_cible,
|
|
"problemes_detectes": [],
|
|
"avertissements": [],
|
|
"suggestions": [],
|
|
}
|
|
|
|
# 1. Vérifier le statut
|
|
statut_actuel = getattr(doc, "DO_Statut", -1)
|
|
diagnostic["statut_actuel"] = statut_actuel
|
|
|
|
if statut_actuel == 5:
|
|
diagnostic["problemes_detectes"].append(
|
|
{
|
|
"severite": "BLOQUANT",
|
|
"champ": "DO_Statut",
|
|
"valeur": 5,
|
|
"message": "Document déjà transformé (statut=5)",
|
|
}
|
|
)
|
|
|
|
elif statut_actuel == 6:
|
|
diagnostic["problemes_detectes"].append(
|
|
{
|
|
"severite": "BLOQUANT",
|
|
"champ": "DO_Statut",
|
|
"valeur": 6,
|
|
"message": "Document annulé (statut=6)",
|
|
}
|
|
)
|
|
|
|
elif statut_actuel in [3, 4]:
|
|
diagnostic["avertissements"].append(
|
|
{
|
|
"severite": "ATTENTION",
|
|
"champ": "DO_Statut",
|
|
"valeur": statut_actuel,
|
|
"message": f"Document déjà réalisé (statut={statut_actuel}). "
|
|
f"Un document cible existe peut-être déjà.",
|
|
}
|
|
)
|
|
|
|
elif statut_actuel == 0:
|
|
diagnostic["suggestions"].append(
|
|
"Le document est en 'Brouillon' (statut=0). "
|
|
"Le système le passera automatiquement à 'Accepté' (statut=2) avant transformation."
|
|
)
|
|
|
|
# 2. Vérifier le client
|
|
client_code = ""
|
|
try:
|
|
client_obj = getattr(doc, "Client", None)
|
|
if client_obj:
|
|
client_obj.Read()
|
|
client_code = getattr(client_obj, "CT_Num", "").strip()
|
|
except:
|
|
pass
|
|
|
|
if not client_code:
|
|
diagnostic["problemes_detectes"].append(
|
|
{
|
|
"severite": "BLOQUANT",
|
|
"champ": "CT_Num",
|
|
"valeur": None,
|
|
"message": "Aucun client associé au document",
|
|
}
|
|
)
|
|
else:
|
|
diagnostic["client_code"] = client_code
|
|
|
|
# 3. Vérifier les lignes
|
|
try:
|
|
factory_lignes = getattr(doc, "FactoryDocumentLigne", None) or getattr(
|
|
doc, "FactoryDocumentVenteLigne", None
|
|
)
|
|
|
|
nb_lignes = 0
|
|
lignes_problemes = []
|
|
|
|
if factory_lignes:
|
|
index = 1
|
|
while index <= 100:
|
|
try:
|
|
ligne_p = factory_lignes.List(index)
|
|
if ligne_p is None:
|
|
break
|
|
|
|
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
|
|
ligne.Read()
|
|
|
|
nb_lignes += 1
|
|
|
|
# Vérifier article
|
|
article_ref = ""
|
|
try:
|
|
article_ref = getattr(ligne, "AR_Ref", "").strip()
|
|
if not article_ref:
|
|
article_obj = getattr(ligne, "Article", None)
|
|
if article_obj:
|
|
article_obj.Read()
|
|
article_ref = getattr(
|
|
article_obj, "AR_Ref", ""
|
|
).strip()
|
|
except:
|
|
pass
|
|
|
|
if not article_ref:
|
|
lignes_problemes.append(
|
|
{
|
|
"ligne": index,
|
|
"probleme": "Aucune référence article",
|
|
}
|
|
)
|
|
|
|
# Vérifier prix
|
|
prix = float(getattr(ligne, "DL_PrixUnitaire", 0.0))
|
|
if prix == 0:
|
|
lignes_problemes.append(
|
|
{"ligne": index, "probleme": "Prix unitaire = 0"}
|
|
)
|
|
|
|
index += 1
|
|
except:
|
|
break
|
|
|
|
diagnostic["nb_lignes"] = nb_lignes
|
|
|
|
if nb_lignes == 0:
|
|
diagnostic["problemes_detectes"].append(
|
|
{
|
|
"severite": "BLOQUANT",
|
|
"champ": "Lignes",
|
|
"valeur": 0,
|
|
"message": "Document vide (aucune ligne)",
|
|
}
|
|
)
|
|
|
|
if lignes_problemes:
|
|
diagnostic["avertissements"].append(
|
|
{
|
|
"severite": "ATTENTION",
|
|
"champ": "Lignes",
|
|
"message": f"{len(lignes_problemes)} ligne(s) avec des problèmes",
|
|
"details": lignes_problemes,
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
diagnostic["avertissements"].append(
|
|
{
|
|
"severite": "ERREUR",
|
|
"champ": "Lignes",
|
|
"message": f"Impossible de lire les lignes: {e}",
|
|
}
|
|
)
|
|
|
|
# 4. Vérifier les totaux
|
|
total_ht = float(getattr(doc, "DO_TotalHT", 0.0))
|
|
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
|
|
|
|
diagnostic["totaux"] = {"total_ht": total_ht, "total_ttc": total_ttc}
|
|
|
|
if total_ht == 0 and total_ttc == 0:
|
|
diagnostic["avertissements"].append(
|
|
{
|
|
"severite": "ATTENTION",
|
|
"champ": "Totaux",
|
|
"message": "Tous les totaux sont à 0",
|
|
}
|
|
)
|
|
|
|
# 5. Vérifier si la transformation est autorisée
|
|
transformations_valides = {(0, 10), (10, 30), (10, 60), (30, 60), (0, 60)}
|
|
|
|
if (type_source, type_cible) not in transformations_valides:
|
|
diagnostic["problemes_detectes"].append(
|
|
{
|
|
"severite": "BLOQUANT",
|
|
"champ": "Transformation",
|
|
"message": f"Transformation {type_source} → {type_cible} non autorisée. "
|
|
f"Transformations valides: {transformations_valides}",
|
|
}
|
|
)
|
|
|
|
# Résumé
|
|
nb_bloquants = sum(
|
|
1
|
|
for p in diagnostic["problemes_detectes"]
|
|
if p.get("severite") == "BLOQUANT"
|
|
)
|
|
nb_avertissements = len(diagnostic["avertissements"])
|
|
|
|
diagnostic["resume"] = {
|
|
"peut_transformer": nb_bloquants == 0,
|
|
"nb_problemes_bloquants": nb_bloquants,
|
|
"nb_avertissements": nb_avertissements,
|
|
}
|
|
|
|
if nb_bloquants == 0:
|
|
diagnostic["suggestions"].append(
|
|
"✅ Aucun problème bloquant détecté. La transformation devrait fonctionner."
|
|
)
|
|
else:
|
|
diagnostic["suggestions"].append(
|
|
f"❌ {nb_bloquants} problème(s) bloquant(s) doivent être résolus avant la transformation."
|
|
)
|
|
|
|
return {"success": True, "diagnostic": diagnostic}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[DIAG] Erreur diagnostic transformation: {e}", exc_info=True)
|
|
raise HTTPException(500, str(e))
|
|
|
|
|
|
# =====================================================
|
|
# LANCEMENT
|
|
# =====================================================
|
|
if __name__ == "__main__":
|
|
uvicorn.run(
|
|
"main:app",
|
|
host=settings.api_host,
|
|
port=settings.api_port,
|
|
reload=False, # Pas de reload en production
|
|
log_level="info",
|
|
)
|