from typing import Dict, List import win32com.client import pywintypes from datetime import datetime from schemas.documents.reglements import ModeReglement import time import logging logger = logging.getLogger(__name__) def _get_journal_auto(self, mode_reglement: int) -> str: with self._get_sql_connection() as conn: cursor = conn.cursor() # Mode Espèces = 2 (selon l'image Sage fournie) if mode_reglement == 2: # Espèces cursor.execute(""" SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND CG_Num LIKE '53%' AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL) ORDER BY JO_Num """) else: # Autres modes → Banque cursor.execute(""" SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND CG_Num LIKE '51%' AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL) ORDER BY JO_Num """) row = cursor.fetchone() if row: return row[0].strip() # Fallback: premier journal de trésorerie disponible cursor.execute(""" SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL) ORDER BY JO_Num """) row = cursor.fetchone() if row: return row[0].strip() raise ValueError("Aucun journal de trésorerie configuré") def _valider_coherence_journal_mode(self, code_journal: str, mode_reglement: int): with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT CG_Num FROM F_JOURNAUX WHERE JO_Num = ? 
""", (code_journal,), ) row = cursor.fetchone() if not row: raise ValueError(f"Journal {code_journal} introuvable") compte_general = (row[0] or "").strip() # Mode Espèces (2) doit utiliser un journal caisse (53x) if mode_reglement == 2: if not compte_general.startswith("53"): logger.warning( f"Mode Espèces avec journal non-caisse ({code_journal}, compte {compte_general})" ) else: # Autres modes doivent utiliser un journal banque (51x) if compte_general.startswith("53"): logger.warning( f"Mode non-espèces avec journal caisse ({code_journal}, compte {compte_general})" ) def _get_mode_reglement_libelle(self, mode_reglement: int) -> str: """Récupère le libellé d'un mode de règlement""" with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT R_Intitule FROM P_REGLEMENT WHERE cbIndice = ? """, (mode_reglement,), ) row = cursor.fetchone() if row: return (row[0] or "").strip() # Fallback sur les libellés standards libelles = { 0: "Chèque", 1: "Virement", 2: "Espèces", 3: "LCR Acceptée", 4: "LCR non acceptée", 5: "BOR", 6: "Prélèvement", 7: "Carte bancaire", 8: "Bon d'achat", } return libelles.get(mode_reglement, f"Mode {mode_reglement}") def lire_journaux_banque(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT JO_Num, JO_Intitule, CG_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 ORDER BY JO_Num" ) return [ { "code": row[0].strip(), "intitule": row[1].strip() if row[1] else "", "compte_general": row[2].strip() if row[2] else "", "type": "Caisse" if (row[2] or "").startswith("53") else "Banque", } for row in cursor.fetchall() ] def lire_tous_journaux(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") types_libelles = { 0: "Achats", 1: "Ventes", 2: "Trésorerie", 3: "Général", 4: "Situation", } with self._get_sql_connection() as conn: cursor = conn.cursor() 
cursor.execute( "SELECT JO_Num, JO_Intitule, JO_Type, JO_Reglement, CG_Num FROM F_JOURNAUX ORDER BY JO_Type, JO_Num" ) return [ { "code": row[0].strip(), "intitule": row[1].strip() if row[1] else "", "type_code": row[2], "type_libelle": types_libelles.get(row[2], f"Type {row[2]}"), "reglement_actif": row[3] == 1, "compte_general": row[4].strip() if row[4] else "", } for row in cursor.fetchall() ] def lire_tous_reglements( self, date_debut: datetime = None, date_fin: datetime = None, client_code: str = None, statut_reglement: str = None, ) -> Dict: if not self.cial: raise RuntimeError("Connexion Sage non établie") logger.info( f"Lecture factures avec règlements (date_debut={date_debut}, date_fin={date_fin}, client={client_code})" ) with self._get_sql_connection() as conn: cursor = conn.cursor() query = """ SELECT d.DO_Domaine, d.DO_Type, d.DO_Piece, d.DO_Date, d.DO_Ref, d.DO_Tiers, d.DO_TotalTTC, d.DO_DateLivr, d.DO_Souche, d.cbCreation, c.CT_Intitule, c.CT_EMail, c.CT_Telephone, (SELECT ISNULL(SUM(rc.RC_Montant), 0) FROM F_REGLECH rc INNER JOIN F_DOCREGL dr ON rc.DR_No = dr.DR_No WHERE dr.DO_Domaine = d.DO_Domaine AND dr.DO_Type = d.DO_Type AND dr.DO_Piece = d.DO_Piece) as MontantRegleTotal FROM F_DOCENTETE d LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num WHERE d.DO_Type IN (6, 7) """ params = [] if date_debut: query += " AND d.DO_Date >= ?" params.append(date_debut) if date_fin: query += " AND d.DO_Date <= ?" params.append(date_fin) if client_code: query += " AND d.DO_Tiers = ?" 
params.append(client_code) query += " ORDER BY d.DO_Date DESC, d.DO_Piece DESC" cursor.execute(query, params) rows = cursor.fetchall() types_doc = {6: "Facture", 7: "Avoir"} factures = [] total_factures = 0.0 total_regle = 0.0 total_reste = 0.0 for row in rows: do_domaine = row[0] do_type = row[1] do_piece = (row[2] or "").strip() montant_ttc = float(row[6] or 0) montant_regle_total = float(row[13] or 0) reste_a_regler = montant_ttc - montant_regle_total if statut_reglement: if statut_reglement == "solde" and abs(reste_a_regler) >= 0.01: continue if statut_reglement == "partiel" and ( montant_regle_total == 0 or abs(reste_a_regler) < 0.01 ): continue if statut_reglement == "non_regle" and montant_regle_total > 0: continue if montant_regle_total == 0: statut = "Non réglé" elif abs(reste_a_regler) < 0.01: statut = "Soldé" else: statut = "Partiellement réglé" echeances = _get_echeances_avec_reglements( cursor, do_domaine, do_type, do_piece, montant_ttc ) facture = { "domaine": do_domaine, "type_code": do_type, "type_libelle": types_doc.get(do_type, f"Type {do_type}"), "numero": do_piece, "date": row[3].strftime("%Y-%m-%d") if row[3] else None, "reference": (row[4] or "").strip(), "client_code": (row[5] or "").strip(), "client_intitule": (row[10] or "").strip(), "client_email": (row[11] or "").strip(), "client_telephone": (row[12] or "").strip(), "montant_ttc": montant_ttc, "montant_regle": montant_regle_total, "reste_a_regler": reste_a_regler, "statut_reglement": statut, "date_livraison": row[7].strftime("%Y-%m-%d") if row[7] else None, "souche": row[8], "date_creation": row[9].strftime("%Y-%m-%d %H:%M:%S") if row[9] else None, "nb_echeances": len(echeances), "nb_reglements": sum(len(e["reglements"]) for e in echeances), "echeances": echeances, } factures.append(facture) total_factures += montant_ttc total_regle += montant_regle_total total_reste += reste_a_regler nb_total = len(factures) nb_soldees = sum(1 for f in factures if f["statut_reglement"] == "Soldé") 
nb_partielles = sum( 1 for f in factures if f["statut_reglement"] == "Partiellement réglé" ) nb_non_reglees = sum( 1 for f in factures if f["statut_reglement"] == "Non réglé" ) return { "nb_factures": nb_total, "nb_soldees": nb_soldees, "nb_partiellement_reglees": nb_partielles, "nb_non_reglees": nb_non_reglees, "total_factures": total_factures, "total_regle": total_regle, "total_reste": total_reste, "filtres": { "date_debut": date_debut.strftime("%Y-%m-%d") if date_debut else None, "date_fin": date_fin.strftime("%Y-%m-%d") if date_fin else None, "client_code": client_code, "statut_reglement": statut_reglement, }, "factures": factures, } def _get_echeances_avec_reglements( cursor, do_domaine: int, do_type: int, do_piece: str, montant_ttc_facture: float, ) -> list: """Récupère les échéances d'un document avec leurs règlements détaillés.""" cursor.execute( """ SELECT dr.DR_No, dr.DR_Date, dr.DR_Montant, dr.DR_Regle, dr.N_Reglement, dr.DR_RefPaiement, mr.R_Intitule FROM F_DOCREGL dr LEFT JOIN P_REGLEMENT mr ON dr.N_Reglement = mr.cbIndice WHERE dr.DO_Domaine = ? AND dr.DO_Type = ? AND dr.DO_Piece = ? 
ORDER BY dr.DR_Date ASC """, (do_domaine, do_type, do_piece), ) echeances_rows = cursor.fetchall() echeances = [] nb_echeances = len(echeances_rows) for idx, ech in enumerate(echeances_rows): reglements = _get_reglements_echeance(cursor, ech[0]) montant_regle = sum(r["montant"] for r in reglements) montant_echeance = float(ech[2] or 0) if montant_echeance == 0 and nb_echeances == 1: montant_echeance = montant_ttc_facture echeances.append( { "dr_no": ech[0], "date_echeance": ech[1].strftime("%Y-%m-%d") if ech[1] else None, "montant": montant_echeance, "est_regle": ech[3] == 1, "mode_reglement_code": ech[4], "mode_reglement_libelle": ( ech[6] or ModeReglement.get_libelle(ech[4] or 0) ).strip() if ech[6] else ModeReglement.get_libelle(ech[4] or 0), "reference_paiement": (ech[5] or "").strip(), "reglements": reglements, "montant_regle": montant_regle, "reste_a_regler": montant_echeance - montant_regle, } ) return echeances def _get_reglements_echeance(cursor, dr_no: int) -> list: """Récupère les règlements liés à une échéance avec détails complets.""" cursor.execute( """ SELECT r.RG_No, rc.RC_Montant, r.RG_Date, r.RG_Reference, r.RG_Piece, r.N_Reglement, mr.R_Intitule, r.RG_Libelle, r.RG_Heure, r.RG_Banque, r.RG_Impaye, r.RG_Valide, r.JO_Num, j.JO_Intitule, r.cbCreation FROM F_REGLECH rc INNER JOIN F_CREGLEMENT r ON rc.RG_No = r.RG_No LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num WHERE rc.DR_No = ? 
def lire_facture_reglement_detail(self, do_piece: str) -> Dict:
    """Return the full detail of an invoice with all of its payments.

    Args:
        self: Object exposing ``cial`` and ``_get_sql_connection()``.
        do_piece: Document number to look up among documents of type 6 or 7.

    Returns:
        A dict describing the document, its customer, the paid and
        remaining amounts, a payment status label, and the list of due
        dates with their payments.

    Raises:
        RuntimeError: If ``self.cial`` is not set.
        ValueError: If no document of type 6 or 7 has this number.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    with self._get_sql_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT d.DO_Domaine, d.DO_Type, d.DO_Piece, d.DO_Date, d.DO_Ref,
                   d.DO_Tiers, d.DO_TotalTTC, d.DO_DateLivr, d.DO_Souche,
                   d.cbCreation, c.CT_Intitule, c.CT_EMail, c.CT_Telephone,
                   c.CT_Adresse, c.CT_CodePostal, c.CT_Ville
            FROM F_DOCENTETE d
            LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num
            WHERE d.DO_Piece = ? AND d.DO_Type IN (6, 7)
            """,
            (do_piece,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Facture {do_piece} introuvable")

        do_domaine = row[0]
        do_type = row[1]
        montant_ttc = float(row[6] or 0)

        # The document total is passed along so a single due date stored
        # with a zero amount can be reported with the document total.
        echeances = _get_echeances_avec_reglements(
            cursor, do_domaine, do_type, do_piece, montant_ttc
        )

        total_regle = sum(e["montant_regle"] for e in echeances)
        reste_a_regler = montant_ttc - total_regle

        # Amounts within one cent of the total count as fully paid.
        if total_regle == 0:
            statut = "Non réglé"
        elif abs(reste_a_regler) < 0.01:
            statut = "Soldé"
        else:
            statut = "Partiellement réglé"

        types_doc = {6: "Facture", 7: "Avoir"}

        return {
            "domaine": do_domaine,
            "type_code": do_type,
            "type_libelle": types_doc.get(do_type, f"Type {do_type}"),
            "numero": do_piece,
            "date": row[3].strftime("%Y-%m-%d") if row[3] else None,
            "reference": (row[4] or "").strip(),
            "client": {
                "code": (row[5] or "").strip(),
                "intitule": (row[10] or "").strip(),
                "email": (row[11] or "").strip(),
                "telephone": (row[12] or "").strip(),
                "adresse": (row[13] or "").strip(),
                "code_postal": (row[14] or "").strip(),
                "ville": (row[15] or "").strip(),
            },
            "montant_ttc": montant_ttc,
            "montant_regle": total_regle,
            "reste_a_regler": reste_a_regler,
            "statut_reglement": statut,
            "date_livraison": row[7].strftime("%Y-%m-%d") if row[7] else None,
            "souche": row[8],
            "date_creation": row[9].strftime("%Y-%m-%d %H:%M:%S")
            if row[9]
            else None,
            "nb_echeances": len(echeances),
            "nb_reglements": sum(len(e["reglements"]) for e in echeances),
            "echeances": echeances,
        }
r.RG_MontantCommission, r.RG_MontantNet, r.cbCreation, r.cbModification, c.CT_Intitule, j.JO_Intitule, mr.R_Intitule, dev.D_Intitule FROM F_CREGLEMENT r LEFT JOIN F_COMPTET c ON r.CT_NumPayeur = c.CT_Num LEFT JOIN F_JOURNAUX j ON r.JO_Num = j.JO_Num LEFT JOIN P_REGLEMENT mr ON r.N_Reglement = mr.cbIndice LEFT JOIN P_DEVISE dev ON r.N_Devise = dev.cbIndice WHERE r.RG_No = ? """, (rg_no,), ) row = cursor.fetchone() if not row: raise ValueError(f"Règlement {rg_no} introuvable") cursor.execute( """ SELECT re.RC_Montant, re.DR_No, dr.DO_Domaine, dr.DO_Type, dr.DO_Piece, d.DO_Date, d.DO_TotalTTC, d.DO_Ref, d.DO_Tiers, c.CT_Intitule FROM F_REGLECH re LEFT JOIN F_DOCREGL dr ON re.DR_No = dr.DR_No LEFT JOIN F_DOCENTETE d ON dr.DO_Domaine = d.DO_Domaine AND dr.DO_Type = d.DO_Type AND dr.DO_Piece = d.DO_Piece LEFT JOIN F_COMPTET c ON d.DO_Tiers = c.CT_Num WHERE re.RG_No = ? """, (rg_no,), ) imputations_rows = cursor.fetchall() types_doc = {6: "Facture", 7: "Avoir"} domaines = {0: "Vente", 1: "Achat"} types_reglement = {0: "Client", 1: "Fournisseur"} imputations = [] total_impute = 0.0 for imp in imputations_rows: montant_imp = float(imp[0] or 0) total_impute += montant_imp imputations.append( { "montant": montant_imp, "dr_no": imp[1], "document": { "domaine": domaines.get(imp[2], f"Domaine {imp[2]}") if imp[2] is not None else None, "type": types_doc.get(imp[3], f"Type {imp[3]}") if imp[3] is not None else None, "numero": (imp[4] or "").strip() if imp[4] else None, "date": imp[5].strftime("%Y-%m-%d") if imp[5] else None, "total_ttc": float(imp[6] or 0) if imp[6] else None, "reference": (imp[7] or "").strip() if imp[7] else None, "tiers_code": (imp[8] or "").strip() if imp[8] else None, "tiers_intitule": (imp[9] or "").strip() if imp[9] else None, }, } ) rg_montant = float(row[5] or 0) reste = rg_montant - total_impute if total_impute == 0: statut_imputation = "Non imputé" elif abs(reste) < 0.01: statut_imputation = "Totalement imputé" else: statut_imputation = "Partiellement 
def regler_facture(
    self,
    numero_facture: str,
    montant: float,
    mode_reglement: int = ModeReglement.CHEQUE,
    date_reglement: datetime = None,
    reference: str = "",
    libelle: str = "",
    code_journal: str = None,
    devise_code: int = 0,
    cours_devise: float = 1.0,
    tva_encaissement: bool = False,
    compte_general: str = None,
) -> Dict:
    """Record a payment against a sales invoice through the Sage COM API.

    The journal is chosen automatically from the payment mode when
    ``code_journal`` is not given; otherwise its consistency with the mode
    is checked (mismatches only produce a warning log). After the payment
    is written, the document is re-read and ``DO_MontantRegle`` must have
    changed, otherwise the payment is considered not applied.

    Args:
        self: Object exposing ``cial``, ``_com_context()``, ``_lock_com``
            and ``_get_sql_connection()``.
        numero_facture: Invoice number.
        montant: Amount to pay; must be strictly positive and must not
            exceed the outstanding balance (one cent tolerance).
        mode_reglement: Payment mode code.
        date_reglement: Payment date; defaults to the current date and time.
        reference: Optional payment reference.
        libelle: Optional payment label.
        code_journal: Journal code; derived from the mode when empty.
        devise_code: Currency code; ``0`` skips the currency fields.
        cours_devise: Exchange rate used with ``devise_code``.
        tva_encaissement: Whether to flag the payment for VAT on receipt.
        compte_general: Optional general account to set on the payment.

    Returns:
        A dict summarising the payment and the invoice balance afterwards.

    Raises:
        ValueError: Non-positive amount, unknown or cancelled invoice,
            amount above the balance, or invoice without a due date.
        RuntimeError: No Sage connection, or any other failure while
            writing the payment (the original exception is chained).
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    if montant <= 0:
        raise ValueError("Le montant du règlement doit être positif")

    date_reglement = date_reglement or datetime.now()

    # Derive the journal automatically when none is supplied.
    if not code_journal:
        code_journal = _get_journal_auto(self, mode_reglement)
    else:
        # Check journal/mode consistency.
        _valider_coherence_journal_mode(self, code_journal, mode_reglement)

    logger.info(
        f"Règlement facture {numero_facture}: {montant}€ "
        f"(mode: {mode_reglement}, journal: {code_journal}, devise: {devise_code})"
    )

    try:
        with self._com_context(), self._lock_com:
            factory = self.cial.FactoryDocumentVente

            if not factory.ExistPiece(60, numero_facture):
                raise ValueError(f"Facture {numero_facture} introuvable")

            persist = factory.ReadPiece(60, numero_facture)
            doc = win32com.client.CastTo(persist, "IBODocumentVente3")
            doc.Read()

            total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
            montant_deja_regle = float(getattr(doc, "DO_MontantRegle", 0.0))
            statut = getattr(doc, "DO_Statut", 0)

            if statut == 6:
                raise ValueError(f"Facture {numero_facture} annulée")

            solde_actuel = total_ttc - montant_deja_regle
            if montant > solde_actuel + 0.01:
                raise ValueError(
                    f"Montant ({montant}€) supérieur au solde ({solde_actuel:.2f}€)"
                )

            # Customer lookup is best-effort: the payment proceeds without a
            # payer code if it fails, but the failure is now traceable.
            client_code = ""
            try:
                client_obj = getattr(doc, "Client", None)
                if client_obj:
                    client_obj.Read()
                    client_code = (getattr(client_obj, "CT_Num", "") or "").strip()
            except Exception as e:
                logger.debug(f"Client de la facture {numero_facture} illisible: {e}")

            # First due date of the invoice.
            echeance = _get_premiere_echeance(doc)
            if not echeance:
                raise ValueError(f"Facture {numero_facture} sans échéance")

            # Write the payment.
            numero_reglement = _executer_reglement_com(
                self,
                doc=doc,
                echeance=echeance,
                montant=montant,
                mode_reglement=mode_reglement,
                date_reglement=date_reglement,
                reference=reference,
                libelle=libelle,
                code_journal=code_journal,
                client_code=client_code,
                numero_facture=numero_facture,
                devise_code=devise_code,
                cours_devise=cours_devise,
                tva_encaissement=tva_encaissement,
                compte_general=compte_general,
            )

            # Short pause before re-reading the document to verify the
            # payment was applied.
            time.sleep(0.3)
            doc.Read()

            nouveau_montant_regle = float(getattr(doc, "DO_MontantRegle", 0.0))

            if abs(nouveau_montant_regle - montant_deja_regle) < 0.01:
                raise RuntimeError(
                    "Le règlement n'a pas été appliqué (DO_MontantRegle inchangé)"
                )

            nouveau_solde = total_ttc - nouveau_montant_regle
            logger.info(f"Règlement effectué - Solde restant: {nouveau_solde:.2f}€")

            # Label of the payment mode.
            mode_libelle = _get_mode_reglement_libelle(self, mode_reglement)

            return {
                "numero_facture": numero_facture,
                "numero_reglement": numero_reglement,
                "montant_regle": montant,
                "date_reglement": date_reglement.strftime("%Y-%m-%d"),
                "mode_reglement": mode_reglement,
                "mode_reglement_libelle": mode_libelle,
                "reference": reference,
                "libelle": libelle,
                "code_journal": code_journal,
                "devise_code": devise_code,
                "cours_devise": cours_devise,
                "tva_encaissement": tva_encaissement,
                "total_facture": total_ttc,
                "solde_restant": nouveau_solde,
                "facture_soldee": nouveau_solde < 0.01,
                "client_code": client_code,
            }

    except ValueError:
        # Validation errors are reported to the caller unchanged.
        raise
    except Exception as e:
        logger.error(f"Erreur règlement: {e}", exc_info=True)
        # Chain the cause so the COM/SQL traceback is not lost.
        raise RuntimeError(f"Échec règlement facture: {str(e)}") from e
règlement via FactoryDocumentReglement...") # 1. Créer le règlement factory_reg = self.cial.FactoryDocumentReglement reg = factory_reg.Create() reg = win32com.client.CastTo(reg, "IBODocumentReglement") logger.info(" Règlement créé et casté vers IBODocumentReglement") # 2. Configurer le Journal (objet) try: journal_factory = self.cial.CptaApplication.FactoryJournal journal_persist = journal_factory.ReadNumero(code_journal) if journal_persist: reg.Journal = journal_persist logger.info(f" Journal défini: {code_journal}") except Exception as e: logger.warning(f" Journal: {e}") # 3. Configurer le TiersPayeur (objet client) try: factory_client = self.cial.CptaApplication.FactoryClient if client_code: client_persist = factory_client.ReadNumero(client_code) if client_persist: reg.TiersPayeur = client_persist logger.info(f" TiersPayeur défini: {client_code}") except Exception as e: logger.warning(f" TiersPayeur: {e}") # 4. Configurer les champs simples try: reg.RG_Date = pywintypes.Time(date_reglement) logger.info(f" RG_Date: {date_reglement}") except Exception as e: logger.warning(f" RG_Date: {e}") try: reg.RG_Montant = montant logger.info(f" RG_Montant: {montant}") except Exception as e: logger.warning(f" RG_Montant: {e}") # 5. Mode de règlement via l'objet Reglement try: mode_factory = getattr( self.cial.CptaApplication, "FactoryModeReglement", None ) if mode_factory: mode_obj = mode_factory.ReadNumero(mode_reglement) if mode_obj: reg.Reglement = mode_obj logger.info(f" Mode règlement défini: {mode_reglement}") except Exception as e: logger.debug(f" Mode règlement via factory: {e}") # 6. 
Devise if devise_code != 0: try: reg.RG_Devise = devise_code logger.info(f" RG_Devise: {devise_code}") except Exception as e: logger.debug(f" RG_Devise: {e}") try: reg.RG_Cours = cours_devise logger.info(f" RG_Cours: {cours_devise}") except Exception as e: logger.debug(f" RG_Cours: {e}") # Montant en devise try: montant_devise = montant * cours_devise reg.RG_MontantDev = montant_devise logger.info(f" RG_MontantDev: {montant_devise}") except Exception as e: logger.debug(f" RG_MontantDev: {e}") # 7. TVA sur encaissement if tva_encaissement: try: reg.RG_Encaissement = 1 logger.info(" RG_Encaissement: 1 (TVA sur encaissement)") except Exception as e: logger.debug(f" RG_Encaissement: {e}") # 8. Compte général spécifique if compte_general: try: cg_factory = self.cial.CptaApplication.FactoryCompteG cg_persist = cg_factory.ReadNumero(compte_general) if cg_persist: reg.CompteG = cg_persist logger.info(f" CompteG défini: {compte_general}") except Exception as e: logger.debug(f" CompteG: {e}") # 9. Référence et libellé if reference: try: reg.RG_Reference = reference logger.info(f" RG_Reference: {reference}") except Exception: pass if libelle: try: reg.RG_Libelle = libelle logger.info(f" RG_Libelle: {libelle}") except Exception: pass try: reg.RG_Impute = 1 # Imputé except Exception: pass try: reg.RG_Compta = 0 # Non comptabilisé except Exception: pass # 10. ÉCRIRE le règlement reg.Write() numero = getattr(reg, "RG_Piece", None) logger.info(f" Règlement écrit avec numéro: {numero}") # 11. 
Créer le lien règlement-échéance via la factory DU RÈGLEMENT try: logger.info(" Création du lien règlement-échéance...") factory_reg_ech = getattr(reg, "FactoryDocumentReglementEcheance", None) if factory_reg_ech: reg_ech = factory_reg_ech.Create() # Cast vers IBODocumentReglementEcheance for iface in [ "IBODocumentReglementEcheance3", "IBODocumentReglementEcheance", ]: try: reg_ech = win32com.client.CastTo(reg_ech, iface) logger.info(f" Cast vers {iface} réussi") break except Exception: continue # Définir l'échéance try: reg_ech.Echeance = echeance logger.info(" Echeance définie") except Exception as e: logger.warning(f" Echeance: {e}") # Définir le montant try: reg_ech.RC_Montant = montant logger.info(f" RC_Montant: {montant}") except Exception as e: logger.warning(f" RC_Montant: {e}") # Écrire le lien try: reg_ech.SetDefault() except Exception: pass reg_ech.Write() logger.info(" Lien règlement-échéance écrit!") return str(numero) if numero else None except Exception as e: erreurs.append(f"Lien échéance: {e}") logger.warning(f" Erreur création lien: {e}") # Si le lien a échoué, essayer via le process logger.info(" Tentative via CreateProcess_ReglerEcheances...") try: process = self.cial.CreateProcess_ReglerEcheances() # Assigner le règlement déjà écrit process.Reglement = reg logger.info(" Règlement assigné au process") # Ajouter l'échéance try: process.AddDocumentEcheanceMontant(echeance, montant) logger.info(" Échéance ajoutée avec montant") except Exception: process.AddDocumentEcheance(echeance) logger.info(" Échéance ajoutée") can_process = getattr(process, "CanProcess", True) logger.info(f" CanProcess: {can_process}") if can_process: process.Process() logger.info(" Process() réussi!") return str(numero) if numero else None else: try: errors = process.Errors if errors: for i in range(1, errors.Count + 1): logger.warning(f" Erreur [{i}]: {errors.Item(i)}") except Exception: pass except Exception as e: erreurs.append(f"Process: {e}") logger.warning(f" Process 
échoué: {e}") except Exception as e: erreurs.append(f"FactoryDocumentReglement: {e}") logger.error(f"FactoryDocumentReglement échoué: {e}") # APPROCHE ALTERNATIVE: Via le mode règlement de l'échéance try: logger.info("Tentative via modification directe de l'échéance...") mode_obj = getattr(echeance, "Reglement", None) if mode_obj: attrs = [a for a in dir(mode_obj) if not a.startswith("_")] logger.info(f" Attributs Reglement échéance: {attrs[:15]}...") factory_reg_ech = getattr(echeance, "FactoryDocumentReglementEcheance", None) if factory_reg_ech: logger.info(" FactoryDocumentReglementEcheance trouvée sur échéance") reg_ech = factory_reg_ech.Create() for iface in [ "IBODocumentReglementEcheance3", "IBODocumentReglementEcheance", ]: try: reg_ech = win32com.client.CastTo(reg_ech, iface) logger.info(f" Cast vers {iface}") break except Exception: continue try: factory_reg = self.cial.FactoryDocumentReglement new_reg = factory_reg.Create() new_reg = win32com.client.CastTo(new_reg, "IBODocumentReglement") # Configurer journal_factory = self.cial.CptaApplication.FactoryJournal journal_persist = journal_factory.ReadNumero(code_journal) if journal_persist: new_reg.Journal = journal_persist factory_client = self.cial.CptaApplication.FactoryClient if client_code: client_persist = factory_client.ReadNumero(client_code) if client_persist: new_reg.TiersPayeur = client_persist new_reg.RG_Date = pywintypes.Time(date_reglement) new_reg.RG_Montant = montant new_reg.RG_Impute = 1 # Devise si non EUR if devise_code != 0: try: new_reg.RG_Devise = devise_code new_reg.RG_Cours = cours_devise new_reg.RG_MontantDev = montant * cours_devise except Exception: pass # TVA encaissement if tva_encaissement: try: new_reg.RG_Encaissement = 1 except Exception: pass # Compte général if compte_general: try: cg_factory = self.cial.CptaApplication.FactoryCompteG cg_persist = cg_factory.ReadNumero(compte_general) if cg_persist: new_reg.CompteG = cg_persist except Exception: pass new_reg.Write() 
def _attributs_publics(obj) -> list:
    """Return the names listed by ``dir(obj)`` that do not start with ``_``."""
    return [nom for nom in dir(obj) if not nom.startswith("_")]


def _introspecter_echeance_document(doc, result: dict) -> bool:
    """Record into ``result`` the attributes of the first due date of ``doc``.

    Returns True once a due date has been inspected, and False when ``doc``
    is empty, its ``DO_Type`` is not 6, or it has no due date. Exceptions
    raised by the COM calls propagate to the caller.
    """
    if not doc:
        return False
    doc.Read()
    if getattr(doc, "DO_Type", 0) != 6:
        return False

    factory_ech = getattr(doc, "FactoryDocumentEcheance", None)
    if not factory_ech:
        return False
    ech_list = factory_ech.List
    if not ech_list:
        return False
    ech = ech_list.Item(1)
    if not ech:
        return False

    ech = win32com.client.CastTo(ech, "IBODocumentEcheance3")
    ech.Read()
    result["IBODocumentEcheance3"] = _attributs_publics(ech)

    # FactoryDocumentReglementEcheance reached from the due date.
    factory_lien_ech = getattr(ech, "FactoryDocumentReglementEcheance", None)
    if factory_lien_ech:
        lien_ech = factory_lien_ech.Create()
        result["EcheanceReglementEcheance_base"] = _attributs_publics(lien_ech)
        for iface in (
            "IBODocumentReglementEcheance3",
            "IBODocumentReglementEcheance",
        ):
            try:
                lien_ech_cast = win32com.client.CastTo(lien_ech, iface)
                result[f"EcheanceReglementEcheance_{iface}"] = _attributs_publics(
                    lien_ech_cast
                )
            except Exception as e:
                # Keep the failure visible, as done for the payment-side cast.
                result[f"cast_echeance_{iface}_error"] = str(e)

    # Payment mode object attached to the due date.
    mode = getattr(ech, "Reglement", None)
    if mode:
        result["Echeance_Reglement_mode"] = _attributs_publics(mode)
    return True


def introspecter_reglement(self):
    """List the public attribute names of the COM objects used for payments.

    Diagnostic helper: it creates (without writing) a payment object, a
    payment/due-date link and a ``CreateProcess_ReglerEcheances`` process,
    then inspects the first due date found among the first 19 items of the
    sales document list whose ``DO_Type`` is 6. Every failure is stored in
    the result under an ``error_*`` / ``*_error`` key instead of being
    raised.

    Returns:
        A dict mapping a description of each inspected object to the list
        of its public attribute names, plus error entries if any.

    Raises:
        RuntimeError: If ``self.cial`` is not set.
    """
    if not self.cial:
        raise RuntimeError("Connexion Sage non établie")

    result = {}
    try:
        with self._com_context(), self._lock_com:
            # IBODocumentReglement and its link factory.
            try:
                reg = self.cial.FactoryDocumentReglement.Create()
                reg = win32com.client.CastTo(reg, "IBODocumentReglement")
                result["IBODocumentReglement"] = _attributs_publics(reg)

                factory_lien = getattr(reg, "FactoryDocumentReglementEcheance", None)
                if factory_lien:
                    lien = factory_lien.Create()
                    result["ReglementEcheance_base"] = _attributs_publics(lien)
                    for iface in (
                        "IBODocumentReglementEcheance3",
                        "IBODocumentReglementEcheance",
                    ):
                        try:
                            lien_cast = win32com.client.CastTo(lien, iface)
                            result[f"ReglementEcheance_{iface}"] = _attributs_publics(
                                lien_cast
                            )
                        except Exception as e:
                            result[f"cast_{iface}_error"] = str(e)
            except Exception as e:
                result["error_reglement"] = str(e)

            # Process object.
            try:
                process = self.cial.CreateProcess_ReglerEcheances()
                result["Process"] = _attributs_publics(process)
            except Exception as e:
                result["error_process"] = str(e)

            # Due date and its attributes.
            try:
                doc_list = self.cial.FactoryDocumentVente.List
                for i in range(1, 20):
                    try:
                        if _introspecter_echeance_document(doc_list.Item(i), result):
                            break
                    except Exception:
                        # Unreadable document: try the next one.
                        continue
            except Exception as e:
                result["error_echeance"] = str(e)

    except Exception as e:
        result["global_error"] = str(e)

    return result
self, fac["numero"], a_regler, mode_reglement, date_reglement, reference, libelle, ) reglements.append(res) restant -= a_regler except Exception as e: logger.error(f"Erreur {fac['numero']}: {e}") break if not reglements: raise RuntimeError("Aucun règlement effectué") return { "client_code": client_code, "montant_demande": montant_total, "montant_effectif": sum(r["montant_regle"] for r in reglements), "nb_factures_reglees": len(reglements), "nb_factures_soldees": sum(1 for r in reglements if r["facture_soldee"]), "date_reglement": date_reglement.strftime("%Y-%m-%d"), "mode_reglement": mode_reglement, "mode_reglement_libelle": ModeReglement.get_libelle(mode_reglement), "reference": reference, "reglements": reglements, } def _get_factures_non_soldees_client_sql(self, client_code, numeros=None): with self._get_sql_connection() as conn: cursor = conn.cursor() if numeros: placeholders = ",".join("?" * len(numeros)) query = f"SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ? AND DO_Piece IN ({placeholders}) AND DO_Statut <> 6 AND (DO_TotalTTC - ISNULL(DO_MontantRegle, 0)) > 0.01 ORDER BY DO_Date ASC" params = [client_code] + numeros else: query = "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ? AND DO_Statut <> 6 AND (DO_TotalTTC - ISNULL(DO_MontantRegle, 0)) > 0.01 ORDER BY DO_Date ASC" params = [client_code] cursor.execute(query, params) return [ { "numero": r[0].strip(), "date": r[1].strftime("%Y-%m-%d") if r[1] else None, "total_ttc": float(r[2] or 0), "montant_regle": float(r[3] or 0), "solde": float(r[2] or 0) - float(r[3] or 0), } for r in cursor.fetchall() ] def lire_reglements_facture(self, numero_facture): if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT DO_TotalTTC, DO_MontantRegle, DO_Tiers, DO_Date, DO_Ref FROM F_DOCENTETE WHERE DO_Piece = ? 
AND DO_Type = 6", (numero_facture,), ) row = cursor.fetchone() if not row: raise ValueError(f"Facture {numero_facture} introuvable") total = float(row[0] or 0) regle = float(row[1] or 0) solde = max(0, total - regle) return { "numero_facture": numero_facture, "client_code": (row[2] or "").strip(), "date_facture": row[3].strftime("%Y-%m-%d") if row[3] else None, "reference": (row[4] or "").strip(), "total_ttc": total, "total_regle": regle, "solde_restant": solde, "est_soldee": solde < 0.01, } def lire_reglements_client( self, client_code, date_debut=None, date_fin=None, inclure_soldees=True ): if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT CT_Intitule FROM F_COMPTET WHERE CT_Num = ?", (client_code,) ) row = cursor.fetchone() if not row: raise ValueError(f"Client {client_code} introuvable") intitule = (row[0] or "").strip() query = "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle, DO_Ref FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ?" params = [client_code] if date_debut: query += " AND DO_Date >= ?" params.append(date_debut) if date_fin: query += " AND DO_Date <= ?" 
params.append(date_fin) query += " ORDER BY DO_Date ASC" cursor.execute(query, params) factures = [] for r in cursor.fetchall(): total = float(r[2] or 0) regle = float(r[3] or 0) solde = max(0, total - regle) soldee = solde < 0.01 if inclure_soldees or not soldee: factures.append( { "numero_facture": r[0].strip(), "date_facture": r[1].strftime("%Y-%m-%d") if r[1] else None, "total_ttc": total, "reference": (r[4] or "").strip(), "total_regle": regle, "solde_restant": solde, "est_soldee": soldee, } ) return { "client_code": client_code, "client_intitule": intitule, "nb_factures": len(factures), "nb_factures_soldees": sum(1 for f in factures if f["est_soldee"]), "nb_factures_en_cours": sum(1 for f in factures if not f["est_soldee"]), "total_factures": sum(f["total_ttc"] for f in factures), "total_regle": sum(f["total_regle"] for f in factures), "solde_global": sum(f["solde_restant"] for f in factures), "factures": factures, } def lire_modes_reglement(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT cbIndice, R_Intitule, R_Code, R_ModePaieDebit, R_ModePaieCredit FROM P_REGLEMENT WHERE R_Intitule IS NOT NULL AND LTRIM(RTRIM(R_Intitule)) <> '' ORDER BY cbIndice """) modes = [] for row in cursor.fetchall(): intitule = (row[1] or "").strip() if intitule: # Ne garder que ceux avec un intitulé modes.append( { "code": row[0], # cbIndice "intitule": intitule, "code_sage": (row[2] or "").strip(), "mode_paie_debit": row[3] or 0, "mode_paie_credit": row[4] or 0, } ) return modes def _get_modes_reglement_standards() -> List[Dict]: """Modes de règlement standards Sage si P_REGLEMENT non configuré""" return [ {"code": 0, "intitule": "Chèque", "type": "banque"}, {"code": 1, "intitule": "Virement", "type": "banque"}, {"code": 2, "intitule": "Espèces", "type": "caisse"}, {"code": 3, "intitule": "LCR Acceptée", "type": "banque"}, {"code": 4, "intitule": "LCR 
non acceptée", "type": "banque"}, {"code": 5, "intitule": "BOR", "type": "banque"}, {"code": 6, "intitule": "Prélèvement", "type": "banque"}, {"code": 7, "intitule": "Carte bancaire", "type": "banque"}, {"code": 8, "intitule": "Bon d'achat", "type": "autre"}, ] def lire_devises(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() # Vérifier d'abord si F_DEVISE existe cursor.execute(""" SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'F_DEVISE' """) has_f_devise = cursor.fetchone()[0] > 0 if has_f_devise: cursor.execute(""" SELECT D_Code, D_Intitule, D_Sigle, D_CoursActuel FROM F_DEVISE ORDER BY D_Code """) devises = [] for row in cursor.fetchall(): devises.append( { "code": row[0], "intitule": (row[1] or "").strip(), "sigle": (row[2] or "").strip(), "cours_actuel": float(row[3] or 1.0), } ) if devises: return devises # Fallback: Lire depuis P_DOSSIER cursor.execute(""" SELECT N_DeviseCompte, N_DeviseEquival FROM P_DOSSIER """) row = cursor.fetchone() # Devise par défaut basée sur la config dossier devise_principale = row[0] if row else 0 # Retourner les devises standards devises_standards = [ { "code": 0, "intitule": "Euro", "sigle": "EUR", "cours_actuel": 1.0, "est_principale": devise_principale == 0, }, { "code": 1, "intitule": "Dollar US", "sigle": "USD", "cours_actuel": 1.0, "est_principale": devise_principale == 1, }, { "code": 2, "intitule": "Livre Sterling", "sigle": "GBP", "cours_actuel": 1.0, "est_principale": devise_principale == 2, }, { "code": 3, "intitule": "Franc Suisse", "sigle": "CHF", "cours_actuel": 1.0, "est_principale": devise_principale == 3, }, ] return devises_standards def lire_journaux_tresorerie(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT JO_Num, JO_Intitule, CG_Num, JO_Type, JO_Reglement, 
JO_Sommeil FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND (JO_Sommeil = 0 OR JO_Sommeil IS NULL) ORDER BY JO_Num """) journaux = [] for row in cursor.fetchall(): compte_general = (row[2] or "").strip() # Déterminer le type basé sur le compte général if compte_general.startswith("53"): type_journal = "caisse" elif compte_general.startswith("51"): type_journal = "banque" else: type_journal = "tresorerie" journaux.append( { "code": row[0].strip(), "intitule": (row[1] or "").strip(), "compte_general": compte_general, "type": type_journal, } ) return journaux def lire_comptes_generaux( self, prefixe: str = None, type_compte: str = None ) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") # Mapping type -> préfixes de comptes prefixes_map = { "client": ["411"], "fournisseur": ["401"], "banque": ["51"], "caisse": ["53"], "tva_collectee": ["4457"], "tva_deductible": ["4456"], "tva": ["445"], "produit": ["7"], "charge": ["6"], } with self._get_sql_connection() as conn: cursor = conn.cursor() query = """ SELECT CG_Num, CG_Intitule, CG_Type, CG_Raccourci, CG_Sommeil FROM F_COMPTEG WHERE (CG_Sommeil = 0 OR CG_Sommeil IS NULL) """ params = [] # Appliquer les filtres if type_compte and type_compte in prefixes_map: prefixes = prefixes_map[type_compte] conditions = " OR ".join(["CG_Num LIKE ?" for _ in prefixes]) query += f" AND ({conditions})" params.extend([f"{p}%" for p in prefixes]) elif prefixe: query += " AND CG_Num LIKE ?" 
params.append(f"{prefixe}%") query += " ORDER BY CG_Num" cursor.execute(query, params) comptes = [] for row in cursor.fetchall(): comptes.append( { "numero": row[0].strip(), "intitule": (row[1] or "").strip(), "type": row[2] or 0, "raccourci": (row[3] or "").strip(), } ) return comptes def lire_tva_taux(self) -> List[Dict]: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT TA_No, TA_Code, TA_Intitule, TA_Taux, TA_TTaux, TA_Type, TA_Sens, CG_Num, TA_Assujet FROM F_TAXE ORDER BY TA_No """) taux = [] for row in cursor.fetchall(): taux.append( { "numero": row[0], "code": (row[1] or "").strip(), "intitule": (row[2] or "").strip(), "taux": float(row[3] or 0), "type_taux": row[4] or 0, # 0=Pourcentage, 1=Montant "type": row[5] or 0, # 0=Collectée, 1=Déductible, etc. "sens": row[6] or 0, # 0=Vente, 1=Achat "compte_general": (row[7] or "").strip(), "assujetti": row[8] or 0, } ) return taux def lire_parametres_encaissement(self) -> Dict: if not self.cial: raise RuntimeError("Connexion Sage non établie") with self._get_sql_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT D_TVAEncReg, D_TVAEncAffect, N_DeviseCompte FROM P_DOSSIER """) row = cursor.fetchone() if row: return { "tva_encaissement_regime": row[0] or 0, "tva_encaissement_affectation": row[1] or 0, "tva_encaissement_actif": (row[0] or 0) > 0, "devise_compte": row[2] or 0, } return { "tva_encaissement_regime": 0, "tva_encaissement_affectation": 0, "tva_encaissement_actif": False, "devise_compte": 0, } __all__ = [ "ModeReglement", "lire_journaux_banque", "lire_tous_journaux", "introspecter_reglement", "regler_facture", "regler_factures_client", "lire_reglements_facture", "lire_reglements_client", "lire_modes_reglement", "lire_devises", "lire_journaux_tresorerie", "lire_comptes_generaux", "lire_tva_taux", "lire_parametres_encaissement", "_get_modes_reglement_standards", "lire_tous_reglements", 
"lire_facture_reglement_detail", "lire_reglement_detail", ]