Sage100-vps/routes/auth.py
2026-01-02 17:56:28 +03:00

672 lines
20 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import false, select
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, timedelta
from typing import Optional, List
import uuid
import logging
from config.config import settings
from database import get_session
from database import User, RefreshToken, AuditEventType
from security.auth import (
hash_password,
verify_password,
validate_password_strength,
generate_verification_token,
generate_reset_token,
)
from security.cookies import CookieManager, set_auth_cookies
from security.fingerprint import DeviceFingerprint, get_client_ip
from security.rate_limiter import RateLimiter
from services.token_service import TokenService
from services.audit_service import AuditService
from services.email_service import AuthEmailService
from core.dependencies import get_current_user
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that groups every endpoint declared in this file under the "/auth"
# prefix and the "Authentication" tag.
router = APIRouter(prefix="/auth", tags=["Authentication"])
class RegisterRequest(BaseModel):
    """Request body accepted by the ``register`` endpoint."""

    email: EmailStr
    # Only the length is bounded here; the register handler additionally runs
    # validate_password_strength on this value.
    password: str = Field(..., min_length=8, max_length=128)
    # "nom" / "prenom": French for family name / given name.
    nom: str = Field(..., min_length=2, max_length=100)
    prenom: str = Field(..., min_length=2, max_length=100)
class LoginRequest(BaseModel):
    """Request body accepted by the ``login`` endpoint."""

    email: EmailStr
    # min_length=1: only an empty password is rejected at validation time.
    password: str = Field(..., min_length=1, max_length=128)
class TokenResponse(BaseModel):
    """Payload returned by the ``login`` endpoint on success."""

    message: str
    # Basic profile fields of the authenticated user (id, email, nom, prenom,
    # role as built by the login handler).
    user: dict
    # Access-token lifetime in seconds (login passes
    # settings.access_token_expire_minutes * 60).
    expires_in: int
class ForgotPasswordRequest(BaseModel):
    """Request body accepted by the ``forgot_password`` endpoint."""

    # Address of the account whose password should be reset.
    email: EmailStr
class ResetPasswordRequest(BaseModel):
    """Request body accepted by the ``reset_password`` endpoint."""

    # Reset token; the handler matches it against User.reset_token.
    token: str
    # Only the length is bounded here; the handler additionally runs
    # validate_password_strength on this value.
    new_password: str = Field(..., min_length=8, max_length=128)
class VerifyEmailRequest(BaseModel):
    """Request body accepted by the POST ``verify-email`` endpoint."""

    # Verification token; the handler matches it against User.verification_token.
    token: str
class ResendVerificationRequest(BaseModel):
    """Request body accepted by the ``resend_verification`` endpoint."""

    # Address of the account that should receive a new verification link.
    email: EmailStr
class ChangePasswordRequest(BaseModel):
    """Request body accepted by the ``change_password`` endpoint."""

    # Checked against the stored hash before the change is applied.
    current_password: str
    # Only the length is bounded here; the handler additionally runs
    # validate_password_strength on this value.
    new_password: str = Field(..., min_length=8, max_length=128)
class SessionResponse(BaseModel):
    """One entry of the list returned by the ``get_active_sessions`` endpoint.

    Instances are built by unpacking the mappings returned by
    TokenService.get_user_active_sessions.
    """

    id: str
    device_info: Optional[str]
    ip_address: Optional[str]
    # Timestamps are carried as strings.
    # NOTE(review): presumably ISO-8601 — confirm against TokenService.
    created_at: str
    last_used_at: Optional[str]
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
    data: RegisterRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Create an unverified account and send its verification email.

    Steps, in order: registration rate limit keyed on the client IP,
    duplicate check on the lower-cased email, password-strength validation,
    creation of the user with a verification token valid for 24 hours, commit,
    then the verification email.

    Raises:
        HTTPException: 429 when the rate limiter refuses the request; 400 when
            the email is already registered or the password is too weak.

    Returns:
        dict with ``success``, ``message`` and the new ``user_id``.
    """
    client_ip = get_client_ip(request)
    permitted, limit_error = await RateLimiter.check_registration_rate_limit(
        client_ip
    )
    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=limit_error
        )

    # Emails are stored and compared in lower case.
    email_key = data.email.lower()
    lookup = await session.execute(select(User).where(User.email == email_key))
    already_registered = lookup.scalar_one_or_none()
    if already_registered:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Cet email est deja utilise"
        )

    strong_enough, strength_error = validate_password_strength(data.password)
    if not strong_enough:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=strength_error
        )

    verification_token = generate_verification_token()
    account_fields = {
        "id": str(uuid.uuid4()),
        "email": email_key,
        "hashed_password": hash_password(data.password),
        "nom": data.nom,
        "prenom": data.prenom,
        "is_verified": False,
        "verification_token": verification_token,
        "verification_token_expires": datetime.now() + timedelta(hours=24),
        "created_at": datetime.now(),
    }
    new_user = User(**account_fields)
    session.add(new_user)
    await session.commit()

    # The verification link is built from the base URL of the incoming request.
    base_url = str(request.base_url).rstrip("/")
    AuthEmailService.send_verification_email(data.email, verification_token, base_url)
    logger.info(f"Nouvel utilisateur inscrit: {data.email}")

    return {
        "success": True,
        "message": "Inscription reussie. Consultez votre email pour verifier votre compte.",
        "user_id": new_user.id,
    }
@router.post("/login")
async def login(
    data: LoginRequest,
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
):
    """Authenticate a user by email and password and set the auth cookies.

    Checks run in this order: login rate limit, credentials, ``is_active``,
    ``is_verified``, temporary lock. Every rejection is audited and the audit
    entry is committed before the error is raised. A wrong password for an
    existing account increments its failure counter and locks the account once
    ``settings.account_lockout_threshold`` is reached.

    Raises:
        HTTPException: 429 when the rate limiter refuses the attempt; 401 on
            unknown email or wrong password; 403 when the account is locked,
            disabled, or its email is not verified.

    Returns:
        TokenResponse carrying basic user info and the access-token lifetime
        in seconds. The tokens themselves are delivered as cookies.
    """
    ip = get_client_ip(request)
    user_agent = request.headers.get("User-Agent", "")
    fingerprint_hash = DeviceFingerprint.generate_hash(request)
    email = data.email.lower()

    allowed, error_msg, _ = await RateLimiter.check_login_rate_limit(email, ip)
    if not allowed:
        await AuditService.log_login_failed(session, request, data.email, "rate_limit")
        # Commit explicitly, as the invalid-credentials path below does, so the
        # audit entry is not left pending when the exception propagates.
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
        )

    result = await session.execute(select(User).where(User.email == email))
    user = result.scalar_one_or_none()

    # NOTE(review): verify_password is skipped when the email is unknown, so
    # the two failure cases may differ in response time — confirm acceptable.
    if not user or not verify_password(data.password, user.hashed_password):
        await RateLimiter.record_login_attempt(email, ip, success=False)
        await AuditService.record_login_attempt(
            session, request, data.email, False, "invalid_credentials"
        )
        if user:
            user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
            if user.failed_login_attempts >= settings.account_lockout_threshold:
                user.locked_until = datetime.now() + timedelta(
                    minutes=settings.account_lockout_duration_minutes
                )
                await AuditService.log_account_locked(
                    session, request, user.id, "too_many_failed_attempts"
                )
                await session.commit()
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Compte verrouille. Reessayez dans {settings.account_lockout_duration_minutes} minutes.",
                )
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email ou mot de passe incorrect",
        )

    # Account-state rejections are only reached with a correct password.
    if not user.is_active:
        await AuditService.log_login_failed(
            session, request, data.email, "account_disabled", user.id
        )
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Compte desactive"
        )

    if not user.is_verified:
        await AuditService.log_login_failed(
            session, request, data.email, "email_not_verified", user.id
        )
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email non verifie. Consultez votre boite de reception.",
        )

    if user.locked_until and user.locked_until > datetime.now():
        await AuditService.log_login_failed(
            session, request, data.email, "account_locked", user.id
        )
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte temporairement verrouille",
        )

    # Successful login: reset the failure tracking and record the login.
    user.failed_login_attempts = 0
    user.locked_until = None
    user.last_login = datetime.now()
    user.last_login_ip = ip

    # The fourth element (session id) is not used by this handler.
    access_token, refresh_token, csrf_token, _ = await TokenService.create_token_pair(
        session=session,
        user=user,
        fingerprint_hash=fingerprint_hash,
        device_info=user_agent,
        ip_address=ip,
    )

    # Log the success before committing so the audit entry is part of the same
    # commit as the new tokens and user updates.
    await AuditService.log_login_success(session, request, user.id, user.email)
    await session.commit()

    await RateLimiter.record_login_attempt(email, ip, success=True)
    set_auth_cookies(response, access_token, refresh_token, csrf_token)
    logger.info(f"Connexion reussie: {user.email} depuis {ip}")

    return TokenResponse(
        message="Connexion reussie",
        user={
            "id": user.id,
            "email": user.email,
            "nom": user.nom,
            "prenom": user.prenom,
            "role": user.role,
        },
        expires_in=settings.access_token_expire_minutes * 60,
    )
@router.post("/refresh")
async def refresh_tokens(
    request: Request, response: Response, session: AsyncSession = Depends(get_session)
):
    """Exchange the refresh-token cookie for a new set of auth cookies.

    Raises:
        HTTPException: 401 when no refresh-token cookie is present.

    Returns:
        On success, a dict with ``message`` and ``expires_in`` (seconds), with
        the new tokens set as cookies. When TokenService rejects the refresh
        token, a 401 JSON response with the usual ``{"detail": ...}`` body that
        also clears the auth cookies.
    """
    # Function-scope import: only the rejection path below needs it.
    from fastapi.responses import JSONResponse

    refresh_token = CookieManager.get_refresh_token(request)
    if not refresh_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Refresh token manquant"
        )

    ip = get_client_ip(request)
    user_agent = request.headers.get("User-Agent", "")
    fingerprint_hash = DeviceFingerprint.generate_hash(request)

    result = await TokenService.refresh_tokens(
        session=session,
        refresh_token=refresh_token,
        fingerprint_hash=fingerprint_hash,
        device_info=user_agent,
        ip_address=ip,
    )

    if not result:
        # Headers set on the injected `response` are only merged into a
        # normally returned response; raising HTTPException would drop the
        # cookie-clearing headers. Return the 401 directly so the client
        # actually receives them.
        # NOTE(review): this bypasses any custom HTTPException handler the
        # application may register — confirm none reshapes 401 bodies.
        # NOTE(review): if TokenService.refresh_tokens leaves pending changes
        # (e.g. revocations) on this path, they are not committed here.
        rejection = JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Refresh token invalide ou expire"},
        )
        CookieManager.clear_all_auth_cookies(rejection)
        return rejection

    # The fourth element (session id) is not used by this handler.
    new_access, new_refresh, new_csrf, _ = result
    await session.commit()

    set_auth_cookies(response, new_access, new_refresh, new_csrf)
    logger.debug("Tokens rafraichis avec succes")

    return {
        "message": "Tokens rafraichis",
        "expires_in": settings.access_token_expire_minutes * 60,
    }
@router.post("/logout")
async def logout(
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user),
):
    """End the current session of the authenticated user.

    Revokes the refresh token found in the request cookies (if any), records
    the logout in the audit log, commits, and clears the auth cookies.

    Returns:
        dict with ``success`` and ``message``.
    """
    cookie_token = CookieManager.get_refresh_token(request)
    if cookie_token:
        await TokenService.revoke_token(
            session=session,
            refresh_token=cookie_token,
            reason="user_logout",
        )

    await AuditService.log_logout(session, request, user.id)
    await session.commit()

    CookieManager.clear_all_auth_cookies(response)

    log_line = f"Deconnexion: {user.email}"
    logger.info(log_line)

    payload = {"success": True, "message": "Deconnexion reussie"}
    return payload
@router.post("/logout-all")
async def logout_all_sessions(
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user),
):
    """Revoke every refresh token of the authenticated user.

    The number of revoked tokens is written to the audit log and echoed in the
    response message; the auth cookies of the calling client are cleared.

    Returns:
        dict with ``success`` and ``message``.
    """
    revoked_count = await TokenService.revoke_all_user_tokens(
        session=session,
        user_id=user.id,
        reason="user_logout_all",
    )

    audit_text = f"Toutes les sessions terminees ({revoked_count} tokens revoques)"
    await AuditService.log_event(
        session=session,
        event_type=AuditEventType.SESSION_TERMINATED,
        request=request,
        user_id=user.id,
        description=audit_text,
    )
    await session.commit()

    CookieManager.clear_all_auth_cookies(response)

    log_line = f"Toutes les sessions terminees pour {user.email}: {revoked_count} tokens"
    logger.info(log_line)

    payload = {"success": True, "message": f"{revoked_count} session(s) terminee(s)"}
    return payload
@router.get("/verify-email")
async def verify_email_get(token: str, session: AsyncSession = Depends(get_session)):
    """Verify an email address from a token passed as a query parameter.

    Unlike the POST variant, failures are reported in the response body
    (``success: False``) rather than as an HTTP error.

    Returns:
        dict with ``success`` and ``message``; ``expired: True`` is added when
        the token exists but its expiry has passed.
    """
    lookup = await session.execute(
        select(User).where(User.verification_token == token)
    )
    user = lookup.scalar_one_or_none()

    if not user:
        return {
            "success": False,
            "message": "Token de verification invalide ou deja utilise.",
        }

    # A token without an expiry timestamp is treated as still valid.
    expires_at = user.verification_token_expires
    if expires_at and expires_at < datetime.now():
        return {
            "success": False,
            "message": "Token expire. Demandez un nouveau lien de verification.",
            "expired": True,
        }

    # Mark the account verified and invalidate the token.
    user.is_verified = True
    user.verification_token = None
    user.verification_token_expires = None
    await session.commit()

    log_line = f"Email verifie: {user.email}"
    logger.info(log_line)

    return {
        "success": True,
        "message": "Email verifie avec succes. Vous pouvez maintenant vous connecter.",
    }
@router.post("/verify-email")
async def verify_email_post(
    data: VerifyEmailRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Verify an email address from a token sent in the request body.

    On success the account is marked verified, the token is cleared and an
    EMAIL_VERIFICATION audit event is recorded.

    Raises:
        HTTPException: 400 when the token matches no user or has expired.

    Returns:
        dict with ``success`` and ``message``.
    """
    query = select(User).where(User.verification_token == data.token)
    lookup = await session.execute(query)
    user = lookup.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token de verification invalide",
        )

    # A token without an expiry timestamp is treated as still valid.
    expires_at = user.verification_token_expires
    if expires_at and expires_at < datetime.now():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token expire. Demandez un nouveau lien de verification.",
        )

    # Mark the account verified and invalidate the token.
    user.is_verified = True
    user.verification_token = None
    user.verification_token_expires = None

    await AuditService.log_event(
        session=session,
        event_type=AuditEventType.EMAIL_VERIFICATION,
        request=request,
        user_id=user.id,
        description="Email verifie avec succes",
    )
    await session.commit()

    log_line = f"Email verifie: {user.email}"
    logger.info(log_line)

    return {"success": True, "message": "Email verifie avec succes."}
@router.post("/resend-verification")
async def resend_verification(
    data: ResendVerificationRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Generate a new verification token (valid 24 hours) and email it.

    Raises:
        HTTPException: 400 when the account is already verified.

    Returns:
        dict with ``success`` and ``message``. An unknown email also gets a
        success response, with a neutral message.
    """
    email_key = data.email.lower()
    lookup = await session.execute(select(User).where(User.email == email_key))
    user = lookup.scalar_one_or_none()

    if not user:
        # Unknown address: answer with a neutral success message.
        return {
            "success": True,
            "message": "Si cet email existe, un nouveau lien a ete envoye.",
        }

    # NOTE(review): this 400 and the different success message at the end let
    # a caller tell registered addresses from unregistered ones, which undoes
    # the neutral reply above — confirm this is intended.
    if user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Ce compte est deja verifie"
        )

    fresh_token = generate_verification_token()
    user.verification_token = fresh_token
    user.verification_token_expires = datetime.now() + timedelta(hours=24)
    await session.commit()

    # The verification link is built from the base URL of the incoming request.
    base_url = str(request.base_url).rstrip("/")
    AuthEmailService.send_verification_email(user.email, fresh_token, base_url)

    return {"success": True, "message": "Un nouveau lien de verification a ete envoye."}
@router.post("/forgot-password")
async def forgot_password(
    data: ForgotPasswordRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Start the password-reset flow for an email address.

    The request is audited whether or not the email belongs to an account, and
    the same response is returned in both cases. For a known account a reset
    token valid for one hour is stored and emailed.

    Raises:
        HTTPException: 429 when the rate limiter refuses the request.

    Returns:
        dict with ``success`` and a neutral ``message``.
    """
    ip = get_client_ip(request)
    email = data.email.lower()

    allowed, error_msg = await RateLimiter.check_password_reset_rate_limit(email, ip)
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
        )

    result = await session.execute(select(User).where(User.email == email))
    user = result.scalar_one_or_none()

    await AuditService.log_password_reset_request(
        session, request, data.email, user.id if user else None
    )

    # Identical reply for known and unknown addresses.
    neutral_reply = {
        "success": True,
        "message": "Si cet email existe, un lien de reinitialisation a ete envoye.",
    }

    if not user:
        # Commit here too, so the audit entry for an unknown address is
        # persisted just like the one for a known address below.
        await session.commit()
        return neutral_reply

    reset_token = generate_reset_token()
    user.reset_token = reset_token
    user.reset_token_expires = datetime.now() + timedelta(hours=1)
    await session.commit()

    # Prefer the configured frontend URL; fall back to this API's base URL.
    frontend_url = settings.frontend_url or str(request.base_url).rstrip("/")
    AuthEmailService.send_password_reset_email(user.email, reset_token, frontend_url)
    logger.info(f"Reset password demande: {user.email}")

    return neutral_reply
@router.post("/reset-password")
async def reset_password(
    data: ResetPasswordRequest,
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
):
    """Set a new password using a reset token.

    On success the reset token is cleared, the failure counter and lock are
    reset, every refresh token of the user is revoked, the change is audited,
    the auth cookies are cleared and a notification email is sent.

    Raises:
        HTTPException: 400 when the token matches no user, has expired, or the
            new password fails the strength check.

    Returns:
        dict with ``success`` and ``message``.
    """
    lookup = await session.execute(
        select(User).where(User.reset_token == data.token)
    )
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token de reinitialisation invalide",
        )

    # A token without an expiry timestamp is treated as still valid.
    expires_at = user.reset_token_expires
    if expires_at and expires_at < datetime.now():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token expire. Demandez un nouveau lien.",
        )

    strong_enough, strength_error = validate_password_strength(data.new_password)
    if not strong_enough:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=strength_error
        )

    # Store the new hash and invalidate the reset token.
    user.hashed_password = hash_password(data.new_password)
    user.reset_token = None
    user.reset_token_expires = None
    # Lift any lockout left over from failed logins.
    user.failed_login_attempts = 0
    user.locked_until = None
    user.password_changed_at = datetime.now()

    await TokenService.revoke_all_user_tokens(
        session=session,
        user_id=user.id,
        reason="password_reset",
    )
    await AuditService.log_password_change(session, request, user.id, "reset")
    await session.commit()

    CookieManager.clear_all_auth_cookies(response)
    AuthEmailService.send_password_changed_notification(user.email)

    log_line = f"Mot de passe reinitialise: {user.email}"
    logger.info(log_line)

    return {
        "success": True,
        "message": "Mot de passe reinitialise. Vous pouvez maintenant vous connecter.",
    }
@router.post("/change-password")
async def change_password(
    data: ChangePasswordRequest,
    request: Request,
    response: Response,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user),
):
    """Change the password of the authenticated user.

    On success every refresh token of the user is revoked, the change is
    audited, the auth cookies are cleared and a notification email is sent, so
    the caller has to log in again.

    Raises:
        HTTPException: 400 when the current password is wrong or the new
            password fails the strength check.

    Returns:
        dict with ``success`` and ``message``.
    """
    current_ok = verify_password(data.current_password, user.hashed_password)
    if not current_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Mot de passe actuel incorrect",
        )

    strong_enough, strength_error = validate_password_strength(data.new_password)
    if not strong_enough:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=strength_error
        )

    user.hashed_password = hash_password(data.new_password)
    user.password_changed_at = datetime.now()

    await TokenService.revoke_all_user_tokens(
        session=session,
        user_id=user.id,
        reason="password_change",
    )
    await AuditService.log_password_change(session, request, user.id, "user_initiated")
    await session.commit()

    CookieManager.clear_all_auth_cookies(response)
    AuthEmailService.send_password_changed_notification(user.email)

    log_line = f"Mot de passe change: {user.email}"
    logger.info(log_line)

    return {
        "success": True,
        "message": "Mot de passe modifie. Veuillez vous reconnecter.",
    }
@router.get("/me")
async def get_current_user_info(user: User = Depends(get_current_user)):
    """Return the profile of the authenticated user.

    Returns:
        dict with id, email, nom, prenom, role, is_verified, and the
        ``created_at`` / ``last_login`` timestamps as ISO strings (``None``
        when unset).
    """
    created_at = user.created_at
    last_login = user.last_login

    profile = {
        "id": user.id,
        "email": user.email,
        "nom": user.nom,
        "prenom": user.prenom,
        "role": user.role,
        "is_verified": user.is_verified,
        "created_at": None,
        "last_login": None,
    }
    if created_at:
        profile["created_at"] = created_at.isoformat()
    if last_login:
        profile["last_login"] = last_login.isoformat()
    return profile
@router.get("/sessions", response_model=List[SessionResponse])
async def get_active_sessions(
    session: AsyncSession = Depends(get_session), user: User = Depends(get_current_user)
):
    """List the active sessions of the authenticated user.

    Returns:
        One SessionResponse per mapping returned by
        TokenService.get_user_active_sessions.
    """
    raw_sessions = await TokenService.get_user_active_sessions(session, user.id)

    listing = []
    for entry in raw_sessions:
        listing.append(SessionResponse(**entry))
    return listing
@router.delete("/sessions/{session_id}")
async def revoke_session(
    session_id: str,
    request: Request,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user),
):
    """Revoke one session (refresh token) of the authenticated user.

    The token must belong to the caller and must not already be revoked.

    Raises:
        HTTPException: 404 when no matching, non-revoked token is found.

    Returns:
        dict with ``success`` and ``message``.
    """
    # Filtering on user_id keeps a caller from revoking another user's session.
    query = select(RefreshToken).where(
        RefreshToken.id == session_id,
        RefreshToken.user_id == user.id,
        RefreshToken.is_revoked.is_(false()),
    )
    lookup = await session.execute(query)
    token_record = lookup.scalar_one_or_none()

    if not token_record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Session introuvable"
        )

    token_record.is_revoked = True
    token_record.revoked_at = datetime.now()
    token_record.revoked_reason = "user_revoked"

    # Only the first 8 characters of the session id go into the audit text.
    audit_text = f"Session {session_id[:8]}... revoquee"
    await AuditService.log_event(
        session=session,
        event_type=AuditEventType.SESSION_TERMINATED,
        request=request,
        user_id=user.id,
        description=audit_text,
    )
    await session.commit()

    payload = {"success": True, "message": "Session revoquee"}
    return payload
@router.get("/csrf-token")
async def get_csrf_token(
    request: Request, response: Response, user: User = Depends(get_current_user)
):
    """Create a CSRF token, set it as a cookie and return it.

    The token is created from ``request.state.session_id`` when that attribute
    is set and truthy, otherwise from a newly generated session id. ``user``
    is not read; the dependency only requires an authenticated caller.

    Returns:
        dict with the ``csrf_token``.
    """
    from security.auth import generate_session_id, create_csrf_token

    session_id = getattr(request.state, "session_id", None) or generate_session_id()
    csrf_token = create_csrf_token(session_id)
    CookieManager.set_csrf_token(response, csrf_token)

    payload = {"csrf_token": csrf_token}
    return payload