Sage100-ws/utils/functions/sage_utilities.py

234 lines
No EOL
7.7 KiB
Python

import logging
from utils.functions.functions import (
_convertir_type_depuis_sql,
_convertir_type_pour_sql,
_normaliser_type_document,
_get_type_libelle
)
logger = logging.getLogger(__name__)
def _verifier_devis_non_transforme(numero: str, doc, cursor):
"""Vérifie que le devis n'est pas transformé."""
verification = verifier_si_deja_transforme_sql(numero, cursor, 0)
if verification["deja_transforme"]:
docs_cibles = verification["documents_cibles"]
nums = [d["numero"] for d in docs_cibles]
raise ValueError(
f" Devis {numero} déjà transformé en {len(docs_cibles)} document(s): {', '.join(nums)}"
)
statut_actuel = getattr(doc, "DO_Statut", 0)
if statut_actuel == 5:
raise ValueError(f" Devis {numero} déjà transformé (statut=5)")
def verifier_si_deja_transforme_sql(numero_source, cursor, type_source):
"""Version corrigée avec normalisation des types"""
logger.info(
f"[VERIF] Vérification transformations de {numero_source} (type {type_source})"
)
logger.info(
f"[VERIF] Vérification transformations de {numero_source} (type {type_source})"
)
logger.info(f"[DEBUG] Type source brut: {type_source}")
logger.info(
f"[DEBUG] Type source après normalisation: {_normaliser_type_document(type_source)}"
)
logger.info(
f"[DEBUG] Type source après normalisation SQL: {_convertir_type_pour_sql(type_source)}"
)
type_source = _convertir_type_pour_sql(type_source)
champ_liaison_mapping = {
0: "DL_PieceDE",
1: "DL_PieceBC",
3: "DL_PieceBL",
}
champ_liaison = champ_liaison_mapping.get(type_source)
if not champ_liaison:
logger.warning(f"[VERIF] Type source {type_source} non géré")
return {"deja_transforme": False, "documents_cibles": []}
try:
query = f"""
SELECT DISTINCT
dc.DO_Piece,
dc.DO_Type,
dc.DO_Statut,
(SELECT COUNT(*) FROM F_DOCLIGNE
WHERE DO_Piece = dc.DO_Piece AND DO_Type = dc.DO_Type) as NbLignes
FROM F_DOCENTETE dc
INNER JOIN F_DOCLIGNE dl ON dc.DO_Piece = dl.DO_Piece AND dc.DO_Type = dl.DO_Type
WHERE dl.{champ_liaison} = ?
ORDER BY dc.DO_Type, dc.DO_Piece
"""
cursor.execute(query, (numero_source,))
resultats = cursor.fetchall()
documents_cibles = []
for row in resultats:
type_brut = int(row.DO_Type)
type_normalise = _convertir_type_depuis_sql(type_brut)
doc = {
"numero": row.DO_Piece.strip() if row.DO_Piece else "",
"type": type_normalise, # ← TYPE NORMALISÉ
"type_brut": type_brut, # Garder aussi le type original
"type_libelle": _get_type_libelle(type_brut),
"statut": int(row.DO_Statut) if row.DO_Statut else 0,
"nb_lignes": int(row.NbLignes) if row.NbLignes else 0,
}
documents_cibles.append(doc)
logger.info(
f"[VERIF] Trouvé: {doc['numero']} "
f"(type {type_brut}{type_normalise} - {doc['type_libelle']}) "
f"- {doc['nb_lignes']} lignes"
)
deja_transforme = len(documents_cibles) > 0
if deja_transforme:
logger.info(
f"[VERIF] Document {numero_source} a {len(documents_cibles)} transformation(s)"
)
else:
logger.info(
f"[VERIF] Document {numero_source} pas encore transformé"
)
return {
"deja_transforme": deja_transforme,
"documents_cibles": documents_cibles,
}
except Exception as e:
logger.error(f"[VERIF] Erreur vérification: {e}")
return {"deja_transforme": False, "documents_cibles": []}
def peut_etre_transforme(cursor, numero_source, type_source, type_cible):
"""Version corrigée avec normalisation"""
type_source = _normaliser_type_document(type_source)
type_cible = _normaliser_type_document(type_cible)
logger.info(
f"[VERIF_TRANSFO] {numero_source} "
f"(type {type_source}) → type {type_cible}"
)
verif = verifier_si_deja_transforme_sql(cursor, numero_source, type_source)
docs_meme_type = [
d for d in verif["documents_cibles"] if d["type"] == type_cible
]
if docs_meme_type:
nums = [d["numero"] for d in docs_meme_type]
return {
"possible": False,
"raison": f"Document déjà transformé en {_get_type_libelle(type_cible)}",
"documents_existants": docs_meme_type,
"message_detaille": f"Document(s) existant(s): {', '.join(nums)}",
}
return {
"possible": True,
"raison": "Transformation possible",
"documents_existants": [],
}
def lire_erreurs_sage(obj, nom_obj=""):
erreurs = []
try:
if not hasattr(obj, "Errors") or obj.Errors is None:
return erreurs
nb_erreurs = 0
try:
nb_erreurs = obj.Errors.Count
except:
return erreurs
if nb_erreurs == 0:
return erreurs
for i in range(1, nb_erreurs + 1):
try:
err = None
try:
err = obj.Errors.Item(i)
except:
try:
err = obj.Errors(i)
except:
try:
err = obj.Errors.Item(i - 1)
except:
pass
if err is not None:
description = ""
field = ""
number = ""
for attr in ["Description", "Descr", "Message", "Text"]:
try:
val = getattr(err, attr, None)
if val:
description = str(val)
break
except:
pass
for attr in ["Field", "FieldName", "Champ", "Property"]:
try:
val = getattr(err, attr, None)
if val:
field = str(val)
break
except:
pass
for attr in ["Number", "Code", "ErrorCode", "Numero"]:
try:
val = getattr(err, attr, None)
if val is not None:
number = str(val)
break
except:
pass
if description or field or number:
erreurs.append(
{
"source": nom_obj,
"index": i,
"description": description or "Erreur inconnue",
"field": field or "?",
"number": number or "?",
}
)
except Exception as e:
logger.debug(f"Erreur lecture erreur {i}: {e}")
continue
except Exception as e:
logger.debug(f"Erreur globale lecture erreurs {nom_obj}: {e}")
return erreurs
__all__ = [
"_verifier_devis_non_transforme",
"verifier_si_deja_transforme_sql",
"peut_etre_transforme",
"lire_erreurs_sage"
]