Sage100-vps/api.py

1544 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Query, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field, EmailStr
from typing import List, Optional, Dict
from datetime import date, datetime
from enum import Enum
import uvicorn
from contextlib import asynccontextmanager
import uuid
import csv
import io
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
# Configuration logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("sage_api.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# Imports locaux
from config import settings
from database import (
init_db,
async_session_factory,
get_session,
EmailLog,
StatutEmail as StatutEmailEnum,
WorkflowLog,
SignatureLog,
StatutSignature as StatutSignatureEnum,
)
from email_queue import email_queue
from sage_client import sage_client
# =====================================================
# ENUMS
# =====================================================
class TypeDocument(int, Enum):
DEVIS = 0
BON_LIVRAISON = 1
BON_RETOUR = 2
COMMANDE = 3
PREPARATION = 4
FACTURE = 5
class StatutSignature(str, Enum):
EN_ATTENTE = "EN_ATTENTE"
ENVOYE = "ENVOYE"
SIGNE = "SIGNE"
REFUSE = "REFUSE"
EXPIRE = "EXPIRE"
class StatutEmail(str, Enum):
EN_ATTENTE = "EN_ATTENTE"
EN_COURS = "EN_COURS"
ENVOYE = "ENVOYE"
OUVERT = "OUVERT"
ERREUR = "ERREUR"
BOUNCE = "BOUNCE"
# =====================================================
# MODÈLES PYDANTIC
# =====================================================
class ClientResponse(BaseModel):
numero: str
intitule: str
adresse: Optional[str] = None
code_postal: Optional[str] = None
ville: Optional[str] = None
email: Optional[str] = None
telephone: Optional[str] = None
class ArticleResponse(BaseModel):
reference: str
designation: str
prix_vente: float
stock_reel: float
class LigneDevis(BaseModel):
article_code: str
quantite: float
prix_unitaire_ht: Optional[float] = None
remise_pourcentage: Optional[float] = 0.0
class DevisRequest(BaseModel):
client_id: str
date_devis: Optional[date] = None
lignes: List[LigneDevis]
class DevisResponse(BaseModel):
id: str
client_id: str
date_devis: str
montant_total_ht: float
montant_total_ttc: float
nb_lignes: int
class SignatureRequest(BaseModel):
doc_id: str
type_doc: TypeDocument
email_signataire: EmailStr
nom_signataire: str
class EmailEnvoiRequest(BaseModel):
destinataire: EmailStr
cc: Optional[List[EmailStr]] = []
cci: Optional[List[EmailStr]] = []
sujet: str
corps_html: str
document_ids: Optional[List[str]] = None
type_document: Optional[TypeDocument] = None
class RelanceDevisRequest(BaseModel):
doc_id: str
message_personnalise: Optional[str] = None
class BaremeRemiseResponse(BaseModel):
client_id: str
remise_max_autorisee: float
remise_demandee: float
autorisee: bool
message: str
# =====================================================
# SERVICES EXTERNES (Universign)
# =====================================================
async def universign_envoyer(
doc_id: str, pdf_bytes: bytes, email: str, nom: str
) -> Dict:
"""Envoi signature via API Universign"""
import requests
try:
api_key = settings.universign_api_key
api_url = settings.universign_api_url
auth = (api_key, "")
# Étape 1: Créer transaction
response = requests.post(
f"{api_url}/transactions",
auth=auth,
json={"name": f"Devis {doc_id}", "language": "fr"},
timeout=30,
)
response.raise_for_status()
transaction_id = response.json().get("id")
# Étape 2: Upload PDF
files = {"file": (f"Devis_{doc_id}.pdf", pdf_bytes, "application/pdf")}
response = requests.post(f"{api_url}/files", auth=auth, files=files, timeout=30)
response.raise_for_status()
file_id = response.json().get("id")
# Étape 3: Ajouter document
response = requests.post(
f"{api_url}/transactions/{transaction_id}/documents",
auth=auth,
data={"document": file_id},
timeout=30,
)
response.raise_for_status()
document_id = response.json().get("id")
# Étape 4: Créer champ signature
response = requests.post(
f"{api_url}/transactions/{transaction_id}/documents/{document_id}/fields",
auth=auth,
data={"type": "signature"},
timeout=30,
)
response.raise_for_status()
field_id = response.json().get("id")
# Étape 5: Assigner signataire
response = requests.post(
f"{api_url}/transactions/{transaction_id}/signatures",
auth=auth,
data={"signer": email, "field": field_id},
timeout=30,
)
response.raise_for_status()
# Étape 6: Démarrer transaction
response = requests.post(
f"{api_url}/transactions/{transaction_id}/start", auth=auth, timeout=30
)
response.raise_for_status()
final_data = response.json()
signer_url = (
final_data.get("actions", [{}])[0].get("url", "")
if final_data.get("actions")
else ""
)
logger.info(f"✅ Signature Universign envoyée: {transaction_id}")
return {
"transaction_id": transaction_id,
"signer_url": signer_url,
"statut": "ENVOYE",
}
except Exception as e:
logger.error(f"❌ Erreur Universign: {e}")
return {"error": str(e), "statut": "ERREUR"}
async def universign_statut(transaction_id: str) -> Dict:
"""Récupération statut signature"""
import requests
try:
response = requests.get(
f"{settings.universign_api_url}/transactions/{transaction_id}",
auth=(settings.universign_api_key, ""),
timeout=10,
)
if response.status_code == 200:
data = response.json()
statut_map = {
"draft": "EN_ATTENTE",
"started": "EN_ATTENTE",
"completed": "SIGNE",
"refused": "REFUSE",
"expired": "EXPIRE",
"canceled": "REFUSE",
}
return {
"statut": statut_map.get(data.get("state"), "EN_ATTENTE"),
"date_signature": data.get("completed_at"),
}
else:
return {"statut": "ERREUR"}
except Exception as e:
logger.error(f"Erreur statut Universign: {e}")
return {"statut": "ERREUR", "error": str(e)}
# =====================================================
# CYCLE DE VIE
# =====================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
# Init base de données
await init_db()
logger.info("✅ Base de données initialisée")
# Injecter session_factory dans email_queue
email_queue.session_factory = async_session_factory
# ⚠️ PAS de sage_connector ici (c'est sur Windows !)
# email_queue utilisera sage_client pour générer les PDFs via HTTP
# Démarrer queue
email_queue.start(num_workers=settings.max_email_workers)
logger.info(f"✅ Email queue démarrée")
yield
# Cleanup
email_queue.stop()
logger.info("👋 Services arrêtés")
# =====================================================
# APPLICATION
# =====================================================
app = FastAPI(
title="API Sage 100c Dataven",
version="2.0.0",
description="API de gestion commerciale - VPS Linux",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
allow_credentials=True,
)
# =====================================================
# ENDPOINTS - US-A1 (CRÉATION RAPIDE DEVIS)
# =====================================================
@app.get("/clients", response_model=List[ClientResponse], tags=["US-A1"])
async def rechercher_clients(query: Optional[str] = Query(None)):
"""🔍 Recherche clients via gateway Windows"""
try:
clients = sage_client.lister_clients(filtre=query or "")
return [ClientResponse(**c) for c in clients]
except Exception as e:
logger.error(f"Erreur recherche clients: {e}")
raise HTTPException(500, str(e))
@app.get("/articles", response_model=List[ArticleResponse], tags=["US-A1"])
async def rechercher_articles(query: Optional[str] = Query(None)):
"""🔍 Recherche articles via gateway Windows"""
try:
articles = sage_client.lister_articles(filtre=query or "")
return [ArticleResponse(**a) for a in articles]
except Exception as e:
logger.error(f"Erreur recherche articles: {e}")
raise HTTPException(500, str(e))
@app.post("/devis", response_model=DevisResponse, status_code=201, tags=["US-A1"])
async def creer_devis(devis: DevisRequest):
"""📝 Création de devis via gateway Windows"""
try:
# Préparer les données pour la gateway
devis_data = {
"client_id": devis.client_id,
"date_devis": devis.date_devis.isoformat() if devis.date_devis else None,
"lignes": [
{
"article_code": l.article_code,
"quantite": l.quantite,
"prix_unitaire_ht": l.prix_unitaire_ht,
"remise_pourcentage": l.remise_pourcentage,
}
for l in devis.lignes
],
}
# Appel HTTP vers Windows
resultat = sage_client.creer_devis(devis_data)
logger.info(f"✅ Devis créé: {resultat.get('numero_devis')}")
return DevisResponse(
id=resultat["numero_devis"],
client_id=devis.client_id,
date_devis=resultat["date_devis"],
montant_total_ht=resultat["total_ht"],
montant_total_ttc=resultat["total_ttc"],
nb_lignes=resultat["nb_lignes"],
)
except Exception as e:
logger.error(f"Erreur création devis: {e}")
raise HTTPException(500, str(e))
@app.get("/devis", tags=["US-A1"])
async def lister_devis(
limit: int = Query(100, le=1000),
statut: Optional[int] = Query(None),
inclure_lignes: bool = Query(
True, description="Inclure les lignes de chaque devis"
),
):
"""
📋 Liste tous les devis via gateway Windows
Args:
limit: Nombre maximum de devis à retourner
statut: Filtre par statut (0=Devis, 2=Accepté, 5=Transformé, etc.)
inclure_lignes: Si True, charge les lignes de chaque devis (par défaut: True)
✅ AMÉLIORATION: Les lignes sont maintenant incluses par défaut
"""
try:
devis_list = sage_client.lister_devis(
limit=limit, statut=statut, inclure_lignes=inclure_lignes
)
return devis_list
except Exception as e:
logger.error(f"Erreur liste devis: {e}")
raise HTTPException(500, str(e))
@app.get("/devis/{id}", tags=["US-A1"])
async def lire_devis(id: str):
"""📄 Lecture d'un devis via gateway Windows"""
try:
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
return devis
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur lecture devis: {e}")
raise HTTPException(500, str(e))
@app.get("/devis/{id}/pdf", tags=["US-A1"])
async def telecharger_devis_pdf(id: str):
"""📄 Téléchargement PDF (généré via email_queue)"""
try:
# Générer PDF en appelant la méthode de email_queue
# qui elle-même appellera sage_client pour récupérer les données
pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
return StreamingResponse(
iter([pdf_bytes]),
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename=Devis_{id}.pdf"},
)
except Exception as e:
logger.error(f"Erreur génération PDF: {e}")
raise HTTPException(500, str(e))
@app.post("/devis/{id}/envoyer", tags=["US-A1"])
async def envoyer_devis_email(
id: str, request: EmailEnvoiRequest, session: AsyncSession = Depends(get_session)
):
"""📧 Envoi devis par email"""
try:
# Vérifier que le devis existe
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Créer logs email pour chaque destinataire
tous_destinataires = [request.destinataire] + request.cc + request.cci
email_logs = []
for dest in tous_destinataires:
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=dest,
sujet=request.sujet,
corps_html=request.corps_html,
document_ids=id,
type_document=TypeDocument.DEVIS,
statut=StatutEmailEnum.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
email_queue.enqueue(email_log.id)
email_logs.append(email_log.id)
await session.commit()
logger.info(
f"✅ Email devis {id} mis en file: {len(tous_destinataires)} destinataire(s)"
)
return {
"success": True,
"email_log_ids": email_logs,
"devis_id": id,
"message": f"{len(tous_destinataires)} email(s) en file d'attente",
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur envoi email: {e}")
raise HTTPException(500, str(e))
@app.put("/devis/{id}/statut", tags=["US-A1"])
async def changer_statut_devis(id: str, nouveau_statut: int = Query(..., ge=0, le=5)):
"""
📄 Changement de statut d'un devis via gateway Windows
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
resultat = sage_client.changer_statut_devis(id, nouveau_statut)
logger.info(f"✅ Statut devis {id} changé: {nouveau_statut}")
return {
"success": True,
"devis_id": id,
"statut_ancien": resultat.get("statut_ancien"),
"statut_nouveau": resultat.get("statut_nouveau"),
"message": "Statut mis à jour avec succès",
}
except Exception as e:
logger.error(f"Erreur changement statut: {e}")
raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A2 (WORKFLOW SANS RESSAISIE)
# =====================================================
@app.get("/commandes", tags=["US-A2"])
async def lister_commandes(
limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
"""
📋 Liste toutes les commandes via gateway Windows
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
commandes = sage_client.lister_commandes(limit=limit, statut=statut)
return commandes
except Exception as e:
logger.error(f"Erreur liste commandes: {e}")
raise HTTPException(500, str(e))
@app.post("/workflow/devis/{id}/to-commande", tags=["US-A2"])
async def devis_vers_commande(id: str, session: AsyncSession = Depends(get_session)):
"""🔧 Transformation Devis → Commande via gateway Windows"""
try:
# Appel HTTP vers Windows
resultat = sage_client.transformer_document(
numero_source=id,
type_source=TypeDocument.DEVIS,
type_cible=TypeDocument.COMMANDE,
)
# Logger en DB
workflow_log = WorkflowLog(
id=str(uuid.uuid4()),
document_source=id,
type_source=TypeDocument.DEVIS,
document_cible=resultat.get("document_cible", ""),
type_cible=TypeDocument.COMMANDE,
nb_lignes=resultat.get("nb_lignes", 0),
date_transformation=datetime.now(),
succes=True,
)
session.add(workflow_log)
await session.commit()
logger.info(
f"✅ Transformation: Devis {id} → Commande {resultat['document_cible']}"
)
return {
"success": True,
"document_source": id,
"document_cible": resultat["document_cible"],
"nb_lignes": resultat["nb_lignes"],
}
except Exception as e:
logger.error(f"Erreur transformation: {e}")
raise HTTPException(500, str(e))
@app.post("/workflow/commande/{id}/to-facture", tags=["US-A2"])
async def commande_vers_facture(id: str, session: AsyncSession = Depends(get_session)):
"""🔧 Transformation Commande → Facture via gateway Windows"""
try:
resultat = sage_client.transformer_document(
numero_source=id,
type_source=TypeDocument.COMMANDE,
type_cible=TypeDocument.FACTURE,
)
workflow_log = WorkflowLog(
id=str(uuid.uuid4()),
document_source=id,
type_source=TypeDocument.COMMANDE,
document_cible=resultat.get("document_cible", ""),
type_cible=TypeDocument.FACTURE,
nb_lignes=resultat.get("nb_lignes", 0),
date_transformation=datetime.now(),
succes=True,
)
session.add(workflow_log)
await session.commit()
return resultat
except Exception as e:
logger.error(f"Erreur transformation: {e}")
raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A3 (SIGNATURE ÉLECTRONIQUE)
# =====================================================
@app.post("/signature/universign/send", tags=["US-A3"])
async def envoyer_signature(
demande: SignatureRequest, session: AsyncSession = Depends(get_session)
):
"""✍️ Envoi document pour signature Universign"""
try:
# Générer PDF
pdf_bytes = email_queue._generate_pdf(demande.doc_id, demande.type_doc)
# Envoi Universign
resultat = await universign_envoyer(
demande.doc_id, pdf_bytes, demande.email_signataire, demande.nom_signataire
)
if "error" in resultat:
raise HTTPException(500, resultat["error"])
# Logger en DB
signature_log = SignatureLog(
id=str(uuid.uuid4()),
document_id=demande.doc_id,
type_document=demande.type_doc,
transaction_id=resultat["transaction_id"],
signer_url=resultat["signer_url"],
email_signataire=demande.email_signataire,
nom_signataire=demande.nom_signataire,
statut=StatutSignatureEnum.ENVOYE,
date_envoi=datetime.now(),
)
session.add(signature_log)
await session.commit()
# MAJ champ libre Sage via gateway Windows
sage_client.mettre_a_jour_champ_libre(
demande.doc_id, demande.type_doc, "UniversignID", resultat["transaction_id"]
)
logger.info(f"✅ Signature envoyée: {demande.doc_id}")
return {
"success": True,
"transaction_id": resultat["transaction_id"],
"signer_url": resultat["signer_url"],
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur signature: {e}")
raise HTTPException(500, str(e))
@app.get("/signature/universign/status", tags=["US-A3"])
async def statut_signature(docId: str = Query(...)):
"""🔍 Récupération du statut de signature en temps réel"""
# Chercher dans la DB locale
try:
async with async_session_factory() as session:
query = select(SignatureLog).where(SignatureLog.document_id == docId)
result = await session.execute(query)
signature_log = result.scalar_one_or_none()
if not signature_log:
raise HTTPException(404, "Signature introuvable")
# Interroger Universign
statut = await universign_statut(signature_log.transaction_id)
return {
"doc_id": docId,
"statut": statut["statut"],
"date_signature": statut.get("date_signature"),
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur statut signature: {e}")
raise HTTPException(500, str(e))
@app.get("/signatures", tags=["US-A3"])
async def lister_signatures(
statut: Optional[StatutSignature] = Query(None),
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""📋 Liste toutes les demandes de signature"""
query = select(SignatureLog).order_by(SignatureLog.date_envoi.desc())
if statut:
statut_db = StatutSignatureEnum[statut.value]
query = query.where(SignatureLog.statut == statut_db)
query = query.limit(limit)
result = await session.execute(query)
signatures = result.scalars().all()
return [
{
"id": sig.id,
"document_id": sig.document_id,
"type_document": sig.type_document.value,
"transaction_id": sig.transaction_id,
"signer_url": sig.signer_url,
"email_signataire": sig.email_signataire,
"nom_signataire": sig.nom_signataire,
"statut": sig.statut.value,
"date_envoi": sig.date_envoi.isoformat() if sig.date_envoi else None,
"date_signature": (
sig.date_signature.isoformat() if sig.date_signature else None
),
"est_relance": sig.est_relance,
"nb_relances": sig.nb_relances or 0,
}
for sig in signatures
]
@app.get("/signatures/{transaction_id}/status", tags=["US-A3"])
async def statut_signature_detail(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
"""🔍 Récupération du statut détaillé d'une signature"""
query = select(SignatureLog).where(SignatureLog.transaction_id == transaction_id)
result = await session.execute(query)
signature_log = result.scalar_one_or_none()
if not signature_log:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
# Interroger Universign
statut_universign = await universign_statut(transaction_id)
if statut_universign.get("statut") != "ERREUR":
statut_map = {
"EN_ATTENTE": StatutSignatureEnum.EN_ATTENTE,
"ENVOYE": StatutSignatureEnum.ENVOYE,
"SIGNE": StatutSignatureEnum.SIGNE,
"REFUSE": StatutSignatureEnum.REFUSE,
"EXPIRE": StatutSignatureEnum.EXPIRE,
}
nouveau_statut = statut_map.get(
statut_universign["statut"], StatutSignatureEnum.EN_ATTENTE
)
signature_log.statut = nouveau_statut
if statut_universign.get("date_signature"):
signature_log.date_signature = datetime.fromisoformat(
statut_universign["date_signature"].replace("Z", "+00:00")
)
await session.commit()
return {
"transaction_id": transaction_id,
"document_id": signature_log.document_id,
"statut": signature_log.statut.value,
"email_signataire": signature_log.email_signataire,
"date_envoi": (
signature_log.date_envoi.isoformat() if signature_log.date_envoi else None
),
"date_signature": (
signature_log.date_signature.isoformat()
if signature_log.date_signature
else None
),
"signer_url": signature_log.signer_url,
}
@app.post("/signatures/refresh-all", tags=["US-A3"])
async def rafraichir_statuts_signatures(session: AsyncSession = Depends(get_session)):
"""🔄 Rafraîchit TOUS les statuts des signatures en attente"""
query = select(SignatureLog).where(
SignatureLog.statut.in_(
[StatutSignatureEnum.EN_ATTENTE, StatutSignatureEnum.ENVOYE]
)
)
result = await session.execute(query)
signatures = result.scalars().all()
nb_mises_a_jour = 0
for sig in signatures:
try:
statut_universign = await universign_statut(sig.transaction_id)
if statut_universign.get("statut") != "ERREUR":
statut_map = {
"SIGNE": StatutSignatureEnum.SIGNE,
"REFUSE": StatutSignatureEnum.REFUSE,
"EXPIRE": StatutSignatureEnum.EXPIRE,
}
nouveau = statut_map.get(statut_universign["statut"])
if nouveau and nouveau != sig.statut:
sig.statut = nouveau
if statut_universign.get("date_signature"):
sig.date_signature = datetime.fromisoformat(
statut_universign["date_signature"].replace("Z", "+00:00")
)
nb_mises_a_jour += 1
except Exception as e:
logger.error(f"Erreur refresh signature {sig.transaction_id}: {e}")
continue
await session.commit()
return {
"success": True,
"nb_signatures_verifiees": len(signatures),
"nb_mises_a_jour": nb_mises_a_jour,
}
@app.post("/devis/{id}/signer", tags=["US-A3"])
async def envoyer_devis_signature(
id: str, request: SignatureRequest, session: AsyncSession = Depends(get_session)
):
"""✏️ Envoi d'un devis pour signature électronique"""
try:
# Vérifier devis via gateway Windows
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Générer PDF
pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
# Envoi Universign
resultat = await universign_envoyer(
id, pdf_bytes, request.email_signataire, request.nom_signataire
)
if "error" in resultat:
raise HTTPException(500, f"Erreur Universign: {resultat['error']}")
# Logger en DB
signature_log = SignatureLog(
id=str(uuid.uuid4()),
document_id=id,
type_document=TypeDocument.DEVIS,
transaction_id=resultat["transaction_id"],
signer_url=resultat["signer_url"],
email_signataire=request.email_signataire,
nom_signataire=request.nom_signataire,
statut=StatutSignatureEnum.ENVOYE,
date_envoi=datetime.now(),
)
session.add(signature_log)
await session.commit()
# MAJ champ libre Sage via gateway
sage_client.mettre_a_jour_champ_libre(
id, TypeDocument.DEVIS, "UniversignID", resultat["transaction_id"]
)
return {
"success": True,
"devis_id": id,
"transaction_id": resultat["transaction_id"],
"signer_url": resultat["signer_url"],
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur envoi signature: {e}")
raise HTTPException(500, str(e))
# ============================================
# US-A4 - ENVOI EMAILS EN LOT
# ============================================
class EmailBatchRequest(BaseModel):
destinataires: List[EmailStr] = Field(..., min_length=1, max_length=100)
sujet: str = Field(..., min_length=1, max_length=500)
corps_html: str = Field(..., min_length=1)
document_ids: Optional[List[str]] = None
type_document: Optional[TypeDocument] = None
@app.post("/emails/send-batch", tags=["US-A4"])
async def envoyer_emails_lot(
batch: EmailBatchRequest, session: AsyncSession = Depends(get_session)
):
"""📧 US-A4: Envoi groupé via email_queue"""
resultats = []
for destinataire in batch.destinataires:
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=destinataire,
sujet=batch.sujet,
corps_html=batch.corps_html,
document_ids=",".join(batch.document_ids) if batch.document_ids else None,
type_document=batch.type_document,
statut=StatutEmailEnum.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
email_queue.enqueue(email_log.id)
resultats.append(
{
"destinataire": destinataire,
"log_id": email_log.id,
"statut": "EN_ATTENTE",
}
)
await session.commit()
nb_documents = len(batch.document_ids) if batch.document_ids else 0
logger.info(
f"{len(batch.destinataires)} emails mis en file avec {nb_documents} docs"
)
return {
"total": len(batch.destinataires),
"succes": len(batch.destinataires),
"documents_attaches": nb_documents,
"details": resultats,
}
# =====================================================
# ENDPOINTS - US-A5
# =====================================================
@app.post("/devis/valider-remise", response_model=BaremeRemiseResponse, tags=["US-A5"])
async def valider_remise(
client_id: str = Query(..., min_length=1),
remise_pourcentage: float = Query(0.0, ge=0, le=100),
):
"""
💰 US-A5: Validation remise via barème client Sage
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
remise_max = sage_client.lire_remise_max_client(client_id)
autorisee = remise_pourcentage <= remise_max
if not autorisee:
message = f"⚠️ Remise trop élevée (max autorisé: {remise_max}%)"
logger.warning(
f"Remise refusée pour {client_id}: {remise_pourcentage}% > {remise_max}%"
)
else:
message = "✅ Remise autorisée"
return BaremeRemiseResponse(
client_id=client_id,
remise_max_autorisee=remise_max,
remise_demandee=remise_pourcentage,
autorisee=autorisee,
message=message,
)
except Exception as e:
logger.error(f"Erreur validation remise: {e}")
raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A6 (RELANCE DEVIS)
# =====================================================
@app.post("/devis/{id}/relancer-signature", tags=["US-A6"])
async def relancer_devis_signature(
id: str, relance: RelanceDevisRequest, session: AsyncSession = Depends(get_session)
):
"""📧 Relance devis via Universign"""
try:
# Lire devis via gateway
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Récupérer contact via gateway
contact = sage_client.lire_contact_client(devis["client_code"])
if not contact or not contact.get("email"):
raise HTTPException(400, "Aucun email trouvé pour ce client")
# Générer PDF
pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
# Envoi Universign
resultat = await universign_envoyer(
id,
pdf_bytes,
contact["email"],
contact["nom"] or contact["client_intitule"],
)
if "error" in resultat:
raise HTTPException(500, resultat["error"])
# Logger en DB
signature_log = SignatureLog(
id=str(uuid.uuid4()),
document_id=id,
type_document=TypeDocument.DEVIS,
transaction_id=resultat["transaction_id"],
signer_url=resultat["signer_url"],
email_signataire=contact["email"],
nom_signataire=contact["nom"] or contact["client_intitule"],
statut=StatutSignatureEnum.ENVOYE,
date_envoi=datetime.now(),
est_relance=True,
nb_relances=1,
)
session.add(signature_log)
await session.commit()
return {
"success": True,
"devis_id": id,
"transaction_id": resultat["transaction_id"],
"message": "Relance signature envoyée",
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur relance: {e}")
raise HTTPException(500, str(e))
class ContactClientResponse(BaseModel):
client_code: str
client_intitule: str
email: Optional[str]
nom: Optional[str]
telephone: Optional[str]
peut_etre_relance: bool
@app.get("/devis/{id}/contact", response_model=ContactClientResponse, tags=["US-A6"])
async def recuperer_contact_devis(id: str):
"""👤 US-A6: Récupération du contact client associé au devis"""
try:
# Lire devis via gateway Windows
devis = sage_client.lire_devis(id)
if not devis:
raise HTTPException(404, f"Devis {id} introuvable")
# Lire contact via gateway Windows
contact = sage_client.lire_contact_client(devis["client_code"])
if not contact:
raise HTTPException(
404, f"Contact introuvable pour client {devis['client_code']}"
)
peut_relancer = bool(contact.get("email"))
return ContactClientResponse(**contact, peut_etre_relance=peut_relancer)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur récupération contact: {e}")
raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A7
# =====================================================
@app.get("/factures", tags=["US-A7"])
async def lister_factures(
limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
"""
📋 Liste toutes les factures via gateway Windows
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
factures = sage_client.lister_factures(limit=limit, statut=statut)
return factures
except Exception as e:
logger.error(f"Erreur liste factures: {e}")
raise HTTPException(500, str(e))
class RelanceFactureRequest(BaseModel):
doc_id: str
message_personnalise: Optional[str] = None
# Templates email (si pas déjà définis)
templates_email_db = {
"relance_facture": {
"id": "relance_facture",
"nom": "Relance Facture",
"sujet": "Rappel - Facture {{DO_Piece}}",
"corps_html": """
<p>Bonjour {{CT_Intitule}},</p>
<p>La facture <strong>{{DO_Piece}}</strong> du {{DO_Date}}
d'un montant de <strong>{{DO_TotalTTC}}€ TTC</strong> reste impayée.</p>
<p>Merci de régulariser dans les meilleurs délais.</p>
<p>Cordialement,</p>
""",
"variables_disponibles": [
"DO_Piece",
"DO_Date",
"CT_Intitule",
"DO_TotalHT",
"DO_TotalTTC",
],
}
}
@app.post("/factures/{id}/relancer", tags=["US-A7"])
async def relancer_facture(
id: str,
relance: RelanceFactureRequest,
session: AsyncSession = Depends(get_session),
):
"""💸 US-A7: Relance facture en un clic"""
try:
# Lire facture via gateway Windows
facture = sage_client.lire_document(id, TypeDocument.FACTURE)
if not facture:
raise HTTPException(404, f"Facture {id} introuvable")
# Récupérer contact via gateway Windows
contact = sage_client.lire_contact_client(facture["client_code"])
if not contact or not contact.get("email"):
raise HTTPException(400, "Aucun email trouvé pour ce client")
# Préparer email
template = templates_email_db["relance_facture"]
variables = {
"DO_Piece": facture.get("numero", id),
"DO_Date": str(facture.get("date", "")),
"CT_Intitule": facture.get("client_intitule", ""),
"DO_TotalHT": f"{facture.get('total_ht', 0):.2f}",
"DO_TotalTTC": f"{facture.get('total_ttc', 0):.2f}",
}
sujet = template["sujet"]
corps = relance.message_personnalise or template["corps_html"]
for var, valeur in variables.items():
sujet = sujet.replace(f"{{{{{var}}}}}", valeur)
corps = corps.replace(f"{{{{{var}}}}}", valeur)
# Créer log email
email_log = EmailLog(
id=str(uuid.uuid4()),
destinataire=contact["email"],
sujet=sujet,
corps_html=corps,
document_ids=id,
type_document=TypeDocument.FACTURE,
statut=StatutEmailEnum.EN_ATTENTE,
date_creation=datetime.now(),
nb_tentatives=0,
)
session.add(email_log)
await session.flush()
# Enqueue
email_queue.enqueue(email_log.id)
# ✅ MAJ champ libre via gateway Windows
sage_client.mettre_a_jour_derniere_relance(id, TypeDocument.FACTURE)
await session.commit()
logger.info(f"✅ Relance facture: {id}{contact['email']}")
return {
"success": True,
"facture_id": id,
"email_log_id": email_log.id,
"destinataire": contact["email"],
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur relance facture: {e}")
raise HTTPException(500, str(e))
# ============================================
# US-A9 - JOURNAL DES E-MAILS
# ============================================
@app.get("/emails/logs", tags=["US-A9"])
async def journal_emails(
statut: Optional[StatutEmail] = Query(None),
destinataire: Optional[str] = Query(None),
limit: int = Query(100, le=1000),
session: AsyncSession = Depends(get_session),
):
"""📋 US-A9: Journal des e-mails envoyés"""
query = select(EmailLog)
if statut:
query = query.where(EmailLog.statut == StatutEmailEnum[statut.value])
if destinataire:
query = query.where(EmailLog.destinataire.contains(destinataire))
query = query.order_by(EmailLog.date_creation.desc()).limit(limit)
result = await session.execute(query)
logs = result.scalars().all()
return [
{
"id": log.id,
"destinataire": log.destinataire,
"sujet": log.sujet,
"statut": log.statut.value,
"date_creation": log.date_creation.isoformat(),
"date_envoi": log.date_envoi.isoformat() if log.date_envoi else None,
"nb_tentatives": log.nb_tentatives,
"derniere_erreur": log.derniere_erreur,
"document_ids": log.document_ids,
}
for log in logs
]
@app.get("/emails/logs/export", tags=["US-A9"])
async def exporter_logs_csv(
statut: Optional[StatutEmail] = Query(None),
session: AsyncSession = Depends(get_session),
):
"""📥 US-A9: Export CSV des logs d'envoi"""
query = select(EmailLog)
if statut:
query = query.where(EmailLog.statut == StatutEmailEnum[statut.value])
query = query.order_by(EmailLog.date_creation.desc())
result = await session.execute(query)
logs = result.scalars().all()
# Génération CSV
output = io.StringIO()
writer = csv.writer(output)
# En-têtes
writer.writerow(
[
"ID",
"Destinataire",
"Sujet",
"Statut",
"Date Création",
"Date Envoi",
"Nb Tentatives",
"Erreur",
"Documents",
]
)
# Données
for log in logs:
writer.writerow(
[
log.id,
log.destinataire,
log.sujet,
log.statut.value,
log.date_creation.isoformat(),
log.date_envoi.isoformat() if log.date_envoi else "",
log.nb_tentatives,
log.derniere_erreur or "",
log.document_ids or "",
]
)
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename=emails_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
},
)
# ============================================
# US-A10 - MODÈLES D'E-MAILS
# ============================================
class TemplateEmail(BaseModel):
id: Optional[str] = None
nom: str
sujet: str
corps_html: str
variables_disponibles: List[str] = []
class TemplatePreviewRequest(BaseModel):
template_id: str
document_id: str
type_document: TypeDocument
@app.get("/templates/emails", response_model=List[TemplateEmail], tags=["US-A10"])
async def lister_templates():
"""📧 US-A10: Liste tous les templates d'emails"""
return [TemplateEmail(**template) for template in templates_email_db.values()]
@app.get(
"/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def lire_template(template_id: str):
"""📖 Lecture d'un template par ID"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
return TemplateEmail(**templates_email_db[template_id])
@app.post("/templates/emails", response_model=TemplateEmail, tags=["US-A10"])
async def creer_template(template: TemplateEmail):
""" Création d'un nouveau template"""
template_id = str(uuid.uuid4())
templates_email_db[template_id] = {
"id": template_id,
"nom": template.nom,
"sujet": template.sujet,
"corps_html": template.corps_html,
"variables_disponibles": template.variables_disponibles,
}
logger.info(f"Template créé: {template_id}")
return TemplateEmail(id=template_id, **template.dict())
@app.put(
"/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def modifier_template(template_id: str, template: TemplateEmail):
"""✏️ Modification d'un template existant"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
# Ne pas modifier les templates système
if template_id in ["relance_devis", "relance_facture"]:
raise HTTPException(400, "Les templates système ne peuvent pas être modifiés")
templates_email_db[template_id] = {
"id": template_id,
"nom": template.nom,
"sujet": template.sujet,
"corps_html": template.corps_html,
"variables_disponibles": template.variables_disponibles,
}
logger.info(f"Template modifié: {template_id}")
return TemplateEmail(id=template_id, **template.dict())
@app.delete("/templates/emails/{template_id}", tags=["US-A10"])
async def supprimer_template(template_id: str):
"""🗑️ Suppression d'un template"""
if template_id not in templates_email_db:
raise HTTPException(404, f"Template {template_id} introuvable")
if template_id in ["relance_devis", "relance_facture"]:
raise HTTPException(400, "Les templates système ne peuvent pas être supprimés")
del templates_email_db[template_id]
logger.info(f"Template supprimé: {template_id}")
return {"success": True, "message": f"Template {template_id} supprimé"}
@app.post("/templates/emails/preview", tags=["US-A10"])
async def previsualiser_email(preview: TemplatePreviewRequest):
"""👁️ US-A10: Prévisualisation email avec fusion variables"""
if preview.template_id not in templates_email_db:
raise HTTPException(404, f"Template {preview.template_id} introuvable")
template = templates_email_db[preview.template_id]
# Lire document via gateway Windows
doc = sage_client.lire_document(preview.document_id, preview.type_document)
if not doc:
raise HTTPException(404, f"Document {preview.document_id} introuvable")
# Variables
variables = {
"DO_Piece": doc.get("numero", preview.document_id),
"DO_Date": str(doc.get("date", "")),
"CT_Intitule": doc.get("client_intitule", ""),
"DO_TotalHT": f"{doc.get('total_ht', 0):.2f}",
"DO_TotalTTC": f"{doc.get('total_ttc', 0):.2f}",
}
# Fusion
sujet_preview = template["sujet"]
corps_preview = template["corps_html"]
for var, valeur in variables.items():
sujet_preview = sujet_preview.replace(f"{{{{{var}}}}}", valeur)
corps_preview = corps_preview.replace(f"{{{{{var}}}}}", valeur)
return {
"template_id": preview.template_id,
"document_id": preview.document_id,
"sujet": sujet_preview,
"corps_html": corps_preview,
"variables_utilisees": variables,
}
# =====================================================
# ENDPOINTS - HEALTH
# =====================================================
@app.get("/health", tags=["System"])
async def health_check():
"""🏥 Health check"""
gateway_health = sage_client.health()
return {
"status": "healthy",
"sage_gateway": gateway_health,
"email_queue": {
"running": email_queue.running,
"workers": len(email_queue.workers),
"queue_size": email_queue.queue.qsize(),
},
"timestamp": datetime.now().isoformat(),
}
@app.get("/", tags=["System"])
async def root():
"""🏠 Page d'accueil"""
return {
"api": "Sage 100c Dataven - VPS Linux",
"version": "2.0.0",
"documentation": "/docs",
"health": "/health",
}
# =====================================================
# ENDPOINTS - ADMIN
# =====================================================
@app.get("/admin/cache/info", tags=["Admin"])
async def info_cache():
"""
📊 Informations sur l'état du cache Windows
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
cache_info = sage_client.get_cache_info()
return cache_info
except Exception as e:
logger.error(f"Erreur info cache: {e}")
raise HTTPException(500, str(e))
# 🔧 CORRECTION ADMIN: Cache refresh (doit appeler Windows)
@app.post("/admin/cache/refresh", tags=["Admin"])
async def forcer_actualisation():
"""
🔄 Force l'actualisation du cache Windows
"""
try:
# ✅ APPEL VIA SAGE_CLIENT (HTTP vers Windows)
resultat = sage_client.refresh_cache()
cache_info = sage_client.get_cache_info()
return {
"success": True,
"message": "Cache actualisé sur Windows Server",
"info": cache_info,
}
except Exception as e:
logger.error(f"Erreur refresh cache: {e}")
raise HTTPException(500, str(e))
# ✅ RESTE INCHANGÉ: admin/queue/status (local au VPS)
@app.get("/admin/queue/status", tags=["Admin"])
async def statut_queue():
"""
📊 Statut de la queue d'emails (local VPS)
"""
return {
"queue_size": email_queue.queue.qsize(),
"workers": len(email_queue.workers),
"running": email_queue.running,
}
# =====================================================
# LANCEMENT
# =====================================================
if __name__ == "__main__":
uvicorn.run(
"api:app",
host=settings.api_host,
port=settings.api_port,
reload=settings.api_reload,
)