Sage100-vps/services/sage_gateway.py
2026-01-08 16:58:43 +03:00

400 lines
13 KiB
Python

from __future__ import annotations
import uuid
import json
import httpx
from datetime import datetime
from typing import Optional, Tuple, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import false, select, true, update, and_
import logging
from config.config import settings
from database import SageGatewayConfig
logger = logging.getLogger(__name__)
class SageGatewayService:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, user_id: str, data: dict) -> SageGatewayConfig:
"""Créer une nouvelle configuration gateway"""
if data.get("is_active"):
await self._deactivate_all_for_user(user_id)
if data.get("is_default"):
await self._unset_default_for_user(user_id)
extra_config = data.pop("extra_config", None)
allowed_ips = data.pop("allowed_ips", None)
gateway = SageGatewayConfig(
id=str(uuid.uuid4()),
user_id=user_id,
created_by=user_id,
extra_config=json.dumps(extra_config) if extra_config else None,
allowed_ips=json.dumps(allowed_ips) if allowed_ips else None,
**data,
)
self.session.add(gateway)
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway créée: {gateway.name} pour user {user_id}")
return gateway
async def get_by_id(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
result = await self.session.execute(
select(SageGatewayConfig).where(
and_(
SageGatewayConfig.id == gateway_id,
SageGatewayConfig.user_id == user_id,
SageGatewayConfig.is_deleted == false(),
)
)
)
return result.scalar_one_or_none()
async def list_for_user(
self, user_id: str, include_deleted: bool = False
) -> List[SageGatewayConfig]:
query = select(SageGatewayConfig).where(SageGatewayConfig.user_id == user_id)
if not include_deleted:
query = query.where(SageGatewayConfig.is_deleted == false())
query = query.order_by(
SageGatewayConfig.is_active.desc(),
SageGatewayConfig.priority.desc(),
SageGatewayConfig.name,
)
result = await self.session.execute(query)
return list(result.scalars().all())
async def update(
self, gateway_id: str, user_id: str, data: dict
) -> Optional[SageGatewayConfig]:
"""Mettre à jour une gateway"""
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
if data.get("is_default") and not gateway.is_default:
await self._unset_default_for_user(user_id)
if "extra_config" in data:
data["extra_config"] = (
json.dumps(data["extra_config"]) if data["extra_config"] else None
)
if "allowed_ips" in data:
data["allowed_ips"] = (
json.dumps(data["allowed_ips"]) if data["allowed_ips"] else None
)
for key, value in data.items():
if value is not None and hasattr(gateway, key):
setattr(gateway, key, value)
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway mise à jour: {gateway.name}")
return gateway
async def delete(
self, gateway_id: str, user_id: str, hard_delete: bool = False
) -> bool:
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return False
if hard_delete:
await self.session.delete(gateway)
else:
gateway.is_deleted = True
gateway.deleted_at = datetime.now()
gateway.is_active = False
await self.session.commit()
logger.info(f"Gateway supprimée: {gateway.name} (hard={hard_delete})")
return True
async def activate(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
"""Activer une gateway (désactive les autres)"""
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
await self._deactivate_all_for_user(user_id)
gateway.is_active = True
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway activée: {gateway.name} pour user {user_id}")
return gateway
async def deactivate(
self, gateway_id: str, user_id: str
) -> Optional[SageGatewayConfig]:
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return None
gateway.is_active = False
gateway.updated_at = datetime.now()
await self.session.commit()
await self.session.refresh(gateway)
logger.info(f"Gateway désactivée: {gateway.name} - fallback .env actif")
return gateway
async def get_active_gateway(self, user_id: str) -> Optional[SageGatewayConfig]:
result = await self.session.execute(
select(SageGatewayConfig).where(
and_(
SageGatewayConfig.user_id == user_id,
SageGatewayConfig.is_active,
SageGatewayConfig.is_deleted == false(),
)
)
)
return result.scalar_one_or_none()
async def get_effective_gateway_config(
self, user_id: Optional[str]
) -> Tuple[str, str, Optional[str]]:
if user_id:
active = await self.get_active_gateway(user_id)
if active:
active.total_requests += 1
active.last_used_at = datetime.now()
await self.session.commit()
return (active.gateway_url, active.gateway_token, active.id)
return (settings.sage_gateway_url, settings.sage_gateway_token, None)
async def health_check(self, gateway_id: str, user_id: str) -> dict:
import time
gateway = await self.get_by_id(gateway_id, user_id)
if not gateway:
return {"error": "Gateway introuvable"}
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{gateway.gateway_url}/health",
headers={"Authorization": f"Bearer {gateway.gateway_token}"},
)
response_time = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
gateway.last_health_check = datetime.now()
gateway.last_health_status = True
gateway.last_error = None
await self.session.commit()
return {
"status": "healthy",
"response_time_ms": round(response_time, 2),
"sage_version": data.get("sage_version"),
"details": data,
}
else:
raise Exception(f"HTTP {response.status_code}")
except Exception as e:
gateway.last_health_check = datetime.now()
gateway.last_health_status = False
gateway.last_error = str(e)
await self.session.commit()
return {
"status": "unhealthy",
"error": str(e),
"response_time_ms": round((time.time() - start_time) * 1000, 2),
}
async def test_gateway(self, url: str, token: str) -> dict:
"""Tester une configuration gateway avant création"""
import time
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{url}/health", headers={"Authorization": f"Bearer {token}"}
)
response_time = (time.time() - start_time) * 1000
if response.status_code == 200:
return {
"success": True,
"status": "healthy",
"response_time_ms": round(response_time, 2),
"details": response.json(),
}
else:
return {
"success": False,
"status": "unhealthy",
"error": f"HTTP {response.status_code}: {response.text}",
}
except httpx.TimeoutException:
return {
"success": False,
"status": "timeout",
"error": "Connexion timeout (10s)",
}
except httpx.ConnectError as e:
return {
"success": False,
"status": "unreachable",
"error": f"Impossible de se connecter: {e}",
}
except Exception as e:
return {"success": False, "status": "error", "error": str(e)}
async def record_request(self, gateway_id: str, success: bool) -> None:
"""Enregistrer une requête (succès/échec)"""
if not gateway_id:
return
result = await self.session.execute(
select(SageGatewayConfig).where(SageGatewayConfig.id == gateway_id)
)
gateway = result.scalar_one_or_none()
if gateway:
gateway.total_requests += 1
if success:
gateway.successful_requests += 1
else:
gateway.failed_requests += 1
gateway.last_used_at = datetime.now()
await self.session.commit()
async def get_stats(self, user_id: str) -> dict:
"""Statistiques d'utilisation pour un utilisateur"""
gateways = await self.list_for_user(user_id)
total_requests = sum(g.total_requests for g in gateways)
successful = sum(g.successful_requests for g in gateways)
failed = sum(g.failed_requests for g in gateways)
most_used = max(gateways, key=lambda g: g.total_requests) if gateways else None
last_activity = max(
(g.last_used_at for g in gateways if g.last_used_at), default=None
)
return {
"total_gateways": len(gateways),
"active_gateways": sum(1 for g in gateways if g.is_active),
"total_requests": total_requests,
"successful_requests": successful,
"failed_requests": failed,
"average_success_rate": (successful / total_requests * 100)
if total_requests > 0
else 0,
"most_used_gateway": most_used.name if most_used else None,
"last_activity": last_activity,
}
async def _deactivate_all_for_user(self, user_id: str) -> None:
"""Désactiver toutes les gateways d'un utilisateur"""
await self.session.execute(
update(SageGatewayConfig)
.where(SageGatewayConfig.user_id == user_id)
.values(is_active=False)
)
async def _unset_default_for_user(self, user_id: str) -> None:
"""Retirer le flag default de toutes les gateways"""
await self.session.execute(
update(SageGatewayConfig)
.where(SageGatewayConfig.user_id == user_id)
.values(is_default=False)
)
def gateway_response_from_model(gateway: SageGatewayConfig) -> dict:
"""Convertir un model en réponse API (masque le token)"""
token_preview = (
f"****{gateway.gateway_token[-4:]}" if gateway.gateway_token else "****"
)
success_rate = 0.0
if gateway.total_requests > 0:
success_rate = (gateway.successful_requests / gateway.total_requests) * 100
if gateway.last_health_status is None:
health_status = "unknown"
elif gateway.last_health_status:
health_status = "healthy"
else:
health_status = "unhealthy"
extra_config = None
if gateway.extra_config:
try:
extra_config = json.loads(gateway.extra_config)
except json.JSONDecodeError:
pass
allowed_ips = None
if gateway.allowed_ips:
try:
allowed_ips = json.loads(gateway.allowed_ips)
except json.JSONDecodeError:
pass
return {
"id": gateway.id,
"user_id": gateway.user_id,
"name": gateway.name,
"description": gateway.description,
"gateway_url": gateway.gateway_url,
"token_preview": token_preview,
"sage_database": gateway.sage_database,
"sage_company": gateway.sage_company,
"is_active": gateway.is_active,
"is_default": gateway.is_default,
"priority": gateway.priority,
"health_status": health_status,
"last_health_check": gateway.last_health_check,
"last_error": gateway.last_error,
"total_requests": gateway.total_requests,
"successful_requests": gateway.successful_requests,
"failed_requests": gateway.failed_requests,
"success_rate": round(success_rate, 2),
"last_used_at": gateway.last_used_at,
"extra_config": extra_config,
"allowed_ips": allowed_ips,
"created_at": gateway.created_at,
"updated_at": gateway.updated_at,
}