# Sage100-ws/utils/documents/settle.py
#
# 1884 lines
# 65 KiB
# Python

from typing import Dict, List
import win32com.client
import pywintypes
from datetime import datetime
from schemas.documents.reglements import ModeReglement
import time
import logging
logger = logging.getLogger(__name__)
def _get_journal_auto(self, mode_reglement: int) -> str:
with self._get_sql_connection() as conn:
cursor = conn.cursor()
# Mode Espèces = 2 (selon l'image Sage fournie)
if mode_reglement == 2: # Espèces
cursor.execute("""
SELECT TOP 1 JO_Num
FROM F_JOURNAUX
WHERE JO_Type = 2
AND JO_Reglement = 1
AND CG_Num LIKE '53%'
AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL)
ORDER BY JO_Num
""")
else:
# Autres modes → Banque
cursor.execute("""
SELECT TOP 1 JO_Num
FROM F_JOURNAUX
WHERE JO_Type = 2
AND JO_Reglement = 1
AND CG_Num LIKE '51%'
AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL)
ORDER BY JO_Num
""")
row = cursor.fetchone()
if row:
return row[0].strip()
# Fallback: premier journal de trésorerie disponible
cursor.execute("""
SELECT TOP 1 JO_Num
FROM F_JOURNAUX
WHERE JO_Type = 2
AND JO_Reglement = 1
AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL)
ORDER BY JO_Num
""")
row = cursor.fetchone()
if row:
return row[0].strip()
raise ValueError("Aucun journal de trésorerie configuré")
def _valider_coherence_journal_mode(self, code_journal: str, mode_reglement: int):
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT CG_Num
FROM F_JOURNAUX
WHERE JO_Num = ?
""",
(code_journal,),
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Journal {code_journal} introuvable")
compte_general = (row[0] or "").strip()
# Mode Espèces (2) doit utiliser un journal caisse (53x)
if mode_reglement == 2:
if not compte_general.startswith("53"):
logger.warning(
f"Mode Espèces avec journal non-caisse ({code_journal}, compte {compte_general})"
)
else:
# Autres modes doivent utiliser un journal banque (51x)
if compte_general.startswith("53"):
logger.warning(
f"Mode non-espèces avec journal caisse ({code_journal}, compte {compte_general})"
)
def _get_mode_reglement_libelle(self, mode_reglement: int) -> str:
"""Récupère le libellé d'un mode de règlement"""
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT R_Intitule
FROM P_REGLEMENT
WHERE cbIndice = ?
""",
(mode_reglement,),
)
row = cursor.fetchone()
if row:
return (row[0] or "").strip()
# Fallback sur les libellés standards
libelles = {
0: "Chèque",
1: "Virement",
2: "Espèces",
3: "LCR Acceptée",
4: "LCR non acceptée",
5: "BOR",
6: "Prélèvement",
7: "Carte bancaire",
8: "Bon d'achat",
}
return libelles.get(mode_reglement, f"Mode {mode_reglement}")
def lire_journaux_banque(self) -> List[Dict]:
    """List the treasury journals that accept payments.

    Returns:
        One dict per journal with keys ``code``, ``intitule``,
        ``compte_general`` and ``type`` (``"Caisse"`` when the general account
        starts with ``53``, ``"Banque"`` otherwise), ordered by journal code.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    journaux = []
    with self._get_sql_connection() as connexion:
        curseur = connexion.cursor()
        curseur.execute(
            "SELECT JO_Num, JO_Intitule, CG_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 ORDER BY JO_Num"
        )
        for jo_num, jo_intitule, cg_num in curseur.fetchall():
            est_caisse = (cg_num or "").startswith("53")
            journaux.append(
                {
                    "code": jo_num.strip(),
                    "intitule": jo_intitule.strip() if jo_intitule else "",
                    "compte_general": cg_num.strip() if cg_num else "",
                    "type": "Caisse" if est_caisse else "Banque",
                }
            )
    return journaux
def lire_tous_journaux(self) -> List[Dict]:
    """List every journal of ``F_JOURNAUX`` with a readable type label.

    Returns:
        One dict per journal with keys ``code``, ``intitule``, ``type_code``,
        ``type_libelle``, ``reglement_actif`` and ``compte_general``, ordered
        by journal type then journal code.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    libelles_par_type = {
        0: "Achats",
        1: "Ventes",
        2: "Trésorerie",
        3: "Général",
        4: "Situation",
    }

    journaux = []
    with self._get_sql_connection() as connexion:
        curseur = connexion.cursor()
        curseur.execute(
            "SELECT JO_Num, JO_Intitule, JO_Type, JO_Reglement, CG_Num FROM F_JOURNAUX ORDER BY JO_Type, JO_Num"
        )
        for jo_num, jo_intitule, jo_type, jo_reglement, cg_num in curseur.fetchall():
            journaux.append(
                {
                    "code": jo_num.strip(),
                    "intitule": jo_intitule.strip() if jo_intitule else "",
                    "type_code": jo_type,
                    # Unknown type codes get a generic "Type <code>" label.
                    "type_libelle": libelles_par_type.get(jo_type, f"Type {jo_type}"),
                    "reglement_actif": jo_reglement == 1,
                    "compte_general": cg_num.strip() if cg_num else "",
                }
            )
    return journaux
def _format_facture(row, echeances: list) -> dict:
"""Formatte une facture selon la structure unifiée."""
types_doc = {6: "Facture", 7: "Avoir"}
montant_ttc = float(row[6] or 0)
montant_regle = sum(e["montant_regle"] for e in echeances)
reste_a_regler = montant_ttc - montant_regle
if montant_regle == 0:
statut = "Non réglé"
elif abs(reste_a_regler) < 0.01:
statut = "Soldé"
else:
statut = "Partiellement réglé"
return {
"domaine": row[0],
"type_code": row[1],
"type_libelle": types_doc.get(row[1], f"Type {row[1]}"),
"numero": (row[2] or "").strip(),
"date": row[3].strftime("%Y-%m-%d") if row[3] else None,
"date_livraison": row[7].strftime("%Y-%m-%d") if row[7] else None,
"reference": (row[4] or "").strip(),
"souche": row[8],
"date_creation": row[9].strftime("%Y-%m-%d %H:%M:%S") if row[9] else None,
"client": {
"code": (row[5] or "").strip(),
"intitule": (row[10] or "").strip(),
"email": (row[11] or "").strip(),
"telephone": (row[12] or "").strip(),
"adresse": None, # Pas dans cette requête
"code_postal": None,
"ville": None,
},
"montants": {
"total_ttc": montant_ttc,
"montant_regle": montant_regle,
"reste_a_regler": reste_a_regler,
},
"statut_reglement": statut,
"nb_echeances": len(echeances),
"nb_reglements": sum(len(e["reglements"]) for e in echeances),
"echeances": echeances,
}
def _format_reglement(rg_row, rc_montant=None) -> dict:
"""Formatte un règlement selon la structure unifiée."""
return {
"rg_no": rg_row[0],
"numero_piece": (rg_row[4] or "").strip(),
"date": rg_row[2].strftime("%Y-%m-%d") if rg_row[2] else None,
"heure": rg_row[8].strftime("%H:%M:%S")
if rg_row[8] and hasattr(rg_row[8], "strftime")
else str(rg_row[8])
if rg_row[8]
else None,
"reference": (rg_row[3] or "").strip(),
"libelle": (rg_row[7] or "").strip(),
"montant": float(rc_montant or rg_row[1] or 0),
"mode_reglement": {
"code": rg_row[5],
"libelle": (rg_row[6] or ModeReglement.get_libelle(rg_row[5] or 0)).strip()
if rg_row[6]
else ModeReglement.get_libelle(rg_row[5] or 0),
},
"journal": {
"code": (rg_row[12] or "").strip(),
"intitule": (rg_row[13] or "").strip(),
},
"banque": (rg_row[9] or "").strip(),
"est_impaye": rg_row[10] == 1,
"est_valide": rg_row[11] == 1,
"date_creation": rg_row[14].strftime("%Y-%m-%d %H:%M:%S")
if rg_row[14]
else None,
}
def lire_tous_reglements(
    self,
    date_debut: datetime = None,
    date_fin: datetime = None,
    client_code: str = None,
    statut_reglement: str = None,
) -> Dict:
    """List invoices and credit notes with their instalments and payments.

    Args:
        date_debut: Keep documents dated on or after this value.
        date_fin: Keep documents dated on or before this value.
        client_code: Keep documents of this third party only.
        statut_reglement: Optional filter: ``"solde"``, ``"partiel"`` or
            ``"non_regle"``. Any other value filters nothing.

    Returns:
        A dict with ``statistiques`` (counts and totals over the returned
        documents), ``filtres`` (the filters received) and ``factures``.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    logger.info(
        f"Lecture factures avec règlements (date_debut={date_debut}, date_fin={date_fin}, client={client_code})"
    )

    def _ecartee(montants: dict) -> bool:
        """Tell whether the requested status filter rejects these amounts."""
        if not statut_reglement:
            return False
        reste = montants["reste_a_regler"]
        regle = montants["montant_regle"]
        if statut_reglement == "solde":
            return abs(reste) >= 0.01
        if statut_reglement == "partiel":
            return regle == 0 or abs(reste) < 0.01
        if statut_reglement == "non_regle":
            return regle > 0
        return False

    query = """
        SELECT
        d.DO_Domaine,
        d.DO_Type,
        d.DO_Piece,
        d.DO_Date,
        d.DO_Ref,
        d.DO_Tiers,
        d.DO_TotalTTC,
        d.DO_DateLivr,
        d.DO_Souche,
        d.cbCreation,
        c.CT_Intitule,
        c.CT_EMail,
        c.CT_Telephone
        FROM F_DOCENTETE d
        LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num
        WHERE d.DO_Type IN (6, 7)
    """
    params = []
    # Each optional filter adds one clause and one bound parameter.
    filtres_sql = (
        (" AND d.DO_Date >= ?", date_debut),
        (" AND d.DO_Date <= ?", date_fin),
        (" AND d.DO_Tiers = ?", client_code),
    )
    for clause, valeur in filtres_sql:
        if valeur:
            query += clause
            params.append(valeur)
    query += " ORDER BY d.DO_Date DESC, d.DO_Piece DESC"

    factures = []
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        for entete in cursor.fetchall():
            echeances = _get_echeances_avec_reglements(
                cursor,
                entete[0],
                entete[1],
                (entete[2] or "").strip(),
                float(entete[6] or 0),
            )
            facture = _format_facture(entete, echeances)
            # The status filter is applied in Python, after formatting.
            if not _ecartee(facture["montants"]):
                factures.append(facture)

    statuts = [f["statut_reglement"] for f in factures]
    return {
        "statistiques": {
            "nb_factures": len(factures),
            "nb_soldees": statuts.count("Soldé"),
            "nb_partiellement_reglees": statuts.count("Partiellement réglé"),
            "nb_non_reglees": statuts.count("Non réglé"),
            "total_factures": sum((f["montants"]["total_ttc"] for f in factures), 0.0),
            "total_regle": sum((f["montants"]["montant_regle"] for f in factures), 0.0),
            "total_reste": sum((f["montants"]["reste_a_regler"] for f in factures), 0.0),
        },
        "filtres": {
            "date_debut": date_debut.strftime("%Y-%m-%d") if date_debut else None,
            "date_fin": date_fin.strftime("%Y-%m-%d") if date_fin else None,
            "client_code": client_code,
            "statut_reglement": statut_reglement,
        },
        "factures": factures,
    }
def _get_echeances_avec_reglements(
    cursor,
    do_domaine: int,
    do_type: int,
    do_piece: str,
    montant_ttc_facture: float,
) -> list:
    """Return the instalments of a document, each with its payments.

    Args:
        cursor: Open database cursor; it is reused for the nested payment
            queries.
        do_domaine: Document domain.
        do_type: Document type.
        do_piece: Document piece number.
        montant_ttc_facture: Document total, used as the instalment amount
            when the document has a single instalment whose amount is zero.

    Returns:
        Instalment dicts ordered by due date (ascending).
    """
    requete = """
        SELECT
        dr.DR_No,
        dr.DR_Date,
        dr.DR_Montant,
        dr.DR_Regle,
        dr.N_Reglement,
        dr.DR_RefPaiement,
        mr.R_Intitule
        FROM F_DOCREGL dr
        LEFT JOIN P_REGLEMENT mr ON dr.N_Reglement = mr.cbIndice
        WHERE dr.DO_Domaine = ? AND dr.DO_Type = ? AND dr.DO_Piece = ?
        ORDER BY dr.DR_Date ASC
    """
    cursor.execute(requete, (do_domaine, do_type, do_piece))
    # Materialise first: the same cursor is reused for the payment queries.
    lignes = cursor.fetchall()
    echeance_unique = len(lignes) == 1

    def _construire(ligne) -> dict:
        dr_no, dr_date, dr_montant, dr_regle, n_reglement, ref_paiement, intitule = ligne
        reglements = _get_reglements_echeance(cursor, dr_no)
        total_regle = sum(reg["montant"] for reg in reglements)

        montant = float(dr_montant or 0)
        if montant == 0 and echeance_unique:
            montant = montant_ttc_facture

        if intitule:
            libelle_mode = intitule.strip()
        else:
            libelle_mode = ModeReglement.get_libelle(n_reglement or 0)

        return {
            "dr_no": dr_no,
            "date_echeance": dr_date.strftime("%Y-%m-%d") if dr_date else None,
            "montant": montant,
            "montant_regle": total_regle,
            "reste_a_regler": montant - total_regle,
            "est_regle": dr_regle == 1,
            "mode_reglement": {
                "code": n_reglement,
                "libelle": libelle_mode,
            },
            "reference_paiement": (ref_paiement or "").strip(),
            "nb_reglements": len(reglements),
            "reglements": reglements,
        }

    return [_construire(ligne) for ligne in lignes]
def _get_reglements_echeance(cursor, dr_no: int) -> list:
    """Return the payments allocated to one instalment, newest first.

    Args:
        cursor: Open database cursor.
        dr_no: Instalment number (``DR_No``).

    Returns:
        One formatted payment dict per allocation row. The amount column is
        the allocated amount (``RC_Montant``).
    """
    requete = """
        SELECT
        r.RG_No,
        rc.RC_Montant,
        r.RG_Date,
        r.RG_Reference,
        r.RG_Piece,
        r.N_Reglement,
        mr.R_Intitule,
        r.RG_Libelle,
        r.RG_Heure,
        r.RG_Banque,
        r.RG_Impaye,
        r.RG_Valide,
        r.JO_Num,
        j.JO_Intitule,
        r.cbCreation
        FROM F_REGLECH rc
        INNER JOIN F_CREGLEMENT r ON rc.RG_No = r.RG_No
        LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice
        LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num
        WHERE rc.DR_No = ?
        ORDER BY r.RG_Date DESC
    """
    cursor.execute(requete, (dr_no,))
    reglements = []
    for ligne in cursor.fetchall():
        reglements.append(_format_reglement(ligne))
    return reglements
def lire_facture_reglement_detail(self, do_piece: str) -> Dict:
    """Return one invoice or credit note with its instalments and payments.

    The result has the same structure as an entry of
    ``lire_tous_reglements``, with the customer address fields filled in.

    Args:
        do_piece: Document piece number.

    Returns:
        The invoice dict. ``numero`` is the ``do_piece`` value received.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
        ValueError: If no document of type 6 or 7 has this piece number.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT
            d.DO_Domaine,
            d.DO_Type,
            d.DO_Piece,
            d.DO_Date,
            d.DO_Ref,
            d.DO_Tiers,
            d.DO_TotalTTC,
            d.DO_DateLivr,
            d.DO_Souche,
            d.cbCreation,
            c.CT_Intitule,
            c.CT_EMail,
            c.CT_Telephone,
            c.CT_Adresse,
            c.CT_CodePostal,
            c.CT_Ville
            FROM F_DOCENTETE d
            LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num
            WHERE d.DO_Piece = ? AND d.DO_Type IN (6, 7)
            """,
            (do_piece,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Facture {do_piece} introuvable")

        echeances = _get_echeances_avec_reglements(
            cursor, row[0], row[1], do_piece, float(row[6] or 0)
        )

        # Columns 0-12 have the layout _format_facture expects, so amounts and
        # status come from that single implementation instead of a copy.
        facture = _format_facture(row, echeances)
        # Keep reporting the piece number exactly as the caller supplied it.
        facture["numero"] = do_piece
        # This query also returns the address columns (13-15).
        facture["client"].update(
            adresse=(row[13] or "").strip(),
            code_postal=(row[14] or "").strip(),
            ville=(row[15] or "").strip(),
        )
        return facture
def lire_reglement_detail(self, rg_no: int) -> Dict:
    """Return one payment with the documents it is allocated to.

    Args:
        rg_no: Payment number (``RG_No``).

    Returns:
        The payment dict, with an ``imputation`` summary (allocated amount,
        remainder, status) and the ``imputations`` list.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
        ValueError: If no payment has this number.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    def _texte(valeur):
        return (valeur or "").strip()

    def _texte_ou_none(valeur):
        return valeur.strip() if valeur else None

    def _horodatage(valeur):
        return valeur.strftime("%Y-%m-%d %H:%M:%S") if valeur else None

    libelles_documents = {6: "Facture", 7: "Avoir"}
    libelles_domaines = {0: "Vente", 1: "Achat"}
    libelles_types = {0: "Client", 1: "Fournisseur"}

    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT
            r.RG_No,
            r.CT_NumPayeur,
            r.RG_Date,
            r.RG_Reference,
            r.RG_Libelle,
            r.RG_Montant,
            r.RG_MontantDev,
            r.N_Reglement,
            r.RG_Impute,
            r.RG_Compta,
            r.RG_Type,
            r.RG_Cours,
            r.N_Devise,
            r.JO_Num,
            r.CG_Num,
            r.RG_Impaye,
            r.RG_Heure,
            r.RG_Piece,
            r.RG_Banque,
            r.RG_Valide,
            r.RG_MontantCommission,
            r.RG_MontantNet,
            r.cbCreation,
            r.cbModification,
            c.CT_Intitule,
            j.JO_Intitule,
            mr.R_Intitule,
            dev.D_Intitule,
            dev.D_Sigle
            FROM F_CREGLEMENT r
            LEFT JOIN F_COMPTET c ON r.CT_NumPayeur = c.CT_Num
            LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num
            LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice
            LEFT JOIN P_DEVISE dev ON r.N_Devise = dev.cbIndice
            WHERE r.RG_No = ?
            """,
            (rg_no,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Règlement {rg_no} introuvable")

        # Allocations of this payment to document instalments.
        cursor.execute(
            """
            SELECT
            re.RC_Montant,
            re.DR_No,
            dr.DO_Domaine,
            dr.DO_Type,
            dr.DO_Piece,
            d.DO_Date,
            d.DO_TotalTTC,
            d.DO_Ref,
            d.DO_Tiers,
            c.CT_Intitule
            FROM F_REGLECH re
            LEFT JOIN F_DOCREGL dr ON re.DR_No = dr.DR_No
            LEFT JOIN F_DOCENTETE d ON dr.DO_Domaine = d.DO_Domaine
            AND dr.DO_Type = d.DO_Type
            AND dr.DO_Piece = d.DO_Piece
            LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num
            WHERE re.RG_No = ?
            """,
            (rg_no,),
        )
        lignes_imputation = cursor.fetchall()

        # Name the 29 columns of the payment row, in SELECT order.
        (
            rg_numero,
            payeur,
            rg_date,
            rg_reference,
            rg_libelle,
            rg_montant_brut,
            rg_montant_dev,
            n_reglement,
            rg_impute,
            rg_compta,
            rg_type,
            rg_cours,
            n_devise,
            jo_num,
            cg_num,
            rg_impaye,
            rg_heure,
            rg_piece,
            rg_banque,
            rg_valide,
            rg_commission,
            rg_net,
            cb_creation,
            cb_modification,
            payeur_intitule,
            jo_intitule,
            mode_intitule,
            devise_intitule,
            devise_sigle,
        ) = row

        imputations = []
        for (
            rc_montant,
            dr_no,
            do_domaine,
            do_type,
            piece,
            do_date,
            do_total,
            do_ref,
            do_tiers,
            tiers_intitule,
        ) in lignes_imputation:
            if do_domaine is not None:
                domaine = libelles_domaines.get(do_domaine, f"Domaine {do_domaine}")
            else:
                domaine = None
            if do_type is not None:
                type_libelle = libelles_documents.get(do_type, f"Type {do_type}")
            else:
                type_libelle = None
            imputations.append(
                {
                    "montant": float(rc_montant or 0),
                    "dr_no": dr_no,
                    "document": {
                        "domaine": domaine,
                        "type_code": do_type,
                        "type_libelle": type_libelle,
                        "numero": _texte_ou_none(piece),
                        "date": do_date.strftime("%Y-%m-%d") if do_date else None,
                        "reference": _texte_ou_none(do_ref),
                        "montants": {
                            "total_ttc": float(do_total) if do_total else None
                        },
                        "client": {
                            "code": _texte_ou_none(do_tiers),
                            "intitule": _texte_ou_none(tiers_intitule),
                        },
                    },
                }
            )

        total_impute = sum((imp["montant"] for imp in imputations), 0.0)
        rg_montant = float(rg_montant_brut or 0)
        reste = rg_montant - total_impute
        if total_impute == 0:
            statut_imputation = "Non imputé"
        elif abs(reste) < 0.01:
            statut_imputation = "Totalement imputé"
        else:
            statut_imputation = "Partiellement imputé"

        # The time may be a time-like object or a raw value; any formatting
        # failure falls back to its string form.
        heure_str = None
        if rg_heure:
            try:
                if hasattr(rg_heure, "strftime"):
                    heure_str = rg_heure.strftime("%H:%M:%S")
                else:
                    heure_str = str(rg_heure)
            except Exception:
                heure_str = str(rg_heure)

        if mode_intitule:
            mode_libelle = mode_intitule.strip()
        else:
            mode_libelle = ModeReglement.get_libelle(n_reglement or 0)

        return {
            "rg_no": rg_numero,
            "numero_piece": _texte(rg_piece),
            "date": rg_date.strftime("%Y-%m-%d") if rg_date else None,
            "heure": heure_str,
            "reference": _texte(rg_reference),
            "libelle": _texte(rg_libelle),
            "montant": rg_montant,
            "montant_devise": float(rg_montant_dev or 0),
            "cours": float(rg_cours or 1),
            "montant_commission": float(rg_commission or 0),
            "montant_net": float(rg_net or 0),
            "devise": {
                "code": n_devise,
                "intitule": _texte(devise_intitule),
                "sigle": _texte(devise_sigle),
            },
            "mode_reglement": {
                "code": n_reglement,
                "libelle": mode_libelle,
            },
            "journal": {
                "code": _texte(jo_num),
                "intitule": _texte(jo_intitule),
            },
            "compte_general": _texte(cg_num),
            "banque": _texte(rg_banque),
            "est_impaye": rg_impaye == 1,
            "est_valide": rg_valide == 1,
            "est_comptabilise": rg_compta == 1,
            "date_creation": _horodatage(cb_creation),
            "date_modification": _horodatage(cb_modification),
            "client": {
                "code": _texte(payeur),
                "intitule": _texte(payeur_intitule),
            },
            "type": {
                "code": rg_type,
                "libelle": libelles_types.get(rg_type, f"Type {rg_type}"),
            },
            "imputation": {
                "montant_impute": total_impute,
                "reste_a_imputer": reste,
                "statut": statut_imputation,
                "est_impute": rg_impute == 1,
                "nb_imputations": len(imputations),
            },
            "imputations": imputations,
        }
def regler_facture(
    self,
    numero_facture: str,
    montant: float,
    mode_reglement: int = ModeReglement.CHEQUE,
    date_reglement: datetime = None,
    reference: str = "",
    libelle: str = "",
    code_journal: str = None,
    devise_code: int = 0,
    cours_devise: float = 1.0,
    tva_encaissement: bool = False,
    compte_general: str = None,
) -> Dict:
    """Record a payment against a sales invoice through the Sage COM objects.

    The journal is deduced from the payment mode when ``code_journal`` is not
    given; otherwise its coherence with the mode is checked (warnings only).
    After the payment is written the document is re-read, and the call fails
    if its ``DO_MontantRegle`` did not change.

    Args:
        numero_facture: Piece number of the invoice.
        montant: Amount to pay; must be strictly positive and must not exceed
            the invoice balance by more than one cent.
        mode_reglement: Payment mode code.
        date_reglement: Payment date; defaults to the current date and time.
        reference: Optional payment reference.
        libelle: Optional payment label.
        code_journal: Journal code; deduced automatically when empty.
        devise_code: Currency code; ``0`` skips the currency fields.
        cours_devise: Exchange rate used when ``devise_code`` is not ``0``.
        tva_encaissement: Whether to flag the payment for VAT on receipt.
        compte_general: Optional general account to set on the payment.

    Returns:
        A dict summarising the payment and the invoice balance after it.

    Raises:
        RuntimeError: If the Sage connection is not set, or if anything other
            than a validation error fails (the original exception is chained
            as ``__cause__``).
        ValueError: If the amount is invalid, or the invoice is missing,
            cancelled, has no instalment, or has a smaller balance.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    if montant <= 0:
        raise ValueError("Le montant du règlement doit être positif")
    date_reglement = date_reglement or datetime.now()

    # Deduce the journal when none is given, otherwise check it.
    if not code_journal:
        code_journal = _get_journal_auto(self, mode_reglement)
    else:
        _valider_coherence_journal_mode(self, code_journal, mode_reglement)

    logger.info(
        f"Règlement facture {numero_facture}: {montant} "
        f"(mode: {mode_reglement}, journal: {code_journal}, devise: {devise_code})"
    )
    try:
        with self._com_context(), self._lock_com:
            factory = self.cial.FactoryDocumentVente
            if not factory.ExistPiece(60, numero_facture):
                raise ValueError(f"Facture {numero_facture} introuvable")
            persist = factory.ReadPiece(60, numero_facture)
            doc = win32com.client.CastTo(persist, "IBODocumentVente3")
            doc.Read()

            total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
            montant_deja_regle = float(getattr(doc, "DO_MontantRegle", 0.0))
            statut = getattr(doc, "DO_Statut", 0)
            if statut == 6:
                raise ValueError(f"Facture {numero_facture} annulée")

            solde_actuel = total_ttc - montant_deja_regle
            # One cent of tolerance on the balance check.
            if montant > solde_actuel + 0.01:
                raise ValueError(
                    f"Montant ({montant}€) supérieur au solde ({solde_actuel:.2f}€)"
                )

            # The customer code is best-effort: the payment proceeds without
            # it, but the failure is logged instead of silently ignored.
            client_code = ""
            try:
                client_obj = getattr(doc, "Client", None)
                if client_obj:
                    client_obj.Read()
                    client_code = getattr(client_obj, "CT_Num", "").strip()
            except Exception as e:
                logger.warning(f"Client de la facture {numero_facture} illisible: {e}")

            echeance = _get_premiere_echeance(doc)
            if not echeance:
                raise ValueError(f"Facture {numero_facture} sans échéance")

            numero_reglement = _executer_reglement_com(
                self,
                doc=doc,
                echeance=echeance,
                montant=montant,
                mode_reglement=mode_reglement,
                date_reglement=date_reglement,
                reference=reference,
                libelle=libelle,
                code_journal=code_journal,
                client_code=client_code,
                numero_facture=numero_facture,
                devise_code=devise_code,
                cours_devise=cours_devise,
                tva_encaissement=tva_encaissement,
                compte_general=compte_general,
            )

            # Short pause, then re-read the document to check that the paid
            # amount actually moved.
            time.sleep(0.3)
            doc.Read()
            nouveau_montant_regle = float(getattr(doc, "DO_MontantRegle", 0.0))
            if abs(nouveau_montant_regle - montant_deja_regle) < 0.01:
                raise RuntimeError(
                    "Le règlement n'a pas été appliqué (DO_MontantRegle inchangé)"
                )
            nouveau_solde = total_ttc - nouveau_montant_regle
            logger.info(f"Règlement effectué - Solde restant: {nouveau_solde:.2f}")

            mode_libelle = _get_mode_reglement_libelle(self, mode_reglement)
            return {
                "numero_facture": numero_facture,
                "numero_reglement": numero_reglement,
                "montant_regle": montant,
                "date_reglement": date_reglement.strftime("%Y-%m-%d"),
                "mode_reglement": mode_reglement,
                "mode_reglement_libelle": mode_libelle,
                "reference": reference,
                "libelle": libelle,
                "code_journal": code_journal,
                "devise_code": devise_code,
                "cours_devise": cours_devise,
                "tva_encaissement": tva_encaissement,
                "total_facture": total_ttc,
                "solde_restant": nouveau_solde,
                "facture_soldee": nouveau_solde < 0.01,
                "client_code": client_code,
            }
    except ValueError:
        # Validation errors reach the caller unchanged.
        raise
    except Exception as e:
        logger.error(f"Erreur règlement: {e}", exc_info=True)
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"Échec règlement facture: {str(e)}") from e
def _get_premiere_echeance(doc):
try:
factory_ech = getattr(doc, "FactoryDocumentEcheance", None)
if factory_ech:
ech_list = factory_ech.List
if ech_list:
echeance = ech_list.Item(1)
if echeance:
for iface in ["IBODocumentEcheance3", "IBODocumentEcheance"]:
try:
echeance = win32com.client.CastTo(echeance, iface)
logger.info(f" Échéance castée vers {iface}")
break
except Exception:
continue
echeance.Read()
return echeance
except Exception as e:
logger.warning(f" Pas d'échéance: {e}")
return None
def _executer_reglement_com(
    self,
    doc,
    echeance,
    montant,
    mode_reglement,
    date_reglement,
    reference,
    libelle,
    code_journal,
    client_code,
    numero_facture,
    devise_code=0,
    cours_devise=1.0,
    tva_encaissement=False,
    compte_general=None,
):
    """Create a payment through the Sage COM objects and link it to an instalment.

    Three strategies are tried in order; the first one that completes returns:

    1. Create and write an ``IBODocumentReglement``, then write a
       payment/instalment link created from the payment's own
       ``FactoryDocumentReglementEcheance``.
    2. Hand the already-written payment to ``CreateProcess_ReglerEcheances``
       and run the process.
    3. Create a link from the instalment's ``FactoryDocumentReglementEcheance``,
       write a second payment object, then write the link.

    Most property assignments are best-effort: a failure is logged and the
    following steps still run.

    Args:
        doc: Sales document object. Not used in this function body.
        echeance: Instalment object the payment is linked to.
        montant: Amount of the payment and of the link.
        mode_reglement: Payment mode code, resolved via ``FactoryModeReglement``.
        date_reglement: Payment date, converted with ``pywintypes.Time``.
        reference: Optional payment reference.
        libelle: Optional payment label.
        code_journal: Journal code, resolved via ``FactoryJournal``.
        client_code: Customer code, resolved via ``FactoryClient`` when set.
        numero_facture: Invoice number. Not used in this function body.
        devise_code: Currency code; currency fields are only set when it is not 0.
        cours_devise: Exchange rate used for the currency fields.
        tva_encaissement: When true, ``RG_Encaissement`` is set to 1.
        compte_general: Optional general account, resolved via ``FactoryCompteG``.

    Returns:
        Strategies 1 and 2 return ``str(RG_Piece)`` of the written payment, or
        ``None`` when that value is falsy. Strategy 3 returns
        ``str(getattr(new_reg, "RG_Piece", None))``.

    Raises:
        RuntimeError: When no strategy succeeded; the message lists the
            collected errors.
    """
    erreurs = []
    # MAIN APPROACH: build the full payment, write it, then link it (or hand
    # it over to the process).
    # NOTE(review): if the payment is written below but both the link and the
    # process fail, the alternative approach writes another payment object —
    # this looks like it can leave two payment records; confirm.
    try:
        logger.info("Création du règlement via FactoryDocumentReglement...")
        # 1. Create the payment
        factory_reg = self.cial.FactoryDocumentReglement
        reg = factory_reg.Create()
        reg = win32com.client.CastTo(reg, "IBODocumentReglement")
        logger.info(" Règlement créé et casté vers IBODocumentReglement")
        # 2. Set the journal (as an object)
        try:
            journal_factory = self.cial.CptaApplication.FactoryJournal
            journal_persist = journal_factory.ReadNumero(code_journal)
            if journal_persist:
                reg.Journal = journal_persist
                logger.info(f" Journal défini: {code_journal}")
        except Exception as e:
            logger.warning(f" Journal: {e}")
        # 3. Set the paying third party (customer object)
        try:
            factory_client = self.cial.CptaApplication.FactoryClient
            if client_code:
                client_persist = factory_client.ReadNumero(client_code)
                if client_persist:
                    reg.TiersPayeur = client_persist
                    logger.info(f" TiersPayeur défini: {client_code}")
        except Exception as e:
            logger.warning(f" TiersPayeur: {e}")
        # 4. Set the simple fields; a failure here is only logged
        try:
            reg.RG_Date = pywintypes.Time(date_reglement)
            logger.info(f" RG_Date: {date_reglement}")
        except Exception as e:
            logger.warning(f" RG_Date: {e}")
        try:
            reg.RG_Montant = montant
            logger.info(f" RG_Montant: {montant}")
        except Exception as e:
            logger.warning(f" RG_Montant: {e}")
        # 5. Payment mode through the Reglement object
        try:
            mode_factory = getattr(
                self.cial.CptaApplication, "FactoryModeReglement", None
            )
            if mode_factory:
                mode_obj = mode_factory.ReadNumero(mode_reglement)
                if mode_obj:
                    reg.Reglement = mode_obj
                    logger.info(f" Mode règlement défini: {mode_reglement}")
        except Exception as e:
            logger.debug(f" Mode règlement via factory: {e}")
        # 6. Currency (only when a non-zero currency code is given)
        if devise_code != 0:
            try:
                reg.RG_Devise = devise_code
                logger.info(f" RG_Devise: {devise_code}")
            except Exception as e:
                logger.debug(f" RG_Devise: {e}")
            try:
                reg.RG_Cours = cours_devise
                logger.info(f" RG_Cours: {cours_devise}")
            except Exception as e:
                logger.debug(f" RG_Cours: {e}")
            # Amount in currency, computed as amount * rate
            try:
                montant_devise = montant * cours_devise
                reg.RG_MontantDev = montant_devise
                logger.info(f" RG_MontantDev: {montant_devise}")
            except Exception as e:
                logger.debug(f" RG_MontantDev: {e}")
        # 7. VAT on receipt
        if tva_encaissement:
            try:
                reg.RG_Encaissement = 1
                logger.info(" RG_Encaissement: 1 (TVA sur encaissement)")
            except Exception as e:
                logger.debug(f" RG_Encaissement: {e}")
        # 8. Specific general account
        if compte_general:
            try:
                cg_factory = self.cial.CptaApplication.FactoryCompteG
                cg_persist = cg_factory.ReadNumero(compte_general)
                if cg_persist:
                    reg.CompteG = cg_persist
                    logger.info(f" CompteG défini: {compte_general}")
            except Exception as e:
                logger.debug(f" CompteG: {e}")
        # 9. Reference and label
        if reference:
            try:
                reg.RG_Reference = reference
                logger.info(f" RG_Reference: {reference}")
            except Exception:
                pass
        if libelle:
            try:
                reg.RG_Libelle = libelle
                logger.info(f" RG_Libelle: {libelle}")
            except Exception:
                pass
        try:
            reg.RG_Impute = 1  # Allocated
        except Exception:
            pass
        try:
            reg.RG_Compta = 0  # Not posted to accounting
        except Exception:
            pass
        # 10. WRITE the payment; a failure here aborts the main approach
        reg.Write()
        numero = getattr(reg, "RG_Piece", None)
        logger.info(f" Règlement écrit avec numéro: {numero}")
        # 11. Create the payment/instalment link through the PAYMENT's factory
        try:
            logger.info(" Création du lien règlement-échéance...")
            factory_reg_ech = getattr(reg, "FactoryDocumentReglementEcheance", None)
            if factory_reg_ech:
                reg_ech = factory_reg_ech.Create()
                # Cast to the first link interface that accepts the object
                for iface in [
                    "IBODocumentReglementEcheance3",
                    "IBODocumentReglementEcheance",
                ]:
                    try:
                        reg_ech = win32com.client.CastTo(reg_ech, iface)
                        logger.info(f" Cast vers {iface} réussi")
                        break
                    except Exception:
                        continue
                # Set the instalment
                try:
                    reg_ech.Echeance = echeance
                    logger.info(" Echeance définie")
                except Exception as e:
                    logger.warning(f" Echeance: {e}")
                # Set the amount
                try:
                    reg_ech.RC_Montant = montant
                    logger.info(f" RC_Montant: {montant}")
                except Exception as e:
                    logger.warning(f" RC_Montant: {e}")
                # Write the link
                try:
                    reg_ech.SetDefault()
                except Exception:
                    pass
                reg_ech.Write()
                logger.info(" Lien règlement-échéance écrit!")
                return str(numero) if numero else None
        except Exception as e:
            erreurs.append(f"Lien échéance: {e}")
            logger.warning(f" Erreur création lien: {e}")
        # The link was not written (error, or no link factory): try the process
        logger.info(" Tentative via CreateProcess_ReglerEcheances...")
        try:
            process = self.cial.CreateProcess_ReglerEcheances()
            # Assign the payment that was already written
            process.Reglement = reg
            logger.info(" Règlement assigné au process")
            # Add the instalment, with the amount when that call is accepted
            try:
                process.AddDocumentEcheanceMontant(echeance, montant)
                logger.info(" Échéance ajoutée avec montant")
            except Exception:
                process.AddDocumentEcheance(echeance)
                logger.info(" Échéance ajoutée")
            can_process = getattr(process, "CanProcess", True)
            logger.info(f" CanProcess: {can_process}")
            if can_process:
                process.Process()
                logger.info(" Process() réussi!")
                return str(numero) if numero else None
            else:
                # The process refuses to run: log its errors (nothing is
                # added to `erreurs` on this path).
                try:
                    errors = process.Errors
                    if errors:
                        for i in range(1, errors.Count + 1):
                            logger.warning(f" Erreur [{i}]: {errors.Item(i)}")
                except Exception:
                    pass
        except Exception as e:
            erreurs.append(f"Process: {e}")
            logger.warning(f" Process échoué: {e}")
    except Exception as e:
        erreurs.append(f"FactoryDocumentReglement: {e}")
        logger.error(f"FactoryDocumentReglement échoué: {e}")
    # ALTERNATIVE APPROACH: start from the instalment's own link factory
    try:
        logger.info("Tentative via modification directe de l'échéance...")
        mode_obj = getattr(echeance, "Reglement", None)
        if mode_obj:
            # Diagnostic only: log the first public attribute names
            attrs = [a for a in dir(mode_obj) if not a.startswith("_")]
            logger.info(f" Attributs Reglement échéance: {attrs[:15]}...")
        factory_reg_ech = getattr(echeance, "FactoryDocumentReglementEcheance", None)
        if factory_reg_ech:
            logger.info(" FactoryDocumentReglementEcheance trouvée sur échéance")
            reg_ech = factory_reg_ech.Create()
            for iface in [
                "IBODocumentReglementEcheance3",
                "IBODocumentReglementEcheance",
            ]:
                try:
                    reg_ech = win32com.client.CastTo(reg_ech, iface)
                    logger.info(f" Cast vers {iface}")
                    break
                except Exception:
                    continue
            try:
                factory_reg = self.cial.FactoryDocumentReglement
                new_reg = factory_reg.Create()
                new_reg = win32com.client.CastTo(new_reg, "IBODocumentReglement")
                # Configure the new payment; here journal, customer, date and
                # amount are not individually guarded.
                journal_factory = self.cial.CptaApplication.FactoryJournal
                journal_persist = journal_factory.ReadNumero(code_journal)
                if journal_persist:
                    new_reg.Journal = journal_persist
                factory_client = self.cial.CptaApplication.FactoryClient
                if client_code:
                    client_persist = factory_client.ReadNumero(client_code)
                    if client_persist:
                        new_reg.TiersPayeur = client_persist
                new_reg.RG_Date = pywintypes.Time(date_reglement)
                new_reg.RG_Montant = montant
                new_reg.RG_Impute = 1
                # Currency fields, only when a non-zero currency code is given
                if devise_code != 0:
                    try:
                        new_reg.RG_Devise = devise_code
                        new_reg.RG_Cours = cours_devise
                        new_reg.RG_MontantDev = montant * cours_devise
                    except Exception:
                        pass
                # VAT on receipt
                if tva_encaissement:
                    try:
                        new_reg.RG_Encaissement = 1
                    except Exception:
                        pass
                # General account
                if compte_general:
                    try:
                        cg_factory = self.cial.CptaApplication.FactoryCompteG
                        cg_persist = cg_factory.ReadNumero(compte_general)
                        if cg_persist:
                            new_reg.CompteG = cg_persist
                    except Exception:
                        pass
                new_reg.Write()
                logger.info(
                    f" Nouveau règlement créé: {getattr(new_reg, 'RG_Piece', None)}"
                )
                # NOTE(review): the link is written without new_reg being
                # assigned to it in this function — confirm how the link gets
                # associated with the payment.
                try:
                    reg_ech.SetDefault()
                except Exception:
                    pass
                reg_ech.RC_Montant = montant
                reg_ech.Write()
                logger.info(" Lien créé depuis échéance!")
                return str(getattr(new_reg, "RG_Piece", None))
            except Exception as e:
                # Logged only; this error is not added to `erreurs`.
                logger.warning(f" Erreur: {e}")
    except Exception as e:
        erreurs.append(f"Via échéance: {e}")
        logger.warning(f"Via échéance échoué: {e}")
    raise RuntimeError(f"Aucune méthode n'a fonctionné. Erreurs: {'; '.join(erreurs)}")
def _attributs_publics(obj) -> list:
    """Return the names listed by ``dir(obj)`` that do not start with ``_``."""
    return [a for a in dir(obj) if not a.startswith("_")]


def _introspecter_echeance_document(doc, result: dict) -> bool:
    """Record the attributes of the first instalment of an invoice document.

    Fills ``result`` in place. Returns ``True`` once a first instalment has
    been inspected, ``False`` when the document is not a type-6 document or
    has no instalment to inspect. Exceptions propagate to the caller.
    """
    if not doc:
        return False
    doc.Read()
    if getattr(doc, "DO_Type", 0) != 6:
        return False
    factory_ech = getattr(doc, "FactoryDocumentEcheance", None)
    if not factory_ech:
        return False
    ech_list = factory_ech.List
    if not ech_list:
        return False
    ech = ech_list.Item(1)
    if not ech:
        return False

    ech = win32com.client.CastTo(ech, "IBODocumentEcheance3")
    ech.Read()
    result["IBODocumentEcheance3"] = _attributs_publics(ech)

    # Link factory reached from the instalment
    factory_lien_ech = getattr(ech, "FactoryDocumentReglementEcheance", None)
    if factory_lien_ech:
        lien_ech = factory_lien_ech.Create()
        result["EcheanceReglementEcheance_base"] = _attributs_publics(lien_ech)
        for iface in [
            "IBODocumentReglementEcheance3",
            "IBODocumentReglementEcheance",
        ]:
            try:
                lien_ech_cast = win32com.client.CastTo(lien_ech, iface)
                result[f"EcheanceReglementEcheance_{iface}"] = _attributs_publics(
                    lien_ech_cast
                )
            except Exception:
                # A failed cast is simply not reported for this factory.
                pass

    # Payment mode object carried by the instalment
    mode = getattr(ech, "Reglement", None)
    if mode:
        result["Echeance_Reglement_mode"] = _attributs_publics(mode)
    return True


def introspecter_reglement(self):
    """Collect the public attribute names of the payment-related COM objects.

    Diagnostic helper. Each section is independent: a failure is stored in
    the result under an ``error_*`` key (or ``global_error``) instead of
    being raised.

    Returns:
        A dict mapping an object or interface label to its list of public
        attribute names, plus any error entries.

    Raises:
        RuntimeError: If the Sage connection (``self.cial``) is not set.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    result = {}
    try:
        with self._com_context(), self._lock_com:
            # IBODocumentReglement and its link factory
            try:
                factory = self.cial.FactoryDocumentReglement
                reg = factory.Create()
                reg = win32com.client.CastTo(reg, "IBODocumentReglement")
                result["IBODocumentReglement"] = _attributs_publics(reg)
                factory_lien = getattr(reg, "FactoryDocumentReglementEcheance", None)
                if factory_lien:
                    lien = factory_lien.Create()
                    result["ReglementEcheance_base"] = _attributs_publics(lien)
                    for iface in [
                        "IBODocumentReglementEcheance3",
                        "IBODocumentReglementEcheance",
                    ]:
                        try:
                            lien_cast = win32com.client.CastTo(lien, iface)
                            result[f"ReglementEcheance_{iface}"] = _attributs_publics(
                                lien_cast
                            )
                        except Exception as e:
                            result[f"cast_{iface}_error"] = str(e)
            except Exception as e:
                result["error_reglement"] = str(e)

            # Settlement process
            try:
                process = self.cial.CreateProcess_ReglerEcheances()
                result["Process"] = _attributs_publics(process)
            except Exception as e:
                result["error_process"] = str(e)

            # First instalment found among items 1-19 of the sales document list
            try:
                factory_doc = self.cial.FactoryDocumentVente
                doc_list = factory_doc.List
                for i in range(1, 20):
                    try:
                        # NOTE(review): the source's indentation was lost, so
                        # the position of the original `break` is assumed:
                        # stop once an instalment has been inspected. Confirm.
                        if _introspecter_echeance_document(doc_list.Item(i), result):
                            break
                    except Exception:
                        continue
            except Exception as e:
                result["error_echeance"] = str(e)
    except Exception as e:
        result["global_error"] = str(e)
    return result
def regler_factures_client(
    self,
    client_code,
    montant_total,
    mode_reglement=ModeReglement.CHEQUE,
    date_reglement=None,
    reference="",
    libelle="",
    numeros_factures=None,
    code_journal=None,
    devise_code=0,
    cours_devise=1.0,
    tva_encaissement=False,
):
    """Spread one payment amount over a customer's outstanding invoices.

    Invoices are taken in the order returned by
    ``_get_factures_non_soldees_client_sql`` and each one receives
    ``min(remaining amount, invoice balance)`` through ``regler_facture``
    until less than 0.01 is left.

    If settling an invoice fails, the loop stops: settlements already made
    are kept and returned (``montant_effectif`` is then lower than
    ``montant_demande``). If nothing at all could be settled a
    ``RuntimeError`` is raised, chained to the underlying error.

    Args:
        client_code: Customer code whose invoices are settled.
        montant_total: Amount to distribute; must be strictly positive.
        mode_reglement: Payment mode forwarded to ``regler_facture``.
        date_reglement: Settlement date; defaults to ``datetime.now()``.
        reference: Reference forwarded to each settlement.
        libelle: Label forwarded to each settlement.
        numeros_factures: Optional restriction to these invoice numbers.
        code_journal: Journal code; derived from the payment mode through
            ``_get_journal_auto`` when empty.
        devise_code: Currency code forwarded to each settlement.
        cours_devise: Exchange rate forwarded to each settlement.
        tva_encaissement: Flag forwarded to each settlement.

    Returns:
        A dict echoing the request parameters plus ``montant_effectif``,
        ``nb_factures_reglees``, ``nb_factures_soldees`` and the list of
        individual results under ``reglements``.

    Raises:
        RuntimeError: No Sage connection, or no settlement could be made.
        ValueError: Non-positive amount, no invoice to settle, or amount
            exceeding the total balance by more than 0.01.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    if montant_total <= 0:
        raise ValueError("Le montant total doit être positif")
    date_reglement = date_reglement or datetime.now()
    # Derive the journal automatically when the caller did not supply one.
    if not code_journal:
        code_journal = _get_journal_auto(self, mode_reglement)
    factures = _get_factures_non_soldees_client_sql(self, client_code, numeros_factures)
    if not factures:
        raise ValueError(f"Aucune facture à régler pour {client_code}")
    solde_total = sum(f["solde"] for f in factures)
    # 0.01 tolerance absorbs float rounding on the summed balances.
    if montant_total > solde_total + 0.01:
        raise ValueError(
            f"Montant ({montant_total}€) supérieur au solde ({solde_total:.2f}€)"
        )
    reglements = []
    restant = montant_total
    derniere_erreur = None
    for fac in factures:
        if restant < 0.01:
            break
        a_regler = min(restant, fac["solde"])
        try:
            res = regler_facture(
                self,
                fac["numero"],
                a_regler,
                mode_reglement=mode_reglement,
                date_reglement=date_reglement,
                reference=reference,
                libelle=libelle,
                code_journal=code_journal,
                devise_code=devise_code,
                cours_devise=cours_devise,
                tva_encaissement=tva_encaissement,
                compte_general=None,  # optional; not exposed by this function
            )
        except Exception as e:
            # Keep the traceback in the log and remember the cause so it is
            # not lost if no settlement succeeded at all.
            logger.exception(f"Erreur règlement {fac['numero']}: {e}")
            derniere_erreur = e
            break
        reglements.append(res)
        restant -= a_regler
    if not reglements:
        raise RuntimeError("Aucun règlement effectué") from derniere_erreur
    return {
        "client_code": client_code,
        "montant_demande": montant_total,
        "montant_effectif": sum(r["montant_regle"] for r in reglements),
        "nb_factures_reglees": len(reglements),
        "nb_factures_soldees": sum(1 for r in reglements if r["facture_soldee"]),
        "date_reglement": date_reglement.strftime("%Y-%m-%d"),
        "mode_reglement": mode_reglement,
        "mode_reglement_libelle": ModeReglement.get_libelle(mode_reglement),
        "reference": reference,
        "libelle": libelle,
        "code_journal": code_journal,
        "devise_code": devise_code,
        "cours_devise": cours_devise,
        "tva_encaissement": tva_encaissement,
        "reglements": reglements,
    }
def _get_factures_non_soldees_client_sql(self, client_code, numeros=None):
    """Return the customer's type-6 documents that still have a balance due.

    Rows are read from ``F_DOCENTETE`` where ``DO_Type = 6``,
    ``DO_Statut <> 6`` and the remaining balance exceeds 0.01, ordered by
    ascending document date.

    Args:
        client_code: Value matched against ``DO_Tiers``.
        numeros: Optional iterable of document numbers (list, tuple, set,
            generator...) restricting the result. ``None`` or an empty
            iterable means no restriction.

    Returns:
        A list of dicts with keys ``numero``, ``date`` (``YYYY-MM-DD`` or
        ``None``), ``total_ttc``, ``montant_regle`` and ``solde``.
    """
    # Materialise once so that any iterable is accepted; the original
    # ``[client_code] + numeros`` only worked with a list.
    numeros = list(numeros) if numeros else []
    query = (
        "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle FROM F_DOCENTETE "
        "WHERE DO_Type = 6 AND DO_Tiers = ?"
    )
    params = [client_code]
    if numeros:
        # Only the "?" markers are interpolated; values stay bound parameters.
        placeholders = ",".join("?" * len(numeros))
        query += f" AND DO_Piece IN ({placeholders})"
        params.extend(numeros)
    query += (
        " AND DO_Statut <> 6 AND (DO_TotalTTC - ISNULL(DO_MontantRegle, 0)) > 0.01"
        " ORDER BY DO_Date ASC"
    )
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        factures = []
        for piece, date_doc, total_ttc, montant_regle in cursor.fetchall():
            total = float(total_ttc or 0)
            regle = float(montant_regle or 0)
            factures.append(
                {
                    "numero": piece.strip(),
                    "date": date_doc.strftime("%Y-%m-%d") if date_doc else None,
                    "total_ttc": total,
                    "montant_regle": regle,
                    "solde": total - regle,
                }
            )
        return factures
def lire_reglements_facture(self, numero_facture):
    """Return the settlement summary of one type-6 document.

    Reads the header row of ``numero_facture`` in ``F_DOCENTETE`` and
    derives the remaining balance (never negative) from the total and the
    amount already settled.

    Raises:
        RuntimeError: No Sage connection.
        ValueError: No matching document.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    requete = "SELECT DO_TotalTTC, DO_MontantRegle, DO_Tiers, DO_Date, DO_Ref FROM F_DOCENTETE WHERE DO_Piece = ? AND DO_Type = 6"
    with self._get_sql_connection() as conn:
        curseur = conn.cursor()
        curseur.execute(requete, (numero_facture,))
        entete = curseur.fetchone()
        if not entete:
            raise ValueError(f"Facture {numero_facture} introuvable")
        total_ttc = float(entete[0] or 0)
        deja_regle = float(entete[1] or 0)
        # Clamp at zero so an over-settled document never shows a negative balance.
        reste = max(0, total_ttc - deja_regle)
        date_doc = entete[3]
        resume = {
            "numero_facture": numero_facture,
            "client_code": (entete[2] or "").strip(),
            "date_facture": date_doc.strftime("%Y-%m-%d") if date_doc else None,
            "reference": (entete[4] or "").strip(),
            "total_ttc": total_ttc,
            "total_regle": deja_regle,
            "solde_restant": reste,
            "est_soldee": reste < 0.01,
        }
        return resume
def lire_reglements_client(
    self, client_code, date_debut=None, date_fin=None, inclure_soldees=True
):
    """Return the settlement status of a customer's type-6 documents.

    The customer label is read from ``F_COMPTET``; documents come from
    ``F_DOCENTETE``, optionally bounded by ``date_debut`` / ``date_fin``
    (both inclusive) and ordered by ascending date. Documents whose
    remaining balance is below 0.01 are dropped when ``inclure_soldees``
    is false. All aggregates are computed over the returned documents only.

    Raises:
        RuntimeError: No Sage connection.
        ValueError: Unknown customer code.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT CT_Intitule FROM F_COMPTET WHERE CT_Num = ?", (client_code,)
        )
        fiche = cursor.fetchone()
        if not fiche:
            raise ValueError(f"Client {client_code} introuvable")
        intitule = (fiche[0] or "").strip()

        # Assemble the statement piece by piece; each optional date bound
        # contributes one clause and one bound parameter.
        morceaux = [
            "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle, DO_Ref FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ?"
        ]
        arguments = [client_code]
        bornes = (
            (date_debut, " AND DO_Date >= ?"),
            (date_fin, " AND DO_Date <= ?"),
        )
        for valeur, clause in bornes:
            if valeur:
                morceaux.append(clause)
                arguments.append(valeur)
        morceaux.append(" ORDER BY DO_Date ASC")
        cursor.execute("".join(morceaux), arguments)

        factures = []
        for ligne in cursor.fetchall():
            total = float(ligne[2] or 0)
            regle = float(ligne[3] or 0)
            solde = max(0, total - regle)
            soldee = solde < 0.01
            if soldee and not inclure_soldees:
                continue
            factures.append(
                {
                    "numero_facture": ligne[0].strip(),
                    "date_facture": ligne[1].strftime("%Y-%m-%d") if ligne[1] else None,
                    "total_ttc": total,
                    "reference": (ligne[4] or "").strip(),
                    "total_regle": regle,
                    "solde_restant": solde,
                    "est_soldee": soldee,
                }
            )

        nb_soldees = sum(1 for f in factures if f["est_soldee"])
        return {
            "client_code": client_code,
            "client_intitule": intitule,
            "nb_factures": len(factures),
            "nb_factures_soldees": nb_soldees,
            "nb_factures_en_cours": len(factures) - nb_soldees,
            "total_factures": sum(f["total_ttc"] for f in factures),
            "total_regle": sum(f["total_regle"] for f in factures),
            "solde_global": sum(f["solde_restant"] for f in factures),
            "factures": factures,
        }
def lire_modes_reglement(self) -> List[Dict]:
    """List the payment modes of ``P_REGLEMENT`` that carry a label.

    Each entry exposes ``code`` (the row's ``cbIndice``), ``intitule``,
    ``code_sage`` (``R_Code``) and the debit/credit payment-mode columns,
    ordered by ``cbIndice``.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
            cbIndice,
            R_Intitule,
            R_Code,
            R_ModePaieDebit,
            R_ModePaieCredit
            FROM P_REGLEMENT
            WHERE R_Intitule IS NOT NULL AND LTRIM(RTRIM(R_Intitule)) <> ''
            ORDER BY cbIndice
            """)
        modes = []
        for ligne in cursor.fetchall():
            libelle = (ligne[1] or "").strip()
            # Second line of defence after the SQL filter: skip blank labels.
            if not libelle:
                continue
            entree = {
                "code": ligne[0],  # cbIndice
                "intitule": libelle,
                "code_sage": (ligne[2] or "").strip(),
                "mode_paie_debit": ligne[3] or 0,
                "mode_paie_credit": ligne[4] or 0,
            }
            modes.append(entree)
        return modes
def _get_modes_reglement_standards() -> List[Dict]:
    """Return the built-in payment modes used when P_REGLEMENT is not configured.

    Codes are the positions (0-8) of the entries in the table below.
    """
    # (label, journal family) in code order.
    table = (
        ("Chèque", "banque"),
        ("Virement", "banque"),
        ("Espèces", "caisse"),
        ("LCR Acceptée", "banque"),
        ("LCR non acceptée", "banque"),
        ("BOR", "banque"),
        ("Prélèvement", "banque"),
        ("Carte bancaire", "banque"),
        ("Bon d'achat", "autre"),
    )
    return [
        {"code": code, "intitule": intitule, "type": famille}
        for code, (intitule, famille) in enumerate(table)
    ]
def lire_devises(self) -> List[Dict]:
    """List the currencies known to the company file.

    When a table named ``F_DEVISE`` exists and holds rows, they are returned
    as ``code`` / ``intitule`` / ``sigle`` / ``cours_actuel`` dicts (a NULL
    or zero rate is reported as 1.0). Otherwise a fixed list of four
    currencies is returned, each with an extra ``est_principale`` flag set
    by comparing its code with ``N_DeviseCompte`` read from ``P_DOSSIER``.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        # Probe the catalogue first: the table is not assumed to exist.
        cursor.execute("""
            SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_NAME = 'F_DEVISE'
            """)
        table_presente = cursor.fetchone()[0] > 0
        if table_presente:
            cursor.execute("""
                SELECT
                D_Code,
                D_Intitule,
                D_Sigle,
                D_CoursActuel
                FROM F_DEVISE
                ORDER BY D_Code
                """)
            devises = [
                {
                    "code": ligne[0],
                    "intitule": (ligne[1] or "").strip(),
                    "sigle": (ligne[2] or "").strip(),
                    "cours_actuel": float(ligne[3] or 1.0),
                }
                for ligne in cursor.fetchall()
            ]
            if devises:
                return devises
        # Fallback: only the company-file settings are available.
        cursor.execute("""
            SELECT N_DeviseCompte, N_DeviseEquival
            FROM P_DOSSIER
            """)
        dossier = cursor.fetchone()
        # Without a settings row, code 0 is taken as the main currency.
        devise_principale = dossier[0] if dossier else 0
        standards = (
            (0, "Euro", "EUR"),
            (1, "Dollar US", "USD"),
            (2, "Livre Sterling", "GBP"),
            (3, "Franc Suisse", "CHF"),
        )
        return [
            {
                "code": code,
                "intitule": intitule,
                "sigle": sigle,
                "cours_actuel": 1.0,
                "est_principale": devise_principale == code,
            }
            for code, intitule, sigle in standards
        ]
def lire_journaux_tresorerie(self) -> List[Dict]:
    """List the active journals selected by ``JO_Type = 2 AND JO_Reglement = 1``.

    Each journal is tagged from the prefix of its general account:
    ``53`` gives ``"caisse"``, ``51`` gives ``"banque"``, anything else
    ``"tresorerie"``.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    # Prefix of the general account -> journal family, checked in this order.
    familles = (("53", "caisse"), ("51", "banque"))
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
            JO_Num,
            JO_Intitule,
            CG_Num,
            JO_Type,
            JO_Reglement,
            JO_Sommeil
            FROM F_JOURNAUX
            WHERE JO_Type = 2
            AND JO_Reglement = 1
            AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL)
            ORDER BY JO_Num
            """)
        journaux = []
        for ligne in cursor.fetchall():
            compte = (ligne[2] or "").strip()
            famille = next(
                (nom for debut, nom in familles if compte.startswith(debut)),
                "tresorerie",
            )
            journaux.append(
                {
                    "code": ligne[0].strip(),
                    "intitule": (ligne[1] or "").strip(),
                    "compte_general": compte,
                    "type": famille,
                }
            )
        return journaux
def lire_comptes_generaux(
    self, prefixe: str = None, type_compte: str = None
) -> List[Dict]:
    """List active general accounts from ``F_COMPTEG``, optionally filtered.

    Args:
        prefixe: Account-number prefix; only used when ``type_compte`` is
            missing or not a known category.
        type_compte: Category name (``client``, ``fournisseur``, ``banque``,
            ``caisse``, ``tva_collectee``, ``tva_deductible``, ``tva``,
            ``produit``, ``charge``) translated to account-number prefixes.
            A known category takes precedence over ``prefixe``.

    Returns:
        Dicts with ``numero``, ``intitule``, ``type`` and ``raccourci``,
        ordered by account number.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    # Category -> account-number prefixes.
    prefixes_map = {
        "client": ["411"],
        "fournisseur": ["401"],
        "banque": ["51"],
        "caisse": ["53"],
        "tva_collectee": ["4457"],
        "tva_deductible": ["4456"],
        "tva": ["445"],
        "produit": ["7"],
        "charge": ["6"],
    }
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        query = """
            SELECT
            CG_Num,
            CG_Intitule,
            CG_Type,
            CG_Raccourci,
            CG_Sommeil
            FROM F_COMPTEG
            WHERE (CG_Sommeil = 0 OR CG_Sommeil IS NULL)
            """
        params = []
        # An unknown category is ignored and falls back to the raw prefix.
        connus = prefixes_map.get(type_compte) if type_compte else None
        if connus:
            alternatives = " OR ".join("CG_Num LIKE ?" for _ in connus)
            query += f" AND ({alternatives})"
            params += [f"{debut}%" for debut in connus]
        elif prefixe:
            query += " AND CG_Num LIKE ?"
            params.append(f"{prefixe}%")
        query += " ORDER BY CG_Num"
        cursor.execute(query, params)
        return [
            {
                "numero": ligne[0].strip(),
                "intitule": (ligne[1] or "").strip(),
                "type": ligne[2] or 0,
                "raccourci": (ligne[3] or "").strip(),
            }
            for ligne in cursor.fetchall()
        ]
def lire_tva_taux(self) -> List[Dict]:
    """List every row of ``F_TAXE`` ordered by ``TA_No``.

    Text columns are stripped, NULL numeric columns become 0 and the rate
    is converted to ``float``.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
            TA_No,
            TA_Code,
            TA_Intitule,
            TA_Taux,
            TA_TTaux,
            TA_Type,
            TA_Sens,
            CG_Num,
            TA_Assujet
            FROM F_TAXE
            ORDER BY TA_No
            """)
        # Code meanings below are carried over from the original author's
        # notes; they are not checked by this function.
        return [
            {
                "numero": ligne[0],
                "code": (ligne[1] or "").strip(),
                "intitule": (ligne[2] or "").strip(),
                "taux": float(ligne[3] or 0),
                "type_taux": ligne[4] or 0,  # 0 = percentage, 1 = amount
                "type": ligne[5] or 0,  # 0 = collected, 1 = deductible, etc.
                "sens": ligne[6] or 0,  # 0 = sales, 1 = purchases
                "compte_general": (ligne[7] or "").strip(),
                "assujetti": ligne[8] or 0,
            }
            for ligne in cursor.fetchall()
        ]
def lire_parametres_encaissement(self) -> Dict:
    """Read the cash-collection settings from ``P_DOSSIER``.

    Returns the ``D_TVAEncReg`` and ``D_TVAEncAffect`` values, a boolean
    telling whether ``D_TVAEncReg`` is greater than zero, and
    ``N_DeviseCompte``. NULL columns, or a missing row, yield zeros and
    ``False``.

    Raises:
        RuntimeError: No Sage connection.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")
    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
            D_TVAEncReg,
            D_TVAEncAffect,
            N_DeviseCompte
            FROM P_DOSSIER
            """)
        dossier = cursor.fetchone()
        # Defaults apply when the table has no row at all.
        regime = affectation = devise = 0
        if dossier:
            regime = dossier[0] or 0
            affectation = dossier[1] or 0
            devise = dossier[2] or 0
        return {
            "tva_encaissement_regime": regime,
            "tva_encaissement_affectation": affectation,
            "tva_encaissement_actif": regime > 0,
            "devise_compte": devise,
        }
# Names exported by ``from <this module> import *``.
# NOTE(review): the list includes the underscore-prefixed helper
# ``_get_modes_reglement_standards`` and several names whose definitions are
# not in this part of the file (e.g. ``lire_journaux_banque``,
# ``lire_tous_journaux``, ``lire_tous_reglements``) -- confirm each entry is
# defined, since a missing one makes the star-import raise AttributeError.
__all__ = [
"ModeReglement",
"lire_journaux_banque",
"lire_tous_journaux",
"introspecter_reglement",
"regler_facture",
"regler_factures_client",
"lire_reglements_facture",
"lire_reglements_client",
"lire_modes_reglement",
"lire_devises",
"lire_journaux_tresorerie",
"lire_comptes_generaux",
"lire_tva_taux",
"lire_parametres_encaissement",
"_get_modes_reglement_standards",
"lire_tous_reglements",
"lire_facture_reglement_detail",
"lire_reglement_detail",
]