# Sage100-vps/services/sage_gateway.py
# (repository listing metadata: 400 lines, 13 KiB, Python)

from __future__ import annotations
import uuid
import json
import httpx
from datetime import datetime
from typing import Optional, Tuple, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, and_
import logging
from config import settings
from database import SageGatewayConfig
logger = logging.getLogger(__name__)
class SageGatewayService:
    """Manage per-user Sage gateway configurations stored as ``SageGatewayConfig`` rows.

    The service covers creation, lookup, update, soft/hard deletion,
    activation, health probing over HTTP and request counters. Every
    user-facing lookup is scoped to ``user_id`` and ignores soft-deleted rows.
    Activating a gateway deactivates the user's other gateways first.
    """

    def __init__(self, session: AsyncSession):
        """Bind the service to the async SQLAlchemy session used for all queries."""
        self.session = session

    async def create(self, user_id: str, data: dict) -> SageGatewayConfig:
        """Create and persist a new gateway configuration for ``user_id``.

        Args:
            user_id: Owner of the gateway; also stored as ``created_by``.
            data: Column values for the new row. ``extra_config`` and
                ``allowed_ips`` are JSON-encoded before storage (stored as
                ``None`` when empty). The caller's dict is not modified.

        Returns:
            The refreshed, committed ``SageGatewayConfig`` instance.
        """
        # Work on a copy so the pops below do not alter the caller's dict.
        payload = dict(data)

        if payload.get("is_active"):
            await self._deactivate_all_for_user(user_id)
        if payload.get("is_default"):
            await self._unset_default_for_user(user_id)

        extra_config = payload.pop("extra_config", None)
        allowed_ips = payload.pop("allowed_ips", None)

        gateway = SageGatewayConfig(
            id=str(uuid.uuid4()),
            user_id=user_id,
            created_by=user_id,
            extra_config=json.dumps(extra_config) if extra_config else None,
            allowed_ips=json.dumps(allowed_ips) if allowed_ips else None,
            **payload,
        )
        self.session.add(gateway)
        await self.session.commit()
        await self.session.refresh(gateway)

        logger.info("Gateway créée: %s pour user %s", gateway.name, user_id)
        return gateway

    async def get_by_id(
        self, gateway_id: str, user_id: str
    ) -> Optional[SageGatewayConfig]:
        """Return the user's non-deleted gateway with this id, or ``None``."""
        result = await self.session.execute(
            select(SageGatewayConfig).where(
                and_(
                    SageGatewayConfig.id == gateway_id,
                    SageGatewayConfig.user_id == user_id,
                    # Soft-deleted rows must be excluded, not selected.
                    SageGatewayConfig.is_deleted.is_(False),
                )
            )
        )
        return result.scalar_one_or_none()

    async def list_for_user(
        self, user_id: str, include_deleted: bool = False
    ) -> List[SageGatewayConfig]:
        """List the user's gateways, active first, then by priority and name.

        Args:
            user_id: Owner whose gateways are listed.
            include_deleted: When true, soft-deleted gateways are included.
        """
        query = select(SageGatewayConfig).where(SageGatewayConfig.user_id == user_id)
        if not include_deleted:
            query = query.where(SageGatewayConfig.is_deleted.is_(False))
        query = query.order_by(
            SageGatewayConfig.is_active.desc(),
            SageGatewayConfig.priority.desc(),
            SageGatewayConfig.name,
        )
        result = await self.session.execute(query)
        return list(result.scalars().all())

    async def update(
        self, gateway_id: str, user_id: str, data: dict
    ) -> Optional[SageGatewayConfig]:
        """Apply a partial update to a gateway.

        Keys whose value is ``None`` and keys that are not attributes of the
        model are skipped. ``extra_config`` and ``allowed_ips`` are
        JSON-encoded first. The caller's dict is not modified.

        Returns:
            The refreshed gateway, or ``None`` when it does not exist for
            this user.
        """
        gateway = await self.get_by_id(gateway_id, user_id)
        if not gateway:
            return None

        # Copy so the JSON re-encoding below does not leak into the caller.
        changes = dict(data)

        # Same rule as create()/activate(): turning a gateway on turns the
        # user's other gateways off, since get_active_gateway() expects at
        # most one active row.
        if changes.get("is_active") and not gateway.is_active:
            await self._deactivate_all_for_user(user_id)
        if changes.get("is_default") and not gateway.is_default:
            await self._unset_default_for_user(user_id)

        for json_field in ("extra_config", "allowed_ips"):
            if json_field in changes:
                raw = changes[json_field]
                changes[json_field] = json.dumps(raw) if raw else None

        # NOTE(review): any model attribute present in `data` can be set here;
        # presumably the caller validates the payload beforehand - confirm.
        for key, value in changes.items():
            if value is not None and hasattr(gateway, key):
                setattr(gateway, key, value)

        gateway.updated_at = datetime.now()
        await self.session.commit()
        await self.session.refresh(gateway)

        logger.info("Gateway mise à jour: %s", gateway.name)
        return gateway

    async def delete(
        self, gateway_id: str, user_id: str, hard_delete: bool = False
    ) -> bool:
        """Delete a gateway.

        Args:
            gateway_id: Id of the gateway to delete.
            user_id: Owner of the gateway.
            hard_delete: When true the row is removed; otherwise it is
                flagged as deleted, timestamped and deactivated.

        Returns:
            ``True`` when a gateway was deleted, ``False`` when none matched.
        """
        gateway = await self.get_by_id(gateway_id, user_id)
        if not gateway:
            return False

        # Read the name before committing: after commit the instance may be
        # expired or detached, and touching it would need implicit IO.
        gateway_name = gateway.name

        if hard_delete:
            await self.session.delete(gateway)
        else:
            gateway.is_deleted = True
            gateway.deleted_at = datetime.now()
            gateway.is_active = False
        await self.session.commit()

        logger.info("Gateway supprimée: %s (hard=%s)", gateway_name, hard_delete)
        return True

    async def activate(
        self, gateway_id: str, user_id: str
    ) -> Optional[SageGatewayConfig]:
        """Activate a gateway after deactivating the user's other gateways.

        Returns:
            The refreshed gateway, or ``None`` when it does not exist.
        """
        gateway = await self.get_by_id(gateway_id, user_id)
        if not gateway:
            return None

        await self._deactivate_all_for_user(user_id)
        gateway.is_active = True
        gateway.updated_at = datetime.now()
        await self.session.commit()
        await self.session.refresh(gateway)

        logger.info("Gateway activée: %s pour user %s", gateway.name, user_id)
        return gateway

    async def deactivate(
        self, gateway_id: str, user_id: str
    ) -> Optional[SageGatewayConfig]:
        """Deactivate a gateway.

        Returns:
            The refreshed gateway, or ``None`` when it does not exist.
        """
        gateway = await self.get_by_id(gateway_id, user_id)
        if not gateway:
            return None

        gateway.is_active = False
        gateway.updated_at = datetime.now()
        await self.session.commit()
        await self.session.refresh(gateway)

        logger.info("Gateway désactivée: %s - fallback .env actif", gateway.name)
        return gateway

    async def get_active_gateway(self, user_id: str) -> Optional[SageGatewayConfig]:
        """Return the user's active, non-deleted gateway, or ``None``."""
        result = await self.session.execute(
            select(SageGatewayConfig).where(
                and_(
                    SageGatewayConfig.user_id == user_id,
                    SageGatewayConfig.is_active.is_(True),
                    SageGatewayConfig.is_deleted.is_(False),
                )
            )
        )
        return result.scalar_one_or_none()

    async def get_effective_gateway_config(
        self, user_id: Optional[str]
    ) -> Tuple[str, str, Optional[str]]:
        """Resolve the gateway URL and token to use for a request.

        When the user has an active gateway its usage counter and
        ``last_used_at`` are updated and its settings are returned.
        Otherwise the values from ``settings`` are returned.

        Returns:
            ``(gateway_url, gateway_token, gateway_id)``; ``gateway_id`` is
            ``None`` when the settings fallback is used.
        """
        if user_id:
            active = await self.get_active_gateway(user_id)
            if active:
                # Capture the values before commit so the return statement
                # never touches a possibly expired instance.
                resolved = (active.gateway_url, active.gateway_token, active.id)
                active.total_requests += 1
                active.last_used_at = datetime.now()
                await self.session.commit()
                return resolved
        return (settings.sage_gateway_url, settings.sage_gateway_token, None)

    async def health_check(self, gateway_id: str, user_id: str) -> dict:
        """Probe ``<gateway_url>/health`` and store the outcome on the gateway.

        Any exception raised while probing or decoding the response, and any
        non-200 status, marks the gateway unhealthy.

        Returns:
            ``{"error": ...}`` when the gateway is not found; otherwise a dict
            whose ``status`` is ``"healthy"`` or ``"unhealthy"`` with the
            measured ``response_time_ms``.
        """
        import time

        gateway = await self.get_by_id(gateway_id, user_id)
        if not gateway:
            return {"error": "Gateway introuvable"}

        started = time.perf_counter()
        details = None
        sage_version = None
        error = None
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(
                    # Tolerate a base URL stored with a trailing slash.
                    f"{gateway.gateway_url.rstrip('/')}/health",
                    headers={"Authorization": f"Bearer {gateway.gateway_token}"},
                )
            if response.status_code == 200:
                details = response.json()
                sage_version = details.get("sage_version")
            else:
                error = f"HTTP {response.status_code}"
        except Exception as exc:
            # Best-effort probe: every failure is reported as "unhealthy".
            error = str(exc)
        elapsed_ms = round((time.perf_counter() - started) * 1000, 2)

        # Persist the outcome once, outside the probe's try block, so a
        # database failure is not mistaken for an unhealthy gateway.
        gateway.last_health_check = datetime.now()
        gateway.last_health_status = error is None
        gateway.last_error = error
        await self.session.commit()

        if error is not None:
            return {
                "status": "unhealthy",
                "error": error,
                "response_time_ms": elapsed_ms,
            }
        return {
            "status": "healthy",
            "response_time_ms": elapsed_ms,
            "sage_version": sage_version,
            "details": details,
        }

    async def test_gateway(self, url: str, token: str) -> dict:
        """Probe a gateway URL/token pair without touching the database.

        Returns:
            A dict with ``success`` and ``status`` (``"healthy"``,
            ``"unhealthy"``, ``"timeout"``, ``"unreachable"`` or ``"error"``),
            plus ``details``/``response_time_ms`` on success or ``error`` on
            failure.
        """
        import time

        started = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(
                    # Tolerate a base URL given with a trailing slash.
                    f"{url.rstrip('/')}/health",
                    headers={"Authorization": f"Bearer {token}"},
                )
            elapsed_ms = (time.perf_counter() - started) * 1000
            if response.status_code == 200:
                return {
                    "success": True,
                    "status": "healthy",
                    "response_time_ms": round(elapsed_ms, 2),
                    "details": response.json(),
                }
            return {
                "success": False,
                "status": "unhealthy",
                "error": f"HTTP {response.status_code}: {response.text}",
            }
        except httpx.TimeoutException:
            return {
                "success": False,
                "status": "timeout",
                "error": "Connexion timeout (10s)",
            }
        except httpx.ConnectError as e:
            return {
                "success": False,
                "status": "unreachable",
                "error": f"Impossible de se connecter: {e}",
            }
        except Exception as e:
            return {"success": False, "status": "error", "error": str(e)}

    async def record_request(self, gateway_id: str, success: bool) -> None:
        """Increment the request counters of a gateway.

        The gateway is looked up by id only (no user scoping). Nothing
        happens when ``gateway_id`` is empty or unknown.
        """
        if not gateway_id:
            return

        result = await self.session.execute(
            select(SageGatewayConfig).where(SageGatewayConfig.id == gateway_id)
        )
        gateway = result.scalar_one_or_none()
        if gateway is None:
            return

        gateway.total_requests += 1
        if success:
            gateway.successful_requests += 1
        else:
            gateway.failed_requests += 1
        gateway.last_used_at = datetime.now()
        await self.session.commit()

    async def get_stats(self, user_id: str) -> dict:
        """Aggregate usage statistics over the user's non-deleted gateways."""
        gateways = await self.list_for_user(user_id)

        total_requests = sum(g.total_requests for g in gateways)
        successful = sum(g.successful_requests for g in gateways)
        failed = sum(g.failed_requests for g in gateways)
        most_used = max(gateways, key=lambda g: g.total_requests) if gateways else None
        last_activity = max(
            (g.last_used_at for g in gateways if g.last_used_at), default=None
        )

        return {
            "total_gateways": len(gateways),
            "active_gateways": sum(1 for g in gateways if g.is_active),
            "total_requests": total_requests,
            "successful_requests": successful,
            "failed_requests": failed,
            "average_success_rate": (successful / total_requests * 100)
            if total_requests > 0
            else 0,
            "most_used_gateway": most_used.name if most_used else None,
            "last_activity": last_activity,
        }

    async def _deactivate_all_for_user(self, user_id: str) -> None:
        """Set ``is_active`` to false on every gateway of the user (no commit)."""
        await self.session.execute(
            update(SageGatewayConfig)
            .where(SageGatewayConfig.user_id == user_id)
            .values(is_active=False)
        )

    async def _unset_default_for_user(self, user_id: str) -> None:
        """Set ``is_default`` to false on every gateway of the user (no commit)."""
        await self.session.execute(
            update(SageGatewayConfig)
            .where(SageGatewayConfig.user_id == user_id)
            .values(is_default=False)
        )
def gateway_response_from_model(gateway: SageGatewayConfig) -> dict:
    """Convert a gateway model into an API response dict with the token masked.

    The raw token is never included. ``token_preview`` shows the last four
    characters only when the token is longer than four characters, so a short
    token is not disclosed in full. ``extra_config`` and ``allowed_ips`` are
    decoded from their stored JSON text; empty or malformed text yields
    ``None``.

    Args:
        gateway: Model instance (or any object exposing the same attributes).

    Returns:
        A dict of the gateway's public fields plus the derived
        ``token_preview``, ``health_status`` and ``success_rate`` values.
    """

    def _decode_json(raw):
        """Return the decoded JSON text, or None when empty or malformed."""
        if not raw:
            return None
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Best-effort: a corrupt stored value must not break the response.
            return None

    token = gateway.gateway_token
    if token and len(token) > 4:
        token_preview = f"****{token[-4:]}"
    else:
        # Missing or too short to show a suffix without revealing all of it.
        token_preview = "****"

    # Treat unset counters as zero so a not-yet-populated row does not raise.
    total = gateway.total_requests or 0
    successful = gateway.successful_requests or 0
    success_rate = (successful / total) * 100 if total > 0 else 0.0

    # Three states: never checked (None), last check passed, last check failed.
    if gateway.last_health_status is None:
        health_status = "unknown"
    elif gateway.last_health_status:
        health_status = "healthy"
    else:
        health_status = "unhealthy"

    return {
        "id": gateway.id,
        "user_id": gateway.user_id,
        "name": gateway.name,
        "description": gateway.description,
        "gateway_url": gateway.gateway_url,
        "token_preview": token_preview,
        "sage_database": gateway.sage_database,
        "sage_company": gateway.sage_company,
        "is_active": gateway.is_active,
        "is_default": gateway.is_default,
        "priority": gateway.priority,
        "health_status": health_status,
        "last_health_check": gateway.last_health_check,
        "last_error": gateway.last_error,
        "total_requests": gateway.total_requests,
        "successful_requests": gateway.successful_requests,
        "failed_requests": gateway.failed_requests,
        "success_rate": round(success_rate, 2),
        "last_used_at": gateway.last_used_at,
        "extra_config": _decode_json(gateway.extra_config),
        "allowed_ips": _decode_json(gateway.allowed_ips),
        "created_at": gateway.created_at,
        "updated_at": gateway.updated_at,
    }