318 lines
9.9 KiB
Python
318 lines
9.9 KiB
Python
import json
import logging
import uuid
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

from fastapi import Request
from sqlalchemy import and_, false, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from database import AuditLog, AuditEventType, LoginAttempt
from security.fingerprint import DeviceFingerprint, get_client_ip
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuditService:
    """Persist security audit events and query them for suspicious patterns.

    Every method is a classmethod that works on a caller-supplied
    ``AsyncSession``. Writers add a row and ``flush()`` it; nothing in this
    class commits the session.

    NOTE(review): all timestamps come from ``datetime.now()``, i.e. naive
    local time. Confirm this matches how the timestamp columns are stored
    before switching to timezone-aware values.
    """

    # Upper bound applied to free-form request strings before they are stored.
    MAX_FIELD_LENGTH = 500

    # Thresholds at or above which ``detect_suspicious_patterns`` flags a user.
    FAILED_LOGIN_THRESHOLD = 5
    UNIQUE_IP_THRESHOLD = 5
    PASSWORD_RESET_THRESHOLD = 3

    @classmethod
    async def log_event(
        cls,
        session: AsyncSession,
        event_type: AuditEventType,
        request: Optional[Request] = None,
        user_id: Optional[str] = None,
        description: Optional[str] = None,
        success: bool = True,
        failure_reason: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> AuditLog:
        """Create an ``AuditLog`` row, flush it, and mirror it to the logger.

        Args:
            session: Session the row is added to and flushed on.
            event_type: Kind of event being recorded.
            request: Optional HTTP request; when given, the client IP,
                User-Agent, device fingerprint, method and path are stored.
            user_id: Identifier of the user concerned, if known.
            description: Human-readable description of the event.
            success: Whether the audited action succeeded. Failures are
                logged at WARNING level, successes at INFO.
            failure_reason: Reason recorded for a failed action.
            resource_type: Type of the resource the event relates to.
            resource_id: Identifier of the resource the event relates to.
            metadata: Extra context, stored as a JSON string. An empty dict
                is stored as ``None``.

        Returns:
            The flushed ``AuditLog`` instance.
        """
        ip_address = None
        user_agent = None
        fingerprint_hash = None
        request_method = None
        request_path = None

        if request is not None:
            ip_address = get_client_ip(request)
            user_agent = request.headers.get("User-Agent", "")[: cls.MAX_FIELD_LENGTH]
            fingerprint_hash = DeviceFingerprint.generate_hash(request)
            request_method = request.method
            request_path = str(request.url.path)[: cls.MAX_FIELD_LENGTH]

        metadata_json = None
        if metadata:
            try:
                # default=str stringifies values json cannot encode natively.
                metadata_json = json.dumps(metadata, default=str)
            except Exception as e:
                # Best effort: an unserialisable payload must not prevent the
                # audit row itself from being written.
                logger.warning("Erreur serialisation metadata audit: %s", e)

        # NOTE(review): ``metadata`` is a reserved attribute name on SQLAlchemy
        # declarative models; confirm AuditLog really accepts this keyword.
        audit_log = AuditLog(
            id=str(uuid.uuid4()),
            user_id=user_id,
            event_type=event_type,
            event_description=description,
            ip_address=ip_address,
            user_agent=user_agent,
            fingerprint_hash=fingerprint_hash,
            resource_type=resource_type,
            resource_id=resource_id,
            request_method=request_method,
            request_path=request_path,
            metadata=metadata_json,
            success=success,
            failure_reason=failure_reason,
            created_at=datetime.now(),
        )

        session.add(audit_log)
        await session.flush()

        logger.log(
            logging.INFO if success else logging.WARNING,
            "Audit: %s user=%s success=%s ip=%s",
            event_type.value,
            user_id,
            success,
            ip_address,
        )

        return audit_log

    @classmethod
    async def log_login_success(
        cls, session: AsyncSession, request: Request, user_id: str, email: str
    ) -> AuditLog:
        """Record a successful login for *email*."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGIN_SUCCESS,
            request=request,
            user_id=user_id,
            description=f"Connexion reussie pour {email}",
            success=True,
            metadata={"email": email},
        )

    @classmethod
    async def log_login_failed(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        reason: str,
        user_id: Optional[str] = None,
    ) -> AuditLog:
        """Record a failed login for *email*; *reason* is stored as the failure reason."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGIN_FAILED,
            request=request,
            user_id=user_id,
            description=f"Echec connexion pour {email}: {reason}",
            success=False,
            failure_reason=reason,
            metadata={"email": email},
        )

    @classmethod
    async def log_logout(
        cls, session: AsyncSession, request: Request, user_id: str
    ) -> AuditLog:
        """Record a user logout."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.LOGOUT,
            request=request,
            user_id=user_id,
            description="Deconnexion utilisateur",
            success=True,
        )

    @classmethod
    async def log_password_change(
        cls,
        session: AsyncSession,
        request: Request,
        user_id: str,
        method: str = "user_initiated",
    ) -> AuditLog:
        """Record a password change; *method* says how the change was triggered."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.PASSWORD_CHANGE,
            request=request,
            user_id=user_id,
            description=f"Mot de passe modifie ({method})",
            success=True,
            metadata={"method": method},
        )

    @classmethod
    async def log_password_reset_request(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        user_id: Optional[str] = None,
    ) -> AuditLog:
        """Record a password-reset request for *email*."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.PASSWORD_RESET_REQUEST,
            request=request,
            user_id=user_id,
            description=f"Demande reset mot de passe pour {email}",
            success=True,
            metadata={"email": email},
        )

    @classmethod
    async def log_account_locked(
        cls, session: AsyncSession, request: Request, user_id: str, reason: str
    ) -> AuditLog:
        """Record that an account was locked, together with the *reason*."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.ACCOUNT_LOCKED,
            request=request,
            user_id=user_id,
            description=f"Compte verrouille: {reason}",
            success=True,
            metadata={"reason": reason},
        )

    @classmethod
    async def log_token_refresh(
        cls, session: AsyncSession, request: Request, user_id: str
    ) -> AuditLog:
        """Record a token refresh."""
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.TOKEN_REFRESH,
            request=request,
            user_id=user_id,
            description="Token rafraichi",
            success=True,
        )

    @classmethod
    async def log_suspicious_activity(
        cls,
        session: AsyncSession,
        request: Request,
        user_id: Optional[str],
        activity_type: str,
        details: str,
    ) -> AuditLog:
        """Record suspicious activity as a failed event.

        *activity_type* doubles as the stored failure reason.
        """
        return await cls.log_event(
            session=session,
            event_type=AuditEventType.SUSPICIOUS_ACTIVITY,
            request=request,
            user_id=user_id,
            description=f"Activite suspecte: {activity_type} - {details}",
            success=False,
            failure_reason=activity_type,
            metadata={"activity_type": activity_type, "details": details},
        )

    @classmethod
    async def record_login_attempt(
        cls,
        session: AsyncSession,
        request: Request,
        email: str,
        success: bool,
        failure_reason: Optional[str] = None,
    ) -> LoginAttempt:
        """Store a ``LoginAttempt`` row for *email* and flush it.

        The email is lower-cased before storage, which is the same
        normalisation ``get_recent_failed_attempts`` applies when querying.

        Returns:
            The flushed ``LoginAttempt`` instance.
        """
        attempt = LoginAttempt(
            email=email.lower(),
            ip_address=get_client_ip(request),
            user_agent=request.headers.get("User-Agent", "")[: cls.MAX_FIELD_LENGTH],
            fingerprint_hash=DeviceFingerprint.generate_hash(request),
            success=success,
            failure_reason=failure_reason,
            timestamp=datetime.now(),
        )

        session.add(attempt)
        await session.flush()

        return attempt

    @classmethod
    async def get_recent_failed_attempts(
        cls, session: AsyncSession, email: str, window_minutes: int = 15
    ) -> int:
        """Return how many failed login attempts *email* had in the recent window.

        Args:
            session: Session used to run the query.
            email: Address to look up; compared lower-cased.
            window_minutes: Size of the look-back window in minutes.

        Returns:
            The number of failed attempts at or after the window start.
        """
        cutoff = datetime.now() - timedelta(minutes=window_minutes)

        # Count in the database: this runs on the login path, and the number
        # of matching rows grows with every failed attempt.
        result = await session.execute(
            select(func.count())
            .select_from(LoginAttempt)
            .where(
                and_(
                    LoginAttempt.email == email.lower(),
                    LoginAttempt.success.is_(false()),
                    LoginAttempt.timestamp >= cutoff,
                )
            )
        )

        return int(result.scalar_one())

    @classmethod
    async def get_user_audit_history(
        cls,
        session: AsyncSession,
        user_id: str,
        limit: int = 50,
        event_types: Optional[List[AuditEventType]] = None,
    ) -> List[AuditLog]:
        """Return a user's audit rows, newest first.

        Args:
            session: Session used to run the query.
            user_id: User whose history is requested.
            limit: Maximum number of rows returned.
            event_types: When non-empty, only these event types are returned.

        Returns:
            Up to *limit* ``AuditLog`` rows ordered by ``created_at`` descending.
        """
        query = select(AuditLog).where(AuditLog.user_id == user_id)

        if event_types:
            query = query.where(AuditLog.event_type.in_(event_types))

        query = query.order_by(AuditLog.created_at.desc()).limit(limit)

        result = await session.execute(query)
        return list(result.scalars().all())

    @classmethod
    async def _count_user_events(
        cls,
        session: AsyncSession,
        user_id: str,
        event_type: AuditEventType,
        since: datetime,
    ) -> int:
        """Count a user's audit rows of one event type created at or after *since*."""
        result = await session.execute(
            select(func.count())
            .select_from(AuditLog)
            .where(
                and_(
                    AuditLog.user_id == user_id,
                    AuditLog.event_type == event_type,
                    AuditLog.created_at >= since,
                )
            )
        )
        return int(result.scalar_one())

    @classmethod
    async def detect_suspicious_patterns(
        cls, session: AsyncSession, user_id: str
    ) -> Dict[str, Any]:
        """Summarise recent audit activity for a user and flag anomalies.

        Returns:
            A dict with the number of failed logins in the last hour, the
            number of distinct IP addresses with a successful login in the
            last day, the number of password-reset requests in the last day,
            and ``is_suspicious``, which is true when any of the three
            reaches its class-level threshold.
        """
        # Single reference instant so both windows end at the same moment.
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        one_day_ago = now - timedelta(days=1)

        failed_logins_hour = await cls._count_user_events(
            session, user_id, AuditEventType.LOGIN_FAILED, one_hour_ago
        )

        # Only the IP column is needed, so fetch it alone instead of full rows.
        result = await session.execute(
            select(AuditLog.ip_address)
            .where(
                and_(
                    AuditLog.user_id == user_id,
                    AuditLog.event_type == AuditEventType.LOGIN_SUCCESS,
                    AuditLog.created_at >= one_day_ago,
                )
            )
            .distinct()
        )
        # Rows with a missing or empty IP are ignored.
        unique_ips = {ip for ip in result.scalars().all() if ip}

        password_resets = await cls._count_user_events(
            session, user_id, AuditEventType.PASSWORD_RESET_REQUEST, one_day_ago
        )

        return {
            "failed_logins_last_hour": failed_logins_hour,
            "unique_ips_last_day": len(unique_ips),
            "password_reset_requests_last_day": password_resets,
            "is_suspicious": (
                failed_logins_hour >= cls.FAILED_LOGIN_THRESHOLD
                or len(unique_ips) >= cls.UNIQUE_IP_THRESHOLD
                or password_resets >= cls.PASSWORD_RESET_THRESHOLD
            ),
        }
|