import logging

from utils.functions.functions import (
    _convertir_type_depuis_sql,
    _convertir_type_pour_sql,
    _normaliser_type_document,
    _get_type_libelle,
)

logger = logging.getLogger(__name__)


def _verifier_devis_non_transforme(numero: str, doc, cursor):
    """Raise ``ValueError`` if the quote *numero* was already transformed.

    Two independent checks are performed:

    1. ``verifier_si_deja_transforme_sql`` is queried with source type ``0``
       to look for documents whose lines point back to *numero*.
    2. The ``DO_Statut`` attribute of *doc* (defaulting to ``0`` when the
       attribute is absent) is compared with ``5``.

    Args:
        numero: Number of the quote to check.
        doc: Object that may expose a ``DO_Statut`` attribute.
        cursor: DB cursor forwarded to ``verifier_si_deja_transforme_sql``.

    Raises:
        ValueError: If linked target documents exist, or if
            ``doc.DO_Statut == 5``.
    """
    verification = verifier_si_deja_transforme_sql(numero, cursor, 0)
    if verification["deja_transforme"]:
        docs_cibles = verification["documents_cibles"]
        nums = [d["numero"] for d in docs_cibles]
        raise ValueError(
            f" Devis {numero} déjà transformé en {len(docs_cibles)} document(s): {', '.join(nums)}"
        )

    statut_actuel = getattr(doc, "DO_Statut", 0)
    if statut_actuel == 5:
        raise ValueError(f" Devis {numero} déjà transformé (statut=5)")


def verifier_si_deja_transforme_sql(numero_source, cursor, type_source):
    """Look up the documents that were generated from *numero_source*.

    The source type is converted with ``_convertir_type_pour_sql`` and mapped
    to the ``F_DOCLIGNE`` column that links a line back to its source
    document. ``F_DOCENTETE`` joined with ``F_DOCLIGNE`` is then queried for
    every document having at least one line whose link column equals
    *numero_source*.

    Args:
        numero_source: Number of the source document.
        cursor: DB cursor exposing ``execute`` / ``fetchall`` with ``?``
            placeholders and attribute-style row access.
        type_source: Source document type, in any form accepted by
            ``_convertir_type_pour_sql``.

    Returns:
        A dict with two keys:

        * ``"deja_transforme"``: ``True`` when at least one target was found.
        * ``"documents_cibles"``: list of dicts with keys ``numero``,
          ``type``, ``type_brut``, ``type_libelle``, ``statut`` and
          ``nb_lignes``.

        An unsupported source type, or any exception raised while querying,
        yields ``{"deja_transforme": False, "documents_cibles": []}``.
    """
    logger.info(
        f"[VERIF] Vérification transformations de {numero_source} (type {type_source})"
    )

    # Convert once and reuse the value for both the trace and the lookup.
    type_sql = _convertir_type_pour_sql(type_source)

    logger.info(f"[DEBUG] Type source brut: {type_source}")
    logger.info(
        f"[DEBUG] Type source après normalisation: {_normaliser_type_document(type_source)}"
    )
    logger.info(f"[DEBUG] Type source après normalisation SQL: {type_sql}")

    type_source = type_sql

    # SQL type -> F_DOCLIGNE column that references the source document.
    # NOTE(review): presumably 0 = quote, 1 = order, 3 = delivery note;
    # confirm against the type constants used elsewhere in the project.
    champ_liaison_mapping = {
        0: "DL_PieceDE",
        1: "DL_PieceBC",
        3: "DL_PieceBL",
    }

    champ_liaison = champ_liaison_mapping.get(type_source)
    if not champ_liaison:
        logger.warning(f"[VERIF] Type source {type_source} non géré")
        return {"deja_transforme": False, "documents_cibles": []}

    try:
        # The column name is interpolated, but it can only come from the
        # fixed mapping above; the document number stays a bound parameter.
        query = f"""
            SELECT DISTINCT
                dc.DO_Piece,
                dc.DO_Type,
                dc.DO_Statut,
                (SELECT COUNT(*)
                   FROM F_DOCLIGNE
                  WHERE DO_Piece = dc.DO_Piece
                    AND DO_Type = dc.DO_Type) as NbLignes
            FROM F_DOCENTETE dc
            INNER JOIN F_DOCLIGNE dl
                ON dc.DO_Piece = dl.DO_Piece
               AND dc.DO_Type = dl.DO_Type
            WHERE dl.{champ_liaison} = ?
            ORDER BY dc.DO_Type, dc.DO_Piece
        """
        cursor.execute(query, (numero_source,))
        resultats = cursor.fetchall()

        documents_cibles = []
        for row in resultats:
            type_brut = int(row.DO_Type)
            type_normalise = _convertir_type_depuis_sql(type_brut)

            doc = {
                "numero": row.DO_Piece.strip() if row.DO_Piece else "",
                "type": type_normalise,  # normalised type
                "type_brut": type_brut,  # original SQL type, kept as well
                "type_libelle": _get_type_libelle(type_brut),
                "statut": int(row.DO_Statut) if row.DO_Statut else 0,
                "nb_lignes": int(row.NbLignes) if row.NbLignes else 0,
            }
            documents_cibles.append(doc)

            logger.info(
                f"[VERIF] Trouvé: {doc['numero']} "
                f"(type {type_brut}→{type_normalise} - {doc['type_libelle']}) "
                f"- {doc['nb_lignes']} lignes"
            )

        deja_transforme = bool(documents_cibles)

        if deja_transforme:
            logger.info(
                f"[VERIF] Document {numero_source} a {len(documents_cibles)} transformation(s)"
            )
        else:
            logger.info(
                f"[VERIF] Document {numero_source} pas encore transformé"
            )

        return {
            "deja_transforme": deja_transforme,
            "documents_cibles": documents_cibles,
        }

    except Exception as e:
        # Deliberate best-effort: a failed lookup is reported as "not
        # transformed" rather than propagated. Callers rely on this shape.
        logger.error(f"[VERIF] Erreur vérification: {e}")
        return {"deja_transforme": False, "documents_cibles": []}


def peut_etre_transforme(cursor, numero_source, type_source, type_cible):
    """Tell whether *numero_source* can still be transformed into *type_cible*.

    Both types are normalised with ``_normaliser_type_document``. The
    transformation is refused when ``verifier_si_deja_transforme_sql``
    reports an existing target document whose ``"type"`` equals the
    normalised target type.

    Args:
        cursor: DB cursor forwarded to ``verifier_si_deja_transforme_sql``.
        numero_source: Number of the source document.
        type_source: Type of the source document.
        type_cible: Type of the document to be produced.

    Returns:
        A dict with ``"possible"`` (bool), ``"raison"`` (str) and
        ``"documents_existants"`` (list). When the transformation is
        refused, ``"message_detaille"`` is present as well.
    """
    type_source = _normaliser_type_document(type_source)
    type_cible = _normaliser_type_document(type_cible)

    logger.info(
        f"[VERIF_TRANSFO] {numero_source} "
        f"(type {type_source}) → type {type_cible}"
    )

    # Keyword arguments on purpose: this function takes the cursor first,
    # whereas the helper takes the document number first. Passing them
    # positionally in this function's order swaps the two, makes the query
    # fail inside the helper, and silently yields "possible" every time.
    verif = verifier_si_deja_transforme_sql(
        numero_source=numero_source,
        cursor=cursor,
        type_source=type_source,
    )

    docs_meme_type = [
        d for d in verif["documents_cibles"] if d["type"] == type_cible
    ]

    if docs_meme_type:
        nums = [d["numero"] for d in docs_meme_type]
        return {
            "possible": False,
            "raison": f"Document déjà transformé en {_get_type_libelle(type_cible)}",
            "documents_existants": docs_meme_type,
            "message_detaille": f"Document(s) existant(s): {', '.join(nums)}",
        }

    return {
        "possible": True,
        "raison": "Transformation possible",
        "documents_existants": [],
    }


def _lire_item_erreur(collection, index):
    """Fetch error *index* from *collection*, trying three access styles.

    Tried in order: ``collection.Item(index)``, ``collection(index)``, then
    ``collection.Item(index - 1)``. Returns ``None`` when all three raise.
    """
    try:
        return collection.Item(index)
    except Exception:
        pass
    try:
        return collection(index)
    except Exception:
        pass
    try:
        return collection.Item(index - 1)
    except Exception:
        return None


def _premier_attribut(err, noms, accepter_falsy=False):
    """Return ``str()`` of the first usable attribute of *err* among *noms*.

    An attribute is usable when it is truthy, or merely not ``None`` when
    *accepter_falsy* is set. Attributes that raise on access or conversion
    are skipped. Returns ``""`` when nothing usable is found.
    """
    for nom in noms:
        try:
            val = getattr(err, nom, None)
            if val is None:
                continue
            if accepter_falsy or val:
                return str(val)
        except Exception:
            continue
    return ""


def lire_erreurs_sage(obj, nom_obj=""):
    """Collect the entries of ``obj.Errors`` as plain dicts; never raises.

    The collection is read through its ``Count`` attribute and iterated from
    ``1`` to ``Count`` inclusive. For each entry, several candidate attribute
    names are probed for a description, a field name and an error number.
    Entries for which none of the three could be read are skipped.

    Args:
        obj: Object that may expose an ``Errors`` collection.
        nom_obj: Label copied into each entry's ``"source"`` key.

    Returns:
        A list of dicts with keys ``source``, ``index``, ``description``,
        ``field`` and ``number``. The list is empty when ``obj`` has no
        ``Errors``, when ``Errors`` is ``None`` or empty, or when reading
        fails.
    """
    erreurs = []

    try:
        collection = getattr(obj, "Errors", None)
        if collection is None:
            return erreurs

        try:
            nb_erreurs = collection.Count
        except Exception:
            return erreurs

        for i in range(1, nb_erreurs + 1):
            try:
                err = _lire_item_erreur(collection, i)
                if err is None:
                    continue

                description = _premier_attribut(
                    err, ("Description", "Descr", "Message", "Text")
                )
                field = _premier_attribut(
                    err, ("Field", "FieldName", "Champ", "Property")
                )
                # An error number of 0 is still meaningful, so only None
                # is treated as "missing" here.
                number = _premier_attribut(
                    err,
                    ("Number", "Code", "ErrorCode", "Numero"),
                    accepter_falsy=True,
                )

                if description or field or number:
                    erreurs.append(
                        {
                            "source": nom_obj,
                            "index": i,
                            "description": description or "Erreur inconnue",
                            "field": field or "?",
                            "number": number or "?",
                        }
                    )
            except Exception as e:
                logger.debug(f"Erreur lecture erreur {i}: {e}")
                continue

    except Exception as e:
        logger.debug(f"Erreur globale lecture erreurs {nom_obj}: {e}")

    return erreurs


__all__ = [
    "_verifier_devis_non_transforme",
    "verifier_si_deja_transforme_sql",
    "peut_etre_transforme",
    "lire_erreurs_sage",
]