feat(reglements): add endpoints to list and get payment details

This commit is contained in:
fanilo 2026-01-16 13:47:20 +01:00
parent f5c5a87d0d
commit 5ad54a2ff0
3 changed files with 408 additions and 18 deletions

60
main.py
View file

@ -2,7 +2,7 @@ from fastapi import FastAPI, HTTPException, Header, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
from datetime import datetime
from datetime import datetime, date
import uvicorn
import logging
import win32com.client
@ -1794,14 +1794,6 @@ def introspection_com():
@app.get("/sage/debug/introspection-validation")
def introspection_validation(numero_facture: str = None):
"""
Introspection des méthodes de validation disponibles dans Sage.
Utiliser cette route pour découvrir comment valider un document.
Args:
numero_facture: Optionnel - numéro de facture pour inspecter le document
"""
try:
resultat = sage.introspecter_validation(numero_facture)
return {"success": True, "data": resultat}
@ -1812,12 +1804,6 @@ def introspection_validation(numero_facture: str = None):
@app.get("/sage/debug/introspection-document/{numero_facture}")
def introspection_document_complet(numero_facture: str):
"""
Introspection COMPLÈTE d'un document spécifique.
Retourne tous les attributs, méthodes et propriétés disponibles
sur le document pour découvrir la méthode de validation.
"""
try:
resultat = sage.introspecter_document_complet(numero_facture)
return {"success": True, "data": resultat}
@ -1826,6 +1812,50 @@ def introspection_document_complet(numero_facture: str):
raise HTTPException(status_code=500, detail=str(e))
@app.get("/sage/reglements", dependencies=[Depends(verify_token)])
def get_tous_reglements(
date_debut: Optional[date] = Query(None, description="Date de début (YYYY-MM-DD)"),
date_fin: Optional[date] = Query(None, description="Date de fin (YYYY-MM-DD)"),
client_code: Optional[str] = Query(None, description="Filtrer par code client"),
type_reglement: Optional[str] = Query(
None, description="Type: 'client' ou 'fournisseur'"
),
limit: int = Query(500, ge=1, le=2000, description="Nombre max de résultats"),
):
try:
date_debut_dt = (
datetime.combine(date_debut, datetime.min.time()) if date_debut else None
)
date_fin_dt = (
datetime.combine(date_fin, datetime.max.time()) if date_fin else None
)
result = sage.lire_tous_reglements(
date_debut=date_debut_dt,
date_fin=date_fin_dt,
client_code=client_code,
type_reglement=type_reglement,
limit=limit,
)
return {"success": True, "data": result}
except Exception as e:
logger.error(f"Erreur lecture règlements: {e}")
raise HTTPException(500, str(e))
# Route: Détail d'un règlement
@app.get("/sage/reglements/{rg_no}", dependencies=[Depends(verify_token)])
def get_reglement_detail(rg_no: int):
try:
result = sage.lire_reglement_detail(rg_no)
return {"success": True, "data": result}
except ValueError as e:
raise HTTPException(404, str(e))
except Exception as e:
logger.error(f"Erreur détail règlement {rg_no}: {e}")
raise HTTPException(500, str(e))
if __name__ == "__main__":
uvicorn.run(
"main:app",

View file

@ -100,7 +100,8 @@ from utils.documents.settle import (
lire_comptes_generaux,
lire_tva_taux,
lire_parametres_encaissement,
_get_modes_reglement_standards,
lire_tous_reglements,
lire_reglement_detail,
regler_facture as _regler_facture,
regler_factures_client as _regler_factures_client,
lire_reglements_client as _lire_reglements_client,
@ -8144,3 +8145,23 @@ class SageConnector:
def lire_parametres_encaissement(self) -> dict:
return lire_parametres_encaissement(self)
def lire_tous_reglements(
self,
date_debut,
date_fin,
client_code,
type_reglement,
limit,
) -> dict:
return lire_tous_reglements(
self,
date_debut,
date_fin,
client_code,
type_reglement,
limit,
)
def lire_reglement_detail(self, rg_no) -> dict:
return lire_reglement_detail(self, rg_no)

View file

@ -167,15 +167,352 @@ def lire_tous_journaux(self) -> List[Dict]:
]
def lire_tous_reglements(
self,
date_debut: datetime = None,
date_fin: datetime = None,
client_code: str = None,
type_reglement: str = None,
limit: int = 500,
) -> Dict:
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
logger.info(
f"Lecture règlements (date_debut={date_debut}, date_fin={date_fin}, client={client_code})"
)
with self._get_sql_connection() as conn:
cursor = conn.cursor()
query = """
SELECT
r.RG_No,
r.RG_Date,
r.RG_Reference,
r.RG_Libelle,
r.RG_Montant,
r.RG_MontantDev,
r.RG_Impute,
r.RG_Compta,
r.RG_Type,
r.RG_Cours,
r.RG_Banque,
r.RG_Impaye,
r.RG_MontantEcart,
r.CT_NumPayeur,
r.JO_Num,
r.RG_Piece,
r.N_Reglement,
r.CG_NumCont,
r.RG_Cloture,
r.RG_Ticket,
r.cbCreation,
r.cbModification,
c.CT_Intitule,
j.JO_Intitule,
mr.R_Intitule as ModeReglementIntitule,
(SELECT ISNULL(SUM(RE_Montant), 0) FROM F_REGLECH WHERE RG_No = r.RG_No) as MontantImpute
FROM F_CREGLEMENT r
LEFT JOIN F_COMPTET c ON r.CT_NumPayeur = c.CT_Num
LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num
LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice
WHERE 1=1
"""
params = []
# Filtrer par type (0 = client, 1 = fournisseur)
if type_reglement:
if type_reglement.lower() == "client":
query += " AND r.RG_Type = 0"
elif type_reglement.lower() == "fournisseur":
query += " AND r.RG_Type = 1"
if date_debut:
query += " AND r.RG_Date >= ?"
params.append(date_debut)
if date_fin:
query += " AND r.RG_Date <= ?"
params.append(date_fin)
if client_code:
query += " AND r.CT_NumPayeur = ?"
params.append(client_code)
query += f" ORDER BY r.RG_Date DESC, r.RG_No DESC"
if limit:
query = query.replace("SELECT", f"SELECT TOP {limit}")
cursor.execute(query, params)
rows = cursor.fetchall()
types_reglement = {0: "Client", 1: "Fournisseur", 2: "Salarié"}
statuts_impute = {
0: "Non imputé",
1: "Partiellement imputé",
2: "Totalement imputé",
}
reglements = []
total_montant = 0.0
total_impute = 0.0
for row in rows:
rg_no = row[0]
rg_montant = float(row[4] or 0)
montant_impute = float(row[25] or 0)
reste_a_imputer = rg_montant - montant_impute
# Déterminer le statut d'imputation
if montant_impute == 0:
statut_imputation = "Non imputé"
elif abs(reste_a_imputer) < 0.01:
statut_imputation = "Totalement imputé"
else:
statut_imputation = "Partiellement imputé"
reglement = {
"rg_no": rg_no,
"numero_piece": (row[15] or "").strip(),
"date": row[1].strftime("%Y-%m-%d") if row[1] else None,
"reference": (row[2] or "").strip(),
"libelle": (row[3] or "").strip(),
"montant": rg_montant,
"montant_devise": float(row[5] or 0),
"montant_impute": montant_impute,
"reste_a_imputer": reste_a_imputer,
"statut_imputation": statut_imputation,
"type_code": row[8],
"type_libelle": types_reglement.get(row[8], f"Type {row[8]}"),
"client_code": (row[13] or "").strip(),
"client_intitule": (row[22] or "").strip(),
"journal_code": (row[14] or "").strip(),
"journal_intitule": (row[23] or "").strip(),
"mode_reglement_code": row[16],
"mode_reglement_libelle": (
row[24] or ModeReglement.get_libelle(row[16] or 0)
).strip(),
"compte_contrepartie": (row[17] or "").strip(),
"cours": float(row[9] or 1),
"banque": (row[10] or "").strip(),
"impaye": row[11] == 1,
"ecart": float(row[12] or 0),
"cloture": row[18] == 1,
"ticket": (row[19] or "").strip(),
"date_creation": row[20].strftime("%Y-%m-%d %H:%M:%S")
if row[20]
else None,
"date_modification": row[21].strftime("%Y-%m-%d %H:%M:%S")
if row[21]
else None,
}
reglements.append(reglement)
total_montant += rg_montant
total_impute += montant_impute
# Récupérer les détails d'imputation pour chaque règlement
for reg in reglements:
cursor.execute(
"""
SELECT
re.RE_Montant,
re.RE_MontantDev,
e.DR_No,
d.DO_Piece,
d.DO_Date,
d.DO_TotalTTC
FROM F_REGLECH re
LEFT JOIN F_DOCREGL e ON re.DR_No = e.DR_No
LEFT JOIN F_DOCENTETE d ON e.DO_Domaine = d.DO_Domaine AND e.DO_Type = d.DO_Type AND e.DO_Piece = d.DO_Piece
WHERE re.RG_No = ?
""",
(reg["rg_no"],),
)
imputations = cursor.fetchall()
reg["imputations"] = [
{
"montant": float(imp[0] or 0),
"montant_devise": float(imp[1] or 0),
"document_piece": (imp[3] or "").strip() if imp[3] else None,
"document_date": imp[4].strftime("%Y-%m-%d") if imp[4] else None,
"document_total": float(imp[5] or 0) if imp[5] else None,
}
for imp in imputations
]
reg["nb_imputations"] = len(imputations)
# Statistiques
nb_total = len(reglements)
nb_imputes = sum(
1 for r in reglements if r["statut_imputation"] == "Totalement imputé"
)
nb_partiels = sum(
1 for r in reglements if r["statut_imputation"] == "Partiellement imputé"
)
nb_non_imputes = sum(
1 for r in reglements if r["statut_imputation"] == "Non imputé"
)
return {
"nb_reglements": nb_total,
"nb_totalement_imputes": nb_imputes,
"nb_partiellement_imputes": nb_partiels,
"nb_non_imputes": nb_non_imputes,
"total_montant": total_montant,
"total_impute": total_impute,
"total_reste": total_montant - total_impute,
"filtres": {
"date_debut": date_debut.strftime("%Y-%m-%d") if date_debut else None,
"date_fin": date_fin.strftime("%Y-%m-%d") if date_fin else None,
"client_code": client_code,
"type_reglement": type_reglement,
"limit": limit,
},
"reglements": reglements,
}
def lire_reglement_detail(self, rg_no: int) -> Dict:
"""Récupère le détail complet d'un règlement par son numéro interne"""
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT
r.RG_No, r.RG_Date, r.RG_Reference, r.RG_Libelle,
r.RG_Montant, r.RG_MontantDev, r.RG_Impute, r.RG_Compta,
r.RG_Type, r.RG_Cours, r.RG_Banque, r.RG_Impaye,
r.RG_MontantEcart, r.CT_NumPayeur, r.JO_Num, r.RG_Piece,
r.N_Reglement, r.CG_NumCont, r.RG_Cloture, r.RG_Ticket,
r.cbCreation, r.cbModification,
c.CT_Intitule, j.JO_Intitule, mr.R_Intitule
FROM F_CREGLEMENT r
LEFT JOIN F_COMPTET c ON r.CT_NumPayeur = c.CT_Num
LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num
LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice
WHERE r.RG_No = ?
""",
(rg_no,),
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Règlement {rg_no} introuvable")
# Imputations
cursor.execute(
"""
SELECT
re.RE_No, re.RE_Montant, re.RE_MontantDev, re.DR_No,
e.DO_Domaine, e.DO_Type, e.DO_Piece,
d.DO_Date, d.DO_TotalTTC, d.DO_MontantRegle, d.DO_Ref
FROM F_REGLECH re
LEFT JOIN F_DOCREGL e ON re.DR_No = e.DR_No
LEFT JOIN F_DOCENTETE d ON e.DO_Domaine = d.DO_Domaine AND e.DO_Type = d.DO_Type AND e.DO_Piece = d.DO_Piece
WHERE re.RG_No = ?
""",
(rg_no,),
)
imputations_rows = cursor.fetchall()
types_doc = {
0: "Devis",
1: "Bon de commande",
2: "Préparation",
3: "Bon de livraison",
6: "Facture",
7: "Avoir",
}
domaines = {0: "Vente", 1: "Achat", 2: "Stock"}
imputations = []
total_impute = 0.0
for imp in imputations_rows:
montant_imp = float(imp[1] or 0)
total_impute += montant_imp
imputations.append(
{
"re_no": imp[0],
"montant": montant_imp,
"montant_devise": float(imp[2] or 0),
"document": {
"domaine": domaines.get(imp[4], f"Domaine {imp[4]}")
if imp[4] is not None
else None,
"type": types_doc.get(imp[5], f"Type {imp[5]}")
if imp[5] is not None
else None,
"numero": (imp[6] or "").strip() if imp[6] else None,
"date": imp[7].strftime("%Y-%m-%d") if imp[7] else None,
"total_ttc": float(imp[8] or 0) if imp[8] else None,
"montant_regle": float(imp[9] or 0) if imp[9] else None,
"reference": (imp[10] or "").strip() if imp[10] else None,
},
}
)
rg_montant = float(row[4] or 0)
reste = rg_montant - total_impute
types_reglement = {0: "Client", 1: "Fournisseur", 2: "Salarié"}
return {
"rg_no": row[0],
"numero_piece": (row[15] or "").strip(),
"date": row[1].strftime("%Y-%m-%d") if row[1] else None,
"reference": (row[2] or "").strip(),
"libelle": (row[3] or "").strip(),
"montant": rg_montant,
"montant_devise": float(row[5] or 0),
"montant_impute": total_impute,
"reste_a_imputer": reste,
"statut_imputation": "Totalement imputé"
if abs(reste) < 0.01
else ("Partiellement imputé" if total_impute > 0 else "Non imputé"),
"est_comptabilise": row[7] == 1,
"type_code": row[8],
"type_libelle": types_reglement.get(row[8], f"Type {row[8]}"),
"client_code": (row[13] or "").strip(),
"client_intitule": (row[22] or "").strip(),
"journal_code": (row[14] or "").strip(),
"journal_intitule": (row[23] or "").strip(),
"mode_reglement_code": row[16],
"mode_reglement_libelle": (
row[24] or ModeReglement.get_libelle(row[16] or 0)
).strip(),
"compte_contrepartie": (row[17] or "").strip(),
"cours": float(row[9] or 1),
"banque": (row[10] or "").strip(),
"impaye": row[11] == 1,
"ecart": float(row[12] or 0),
"cloture": row[18] == 1,
"ticket": (row[19] or "").strip(),
"date_creation": row[20].strftime("%Y-%m-%d %H:%M:%S") if row[20] else None,
"date_modification": row[21].strftime("%Y-%m-%d %H:%M:%S")
if row[21]
else None,
"nb_imputations": len(imputations),
"imputations": imputations,
}
def regler_facture(
self,
numero_facture: str,
montant: float,
mode_reglement: int = 0, # 0 = Chèque par défaut
mode_reglement: int = ModeReglement.CHEQUE,
date_reglement: datetime = None,
reference: str = "",
libelle: str = "",
code_journal: str = None, # Si None, déduit automatiquement
code_journal: str = None,
devise_code: int = 0,
cours_devise: float = 1.0,
tva_encaissement: bool = False,
@ -1272,4 +1609,6 @@ __all__ = [
"lire_tva_taux",
"lire_parametres_encaissement",
"_get_modes_reglement_standards",
"lire_tous_reglements",
"lire_reglement_detail",
]