2723 lines
100 KiB
Python
2723 lines
100 KiB
Python
from fastapi import FastAPI, HTTPException, Header, Depends, Query
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional, List, Dict
|
||
from datetime import datetime, date
|
||
from enum import Enum
|
||
import uvicorn
|
||
import logging
|
||
import win32com.client
|
||
from config import settings, validate_settings
|
||
from sage_connector import SageConnector
|
||
|
||
# =====================================================
|
||
# LOGGING
|
||
# =====================================================
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||
handlers=[logging.FileHandler("sage_gateway.log"), logging.StreamHandler()],
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# =====================================================
|
||
# ENUMS
|
||
# =====================================================
|
||
class TypeDocument(int, Enum):
|
||
DEVIS = 0
|
||
BON_LIVRAISON = 1
|
||
BON_RETOUR = 2
|
||
COMMANDE = 3
|
||
PREPARATION = 4
|
||
FACTURE = 5
|
||
|
||
|
||
# =====================================================
|
||
# MODÈLES
|
||
# =====================================================
|
||
|
||
|
||
class DocumentGetRequest(BaseModel):
|
||
numero: str
|
||
type_doc: int
|
||
|
||
|
||
class FiltreRequest(BaseModel):
|
||
filtre: Optional[str] = ""
|
||
|
||
|
||
class CodeRequest(BaseModel):
|
||
code: str
|
||
|
||
|
||
class ChampLibreRequest(BaseModel):
|
||
doc_id: str
|
||
type_doc: int
|
||
nom_champ: str
|
||
valeur: str
|
||
|
||
|
||
class DevisRequest(BaseModel):
|
||
client_id: str
|
||
date_devis: Optional[date] = None
|
||
lignes: List[
|
||
Dict
|
||
] # Format: {article_code, quantite, prix_unitaire_ht, remise_pourcentage}
|
||
|
||
|
||
class TransformationRequest(BaseModel):
|
||
numero_source: str
|
||
type_source: int
|
||
type_cible: int
|
||
|
||
|
||
class StatutRequest(BaseModel):
|
||
nouveau_statut: int
|
||
|
||
|
||
class ClientCreateRequest(BaseModel):
|
||
intitule: str = Field(..., description="Nom du client (CT_Intitule)")
|
||
compte_collectif: str = Field("411000", description="Compte général rattaché")
|
||
num: Optional[str] = Field(None, description="Laisser vide pour numérotation auto")
|
||
adresse: Optional[str] = None
|
||
code_postal: Optional[str] = None
|
||
ville: Optional[str] = None
|
||
pays: Optional[str] = None
|
||
email: Optional[str] = None
|
||
telephone: Optional[str] = None
|
||
siret: Optional[str] = None
|
||
tva_intra: Optional[str] = None
|
||
|
||
class ClientUpdateGatewayRequest(BaseModel):
|
||
"""Modèle pour modification client côté gateway"""
|
||
code: str
|
||
client_data: Dict
|
||
|
||
class FournisseurCreateRequest(BaseModel):
|
||
intitule: str = Field(..., description="Raison sociale du fournisseur")
|
||
compte_collectif: str = Field("401000", description="Compte général rattaché")
|
||
num: Optional[str] = Field(None, description="Code fournisseur (auto si vide)")
|
||
adresse: Optional[str] = None
|
||
code_postal: Optional[str] = None
|
||
ville: Optional[str] = None
|
||
pays: Optional[str] = None
|
||
email: Optional[str] = None
|
||
telephone: Optional[str] = None
|
||
siret: Optional[str] = None
|
||
tva_intra: Optional[str] = None
|
||
|
||
|
||
# =====================================================
|
||
# SÉCURITÉ
|
||
# =====================================================
|
||
def verify_token(x_sage_token: str = Header(...)):
|
||
"""Vérification du token d'authentification"""
|
||
if x_sage_token != settings.sage_gateway_token:
|
||
logger.warning(f"❌ Token invalide reçu: {x_sage_token[:20]}...")
|
||
raise HTTPException(401, "Token invalide")
|
||
return True
|
||
|
||
|
||
# =====================================================
|
||
# APPLICATION
|
||
# =====================================================
|
||
app = FastAPI(
|
||
title="Sage Gateway - Windows Server",
|
||
version="1.0.0",
|
||
description="Passerelle d'accès à Sage 100c pour VPS Linux",
|
||
)
|
||
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=settings.cors_origins,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
allow_credentials=True,
|
||
)
|
||
|
||
sage: Optional[SageConnector] = None
|
||
|
||
|
||
# =====================================================
|
||
# LIFECYCLE
|
||
# =====================================================
|
||
@app.on_event("startup")
|
||
def startup():
|
||
global sage
|
||
|
||
logger.info("🚀 Démarrage Sage Gateway Windows...")
|
||
|
||
# Validation config
|
||
try:
|
||
validate_settings()
|
||
logger.info("✅ Configuration validée")
|
||
except ValueError as e:
|
||
logger.error(f"❌ Configuration invalide: {e}")
|
||
raise
|
||
|
||
# Connexion Sage
|
||
sage = SageConnector(
|
||
settings.chemin_base, settings.utilisateur, settings.mot_de_passe
|
||
)
|
||
|
||
if not sage.connecter():
|
||
raise RuntimeError("❌ Impossible de se connecter à Sage 100c")
|
||
|
||
logger.info("✅ Sage Gateway démarré et connecté")
|
||
|
||
|
||
@app.on_event("shutdown")
|
||
def shutdown():
|
||
if sage:
|
||
sage.deconnecter()
|
||
logger.info("👋 Sage Gateway arrêté")
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - SYSTÈME
|
||
# =====================================================
|
||
@app.get("/health")
|
||
def health():
|
||
"""Health check"""
|
||
return {
|
||
"status": "ok",
|
||
"sage_connected": sage is not None and sage.cial is not None,
|
||
"cache_info": sage.get_cache_info() if sage else None,
|
||
"timestamp": datetime.now().isoformat(),
|
||
}
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - CLIENTS
|
||
# =====================================================
|
||
@app.post("/sage/clients/list", dependencies=[Depends(verify_token)])
|
||
def clients_list(req: FiltreRequest):
|
||
"""Liste des clients avec filtre optionnel"""
|
||
try:
|
||
clients = sage.lister_tous_clients(req.filtre)
|
||
return {"success": True, "data": clients}
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste clients: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
@app.post("/sage/clients/update", dependencies=[Depends(verify_token)])
|
||
def modifier_client_endpoint(req: ClientUpdateGatewayRequest):
|
||
"""
|
||
✏️ Modification d'un client dans Sage
|
||
"""
|
||
try:
|
||
resultat = sage.modifier_client(req.code, req.client_data)
|
||
return {"success": True, "data": resultat}
|
||
|
||
except ValueError as e:
|
||
logger.warning(f"Erreur métier modification client: {e}")
|
||
raise HTTPException(404, str(e))
|
||
except Exception as e:
|
||
logger.error(f"Erreur technique modification client: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
@app.post("/sage/clients/get", dependencies=[Depends(verify_token)])
|
||
def client_get(req: CodeRequest):
|
||
"""Lecture d'un client par code"""
|
||
try:
|
||
client = sage.lire_client(req.code)
|
||
if not client:
|
||
raise HTTPException(404, f"Client {req.code} non trouvé")
|
||
return {"success": True, "data": client}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture client: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# DANS main.py
|
||
@app.post("/sage/clients/create", dependencies=[Depends(verify_token)])
|
||
def create_client_endpoint(req: ClientCreateRequest):
|
||
"""Création d'un client dans Sage"""
|
||
try:
|
||
# L'appel au connecteur est fait ici
|
||
resultat = sage.creer_client(req.dict())
|
||
return {"success": True, "data": resultat}
|
||
except ValueError as e:
|
||
logger.warning(f"Erreur métier création client: {e}")
|
||
# Erreur métier (ex: doublon) -> 400 Bad Request
|
||
raise HTTPException(400, str(e))
|
||
except Exception as e:
|
||
logger.error(f"Erreur technique création client: {e}")
|
||
# Erreur technique (ex: COM) -> 500 Internal Server Error
|
||
raise HTTPException(500, str(e))
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - ARTICLES
|
||
# =====================================================
|
||
@app.post("/sage/articles/list", dependencies=[Depends(verify_token)])
|
||
def articles_list(req: FiltreRequest):
|
||
"""Liste des articles avec filtre optionnel"""
|
||
try:
|
||
articles = sage.lister_tous_articles(req.filtre)
|
||
return {"success": True, "data": articles}
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste articles: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/articles/get", dependencies=[Depends(verify_token)])
|
||
def article_get(req: CodeRequest):
|
||
"""Lecture d'un article par référence"""
|
||
try:
|
||
article = sage.lire_article(req.code)
|
||
if not article:
|
||
raise HTTPException(404, f"Article {req.code} non trouvé")
|
||
return {"success": True, "data": article}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture article: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - DEVIS
|
||
# =====================================================
|
||
@app.post("/sage/devis/create", dependencies=[Depends(verify_token)])
|
||
def creer_devis(req: DevisRequest):
|
||
"""Création d'un devis"""
|
||
try:
|
||
# Transformer en format attendu par sage_connector
|
||
devis_data = {
|
||
"client": {"code": req.client_id, "intitule": ""},
|
||
"date_devis": req.date_devis or date.today(),
|
||
"lignes": req.lignes,
|
||
}
|
||
|
||
resultat = sage.creer_devis_enrichi(devis_data)
|
||
return {"success": True, "data": resultat}
|
||
except Exception as e:
|
||
logger.error(f"Erreur création devis: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/devis/get", dependencies=[Depends(verify_token)])
|
||
def lire_devis(req: CodeRequest):
|
||
"""Lecture d'un devis"""
|
||
try:
|
||
devis = sage.lire_devis(req.code)
|
||
if not devis:
|
||
raise HTTPException(404, f"Devis {req.code} non trouvé")
|
||
return {"success": True, "data": devis}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture devis: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/devis/list", dependencies=[Depends(verify_token)])
|
||
def devis_list(
|
||
limit: int = 100, statut: Optional[int] = None, inclure_lignes: bool = Query(True)
|
||
):
|
||
"""
|
||
📋 Liste tous les devis avec filtres optionnels
|
||
|
||
Args:
|
||
limit: Nombre max de devis à retourner
|
||
statut: Filtre par statut (optionnel)
|
||
inclure_lignes: Si True, charge les lignes de chaque devis (par défaut: True)
|
||
|
||
✅ AMÉLIORATION: Charge maintenant les lignes de chaque devis
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
devis_list = []
|
||
index = 1
|
||
max_iterations = limit * 3
|
||
erreurs_consecutives = 0
|
||
max_erreurs = 50
|
||
|
||
logger.info(
|
||
f"🔍 Recherche devis (limit={limit}, statut={statut}, inclure_lignes={inclure_lignes})"
|
||
)
|
||
|
||
while (
|
||
len(devis_list) < limit
|
||
and index < max_iterations
|
||
and erreurs_consecutives < max_erreurs
|
||
):
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
logger.debug(f"Fin de liste à l'index {index}")
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
# Filtrer uniquement devis (type 0)
|
||
doc_type = getattr(doc, "DO_Type", -1)
|
||
if doc_type != 0:
|
||
index += 1
|
||
continue
|
||
|
||
doc_statut = getattr(doc, "DO_Statut", 0)
|
||
|
||
# Filtre statut
|
||
if statut is not None and doc_statut != statut:
|
||
index += 1
|
||
continue
|
||
|
||
# ✅ Charger client via .Client
|
||
client_code = ""
|
||
client_intitule = ""
|
||
|
||
try:
|
||
client_obj = getattr(doc, "Client", None)
|
||
if client_obj:
|
||
client_obj.Read()
|
||
client_code = getattr(client_obj, "CT_Num", "").strip()
|
||
client_intitule = getattr(
|
||
client_obj, "CT_Intitule", ""
|
||
).strip()
|
||
logger.debug(
|
||
f"✅ Client: {client_code} - {client_intitule}"
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"Erreur chargement client: {e}")
|
||
# Fallback sur cache si code disponible
|
||
if client_code:
|
||
client_cache = sage.lire_client(client_code)
|
||
if client_cache:
|
||
client_intitule = client_cache.get("intitule", "")
|
||
|
||
# ✅✅ NOUVEAU: Charger les lignes si demandé
|
||
lignes = []
|
||
if inclure_lignes:
|
||
try:
|
||
factory_lignes = getattr(doc, "FactoryDocumentLigne", None)
|
||
if not factory_lignes:
|
||
factory_lignes = getattr(
|
||
doc, "FactoryDocumentVenteLigne", None
|
||
)
|
||
|
||
if factory_lignes:
|
||
ligne_index = 1
|
||
while ligne_index <= 100: # Max 100 lignes par devis
|
||
try:
|
||
ligne_persist = factory_lignes.List(ligne_index)
|
||
if ligne_persist is None:
|
||
break
|
||
|
||
ligne = win32com.client.CastTo(
|
||
ligne_persist, "IBODocumentLigne3"
|
||
)
|
||
ligne.Read()
|
||
|
||
# Charger référence article
|
||
article_ref = ""
|
||
try:
|
||
article_ref = getattr(
|
||
ligne, "AR_Ref", ""
|
||
).strip()
|
||
if not article_ref:
|
||
article_obj = getattr(
|
||
ligne, "Article", None
|
||
)
|
||
if article_obj:
|
||
article_obj.Read()
|
||
article_ref = getattr(
|
||
article_obj, "AR_Ref", ""
|
||
).strip()
|
||
except:
|
||
pass
|
||
|
||
lignes.append(
|
||
{
|
||
"article": article_ref,
|
||
"designation": getattr(
|
||
ligne, "DL_Design", ""
|
||
),
|
||
"quantite": float(
|
||
getattr(ligne, "DL_Qte", 0.0)
|
||
),
|
||
"prix_unitaire": float(
|
||
getattr(
|
||
ligne, "DL_PrixUnitaire", 0.0
|
||
)
|
||
),
|
||
"montant_ht": float(
|
||
getattr(ligne, "DL_MontantHT", 0.0)
|
||
),
|
||
}
|
||
)
|
||
|
||
ligne_index += 1
|
||
except Exception as e:
|
||
logger.debug(f"Erreur ligne {ligne_index}: {e}")
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"Erreur chargement lignes: {e}")
|
||
|
||
devis_list.append(
|
||
{
|
||
"numero": getattr(doc, "DO_Piece", ""),
|
||
"date": str(getattr(doc, "DO_Date", "")),
|
||
"client_code": client_code,
|
||
"client_intitule": client_intitule,
|
||
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
||
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
||
"statut": doc_statut,
|
||
"lignes": lignes, # ✅ Lignes incluses
|
||
}
|
||
)
|
||
|
||
erreurs_consecutives = 0
|
||
index += 1
|
||
|
||
except Exception as e:
|
||
erreurs_consecutives += 1
|
||
logger.debug(f"⚠️ Erreur index {index}: {e}")
|
||
index += 1
|
||
|
||
if erreurs_consecutives >= max_erreurs:
|
||
logger.warning(
|
||
f"⚠️ Arrêt après {max_erreurs} erreurs consécutives"
|
||
)
|
||
break
|
||
|
||
nb_avec_client = sum(1 for d in devis_list if d["client_intitule"])
|
||
nb_avec_lignes = sum(1 for d in devis_list if d.get("lignes"))
|
||
logger.info(
|
||
f"✅ {len(devis_list)} devis retournés "
|
||
f"({nb_avec_client} avec client, {nb_avec_lignes} avec lignes)"
|
||
)
|
||
|
||
return {"success": True, "data": devis_list}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"❌ Erreur liste devis: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/devis/statut", dependencies=[Depends(verify_token)])
|
||
def changer_statut_devis_endpoint(numero: str, nouveau_statut: int):
|
||
"""Change le statut d'un devis"""
|
||
try:
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
persist = factory.ReadPiece(0, numero)
|
||
|
||
if not persist:
|
||
raise HTTPException(404, f"Devis {numero} introuvable")
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
statut_actuel = getattr(doc, "DO_Statut", 0)
|
||
doc.DO_Statut = nouveau_statut
|
||
doc.Write()
|
||
|
||
logger.info(f"✅ Statut devis {numero}: {statut_actuel} → {nouveau_statut}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"numero": numero,
|
||
"statut_ancien": statut_actuel,
|
||
"statut_nouveau": nouveau_statut,
|
||
},
|
||
}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur changement statut: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - DOCUMENTS
|
||
# =====================================================
|
||
@app.post("/sage/documents/get", dependencies=[Depends(verify_token)])
|
||
def lire_document(req: DocumentGetRequest):
|
||
"""Lecture d'un document (commande, facture, etc.)"""
|
||
try:
|
||
doc = sage.lire_document(req.numero, req.type_doc)
|
||
if not doc:
|
||
raise HTTPException(404, f"Document {req.numero} non trouvé")
|
||
return {"success": True, "data": doc}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture document: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/documents/transform", dependencies=[Depends(verify_token)])
|
||
def transformer_document(
|
||
numero_source: str = Query(..., description="Numéro du document source"),
|
||
type_source: int = Query(..., description="Type document source"),
|
||
type_cible: int = Query(..., description="Type document cible"),
|
||
):
|
||
"""
|
||
🔧 Transformation de document
|
||
|
||
✅ CORRECTION : Utilise les VRAIS types Sage Dataven
|
||
|
||
Types valides :
|
||
- 0: Devis
|
||
- 10: Bon de commande
|
||
- 20: Préparation
|
||
- 30: Bon de livraison
|
||
- 40: Bon de retour
|
||
- 50: Bon d'avoir
|
||
- 60: Facture
|
||
|
||
Transformations autorisées :
|
||
- Devis (0) → Commande (10)
|
||
- Commande (10) → Bon livraison (30)
|
||
- Commande (10) → Facture (60)
|
||
- Bon livraison (30) → Facture (60)
|
||
"""
|
||
try:
|
||
logger.info(
|
||
f"🔄 Transformation demandée: {numero_source} "
|
||
f"(type {type_source}) → type {type_cible}"
|
||
)
|
||
|
||
# ✅ Matrice des transformations valides pour VOTRE Sage
|
||
transformations_valides = {
|
||
(0, 10), # Devis → Commande
|
||
(10, 30), # Commande → Bon de livraison
|
||
(10, 60), # Commande → Facture
|
||
(30, 60), # Bon de livraison → Facture
|
||
(0, 60), # Devis → Facture (si autorisé)
|
||
}
|
||
|
||
if (type_source, type_cible) not in transformations_valides:
|
||
logger.error(
|
||
f"❌ Transformation non autorisée: {type_source} → {type_cible}"
|
||
)
|
||
raise HTTPException(
|
||
400,
|
||
f"Transformation non autorisée: type {type_source} → type {type_cible}. "
|
||
f"Transformations valides: {transformations_valides}",
|
||
)
|
||
|
||
# Appel au connecteur Sage
|
||
resultat = sage.transformer_document(numero_source, type_source, type_cible)
|
||
|
||
logger.info(
|
||
f"✅ Transformation réussie: {numero_source} → "
|
||
f"{resultat.get('document_cible', '?')} "
|
||
f"({resultat.get('nb_lignes', 0)} lignes)"
|
||
)
|
||
|
||
return {"success": True, "data": resultat}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except ValueError as e:
|
||
logger.error(f"❌ Erreur métier transformation: {e}")
|
||
raise HTTPException(400, str(e))
|
||
except Exception as e:
|
||
logger.error(f"❌ Erreur technique transformation: {e}", exc_info=True)
|
||
raise HTTPException(500, f"Erreur transformation: {str(e)}")
|
||
|
||
|
||
@app.post("/sage/documents/champ-libre", dependencies=[Depends(verify_token)])
|
||
def maj_champ_libre(req: ChampLibreRequest):
|
||
"""Mise à jour d'un champ libre"""
|
||
try:
|
||
success = sage.mettre_a_jour_champ_libre(
|
||
req.doc_id, req.type_doc, req.nom_champ, req.valeur
|
||
)
|
||
return {"success": success}
|
||
except Exception as e:
|
||
logger.error(f"Erreur MAJ champ libre: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/documents/derniere-relance", dependencies=[Depends(verify_token)])
|
||
def maj_derniere_relance(doc_id: str, type_doc: int):
|
||
"""📅 Met à jour le champ 'Dernière relance' d'un document"""
|
||
try:
|
||
success = sage.mettre_a_jour_derniere_relance(doc_id, type_doc)
|
||
return {"success": success}
|
||
except Exception as e:
|
||
logger.error(f"Erreur MAJ dernière relance: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - CONTACTS
|
||
# =====================================================
|
||
@app.post("/sage/contact/read", dependencies=[Depends(verify_token)])
|
||
def contact_read(req: CodeRequest):
|
||
"""Lecture du contact principal d'un client"""
|
||
try:
|
||
contact = sage.lire_contact_principal_client(req.code)
|
||
if not contact:
|
||
raise HTTPException(404, f"Contact non trouvé pour client {req.code}")
|
||
return {"success": True, "data": contact}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture contact: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/commandes/list", dependencies=[Depends(verify_token)])
|
||
def commandes_list(limit: int = 100, statut: Optional[int] = None):
|
||
"""
|
||
📋 Liste toutes les commandes
|
||
✅ CORRECTION: Filtre sur type 10 (BON_COMMANDE)
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
commandes = []
|
||
index = 1
|
||
max_iterations = limit * 10
|
||
erreurs_consecutives = 0
|
||
max_erreurs = 100
|
||
|
||
logger.info(f"🔍 Recherche commandes (limit={limit}, statut={statut})")
|
||
|
||
while (
|
||
len(commandes) < limit
|
||
and index < max_iterations
|
||
and erreurs_consecutives < max_erreurs
|
||
):
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
doc_type = getattr(doc, "DO_Type", -1)
|
||
|
||
# ✅ CRITIQUE : Filtrer sur type 10 (BON_COMMANDE)
|
||
if doc_type != settings.SAGE_TYPE_BON_COMMANDE:
|
||
index += 1
|
||
continue
|
||
|
||
doc_statut = getattr(doc, "DO_Statut", 0)
|
||
|
||
# Filtre statut
|
||
if statut is not None and doc_statut != statut:
|
||
index += 1
|
||
continue
|
||
|
||
# Charger client
|
||
client_code = ""
|
||
client_intitule = ""
|
||
|
||
try:
|
||
client_obj = getattr(doc, "Client", None)
|
||
if client_obj:
|
||
client_obj.Read()
|
||
client_code = getattr(client_obj, "CT_Num", "").strip()
|
||
client_intitule = getattr(
|
||
client_obj, "CT_Intitule", ""
|
||
).strip()
|
||
except Exception as e:
|
||
logger.debug(f"Erreur chargement client: {e}")
|
||
|
||
commande = {
|
||
"numero": getattr(doc, "DO_Piece", ""),
|
||
"date": str(getattr(doc, "DO_Date", "")),
|
||
"client_code": client_code,
|
||
"client_intitule": client_intitule,
|
||
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
||
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
||
"statut": doc_statut,
|
||
}
|
||
|
||
commandes.append(commande)
|
||
erreurs_consecutives = 0
|
||
index += 1
|
||
|
||
except Exception as e:
|
||
erreurs_consecutives += 1
|
||
index += 1
|
||
|
||
if erreurs_consecutives >= max_erreurs:
|
||
break
|
||
|
||
logger.info(f"✅ {len(commandes)} commandes retournées")
|
||
|
||
return {"success": True, "data": commandes}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"❌ Erreur liste commandes: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/factures/list", dependencies=[Depends(verify_token)])
|
||
def factures_list(limit: int = 100, statut: Optional[int] = None):
|
||
"""
|
||
📋 Liste toutes les factures avec leurs lignes
|
||
✅ CORRECTION: Filtre sur type 60 (FACTURE) + chargement des lignes
|
||
"""
|
||
try:
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
factures = []
|
||
index = 1
|
||
max_iterations = limit * 3
|
||
|
||
while len(factures) < limit and index < max_iterations:
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
# ✅ CRITIQUE: Filtrer factures (type 60)
|
||
if getattr(doc, "DO_Type", -1) != settings.SAGE_TYPE_FACTURE:
|
||
index += 1
|
||
continue
|
||
|
||
doc_statut = getattr(doc, "DO_Statut", 0)
|
||
|
||
if statut is None or doc_statut == statut:
|
||
# Charger client
|
||
client_code = ""
|
||
client_intitule = ""
|
||
|
||
try:
|
||
client_obj = getattr(doc, "Client", None)
|
||
if client_obj:
|
||
client_obj.Read()
|
||
client_code = getattr(client_obj, "CT_Num", "").strip()
|
||
client_intitule = getattr(client_obj, "CT_Intitule", "").strip()
|
||
except:
|
||
pass
|
||
|
||
# ✅ NOUVEAU : Charger les lignes
|
||
lignes = []
|
||
try:
|
||
factory_lignes = getattr(doc, "FactoryDocumentLigne", None)
|
||
if not factory_lignes:
|
||
factory_lignes = getattr(doc, "FactoryDocumentVenteLigne", None)
|
||
|
||
if factory_lignes:
|
||
ligne_index = 1
|
||
while ligne_index <= 100:
|
||
try:
|
||
ligne_persist = factory_lignes.List(ligne_index)
|
||
if ligne_persist is None:
|
||
break
|
||
|
||
ligne = win32com.client.CastTo(ligne_persist, "IBODocumentLigne3")
|
||
ligne.Read()
|
||
|
||
# Charger référence article
|
||
article_ref = ""
|
||
try:
|
||
article_ref = getattr(ligne, "AR_Ref", "").strip()
|
||
if not article_ref:
|
||
article_obj = getattr(ligne, "Article", None)
|
||
if article_obj:
|
||
article_obj.Read()
|
||
article_ref = getattr(article_obj, "AR_Ref", "").strip()
|
||
except:
|
||
pass
|
||
|
||
lignes.append({
|
||
"article": article_ref,
|
||
"designation": getattr(ligne, "DL_Design", ""),
|
||
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
|
||
"prix_unitaire": float(getattr(ligne, "DL_PrixUnitaire", 0.0)),
|
||
"montant_ht": float(getattr(ligne, "DL_MontantHT", 0.0))
|
||
})
|
||
|
||
ligne_index += 1
|
||
except Exception as e:
|
||
logger.debug(f"Erreur ligne {ligne_index}: {e}")
|
||
break
|
||
except Exception as e:
|
||
logger.debug(f"Erreur chargement lignes: {e}")
|
||
|
||
factures.append({
|
||
"numero": getattr(doc, "DO_Piece", ""),
|
||
"date": str(getattr(doc, "DO_Date", "")),
|
||
"client_code": client_code,
|
||
"client_intitule": client_intitule,
|
||
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
||
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
||
"statut": doc_statut,
|
||
"lignes": lignes # ✅ AJOUT
|
||
})
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
continue
|
||
|
||
logger.info(f"✅ {len(factures)} factures retournées")
|
||
return {"success": True, "data": factures}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste factures: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/client/remise-max", dependencies=[Depends(verify_token)])
|
||
def lire_remise_max_client(code: str):
|
||
"""Récupère la remise max autorisée pour un client"""
|
||
try:
|
||
client_obj = sage._lire_client_obj(code)
|
||
|
||
if not client_obj:
|
||
raise HTTPException(404, f"Client {code} introuvable")
|
||
|
||
remise_max = 10.0 # Défaut
|
||
|
||
try:
|
||
remise_max = float(getattr(client_obj, "CT_RemiseMax", 10.0))
|
||
except:
|
||
pass
|
||
|
||
logger.info(f"✅ Remise max client {code}: {remise_max}%")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {"client_code": code, "remise_max": remise_max},
|
||
}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture remise: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - ADMIN
|
||
# =====================================================
|
||
@app.post("/sage/cache/refresh", dependencies=[Depends(verify_token)])
|
||
def refresh_cache():
|
||
"""Force le rafraîchissement du cache"""
|
||
try:
|
||
sage.forcer_actualisation_cache()
|
||
return {
|
||
"success": True,
|
||
"message": "Cache actualisé",
|
||
"info": sage.get_cache_info(),
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Erreur refresh cache: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/cache/info", dependencies=[Depends(verify_token)])
|
||
def cache_info_get():
|
||
"""Informations sur le cache (endpoint GET)"""
|
||
try:
|
||
return {"success": True, "data": sage.get_cache_info()}
|
||
except Exception as e:
|
||
logger.error(f"Erreur info cache: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# Script à ajouter temporairement dans main.py pour diagnostiquer
|
||
|
||
|
||
@app.get("/sage/devis/{numero}/diagnostic", dependencies=[Depends(verify_token)])
|
||
def diagnostiquer_devis(numero: str):
|
||
"""
|
||
ENDPOINT DE DIAGNOSTIC: Affiche TOUTES les infos d'un devis
|
||
|
||
Permet de comprendre pourquoi un devis ne peut pas être transformé
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Essayer ReadPiece
|
||
persist = factory.ReadPiece(0, numero)
|
||
|
||
# Si échec, chercher dans List()
|
||
if not persist:
|
||
logger.info(f"[DIAG] ReadPiece echoue, recherche dans List()...")
|
||
index = 1
|
||
while index < 10000:
|
||
try:
|
||
persist_test = factory.List(index)
|
||
if persist_test is None:
|
||
break
|
||
|
||
doc_test = win32com.client.CastTo(
|
||
persist_test, "IBODocumentVente3"
|
||
)
|
||
doc_test.Read()
|
||
|
||
if (
|
||
getattr(doc_test, "DO_Type", -1) == 0
|
||
and getattr(doc_test, "DO_Piece", "") == numero
|
||
):
|
||
persist = persist_test
|
||
logger.info(f"[DIAG] Trouve a l'index {index}")
|
||
break
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
|
||
if not persist:
|
||
raise HTTPException(404, f"Devis {numero} introuvable")
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
# EXTRACTION COMPLÈTE
|
||
diagnostic = {
|
||
"numero": getattr(doc, "DO_Piece", ""),
|
||
"type": getattr(doc, "DO_Type", -1),
|
||
"statut": getattr(doc, "DO_Statut", -1),
|
||
"statut_libelle": {
|
||
0: "Brouillon",
|
||
1: "Soumis",
|
||
2: "Accepte",
|
||
3: "Realise partiellement",
|
||
4: "Realise totalement",
|
||
5: "Transforme",
|
||
6: "Annule",
|
||
}.get(getattr(doc, "DO_Statut", -1), "Inconnu"),
|
||
"date": str(getattr(doc, "DO_Date", "")),
|
||
"total_ht": float(getattr(doc, "DO_TotalHT", 0.0)),
|
||
"total_ttc": float(getattr(doc, "DO_TotalTTC", 0.0)),
|
||
"est_transformable": False,
|
||
"raison_blocage": None,
|
||
}
|
||
|
||
# Client
|
||
try:
|
||
client_obj = getattr(doc, "Client", None)
|
||
if client_obj:
|
||
client_obj.Read()
|
||
diagnostic["client_code"] = getattr(
|
||
client_obj, "CT_Num", ""
|
||
).strip()
|
||
diagnostic["client_intitule"] = getattr(
|
||
client_obj, "CT_Intitule", ""
|
||
).strip()
|
||
except Exception as e:
|
||
diagnostic["erreur_client"] = str(e)
|
||
|
||
# Lignes
|
||
try:
|
||
factory_lignes = doc.FactoryDocumentLigne
|
||
except:
|
||
factory_lignes = doc.FactoryDocumentVenteLigne
|
||
|
||
lignes = []
|
||
index = 1
|
||
while index <= 100:
|
||
try:
|
||
ligne_p = factory_lignes.List(index)
|
||
if ligne_p is None:
|
||
break
|
||
|
||
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
|
||
ligne.Read()
|
||
|
||
article_ref = ""
|
||
try:
|
||
article_ref = getattr(ligne, "AR_Ref", "").strip()
|
||
if not article_ref:
|
||
article_obj = getattr(ligne, "Article", None)
|
||
if article_obj:
|
||
article_obj.Read()
|
||
article_ref = getattr(article_obj, "AR_Ref", "").strip()
|
||
except:
|
||
pass
|
||
|
||
lignes.append(
|
||
{
|
||
"index": index,
|
||
"article": article_ref,
|
||
"designation": getattr(ligne, "DL_Design", ""),
|
||
"quantite": float(getattr(ligne, "DL_Qte", 0.0)),
|
||
"prix_unitaire": float(
|
||
getattr(ligne, "DL_PrixUnitaire", 0.0)
|
||
),
|
||
"montant_ht": float(getattr(ligne, "DL_MontantHT", 0.0)),
|
||
}
|
||
)
|
||
|
||
index += 1
|
||
except:
|
||
break
|
||
|
||
diagnostic["nb_lignes"] = len(lignes)
|
||
diagnostic["lignes"] = lignes
|
||
|
||
# ANALYSE TRANSFORMABILITÉ
|
||
statut = diagnostic["statut"]
|
||
|
||
if statut == 5:
|
||
diagnostic["raison_blocage"] = "Document deja transforme (statut=5)"
|
||
elif statut == 6:
|
||
diagnostic["raison_blocage"] = "Document annule (statut=6)"
|
||
elif statut in [3, 4]:
|
||
diagnostic["raison_blocage"] = (
|
||
f"Document deja realise partiellement ou totalement (statut={statut}). "
|
||
f"Une commande/BL/facture existe probablement deja."
|
||
)
|
||
diagnostic["suggestion"] = (
|
||
"Cherchez les documents lies a ce devis dans Sage. "
|
||
"Il a peut-etre deja ete transforme manuellement."
|
||
)
|
||
elif statut == 0:
|
||
diagnostic["est_transformable"] = True
|
||
diagnostic["action_requise"] = (
|
||
"Statut 'Brouillon'. Le systeme le passera automatiquement a 'Accepte' "
|
||
"avant transformation."
|
||
)
|
||
elif statut == 2:
|
||
diagnostic["est_transformable"] = True
|
||
diagnostic["action_requise"] = (
|
||
"Statut 'Accepte'. Transformation possible."
|
||
)
|
||
else:
|
||
diagnostic["raison_blocage"] = f"Statut inconnu ou non gere: {statut}"
|
||
|
||
# Champs libres (pour Universign, etc.)
|
||
champs_libres = {}
|
||
try:
|
||
for champ in ["UniversignID", "DerniereRelance", "DO_Ref"]:
|
||
try:
|
||
valeur = getattr(doc, f"DO_{champ}", None)
|
||
if valeur:
|
||
champs_libres[champ] = str(valeur)
|
||
except:
|
||
pass
|
||
except:
|
||
pass
|
||
|
||
if champs_libres:
|
||
diagnostic["champs_libres"] = champs_libres
|
||
|
||
logger.info(
|
||
f"[DIAG] Devis {numero}: statut={statut}, transformable={diagnostic['est_transformable']}"
|
||
)
|
||
|
||
return {"success": True, "diagnostic": diagnostic}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur diagnostic: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/diagnostic/configuration", dependencies=[Depends(verify_token)])
|
||
def diagnostic_configuration():
|
||
"""
|
||
DIAGNOSTIC COMPLET de la configuration Sage
|
||
|
||
Teste:
|
||
- Quelles méthodes COM sont disponibles
|
||
- Quels types de documents sont autorisés
|
||
- Quelles permissions l'utilisateur a
|
||
- Version de Sage
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
diagnostic = {
|
||
"connexion": "OK",
|
||
"chemin_base": sage.chemin_base,
|
||
"utilisateur": sage.utilisateur,
|
||
}
|
||
|
||
# Version Sage
|
||
try:
|
||
version = getattr(sage.cial, "Version", "Inconnue")
|
||
diagnostic["version_sage"] = str(version)
|
||
except:
|
||
diagnostic["version_sage"] = "Non disponible"
|
||
|
||
# Test des méthodes disponibles sur IBSCIALApplication3
|
||
methodes_disponibles = []
|
||
methodes_a_tester = [
|
||
"CreateProcess_Document",
|
||
"FactoryDocumentVente",
|
||
"FactoryArticle",
|
||
"CptaApplication",
|
||
"BeginTrans",
|
||
"CommitTrans",
|
||
"RollbackTrans",
|
||
]
|
||
|
||
for methode in methodes_a_tester:
|
||
try:
|
||
if hasattr(sage.cial, methode):
|
||
methodes_disponibles.append(methode)
|
||
except:
|
||
pass
|
||
|
||
diagnostic["methodes_cial_disponibles"] = methodes_disponibles
|
||
|
||
# Test des types de documents autorisés
|
||
types_autorises = []
|
||
types_bloques = []
|
||
|
||
for type_doc in range(6): # 0-5
|
||
try:
|
||
# Essayer de créer un process (sans le valider)
|
||
process = sage.cial.CreateProcess_Document(type_doc)
|
||
if process:
|
||
types_autorises.append(
|
||
{
|
||
"type": type_doc,
|
||
"libelle": {
|
||
0: "Devis",
|
||
1: "Bon de livraison",
|
||
2: "Bon de retour",
|
||
3: "Commande",
|
||
4: "Preparation",
|
||
5: "Facture",
|
||
}[type_doc],
|
||
}
|
||
)
|
||
# Ne pas valider, juste tester
|
||
del process
|
||
except Exception as e:
|
||
types_bloques.append(
|
||
{
|
||
"type": type_doc,
|
||
"libelle": {
|
||
0: "Devis",
|
||
1: "Bon de livraison",
|
||
2: "Bon de retour",
|
||
3: "Commande",
|
||
4: "Preparation",
|
||
5: "Facture",
|
||
}[type_doc],
|
||
"erreur": str(e)[:200],
|
||
}
|
||
)
|
||
|
||
diagnostic["types_documents_autorises"] = types_autorises
|
||
diagnostic["types_documents_bloques"] = types_bloques
|
||
|
||
# Test TransformInto() sur un devis test
|
||
try:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Chercher n'importe quel devis
|
||
index = 1
|
||
devis_test = None
|
||
|
||
while index < 100:
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
if getattr(doc, "DO_Type", -1) == 0: # Devis
|
||
devis_test = doc
|
||
break
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
|
||
if devis_test:
|
||
# Tester si TransformInto existe
|
||
if hasattr(devis_test, "TransformInto"):
|
||
diagnostic["transforminto_disponible"] = True
|
||
diagnostic["transforminto_test"] = "Methode existe (non testee)"
|
||
else:
|
||
diagnostic["transforminto_disponible"] = False
|
||
diagnostic["transforminto_test"] = (
|
||
"Methode TransformInto() inexistante"
|
||
)
|
||
else:
|
||
diagnostic["transforminto_disponible"] = (
|
||
"Impossible de tester (aucun devis trouve)"
|
||
)
|
||
|
||
except Exception as e:
|
||
diagnostic["transforminto_disponible"] = False
|
||
diagnostic["transforminto_erreur"] = str(e)
|
||
|
||
# Modules Sage actifs
|
||
try:
|
||
# Tester l'accès aux différentes factories
|
||
modules = {}
|
||
|
||
try:
|
||
sage.cial.FactoryDocumentVente
|
||
modules["Ventes"] = "OK"
|
||
except:
|
||
modules["Ventes"] = "INACCESSIBLE"
|
||
|
||
try:
|
||
sage.cial.CptaApplication.FactoryClient
|
||
modules["Clients"] = "OK"
|
||
except:
|
||
modules["Clients"] = "INACCESSIBLE"
|
||
|
||
try:
|
||
sage.cial.FactoryArticle
|
||
modules["Articles"] = "OK"
|
||
except:
|
||
modules["Articles"] = "INACCESSIBLE"
|
||
|
||
diagnostic["modules_actifs"] = modules
|
||
except Exception as e:
|
||
diagnostic["modules_actifs_erreur"] = str(e)
|
||
|
||
# Compter documents existants
|
||
try:
|
||
counts = {}
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
for type_doc in range(6):
|
||
count = 0
|
||
index = 1
|
||
|
||
while index < 1000:
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
if getattr(doc, "DO_Type", -1) == type_doc:
|
||
count += 1
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
break
|
||
|
||
counts[
|
||
{
|
||
0: "Devis",
|
||
1: "Bons_livraison",
|
||
2: "Bons_retour",
|
||
3: "Commandes",
|
||
4: "Preparations",
|
||
5: "Factures",
|
||
}[type_doc]
|
||
] = count
|
||
|
||
diagnostic["documents_existants"] = counts
|
||
except Exception as e:
|
||
diagnostic["documents_existants_erreur"] = str(e)
|
||
|
||
logger.info("[DIAG] Configuration Sage analysee")
|
||
|
||
return {"success": True, "diagnostic": diagnostic}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur diagnostic config: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/diagnostic/types-reels", dependencies=[Depends(verify_token)])
|
||
def decouvrir_types_reels():
|
||
"""
|
||
DIAGNOSTIC CRITIQUE: Découvre les VRAIS types de documents Sage
|
||
|
||
Au lieu de deviner les types (0-5), on va:
|
||
1. Créer manuellement un document de chaque type dans Sage
|
||
2. Les lister ici pour voir leurs vrais numéros de type
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Parcourir TOUS les documents
|
||
documents_par_type = {}
|
||
index = 1
|
||
max_docs = 500 # Limiter pour ne pas bloquer
|
||
|
||
logger.info("[DIAG] Scan de tous les documents...")
|
||
|
||
while index < max_docs:
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
# Récupérer le type ET le sous-type
|
||
type_doc = getattr(doc, "DO_Type", -1)
|
||
piece = getattr(doc, "DO_Piece", "")
|
||
statut = getattr(doc, "DO_Statut", -1)
|
||
|
||
# Essayer de récupérer le domaine (vente/achat)
|
||
domaine = "Inconnu"
|
||
try:
|
||
domaine_val = getattr(doc, "DO_Domaine", -1)
|
||
domaine = {0: "Vente", 1: "Achat"}.get(
|
||
domaine_val, f"Code {domaine_val}"
|
||
)
|
||
except:
|
||
pass
|
||
|
||
# Récupérer la catégorie
|
||
categorie = "Inconnue"
|
||
try:
|
||
cat_val = getattr(doc, "DO_Categorie", -1)
|
||
categorie = str(cat_val)
|
||
except:
|
||
pass
|
||
|
||
# Grouper par type
|
||
if type_doc not in documents_par_type:
|
||
documents_par_type[type_doc] = {
|
||
"count": 0,
|
||
"exemples": [],
|
||
"domaine": domaine,
|
||
"categorie": categorie,
|
||
}
|
||
|
||
documents_par_type[type_doc]["count"] += 1
|
||
|
||
# Garder quelques exemples
|
||
if len(documents_par_type[type_doc]["exemples"]) < 3:
|
||
documents_par_type[type_doc]["exemples"].append(
|
||
{
|
||
"numero": piece,
|
||
"statut": statut,
|
||
"domaine": domaine,
|
||
"categorie": categorie,
|
||
}
|
||
)
|
||
|
||
index += 1
|
||
|
||
except Exception as e:
|
||
logger.debug(f"Erreur index {index}: {e}")
|
||
index += 1
|
||
|
||
# Formater le résultat
|
||
types_trouves = []
|
||
|
||
for type_num, infos in sorted(documents_par_type.items()):
|
||
types_trouves.append(
|
||
{
|
||
"type_code": type_num,
|
||
"nombre_documents": infos["count"],
|
||
"domaine": infos["domaine"],
|
||
"categorie": infos["categorie"],
|
||
"exemples": infos["exemples"],
|
||
"suggestion_libelle": _deviner_libelle_type(
|
||
type_num, infos["exemples"]
|
||
),
|
||
}
|
||
)
|
||
|
||
logger.info(
|
||
f"[DIAG] {len(types_trouves)} types de documents distincts trouves"
|
||
)
|
||
|
||
return {
|
||
"success": True,
|
||
"types_documents_reels": types_trouves,
|
||
"instructions": (
|
||
"Pour identifier les types corrects:\n"
|
||
"1. Creez manuellement dans Sage: 1 Bon de commande, 1 BL, 1 Facture\n"
|
||
"2. Appelez de nouveau cet endpoint\n"
|
||
"3. Les nouveaux types apparaitront avec leurs numeros corrects"
|
||
),
|
||
"total_documents_scannes": index - 1,
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur decouverte types: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
def _deviner_libelle_type(type_num, exemples):
|
||
"""Devine le libellé d'un type basé sur les numéros de pièce"""
|
||
if not exemples:
|
||
return "Type inconnu"
|
||
|
||
# Analyser les préfixes des numéros
|
||
prefixes = [ex["numero"][:2] for ex in exemples if ex["numero"]]
|
||
prefix_commun = max(set(prefixes), key=prefixes.count) if prefixes else ""
|
||
|
||
# Deviner selon le type_num et les préfixes
|
||
suggestions = {
|
||
0: "Devis (DE)",
|
||
1: "Bon de livraison (BL)",
|
||
2: "Bon de retour (BR)",
|
||
3: "Bon de commande (BC)",
|
||
4: "Preparation de livraison (PL)",
|
||
5: "Facture (FA)",
|
||
6: "Facture d'avoir (AV)",
|
||
7: "Bon d'avoir financier (BA)",
|
||
}
|
||
|
||
libelle_base = suggestions.get(type_num, f"Type {type_num}")
|
||
|
||
if prefix_commun:
|
||
libelle_base += f" - Detecte: prefix '{prefix_commun}'"
|
||
|
||
return libelle_base
|
||
|
||
|
||
@app.post("/sage/test-creation-par-type", dependencies=[Depends(verify_token)])
|
||
def tester_creation_par_type(type_doc: int = Query(..., ge=0, le=20)):
|
||
"""
|
||
TEST: Essaie de créer un document d'un type spécifique
|
||
|
||
Permet de tester tous les types possibles (0-20) pour trouver
|
||
lesquels fonctionnent sur votre installation
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
logger.info(f"[TEST] Tentative creation type {type_doc}...")
|
||
|
||
try:
|
||
# Essayer de créer un process
|
||
process = sage.cial.CreateProcess_Document(type_doc)
|
||
|
||
if not process:
|
||
return {
|
||
"success": False,
|
||
"type": type_doc,
|
||
"resultat": "Process NULL retourne",
|
||
}
|
||
|
||
# Si on arrive ici, le type est valide !
|
||
doc = process.Document
|
||
|
||
try:
|
||
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
|
||
except:
|
||
pass
|
||
|
||
# Récupérer les infos du document créé
|
||
type_reel = getattr(doc, "DO_Type", -1)
|
||
domaine = getattr(doc, "DO_Domaine", -1)
|
||
|
||
# NE PAS VALIDER le document (pas de Write/Process)
|
||
# On veut juste savoir si la création est possible
|
||
|
||
del process
|
||
del doc
|
||
|
||
return {
|
||
"success": True,
|
||
"type_demande": type_doc,
|
||
"type_reel_doc": type_reel,
|
||
"domaine": {0: "Vente", 1: "Achat"}.get(domaine, domaine),
|
||
"resultat": "CREATION POSSIBLE",
|
||
"note": "Document non valide (test uniquement)",
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
"success": False,
|
||
"type": type_doc,
|
||
"erreur": str(e),
|
||
"resultat": "CREATION IMPOSSIBLE",
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[TEST] Erreur test type {type_doc}: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/diagnostic/facture-requirements", dependencies=[Depends(verify_token)])
|
||
def diagnostiquer_exigences_facture():
|
||
"""
|
||
DIAGNOSTIC: Découvre les champs obligatoires pour créer une facture
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
# Créer un process facture de test
|
||
process = sage.cial.CreateProcess_Document(60)
|
||
doc_test = process.Document
|
||
|
||
try:
|
||
doc_test = win32com.client.CastTo(doc_test, "IBODocumentVente3")
|
||
except:
|
||
pass
|
||
|
||
# Tester tous les champs potentiellement obligatoires
|
||
champs_a_tester = [
|
||
"DO_ModeRegl",
|
||
"DO_CondRegl",
|
||
"DO_CodeJournal",
|
||
"DO_Souche",
|
||
"DO_TypeCalcul",
|
||
"DO_CodeTaxe1",
|
||
"CT_Num",
|
||
"DO_Date",
|
||
"DO_Statut",
|
||
]
|
||
|
||
resultats = {}
|
||
|
||
for champ in champs_a_tester:
|
||
try:
|
||
valeur = getattr(doc_test, champ, None)
|
||
resultats[champ] = {
|
||
"valeur_defaut": str(valeur) if valeur is not None else "None",
|
||
"accessible": True,
|
||
}
|
||
except Exception as e:
|
||
resultats[champ] = {
|
||
"valeur_defaut": "N/A",
|
||
"accessible": False,
|
||
"erreur": str(e)[:100],
|
||
}
|
||
|
||
# Ne pas valider le document de test
|
||
del process
|
||
del doc_test
|
||
|
||
return {
|
||
"success": True,
|
||
"champs_facture": resultats,
|
||
"conseil": "Les champs avec valeur_defaut=None ou 0 sont souvent obligatoires",
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/test/creer-facture-vide", dependencies=[Depends(verify_token)])
|
||
def tester_creation_facture_vide():
|
||
"""
|
||
🧪 TEST: Crée une facture vide pour identifier les champs obligatoires
|
||
|
||
Ce test permet de découvrir EXACTEMENT quels champs Sage exige
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
logger.info("[TEST] Creation facture test...")
|
||
|
||
# 1. Créer le process
|
||
process = sage.cial.CreateProcess_Document(60)
|
||
doc = process.Document
|
||
|
||
try:
|
||
doc = win32com.client.CastTo(doc, "IBODocumentVente3")
|
||
except:
|
||
pass
|
||
|
||
# 2. Définir UNIQUEMENT les champs absolument critiques
|
||
import pywintypes
|
||
|
||
# Date (obligatoire)
|
||
doc.DO_Date = pywintypes.Time(datetime.now())
|
||
|
||
# Client (obligatoire) - Utiliser le premier client disponible
|
||
factory_client = sage.cial.CptaApplication.FactoryClient
|
||
persist_client = factory_client.List(1)
|
||
|
||
if persist_client:
|
||
client = sage._cast_client(persist_client)
|
||
if client:
|
||
doc.SetDefaultClient(client)
|
||
client_code = getattr(client, "CT_Num", "?")
|
||
logger.info(f"[TEST] Client test: {client_code}")
|
||
|
||
# 3. Écrire sans Process() pour voir les valeurs par défaut
|
||
doc.Write()
|
||
doc.Read()
|
||
|
||
# 4. Analyser tous les champs
|
||
champs_analyse = {}
|
||
|
||
for attr in dir(doc):
|
||
if attr.startswith("DO_") or attr.startswith("CT_"):
|
||
try:
|
||
valeur = getattr(doc, attr, None)
|
||
if valeur is not None:
|
||
champs_analyse[attr] = {
|
||
"valeur": str(valeur),
|
||
"type": type(valeur).__name__,
|
||
}
|
||
except:
|
||
pass
|
||
|
||
logger.info(f"[TEST] {len(champs_analyse)} champs analyses")
|
||
|
||
# 5. Tester Process() pour voir l'erreur exacte
|
||
erreur_process = None
|
||
try:
|
||
process.Process()
|
||
logger.info("[TEST] Process() reussi (inattendu!)")
|
||
except Exception as e:
|
||
erreur_process = str(e)
|
||
logger.info(f"[TEST] Process() echoue comme prevu: {e}")
|
||
|
||
# Ne pas commit - c'est juste un test
|
||
try:
|
||
sage.cial.CptaApplication.RollbackTrans()
|
||
except:
|
||
pass
|
||
|
||
return {
|
||
"success": True,
|
||
"champs_definis": champs_analyse,
|
||
"erreur_process": erreur_process,
|
||
"conseil": "Les champs manquants dans l'erreur sont probablement obligatoires",
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[TEST] Erreur: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/config/parametres-facture", dependencies=[Depends(verify_token)])
|
||
def verifier_parametres_facture():
|
||
"""
|
||
🔍 Vérifie les paramètres Sage pour la création de factures
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
parametres = {}
|
||
|
||
# Paramètres société
|
||
try:
|
||
param_societe = sage.cial.CptaApplication.ParametreSociete
|
||
|
||
parametres["societe"] = {
|
||
"journal_vente_defaut": getattr(
|
||
param_societe, "P_CodeJournalVte", "N/A"
|
||
),
|
||
"mode_reglement_defaut": getattr(
|
||
param_societe, "P_ModeRegl", "N/A"
|
||
),
|
||
"souche_facture": getattr(param_societe, "P_SoucheFacture", "N/A"),
|
||
}
|
||
except Exception as e:
|
||
parametres["erreur_societe"] = str(e)
|
||
|
||
# Tester un client existant
|
||
try:
|
||
factory_client = sage.cial.CptaApplication.FactoryClient
|
||
persist = factory_client.List(1)
|
||
|
||
if persist:
|
||
client = sage._cast_client(persist)
|
||
if client:
|
||
parametres["exemple_client"] = {
|
||
"code": getattr(client, "CT_Num", "?"),
|
||
"mode_reglement": getattr(client, "CT_ModeRegl", "N/A"),
|
||
"conditions_reglement": getattr(
|
||
client, "CT_CondRegl", "N/A"
|
||
),
|
||
}
|
||
except Exception as e:
|
||
parametres["erreur_client"] = str(e)
|
||
|
||
# Journaux disponibles
|
||
try:
|
||
factory_journal = sage.cial.CptaApplication.FactoryJournal
|
||
journaux = []
|
||
|
||
index = 1
|
||
while index <= 20: # Max 20 journaux
|
||
try:
|
||
persist_journal = factory_journal.List(index)
|
||
if persist_journal is None:
|
||
break
|
||
|
||
# Cast en journal
|
||
journal = win32com.client.CastTo(persist_journal, "IBOJournal3")
|
||
journal.Read()
|
||
|
||
journaux.append(
|
||
{
|
||
"code": getattr(journal, "JO_Num", "?"),
|
||
"intitule": getattr(journal, "JO_Intitule", "?"),
|
||
"type": getattr(journal, "JO_Type", "?"),
|
||
}
|
||
)
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
break
|
||
|
||
parametres["journaux_disponibles"] = journaux
|
||
|
||
except Exception as e:
|
||
parametres["erreur_journaux"] = str(e)
|
||
|
||
return {
|
||
"success": True,
|
||
"parametres": parametres,
|
||
"conseil": "Utilisez ces valeurs pour remplir les champs obligatoires des factures",
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Erreur verification config: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get("/sage/diagnostic/statuts-globaux", dependencies=[Depends(verify_token)])
|
||
def diagnostiquer_statuts_globaux():
|
||
"""
|
||
📊 MATRICE COMPLÈTE DES STATUTS SAGE
|
||
|
||
Retourne pour CHAQUE type de document :
|
||
- Tous les statuts possibles avec leurs descriptions
|
||
- Les statuts requis pour transformation
|
||
- Les changements de statuts après transformation
|
||
- Les restrictions de changement de statut
|
||
|
||
Cette route analyse la base Sage pour découvrir les règles réelles
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Définition des types de documents
|
||
types_documents = {
|
||
0: "Devis",
|
||
10: "Bon de commande",
|
||
20: "Préparation",
|
||
30: "Bon de livraison",
|
||
40: "Bon de retour",
|
||
50: "Bon d'avoir",
|
||
60: "Facture",
|
||
}
|
||
|
||
# Descriptions standard des statuts Sage
|
||
descriptions_statuts = {
|
||
0: "Brouillon",
|
||
1: "Soumis/En attente",
|
||
2: "Accepté/Validé",
|
||
3: "Réalisé partiellement",
|
||
4: "Réalisé totalement",
|
||
5: "Transformé",
|
||
6: "Annulé",
|
||
}
|
||
|
||
matrice_complete = {}
|
||
|
||
logger.info(
|
||
"[DIAG] 🔍 Analyse des statuts pour tous les types de documents..."
|
||
)
|
||
|
||
# Pour chaque type de document
|
||
for type_doc, libelle_type in types_documents.items():
|
||
logger.info(f"[DIAG] Analyse type {type_doc} ({libelle_type})...")
|
||
|
||
analyse_type = {
|
||
"type": type_doc,
|
||
"libelle": libelle_type,
|
||
"statuts_observes": {},
|
||
"exemples_par_statut": {},
|
||
"nb_documents_total": 0,
|
||
}
|
||
|
||
# Scanner tous les documents de ce type
|
||
index = 1
|
||
max_scan = 1000
|
||
|
||
while index < max_scan:
|
||
try:
|
||
persist = factory.List(index)
|
||
if persist is None:
|
||
break
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
doc_type = getattr(doc, "DO_Type", -1)
|
||
|
||
# Filtrer sur le type qu'on analyse
|
||
if doc_type != type_doc:
|
||
index += 1
|
||
continue
|
||
|
||
analyse_type["nb_documents_total"] += 1
|
||
|
||
# Récupérer le statut
|
||
statut = getattr(doc, "DO_Statut", -1)
|
||
|
||
# Compter les statuts observés
|
||
if statut not in analyse_type["statuts_observes"]:
|
||
analyse_type["statuts_observes"][statut] = {
|
||
"count": 0,
|
||
"description": descriptions_statuts.get(
|
||
statut, f"Statut {statut}"
|
||
),
|
||
"exemples": [],
|
||
}
|
||
|
||
analyse_type["statuts_observes"][statut]["count"] += 1
|
||
|
||
# Garder quelques exemples
|
||
if (
|
||
len(analyse_type["statuts_observes"][statut]["exemples"])
|
||
< 3
|
||
):
|
||
numero = getattr(doc, "DO_Piece", "")
|
||
date = str(getattr(doc, "DO_Date", ""))
|
||
|
||
analyse_type["statuts_observes"][statut]["exemples"].append(
|
||
{
|
||
"numero": numero,
|
||
"date": date,
|
||
"total_ttc": float(
|
||
getattr(doc, "DO_TotalTTC", 0.0)
|
||
),
|
||
}
|
||
)
|
||
|
||
index += 1
|
||
|
||
except Exception as e:
|
||
index += 1
|
||
continue
|
||
|
||
# Trier les statuts par nombre d'occurrences
|
||
analyse_type["statuts_par_frequence"] = sorted(
|
||
[
|
||
{
|
||
"statut": s,
|
||
"description": info["description"],
|
||
"count": info["count"],
|
||
"pourcentage": (
|
||
round(
|
||
info["count"]
|
||
/ analyse_type["nb_documents_total"]
|
||
* 100,
|
||
1,
|
||
)
|
||
if analyse_type["nb_documents_total"] > 0
|
||
else 0
|
||
),
|
||
}
|
||
for s, info in analyse_type["statuts_observes"].items()
|
||
],
|
||
key=lambda x: x["count"],
|
||
reverse=True,
|
||
)
|
||
|
||
matrice_complete[type_doc] = analyse_type
|
||
|
||
logger.info(
|
||
f"[DIAG] ✅ Type {type_doc}: {analyse_type['nb_documents_total']} docs, "
|
||
f"{len(analyse_type['statuts_observes'])} statuts différents"
|
||
)
|
||
|
||
# RÈGLES DE TRANSFORMATION
|
||
regles_transformation = {
|
||
"transformations_valides": [
|
||
{
|
||
"source_type": 0,
|
||
"source_libelle": "Devis",
|
||
"cible_type": 10,
|
||
"cible_libelle": "Bon de commande",
|
||
"statut_source_requis": [2],
|
||
"statut_source_requis_description": ["Accepté/Validé"],
|
||
"statut_source_apres": 5,
|
||
"statut_source_apres_description": "Transformé",
|
||
"statut_cible_initial": 2,
|
||
"statut_cible_initial_description": "Accepté/Validé",
|
||
},
|
||
{
|
||
"source_type": 10,
|
||
"source_libelle": "Bon de commande",
|
||
"cible_type": 30,
|
||
"cible_libelle": "Bon de livraison",
|
||
"statut_source_requis": [2],
|
||
"statut_source_requis_description": ["Accepté/Validé"],
|
||
"statut_source_apres": 5,
|
||
"statut_source_apres_description": "Transformé",
|
||
"statut_cible_initial": 2,
|
||
"statut_cible_initial_description": "Accepté/Validé",
|
||
},
|
||
{
|
||
"source_type": 10,
|
||
"source_libelle": "Bon de commande",
|
||
"cible_type": 60,
|
||
"cible_libelle": "Facture",
|
||
"statut_source_requis": [2],
|
||
"statut_source_requis_description": ["Accepté/Validé"],
|
||
"statut_source_apres": 5,
|
||
"statut_source_apres_description": "Transformé",
|
||
"statut_cible_initial": 2,
|
||
"statut_cible_initial_description": "Accepté/Validé",
|
||
},
|
||
{
|
||
"source_type": 30,
|
||
"source_libelle": "Bon de livraison",
|
||
"cible_type": 60,
|
||
"cible_libelle": "Facture",
|
||
"statut_source_requis": [2],
|
||
"statut_source_requis_description": ["Accepté/Validé"],
|
||
"statut_source_apres": 5,
|
||
"statut_source_apres_description": "Transformé",
|
||
"statut_cible_initial": 2,
|
||
"statut_cible_initial_description": "Accepté/Validé",
|
||
},
|
||
{
|
||
"source_type": 0,
|
||
"source_libelle": "Devis",
|
||
"cible_type": 60,
|
||
"cible_libelle": "Facture",
|
||
"statut_source_requis": [2],
|
||
"statut_source_requis_description": ["Accepté/Validé"],
|
||
"statut_source_apres": 5,
|
||
"statut_source_apres_description": "Transformé",
|
||
"statut_cible_initial": 2,
|
||
"statut_cible_initial_description": "Accepté/Validé",
|
||
},
|
||
],
|
||
"statuts_bloquants_pour_transformation": [
|
||
{
|
||
"statut": 5,
|
||
"description": "Transformé",
|
||
"raison": "Le document a déjà été transformé",
|
||
},
|
||
{
|
||
"statut": 6,
|
||
"description": "Annulé",
|
||
"raison": "Le document est annulé",
|
||
},
|
||
{
|
||
"statut": 3,
|
||
"description": "Réalisé partiellement",
|
||
"raison": "Un document cible existe probablement déjà (transformation partielle effectuée)",
|
||
},
|
||
{
|
||
"statut": 4,
|
||
"description": "Réalisé totalement",
|
||
"raison": "Le document a été entièrement réalisé (transformation déjà effectuée)",
|
||
},
|
||
],
|
||
"changements_statut_autorises": {
|
||
"0_Brouillon": {
|
||
"vers": [2, 6],
|
||
"descriptions": ["Accepté/Validé", "Annulé"],
|
||
"note": "Un brouillon peut être accepté ou annulé",
|
||
},
|
||
"2_Accepte": {
|
||
"vers": [5, 6],
|
||
"descriptions": ["Transformé", "Annulé"],
|
||
"note": "Un document accepté peut être transformé ou annulé",
|
||
},
|
||
"5_Transforme": {
|
||
"vers": [],
|
||
"descriptions": [],
|
||
"note": "Un document transformé ne peut plus changer de statut",
|
||
},
|
||
"6_Annule": {
|
||
"vers": [],
|
||
"descriptions": [],
|
||
"note": "Un document annulé ne peut plus changer de statut",
|
||
},
|
||
},
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"matrice_statuts_par_type": matrice_complete,
|
||
"regles_transformation": regles_transformation,
|
||
"legende_statuts": descriptions_statuts,
|
||
"types_documents": types_documents,
|
||
"date_analyse": datetime.now().isoformat(),
|
||
"note": "Cette matrice est construite à partir des documents réels dans votre base Sage",
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur diagnostic global: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get(
|
||
"/sage/diagnostic/statuts-permis/{numero}", dependencies=[Depends(verify_token)]
|
||
)
|
||
def diagnostiquer_statuts_permis(numero: str):
|
||
"""
|
||
🔍 DIAGNOSTIC CRITIQUE: Découvre TOUS les statuts possibles pour un document
|
||
|
||
Teste tous les statuts de 0 à 10 pour identifier lesquels sont valides
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Chercher le document (tous types confondus)
|
||
persist = None
|
||
type_doc_trouve = None
|
||
|
||
# Essayer ReadPiece pour différents types
|
||
for type_test in range(7): # 0-6
|
||
try:
|
||
persist_test = factory.ReadPiece(type_test, numero)
|
||
if persist_test:
|
||
persist = persist_test
|
||
type_doc_trouve = type_test
|
||
logger.info(
|
||
f"[DIAG] Document {numero} trouvé avec ReadPiece(type={type_test})"
|
||
)
|
||
break
|
||
except:
|
||
continue
|
||
|
||
# Si pas trouvé, chercher dans List()
|
||
if not persist:
|
||
index = 1
|
||
while index < 10000:
|
||
try:
|
||
persist_test = factory.List(index)
|
||
if persist_test is None:
|
||
break
|
||
|
||
doc_test = win32com.client.CastTo(
|
||
persist_test, "IBODocumentVente3"
|
||
)
|
||
doc_test.Read()
|
||
|
||
if getattr(doc_test, "DO_Piece", "") == numero:
|
||
persist = persist_test
|
||
type_doc_trouve = getattr(doc_test, "DO_Type", -1)
|
||
logger.info(
|
||
f"[DIAG] Document {numero} trouvé dans List() à l'index {index}"
|
||
)
|
||
break
|
||
|
||
index += 1
|
||
except:
|
||
index += 1
|
||
|
||
if not persist:
|
||
raise HTTPException(404, f"Document {numero} introuvable")
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
# Infos du document
|
||
statut_actuel = getattr(doc, "DO_Statut", -1)
|
||
type_actuel = getattr(doc, "DO_Type", -1)
|
||
|
||
diagnostic = {
|
||
"numero": numero,
|
||
"type_document": type_actuel,
|
||
"type_libelle": {
|
||
0: "Devis",
|
||
10: "Bon de commande",
|
||
20: "Préparation",
|
||
30: "Bon de livraison",
|
||
40: "Bon de retour",
|
||
50: "Bon d'avoir",
|
||
60: "Facture",
|
||
}.get(type_actuel, f"Type {type_actuel}"),
|
||
"statut_actuel": statut_actuel,
|
||
"statut_actuel_libelle": {
|
||
0: "Brouillon",
|
||
1: "Soumis/En attente",
|
||
2: "Accepté/Validé",
|
||
3: "Réalisé partiellement",
|
||
4: "Réalisé totalement",
|
||
5: "Transformé",
|
||
6: "Annulé",
|
||
}.get(statut_actuel, f"Statut {statut_actuel}"),
|
||
"tests_statuts": [],
|
||
}
|
||
|
||
# Tester tous les statuts de 0 à 10
|
||
logger.info(f"[DIAG] Test des statuts pour {numero}...")
|
||
|
||
for statut_test in range(11):
|
||
resultat_test = {
|
||
"statut": statut_test,
|
||
"libelle": {
|
||
0: "Brouillon",
|
||
1: "Soumis/En attente",
|
||
2: "Accepté/Validé",
|
||
3: "Réalisé partiellement",
|
||
4: "Réalisé totalement",
|
||
5: "Transformé",
|
||
6: "Annulé",
|
||
7: "Statut 7",
|
||
8: "Statut 8",
|
||
9: "Statut 9",
|
||
10: "Statut 10",
|
||
}.get(statut_test, f"Statut {statut_test}"),
|
||
"autorise": False,
|
||
"erreur": None,
|
||
"est_statut_actuel": (statut_test == statut_actuel),
|
||
}
|
||
|
||
# Si c'est le statut actuel, on sait qu'il est valide
|
||
if statut_test == statut_actuel:
|
||
resultat_test["autorise"] = True
|
||
resultat_test["note"] = "Statut actuel du document"
|
||
else:
|
||
# Tester le changement de statut
|
||
try:
|
||
# Relire le document
|
||
doc.Read()
|
||
|
||
# Essayer de changer le statut
|
||
doc.DO_Statut = statut_test
|
||
|
||
# Essayer d'écrire
|
||
doc.Write()
|
||
|
||
# Si on arrive ici, le statut est valide !
|
||
resultat_test["autorise"] = True
|
||
resultat_test["note"] = "Changement de statut réussi"
|
||
|
||
logger.info(f"[DIAG] ✅ Statut {statut_test} AUTORISÉ")
|
||
|
||
# Restaurer le statut d'origine immédiatement
|
||
doc.Read()
|
||
doc.DO_Statut = statut_actuel
|
||
doc.Write()
|
||
|
||
except Exception as e:
|
||
erreur_str = str(e)
|
||
resultat_test["autorise"] = False
|
||
resultat_test["erreur"] = erreur_str
|
||
|
||
logger.debug(
|
||
f"[DIAG] ❌ Statut {statut_test} REFUSÉ: {erreur_str[:100]}"
|
||
)
|
||
|
||
# Restaurer en cas d'erreur
|
||
try:
|
||
doc.Read()
|
||
except:
|
||
pass
|
||
|
||
diagnostic["tests_statuts"].append(resultat_test)
|
||
|
||
# Résumé
|
||
statuts_autorises = [
|
||
t["statut"] for t in diagnostic["tests_statuts"] if t["autorise"]
|
||
]
|
||
statuts_refuses = [
|
||
t["statut"] for t in diagnostic["tests_statuts"] if not t["autorise"]
|
||
]
|
||
|
||
diagnostic["resume"] = {
|
||
"nb_statuts_autorises": len(statuts_autorises),
|
||
"statuts_autorises": statuts_autorises,
|
||
"statuts_autorises_libelles": [
|
||
t["libelle"] for t in diagnostic["tests_statuts"] if t["autorise"]
|
||
],
|
||
"nb_statuts_refuses": len(statuts_refuses),
|
||
"statuts_refuses": statuts_refuses,
|
||
}
|
||
|
||
# Recommandations
|
||
recommendations = []
|
||
|
||
if 2 in statuts_autorises and statut_actuel == 0:
|
||
recommendations.append(
|
||
"✅ Vous pouvez passer ce document de 'Brouillon' (0) à 'Accepté' (2)"
|
||
)
|
||
|
||
if 5 in statuts_autorises:
|
||
recommendations.append(
|
||
"✅ Le statut 'Transformé' (5) est disponible - utilisé après transformation"
|
||
)
|
||
|
||
if 6 in statuts_autorises:
|
||
recommendations.append("✅ Vous pouvez annuler ce document (statut 6)")
|
||
|
||
if not any(s in statuts_autorises for s in [2, 3, 4]):
|
||
recommendations.append(
|
||
"⚠️ Aucun statut de validation (2/3/4) n'est disponible - "
|
||
"le document a peut-être déjà été traité"
|
||
)
|
||
|
||
diagnostic["recommendations"] = recommendations
|
||
|
||
logger.info(
|
||
f"[DIAG] Statuts autorisés pour {numero}: "
|
||
f"{statuts_autorises} / Refusés: {statuts_refuses}"
|
||
)
|
||
|
||
return {"success": True, "diagnostic": diagnostic}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur diagnostic statuts: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.get(
|
||
"/sage/diagnostic/erreur-transformation/{numero}",
|
||
dependencies=[Depends(verify_token)],
|
||
)
|
||
def diagnostiquer_erreur_transformation(
|
||
numero: str, type_source: int = Query(...), type_cible: int = Query(...)
|
||
):
|
||
"""
|
||
🔍 DIAGNOSTIC AVANCÉ: Analyse pourquoi une transformation échoue
|
||
|
||
Vérifie:
|
||
- Statut du document source
|
||
- Statuts autorisés
|
||
- Lignes du document
|
||
- Client associé
|
||
- Champs obligatoires manquants
|
||
"""
|
||
try:
|
||
if not sage or not sage.cial:
|
||
raise HTTPException(503, "Service Sage indisponible")
|
||
|
||
with sage._com_context(), sage._lock_com:
|
||
factory = sage.cial.FactoryDocumentVente
|
||
|
||
# Lire le document source
|
||
persist = factory.ReadPiece(type_source, numero)
|
||
|
||
if not persist:
|
||
persist = sage._find_document_in_list(numero, type_source)
|
||
|
||
if not persist:
|
||
raise HTTPException(
|
||
404, f"Document {numero} (type {type_source}) introuvable"
|
||
)
|
||
|
||
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
|
||
doc.Read()
|
||
|
||
diagnostic = {
|
||
"numero": numero,
|
||
"type_source": type_source,
|
||
"type_cible": type_cible,
|
||
"problemes_detectes": [],
|
||
"avertissements": [],
|
||
"suggestions": [],
|
||
}
|
||
|
||
# 1. Vérifier le statut
|
||
statut_actuel = getattr(doc, "DO_Statut", -1)
|
||
diagnostic["statut_actuel"] = statut_actuel
|
||
|
||
if statut_actuel == 5:
|
||
diagnostic["problemes_detectes"].append(
|
||
{
|
||
"severite": "BLOQUANT",
|
||
"champ": "DO_Statut",
|
||
"valeur": 5,
|
||
"message": "Document déjà transformé (statut=5)",
|
||
}
|
||
)
|
||
|
||
elif statut_actuel == 6:
|
||
diagnostic["problemes_detectes"].append(
|
||
{
|
||
"severite": "BLOQUANT",
|
||
"champ": "DO_Statut",
|
||
"valeur": 6,
|
||
"message": "Document annulé (statut=6)",
|
||
}
|
||
)
|
||
|
||
elif statut_actuel in [3, 4]:
|
||
diagnostic["avertissements"].append(
|
||
{
|
||
"severite": "ATTENTION",
|
||
"champ": "DO_Statut",
|
||
"valeur": statut_actuel,
|
||
"message": f"Document déjà réalisé (statut={statut_actuel}). "
|
||
f"Un document cible existe peut-être déjà.",
|
||
}
|
||
)
|
||
|
||
elif statut_actuel == 0:
|
||
diagnostic["suggestions"].append(
|
||
"Le document est en 'Brouillon' (statut=0). "
|
||
"Le système le passera automatiquement à 'Accepté' (statut=2) avant transformation."
|
||
)
|
||
|
||
# 2. Vérifier le client
|
||
client_code = ""
|
||
try:
|
||
client_obj = getattr(doc, "Client", None)
|
||
if client_obj:
|
||
client_obj.Read()
|
||
client_code = getattr(client_obj, "CT_Num", "").strip()
|
||
except:
|
||
pass
|
||
|
||
if not client_code:
|
||
diagnostic["problemes_detectes"].append(
|
||
{
|
||
"severite": "BLOQUANT",
|
||
"champ": "CT_Num",
|
||
"valeur": None,
|
||
"message": "Aucun client associé au document",
|
||
}
|
||
)
|
||
else:
|
||
diagnostic["client_code"] = client_code
|
||
|
||
# 3. Vérifier les lignes
|
||
try:
|
||
factory_lignes = getattr(doc, "FactoryDocumentLigne", None) or getattr(
|
||
doc, "FactoryDocumentVenteLigne", None
|
||
)
|
||
|
||
nb_lignes = 0
|
||
lignes_problemes = []
|
||
|
||
if factory_lignes:
|
||
index = 1
|
||
while index <= 100:
|
||
try:
|
||
ligne_p = factory_lignes.List(index)
|
||
if ligne_p is None:
|
||
break
|
||
|
||
ligne = win32com.client.CastTo(ligne_p, "IBODocumentLigne3")
|
||
ligne.Read()
|
||
|
||
nb_lignes += 1
|
||
|
||
# Vérifier article
|
||
article_ref = ""
|
||
try:
|
||
article_ref = getattr(ligne, "AR_Ref", "").strip()
|
||
if not article_ref:
|
||
article_obj = getattr(ligne, "Article", None)
|
||
if article_obj:
|
||
article_obj.Read()
|
||
article_ref = getattr(
|
||
article_obj, "AR_Ref", ""
|
||
).strip()
|
||
except:
|
||
pass
|
||
|
||
if not article_ref:
|
||
lignes_problemes.append(
|
||
{
|
||
"ligne": index,
|
||
"probleme": "Aucune référence article",
|
||
}
|
||
)
|
||
|
||
# Vérifier prix
|
||
prix = float(getattr(ligne, "DL_PrixUnitaire", 0.0))
|
||
if prix == 0:
|
||
lignes_problemes.append(
|
||
{"ligne": index, "probleme": "Prix unitaire = 0"}
|
||
)
|
||
|
||
index += 1
|
||
except:
|
||
break
|
||
|
||
diagnostic["nb_lignes"] = nb_lignes
|
||
|
||
if nb_lignes == 0:
|
||
diagnostic["problemes_detectes"].append(
|
||
{
|
||
"severite": "BLOQUANT",
|
||
"champ": "Lignes",
|
||
"valeur": 0,
|
||
"message": "Document vide (aucune ligne)",
|
||
}
|
||
)
|
||
|
||
if lignes_problemes:
|
||
diagnostic["avertissements"].append(
|
||
{
|
||
"severite": "ATTENTION",
|
||
"champ": "Lignes",
|
||
"message": f"{len(lignes_problemes)} ligne(s) avec des problèmes",
|
||
"details": lignes_problemes,
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
diagnostic["avertissements"].append(
|
||
{
|
||
"severite": "ERREUR",
|
||
"champ": "Lignes",
|
||
"message": f"Impossible de lire les lignes: {e}",
|
||
}
|
||
)
|
||
|
||
# 4. Vérifier les totaux
|
||
total_ht = float(getattr(doc, "DO_TotalHT", 0.0))
|
||
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
|
||
|
||
diagnostic["totaux"] = {"total_ht": total_ht, "total_ttc": total_ttc}
|
||
|
||
if total_ht == 0 and total_ttc == 0:
|
||
diagnostic["avertissements"].append(
|
||
{
|
||
"severite": "ATTENTION",
|
||
"champ": "Totaux",
|
||
"message": "Tous les totaux sont à 0",
|
||
}
|
||
)
|
||
|
||
# 5. Vérifier si la transformation est autorisée
|
||
transformations_valides = {(0, 10), (10, 30), (10, 60), (30, 60), (0, 60)}
|
||
|
||
if (type_source, type_cible) not in transformations_valides:
|
||
diagnostic["problemes_detectes"].append(
|
||
{
|
||
"severite": "BLOQUANT",
|
||
"champ": "Transformation",
|
||
"message": f"Transformation {type_source} → {type_cible} non autorisée. "
|
||
f"Transformations valides: {transformations_valides}",
|
||
}
|
||
)
|
||
|
||
# Résumé
|
||
nb_bloquants = sum(
|
||
1
|
||
for p in diagnostic["problemes_detectes"]
|
||
if p.get("severite") == "BLOQUANT"
|
||
)
|
||
nb_avertissements = len(diagnostic["avertissements"])
|
||
|
||
diagnostic["resume"] = {
|
||
"peut_transformer": nb_bloquants == 0,
|
||
"nb_problemes_bloquants": nb_bloquants,
|
||
"nb_avertissements": nb_avertissements,
|
||
}
|
||
|
||
if nb_bloquants == 0:
|
||
diagnostic["suggestions"].append(
|
||
"✅ Aucun problème bloquant détecté. La transformation devrait fonctionner."
|
||
)
|
||
else:
|
||
diagnostic["suggestions"].append(
|
||
f"❌ {nb_bloquants} problème(s) bloquant(s) doivent être résolus avant la transformation."
|
||
)
|
||
|
||
return {"success": True, "diagnostic": diagnostic}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"[DIAG] Erreur diagnostic transformation: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - PROSPECTS
|
||
# =====================================================
|
||
@app.post("/sage/prospects/list", dependencies=[Depends(verify_token)])
|
||
def prospects_list(req: FiltreRequest):
|
||
"""📋 Liste tous les prospects (CT_Type=0 AND CT_Prospect=1)"""
|
||
try:
|
||
prospects = sage.lister_tous_prospects(req.filtre)
|
||
return {"success": True, "data": prospects}
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste prospects: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/prospects/get", dependencies=[Depends(verify_token)])
|
||
def prospect_get(req: CodeRequest):
|
||
"""📄 Lecture d'un prospect par code"""
|
||
try:
|
||
prospect = sage.lire_prospect(req.code)
|
||
if not prospect:
|
||
raise HTTPException(404, f"Prospect {req.code} non trouvé")
|
||
return {"success": True, "data": prospect}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture prospect: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - FOURNISSEURS
|
||
# =====================================================
|
||
@app.post("/sage/fournisseurs/list", dependencies=[Depends(verify_token)])
|
||
def fournisseurs_list(req: FiltreRequest):
|
||
"""
|
||
⚡ ENDPOINT DIRECT : Liste fournisseurs via FactoryFournisseur
|
||
|
||
✅ Utilise la méthode corrigée qui ne dépend pas du cache
|
||
"""
|
||
try:
|
||
# ✅ Appel direct à la méthode corrigée
|
||
fournisseurs = sage.lister_tous_fournisseurs(req.filtre)
|
||
|
||
logger.info(f"✅ {len(fournisseurs)} fournisseurs retournés via endpoint")
|
||
|
||
return {"success": True, "data": fournisseurs}
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Erreur liste fournisseurs: {e}", exc_info=True)
|
||
raise HTTPException(500, str(e))
|
||
|
||
@app.post("/sage/fournisseurs/create", dependencies=[Depends(verify_token)])
|
||
def create_fournisseur_endpoint(req: FournisseurCreateRequest):
|
||
"""
|
||
➕ Création d'un fournisseur dans Sage
|
||
|
||
✅ Utilise FactoryFournisseur.Create() directement
|
||
"""
|
||
try:
|
||
# Appel au connecteur Sage
|
||
resultat = sage.creer_fournisseur(req.dict())
|
||
|
||
logger.info(f"✅ Fournisseur créé: {resultat.get('numero')}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": resultat
|
||
}
|
||
|
||
except ValueError as e:
|
||
# Erreur métier (ex: doublon)
|
||
logger.warning(f"⚠️ Erreur métier création fournisseur: {e}")
|
||
raise HTTPException(400, str(e))
|
||
|
||
except Exception as e:
|
||
# Erreur technique (ex: COM)
|
||
logger.error(f"❌ Erreur technique création fournisseur: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
@app.post("/sage/fournisseurs/get", dependencies=[Depends(verify_token)])
|
||
def fournisseur_get(req: CodeRequest):
|
||
"""
|
||
✅ NOUVEAU : Lecture d'un fournisseur par code
|
||
"""
|
||
try:
|
||
fournisseur = sage.lire_fournisseur(req.code)
|
||
if not fournisseur:
|
||
raise HTTPException(404, f"Fournisseur {req.code} non trouvé")
|
||
return {"success": True, "data": fournisseur}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture fournisseur: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - AVOIRS
|
||
# =====================================================
|
||
@app.post("/sage/avoirs/list", dependencies=[Depends(verify_token)])
|
||
def avoirs_list(limit: int = 100, statut: Optional[int] = None):
|
||
"""📋 Liste tous les avoirs (DO_Domaine=0 AND DO_Type=5)"""
|
||
try:
|
||
avoirs = sage.lister_avoirs(limit=limit, statut=statut)
|
||
return {"success": True, "data": avoirs}
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste avoirs: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/avoirs/get", dependencies=[Depends(verify_token)])
|
||
def avoir_get(req: CodeRequest):
|
||
"""📄 Lecture d'un avoir avec ses lignes"""
|
||
try:
|
||
avoir = sage.lire_avoir(req.code)
|
||
if not avoir:
|
||
raise HTTPException(404, f"Avoir {req.code} non trouvé")
|
||
return {"success": True, "data": avoir}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture avoir: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
# =====================================================
|
||
# ENDPOINTS - LIVRAISONS
|
||
# =====================================================
|
||
@app.post("/sage/livraisons/list", dependencies=[Depends(verify_token)])
|
||
def livraisons_list(limit: int = 100, statut: Optional[int] = None):
|
||
"""📋 Liste tous les bons de livraison (DO_Domaine=0 AND DO_Type=30)"""
|
||
try:
|
||
livraisons = sage.lister_livraisons(limit=limit, statut=statut)
|
||
return {"success": True, "data": livraisons}
|
||
except Exception as e:
|
||
logger.error(f"Erreur liste livraisons: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
|
||
@app.post("/sage/livraisons/get", dependencies=[Depends(verify_token)])
|
||
def livraison_get(req: CodeRequest):
|
||
"""📄 Lecture d'une livraison avec ses lignes"""
|
||
try:
|
||
livraison = sage.lire_livraison(req.code)
|
||
if not livraison:
|
||
raise HTTPException(404, f"Livraison {req.code} non trouvée")
|
||
return {"success": True, "data": livraison}
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Erreur lecture livraison: {e}")
|
||
raise HTTPException(500, str(e))
|
||
|
||
# =====================================================
|
||
# LANCEMENT
|
||
# =====================================================
|
||
if __name__ == "__main__":
|
||
uvicorn.run(
|
||
"main:app",
|
||
host=settings.api_host,
|
||
port=settings.api_port,
|
||
reload=False, # Pas de reload en production
|
||
log_level="info",
|
||
)
|