feat(sage-gateway): add multi-tenant sage gateway configuration system

This commit is contained in:
Fanilo-Nantenaina 2025-12-31 10:19:16 +03:00
parent 4d2f13e6e3
commit 6b1710ad99
9 changed files with 1269 additions and 239 deletions

146
api.py
View file

@ -13,7 +13,8 @@ import io
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import os
from pathlib import Path as FilePath
from data.data import TAGS_METADATA, templates_signature_email
from routes.auth import router as auth_router
from config import settings
@ -28,7 +29,7 @@ from database import (
StatutSignature as StatutSignatureEnum,
)
from email_queue import email_queue
from sage_client import sage_client
from sage_client import sage_client, SageGatewayClient
from schemas import (
TiersDetails,
@ -72,10 +73,27 @@ from schemas import (
)
from utils.normalization import normaliser_type_tiers
from routes.sage_gateway import router as sage_gateway_router
from core.sage_context import (
get_sage_client_for_user,
get_gateway_context_for_user,
GatewayContext,
)
if os.path.exists("/app"):
LOGS_DIR = FilePath("/app/logs")
else:
LOGS_DIR = FilePath(__file__).resolve().parent / "logs"
LOGS_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("/app/logs/sage_api.log"), logging.StreamHandler()],
handlers=[
logging.FileHandler(LOGS_DIR / "sage_api.log", encoding="utf-8"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
@ -101,9 +119,9 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="API Sage 100c Dataven",
version="2.0.0",
description="API de gestion commerciale - VPS Linux",
title="Sage Gateways",
version="3.0.0",
description="Configuration multi-tenant des connexions Sage Gateway",
lifespan=lifespan,
openapi_tags=TAGS_METADATA,
)
@ -118,6 +136,7 @@ app.add_middleware(
app.include_router(auth_router)
app.include_router(sage_gateway_router)
async def universign_envoyer(
@ -296,9 +315,12 @@ async def universign_statut(transaction_id: str) -> dict:
@app.get("/clients", response_model=List[ClientDetails], tags=["Clients"])
async def obtenir_clients(query: Optional[str] = Query(None)):
async def obtenir_clients(
query: Optional[str] = Query(None),
sage: SageGatewayClient = Depends(get_sage_client_for_user),
):
try:
clients = sage_client.lister_clients(filtre=query or "")
clients = sage.lister_clients(filtre=query or "")
return [ClientDetails(**c) for c in clients]
except Exception as e:
logger.error(f"Erreur recherche clients: {e}")
@ -2171,52 +2193,6 @@ async def previsualiser_email(preview: TemplatePreviewRequest):
}
@app.get("/health", tags=["System"])
async def health_check():
gateway_health = sage_client.health()
return {
"status": "healthy",
"sage_gateway": gateway_health,
"email_queue": {
"running": email_queue.running,
"workers": len(email_queue.workers),
"queue_size": email_queue.queue.qsize(),
},
"timestamp": datetime.now().isoformat(),
}
@app.get("/", tags=["System"])
async def root():
return {
"api": "Sage 100c Dataven - VPS Linux",
"version": "2.0.0",
"documentation": "/docs",
"health": "/health",
}
@app.get("/admin/cache/info", tags=["Admin"])
async def info_cache():
try:
cache_info = sage_client.get_cache_info()
return cache_info
except Exception as e:
logger.error(f"Erreur info cache: {e}")
raise HTTPException(500, str(e))
@app.get("/admin/queue/status", tags=["Admin"])
async def statut_queue():
return {
"queue_size": email_queue.queue.qsize(),
"workers": len(email_queue.workers),
"running": email_queue.running,
}
@app.get("/prospects", tags=["Prospects"])
async def rechercher_prospects(query: Optional[str] = Query(None)):
try:
@ -3176,6 +3152,68 @@ async def lire_tiers_detail(code: str):
raise HTTPException(500, str(e))
@app.get("/sage/current-config", tags=["System"])
async def get_current_sage_config(
ctx: GatewayContext = Depends(get_gateway_context_for_user),
):
return {
"source": "user_gateway" if not ctx.is_fallback else "fallback_env",
"gateway_id": ctx.gateway_id,
"gateway_name": ctx.gateway_name,
"gateway_url": ctx.url,
"user_id": ctx.user_id,
}
@app.get("/health", tags=["System"])
async def health_check(
sage: SageGatewayClient = Depends(get_sage_client_for_user),
):
gateway_health = sage.health()
return {
"status": "healthy",
"sage_gateway": gateway_health,
"using_gateway_id": sage.gateway_id,
"email_queue": {
"running": email_queue.running,
"workers": len(email_queue.workers),
"queue_size": email_queue.queue.qsize(),
},
"timestamp": datetime.now().isoformat(),
}
@app.get("/", tags=["System"])
async def root():
return {
"api": "Sage 100c Dataven - VPS Linux",
"version": "2.0.0",
"documentation": "/docs",
"health": "/health",
}
@app.get("/admin/cache/info", tags=["Admin"])
async def info_cache():
try:
cache_info = sage_client.get_cache_info()
return cache_info
except Exception as e:
logger.error(f"Erreur info cache: {e}")
raise HTTPException(500, str(e))
@app.get("/admin/queue/status", tags=["Admin"])
async def statut_queue():
return {
"queue_size": email_queue.queue.qsize(),
"workers": len(email_queue.workers),
"running": email_queue.running,
}
if __name__ == "__main__":
uvicorn.run(
"api:app",

77
core/sage_context.py Normal file
View file

@ -0,0 +1,77 @@
from dataclasses import dataclass
from typing import Optional
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session, User
from core.dependencies import get_current_user
from sage_client import SageGatewayClient
from config import settings
import logging
logger = logging.getLogger(__name__)
@dataclass
class GatewayContext:
url: str
token: str
gateway_id: Optional[str] = None
gateway_name: Optional[str] = None
user_id: Optional[str] = None
is_fallback: bool = False
async def get_sage_client_for_user(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
) -> SageGatewayClient:
from services.sage_gateway import SageGatewayService
service = SageGatewayService(session)
active_gateway = await service.get_active_gateway(user.id)
if active_gateway:
logger.debug(f"Gateway active: {active_gateway.name} pour {user.email}")
return SageGatewayClient(
gateway_url=active_gateway.gateway_url,
gateway_token=active_gateway.gateway_token,
gateway_id=active_gateway.id,
)
logger.debug(f"Fallback .env pour {user.email}")
return SageGatewayClient()
async def get_gateway_context_for_user(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
) -> GatewayContext:
from services.sage_gateway import SageGatewayService
service = SageGatewayService(session)
active_gateway = await service.get_active_gateway(user.id)
if active_gateway:
return GatewayContext(
url=active_gateway.gateway_url,
token=active_gateway.gateway_token,
gateway_id=active_gateway.id,
gateway_name=active_gateway.name,
user_id=user.id,
is_fallback=False,
)
return GatewayContext(
url=settings.sage_gateway_url,
token=settings.sage_gateway_token,
gateway_id=None,
gateway_name="Fallback (.env)",
user_id=user.id,
is_fallback=True,
)
async def get_sage_client_public() -> SageGatewayClient:
return SageGatewayClient()

View file

@ -19,7 +19,6 @@ from database.Enum.status import (
StatutEmail,
StatutSignature,
)
from database.models.workflow import WorkflowLog
__all__ = [

View file

@ -17,3 +17,5 @@ bcrypt==4.2.0
sqlalchemy
aiosqlite
tenacity
httpx

323
routes/sage_gateway.py Normal file
View file

@ -0,0 +1,323 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
import logging
from database import get_session, User
from core.dependencies import get_current_user
from services.sage_gateway import (
SageGatewayService,
gateway_response_from_model,
)
from schemas import (
SageGatewayCreate,
SageGatewayUpdate,
SageGatewayResponse,
SageGatewayListResponse,
SageGatewayHealthCheck,
SageGatewayTestRequest,
SageGatewayStatsResponse,
CurrentGatewayInfo,
)
from config import settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sage-gateways", tags=["Sage Gateways"])
@router.post(
"", response_model=SageGatewayResponse, status_code=status.HTTP_201_CREATED
)
async def create_gateway(
data: SageGatewayCreate,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateway = await service.create(user.id, data.model_dump())
logger.info(f"Gateway créée: {gateway.name} par {user.email}")
return SageGatewayResponse(**gateway_response_from_model(gateway))
@router.get("", response_model=SageGatewayListResponse)
async def list_gateways(
include_deleted: bool = Query(False, description="Inclure les gateways supprimées"),
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateways = await service.list_for_user(user.id, include_deleted)
active = await service.get_active_gateway(user.id)
items = [SageGatewayResponse(**gateway_response_from_model(g)) for g in gateways]
return SageGatewayListResponse(
items=items,
total=len(items),
active_gateway=SageGatewayResponse(**gateway_response_from_model(active))
if active
else None,
using_fallback=active is None,
)
@router.get("/current", response_model=CurrentGatewayInfo)
async def get_current_gateway(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
url, token, gateway_id = await service.get_effective_gateway_config(user.id)
if gateway_id:
gateway = await service.get_by_id(gateway_id, user.id)
return CurrentGatewayInfo(
source="user_config",
gateway_id=gateway_id,
gateway_name=gateway.name if gateway else None,
gateway_url=url,
is_healthy=gateway.last_health_status if gateway else None,
user_id=user.id,
)
else:
return CurrentGatewayInfo(
source="fallback",
gateway_id=None,
gateway_name="Configuration .env (défaut)",
gateway_url=url,
is_healthy=None,
user_id=user.id,
)
@router.get("/stats", response_model=SageGatewayStatsResponse)
async def get_gateway_stats(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
stats = await service.get_stats(user.id)
return SageGatewayStatsResponse(**stats)
@router.get("/{gateway_id}", response_model=SageGatewayResponse)
async def get_gateway(
gateway_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateway = await service.get_by_id(gateway_id, user.id)
if not gateway:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
return SageGatewayResponse(**gateway_response_from_model(gateway))
@router.put("/{gateway_id}", response_model=SageGatewayResponse)
async def update_gateway(
gateway_id: str,
data: SageGatewayUpdate,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
update_data = {k: v for k, v in data.model_dump().items() if v is not None}
if not update_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Aucun champ à modifier"
)
gateway = await service.update(gateway_id, user.id, update_data)
if not gateway:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
logger.info(f"Gateway mise à jour: {gateway.name} par {user.email}")
return SageGatewayResponse(**gateway_response_from_model(gateway))
@router.delete("/{gateway_id}")
async def delete_gateway(
gateway_id: str,
hard_delete: bool = Query(False, description="Suppression définitive"),
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
success = await service.delete(gateway_id, user.id, hard_delete)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
logger.info(
f"Gateway supprimée: {gateway_id} par {user.email} (hard={hard_delete})"
)
return {
"success": True,
"message": f"Gateway supprimée {'définitivement' if hard_delete else '(soft delete)'}",
}
@router.post("/{gateway_id}/activate", response_model=SageGatewayResponse)
async def activate_gateway(
gateway_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateway = await service.activate(gateway_id, user.id)
if not gateway:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
logger.info(f"Gateway activée: {gateway.name} par {user.email}")
return SageGatewayResponse(**gateway_response_from_model(gateway))
@router.post("/{gateway_id}/deactivate", response_model=SageGatewayResponse)
async def deactivate_gateway(
gateway_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateway = await service.deactivate(gateway_id, user.id)
if not gateway:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
logger.info(f"Gateway désactivée: {gateway.name} - fallback actif")
return SageGatewayResponse(**gateway_response_from_model(gateway))
@router.post("/deactivate-all")
async def deactivate_all_gateways(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
await service._deactivate_all_for_user(user.id)
await session.commit()
logger.info(
f"Toutes les gateways désactivées pour {user.email} - fallback .env actif"
)
return {
"success": True,
"message": "Toutes les gateways désactivées. Le fallback .env est maintenant utilisé.",
"fallback_url": settings.sage_gateway_url,
}
@router.post("/{gateway_id}/health-check", response_model=SageGatewayHealthCheck)
async def check_gateway_health(
gateway_id: str,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
from datetime import datetime
service = SageGatewayService(session)
gateway = await service.get_by_id(gateway_id, user.id)
if not gateway:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Gateway {gateway_id} introuvable",
)
result = await service.health_check(gateway_id, user.id)
return SageGatewayHealthCheck(
gateway_id=gateway_id,
gateway_name=gateway.name,
status=result.get("status", "unknown"),
response_time_ms=result.get("response_time_ms"),
sage_version=result.get("sage_version"),
error=result.get("error"),
checked_at=datetime.now(),
)
@router.post("/test", response_model=dict)
async def test_gateway_config(
data: SageGatewayTestRequest,
user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
):
service = SageGatewayService(session)
result = await service.test_gateway(data.gateway_url, data.gateway_token)
return {"tested_url": data.gateway_url, "result": result}
@router.post("/health-check-all")
async def check_all_gateways_health(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
service = SageGatewayService(session)
gateways = await service.list_for_user(user.id)
results = []
for gateway in gateways:
result = await service.health_check(gateway.id, user.id)
results.append(
{
"gateway_id": gateway.id,
"gateway_name": gateway.name,
"is_active": gateway.is_active,
**result,
}
)
healthy_count = sum(1 for r in results if r.get("status") == "healthy")
return {
"total": len(results),
"healthy": healthy_count,
"unhealthy": len(results) - healthy_count,
"results": results,
}
@router.get("/fallback/info")
async def get_fallback_info(
user: User = Depends(get_current_user),
):
return {
"source": ".env",
"gateway_url": settings.sage_gateway_url,
"token_configured": bool(settings.sage_gateway_token),
"token_preview": f"****{settings.sage_gateway_token[-4:]}"
if settings.sage_gateway_token
else None,
"description": "Configuration par défaut utilisée quand aucune gateway utilisateur n'est active",
}

View file

@ -1,3 +1,4 @@
# sage_client.py
import requests
from typing import Dict, List, Optional
from config import settings
@ -7,14 +8,28 @@ logger = logging.getLogger(__name__)
class SageGatewayClient:
def __init__(self):
self.url = settings.sage_gateway_url.rstrip("/")
def __init__(
self,
gateway_url: Optional[str] = None,
gateway_token: Optional[str] = None,
gateway_id: Optional[str] = None,
):
self.url = (gateway_url or settings.sage_gateway_url).rstrip("/")
self.token = gateway_token or settings.sage_gateway_token
self.gateway_id = gateway_id
self.headers = {
"X-Sage-Token": settings.sage_gateway_token,
"X-Sage-Token": self.token,
"Content-Type": "application/json",
}
self.timeout = 30
@classmethod
def from_context(
cls, url: str, token: str, gateway_id: Optional[str] = None
) -> "SageGatewayClient":
return cls(gateway_url=url, gateway_token=token, gateway_id=gateway_id)
def _post(self, endpoint: str, data: dict = None, retries: int = 3) -> dict:
import time
@ -63,12 +78,29 @@ class SageGatewayClient:
def lire_client(self, code: str) -> Optional[Dict]:
return self._post("/sage/clients/get", {"code": code}).get("data")
def creer_client(self, client_data: Dict) -> Dict:
return self._post("/sage/clients/create", client_data).get("data", {})
def modifier_client(self, code: str, client_data: Dict) -> Dict:
return self._post(
"/sage/clients/update", {"code": code, "client_data": client_data}
).get("data", {})
def lister_articles(self, filtre: str = "") -> List[Dict]:
return self._post("/sage/articles/list", {"filtre": filtre}).get("data", [])
def lire_article(self, ref: str) -> Optional[Dict]:
return self._post("/sage/articles/get", {"code": ref}).get("data")
def creer_article(self, article_data: Dict) -> Dict:
return self._post("/sage/articles/create", article_data).get("data", {})
def modifier_article(self, reference: str, article_data: Dict) -> Dict:
return self._post(
"/sage/articles/update",
{"reference": reference, "article_data": article_data},
).get("data", {})
def creer_devis(self, devis_data: Dict) -> Dict:
return self._post("/sage/devis/create", devis_data).get("data", {})
@ -86,6 +118,81 @@ class SageGatewayClient:
payload["statut"] = statut
return self._post("/sage/devis/list", payload).get("data", [])
def modifier_devis(self, numero: str, devis_data: Dict) -> Dict:
return self._post(
"/sage/devis/update", {"numero": numero, "devis_data": devis_data}
).get("data", {})
def lister_commandes(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/commandes/list", payload).get("data", [])
def creer_commande(self, commande_data: Dict) -> Dict:
return self._post("/sage/commandes/create", commande_data).get("data", {})
def modifier_commande(self, numero: str, commande_data: Dict) -> Dict:
return self._post(
"/sage/commandes/update", {"numero": numero, "commande_data": commande_data}
).get("data", {})
def lister_factures(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/factures/list", payload).get("data", [])
def creer_facture(self, facture_data: Dict) -> Dict:
return self._post("/sage/factures/create", facture_data).get("data", {})
def modifier_facture(self, numero: str, facture_data: Dict) -> Dict:
return self._post(
"/sage/factures/update", {"numero": numero, "facture_data": facture_data}
).get("data", {})
def lister_livraisons(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/livraisons/list", payload).get("data", [])
def creer_livraison(self, livraison_data: Dict) -> Dict:
return self._post("/sage/livraisons/create", livraison_data).get("data", {})
def modifier_livraison(self, numero: str, livraison_data: Dict) -> Dict:
return self._post(
"/sage/livraisons/update",
{"numero": numero, "livraison_data": livraison_data},
).get("data", {})
def lister_avoirs(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/avoirs/list", payload).get("data", [])
def creer_avoir(self, avoir_data: Dict) -> Dict:
return self._post("/sage/avoirs/create", avoir_data).get("data", {})
def modifier_avoir(self, numero: str, avoir_data: Dict) -> Dict:
return self._post(
"/sage/avoirs/update", {"numero": numero, "avoir_data": avoir_data}
).get("data", {})
def lire_document(self, numero: str, type_doc: int) -> Optional[Dict]:
return self._post(
"/sage/documents/get", {"numero": numero, "type_doc": type_doc}
).get("data")
def changer_statut_document(
self, document_type_code: int, numero: str, nouveau_statut: int
) -> Dict:
@ -106,11 +213,6 @@ class SageGatewayClient:
logger.error(f"Erreur changement statut: {e}")
raise
def lire_document(self, numero: str, type_doc: int) -> Optional[Dict]:
return self._post(
"/sage/documents/get", {"numero": numero, "type_doc": type_doc}
).get("data")
def transformer_document(
self, numero_source: str, type_source: int, type_cible: int
) -> Dict:
@ -145,140 +247,15 @@ class SageGatewayClient:
)
return resp.get("success", False)
def lister_commandes(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/commandes/list", payload).get("data", [])
def lister_factures(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/factures/list", payload).get("data", [])
def mettre_a_jour_derniere_relance(self, doc_id: str, type_doc: int) -> bool:
resp = self._post(
"/sage/documents/derniere-relance", {"doc_id": doc_id, "type_doc": type_doc}
)
return resp.get("success", False)
def lire_contact_client(self, code_client: str) -> Optional[Dict]:
return self._post("/sage/contact/read", {"code": code_client}).get("data")
def lire_remise_max_client(self, code_client: str) -> float:
result = self._post("/sage/client/remise-max", {"code": code_client})
return result.get("data", {}).get("remise_max", 10.0)
def lister_prospects(self, filtre: str = "") -> List[Dict]:
return self._post("/sage/prospects/list", {"filtre": filtre}).get("data", [])
def lire_prospect(self, code: str) -> Optional[Dict]:
return self._post("/sage/prospects/get", {"code": code}).get("data")
def lister_fournisseurs(self, filtre: str = "") -> List[Dict]:
return self._post("/sage/fournisseurs/list", {"filtre": filtre}).get("data", [])
def lire_fournisseur(self, code: str) -> Optional[Dict]:
return self._post("/sage/fournisseurs/get", {"code": code}).get("data")
def creer_fournisseur(self, fournisseur_data: Dict) -> Dict:
return self._post("/sage/fournisseurs/create", fournisseur_data).get("data", {})
def modifier_fournisseur(self, code: str, fournisseur_data: Dict) -> Dict:
return self._post(
"/sage/fournisseurs/update",
{"code": code, "fournisseur_data": fournisseur_data},
).get("data", {})
def lister_avoirs(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/avoirs/list", payload).get("data", [])
def lire_avoir(self, numero: str) -> Optional[Dict]:
return self._post("/sage/avoirs/get", {"code": numero}).get("data")
def lister_livraisons(
self, limit: int = 100, statut: Optional[int] = None
) -> List[Dict]:
payload = {"limit": limit}
if statut is not None:
payload["statut"] = statut
return self._post("/sage/livraisons/list", payload).get("data", [])
def lire_livraison(self, numero: str) -> Optional[Dict]:
return self._post("/sage/livraisons/get", {"code": numero}).get("data")
def refresh_cache(self) -> Dict:
return self._post("/sage/cache/refresh")
def get_cache_info(self) -> Dict:
return self._get("/sage/cache/info").get("data", {})
def health(self) -> dict:
try:
r = requests.get(f"{self.url}/health", timeout=5)
return r.json()
except Exception:
return {"status": "down"}
def creer_client(self, client_data: Dict) -> Dict:
return self._post("/sage/clients/create", client_data).get("data", {})
def modifier_client(self, code: str, client_data: Dict) -> Dict:
return self._post(
"/sage/clients/update", {"code": code, "client_data": client_data}
).get("data", {})
def modifier_devis(self, numero: str, devis_data: Dict) -> Dict:
return self._post(
"/sage/devis/update", {"numero": numero, "devis_data": devis_data}
).get("data", {})
def creer_commande(self, commande_data: Dict) -> Dict:
return self._post("/sage/commandes/create", commande_data).get("data", {})
def modifier_commande(self, numero: str, commande_data: Dict) -> Dict:
return self._post(
"/sage/commandes/update", {"numero": numero, "commande_data": commande_data}
).get("data", {})
def creer_livraison(self, livraison_data: Dict) -> Dict:
return self._post("/sage/livraisons/create", livraison_data).get("data", {})
def modifier_livraison(self, numero: str, livraison_data: Dict) -> Dict:
return self._post(
"/sage/livraisons/update",
{"numero": numero, "livraison_data": livraison_data},
).get("data", {})
def creer_avoir(self, avoir_data: Dict) -> Dict:
return self._post("/sage/avoirs/create", avoir_data).get("data", {})
def modifier_avoir(self, numero: str, avoir_data: Dict) -> Dict:
return self._post(
"/sage/avoirs/update", {"numero": numero, "avoir_data": avoir_data}
).get("data", {})
def creer_facture(self, facture_data: Dict) -> Dict:
return self._post("/sage/factures/create", facture_data).get("data", {})
def modifier_facture(self, numero: str, facture_data: Dict) -> Dict:
return self._post(
"/sage/factures/update", {"numero": numero, "facture_data": facture_data}
).get("data", {})
def generer_pdf_document(self, doc_id: str, type_doc: int) -> bytes:
try:
logger.info(f"📄 Demande génération PDF: doc_id={doc_id}, type={type_doc}")
logger.info(f"Demande génération PDF: doc_id={doc_id}, type={type_doc}")
r = requests.post(
f"{self.url}/sage/documents/generate-pdf",
@ -311,7 +288,7 @@ class SageGatewayClient:
return pdf_bytes
except requests.exceptions.Timeout:
logger.error(f"⏱️ Timeout génération PDF pour {doc_id}")
logger.error(f"Timeout génération PDF pour {doc_id}")
raise RuntimeError(
f"Timeout lors de la génération du PDF (>60s). "
f"Le document {doc_id} est peut-être trop volumineux."
@ -325,15 +302,70 @@ class SageGatewayClient:
logger.error(f"Erreur génération PDF: {e}", exc_info=True)
raise
def creer_article(self, article_data: Dict) -> Dict:
return self._post("/sage/articles/create", article_data).get("data", {})
def lister_prospects(self, filtre: str = "") -> List[Dict]:
return self._post("/sage/prospects/list", {"filtre": filtre}).get("data", [])
def modifier_article(self, reference: str, article_data: Dict) -> Dict:
def lire_prospect(self, code: str) -> Optional[Dict]:
return self._post("/sage/prospects/get", {"code": code}).get("data")
def lister_fournisseurs(self, filtre: str = "") -> List[Dict]:
return self._post("/sage/fournisseurs/list", {"filtre": filtre}).get("data", [])
def lire_fournisseur(self, code: str) -> Optional[Dict]:
return self._post("/sage/fournisseurs/get", {"code": code}).get("data")
def creer_fournisseur(self, fournisseur_data: Dict) -> Dict:
return self._post("/sage/fournisseurs/create", fournisseur_data).get("data", {})
def modifier_fournisseur(self, code: str, fournisseur_data: Dict) -> Dict:
return self._post(
"/sage/articles/update",
{"reference": reference, "article_data": article_data},
"/sage/fournisseurs/update",
{"code": code, "fournisseur_data": fournisseur_data},
).get("data", {})
def lister_tiers(
self, type_tiers: Optional[str] = None, filtre: str = ""
) -> List[Dict]:
return self._post(
"/sage/tiers/list", {"type_tiers": type_tiers, "filtre": filtre}
).get("data", [])
def lire_tiers(self, code: str) -> Optional[Dict]:
return self._post("/sage/tiers/get", {"code": code}).get("data")
def lire_contact_client(self, code_client: str) -> Optional[Dict]:
return self._post("/sage/contact/read", {"code": code_client}).get("data")
def creer_contact(self, contact_data: Dict) -> Dict:
return self._post("/sage/contacts/create", contact_data)
def lister_contacts(self, numero: str) -> List[Dict]:
return self._post("/sage/contacts/list", {"numero": numero}).get("data", [])
def obtenir_contact(self, numero: str, contact_numero: int) -> Dict:
result = self._post(
"/sage/contacts/get", {"numero": numero, "contact_numero": contact_numero}
)
return result.get("data") if result.get("success") else None
def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict:
return self._post(
"/sage/contacts/update",
{"numero": numero, "contact_numero": contact_numero, "updates": updates},
)
def supprimer_contact(self, numero: str, contact_numero: int) -> Dict:
return self._post(
"/sage/contacts/delete",
{"numero": numero, "contact_numero": contact_numero},
)
def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict:
return self._post(
"/sage/contacts/set-default",
{"numero": numero, "contact_numero": contact_numero},
)
def lister_familles(self, filtre: str = "") -> List[Dict]:
return self._get("/sage/familles", params={"filtre": filtre}).get("data", [])
@ -365,45 +397,22 @@ class SageGatewayClient:
logger.error(f"Erreur lecture mouvement {numero}: {e}")
return None
def creer_contact(self, contact_data: Dict) -> Dict:
return self._post("/sage/contacts/create", contact_data)
def lire_remise_max_client(self, code_client: str) -> float:
result = self._post("/sage/client/remise-max", {"code": code_client})
return result.get("data", {}).get("remise_max", 10.0)
def lister_contacts(self, numero: str) -> List[Dict]:
return self._post("/sage/contacts/list", {"numero": numero}).get("data", [])
def refresh_cache(self) -> Dict:
return self._post("/sage/cache/refresh")
def obtenir_contact(self, numero: str, contact_numero: int) -> Dict:
result = self._post(
"/sage/contacts/get", {"numero": numero, "contact_numero": contact_numero}
)
return result.get("data") if result.get("success") else None
def get_cache_info(self) -> Dict:
return self._get("/sage/cache/info").get("data", {})
def modifier_contact(self, numero: str, contact_numero: int, updates: Dict) -> Dict:
return self._post(
"/sage/contacts/update",
{"numero": numero, "contact_numero": contact_numero, "updates": updates},
)
def supprimer_contact(self, numero: str, contact_numero: int) -> Dict:
return self._post(
"/sage/contacts/delete",
{"numero": numero, "contact_numero": contact_numero},
)
def definir_contact_defaut(self, numero: str, contact_numero: int) -> Dict:
return self._post(
"/sage/contacts/set-default",
{"numero": numero, "contact_numero": contact_numero},
)
def lister_tiers(
self, type_tiers: Optional[str] = None, filtre: str = ""
) -> List[Dict]:
return self._post(
"/sage/tiers/list", {"type_tiers": type_tiers, "filtre": filtre}
).get("data", [])
def lire_tiers(self, code: str) -> Optional[Dict]:
return self._post("/sage/tiers/get", {"code": code}).get("data")
def health(self) -> dict:
try:
r = requests.get(f"{self.url}/health", timeout=5)
return r.json()
except Exception:
return {"status": "down"}
sage_client = SageGatewayClient()

View file

@ -42,6 +42,16 @@ from schemas.articles.famille_article import (
FamilleListResponse,
)
from schemas.sage.sage_gateway import (
SageGatewayCreate,
SageGatewayUpdate,
SageGatewayResponse,
SageGatewayListResponse,
SageGatewayHealthCheck,
SageGatewayTestRequest,
SageGatewayStatsResponse,
CurrentGatewayInfo,
)
__all__ = [
"TiersDetails",
@ -87,4 +97,12 @@ __all__ = [
"FamilleListResponse",
"ContactCreate",
"ContactUpdate",
"SageGatewayCreate",
"SageGatewayUpdate",
"SageGatewayResponse",
"SageGatewayListResponse",
"SageGatewayHealthCheck",
"SageGatewayTestRequest",
"SageGatewayStatsResponse",
"CurrentGatewayInfo",
]

View file

@ -0,0 +1,164 @@
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
class GatewayHealthStatus(str, Enum):
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
UNKNOWN = "unknown"
# === CREATE ===
class SageGatewayCreate(BaseModel):
name: str = Field(
..., min_length=2, max_length=100, description="Nom de la gateway"
)
description: Optional[str] = Field(None, max_length=500)
gateway_url: str = Field(
..., description="URL de la gateway Sage (ex: http://192.168.1.50:8100)"
)
gateway_token: str = Field(
..., min_length=10, description="Token d'authentification"
)
sage_database: Optional[str] = Field(None, max_length=255)
sage_company: Optional[str] = Field(None, max_length=255)
is_active: bool = Field(False, description="Activer immédiatement cette gateway")
is_default: bool = Field(False, description="Définir comme gateway par défaut")
priority: int = Field(0, ge=0, le=100)
extra_config: Optional[Dict[str, Any]] = Field(
None, description="Configuration JSON additionnelle"
)
allowed_ips: Optional[List[str]] = Field(
None, description="Liste des IPs autorisées"
)
@field_validator("gateway_url")
@classmethod
def validate_url(cls, v):
if not v.startswith(("http://", "https://")):
raise ValueError("L'URL doit commencer par http:// ou https://")
return v.rstrip("/")
class SageGatewayUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=2, max_length=100)
description: Optional[str] = Field(None, max_length=500)
gateway_url: Optional[str] = None
gateway_token: Optional[str] = Field(None, min_length=10)
sage_database: Optional[str] = None
sage_company: Optional[str] = None
is_default: Optional[bool] = None
priority: Optional[int] = Field(None, ge=0, le=100)
extra_config: Optional[Dict[str, Any]] = None
allowed_ips: Optional[List[str]] = None
@field_validator("gateway_url")
@classmethod
def validate_url(cls, v):
if v and not v.startswith(("http://", "https://")):
raise ValueError("L'URL doit commencer par http:// ou https://")
return v.rstrip("/") if v else v
# === RESPONSE ===
class SageGatewayResponse(BaseModel):
id: str
user_id: str
name: str
description: Optional[str] = None
gateway_url: str
token_preview: str
sage_database: Optional[str] = None
sage_company: Optional[str] = None
is_active: bool
is_default: bool
priority: int
health_status: GatewayHealthStatus
last_health_check: Optional[datetime] = None
last_error: Optional[str] = None
total_requests: int
successful_requests: int
failed_requests: int
success_rate: float
last_used_at: Optional[datetime] = None
extra_config: Optional[Dict[str, Any]] = None
allowed_ips: Optional[List[str]] = None
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class SageGatewayListResponse(BaseModel):
items: List[SageGatewayResponse]
total: int
active_gateway: Optional[SageGatewayResponse] = None
using_fallback: bool = False
class SageGatewayHealthCheck(BaseModel):
gateway_id: str
gateway_name: str
status: GatewayHealthStatus
response_time_ms: Optional[float] = None
sage_version: Optional[str] = None
error: Optional[str] = None
checked_at: datetime
class SageGatewayActivateRequest(BaseModel):
gateway_id: str
class SageGatewayTestRequest(BaseModel):
gateway_url: str
gateway_token: str
@field_validator("gateway_url")
@classmethod
def validate_url(cls, v):
if not v.startswith(("http://", "https://")):
raise ValueError("L'URL doit commencer par http:// ou https://")
return v.rstrip("/")
class SageGatewayStatsResponse(BaseModel):
total_gateways: int
active_gateways: int
total_requests: int
successful_requests: int
failed_requests: int
average_success_rate: float
most_used_gateway: Optional[str] = None
last_activity: Optional[datetime] = None
class CurrentGatewayInfo(BaseModel):
source: str
gateway_id: Optional[str] = None
gateway_name: Optional[str] = None
gateway_url: str
is_healthy: Optional[bool] = None
user_id: Optional[str] = None

400
services/sage_gateway.py Normal file
View file

@ -0,0 +1,400 @@
from __future__ import annotations
import uuid
import json
import httpx
from datetime import datetime
from typing import Optional, Tuple, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, and_
import logging
from config import settings
from database import SageGatewayConfig
logger = logging.getLogger(__name__)
class SageGatewayService:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, user_id: str, data: dict) -> SageGatewayConfig:
"""Créer une nouvelle configuration gateway"""
if data.get("is_active"):
await self._deactivate_all_for_user(user_id)
if data.get("is_default"):
await self._unset_default_for_user(user_id)
extra_config = data.pop("extra_config", None)
allowed_ips = data.pop("allowed_ips", None)
gateway = SageGatewayConfig(
id=str(uuid.uuid4()),
user_id=user_id,
created_by=user_id,
extra_config=json.dumps(extra_config) if extra_config else None,
allowed_ips=json.dumps(allowed_ips) if allowed_ips else None,
**data,
)
self.session.add(gateway)
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway créée: {gateway.name} pour user {user_id}")
return gateway
async def get_by_id(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
result = await self.session.execute(
select(SageGatewayConfig).where(
and_(
SageGatewayConfig.id == gateway_id,
SageGatewayConfig.user_id == user_id,
SageGatewayConfig.is_deleted,
)
)
)
return result.scalar_one_or_none()
async def list_for_user(
self, user_id: str, include_deleted: bool = False
) -> List[SageGatewayConfig]:
query = select(SageGatewayConfig).where(SageGatewayConfig.user_id == user_id)
if not include_deleted:
query = query.where(SageGatewayConfig.is_deleted)
query = query.order_by(
SageGatewayConfig.is_active.desc(),
SageGatewayConfig.priority.desc(),
SageGatewayConfig.name,
)
result = await self.session.execute(query)
return list(result.scalars().all())
async def update(
self, gateway_id: str, user_id: str, data: dict
) -> Optional[SageGatewayConfig]:
"""Mettre à jour une gateway"""
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
if data.get("is_default") and not gateway.is_default:
await self._unset_default_for_user(user_id)
if "extra_config" in data:
data["extra_config"] = (
json.dumps(data["extra_config"]) if data["extra_config"] else None
)
if "allowed_ips" in data:
data["allowed_ips"] = (
json.dumps(data["allowed_ips"]) if data["allowed_ips"] else None
)
for key, value in data.items():
if value is not None and hasattr(gateway, key):
setattr(gateway, key, value)
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway mise à jour: {gateway.name}")
return gateway
async def delete(
self, gateway_id: str, user_id: str, hard_delete: bool = False
) -> bool:
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return False
if hard_delete:
await self.session.delete(gateway)
else:
gateway.is_deleted = True
gateway.deleted_at = datetime.now()
gateway.is_active = False
await self.session.commit()
logger.info(f"Gateway supprimée: {gateway.name} (hard={hard_delete})")
return True
async def activate(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
"""Activer une gateway (désactive les autres)"""
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
await self._deactivate_all_for_user(user_id)
gateway.is_active = True
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway activée: {gateway.name} pour user {user_id}")
return gateway
async def deactivate(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
gateway.is_active = False
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway désactivée: {gateway.name} - fallback .env actif")
return gateway
async def get_active_gateway(self, user_id: str) -> Optional[SageGatewayConfig]:
result = await self.session.execute(
select(SageGatewayConfig).where(
and_(
SageGatewayConfig.user_id == user_id,
SageGatewayConfig.is_active,
SageGatewayConfig.is_deleted,
)
)
)
return result.scalar_one_or_none()
async def get_effective_gateway_config(
self, user_id: Optional[str]
) -> Tuple[str, str, Optional[str]]:
if user_id:
active = await self.get_active_gateway(user_id)
if active:
active.total_requests += 1
active.last_used_at = datetime.now()
await self.session.commit()
return (active.gateway_url, active.gateway_token, active.id)
return (settings.sage_gateway_url, settings.sage_gateway_token, None)
async def health_check(self, gateway_id: str, user_id: str) -> dict:
import time
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return {"error": "Gateway introuvable"}
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{gateway.gateway_url}/health",
headers={"Authorization": f"Bearer {gateway.gateway_token}"},
)
response_time = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
gateway.last_health_check = datetime.now()
gateway.last_health_status = True
gateway.last_error = None
await self.session.commit()
return {
"status": "healthy",
"response_time_ms": round(response_time, 2),
"sage_version": data.get("sage_version"),
"details": data,
}
else:
raise Exception(f"HTTP {response.status_code}")
except Exception as e:
gateway.last_health_check = datetime.now()
gateway.last_health_status = False
gateway.last_error = str(e)
await self.session.commit()
return {
"status": "unhealthy",
"error": str(e),
"response_time_ms": round((time.time() - start_time) * 1000, 2),
}
async def test_gateway(self, url: str, token: str) -> dict:
"""Tester une configuration gateway avant création"""
import time
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{url}/health", headers={"Authorization": f"Bearer {token}"}
)
response_time = (time.time() - start_time) * 1000
if response.status_code == 200:
return {
"success": True,
"status": "healthy",
"response_time_ms": round(response_time, 2),
"details": response.json(),
}
else:
return {
"success": False,
"status": "unhealthy",
"error": f"HTTP {response.status_code}: {response.text}",
}
except httpx.TimeoutException:
return {
"success": False,
"status": "timeout",
"error": "Connexion timeout (10s)",
}
except httpx.ConnectError as e:
return {
"success": False,
"status": "unreachable",
"error": f"Impossible de se connecter: {e}",
}
except Exception as e:
return {"success": False, "status": "error", "error": str(e)}
async def record_request(self, gateway_id: str, success: bool) -> None:
"""Enregistrer une requête (succès/échec)"""
if not gateway_id:
return
result = await self.session.execute(
select(SageGatewayConfig).where(SageGatewayConfig.id == gateway_id)
)
gateway = result.scalar_one_or_none()
if gateway:
gateway.total_requests += 1
if success:
gateway.successful_requests += 1
else:
gateway.failed_requests += 1
gateway.last_used_at = datetime.now()
await self.session.commit()
async def get_stats(self, user_id: str) -> dict:
"""Statistiques d'utilisation pour un utilisateur"""
gateways = await self.list_for_user(user_id)
total_requests = sum(g.total_requests for g in gateways)
successful = sum(g.successful_requests for g in gateways)
failed = sum(g.failed_requests for g in gateways)
most_used = max(gateways, key=lambda g: g.total_requests) if gateways else None
last_activity = max(
(g.last_used_at for g in gateways if g.last_used_at), default=None
)
return {
"total_gateways": len(gateways),
"active_gateways": sum(1 for g in gateways if g.is_active),
"total_requests": total_requests,
"successful_requests": successful,
"failed_requests": failed,
"average_success_rate": (successful / total_requests * 100)
if total_requests > 0
else 0,
"most_used_gateway": most_used.name if most_used else None,
"last_activity": last_activity,
}
async def _deactivate_all_for_user(self, user_id: str) -> None:
"""Désactiver toutes les gateways d'un utilisateur"""
await self.session.execute(
update(SageGatewayConfig)
.where(SageGatewayConfig.user_id == user_id)
.values(is_active=False)
)
async def _unset_default_for_user(self, user_id: str) -> None:
"""Retirer le flag default de toutes les gateways"""
await self.session.execute(
update(SageGatewayConfig)
.where(SageGatewayConfig.user_id == user_id)
.values(is_default=False)
)
def gateway_response_from_model(gateway: SageGatewayConfig) -> dict:
"""Convertir un model en réponse API (masque le token)"""
token_preview = (
f"****{gateway.gateway_token[-4:]}" if gateway.gateway_token else "****"
)
success_rate = 0.0
if gateway.total_requests > 0:
success_rate = (gateway.successful_requests / gateway.total_requests) * 100
if gateway.last_health_status is None:
health_status = "unknown"
elif gateway.last_health_status:
health_status = "healthy"
else:
health_status = "unhealthy"
extra_config = None
if gateway.extra_config:
try:
extra_config = json.loads(gateway.extra_config)
except json.JSONDecodeError:
pass
allowed_ips = None
if gateway.allowed_ips:
try:
allowed_ips = json.loads(gateway.allowed_ips)
except json.JSONDecodeError:
pass
return {
"id": gateway.id,
"user_id": gateway.user_id,
"name": gateway.name,
"description": gateway.description,
"gateway_url": gateway.gateway_url,
"token_preview": token_preview,
"sage_database": gateway.sage_database,
"sage_company": gateway.sage_company,
"is_active": gateway.is_active,
"is_default": gateway.is_default,
"priority": gateway.priority,
"health_status": health_status,
"last_health_check": gateway.last_health_check,
"last_error": gateway.last_error,
"total_requests": gateway.total_requests,
"successful_requests": gateway.successful_requests,
"failed_requests": gateway.failed_requests,
"success_rate": round(success_rate, 2),
"last_used_at": gateway.last_used_at,
"extra_config": extra_config,
"allowed_ips": allowed_ips,
"created_at": gateway.created_at,
"updated_at": gateway.updated_at,
}