Modified implicit error on converting type int into varchar in SQL Query in articles_data_sql

Added settle logics for invoice and trying to gather society's profile image
This commit is contained in:
fanilo 2026-01-15 06:38:49 +01:00
parent e47e14f1b4
commit b6416487c0
8 changed files with 1395 additions and 26 deletions

273
main.py
View file

@ -1,7 +1,8 @@
from fastapi import FastAPI, HTTPException, Header, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
from datetime import datetime, date
from datetime import datetime
import uvicorn
import logging
import win32com.client
@ -43,6 +44,10 @@ from schemas import (
CollaborateurNumeroRequest,
CollaborateurUpdateRequest,
)
from schemas.documents.reglements import (
ReglementFactureRequest,
ReglementMultipleRequest,
)
logging.basicConfig(
level=logging.INFO,
@ -1431,6 +1436,272 @@ def get_societe_info():
raise HTTPException(500, str(e))
@app.post(
"/sage/factures/{numero_facture}/regler", dependencies=[Depends(verify_token)]
)
def regler_facture_endpoint(
numero_facture: str,
req: ReglementFactureRequest,
):
"""
Règle une facture (totalement ou partiellement)
- **numero_facture**: Numéro de la facture (ex: FA00081)
- **montant**: Montant du règlement
- **mode_reglement**: 1=Virement, 2=Chèque, 3=Traite, 4=CB, 5=LCR, 6=Prélèvement, 7=Espèces
"""
try:
date_reg = datetime.combine(req.date_reglement, datetime.min.time()) if req.date_reglement else None
result = sage.regler_facture(
numero_facture=numero_facture,
montant=req.montant,
mode_reglement=req.mode_reglement,
date_reglement=date_reg,
reference=req.reference or "",
libelle=req.libelle or "",
)
return {"success": True, "message": "Règlement effectué avec succès", "data": result}
except ValueError as e:
raise HTTPException(400, str(e))
except Exception as e:
logger.error(f"Erreur règlement {numero_facture}: {e}")
raise HTTPException(500, str(e))
@app.post("/sage/reglements/multiple", dependencies=[Depends(verify_token)])
def regler_factures_client_endpoint(req: ReglementMultipleRequest):
"""
Règle plusieurs factures d'un client
Si numeros_factures est fourni, règle ces factures dans l'ordre.
Sinon, règle les factures les plus anciennes en priorité.
"""
try:
resultat = sage.regler_factures_client(
client_code=req.client_code,
montant_total=req.montant_total,
mode_reglement=req.mode_reglement,
date_reglement=req.date_reglement,
reference=req.reference or "",
libelle=req.libelle or "",
code_journal=req.code_journal,
numeros_factures=req.numeros_factures,
)
return {"success": True, "data": resultat}
except ValueError as e:
logger.warning(f"Erreur métier règlement multiple {req.client_code}: {e}")
raise HTTPException(400, str(e))
except Exception as e:
logger.error(f"Erreur technique règlement multiple: {e}")
raise HTTPException(500, str(e))
@app.get(
"/sage/factures/{numero_facture}/reglements", dependencies=[Depends(verify_token)]
)
def get_reglements_facture_endpoint(numero_facture: str):
"""
Récupère tous les règlements d'une facture
"""
try:
resultat = sage.lire_reglements_facture(numero_facture)
return {"success": True, "data": resultat}
except ValueError as e:
logger.warning(f"Facture introuvable: {numero_facture}")
raise HTTPException(404, str(e))
except Exception as e:
logger.error(f"Erreur lecture règlements {numero_facture}: {e}")
raise HTTPException(500, str(e))
@app.get("/sage/clients/{client_code}/reglements", dependencies=[Depends(verify_token)])
def get_reglements_client_endpoint(
client_code: str,
date_debut: Optional[datetime] = Query(None, description="Date début (filtrage)"),
date_fin: Optional[datetime] = Query(None, description="Date fin (filtrage)"),
inclure_soldees: bool = Query(True, description="Inclure les factures soldées"),
):
"""
Récupère tous les règlements d'un client avec leurs factures
"""
try:
resultat = sage.lire_reglements_client(
client_code=client_code,
date_debut=date_debut,
date_fin=date_fin,
inclure_soldees=inclure_soldees,
)
return {"success": True, "data": resultat}
except ValueError as e:
logger.warning(f"Client introuvable: {client_code}")
raise HTTPException(404, str(e))
except Exception as e:
logger.error(f"Erreur lecture règlements client {client_code}: {e}")
raise HTTPException(500, str(e))
@app.get("/sage/reglements/modes", dependencies=[Depends(verify_token)])
def get_modes_reglement():
"""
Retourne la liste des modes de règlement disponibles
"""
return {
"success": True,
"data": {
"modes": [
{"code": 1, "libelle": "Virement"},
{"code": 2, "libelle": "Chèque"},
{"code": 3, "libelle": "Traite"},
{"code": 4, "libelle": "Carte bancaire"},
{"code": 5, "libelle": "LCR"},
{"code": 6, "libelle": "Prélèvement"},
{"code": 7, "libelle": "Espèces"},
]
},
}
@app.get("/sage/journaux")
def get_tous_journaux():
"""
Liste TOUS les journaux (pour diagnostic)
Types:
- 0 = Achats
- 1 = Ventes
- 2 = Trésorerie (Banque/Caisse) Pour les règlements
- 3 = Général (OD)
- 4 = Situation
"""
try:
journaux = sage.lire_tous_journaux()
# Grouper par type
par_type = {}
for j in journaux:
t = j["type_libelle"]
if t not in par_type:
par_type[t] = []
par_type[t].append(j)
return {
"success": True,
"data": {
"journaux": journaux,
"par_type": par_type,
"nb_tresorerie": len([j for j in journaux if j["type_code"] == 2]),
"message": "Pour les règlements, utilisez les journaux de type Trésorerie (type_code=2)",
},
}
except Exception as e:
logger.error(f"Erreur lecture journaux: {e}")
raise HTTPException(500, str(e))
@app.get("/sage/journaux/banque")
def get_journaux_banque():
"""
Liste les journaux de trésorerie (type 2) pour les règlements
"""
try:
journaux = sage.lire_journaux_banque()
if not journaux:
return {
"success": True,
"data": {
"journaux": [],
"warning": "Aucun journal de trésorerie configuré. "
"Créez un journal de type 2 (Trésorerie) dans Sage "
"avant de pouvoir effectuer des règlements.",
},
}
return {"success": True, "data": {"journaux": journaux}}
except Exception as e:
logger.error(f"Erreur lecture journaux: {e}")
raise HTTPException(500, str(e))
@app.get("/sage/reglements/introspection")
def introspecter_reglements():
"""
Introspection des objets COM de règlement (diagnostic)
Utile pour comprendre les attributs disponibles sur les objets de règlement.
"""
try:
data = sage.introspecter_reglement()
return {"success": True, "data": data}
except Exception as e:
logger.error(f"Erreur introspection: {e}")
raise HTTPException(500, str(e))
@app.get("/sage/societe/com-introspection")
def introspection_com():
"""Liste toutes les propriétés/méthodes COM disponibles"""
try:
resultats = {
"cial_attributes": [],
"base_cpta_attributes": [],
"param_dossier_attributes": [],
}
with sage._com_context(), sage._lock_com:
# Attributs de cial
try:
for attr in dir(sage.cial):
if not attr.startswith("_"):
resultats["cial_attributes"].append(attr)
except Exception as e:
resultats["cial_error"] = str(e)
# Attributs de BaseCpta
try:
base_cpta = sage.cial.BaseCpta
for attr in dir(base_cpta):
if not attr.startswith("_"):
resultats["base_cpta_attributes"].append(attr)
except Exception as e:
resultats["base_cpta_error"] = str(e)
# Attributs de ParametreDossier
try:
param = sage.cial.BaseCpta.ParametreDossier
for attr in dir(param):
if not attr.startswith("_"):
resultats["param_dossier_attributes"].append(attr)
# Tester spécifiquement les attributs logo possibles
resultats["logo_tests"] = {}
for logo_attr in [
"Logo",
"D_Logo",
"ImageLogo",
"LogoImage",
"GetLogo",
]:
try:
val = getattr(param, logo_attr, None)
resultats["logo_tests"][logo_attr] = str(type(val))
except:
pass
except Exception as e:
resultats["param_dossier_error"] = str(e)
return {"success": True, "data": resultats}
except Exception as e:
raise HTTPException(500, str(e))
if __name__ == "__main__":
uvicorn.run(
"main:app",

View file

@ -51,7 +51,7 @@ from utils.functions.items_to_dict import (
contacts_to_dict,
contact_to_dict,
tiers_to_dict,
society_to_dict
society_to_dict,
)
from utils.functions.sage_utilities import (
@ -90,6 +90,17 @@ from utils.functions.society.societe_data import (
get_societe_row,
add_logo,
build_exercices,
recuperer_logo_com,
)
from utils.documents.settle import (
regler_facture as _regler_facture,
regler_factures_client as _regler_factures_client,
lire_reglements_client as _lire_reglements_client,
lire_reglements_facture as _lire_reglements_facture,
lire_journaux_banque as _lire_journaux,
lire_tous_journaux as _lire,
introspecter_reglement as _intro,
)
logger = logging.getLogger(__name__)
@ -7555,9 +7566,9 @@ class SageConnector:
logger.info("💾 Write()...")
try:
collab.Write()
logger.info(" Write() RÉUSSI!")
logger.info(" Write() RÉUSSI!")
except Exception as e:
logger.error(f" Write() échoué: {e}")
logger.error(f" Write() échoué: {e}")
raise RuntimeError(f"Échec Write(): {e}")
# ===== RÉCUPÉRATION DU NUMÉRO =====
@ -7594,7 +7605,7 @@ class SageConnector:
logger.info(f"\n{'=' * 70}")
logger.info(
f" COLLABORATEUR CRÉÉ: N°{numero_cree} - {nom_upper} {prenom}"
f" COLLABORATEUR CRÉÉ: N°{numero_cree} - {nom_upper} {prenom}"
)
logger.info(f"{'=' * 70}")
@ -7608,7 +7619,7 @@ class SageConnector:
logger.warning(f"⚠️ Validation: {e}")
raise
except Exception as e:
logger.error(f" Erreur création collaborateur: {e}", exc_info=True)
logger.error(f" Erreur création collaborateur: {e}", exc_info=True)
raise RuntimeError(f"Échec création collaborateur: {str(e)}")
def modifier_collaborateur(self, numero: int, data: dict) -> dict:
@ -7781,14 +7792,14 @@ class SageConnector:
logger.info("💾 Write()...")
try:
collab.Write()
logger.info(" Write() RÉUSSI!")
logger.info(" Write() RÉUSSI!")
except Exception as e:
logger.error(f" Write() échoué: {e}")
logger.error(f" Write() échoué: {e}")
raise RuntimeError(f"Échec Write(): {e}")
# ===== RETOUR =====
logger.info(f"\n{'=' * 70}")
logger.info(f" COLLABORATEUR MODIFIÉ: N°{numero}")
logger.info(f" COLLABORATEUR MODIFIÉ: N°{numero}")
logger.info(f"{'=' * 70}")
return self.lire_collaborateur(numero)
@ -7797,7 +7808,7 @@ class SageConnector:
logger.warning(f"⚠️ Validation: {e}")
raise
except Exception as e:
logger.error(f" Erreur modification collaborateur: {e}", exc_info=True)
logger.error(f" Erreur modification collaborateur: {e}", exc_info=True)
raise RuntimeError(f"Échec modification collaborateur: {str(e)}")
def lire_informations_societe(self):
@ -7812,8 +7823,16 @@ class SageConnector:
societe = society_to_dict(row)
societe["exercices"] = build_exercices(row)
# Stocker le numéro de dossier pour la recherche du logo
self._numero_dossier = societe.get("numero_dossier")
add_logo(societe)
if not societe.get("logo_base64"):
logo_com = recuperer_logo_com(self)
societe.update(logo_com)
logger.info(
f"✓ Informations société '{societe['raison_sociale']}' lues"
)
@ -7822,3 +7841,76 @@ class SageConnector:
except Exception as e:
logger.error(f"✗ Erreur lecture P_DOSSIER: {e}", exc_info=True)
raise RuntimeError(f"Erreur lecture informations société: {str(e)}")
def regler_facture(
self,
numero_facture,
montant,
mode_reglement=2,
date_reglement=None,
reference="",
libelle="",
):
return _regler_facture(
self,
numero_facture,
montant,
mode_reglement,
date_reglement,
reference,
libelle,
)
def regler_factures_client(
self,
client_code: str,
montant_total: float,
mode_reglement: int = 2,
date_reglement: datetime = None,
reference: str = "",
libelle: str = "",
code_journal: str = "BEU",
numeros_factures: List[str] = None,
):
"""Règle plusieurs factures d'un client"""
return _regler_factures_client(
self,
client_code=client_code,
montant_total=montant_total,
mode_reglement=mode_reglement,
date_reglement=date_reglement,
reference=reference,
libelle=libelle,
code_journal=code_journal,
numeros_factures=numeros_factures,
)
def lire_reglements_facture(self, numero_facture: str):
"""Récupère les règlements d'une facture"""
return _lire_reglements_facture(self, numero_facture)
def lire_reglements_client(
self,
client_code: str,
date_debut: datetime = None,
date_fin: datetime = None,
inclure_soldees: bool = True,
):
"""Récupère les règlements d'un client"""
return _lire_reglements_client(
self,
client_code=client_code,
date_debut=date_debut,
date_fin=date_fin,
inclure_soldees=inclure_soldees,
)
def lire_journaux_banque(self):
return _lire_journaux(self)
def lire_tous_journaux(self):
return _lire(self)
def introspecter_reglement(self):
return _intro(self)

View file

@ -0,0 +1,236 @@
"""
Schémas Pydantic pour les règlements de factures
Module: schemas/reglements.py
"""
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional
from datetime import datetime
from enum import IntEnum
class ModeReglement:
"""Modes de règlement Sage 100c"""
VIREMENT = 1
CHEQUE = 2
TRAITE = 3
CARTE_BANCAIRE = 4
LCR = 5
PRELEVEMENT = 6
ESPECES = 7
LIBELLES = {
1: "Virement",
2: "Chèque",
3: "Traite",
4: "Carte bancaire",
5: "LCR",
6: "Prélèvement",
7: "Espèces",
}
@classmethod
def get_libelle(cls, code: int) -> str:
return cls.LIBELLES.get(code, f"Mode {code}")
class ModeReglementEnum(IntEnum):
"""Modes de règlement Sage 100c"""
VIREMENT = 1
CHEQUE = 2
TRAITE = 3
CARTE_BANCAIRE = 4
LCR = 5
PRELEVEMENT = 6
ESPECES = 7
class ReglementFactureRequest(BaseModel):
"""Requête de règlement d'une facture"""
montant: float = Field(..., gt=0, description="Montant du règlement")
mode_reglement: int = Field(
default=2,
ge=1,
le=7,
description="Mode de règlement (1=Virement, 2=Chèque, 3=Traite, 4=CB, 5=LCR, 6=Prélèvement, 7=Espèces)",
)
date_reglement: Optional[datetime] = Field(
default=None, description="Date du règlement (défaut: aujourd'hui)"
)
reference: Optional[str] = Field(
default="", max_length=35, description="Référence du règlement"
)
libelle: Optional[str] = Field(
default="", max_length=69, description="Libellé du règlement"
)
code_journal: str = Field(
default="BEU", max_length=6, description="Code journal comptable"
)
@field_validator("montant")
def validate_montant(cls, v):
if v <= 0:
raise ValueError("Le montant doit être positif")
return round(v, 2)
class Config:
json_schema_extra = {
"example": {
"montant": 375.12,
"mode_reglement": 2,
"date_reglement": "2026-01-06T00:00:00",
"reference": "CHQ-001",
"libelle": "Règlement facture",
"code_journal": "BEU",
}
}
class ReglementMultipleRequest(BaseModel):
"""Requête de règlement multiple pour un client"""
client_code: str = Field(..., description="Code client")
montant_total: float = Field(..., gt=0, description="Montant total à régler")
mode_reglement: int = Field(default=2, ge=1, le=7)
date_reglement: Optional[datetime] = None
reference: Optional[str] = Field(default="", max_length=35)
libelle: Optional[str] = Field(default="", max_length=69)
code_journal: str = Field(default="BEU", max_length=6)
numeros_factures: Optional[List[str]] = Field(
default=None,
description="Liste des factures à régler (sinon: plus anciennes d'abord)",
)
@field_validator("client_code", mode="before")
def strip_client_code(cls, v):
return v.replace("\xa0", "").strip() if v else v
@field_validator("montant_total")
def validate_montant(cls, v):
if v <= 0:
raise ValueError("Le montant doit être positif")
return round(v, 2)
class Config:
json_schema_extra = {
"example": {
"client_code": "CLI000001",
"montant_total": 1000.00,
"mode_reglement": 2,
"reference": "VIR-MULTI-001",
"numeros_factures": ["FA00081", "FA00082"],
}
}
class ReglementResponse(BaseModel):
"""Réponse d'un règlement effectué"""
numero_facture: str
numero_reglement: Optional[str]
montant_regle: float
date_reglement: str
mode_reglement: int
mode_reglement_libelle: str
reference: str
libelle: str
code_journal: str
total_facture: float
solde_restant: float
facture_soldee: bool
client_code: str
class ReglementMultipleResponse(BaseModel):
"""Réponse d'un règlement multiple"""
client_code: str
montant_demande: float
montant_effectif: float
nb_factures_reglees: int
nb_factures_soldees: int
date_reglement: str
mode_reglement: int
mode_reglement_libelle: str
reference: str
reglements: List[ReglementResponse]
class ReglementDetail(BaseModel):
"""Détail d'un règlement"""
id: int
date: Optional[str]
montant: float
reference: str
libelle: str
mode_reglement: int
mode_reglement_libelle: str
code_journal: str
class ReglementsFactureResponse(BaseModel):
"""Réponse: tous les règlements d'une facture"""
numero_facture: str
client_code: str
date_facture: Optional[str]
reference: str
total_ttc: float
total_regle: float
solde_restant: float
est_soldee: bool
nb_reglements: int
reglements: List[ReglementDetail]
class FactureAvecReglements(BaseModel):
"""Facture avec ses règlements"""
numero_facture: str
date_facture: Optional[str]
total_ttc: float
reference: str
total_regle: float
solde_restant: float
est_soldee: bool
nb_reglements: int
reglements: List[ReglementDetail]
class ReglementsClientResponse(BaseModel):
"""Réponse: tous les règlements d'un client"""
client_code: str
client_intitule: str
nb_factures: int
nb_factures_soldees: int
nb_factures_en_cours: int
total_factures: float
total_regle: float
solde_global: float
factures: List[FactureAvecReglements]
class ModeReglementInfo(BaseModel):
"""Information sur un mode de règlement"""
code: int
libelle: str
class ModesReglementResponse(BaseModel):
"""Liste des modes de règlement disponibles"""
modes: List[ModeReglementInfo] = [
ModeReglementInfo(code=1, libelle="Virement"),
ModeReglementInfo(code=2, libelle="Chèque"),
ModeReglementInfo(code=3, libelle="Traite"),
ModeReglementInfo(code=4, libelle="Carte bancaire"),
ModeReglementInfo(code=5, libelle="LCR"),
ModeReglementInfo(code=6, libelle="Prélèvement"),
ModeReglementInfo(code=7, libelle="Espèces"),
]

View file

@ -1204,7 +1204,7 @@ def enrichir_fournisseurs_articles(articles: List[Dict], cursor) -> List[Dict]:
nums_fournisseurs = list(
set(
[
a["fournisseur_principal"]
str(a["fournisseur_principal"]).strip()
for a in articles
if a.get("fournisseur_principal") not in (None, "", " ")
]
@ -1261,8 +1261,12 @@ def enrichir_fournisseurs_articles(articles: List[Dict], cursor) -> List[Dict]:
nb_enrichis = 0
for article in articles:
num_fourn = article.get("fournisseur_principal")
if num_fourn and num_fourn in fournisseur_map:
article["fournisseur_nom"] = fournisseur_map[num_fourn]
# Convertir en string pour correspondre au fournisseur_map
num_fourn_str = (
str(num_fourn).strip() if num_fourn not in (None, "", " ") else None
)
if num_fourn_str and num_fourn_str in fournisseur_map:
article["fournisseur_nom"] = fournisseur_map[num_fourn_str]
nb_enrichis += 1
else:
article["fournisseur_nom"] = None

723
utils/documents/settle.py Normal file
View file

@ -0,0 +1,723 @@
from typing import Dict, List, Optional
import win32com.client
import pywintypes
from datetime import datetime
from schemas.documents.reglements import ModeReglement
import time
import logging
logger = logging.getLogger(__name__)
def _get_journal_auto(self, mode_reglement: int) -> str:
with self._get_sql_connection() as conn:
cursor = conn.cursor()
if mode_reglement == ModeReglement.ESPECES:
cursor.execute(
"SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND CG_Num LIKE '53%' ORDER BY JO_Num"
)
else:
cursor.execute(
"SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 AND CG_Num LIKE '51%' ORDER BY JO_Num"
)
row = cursor.fetchone()
if row:
return row[0].strip()
cursor.execute(
"SELECT TOP 1 JO_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 ORDER BY JO_Num"
)
row = cursor.fetchone()
if row:
return row[0].strip()
raise ValueError("Aucun journal de trésorerie configuré")
def lire_journaux_banque(self) -> List[Dict]:
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT JO_Num, JO_Intitule, CG_Num FROM F_JOURNAUX WHERE JO_Type = 2 AND JO_Reglement = 1 ORDER BY JO_Num"
)
return [
{
"code": row[0].strip(),
"intitule": row[1].strip() if row[1] else "",
"compte_general": row[2].strip() if row[2] else "",
"type": "Caisse" if (row[2] or "").startswith("53") else "Banque",
}
for row in cursor.fetchall()
]
def lire_tous_journaux(self) -> List[Dict]:
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
types_libelles = {
0: "Achats",
1: "Ventes",
2: "Trésorerie",
3: "Général",
4: "Situation",
}
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT JO_Num, JO_Intitule, JO_Type, JO_Reglement, CG_Num FROM F_JOURNAUX ORDER BY JO_Type, JO_Num"
)
return [
{
"code": row[0].strip(),
"intitule": row[1].strip() if row[1] else "",
"type_code": row[2],
"type_libelle": types_libelles.get(row[2], f"Type {row[2]}"),
"reglement_actif": row[3] == 1,
"compte_general": row[4].strip() if row[4] else "",
}
for row in cursor.fetchall()
]
def regler_facture(
self,
numero_facture: str,
montant: float,
mode_reglement: int = ModeReglement.CHEQUE,
date_reglement: datetime = None,
reference: str = "",
libelle: str = "",
) -> Dict:
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
if montant <= 0:
raise ValueError("Le montant du règlement doit être positif")
date_reglement = date_reglement or datetime.now()
code_journal = _get_journal_auto(self, mode_reglement)
logger.info(
f"Règlement facture {numero_facture}: {montant}€ (mode: {mode_reglement}, journal: {code_journal})"
)
try:
with self._com_context(), self._lock_com:
factory = self.cial.FactoryDocumentVente
if not factory.ExistPiece(60, numero_facture):
raise ValueError(f"Facture {numero_facture} introuvable")
persist = factory.ReadPiece(60, numero_facture)
doc = win32com.client.CastTo(persist, "IBODocumentVente3")
doc.Read()
total_ttc = float(getattr(doc, "DO_TotalTTC", 0.0))
montant_deja_regle = float(getattr(doc, "DO_MontantRegle", 0.0))
statut = getattr(doc, "DO_Statut", 0)
if statut == 6:
raise ValueError(f"Facture {numero_facture} annulée")
solde_actuel = total_ttc - montant_deja_regle
if montant > solde_actuel + 0.01:
raise ValueError(
f"Montant ({montant}€) supérieur au solde ({solde_actuel:.2f}€)"
)
client_code = ""
try:
client_obj = getattr(doc, "Client", None)
if client_obj:
client_obj.Read()
client_code = getattr(client_obj, "CT_Num", "").strip()
except Exception:
pass
echeance = _get_premiere_echeance(doc)
if not echeance:
raise ValueError(f"Facture {numero_facture} sans échéance")
transaction_active = False
try:
self.cial.CptaApplication.BeginTrans()
transaction_active = True
except Exception:
pass
try:
numero_reglement = _executer_reglement_com(
self,
doc,
echeance,
montant,
mode_reglement,
date_reglement,
reference,
libelle,
code_journal,
client_code,
numero_facture,
)
if transaction_active:
try:
self.cial.CptaApplication.CommitTrans()
except Exception:
pass
time.sleep(0.5)
doc.Read()
nouveau_montant_regle = float(getattr(doc, "DO_MontantRegle", 0.0))
if abs(nouveau_montant_regle - montant_deja_regle) < 0.01:
raise RuntimeError(
"Le règlement n'a pas été appliqué (DO_MontantRegle inchangé)"
)
nouveau_solde = total_ttc - nouveau_montant_regle
logger.info(f"Règlement effectué - Solde restant: {nouveau_solde:.2f}")
return {
"numero_facture": numero_facture,
"numero_reglement": numero_reglement,
"montant_regle": montant,
"date_reglement": date_reglement.strftime("%Y-%m-%d"),
"mode_reglement": mode_reglement,
"mode_reglement_libelle": ModeReglement.get_libelle(mode_reglement),
"reference": reference,
"libelle": libelle,
"code_journal": code_journal,
"total_facture": total_ttc,
"solde_restant": nouveau_solde,
"facture_soldee": nouveau_solde < 0.01,
"client_code": client_code,
}
except Exception:
if transaction_active:
try:
self.cial.CptaApplication.RollbackTrans()
except Exception:
pass
raise
except ValueError:
raise
except Exception as e:
logger.error(f"Erreur règlement: {e}", exc_info=True)
raise RuntimeError(f"Échec règlement facture: {str(e)}")
def _get_premiere_echeance(doc):
try:
factory_ech = getattr(doc, "FactoryDocumentEcheance", None)
if factory_ech:
ech_list = factory_ech.List
if ech_list:
echeance = ech_list.Item(1)
if echeance:
for iface in ["IBODocumentEcheance3", "IBODocumentEcheance"]:
try:
echeance = win32com.client.CastTo(echeance, iface)
logger.info(f" Échéance castée vers {iface}")
break
except Exception:
continue
echeance.Read()
return echeance
except Exception as e:
logger.warning(f" Pas d'échéance: {e}")
return None
def _executer_reglement_com(
self,
doc,
echeance,
montant,
mode_reglement,
date_reglement,
reference,
libelle,
code_journal,
client_code,
numero_facture,
):
erreurs = []
# Approche 1: CreateProcess_ReglerEcheances - Créer règlement puis l'assigner
try:
logger.info(
"Tentative via CreateProcess_ReglerEcheances avec règlement créé..."
)
process = self.cial.CreateProcess_ReglerEcheances()
if process:
# D'abord créer un règlement via FactoryDocumentReglement
factory_reg = self.cial.FactoryDocumentReglement
reg = factory_reg.Create()
reg = win32com.client.CastTo(reg, "IBODocumentReglement")
# Configurer le règlement
try:
journal_factory = self.cial.CptaApplication.FactoryJournal
journal_persist = journal_factory.ReadNumero(code_journal)
if journal_persist:
reg.Journal = journal_persist
logger.info(f" Journal: {code_journal}")
except Exception as e:
logger.warning(f" Journal: {e}")
try:
factory_client = self.cial.CptaApplication.FactoryClient
if client_code:
client_persist = factory_client.ReadNumero(client_code)
if client_persist:
reg.TiersPayeur = client_persist
logger.info(f" TiersPayeur: {client_code}")
except Exception as e:
logger.warning(f" TiersPayeur: {e}")
try:
reg.RG_Date = pywintypes.Time(date_reglement)
except Exception:
pass
try:
reg.RG_Montant = montant
except Exception:
pass
if reference:
try:
reg.RG_Reference = reference
except Exception:
pass
if libelle:
try:
reg.RG_Libelle = libelle
except Exception:
pass
try:
reg.RG_Impute = 1
except Exception:
pass
# Assigner le règlement au process
try:
process.Reglement = reg
logger.info(" Règlement assigné au process")
except Exception as e:
logger.warning(f" Assignation règlement: {e}")
# Ajouter l'échéance avec montant
try:
process.AddDocumentEcheanceMontant(echeance, montant)
logger.info(" Échéance ajoutée avec montant")
except Exception as e1:
logger.debug(f" AddDocumentEcheanceMontant: {e1}")
try:
process.AddDocumentEcheance(echeance)
logger.info(" Échéance ajoutée")
except Exception as e2:
raise RuntimeError(f"AddEcheance: {e2}")
can_process = getattr(process, "CanProcess", True)
logger.info(f" CanProcess: {can_process}")
if can_process:
process.Process()
logger.info(" Process() réussi")
numero = None
try:
result = getattr(process, "ReglementResult", None)
if result:
result.Read()
numero = getattr(result, "RG_Piece", "")
except Exception:
pass
return str(numero) if numero else None
except Exception as e:
erreurs.append(f"CreateProcess avec règlement: {e}")
logger.warning(f"CreateProcess avec règlement échoué: {e}")
# Approche 2: Configurer le Reglement du process directement (toutes propriétés)
try:
logger.info("Tentative via configuration complète Process.Reglement...")
process = self.cial.CreateProcess_ReglerEcheances()
if process:
reglement = getattr(process, "Reglement", None)
if reglement:
# Lister TOUS les attributs
reg_attrs = [a for a in dir(reglement) if not a.startswith("_")]
logger.info(f" Attributs Reglement: {reg_attrs}")
# Configurer TOUT
_set_safe(
reglement, ["RG_Date", "Date"], pywintypes.Time(date_reglement)
)
_set_safe(reglement, ["RG_Montant", "Montant"], montant)
_set_safe(reglement, ["JO_Num", "Journal", "CodeJournal"], code_journal)
_set_safe(
reglement,
["CT_NumPayeur", "CT_Num", "Tiers", "Client"],
client_code,
)
_set_safe(
reglement,
["N_Reglement", "ModeReglement", "RG_ModeReglement"],
mode_reglement,
)
_set_safe(reglement, ["RG_Type", "Type"], 0) # 0 = Client
_set_safe(reglement, ["RG_Impute", "Impute"], 1)
_set_safe(reglement, ["RG_Compta", "Compta"], 0)
if reference:
_set_safe(reglement, ["RG_Reference", "Reference"], reference)
if libelle:
_set_safe(reglement, ["RG_Libelle", "Libelle"], libelle)
# Essayer SetDefault
try:
reglement.SetDefault()
logger.info(" SetDefault() appelé")
except Exception:
pass
logger.info(" Reglement configuré")
# Ajouter l'échéance
try:
process.AddDocumentEcheanceMontant(echeance, montant)
logger.info(" Échéance ajoutée")
except Exception as e1:
try:
process.AddDocumentEcheance(echeance)
logger.info(" Échéance ajoutée (sans montant)")
except Exception as e2:
raise RuntimeError(f"AddEcheance: {e2}")
can_process = getattr(process, "CanProcess", True)
logger.info(f" CanProcess: {can_process}")
# Vérifier les erreurs du process
try:
errors = getattr(process, "Errors", None)
if errors:
err_count = getattr(errors, "Count", 0)
for i in range(1, err_count + 1):
err = errors.Item(i)
logger.warning(f" Erreur process [{i}]: {err}")
except Exception:
pass
if can_process:
process.Process()
logger.info(" Process() réussi")
numero = None
try:
result = getattr(process, "ReglementResult", None)
if result:
result.Read()
numero = getattr(result, "RG_Piece", "")
except Exception:
pass
return str(numero) if numero else None
else:
logger.warning(" CanProcess = False!")
except Exception as e:
erreurs.append(f"Config complète: {e}")
logger.warning(f"Config complète échouée: {e}")
# Approche 3: Utiliser SetDefaultReglement sur le document
try:
logger.info("Tentative via doc.SetDefaultReglement...")
if hasattr(doc, "SetDefaultReglement"):
doc.SetDefaultReglement()
logger.info(" SetDefaultReglement() appelé")
# Configurer via le règlement par défaut
reg = getattr(doc, "Reglement", None)
if reg:
attrs = [a for a in dir(reg) if not a.startswith("_")]
logger.info(f" Attributs doc.Reglement: {attrs[:15]}...")
except Exception as e:
erreurs.append(f"SetDefaultReglement: {e}")
logger.warning(f"SetDefaultReglement: {e}")
raise RuntimeError(f"Aucune méthode n'a fonctionné. Erreurs: {'; '.join(erreurs)}")
def _set_safe(obj, attrs, value):
for attr in attrs:
try:
setattr(obj, attr, value)
logger.debug(f" {attr} = {value}")
return True
except Exception:
continue
return False
def introspecter_reglement(self):
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
result = {}
try:
with self._com_context(), self._lock_com:
# Process et son Reglement
try:
process = self.cial.CreateProcess_ReglerEcheances()
result["Process"] = [a for a in dir(process) if not a.startswith("_")]
reglement = getattr(process, "Reglement", None)
if reglement:
result["Process_Reglement"] = [
a for a in dir(reglement) if not a.startswith("_")
]
# Essayer de lire les valeurs par défaut
for attr in ["RG_Type", "RG_Impute", "JO_Num", "CT_NumPayeur"]:
try:
val = getattr(reglement, attr, "N/A")
result[f"Reglement_{attr}"] = str(val)
except Exception:
pass
except Exception as e:
result["error_process"] = str(e)
# IBODocumentReglement
try:
factory = self.cial.FactoryDocumentReglement
reg = factory.Create()
reg = win32com.client.CastTo(reg, "IBODocumentReglement")
result["IBODocumentReglement"] = [
a for a in dir(reg) if not a.startswith("_")
]
except Exception as e:
result["error_reglement"] = str(e)
# Échéance
try:
factory_doc = self.cial.FactoryDocumentVente
doc_list = factory_doc.List
for i in range(1, 20):
try:
doc = doc_list.Item(i)
if doc:
doc.Read()
if getattr(doc, "DO_Type", 0) == 6:
factory_ech = getattr(
doc, "FactoryDocumentEcheance", None
)
if factory_ech:
ech_list = factory_ech.List
if ech_list:
ech = ech_list.Item(1)
if ech:
ech = win32com.client.CastTo(
ech, "IBODocumentEcheance3"
)
ech.Read()
result["IBODocumentEcheance3"] = [
a
for a in dir(ech)
if not a.startswith("_")
]
# Mode règlement de l'échéance
mode = getattr(ech, "Reglement", None)
if mode:
result["Echeance_Reglement"] = [
a
for a in dir(mode)
if not a.startswith("_")
]
break
except Exception:
continue
except Exception as e:
result["error_echeance"] = str(e)
except Exception as e:
result["global_error"] = str(e)
return result
def regler_factures_client(
self,
client_code,
montant_total,
mode_reglement=ModeReglement.CHEQUE,
date_reglement=None,
reference="",
libelle="",
numeros_factures=None,
):
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
if montant_total <= 0:
raise ValueError("Le montant total doit être positif")
date_reglement = date_reglement or datetime.now()
factures = _get_factures_non_soldees_client_sql(self, client_code, numeros_factures)
if not factures:
raise ValueError(f"Aucune facture à régler pour {client_code}")
solde_total = sum(f["solde"] for f in factures)
if montant_total > solde_total + 0.01:
raise ValueError(
f"Montant ({montant_total}€) supérieur au solde ({solde_total:.2f}€)"
)
reglements = []
restant = montant_total
for fac in factures:
if restant < 0.01:
break
a_regler = min(restant, fac["solde"])
try:
res = regler_facture(
self,
fac["numero"],
a_regler,
mode_reglement,
date_reglement,
reference,
libelle,
)
reglements.append(res)
restant -= a_regler
except Exception as e:
logger.error(f"Erreur {fac['numero']}: {e}")
break
if not reglements:
raise RuntimeError("Aucun règlement effectué")
return {
"client_code": client_code,
"montant_demande": montant_total,
"montant_effectif": sum(r["montant_regle"] for r in reglements),
"nb_factures_reglees": len(reglements),
"nb_factures_soldees": sum(1 for r in reglements if r["facture_soldee"]),
"date_reglement": date_reglement.strftime("%Y-%m-%d"),
"mode_reglement": mode_reglement,
"mode_reglement_libelle": ModeReglement.get_libelle(mode_reglement),
"reference": reference,
"reglements": reglements,
}
def _get_factures_non_soldees_client_sql(self, client_code, numeros=None):
with self._get_sql_connection() as conn:
cursor = conn.cursor()
if numeros:
placeholders = ",".join("?" * len(numeros))
query = f"SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ? AND DO_Piece IN ({placeholders}) AND DO_Statut <> 6 AND (DO_TotalTTC - ISNULL(DO_MontantRegle, 0)) > 0.01 ORDER BY DO_Date ASC"
params = [client_code] + numeros
else:
query = "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ? AND DO_Statut <> 6 AND (DO_TotalTTC - ISNULL(DO_MontantRegle, 0)) > 0.01 ORDER BY DO_Date ASC"
params = [client_code]
cursor.execute(query, params)
return [
{
"numero": r[0].strip(),
"date": r[1].strftime("%Y-%m-%d") if r[1] else None,
"total_ttc": float(r[2] or 0),
"montant_regle": float(r[3] or 0),
"solde": float(r[2] or 0) - float(r[3] or 0),
}
for r in cursor.fetchall()
]
def lire_reglements_facture(self, numero_facture):
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT DO_TotalTTC, DO_MontantRegle, DO_Tiers, DO_Date, DO_Ref FROM F_DOCENTETE WHERE DO_Piece = ? AND DO_Type = 6",
(numero_facture,),
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Facture {numero_facture} introuvable")
total = float(row[0] or 0)
regle = float(row[1] or 0)
solde = max(0, total - regle)
return {
"numero_facture": numero_facture,
"client_code": (row[2] or "").strip(),
"date_facture": row[3].strftime("%Y-%m-%d") if row[3] else None,
"reference": (row[4] or "").strip(),
"total_ttc": total,
"total_regle": regle,
"solde_restant": solde,
"est_soldee": solde < 0.01,
}
def lire_reglements_client(
self, client_code, date_debut=None, date_fin=None, inclure_soldees=True
):
if not self.cial:
raise RuntimeError("Connexion Sage non établie")
with self._get_sql_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT CT_Intitule FROM F_COMPTET WHERE CT_Num = ?", (client_code,)
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Client {client_code} introuvable")
intitule = (row[0] or "").strip()
query = "SELECT DO_Piece, DO_Date, DO_TotalTTC, DO_MontantRegle, DO_Ref FROM F_DOCENTETE WHERE DO_Type = 6 AND DO_Tiers = ?"
params = [client_code]
if date_debut:
query += " AND DO_Date >= ?"
params.append(date_debut)
if date_fin:
query += " AND DO_Date <= ?"
params.append(date_fin)
query += " ORDER BY DO_Date ASC"
cursor.execute(query, params)
factures = []
for r in cursor.fetchall():
total = float(r[2] or 0)
regle = float(r[3] or 0)
solde = max(0, total - regle)
soldee = solde < 0.01
if inclure_soldees or not soldee:
factures.append(
{
"numero_facture": r[0].strip(),
"date_facture": r[1].strftime("%Y-%m-%d") if r[1] else None,
"total_ttc": total,
"reference": (r[4] or "").strip(),
"total_regle": regle,
"solde_restant": solde,
"est_soldee": soldee,
}
)
return {
"client_code": client_code,
"client_intitule": intitule,
"nb_factures": len(factures),
"nb_factures_soldees": sum(1 for f in factures if f["est_soldee"]),
"nb_factures_en_cours": sum(1 for f in factures if not f["est_soldee"]),
"total_factures": sum(f["total_ttc"] for f in factures),
"total_regle": sum(f["total_regle"] for f in factures),
"solde_global": sum(f["solde_restant"] for f in factures),
"factures": factures,
}
__all__ = [
"ModeReglement",
"lire_journaux_banque",
"lire_tous_journaux",
"introspecter_reglement",
"regler_facture",
"regler_factures_client",
"lire_reglements_facture",
"lire_reglements_client",
]

View file

@ -176,7 +176,7 @@ def creer_document_vente(
)
logger.info(
f" {config.nom_document.upper()} CRÉÉ: "
f" {config.nom_document.upper()} CRÉÉ: "
f"{numero_document} - {doc_final_data['total_ttc']}€ TTC"
)
@ -222,11 +222,11 @@ def _appliquer_remise_ligne(ligne_obj, remise_pourcent: float) -> bool:
# 4. Write la ligne
ligne_obj.Write()
logger.info(f" Remise {remise_pourcent}% appliquée")
logger.info(f" Remise {remise_pourcent}% appliquée")
return True
except Exception as e:
logger.error(f" Erreur remise: {e}")
logger.error(f" Erreur remise: {e}")
return False
@ -540,7 +540,7 @@ def modifier_document_vente(
doc.Read()
logger.info(" ✓ Write() basique OK")
except Exception as e:
logger.error(f" Document verrouillé: {e}")
logger.error(f" Document verrouillé: {e}")
raise ValueError(f"Document verrouillé: {e}")
# ==========================================
@ -692,7 +692,7 @@ def modifier_document_vente(
)
resultat["champs_modifies"] = champs_modifies
logger.info(f" {config.nom_document.upper()} {numero} MODIFIÉ")
logger.info(f" {config.nom_document.upper()} {numero} MODIFIÉ")
logger.info(
f" Totaux: {resultat['total_ht']}€ HT / {resultat['total_ttc']}€ TTC"
)
@ -701,10 +701,10 @@ def modifier_document_vente(
return resultat
except ValueError as e:
logger.error(f" ERREUR MÉTIER: {e}")
logger.error(f" ERREUR MÉTIER: {e}")
raise
except Exception as e:
logger.error(f" ERREUR TECHNIQUE: {e}", exc_info=True)
logger.error(f" ERREUR TECHNIQUE: {e}", exc_info=True)
raise RuntimeError(f"Erreur Sage: {str(e)}")

View file

@ -1,3 +1,4 @@
from typing import Optional
from pathlib import Path
import base64
import logging
@ -82,8 +83,50 @@ def add_logo(societe_dict: dict) -> None:
societe_dict["logo_content_type"] = None
__all__ = [
"get_societe_row",
"build_exercices",
"add_logo",
]
def recuperer_logo_com(sage_instance) -> dict:
"""Cherche le logo dans les répertoires standards"""
return _chercher_logo_standards()
def _chercher_logo_standards() -> dict:
"""Cherche dans les répertoires standards Sage"""
chemins = [
Path("C:/ProgramData/Sage/Logo"),
Path("C:/ProgramData/Sage/Sage 100/Logo"),
Path("C:/Users/Public/Documents/Sage"),
Path(r"C:\Program Files\Sage\Sage 100\Bitmap"),
Path(r"C:\Program Files (x86)\Sage\Sage 100\Bitmap"),
]
for repertoire in chemins:
if not repertoire.exists():
continue
for ext in [".bmp", ".jpg", ".jpeg", ".png", ".gif"]:
for fichier in repertoire.glob(f"*{ext}"):
logger.info(f"Logo trouvé: {fichier}")
return _convertir_fichier_logo(str(fichier))
logger.info("Aucun logo trouvé")
return {"logo_base64": None, "logo_content_type": None}
def _convertir_fichier_logo(chemin: str) -> dict:
"""Convertit image en base64"""
ext = Path(chemin).suffix.lower()
content_type = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".bmp": "image/bmp",
".gif": "image/gif",
}.get(ext, "image/png")
with open(chemin, "rb") as f:
return {
"logo_base64": base64.b64encode(f.read()).decode("utf-8"),
"logo_content_type": content_type,
}
__all__ = ["get_societe_row", "build_exercices", "add_logo", "recuperer_logo_com"]