Sage100-vps/api.py
2025-12-05 14:00:40 +03:00

2068 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Query, Depends, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field, EmailStr, validator, field_validator
from typing import List, Optional, Dict
from datetime import date, datetime
from enum import Enum
import uvicorn
from contextlib import asynccontextmanager
import uuid
import csv
import io
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from routes.auth import router as auth_router
from core.dependencies import get_current_user, require_role
# Logging setup: records at INFO level and above are sent to the
# sage_api.log file and to a stream handler.
logging.basicConfig(
    handlers=[logging.FileHandler("sage_api.log"), logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Imports locaux
from config import settings
from database import (
init_db,
async_session_factory,
get_session,
EmailLog,
StatutEmail as StatutEmailEnum,
WorkflowLog,
SignatureLog,
StatutSignature as StatutSignatureEnum,
)
from email_queue import email_queue
from sage_client import sage_client
# =====================================================
# ENUMS
# =====================================================
# Sage document types. Every member's integer value is read from the
# application settings rather than hard-coded here.
# NOTE(review): comments further down this file suggest DEVIS = 0,
# BON_COMMANDE = 10 and FACTURE = 60 -- confirm against the settings module.
class TypeDocument(int, Enum):
    DEVIS = settings.SAGE_TYPE_DEVIS
    BON_COMMANDE = settings.SAGE_TYPE_BON_COMMANDE
    PREPARATION = settings.SAGE_TYPE_PREPARATION
    BON_LIVRAISON = settings.SAGE_TYPE_BON_LIVRAISON
    BON_RETOUR = settings.SAGE_TYPE_BON_RETOUR
    BON_AVOIR = settings.SAGE_TYPE_BON_AVOIR
    FACTURE = settings.SAGE_TYPE_FACTURE
# States of an electronic-signature request as exposed by the API.
# String-valued enum: each value is identical to its member name.
class StatutSignature(str, Enum):
    EN_ATTENTE = "EN_ATTENTE"  # pending
    ENVOYE = "ENVOYE"  # sent
    SIGNE = "SIGNE"  # signed
    REFUSE = "REFUSE"  # refused
    EXPIRE = "EXPIRE"  # expired
# States of a queued e-mail as exposed by the API.
# String-valued enum: each value is identical to its member name.
class StatutEmail(str, Enum):
    EN_ATTENTE = "EN_ATTENTE"  # pending
    EN_COURS = "EN_COURS"  # in progress
    ENVOYE = "ENVOYE"  # sent
    OUVERT = "OUVERT"  # opened
    ERREUR = "ERREUR"  # error
    BOUNCE = "BOUNCE"
# =====================================================
# MODÈLES PYDANTIC
# =====================================================
class ClientResponse(BaseModel):
    # Customer record returned by GET /clients.
    # Only the account number and the name are mandatory; every contact
    # detail defaults to None when absent.
    numero: str
    intitule: str
    adresse: Optional[str] = None
    code_postal: Optional[str] = None
    ville: Optional[str] = None
    email: Optional[str] = None
    telephone: Optional[str] = None
class ArticleResponse(BaseModel):
    # Article record returned by GET /articles; all four fields are required.
    reference: str
    designation: str
    prix_vente: float
    stock_reel: float
class LigneDevis(BaseModel):
    # One quote line as submitted by the API caller. The unit price is
    # optional (defaults to None) and the discount percentage defaults to 0.0.
    article_code: str
    quantite: float
    prix_unitaire_ht: Optional[float] = None
    remise_pourcentage: Optional[float] = 0.0

    @field_validator("article_code", mode="before")
    @classmethod
    def strip_insecables(cls, v):
        """Remove non-breaking spaces and surrounding whitespace from the code.

        The validator runs in "before" mode, i.e. ahead of type validation, so
        ``v`` can be any JSON value. Non-string input is returned untouched so
        that the regular ``str`` validation rejects it with a validation error
        instead of this method raising ``AttributeError``.
        """
        if isinstance(v, str):
            return v.replace("\xa0", "").strip()
        return v
class DevisRequest(BaseModel):
    # Payload for POST /devis: the customer, an optional quote date
    # (None when omitted) and the list of quote lines.
    client_id: str
    date_devis: Optional[date] = None
    lignes: List[LigneDevis]
class DevisResponse(BaseModel):
    # Summary returned by POST /devis once the quote has been created.
    id: str
    client_id: str
    date_devis: str
    montant_total_ht: float
    montant_total_ttc: float
    nb_lignes: int
class SignatureRequest(BaseModel):
    # Payload of the signature endpoints: which document to sign and who
    # signs it (the signer's e-mail address is validated as EmailStr).
    doc_id: str
    type_doc: TypeDocument
    email_signataire: EmailStr
    nom_signataire: str
class EmailEnvoiRequest(BaseModel):
    # Payload of POST /devis/{id}/envoyer: one main recipient, optional
    # cc/cci lists (empty by default), a subject and an HTML body.
    destinataire: EmailStr
    cc: Optional[List[EmailStr]] = []
    cci: Optional[List[EmailStr]] = []
    sujet: str
    corps_html: str
    document_ids: Optional[List[str]] = None
    type_document: Optional[TypeDocument] = None
class RelanceDevisRequest(BaseModel):
    # Payload of the quote reminder endpoint; the custom message is optional.
    doc_id: str
    message_personnalise: Optional[str] = None
class BaremeRemiseResponse(BaseModel):
    # Result of POST /devis/valider-remise: the customer's maximum allowed
    # discount, the requested discount, the verdict and a readable message.
    client_id: str
    remise_max_autorisee: float
    remise_demandee: float
    autorisee: bool
    message: str
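# NOTE: leftover paste instruction ("to be added in api.py after the imports and before the existing endpoints"); the three imports below repeat names already imported at the top of this file.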
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
# =====================================================
# MODÈLES PYDANTIC POUR USERS
# =====================================================
class UserResponse(BaseModel):
    """Response model for a user."""

    # Pydantic v2 configuration; replaces the deprecated nested ``class Config``.
    # ``from_attributes`` lets the model be built from an object's attributes
    # instead of only from a dict.
    model_config = {"from_attributes": True}

    id: str
    email: str
    nom: str
    prenom: str
    role: str
    is_verified: bool
    is_active: bool
    created_at: str
    last_login: Optional[str] = None
    failed_login_attempts: int = 0
# =====================================================
# SERVICES EXTERNES (Universign)
# =====================================================
async def universign_envoyer(
    doc_id: str, pdf_bytes: bytes, email: str, nom: str
) -> Dict:
    """Send a PDF to the Universign API for electronic signature.

    The six HTTP calls are blocking (``requests``), so they run in the event
    loop's default executor instead of stalling every other request.

    Args:
        doc_id: Document number, used in the transaction name and file name.
        pdf_bytes: Content of the PDF to sign.
        email: E-mail address of the signer.
        nom: Name of the signer (accepted for interface compatibility; not
            sent to Universign by this implementation).

    Returns:
        ``{"transaction_id", "signer_url", "statut": "ENVOYE"}`` on success,
        ``{"error", "statut": "ERREUR"}`` on any failure (never raises for
        HTTP or network errors).
    """
    import asyncio

    import requests

    def _envoyer_bloquant() -> Dict:
        """Run the blocking Universign call sequence and build the result."""
        api_url = settings.universign_api_url
        auth = (settings.universign_api_key, "")

        def _post(chemin: str, **kwargs):
            # Every step shares the same auth, timeout and error policy.
            reponse = requests.post(
                f"{api_url}{chemin}", auth=auth, timeout=30, **kwargs
            )
            reponse.raise_for_status()
            return reponse

        # Step 1: create the transaction
        transaction_id = (
            _post("/transactions", json={"name": f"Devis {doc_id}", "language": "fr"})
            .json()
            .get("id")
        )
        # Step 2: upload the PDF
        files = {"file": (f"Devis_{doc_id}.pdf", pdf_bytes, "application/pdf")}
        file_id = _post("/files", files=files).json().get("id")
        # Step 3: attach the uploaded file to the transaction
        document_id = (
            _post(
                f"/transactions/{transaction_id}/documents",
                data={"document": file_id},
            )
            .json()
            .get("id")
        )
        # Step 4: create the signature field
        field_id = (
            _post(
                f"/transactions/{transaction_id}/documents/{document_id}/fields",
                data={"type": "signature"},
            )
            .json()
            .get("id")
        )
        # Step 5: assign the signer to the field
        _post(
            f"/transactions/{transaction_id}/signatures",
            data={"signer": email, "field": field_id},
        )
        # Step 6: start the transaction
        final_data = _post(f"/transactions/{transaction_id}/start").json()
        actions = final_data.get("actions")
        signer_url = actions[0].get("url", "") if actions else ""
        return {
            "transaction_id": transaction_id,
            "signer_url": signer_url,
            "statut": "ENVOYE",
        }

    try:
        loop = asyncio.get_running_loop()
        resultat = await loop.run_in_executor(None, _envoyer_bloquant)
        logger.info(f"✅ Signature Universign envoyée: {resultat['transaction_id']}")
        return resultat
    except Exception as e:
        logger.error(f"❌ Erreur Universign: {e}")
        return {"error": str(e), "statut": "ERREUR"}
async def universign_statut(transaction_id: str) -> Dict:
    """Fetch the state of a Universign transaction and map it to a local status.

    The blocking ``requests.get`` call runs in the event loop's default
    executor so the event loop stays responsive.

    Args:
        transaction_id: Universign transaction identifier.

    Returns:
        ``{"statut", "date_signature"}`` when Universign answers 200 (unknown
        states map to ``"EN_ATTENTE"``), ``{"statut": "ERREUR"}`` for any other
        HTTP status, and ``{"statut": "ERREUR", "error"}`` on exception.
    """
    import asyncio

    import requests

    def _lire_bloquant():
        """Perform the blocking GET on the transaction resource."""
        return requests.get(
            f"{settings.universign_api_url}/transactions/{transaction_id}",
            auth=(settings.universign_api_key, ""),
            timeout=10,
        )

    try:
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, _lire_bloquant)
        if response.status_code != 200:
            return {"statut": "ERREUR"}
        data = response.json()
        # Universign state -> local signature status name
        statut_map = {
            "draft": "EN_ATTENTE",
            "started": "EN_ATTENTE",
            "completed": "SIGNE",
            "refused": "REFUSE",
            "expired": "EXPIRE",
            "canceled": "REFUSE",
        }
        return {
            "statut": statut_map.get(data.get("state"), "EN_ATTENTE"),
            "date_signature": data.get("completed_at"),
        }
    except Exception as e:
        logger.error(f"Erreur statut Universign: {e}")
        return {"statut": "ERREUR", "error": str(e)}
# =====================================================
# CYCLE DE VIE
# =====================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: initialise the database and run the e-mail queue.

    Startup initialises the database, injects the session factory and the
    Sage client into ``email_queue`` and starts its workers. The queue is
    stopped in a ``finally`` clause so it is also shut down when an exception
    is thrown into the context manager while the application is running.
    """
    # Database initialisation
    await init_db()
    logger.info("✅ Base de données initialisée")
    # Inject both session_factory and sage_client into email_queue
    email_queue.session_factory = async_session_factory
    email_queue.sage_client = sage_client
    logger.info("✅ sage_client injecté dans email_queue")
    # Start the queue workers
    email_queue.start(num_workers=settings.max_email_workers)
    logger.info("✅ Email queue démarrée")
    try:
        yield
    finally:
        # Cleanup
        email_queue.stop()
        logger.info("👋 Services arrêtés")
# =====================================================
# APPLICATION
# =====================================================
# FastAPI application object; startup/shutdown is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    title="API Sage 100c Dataven",
    description="API de gestion commerciale - VPS Linux",
    version="2.0.0",
)
# CORS: allowed origins come from the settings; credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=settings.cors_origins,
    allow_headers=["*"],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
)
# Authentication routes live in their own router.
app.include_router(auth_router)
# =====================================================
# ENDPOINTS - US-A1 (CRÉATION RAPIDE DEVIS)
# =====================================================
@app.get("/clients", response_model=List[ClientResponse], tags=["US-A1"])
async def rechercher_clients(query: Optional[str] = Query(None)):
    """🔍 Search customers through the Windows gateway.

    A missing ``query`` is sent to the gateway as an empty filter. Any
    failure is logged and reported as HTTP 500.
    """
    filtre = query or ""
    try:
        reponse = []
        for brut in sage_client.lister_clients(filtre=filtre):
            reponse.append(ClientResponse(**brut))
        return reponse
    except Exception as e:
        logger.error(f"Erreur recherche clients: {e}")
        raise HTTPException(500, str(e))
@app.get("/articles", response_model=List[ArticleResponse], tags=["US-A1"])
async def rechercher_articles(query: Optional[str] = Query(None)):
    """🔍 Search articles through the Windows gateway.

    A missing ``query`` is sent to the gateway as an empty filter. Any
    failure is logged and reported as HTTP 500.
    """
    filtre = query or ""
    try:
        reponse = []
        for brut in sage_client.lister_articles(filtre=filtre):
            reponse.append(ArticleResponse(**brut))
        return reponse
    except Exception as e:
        logger.error(f"Erreur recherche articles: {e}")
        raise HTTPException(500, str(e))
@app.post("/devis", response_model=DevisResponse, status_code=201, tags=["US-A1"])
async def creer_devis(devis: DevisRequest):
    """📝 Create a quote through the Windows gateway.

    The validated request is flattened into a plain dict, sent to the gateway,
    and the gateway's answer is mapped onto ``DevisResponse``. Any failure is
    logged and reported as HTTP 500.
    """
    try:
        # Build the payload expected by the gateway
        lignes_payload = []
        for ligne in devis.lignes:
            lignes_payload.append(
                {
                    "article_code": ligne.article_code,
                    "quantite": ligne.quantite,
                    "prix_unitaire_ht": ligne.prix_unitaire_ht,
                    "remise_pourcentage": ligne.remise_pourcentage,
                }
            )
        date_iso = None
        if devis.date_devis:
            date_iso = devis.date_devis.isoformat()
        devis_data = {
            "client_id": devis.client_id,
            "date_devis": date_iso,
            "lignes": lignes_payload,
        }
        # HTTP call to the Windows gateway
        resultat = sage_client.creer_devis(devis_data)
        logger.info(f"✅ Devis créé: {resultat.get('numero_devis')}")
        reponse = DevisResponse(
            id=resultat["numero_devis"],
            client_id=devis.client_id,
            date_devis=resultat["date_devis"],
            montant_total_ht=resultat["total_ht"],
            montant_total_ttc=resultat["total_ttc"],
            nb_lignes=resultat["nb_lignes"],
        )
        return reponse
    except Exception as e:
        logger.error(f"Erreur création devis: {e}")
        raise HTTPException(500, str(e))
@app.get("/devis", tags=["US-A1"])
async def lister_devis(
    limit: int = Query(100, le=1000),
    statut: Optional[int] = Query(None),
    inclure_lignes: bool = Query(
        True, description="Inclure les lignes de chaque devis"
    ),
):
    """
    📋 List all quotes through the Windows gateway.

    Args:
        limit: Maximum number of quotes to return.
        statut: Filter by status (0=Quote, 2=Accepted, 5=Transformed, etc.).
        inclure_lignes: When True, the lines of each quote are loaded
            (default: True).
    """
    parametres = {
        "limit": limit,
        "statut": statut,
        "inclure_lignes": inclure_lignes,
    }
    try:
        return sage_client.lister_devis(**parametres)
    except Exception as e:
        logger.error(f"Erreur liste devis: {e}")
        raise HTTPException(500, str(e))
@app.get("/devis/{id}", tags=["US-A1"])
async def lire_devis(id: str):
    """📄 Read one quote through the Windows gateway.

    Responds 404 when the gateway returns nothing for ``id`` and 500 on any
    other failure.
    """
    try:
        trouve = sage_client.lire_devis(id)
        if trouve:
            return trouve
        raise HTTPException(404, f"Devis {id} introuvable")
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as e:
        logger.error(f"Erreur lecture devis: {e}")
        raise HTTPException(500, str(e))
@app.get("/devis/{id}/pdf", tags=["US-A1"])
async def telecharger_devis_pdf(id: str):
    """📄 Download the quote as a PDF (generated via email_queue).

    The PDF bytes come from ``email_queue._generate_pdf`` and are streamed
    back as an attachment. Any failure is logged and reported as HTTP 500.
    """
    try:
        contenu = email_queue._generate_pdf(id, TypeDocument.DEVIS)
        entetes = {"Content-Disposition": f"attachment; filename=Devis_{id}.pdf"}
        return StreamingResponse(
            iter([contenu]),
            media_type="application/pdf",
            headers=entetes,
        )
    except Exception as e:
        logger.error(f"Erreur génération PDF: {e}")
        raise HTTPException(500, str(e))
@app.post("/devis/{id}/envoyer", tags=["US-A1"])
async def envoyer_devis_email(
    id: str, request: EmailEnvoiRequest, session: AsyncSession = Depends(get_session)
):
    """📧 Send a quote by e-mail.

    One ``EmailLog`` row is created per recipient (main recipient, cc and
    cci). The rows are committed first and only then handed to
    ``email_queue``, so a worker never receives the id of a row that is not
    yet visible in the database.

    Responds 404 when the quote does not exist and 500 on any other failure.
    """
    try:
        # Make sure the quote exists
        devis = sage_client.lire_devis(id)
        if not devis:
            raise HTTPException(404, f"Devis {id} introuvable")
        # cc / cci are Optional: an explicit null must behave like an empty list
        tous_destinataires = [
            request.destinataire,
            *(request.cc or []),
            *(request.cci or []),
        ]
        # Create one e-mail log per recipient
        email_logs = []
        for dest in tous_destinataires:
            log_id = str(uuid.uuid4())
            session.add(
                EmailLog(
                    id=log_id,
                    destinataire=dest,
                    sujet=request.sujet,
                    corps_html=request.corps_html,
                    document_ids=id,
                    type_document=TypeDocument.DEVIS,
                    statut=StatutEmailEnum.EN_ATTENTE,
                    date_creation=datetime.now(),
                    nb_tentatives=0,
                )
            )
            email_logs.append(log_id)
        await session.commit()
        # Enqueue only after the commit succeeded
        for log_id in email_logs:
            email_queue.enqueue(log_id)
        logger.info(
            f"✅ Email devis {id} mis en file: {len(tous_destinataires)} destinataire(s)"
        )
        return {
            "success": True,
            "email_log_ids": email_logs,
            "devis_id": id,
            "message": f"{len(tous_destinataires)} email(s) en file d'attente",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur envoi email: {e}")
        raise HTTPException(500, str(e))
@app.put("/devis/{id}/statut", tags=["US-A1"])
async def changer_statut_devis(id: str, nouveau_statut: int = Query(..., ge=0, le=5)):
    """
    📄 Change the status of a quote through the Windows gateway.

    ``nouveau_statut`` must be between 0 and 5. The response echoes the old
    and new status reported by the gateway. Any failure is logged and
    reported as HTTP 500.
    """
    try:
        # Call through sage_client (HTTP to Windows)
        resultat = sage_client.changer_statut_devis(id, nouveau_statut)
        logger.info(f"✅ Statut devis {id} changé: {nouveau_statut}")
        ancien = resultat.get("statut_ancien")
        nouveau = resultat.get("statut_nouveau")
        return {
            "success": True,
            "devis_id": id,
            "statut_ancien": ancien,
            "statut_nouveau": nouveau,
            "message": "Statut mis à jour avec succès",
        }
    except Exception as e:
        logger.error(f"Erreur changement statut: {e}")
        raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A2 (WORKFLOW SANS RESSAISIE)
# =====================================================
@app.get("/commandes", tags=["US-A2"])
async def lister_commandes(
    limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
    """
    📋 List all orders through the Windows gateway.

    Delegates to ``sage_client.lister_commandes()``; according to the original
    note, the filtering on document type 10 is done on the Windows side.
    """
    parametres = {"limit": limit, "statut": statut}
    try:
        return sage_client.lister_commandes(**parametres)
    except Exception as e:
        logger.error(f"Erreur liste commandes: {e}")
        raise HTTPException(500, str(e))
@app.post("/workflow/devis/{id}/to-commande", tags=["US-A2"])
async def devis_vers_commande(id: str, session: AsyncSession = Depends(get_session)):
    """
    🔧 Transform a quote into an order.

    Calls the gateway with the Sage type codes taken from the settings, then
    records the transformation in ``WorkflowLog``. The target document number
    and line count are read once, with the same defaults for the log row and
    for the response, so a gateway answer missing one of those keys can no
    longer fail with ``KeyError`` after the log has already been committed.
    """
    try:
        resultat = sage_client.transformer_document(
            numero_source=id,
            type_source=settings.SAGE_TYPE_DEVIS,  # = 0
            type_cible=settings.SAGE_TYPE_BON_COMMANDE,  # = 10
        )
        document_cible = resultat.get("document_cible", "")
        nb_lignes = resultat.get("nb_lignes", 0)
        workflow_log = WorkflowLog(
            id=str(uuid.uuid4()),
            document_source=id,
            type_source=TypeDocument.DEVIS,
            document_cible=document_cible,
            type_cible=TypeDocument.BON_COMMANDE,
            nb_lignes=nb_lignes,
            date_transformation=datetime.now(),
            succes=True,
        )
        session.add(workflow_log)
        await session.commit()
        logger.info(f"✅ Transformation: Devis {id} → Commande {document_cible}")
        return {
            "success": True,
            "document_source": id,
            "document_cible": document_cible,
            "nb_lignes": nb_lignes,
        }
    except Exception as e:
        logger.error(f"Erreur transformation: {e}")
        raise HTTPException(500, str(e))
@app.post("/workflow/commande/{id}/to-facture", tags=["US-A2"])
async def commande_vers_facture(id: str, session: AsyncSession = Depends(get_session)):
    """
    🔧 Transform an order into an invoice.

    Calls the gateway with the Sage type codes taken from the settings, then
    records the transformation in ``WorkflowLog``. The target document number
    and line count are read once, with the same defaults for the log row and
    for the response, so a gateway answer missing one of those keys can no
    longer fail with ``KeyError`` after the log has already been committed.
    """
    try:
        resultat = sage_client.transformer_document(
            numero_source=id,
            type_source=settings.SAGE_TYPE_BON_COMMANDE,  # = 10
            type_cible=settings.SAGE_TYPE_FACTURE,  # = 60
        )
        document_cible = resultat.get("document_cible", "")
        nb_lignes = resultat.get("nb_lignes", 0)
        workflow_log = WorkflowLog(
            id=str(uuid.uuid4()),
            document_source=id,
            type_source=TypeDocument.BON_COMMANDE,
            document_cible=document_cible,
            type_cible=TypeDocument.FACTURE,
            nb_lignes=nb_lignes,
            date_transformation=datetime.now(),
            succes=True,
        )
        session.add(workflow_log)
        await session.commit()
        logger.info(f"✅ Transformation: Commande {id} → Facture {document_cible}")
        return {
            "success": True,
            "document_source": id,
            "document_cible": document_cible,
            "nb_lignes": nb_lignes,
        }
    except Exception as e:
        logger.error(f"Erreur transformation: {e}")
        raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A3 (SIGNATURE ÉLECTRONIQUE)
# =====================================================
@app.post("/signature/universign/send", tags=["US-A3"])
async def envoyer_signature(
    demande: SignatureRequest, session: AsyncSession = Depends(get_session)
):
    """✍️ Send a document to Universign for signature.

    Generates the PDF, submits it to Universign, stores a ``SignatureLog``
    row, then writes the transaction id into the Sage free field
    "UniversignID". A Universign error or any other failure yields HTTP 500.
    """
    try:
        # Generate the PDF
        pdf_bytes = email_queue._generate_pdf(demande.doc_id, demande.type_doc)
        # Submit to Universign
        resultat = await universign_envoyer(
            demande.doc_id, pdf_bytes, demande.email_signataire, demande.nom_signataire
        )
        if "error" in resultat:
            raise HTTPException(500, resultat["error"])
        transaction_id = resultat["transaction_id"]
        # Record in the database
        session.add(
            SignatureLog(
                id=str(uuid.uuid4()),
                document_id=demande.doc_id,
                type_document=demande.type_doc,
                transaction_id=transaction_id,
                signer_url=resultat["signer_url"],
                email_signataire=demande.email_signataire,
                nom_signataire=demande.nom_signataire,
                statut=StatutSignatureEnum.ENVOYE,
                date_envoi=datetime.now(),
            )
        )
        await session.commit()
        # Update the Sage free field through the Windows gateway
        sage_client.mettre_a_jour_champ_libre(
            demande.doc_id, demande.type_doc, "UniversignID", transaction_id
        )
        logger.info(f"✅ Signature envoyée: {demande.doc_id}")
        return {
            "success": True,
            "transaction_id": transaction_id,
            "signer_url": resultat["signer_url"],
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur signature: {e}")
        raise HTTPException(500, str(e))
@app.get("/signature/universign/status", tags=["US-A3"])
async def statut_signature(docId: str = Query(...)):
    """🔍 Fetch the signature status of a document in real time.

    A document can have several ``SignatureLog`` rows (a reminder creates a
    new one), so the most recently sent row is used instead of requiring
    exactly one match. The database session is released before Universign is
    queried.

    Responds 404 when no signature request exists for ``docId`` and 500 on
    any other failure.
    """
    try:
        # Look up the latest request in the local database
        async with async_session_factory() as session:
            query = (
                select(SignatureLog)
                .where(SignatureLog.document_id == docId)
                .order_by(SignatureLog.date_envoi.desc())
                .limit(1)
            )
            result = await session.execute(query)
            signature_log = result.scalars().first()
            if not signature_log:
                raise HTTPException(404, "Signature introuvable")
            transaction_id = signature_log.transaction_id
        # Query Universign
        statut = await universign_statut(transaction_id)
        return {
            "doc_id": docId,
            "statut": statut["statut"],
            "date_signature": statut.get("date_signature"),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur statut signature: {e}")
        raise HTTPException(500, str(e))
@app.get("/signatures", tags=["US-A3"])
async def lister_signatures(
    statut: Optional[StatutSignature] = Query(None),
    limit: int = Query(100, le=1000),
    session: AsyncSession = Depends(get_session),
):
    """📋 List all signature requests, most recently sent first.

    ``statut`` optionally filters on the signature status and ``limit`` caps
    the number of rows returned.
    """
    query = select(SignatureLog).order_by(SignatureLog.date_envoi.desc())
    if statut:
        # The API enum and the database enum are matched by member name.
        statut_db = StatutSignatureEnum[statut.value]
        query = query.where(SignatureLog.statut == statut_db)
    result = await session.execute(query.limit(limit))

    lignes = []
    for sig in result.scalars().all():
        date_envoi = sig.date_envoi.isoformat() if sig.date_envoi else None
        date_signature = (
            sig.date_signature.isoformat() if sig.date_signature else None
        )
        lignes.append(
            {
                "id": sig.id,
                "document_id": sig.document_id,
                "type_document": sig.type_document.value,
                "transaction_id": sig.transaction_id,
                "signer_url": sig.signer_url,
                "email_signataire": sig.email_signataire,
                "nom_signataire": sig.nom_signataire,
                "statut": sig.statut.value,
                "date_envoi": date_envoi,
                "date_signature": date_signature,
                "est_relance": sig.est_relance,
                "nb_relances": sig.nb_relances or 0,
            }
        )
    return lignes
@app.get("/signatures/{transaction_id}/status", tags=["US-A3"])
async def statut_signature_detail(
    transaction_id: str, session: AsyncSession = Depends(get_session)
):
    """🔍 Fetch the detailed status of one signature.

    Loads the ``SignatureLog`` row for the transaction (404 when missing),
    queries Universign and, unless Universign reported an error, stores the
    refreshed status and signature date before returning the row's details.
    """
    result = await session.execute(
        select(SignatureLog).where(SignatureLog.transaction_id == transaction_id)
    )
    signature_log = result.scalar_one_or_none()
    if not signature_log:
        raise HTTPException(404, f"Transaction {transaction_id} introuvable")

    # Query Universign
    statut_universign = await universign_statut(transaction_id)
    if statut_universign.get("statut") != "ERREUR":
        statut_map = {
            "EN_ATTENTE": StatutSignatureEnum.EN_ATTENTE,
            "ENVOYE": StatutSignatureEnum.ENVOYE,
            "SIGNE": StatutSignatureEnum.SIGNE,
            "REFUSE": StatutSignatureEnum.REFUSE,
            "EXPIRE": StatutSignatureEnum.EXPIRE,
        }
        # Unknown status names fall back to EN_ATTENTE.
        signature_log.statut = statut_map.get(
            statut_universign["statut"], StatutSignatureEnum.EN_ATTENTE
        )
        if statut_universign.get("date_signature"):
            # fromisoformat is given "+00:00" in place of a trailing "Z".
            horodatage = statut_universign["date_signature"].replace("Z", "+00:00")
            signature_log.date_signature = datetime.fromisoformat(horodatage)
        await session.commit()

    date_envoi = None
    if signature_log.date_envoi:
        date_envoi = signature_log.date_envoi.isoformat()
    date_signature = None
    if signature_log.date_signature:
        date_signature = signature_log.date_signature.isoformat()
    return {
        "transaction_id": transaction_id,
        "document_id": signature_log.document_id,
        "statut": signature_log.statut.value,
        "email_signataire": signature_log.email_signataire,
        "date_envoi": date_envoi,
        "date_signature": date_signature,
        "signer_url": signature_log.signer_url,
    }
@app.post("/signatures/refresh-all", tags=["US-A3"])
async def rafraichir_statuts_signatures(session: AsyncSession = Depends(get_session)):
    """🔄 Refresh the status of ALL pending signatures.

    Every ``SignatureLog`` row still EN_ATTENTE or ENVOYE is checked against
    Universign; rows that reached SIGNE, REFUSE or EXPIRE are updated. A
    failure on one row is logged and does not stop the others. All changes
    are committed once at the end.
    """
    en_cours = [StatutSignatureEnum.EN_ATTENTE, StatutSignatureEnum.ENVOYE]
    result = await session.execute(
        select(SignatureLog).where(SignatureLog.statut.in_(en_cours))
    )
    signatures = result.scalars().all()

    # Only final states trigger an update.
    statut_map = {
        "SIGNE": StatutSignatureEnum.SIGNE,
        "REFUSE": StatutSignatureEnum.REFUSE,
        "EXPIRE": StatutSignatureEnum.EXPIRE,
    }
    nb_mises_a_jour = 0
    for sig in signatures:
        try:
            statut_universign = await universign_statut(sig.transaction_id)
            if statut_universign.get("statut") == "ERREUR":
                continue
            nouveau = statut_map.get(statut_universign["statut"])
            if not nouveau or nouveau == sig.statut:
                continue
            sig.statut = nouveau
            if statut_universign.get("date_signature"):
                horodatage = statut_universign["date_signature"].replace(
                    "Z", "+00:00"
                )
                sig.date_signature = datetime.fromisoformat(horodatage)
            nb_mises_a_jour += 1
        except Exception as e:
            logger.error(f"Erreur refresh signature {sig.transaction_id}: {e}")
            continue
    await session.commit()
    return {
        "success": True,
        "nb_signatures_verifiees": len(signatures),
        "nb_mises_a_jour": nb_mises_a_jour,
    }
@app.post("/devis/{id}/signer", tags=["US-A3"])
async def envoyer_devis_signature(
    id: str, request: SignatureRequest, session: AsyncSession = Depends(get_session)
):
    """✏️ Send a quote for electronic signature.

    Checks that the quote exists (404 otherwise), generates its PDF, submits
    it to Universign, stores a ``SignatureLog`` row and writes the
    transaction id into the Sage free field "UniversignID". Only the signer's
    e-mail and name are read from the request body; the document is the one
    named in the path.
    """
    try:
        # Check the quote through the Windows gateway
        if not sage_client.lire_devis(id):
            raise HTTPException(404, f"Devis {id} introuvable")
        # Generate the PDF
        pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
        # Submit to Universign
        resultat = await universign_envoyer(
            id, pdf_bytes, request.email_signataire, request.nom_signataire
        )
        if "error" in resultat:
            raise HTTPException(500, f"Erreur Universign: {resultat['error']}")
        transaction_id = resultat["transaction_id"]
        # Record in the database
        session.add(
            SignatureLog(
                id=str(uuid.uuid4()),
                document_id=id,
                type_document=TypeDocument.DEVIS,
                transaction_id=transaction_id,
                signer_url=resultat["signer_url"],
                email_signataire=request.email_signataire,
                nom_signataire=request.nom_signataire,
                statut=StatutSignatureEnum.ENVOYE,
                date_envoi=datetime.now(),
            )
        )
        await session.commit()
        # Update the Sage free field through the gateway
        sage_client.mettre_a_jour_champ_libre(
            id, TypeDocument.DEVIS, "UniversignID", transaction_id
        )
        return {
            "success": True,
            "devis_id": id,
            "transaction_id": transaction_id,
            "signer_url": resultat["signer_url"],
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur envoi signature: {e}")
        raise HTTPException(500, str(e))
# ============================================
# US-A4 - ENVOI EMAILS EN LOT
# ============================================
class EmailBatchRequest(BaseModel):
    # Payload of POST /emails/send-batch: 1 to 100 recipients, a subject of
    # 1 to 500 characters, a non-empty HTML body and optional attachments.
    destinataires: List[EmailStr] = Field(..., min_length=1, max_length=100)
    sujet: str = Field(..., min_length=1, max_length=500)
    corps_html: str = Field(..., min_length=1)
    document_ids: Optional[List[str]] = None
    type_document: Optional[TypeDocument] = None
@app.post("/emails/send-batch", tags=["US-A4"])
async def envoyer_emails_lot(
    batch: EmailBatchRequest, session: AsyncSession = Depends(get_session)
):
    """📧 US-A4: Batch sending through email_queue.

    One ``EmailLog`` row is created per recipient. The rows are committed
    first and only then handed to ``email_queue``, so a worker never receives
    the id of a row that is not yet visible in the database.

    Returns the number of e-mails queued, the number of attached documents
    and one detail entry per recipient.
    """
    # The same attachment list applies to every recipient.
    document_ids = ",".join(batch.document_ids) if batch.document_ids else None
    resultats = []
    for destinataire in batch.destinataires:
        log_id = str(uuid.uuid4())
        session.add(
            EmailLog(
                id=log_id,
                destinataire=destinataire,
                sujet=batch.sujet,
                corps_html=batch.corps_html,
                document_ids=document_ids,
                type_document=batch.type_document,
                statut=StatutEmailEnum.EN_ATTENTE,
                date_creation=datetime.now(),
                nb_tentatives=0,
            )
        )
        resultats.append(
            {
                "destinataire": destinataire,
                "log_id": log_id,
                "statut": "EN_ATTENTE",
            }
        )
    await session.commit()
    # Enqueue only after the commit succeeded
    for detail in resultats:
        email_queue.enqueue(detail["log_id"])
    nb_documents = len(batch.document_ids) if batch.document_ids else 0
    logger.info(
        f"{len(batch.destinataires)} emails mis en file avec {nb_documents} docs"
    )
    return {
        "total": len(batch.destinataires),
        "succes": len(batch.destinataires),
        "documents_attaches": nb_documents,
        "details": resultats,
    }
# =====================================================
# ENDPOINTS - US-A5
# =====================================================
@app.post("/devis/valider-remise", response_model=BaremeRemiseResponse, tags=["US-A5"])
async def valider_remise(
    client_id: str = Query(..., min_length=1),
    remise_pourcentage: float = Query(0.0, ge=0, le=100),
):
    """
    💰 US-A5: Validate a discount against the customer's Sage scale.

    The discount is allowed when it does not exceed the maximum read from
    the gateway for this customer; a refusal is logged as a warning. Any
    failure is logged and reported as HTTP 500.
    """
    try:
        # Call through sage_client (HTTP to Windows)
        remise_max = sage_client.lire_remise_max_client(client_id)
        autorisee = remise_pourcentage <= remise_max
        if autorisee:
            message = "✅ Remise autorisée"
        else:
            message = f"⚠️ Remise trop élevée (max autorisé: {remise_max}%)"
            logger.warning(
                f"Remise refusée pour {client_id}: {remise_pourcentage}% > {remise_max}%"
            )
        reponse = BaremeRemiseResponse(
            client_id=client_id,
            remise_max_autorisee=remise_max,
            remise_demandee=remise_pourcentage,
            autorisee=autorisee,
            message=message,
        )
        return reponse
    except Exception as e:
        logger.error(f"Erreur validation remise: {e}")
        raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A6 (RELANCE DEVIS)
# =====================================================
@app.post("/devis/{id}/relancer-signature", tags=["US-A6"])
async def relancer_devis_signature(
    id: str, relance: RelanceDevisRequest, session: AsyncSession = Depends(get_session)
):
    """📧 Send a quote reminder through Universign.

    Reads the quote (404 when missing) and the customer's contact (400 when
    it has no e-mail), submits a fresh signature request to Universign and
    stores it as a ``SignatureLog`` row flagged as a reminder.
    NOTE(review): the ``relance`` body is accepted but not read here.
    """
    try:
        # Read the quote through the gateway
        devis = sage_client.lire_devis(id)
        if not devis:
            raise HTTPException(404, f"Devis {id} introuvable")
        # Fetch the contact through the gateway
        contact = sage_client.lire_contact_client(devis["client_code"])
        if not (contact and contact.get("email")):
            raise HTTPException(400, "Aucun email trouvé pour ce client")
        # Generate the PDF
        pdf_bytes = email_queue._generate_pdf(id, TypeDocument.DEVIS)
        # The contact name falls back to the customer name when empty.
        nom_signataire = contact["nom"] or contact["client_intitule"]
        # Submit to Universign
        resultat = await universign_envoyer(
            id, pdf_bytes, contact["email"], nom_signataire
        )
        if "error" in resultat:
            raise HTTPException(500, resultat["error"])
        # Record in the database
        session.add(
            SignatureLog(
                id=str(uuid.uuid4()),
                document_id=id,
                type_document=TypeDocument.DEVIS,
                transaction_id=resultat["transaction_id"],
                signer_url=resultat["signer_url"],
                email_signataire=contact["email"],
                nom_signataire=nom_signataire,
                statut=StatutSignatureEnum.ENVOYE,
                date_envoi=datetime.now(),
                est_relance=True,
                nb_relances=1,
            )
        )
        await session.commit()
        return {
            "success": True,
            "devis_id": id,
            "transaction_id": resultat["transaction_id"],
            "message": "Relance signature envoyée",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur relance: {e}")
        raise HTTPException(500, str(e))
class ContactClientResponse(BaseModel):
    # Contact details of the customer attached to a quote.
    # In Pydantic v2 an Optional field without a default is still required,
    # so the three contact fields get an explicit None default: a contact
    # lacking one of them is returned with null instead of failing validation.
    client_code: str
    client_intitule: str
    email: Optional[str] = None
    nom: Optional[str] = None
    telephone: Optional[str] = None
    peut_etre_relance: bool
@app.get("/devis/{id}/contact", response_model=ContactClientResponse, tags=["US-A6"])
async def recuperer_contact_devis(id: str):
    """👤 US-A6: Fetch the customer contact attached to a quote.

    Responds 404 when either the quote or the contact is missing. The
    ``peut_etre_relance`` flag is true when the contact has an e-mail address.
    """
    try:
        # Read the quote through the Windows gateway
        devis = sage_client.lire_devis(id)
        if not devis:
            raise HTTPException(404, f"Devis {id} introuvable")
        # Read the contact through the Windows gateway
        contact = sage_client.lire_contact_client(devis["client_code"])
        if not contact:
            raise HTTPException(
                404, f"Contact introuvable pour client {devis['client_code']}"
            )
        a_un_email = bool(contact.get("email"))
        return ContactClientResponse(**contact, peut_etre_relance=a_un_email)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur récupération contact: {e}")
        raise HTTPException(500, str(e))
# =====================================================
# ENDPOINTS - US-A7
# =====================================================
@app.get("/factures", tags=["US-A7"])
async def lister_factures(
    limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
    """
    📋 List all invoices through the Windows gateway.

    Delegates to ``sage_client.lister_factures()``; according to the original
    note, the filtering on document type 60 is done on the Windows side.
    """
    parametres = {"limit": limit, "statut": statut}
    try:
        return sage_client.lister_factures(**parametres)
    except Exception as e:
        logger.error(f"Erreur liste factures: {e}")
        raise HTTPException(500, str(e))
class RelanceFactureRequest(BaseModel):
    # Payload of the invoice reminder endpoint; the custom message is optional.
    doc_id: str
    message_personnalise: Optional[str] = None
# E-mail templates (if not already defined), keyed by template id.
# Placeholders use the {{NAME}} syntax; the names a template may use are
# listed under "variables_disponibles".
templates_email_db = {
    "relance_facture": {
        "id": "relance_facture",
        "nom": "Relance Facture",
        "sujet": "Rappel - Facture {{DO_Piece}}",
        # The HTML body is a runtime string: its lines must stay flush left.
        "corps_html": """
<p>Bonjour {{CT_Intitule}},</p>
<p>La facture <strong>{{DO_Piece}}</strong> du {{DO_Date}}
d'un montant de <strong>{{DO_TotalTTC}}€ TTC</strong> reste impayée.</p>
<p>Merci de régulariser dans les meilleurs délais.</p>
<p>Cordialement,</p>
""",
        "variables_disponibles": [
            "DO_Piece",
            "DO_Date",
            "CT_Intitule",
            "DO_TotalHT",
            "DO_TotalTTC",
        ],
    }
}
@app.post("/factures/{id}/relancer", tags=["US-A7"])
async def relancer_facture(
    id: str,
    relance: RelanceFactureRequest,
    session: AsyncSession = Depends(get_session),
):
    """💸 US-A7: One-click invoice reminder.

    Reads the invoice (404 when missing) and the customer's contact (400
    when it has no e-mail), renders the "relance_facture" template (or the
    caller's custom message) and queues the e-mail.

    The ``EmailLog`` row is committed before its id is handed to
    ``email_queue``, so a worker never receives the id of a row that is not
    yet visible in the database. Missing or null invoice totals are rendered
    as 0.00 instead of failing the number formatting.
    """
    try:
        # Read the invoice through the Windows gateway
        facture = sage_client.lire_document(id, TypeDocument.FACTURE)
        if not facture:
            raise HTTPException(404, f"Facture {id} introuvable")
        # Fetch the contact through the Windows gateway
        contact = sage_client.lire_contact_client(facture["client_code"])
        if not contact or not contact.get("email"):
            raise HTTPException(400, "Aucun email trouvé pour ce client")
        # Prepare the e-mail; every value must be a string for str.replace
        template = templates_email_db["relance_facture"]
        total_ht = facture.get("total_ht") or 0
        total_ttc = facture.get("total_ttc") or 0
        variables = {
            "DO_Piece": str(facture.get("numero", id)),
            "DO_Date": str(facture.get("date", "")),
            "CT_Intitule": str(facture.get("client_intitule", "")),
            "DO_TotalHT": f"{total_ht:.2f}",
            "DO_TotalTTC": f"{total_ttc:.2f}",
        }
        sujet = template["sujet"]
        corps = relance.message_personnalise or template["corps_html"]
        for var, valeur in variables.items():
            balise = f"{{{{{var}}}}}"  # renders as {{var}}
            sujet = sujet.replace(balise, valeur)
            corps = corps.replace(balise, valeur)
        # Create the e-mail log
        email_log_id = str(uuid.uuid4())
        session.add(
            EmailLog(
                id=email_log_id,
                destinataire=contact["email"],
                sujet=sujet,
                corps_html=corps,
                document_ids=id,
                type_document=TypeDocument.FACTURE,
                statut=StatutEmailEnum.EN_ATTENTE,
                date_creation=datetime.now(),
                nb_tentatives=0,
            )
        )
        # Flush first so a database error surfaces before Sage is modified
        await session.flush()
        # Update the Sage free field through the Windows gateway
        sage_client.mettre_a_jour_derniere_relance(id, TypeDocument.FACTURE)
        await session.commit()
        # Enqueue only after the commit succeeded
        email_queue.enqueue(email_log_id)
        logger.info(f"✅ Relance facture: {id} → {contact['email']}")
        return {
            "success": True,
            "facture_id": id,
            "email_log_id": email_log_id,
            "destinataire": contact["email"],
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Erreur relance facture: {e}")
        raise HTTPException(500, str(e))
# ============================================
# US-A9 - JOURNAL DES E-MAILS
# ============================================
@app.get("/emails/logs", tags=["US-A9"])
async def journal_emails(
    statut: Optional[StatutEmail] = Query(None),
    destinataire: Optional[str] = Query(None),
    limit: int = Query(100, le=1000),
    session: AsyncSession = Depends(get_session),
):
    """
    US-A9: journal of emails.

    Returns the most recent ``EmailLog`` rows (newest first, at most ``limit``),
    optionally filtered by status and by a substring of the recipient address.
    Each row is returned as a plain dict with ISO-formatted dates.
    """
    requete = select(EmailLog)
    if statut:
        requete = requete.where(EmailLog.statut == StatutEmailEnum[statut.value])
    if destinataire:
        requete = requete.where(EmailLog.destinataire.contains(destinataire))
    requete = requete.order_by(EmailLog.date_creation.desc()).limit(limit)

    def _serialiser(entree):
        # date_envoi stays None until the email has actually been sent
        envoi = entree.date_envoi.isoformat() if entree.date_envoi else None
        return {
            "id": entree.id,
            "destinataire": entree.destinataire,
            "sujet": entree.sujet,
            "statut": entree.statut.value,
            "date_creation": entree.date_creation.isoformat(),
            "date_envoi": envoi,
            "nb_tentatives": entree.nb_tentatives,
            "derniere_erreur": entree.derniere_erreur,
            "document_ids": entree.document_ids,
        }

    resultat = await session.execute(requete)
    return [_serialiser(entree) for entree in resultat.scalars().all()]
@app.get("/emails/logs/export", tags=["US-A9"])
async def exporter_logs_csv(
    statut: Optional[StatutEmail] = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """
    US-A9: CSV export of the email send logs.

    Selects every ``EmailLog`` row (optionally restricted to one status),
    newest first, and returns them as a CSV attachment whose file name carries
    the current timestamp.
    """
    requete = select(EmailLog)
    if statut:
        requete = requete.where(EmailLog.statut == StatutEmailEnum[statut.value])
    requete = requete.order_by(EmailLog.date_creation.desc())
    entrees = (await session.execute(requete)).scalars().all()

    # Build the whole CSV in memory
    tampon = io.StringIO()
    redacteur = csv.writer(tampon)
    entetes = [
        "ID",
        "Destinataire",
        "Sujet",
        "Statut",
        "Date Création",
        "Date Envoi",
        "Nb Tentatives",
        "Erreur",
        "Documents",
    ]
    redacteur.writerow(entetes)
    redacteur.writerows(
        [
            entree.id,
            entree.destinataire,
            entree.sujet,
            entree.statut.value,
            entree.date_creation.isoformat(),
            entree.date_envoi.isoformat() if entree.date_envoi else "",
            entree.nb_tentatives,
            entree.derniere_erreur or "",
            entree.document_ids or "",
        ]
        for entree in entrees
    )

    horodatage = datetime.now().strftime('%Y%m%d_%H%M%S')
    return StreamingResponse(
        iter([tampon.getvalue()]),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=emails_logs_{horodatage}.csv"
        },
    )
# ============================================
# US-A10 - MODÈLES D'E-MAILS
# ============================================
class TemplateEmail(BaseModel):
    # Email template as exchanged with the /templates/emails endpoints.
    # id is optional on input: the create endpoint generates its own id and the
    # update endpoint takes it from the URL.
    id: Optional[str] = None
    nom: str
    # Subject and HTML body; both may contain {{NAME}} placeholders.
    sujet: str
    corps_html: str
    # Names of the placeholders this template is expected to use.
    variables_disponibles: List[str] = []
class TemplatePreviewRequest(BaseModel):
    # Request body of POST /templates/emails/preview.
    # Key of the template to render, looked up in templates_email_db.
    template_id: str
    # Document whose fields are merged into the template; it is read through
    # sage_client.lire_document(document_id, type_document).
    document_id: str
    type_document: TypeDocument
@app.get("/templates/emails", response_model=List[TemplateEmail], tags=["US-A10"])
async def lister_templates():
    """US-A10: return every email template held in ``templates_email_db``."""
    return [TemplateEmail(**champs) for champs in templates_email_db.values()]
@app.get(
    "/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def lire_template(template_id: str):
    """
    Return one email template by id.

    Raises:
        HTTPException: 404 when no template has this id.
    """
    modele = templates_email_db.get(template_id)
    if modele is None:
        raise HTTPException(404, f"Template {template_id} introuvable")
    return TemplateEmail(**modele)
@app.post("/templates/emails", response_model=TemplateEmail, tags=["US-A10"])
async def creer_template(template: TemplateEmail):
    """
    Create a new email template.

    A fresh UUID is always generated; any ``id`` present in the request body is
    ignored. The template is stored in ``templates_email_db`` and returned.

    Returns:
        The stored template, including its generated id.
    """
    template_id = str(uuid.uuid4())
    enregistrement = {
        "id": template_id,
        "nom": template.nom,
        "sujet": template.sujet,
        "corps_html": template.corps_html,
        # Copy so the stored record does not share its list with the request model
        "variables_disponibles": list(template.variables_disponibles),
    }
    templates_email_db[template_id] = enregistrement
    logger.info(f"Template créé: {template_id}")
    # Build the response from the stored record. Expanding the request model
    # next to an explicit id= keyword would pass "id" twice and raise TypeError.
    return TemplateEmail(**enregistrement)
@app.put(
    "/templates/emails/{template_id}", response_model=TemplateEmail, tags=["US-A10"]
)
async def modifier_template(template_id: str, template: TemplateEmail):
    """
    Replace an existing email template.

    The id comes from the URL; any ``id`` present in the request body is
    ignored. The two system templates cannot be modified.

    Raises:
        HTTPException: 404 when the template does not exist, 400 when it is a
            system template.
    """
    if template_id not in templates_email_db:
        raise HTTPException(404, f"Template {template_id} introuvable")
    # System templates are read-only
    if template_id in ["relance_devis", "relance_facture"]:
        raise HTTPException(400, "Les templates système ne peuvent pas être modifiés")
    enregistrement = {
        "id": template_id,
        "nom": template.nom,
        "sujet": template.sujet,
        "corps_html": template.corps_html,
        # Copy so the stored record does not share its list with the request model
        "variables_disponibles": list(template.variables_disponibles),
    }
    templates_email_db[template_id] = enregistrement
    logger.info(f"Template modifié: {template_id}")
    # Build the response from the stored record. Expanding the request model
    # next to an explicit id= keyword would pass "id" twice and raise TypeError.
    return TemplateEmail(**enregistrement)
@app.delete("/templates/emails/{template_id}", tags=["US-A10"])
async def supprimer_template(template_id: str):
    """
    Delete an email template.

    Raises:
        HTTPException: 404 when the template does not exist, 400 when it is one
            of the two protected system templates.
    """
    if template_id not in templates_email_db:
        raise HTTPException(404, f"Template {template_id} introuvable")
    # System templates must stay available
    if template_id in ("relance_devis", "relance_facture"):
        raise HTTPException(400, "Les templates système ne peuvent pas être supprimés")
    templates_email_db.pop(template_id)
    logger.info(f"Template supprimé: {template_id}")
    return {"success": True, "message": f"Template {template_id} supprimé"}
@app.post("/templates/emails/preview", tags=["US-A10"])
async def previsualiser_email(preview: TemplatePreviewRequest):
    """
    US-A10: preview an email by merging a template with a document.

    Looks up the template, reads the document through the Windows gateway and
    substitutes each ``{{NAME}}`` placeholder in the subject and the body.

    Returns:
        Dict with the template id, document id, merged subject, merged body and
        the variable values that were used.

    Raises:
        HTTPException: 404 when the template or the document is not found.
    """
    modele = templates_email_db.get(preview.template_id)
    if preview.template_id not in templates_email_db:
        raise HTTPException(404, f"Template {preview.template_id} introuvable")

    # Read the document through the Windows gateway
    doc = sage_client.lire_document(preview.document_id, preview.type_document)
    if not doc:
        raise HTTPException(404, f"Document {preview.document_id} introuvable")

    variables = {
        "DO_Piece": doc.get("numero", preview.document_id),
        "DO_Date": str(doc.get("date", "")),
        "CT_Intitule": doc.get("client_intitule", ""),
        "DO_TotalHT": f"{doc.get('total_ht', 0):.2f}",
        "DO_TotalTTC": f"{doc.get('total_ttc', 0):.2f}",
    }

    # Merge: replace every {{NAME}} occurrence in subject and body
    sujet_fusionne = modele["sujet"]
    corps_fusionne = modele["corps_html"]
    for nom, valeur in variables.items():
        balise = f"{{{{{nom}}}}}"
        sujet_fusionne = sujet_fusionne.replace(balise, valeur)
        corps_fusionne = corps_fusionne.replace(balise, valeur)

    return {
        "template_id": preview.template_id,
        "document_id": preview.document_id,
        "sujet": sujet_fusionne,
        "corps_html": corps_fusionne,
        "variables_utilisees": variables,
    }
# =====================================================
# ENDPOINTS - HEALTH
# =====================================================
@app.get("/health", tags=["System"])
async def health_check():
    """
    Health check.

    Reports the result of ``sage_client.health()`` together with the state of
    the local email queue (running flag, worker count, queue size) and the
    current timestamp. The ``status`` field is always ``"healthy"``.
    """
    etat_gateway = sage_client.health()
    etat_queue = {
        "running": email_queue.running,
        "workers": len(email_queue.workers),
        "queue_size": email_queue.queue.qsize(),
    }
    return {
        "status": "healthy",
        "sage_gateway": etat_gateway,
        "email_queue": etat_queue,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/", tags=["System"])
async def root():
    """Landing endpoint: API name, version and links to the docs and health check."""
    return dict(
        api="Sage 100c Dataven - VPS Linux",
        version="2.0.0",
        documentation="/docs",
        health="/health",
    )
# =====================================================
# ENDPOINTS - ADMIN
# =====================================================
@app.get("/admin/cache/info", tags=["Admin"])
async def info_cache():
    """
    Return information about the state of the Windows-side cache.

    The data comes straight from ``sage_client.get_cache_info()``.

    Raises:
        HTTPException: 500 carrying the error text when the call fails.
    """
    try:
        return sage_client.get_cache_info()
    except Exception as exc:
        logger.error(f"Erreur info cache: {exc}")
        raise HTTPException(500, str(exc))
# 🔧 CORRECTION ADMIN: Cache refresh (doit appeler Windows)
@app.post("/admin/cache/refresh", tags=["Admin"])
async def forcer_actualisation():
    """
    Force a refresh of the Windows-side cache.

    Calls ``sage_client.refresh_cache()`` (its return value is not used), then
    returns the cache information read afterwards.

    Raises:
        HTTPException: 500 carrying the error text when either call fails.
    """
    try:
        sage_client.refresh_cache()
        etat_cache = sage_client.get_cache_info()
    except Exception as exc:
        logger.error(f"Erreur refresh cache: {exc}")
        raise HTTPException(500, str(exc))
    return {
        "success": True,
        "message": "Cache actualisé sur Windows Server",
        "info": etat_cache,
    }
# ✅ RESTE INCHANGÉ: admin/queue/status (local au VPS)
@app.get("/admin/queue/status", tags=["Admin"])
async def statut_queue():
    """Status of the email queue local to this server: size, worker count, running flag."""
    file_attente = email_queue
    return {
        "queue_size": file_attente.queue.qsize(),
        "workers": len(file_attente.workers),
        "running": file_attente.running,
    }
# =====================================================
# ENDPOINTS - PROSPECTS
# =====================================================
@app.get("/prospects", tags=["Prospects"])
async def rechercher_prospects(query: Optional[str] = Query(None)):
    """
    Search prospects through the Windows gateway.

    A missing ``query`` is sent as an empty filter string.

    Raises:
        HTTPException: 500 carrying the error text when the gateway call fails.
    """
    filtre = query or ""
    try:
        return sage_client.lister_prospects(filtre=filtre)
    except Exception as exc:
        logger.error(f"Erreur recherche prospects: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/prospects/{code}", tags=["Prospects"])
async def lire_prospect(code: str):
    """
    Read one prospect by code.

    Raises:
        HTTPException: 404 when the gateway returns nothing for this code,
            500 for any other failure.
    """
    try:
        fiche = sage_client.lire_prospect(code)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Erreur lecture prospect: {exc}")
        raise HTTPException(500, str(exc))
    if not fiche:
        raise HTTPException(404, f"Prospect {code} introuvable")
    return fiche
# =====================================================
# ENDPOINTS - FOURNISSEURS
# =====================================================
@app.get("/fournisseurs", tags=["Fournisseurs"])
async def rechercher_fournisseurs(query: Optional[str] = Query(None)):
    """
    Search suppliers through the Windows gateway.

    A missing ``query`` is sent as an empty filter string.

    Raises:
        HTTPException: 500 carrying the error text when the gateway call fails.
    """
    filtre = query or ""
    try:
        return sage_client.lister_fournisseurs(filtre=filtre)
    except Exception as exc:
        logger.error(f"Erreur recherche fournisseurs: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/fournisseurs/{code}", tags=["Fournisseurs"])
async def lire_fournisseur(code: str):
    """
    Read one supplier by code.

    Raises:
        HTTPException: 404 when the gateway returns nothing for this code,
            500 for any other failure.
    """
    try:
        fiche = sage_client.lire_fournisseur(code)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Erreur lecture fournisseur: {exc}")
        raise HTTPException(500, str(exc))
    if not fiche:
        raise HTTPException(404, f"Fournisseur {code} introuvable")
    return fiche
# =====================================================
# ENDPOINTS - AVOIRS
# =====================================================
@app.get("/avoirs", tags=["Avoirs"])
async def lister_avoirs(
    limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
    """
    List credit notes through the Windows gateway.

    ``limit`` and the optional ``statut`` filter are forwarded unchanged.

    Raises:
        HTTPException: 500 carrying the error text when the gateway call fails.
    """
    try:
        return sage_client.lister_avoirs(limit=limit, statut=statut)
    except Exception as exc:
        logger.error(f"Erreur liste avoirs: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/avoirs/{numero}", tags=["Avoirs"])
async def lire_avoir(numero: str):
    """
    Read one credit note (with its lines, per the original description).

    Raises:
        HTTPException: 404 when the gateway returns nothing for this number,
            500 for any other failure.
    """
    try:
        document = sage_client.lire_avoir(numero)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Erreur lecture avoir: {exc}")
        raise HTTPException(500, str(exc))
    if not document:
        raise HTTPException(404, f"Avoir {numero} introuvable")
    return document
# =====================================================
# ENDPOINTS - LIVRAISONS
# =====================================================
@app.get("/livraisons", tags=["Livraisons"])
async def lister_livraisons(
    limit: int = Query(100, le=1000), statut: Optional[int] = Query(None)
):
    """
    List delivery notes through the Windows gateway.

    ``limit`` and the optional ``statut`` filter are forwarded unchanged.

    Raises:
        HTTPException: 500 carrying the error text when the gateway call fails.
    """
    try:
        return sage_client.lister_livraisons(limit=limit, statut=statut)
    except Exception as exc:
        logger.error(f"Erreur liste livraisons: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/livraisons/{numero}", tags=["Livraisons"])
async def lire_livraison(numero: str):
    """
    Read one delivery note (with its lines, per the original description).

    Raises:
        HTTPException: 404 when the gateway returns nothing for this number,
            500 for any other failure.
    """
    try:
        document = sage_client.lire_livraison(numero)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Erreur lecture livraison: {exc}")
        raise HTTPException(500, str(exc))
    if not document:
        raise HTTPException(404, f"Livraison {numero} introuvable")
    return document
@app.get("/debug/users", response_model=List[UserResponse], tags=["Debug"])
async def lister_utilisateurs_debug(
    session: AsyncSession = Depends(get_session),
    limit: int = Query(100, le=1000),
    role: Optional[str] = Query(None),
    verified_only: bool = Query(False),
):
    """
    **DEBUG ROUTE** - list registered users.

    **WARNING**: this route declares no authentication dependency. Use it in
    development only, or secure it before production.

    Args:
        limit: Maximum number of users to return.
        role: Optional exact-match filter on the user role.
        verified_only: When true, return verified users only.

    Returns:
        Users ordered by creation date (newest first); the password hash is
        not part of the response.
    """
    from database import User
    from sqlalchemy import select

    try:
        requete = select(User)
        # Optional filters
        if role:
            requete = requete.where(User.role == role)
        if verified_only:
            requete = requete.where(User.is_verified == True)
        # Newest accounts first
        requete = requete.order_by(User.created_at.desc()).limit(limit)

        comptes = (await session.execute(requete)).scalars().all()
        reponse = [
            UserResponse(
                id=compte.id,
                email=compte.email,
                nom=compte.nom,
                prenom=compte.prenom,
                role=compte.role,
                is_verified=compte.is_verified,
                is_active=compte.is_active,
                created_at=compte.created_at.isoformat() if compte.created_at else "",
                last_login=compte.last_login.isoformat() if compte.last_login else None,
                failed_login_attempts=compte.failed_login_attempts or 0,
            )
            for compte in comptes
        ]
        logger.info(
            f"📋 Liste utilisateurs retournée: {len(reponse)} résultat(s)"
        )
        return reponse
    except Exception as exc:
        logger.error(f"❌ Erreur liste utilisateurs: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/debug/users/stats", tags=["Debug"])
async def statistiques_utilisateurs(session: AsyncSession = Depends(get_session)):
    """
    **DEBUG ROUTE** - user statistics.

    Returns the total, verified, active and unverified user counts, the
    breakdown by role and the verification rate as a percentage string.
    Declares no authentication dependency - secure it before production.
    """
    from database import User
    from sqlalchemy import select, func

    async def _compter(*criteres):
        # COUNT(User.id), optionally restricted by the given criteria
        requete = select(func.count(User.id))
        if criteres:
            requete = requete.where(*criteres)
        return (await session.execute(requete)).scalar()

    try:
        total = await _compter()
        verified = await _compter(User.is_verified == True)
        active = await _compter(User.is_active == True)

        # Breakdown by role
        par_role = await session.execute(
            select(User.role, func.count(User.id)).group_by(User.role)
        )
        roles_stats = {nom_role: effectif for nom_role, effectif in par_role.all()}

        if total > 0:
            taux = f"{(verified/total*100):.1f}%"
        else:
            taux = "0%"
        return {
            "total_utilisateurs": total,
            "utilisateurs_verifies": verified,
            "utilisateurs_actifs": active,
            "utilisateurs_non_verifies": total - verified,
            "repartition_roles": roles_stats,
            "taux_verification": taux,
        }
    except Exception as exc:
        logger.error(f"❌ Erreur stats utilisateurs: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/debug/users/{user_id}", response_model=UserResponse, tags=["Debug"])
async def lire_utilisateur_debug(
    user_id: str, session: AsyncSession = Depends(get_session)
):
    """
    **DEBUG ROUTE** - details of one user by id.

    Declares no authentication dependency - secure it before production.

    Raises:
        HTTPException: 404 when no user has this id, 500 for any other failure.
    """
    from database import User
    from sqlalchemy import select

    try:
        resultat = await session.execute(select(User).where(User.id == user_id))
        compte = resultat.scalar_one_or_none()
        if not compte:
            raise HTTPException(404, f"Utilisateur {user_id} introuvable")

        cree_le = compte.created_at.isoformat() if compte.created_at else ""
        derniere_connexion = (
            compte.last_login.isoformat() if compte.last_login else None
        )
        return UserResponse(
            id=compte.id,
            email=compte.email,
            nom=compte.nom,
            prenom=compte.prenom,
            role=compte.role,
            is_verified=compte.is_verified,
            is_active=compte.is_active,
            created_at=cree_le,
            last_login=derniere_connexion,
            failed_login_attempts=compte.failed_login_attempts or 0,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"❌ Erreur lecture utilisateur: {exc}")
        raise HTTPException(500, str(exc))
@app.get("/debug/database/check", tags=["Debug"])
async def verifier_integrite_database(session: AsyncSession = Depends(get_session)):
    """
    Database integrity check.

    Collects row counts for the users, refresh-token, login-attempt, email-log
    and signature-log tables, a per-user detail list, information about the
    ``sage_dataven.db`` file relative to the working directory, and the result
    of a raw ``SELECT COUNT(*) FROM users`` query.

    Raises:
        HTTPException: 500 when the diagnostic itself fails.
    """
    from database import User, RefreshToken, LoginAttempt, EmailLog, SignatureLog
    from sqlalchemy import func, text
    import os

    async def _compter(colonne):
        # COUNT() over the given primary-key column
        return (await session.execute(select(func.count(colonne)))).scalar()

    try:
        diagnostics = {}

        # --- users: total plus one detail entry per account ---
        nb_users = await _compter(User.id)
        comptes = (await session.execute(select(User))).scalars().all()
        diagnostics["users"] = {
            "total": nb_users,
            "details": [
                {
                    "id": u.id,
                    "email": u.email,
                    "nom": f"{u.prenom} {u.nom}",
                    "role": u.role,
                    "is_active": u.is_active,
                    "is_verified": u.is_verified,
                    "created_at": u.created_at.isoformat() if u.created_at else None,
                    "has_reset_token": u.reset_token is not None,
                    "has_verification_token": u.verification_token is not None,
                }
                for u in comptes
            ],
        }

        # --- plain row counts for the other tables ---
        diagnostics["refresh_tokens"] = {"total": await _compter(RefreshToken.id)}
        diagnostics["login_attempts"] = {"total": await _compter(LoginAttempt.id)}
        diagnostics["email_logs"] = {"total": await _compter(EmailLog.id)}
        diagnostics["signature_logs"] = {"total": await _compter(SignatureLog.id)}

        # --- SQLite file on disk ---
        db_file = "sage_dataven.db"
        present = os.path.exists(db_file)
        diagnostics["database_file"] = {
            "exists": present,
            "size_bytes": os.path.getsize(db_file) if present else 0,
            "path": os.path.abspath(db_file),
        }

        # --- raw SQL round trip; a failure here is reported, not raised ---
        try:
            brut = await session.execute(text("SELECT COUNT(*) FROM users"))
            diagnostics["raw_sql_check"] = {
                "users_count": brut.scalar(),
                "status": "✅ Connexion DB OK",
            }
        except Exception as exc:
            diagnostics["raw_sql_check"] = {"status": "❌ Erreur", "error": str(exc)}

        return {
            "success": True,
            "timestamp": datetime.now().isoformat(),
            "diagnostics": diagnostics,
        }
    except Exception as exc:
        logger.error(f"❌ Erreur diagnostic DB: {exc}", exc_info=True)
        raise HTTPException(500, f"Erreur diagnostic: {str(exc)}")
@app.post("/debug/database/test-user-persistence", tags=["Debug"])
async def tester_persistance_utilisateur(session: AsyncSession = Depends(get_session)):
    """
    Persistence test: create, read, modify, re-read and delete a test user.

    Each step is committed separately so the test exercises real persistence.
    Because the test account is active, verified and uses a hard-coded
    password, the error path makes a best-effort attempt to delete it, so a
    failure halfway through does not leave a usable account behind.

    Returns:
        A dict with ``success`` and either the completed steps or the error.
        Failures are reported in the body; this endpoint does not raise.
    """
    import uuid
    from database import User
    from security.auth import hash_password

    # Stays None until the test user has been flushed; the error handler uses
    # it to know whether there may be a row to clean up.
    user_id = None
    try:
        test_email = f"test_{uuid.uuid4().hex[:8]}@example.com"

        # === STEP 1: CREATE ===
        test_user = User(
            id=str(uuid.uuid4()),
            email=test_email,
            hashed_password=hash_password("TestPassword123!"),
            nom="Test",
            prenom="User",
            role="user",
            is_verified=True,
            is_active=True,
            created_at=datetime.now(),
        )
        session.add(test_user)
        await session.flush()
        user_id = test_user.id
        await session.commit()
        logger.info(f"✅ ÉTAPE 1: User créé - {user_id}")

        # === STEP 2: READ ===
        result = await session.execute(select(User).where(User.id == user_id))
        loaded_user = result.scalar_one_or_none()
        if not loaded_user:
            return {
                "success": False,
                "error": "❌ User introuvable après création !",
                "step": "LECTURE",
            }
        logger.info(f"✅ ÉTAPE 2: User chargé - {loaded_user.email}")

        # === STEP 3: MODIFY (simulates a password reset) ===
        loaded_user.hashed_password = hash_password("NewPassword456!")
        loaded_user.reset_token = None
        loaded_user.reset_token_expires = None
        session.add(loaded_user)
        await session.flush()
        await session.commit()
        await session.refresh(loaded_user)
        logger.info("✅ ÉTAPE 3: User modifié")

        # === STEP 4: RE-READ ===
        result2 = await session.execute(select(User).where(User.id == user_id))
        reloaded_user = result2.scalar_one_or_none()
        if not reloaded_user:
            return {
                "success": False,
                "error": "❌ User DISPARU après modification !",
                "step": "RE-LECTURE",
                "user_id": user_id,
            }
        logger.info(f"✅ ÉTAPE 4: User re-chargé - {reloaded_user.email}")

        # === STEP 5: DELETE THE TEST USER ===
        await session.delete(reloaded_user)
        await session.commit()
        logger.info("✅ ÉTAPE 5: User test supprimé")

        return {
            "success": True,
            "message": "✅ Tous les tests de persistance sont OK",
            "test_user_id": user_id,
            "test_email": test_email,
            "steps_completed": [
                "1. Création",
                "2. Lecture",
                "3. Modification (reset password simulé)",
                "4. Re-lecture (vérification persistance)",
                "5. Suppression (cleanup)",
            ],
        }
    except Exception as e:
        logger.error(f"❌ Erreur test persistance: {e}", exc_info=True)
        # Undo whatever was pending in the failed transaction
        await session.rollback()

        # A rollback cannot undo the commits of earlier steps, so remove the
        # test account explicitly if it made it into the database.
        if user_id is not None:
            try:
                restant = (
                    await session.execute(select(User).where(User.id == user_id))
                ).scalar_one_or_none()
                if restant is not None:
                    await session.delete(restant)
                    await session.commit()
            except Exception as cleanup_error:
                logger.error(
                    f"❌ Nettoyage du user test impossible ({user_id}): {cleanup_error}",
                    exc_info=True,
                )
                await session.rollback()

        return {
            "success": False,
            "error": str(e),
            "traceback": str(e.__class__.__name__),
        }
# =====================================================
# LANCEMENT
# =====================================================
# Script entry point: start uvicorn on the "api:app" import string, with the
# host, port and reload flag taken from settings.
if __name__ == "__main__":
    uvicorn.run(
        "api:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=settings.api_reload,
    )