from __future__ import annotations import uuid import json import httpx from datetime import datetime from typing import Optional, Tuple, List from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, and_ import logging from config.config import settings from database import SageGatewayConfig logger = logging.getLogger(__name__) class SageGatewayService: def __init__(self, session: AsyncSession): self.session = session async def create(self, user_id: str, data: dict) -> SageGatewayConfig: """Créer une nouvelle configuration gateway""" if data.get("is_active"): await self._deactivate_all_for_user(user_id) if data.get("is_default"): await self._unset_default_for_user(user_id) extra_config = data.pop("extra_config", None) allowed_ips = data.pop("allowed_ips", None) gateway = SageGatewayConfig( id=str(uuid.uuid4()), user_id=user_id, created_by=user_id, extra_config=json.dumps(extra_config) if extra_config else None, allowed_ips=json.dumps(allowed_ips) if allowed_ips else None, **data, ) self.session.add(gateway) await self.session.commit() await self.session.refresh(gateway) logger.info(f"Gateway créée: {gateway.name} pour user {user_id}") return gateway async def get_by_id( self, gateway_id: str, user_id: str ) -> Optional[SageGatewayConfig]: result = await self.session.execute( select(SageGatewayConfig).where( and_( SageGatewayConfig.id == gateway_id, SageGatewayConfig.user_id == user_id, SageGatewayConfig.is_deleted, ) ) ) return result.scalar_one_or_none() async def list_for_user( self, user_id: str, include_deleted: bool = False ) -> List[SageGatewayConfig]: query = select(SageGatewayConfig).where(SageGatewayConfig.user_id == user_id) if not include_deleted: query = query.where(SageGatewayConfig.is_deleted) query = query.order_by( SageGatewayConfig.is_active.desc(), SageGatewayConfig.priority.desc(), SageGatewayConfig.name, ) result = await self.session.execute(query) return list(result.scalars().all()) async def update( self, gateway_id: str, user_id: str, data: dict ) -> Optional[SageGatewayConfig]: """Mettre à jour une gateway""" gateway = await self.get_by_id(gateway_id, user_id) if not gateway: return None if data.get("is_default") and not gateway.is_default: await self._unset_default_for_user(user_id) if "extra_config" in data: data["extra_config"] = ( json.dumps(data["extra_config"]) if data["extra_config"] else None ) if "allowed_ips" in data: data["allowed_ips"] = ( json.dumps(data["allowed_ips"]) if data["allowed_ips"] else None ) for key, value in data.items(): if value is not None and hasattr(gateway, key): setattr(gateway, key, value) gateway.updated_at = datetime.now() await self.session.commit() await self.session.refresh(gateway) logger.info(f"Gateway mise à jour: {gateway.name}") return gateway async def delete( self, gateway_id: str, user_id: str, hard_delete: bool = False ) -> bool: gateway = await self.get_by_id(gateway_id, user_id) if not gateway: return False if hard_delete: await self.session.delete(gateway) else: gateway.is_deleted = True gateway.deleted_at = datetime.now() gateway.is_active = False await self.session.commit() logger.info(f"Gateway supprimée: {gateway.name} (hard={hard_delete})") return True async def activate( self, gateway_id: str, user_id: str ) -> Optional[SageGatewayConfig]: """Activer une gateway (désactive les autres)""" gateway = await self.get_by_id(gateway_id, user_id) if not gateway: return None await self._deactivate_all_for_user(user_id) gateway.is_active = True gateway.updated_at = datetime.now() await self.session.commit() await self.session.refresh(gateway) logger.info(f"Gateway activée: {gateway.name} pour user {user_id}") return gateway async def deactivate( self, gateway_id: str, user_id: str ) -> Optional[SageGatewayConfig]: gateway = await self.get_by_id(gateway_id, user_id) if not gateway: return None gateway.is_active = False gateway.updated_at = datetime.now() await self.session.commit() await self.session.refresh(gateway) logger.info(f"Gateway désactivée: {gateway.name} - fallback .env actif") return gateway async def get_active_gateway(self, user_id: str) -> Optional[SageGatewayConfig]: result = await self.session.execute( select(SageGatewayConfig).where( and_( SageGatewayConfig.user_id == user_id, SageGatewayConfig.is_active, SageGatewayConfig.is_deleted, ) ) ) return result.scalar_one_or_none() async def get_effective_gateway_config( self, user_id: Optional[str] ) -> Tuple[str, str, Optional[str]]: if user_id: active = await self.get_active_gateway(user_id) if active: active.total_requests += 1 active.last_used_at = datetime.now() await self.session.commit() return (active.gateway_url, active.gateway_token, active.id) return (settings.sage_gateway_url, settings.sage_gateway_token, None) async def health_check(self, gateway_id: str, user_id: str) -> dict: import time gateway = await self.get_by_id(gateway_id, user_id) if not gateway: return {"error": "Gateway introuvable"} start_time = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get( f"{gateway.gateway_url}/health", headers={"Authorization": f"Bearer {gateway.gateway_token}"}, ) response_time = (time.time() - start_time) * 1000 if response.status_code == 200: data = response.json() gateway.last_health_check = datetime.now() gateway.last_health_status = True gateway.last_error = None await self.session.commit() return { "status": "healthy", "response_time_ms": round(response_time, 2), "sage_version": data.get("sage_version"), "details": data, } else: raise Exception(f"HTTP {response.status_code}") except Exception as e: gateway.last_health_check = datetime.now() gateway.last_health_status = False gateway.last_error = str(e) await self.session.commit() return { "status": "unhealthy", "error": str(e), "response_time_ms": round((time.time() - start_time) * 1000, 2), } async def test_gateway(self, url: str, token: str) -> dict: """Tester une configuration gateway avant création""" import time start_time = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get( f"{url}/health", headers={"Authorization": f"Bearer {token}"} ) response_time = (time.time() - start_time) * 1000 if response.status_code == 200: return { "success": True, "status": "healthy", "response_time_ms": round(response_time, 2), "details": response.json(), } else: return { "success": False, "status": "unhealthy", "error": f"HTTP {response.status_code}: {response.text}", } except httpx.TimeoutException: return { "success": False, "status": "timeout", "error": "Connexion timeout (10s)", } except httpx.ConnectError as e: return { "success": False, "status": "unreachable", "error": f"Impossible de se connecter: {e}", } except Exception as e: return {"success": False, "status": "error", "error": str(e)} async def record_request(self, gateway_id: str, success: bool) -> None: """Enregistrer une requête (succès/échec)""" if not gateway_id: return result = await self.session.execute( select(SageGatewayConfig).where(SageGatewayConfig.id == gateway_id) ) gateway = result.scalar_one_or_none() if gateway: gateway.total_requests += 1 if success: gateway.successful_requests += 1 else: gateway.failed_requests += 1 gateway.last_used_at = datetime.now() await self.session.commit() async def get_stats(self, user_id: str) -> dict: """Statistiques d'utilisation pour un utilisateur""" gateways = await self.list_for_user(user_id) total_requests = sum(g.total_requests for g in gateways) successful = sum(g.successful_requests for g in gateways) failed = sum(g.failed_requests for g in gateways) most_used = max(gateways, key=lambda g: g.total_requests) if gateways else None last_activity = max( (g.last_used_at for g in gateways if g.last_used_at), default=None ) return { "total_gateways": len(gateways), "active_gateways": sum(1 for g in gateways if g.is_active), "total_requests": total_requests, "successful_requests": successful, "failed_requests": failed, "average_success_rate": (successful / total_requests * 100) if total_requests > 0 else 0, "most_used_gateway": most_used.name if most_used else None, "last_activity": last_activity, } async def _deactivate_all_for_user(self, user_id: str) -> None: """Désactiver toutes les gateways d'un utilisateur""" await self.session.execute( update(SageGatewayConfig) .where(SageGatewayConfig.user_id == user_id) .values(is_active=False) ) async def _unset_default_for_user(self, user_id: str) -> None: """Retirer le flag default de toutes les gateways""" await self.session.execute( update(SageGatewayConfig) .where(SageGatewayConfig.user_id == user_id) .values(is_default=False) ) def gateway_response_from_model(gateway: SageGatewayConfig) -> dict: """Convertir un model en réponse API (masque le token)""" token_preview = ( f"****{gateway.gateway_token[-4:]}" if gateway.gateway_token else "****" ) success_rate = 0.0 if gateway.total_requests > 0: success_rate = (gateway.successful_requests / gateway.total_requests) * 100 if gateway.last_health_status is None: health_status = "unknown" elif gateway.last_health_status: health_status = "healthy" else: health_status = "unhealthy" extra_config = None if gateway.extra_config: try: extra_config = json.loads(gateway.extra_config) except json.JSONDecodeError: pass allowed_ips = None if gateway.allowed_ips: try: allowed_ips = json.loads(gateway.allowed_ips) except json.JSONDecodeError: pass return { "id": gateway.id, "user_id": gateway.user_id, "name": gateway.name, "description": gateway.description, "gateway_url": gateway.gateway_url, "token_preview": token_preview, "sage_database": gateway.sage_database, "sage_company": gateway.sage_company, "is_active": gateway.is_active, "is_default": gateway.is_default, "priority": gateway.priority, "health_status": health_status, "last_health_check": gateway.last_health_check, "last_error": gateway.last_error, "total_requests": gateway.total_requests, "successful_requests": gateway.successful_requests, "failed_requests": gateway.failed_requests, "success_rate": round(success_rate, 2), "last_used_at": gateway.last_used_at, "extra_config": extra_config, "allowed_ips": allowed_ips, "created_at": gateway.created_at, "updated_at": gateway.updated_at, }