Sage100-vps/services/audit_service.py
2026-01-02 17:56:28 +03:00

318 lines
9.9 KiB
Python

import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

from fastapi import Request
from sqlalchemy import false, select, and_, func
from sqlalchemy.ext.asyncio import AsyncSession

from database import AuditLog, AuditEventType, LoginAttempt
from security.fingerprint import DeviceFingerprint, get_client_ip
# Module-level logger: every audit event is echoed here after it is flushed,
# and metadata serialization failures are reported through it as warnings.
logger = logging.getLogger(__name__)
class AuditService:
    """Record security-relevant events and query them for abuse detection.

    Every method is a classmethod operating on the ``AsyncSession`` supplied by
    the caller. New rows are added and flushed but never committed here, so the
    caller keeps control of the transaction boundary.
    """

    # User-Agent headers and request paths are cut to this many characters
    # before being stored.
    _MAX_TEXT_LENGTH = 500

    @classmethod
    async def log_event(
        cls,
        session: AsyncSession,
        event_type: AuditEventType,
        request: Optional[Request] = None,
        user_id: Optional[str] = None,
        description: Optional[str] = None,
        success: bool = True,
        failure_reason: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> AuditLog:
        """Create, add and flush one ``AuditLog`` row, then echo it to the logger.

        Args:
            session: Session the new row is added to and flushed on.
            event_type: Kind of event being recorded.
            request: Incoming request, if any. When given, the client IP,
                User-Agent, device fingerprint, HTTP method and path are stored.
            user_id: Identifier of the user concerned, if known.
            description: Human-readable description of the event.
            success: Whether the audited action succeeded. Failed events are
                logged at WARNING level, successful ones at INFO.
            failure_reason: Short reason stored alongside a failed event.
            resource_type: Type of the resource the event relates to.
            resource_id: Identifier of the resource the event relates to.
            metadata: Extra data, stored as a JSON string. Values that are not
                JSON-serializable are converted with ``str``. An empty dict is
                stored as ``None``.

        Returns:
            The flushed ``AuditLog`` instance.
        """
        ip_address = None
        user_agent = None
        fingerprint_hash = None
        request_method = None
        request_path = None
        if request is not None:
            ip_address = get_client_ip(request)
            user_agent = request.headers.get("User-Agent", "")[: cls._MAX_TEXT_LENGTH]
            fingerprint_hash = DeviceFingerprint.generate_hash(request)
            request_method = request.method
            request_path = str(request.url.path)[: cls._MAX_TEXT_LENGTH]

        metadata_json = None
        if metadata:
            try:
                metadata_json = json.dumps(metadata, default=str)
            except Exception as e:
                # Best effort: an audit row without metadata is better than
                # losing the event because of an unserializable payload.
                logger.warning("Erreur serialisation metadata audit: %s", e)

        audit_log = AuditLog(
            id=str(uuid.uuid4()),
            user_id=user_id,
            event_type=event_type,
            event_description=description,
            ip_address=ip_address,
            user_agent=user_agent,
            fingerprint_hash=fingerprint_hash,
            resource_type=resource_type,
            resource_id=resource_id,
            request_method=request_method,
            request_path=request_path,
            # NOTE(review): `metadata` is a reserved attribute name on
            # SQLAlchemy declarative classes - confirm that AuditLog really
            # accepts this keyword (the model is not visible from this file).
            metadata=metadata_json,
            success=success,
            failure_reason=failure_reason,
            # Naive local time; the query helpers below build their thresholds
            # with datetime.now() as well, so both sides stay comparable.
            created_at=datetime.now(),
        )
        session.add(audit_log)
        await session.flush()

        log_level = logging.INFO if success else logging.WARNING
        logger.log(
            log_level,
            "Audit: %s user=%s success=%s ip=%s",
            event_type.value,
            user_id,
            success,
            ip_address,
        )
        return audit_log

    @classmethod
    async def log_login_success(
        cls, session: AsyncSession, request: Request, user_id: str, email: str
    ) -> AuditLog:
        """Record a successful login for ``user_id``; ``email`` goes into metadata."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGIN_SUCCESS,
            request=request,
            user_id=user_id,
            description=f"Connexion reussie pour {email}",
            success=True,
            metadata={"email": email},
        )

    @classmethod
    async def log_login_failed(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        reason: str,
        user_id: Optional[str] = None,
    ) -> AuditLog:
        """Record a failed login attempt.

        ``reason`` is stored as the failure reason and ``email`` in the metadata.
        ``user_id`` is optional and stored as given.
        """
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGIN_FAILED,
            request=request,
            user_id=user_id,
            description=f"Echec connexion pour {email}: {reason}",
            success=False,
            failure_reason=reason,
            metadata={"email": email},
        )

    @classmethod
    async def log_logout(
        cls, session: AsyncSession, request: Request, user_id: str
    ) -> AuditLog:
        """Record a logout for ``user_id``."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGOUT,
            request=request,
            user_id=user_id,
            description="Deconnexion utilisateur",
            success=True,
        )

    @classmethod
    async def log_password_change(
        cls,
        session: AsyncSession,
        request: Request,
        user_id: str,
        method: str = "user_initiated",
    ) -> AuditLog:
        """Record a password change; ``method`` is stored in the description and metadata."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.PASSWORD_CHANGE,
            request=request,
            user_id=user_id,
            description=f"Mot de passe modifie ({method})",
            success=True,
            metadata={"method": method},
        )

    @classmethod
    async def log_password_reset_request(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        user_id: Optional[str] = None,
    ) -> AuditLog:
        """Record a password reset request for ``email``.

        ``user_id`` is optional and stored as given.
        """
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.PASSWORD_RESET_REQUEST,
            request=request,
            user_id=user_id,
            description=f"Demande reset mot de passe pour {email}",
            success=True,
            metadata={"email": email},
        )

    @classmethod
    async def log_account_locked(
        cls, session: AsyncSession, request: Request, user_id: str, reason: str
    ) -> AuditLog:
        """Record that an account was locked.

        The event is stored with ``success=True``: it is the lock action itself
        that is being recorded, not the attempts that caused it.
        """
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.ACCOUNT_LOCKED,
            request=request,
            user_id=user_id,
            description=f"Compte verrouille: {reason}",
            success=True,
            metadata={"reason": reason},
        )

    @classmethod
    async def log_token_refresh(
        cls, session: AsyncSession, request: Request, user_id: str
    ) -> AuditLog:
        """Record a token refresh for ``user_id``."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.TOKEN_REFRESH,
            request=request,
            user_id=user_id,
            description="Token rafraichi",
            success=True,
        )

    @classmethod
    async def log_suspicious_activity(
        cls,
        session: AsyncSession,
        request: Request,
        user_id: Optional[str],
        activity_type: str,
        details: str,
    ) -> AuditLog:
        """Record a suspicious activity as a failed event.

        ``activity_type`` doubles as the failure reason; both it and ``details``
        are kept in the metadata.
        """
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.SUSPICIOUS_ACTIVITY,
            request=request,
            user_id=user_id,
            description=f"Activite suspecte: {activity_type} - {details}",
            success=False,
            failure_reason=activity_type,
            metadata={"activity_type": activity_type, "details": details},
        )

    @classmethod
    async def record_login_attempt(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        success: bool,
        failure_reason: Optional[str] = None,
    ) -> LoginAttempt:
        """Add and flush one ``LoginAttempt`` row for ``email``.

        The email is lower-cased before storage so that
        ``get_recent_failed_attempts`` matches it regardless of input casing.

        Returns:
            The flushed ``LoginAttempt`` instance.
        """
        attempt = LoginAttempt(
            email=email.lower(),
            ip_address=get_client_ip(request),
            user_agent=request.headers.get("User-Agent", "")[: cls._MAX_TEXT_LENGTH],
            fingerprint_hash=DeviceFingerprint.generate_hash(request),
            success=success,
            failure_reason=failure_reason,
            timestamp=datetime.now(),
        )
        session.add(attempt)
        await session.flush()
        return attempt

    @classmethod
    async def get_recent_failed_attempts(
        cls, session: AsyncSession, email: str, window_minutes: int = 15
    ) -> int:
        """Count failed login attempts for ``email`` in the last ``window_minutes``.

        The count is computed by the database (``SELECT COUNT(*)``) so that an
        account under attack does not cause every attempt row to be loaded.
        """
        time_threshold = datetime.now() - timedelta(minutes=window_minutes)
        result = await session.execute(
            select(func.count())
            .select_from(LoginAttempt)
            .where(
                and_(
                    LoginAttempt.email == email.lower(),
                    LoginAttempt.success.is_(false()),
                    LoginAttempt.timestamp >= time_threshold,
                )
            )
        )
        return int(result.scalar_one())

    @classmethod
    async def get_user_audit_history(
        cls,
        session: AsyncSession,
        user_id: str,
        limit: int = 50,
        event_types: Optional[List[AuditEventType]] = None,
    ) -> List[AuditLog]:
        """Return the most recent audit rows of ``user_id``, newest first.

        Args:
            session: Session used to run the query.
            user_id: User whose history is requested.
            limit: Maximum number of rows returned.
            event_types: When given and non-empty, only these event types are
                returned; otherwise all types are.
        """
        query = select(AuditLog).where(AuditLog.user_id == user_id)
        if event_types:
            query = query.where(AuditLog.event_type.in_(event_types))
        query = query.order_by(AuditLog.created_at.desc()).limit(limit)
        result = await session.execute(query)
        return list(result.scalars().all())

    @classmethod
    async def _count_user_events(
        cls,
        session: AsyncSession,
        user_id: str,
        event_type: AuditEventType,
        since: datetime,
    ) -> int:
        """Count a user's audit rows of one type created at or after ``since``."""
        result = await session.execute(
            select(func.count())
            .select_from(AuditLog)
            .where(
                and_(
                    AuditLog.user_id == user_id,
                    AuditLog.event_type == event_type,
                    AuditLog.created_at >= since,
                )
            )
        )
        return int(result.scalar_one())

    @classmethod
    async def detect_suspicious_patterns(
        cls,
        session: AsyncSession,
        user_id: str,
        max_failed_logins: int = 5,
        max_unique_ips: int = 5,
        max_password_resets: int = 3,
    ) -> Dict[str, Any]:
        """Summarise recent activity of ``user_id`` and flag suspicious volumes.

        Args:
            session: Session used to run the queries.
            user_id: User to analyse.
            max_failed_logins: Failed logins in the last hour at or above which
                the user is flagged.
            max_unique_ips: Distinct IPs with a successful login in the last day
                at or above which the user is flagged.
            max_password_resets: Password reset requests in the last day at or
                above which the user is flagged.

        Returns:
            A dict with the three counters (``failed_logins_last_hour``,
            ``unique_ips_last_day``, ``password_reset_requests_last_day``) and
            ``is_suspicious``, true when any counter reaches its threshold.
        """
        # Sample the clock once so both windows end at the same instant.
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        one_day_ago = now - timedelta(days=1)

        failed_logins_hour = await cls._count_user_events(
            session, user_id, AuditEventType.LOGIN_FAILED, one_hour_ago
        )

        # Only the distinct IP column is fetched; empty/NULL addresses are
        # ignored when counting.
        result = await session.execute(
            select(AuditLog.ip_address)
            .where(
                and_(
                    AuditLog.user_id == user_id,
                    AuditLog.event_type == AuditEventType.LOGIN_SUCCESS,
                    AuditLog.created_at >= one_day_ago,
                )
            )
            .distinct()
        )
        unique_ip_count = len({ip for ip in result.scalars().all() if ip})

        password_resets = await cls._count_user_events(
            session, user_id, AuditEventType.PASSWORD_RESET_REQUEST, one_day_ago
        )

        return {
            "failed_logins_last_hour": failed_logins_hour,
            "unique_ips_last_day": unique_ip_count,
            "password_reset_requests_last_day": password_resets,
            "is_suspicious": (
                failed_logins_hour >= max_failed_logins
                or unique_ip_count >= max_unique_ips
                or password_resets >= max_password_resets
            ),
        }