Sage100-vps/routes/auth.py
2025-12-02 09:33:18 +03:00

608 lines
No EOL
17 KiB
Python

# routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, timedelta
from typing import Optional
import uuid
from database import get_session, User, RefreshToken, LoginAttempt
from security.auth import (
hash_password,
verify_password,
validate_password_strength,
create_access_token,
create_refresh_token,
decode_token,
generate_verification_token,
generate_reset_token,
hash_token
)
from services.email_service import AuthEmailService
from core.dependencies import get_current_user
from config import settings
from datetime import datetime
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that groups every endpoint declared below under the "/auth" prefix
# and the "Authentication" tag.
router = APIRouter(prefix="/auth", tags=["Authentication"])
# === MODÈLES PYDANTIC ===
class RegisterRequest(BaseModel):
    # Request body of POST /auth/register.
    # `password` only gets a minimum-length check here; the endpoint applies
    # validate_password_strength() on top of it.
    # `nom` is the family name and `prenom` the given name.
    email: EmailStr
    password: str = Field(..., min_length=8)
    nom: str = Field(..., min_length=2, max_length=100)
    prenom: str = Field(..., min_length=2, max_length=100)
class LoginRequest(BaseModel):
    # Request body of POST /auth/login: the credentials to check.
    email: EmailStr
    password: str
class TokenResponse(BaseModel):
    # Response body returned by the login and refresh endpoints.
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int = 1800  # 30 minutes, in seconds
class RefreshTokenRequest(BaseModel):
    # Request body carrying a refresh token (used by /auth/refresh and
    # /auth/logout).
    refresh_token: str
class ForgotPasswordRequest(BaseModel):
    # Request body of POST /auth/forgot-password.
    email: EmailStr
class ResetPasswordRequest(BaseModel):
    # Request body of POST /auth/reset-password: the reset token together
    # with the replacement password (minimum length enforced here, strength
    # enforced by the endpoint).
    token: str
    new_password: str = Field(..., min_length=8)
class VerifyEmailRequest(BaseModel):
    # Request body of POST /auth/verify-email.
    token: str
class ResendVerificationRequest(BaseModel):
    # Request body of POST /auth/resend-verification.
    email: EmailStr
# === UTILITAIRES ===
async def log_login_attempt(
    session: AsyncSession,
    email: str,
    ip: str,
    user_agent: str,
    success: bool,
    failure_reason: Optional[str] = None
):
    """Record one login attempt and commit it immediately.

    Args:
        session: Database session used to persist the record.
        email: Email address that was submitted.
        ip: Client IP address (stored as ``ip_address``).
        user_agent: Client User-Agent string.
        success: Whether the attempt succeeded.
        failure_reason: Optional short reason stored for failed attempts.

    Note:
        The function commits the session, so any other pending change on
        the same session is flushed and committed as well.
    """
    record_fields = {
        "email": email,
        "ip_address": ip,
        "user_agent": user_agent,
        "success": success,
        "failure_reason": failure_reason,
        "timestamp": datetime.now(),
    }
    session.add(LoginAttempt(**record_fields))
    await session.commit()
async def check_rate_limit(
    session: AsyncSession,
    email: str,
    ip: str,
    *,
    max_failed_attempts: int = 5,
    window_minutes: int = 15,
) -> tuple[bool, str]:
    """Tell whether a new login attempt for ``email`` is allowed.

    The number of failed ``LoginAttempt`` rows recorded for ``email`` during
    the last ``window_minutes`` minutes is counted in the database; once it
    reaches ``max_failed_attempts`` the attempt is refused.

    Args:
        session: Database session used for the count query.
        email: Email address the attempt targets.
        ip: Client IP address. Accepted for interface compatibility but not
            used by the check: limiting is per email only.
        max_failed_attempts: Failed attempts tolerated inside the window.
        window_minutes: Size of the sliding window, in minutes.

    Returns:
        ``(is_allowed, error_message)``; ``error_message`` is empty when the
        attempt is allowed.
    """
    # Local import: the module header only imports `select` from sqlalchemy.
    from sqlalchemy import func

    window_start = datetime.now() - timedelta(minutes=window_minutes)
    # Let the database count instead of loading every row to take len().
    result = await session.execute(
        select(func.count())
        .select_from(LoginAttempt)
        .where(
            LoginAttempt.email == email,
            LoginAttempt.success == False,  # noqa: E712 - SQL expression, not a Python bool test
            LoginAttempt.timestamp >= window_start,
        )
    )
    failed_count = result.scalar_one()
    if failed_count >= max_failed_attempts:
        return False, f"Trop de tentatives échouées. Réessayez dans {window_minutes} minutes."
    return True, ""
# === ENDPOINTS ===
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
    data: RegisterRequest,
    request: Request,
    session: AsyncSession = Depends(get_session)
):
    """
    📝 Register a new user.

    - Rejects the request when the email is already taken
    - Validates the password strength
    - Creates the (unverified) account
    - Sends the verification email

    Raises:
        HTTPException: 400 when the email is already used or the password
            is rejected by ``validate_password_strength``.
    """
    # Accounts are stored with a lower-cased email, so the duplicate check
    # must use the same normalised value; otherwise "Foo@x.io" would slip
    # past the check while "foo@x.io" already exists.
    normalized_email = data.email.lower()

    result = await session.execute(
        select(User).where(User.email == normalized_email)
    )
    existing_user = result.scalar_one_or_none()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cet email est déjà utilisé"
        )

    # Validate the password
    is_valid, error_msg = validate_password_strength(data.password)
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=error_msg
        )

    # Generate the verification token (valid for 24 hours)
    verification_token = generate_verification_token()

    # Create the user
    new_user = User(
        id=str(uuid.uuid4()),
        email=normalized_email,
        hashed_password=hash_password(data.password),
        nom=data.nom,
        prenom=data.prenom,
        is_verified=False,
        verification_token=verification_token,
        verification_token_expires=datetime.now() + timedelta(hours=24),
        created_at=datetime.now()
    )
    session.add(new_user)
    await session.commit()

    # Send the verification email; a failure is logged but does not undo
    # the registration.
    base_url = str(request.base_url).rstrip('/')
    email_sent = AuthEmailService.send_verification_email(
        data.email,
        verification_token,
        base_url
    )
    if not email_sent:
        logger.warning(f"Échec envoi email vérification pour {data.email}")

    logger.info(f"✅ Nouvel utilisateur inscrit: {data.email} (ID: {new_user.id})")
    return {
        "success": True,
        "message": "Inscription réussie ! Consultez votre email pour vérifier votre compte.",
        "user_id": new_user.id,
        "email": data.email
    }
@router.post("/verify-email")
async def verify_email(
    data: VerifyEmailRequest,
    session: AsyncSession = Depends(get_session)
):
    """
    ✅ Verify an email address from its verification token.

    Marks the matching account as verified and clears the token.

    Raises:
        HTTPException: 400 when the token matches no user or has expired.
    """
    token_query = select(User).where(User.verification_token == data.token)
    lookup = await session.execute(token_query)
    account = lookup.scalar_one_or_none()

    if account is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token de vérification invalide"
        )

    # Reject tokens whose expiry date is already in the past
    if datetime.now() > account.verification_token_expires:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token expiré. Demandez un nouvel email de vérification."
        )

    # Activate the account and make the token single-use
    account.is_verified = True
    account.verification_token = None
    account.verification_token_expires = None
    await session.commit()

    logger.info(f"✅ Email vérifié: {account.email}")
    response = {
        "success": True,
        "message": "Email vérifié avec succès ! Vous pouvez maintenant vous connecter."
    }
    return response
@router.post("/resend-verification")
async def resend_verification(
    data: ResendVerificationRequest,
    request: Request,
    session: AsyncSession = Depends(get_session)
):
    """
    🔄 Send a fresh verification email.

    A new verification token (valid for 24 hours) replaces the previous one
    before the email is sent.

    Raises:
        HTTPException: 400 when the account is already verified.
    """
    result = await session.execute(
        select(User).where(User.email == data.email.lower())
    )
    user = result.scalar_one_or_none()
    if not user:
        # Do not reveal whether the user exists
        return {
            "success": True,
            "message": "Si cet email existe, un nouveau lien de vérification a été envoyé."
        }

    # NOTE(review): this 400 (and the distinct success message below) lets a
    # caller tell existing accounts apart from unknown emails, which works
    # against the intent of the branch above. Left unchanged because clients
    # may rely on the current responses - confirm before harmonising.
    if user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Ce compte est déjà vérifié"
        )

    # Generate a new token
    verification_token = generate_verification_token()
    user.verification_token = verification_token
    user.verification_token_expires = datetime.now() + timedelta(hours=24)
    await session.commit()

    # Send the email; like in register(), a failed send is logged rather
    # than silently ignored.
    base_url = str(request.base_url).rstrip('/')
    email_sent = AuthEmailService.send_verification_email(
        user.email,
        verification_token,
        base_url
    )
    if not email_sent:
        logger.warning(f"Échec envoi email vérification pour {user.email}")

    return {
        "success": True,
        "message": "Un nouveau lien de vérification a été envoyé."
    }
@router.post("/login", response_model=TokenResponse)
async def login(
    data: LoginRequest,
    request: Request,
    session: AsyncSession = Depends(get_session)
):
    """
    🔐 User login.

    Returns an access_token (30 min) and a refresh_token (7 days).

    Checks run in this order: rate limit, account lock, credentials,
    account active, email verified. Every refused or successful attempt is
    recorded through ``log_login_attempt``.

    Raises:
        HTTPException: 429 when the rate limit is hit; 401 on wrong
            credentials; 403 when the account is locked, disabled or not
            verified.
    """
    ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "unknown")
    # Emails are stored lower-cased; normalise once and reuse everywhere.
    email = data.email.lower()

    # Rate limiting
    is_allowed, error_msg = await check_rate_limit(session, email, ip)
    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=error_msg
        )

    # Load the user
    result = await session.execute(
        select(User).where(User.email == email)
    )
    user = result.scalar_one_or_none()

    # Check the lock BEFORE looking at the password. Checking it afterwards
    # made a locked account answer differently for a correct and for a wrong
    # password, which let a caller keep testing passwords during the lock,
    # and each wrong guess also pushed the lock further out.
    if user and user.locked_until and user.locked_until > datetime.now():
        await log_login_attempt(session, email, ip, user_agent, False, "Compte verrouillé")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte temporairement verrouillé"
        )

    # Credentials
    if not user or not verify_password(data.password, user.hashed_password):
        await log_login_attempt(session, email, ip, user_agent, False, "Identifiants incorrects")
        # Increment the failure counter
        if user:
            user.failed_login_attempts += 1
            # Lock after 5 failures
            if user.failed_login_attempts >= 5:
                user.locked_until = datetime.now() + timedelta(minutes=15)
                await session.commit()
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Compte verrouillé suite à trop de tentatives. Réessayez dans 15 minutes."
                )
        await session.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email ou mot de passe incorrect"
        )

    # Account status
    if not user.is_active:
        await log_login_attempt(session, email, ip, user_agent, False, "Compte désactivé")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte désactivé"
        )
    if not user.is_verified:
        await log_login_attempt(session, email, ip, user_agent, False, "Email non vérifié")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email non vérifié. Consultez votre boîte de réception."
        )

    # ✅ LOGIN SUCCEEDED
    # Reset the failure counter and any expired lock
    user.failed_login_attempts = 0
    user.locked_until = None
    user.last_login = datetime.now()

    # Create the tokens
    access_token = create_access_token({"sub": user.id, "email": user.email, "role": user.role})
    refresh_token_jwt = create_refresh_token(user.id)

    # Store the refresh token in the database (hashed, never in clear)
    refresh_token_record = RefreshToken(
        id=str(uuid.uuid4()),
        user_id=user.id,
        token_hash=hash_token(refresh_token_jwt),
        device_info=user_agent[:500],
        ip_address=ip,
        expires_at=datetime.now() + timedelta(days=7),
        created_at=datetime.now()
    )
    session.add(refresh_token_record)
    await session.commit()

    # Record the successful attempt
    await log_login_attempt(session, email, ip, user_agent, True)
    logger.info(f"✅ Connexion réussie: {user.email}")
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token_jwt,
        expires_in=1800  # 30 minutes
    )
@router.post("/refresh", response_model=TokenResponse)
async def refresh_access_token(
    data: RefreshTokenRequest,
    session: AsyncSession = Depends(get_session)
):
    """
    🔄 Issue a new access_token from a refresh_token.

    The refresh token must decode to a payload of type "refresh", match a
    non-revoked stored hash for its user, not be expired, and belong to an
    active user. The refresh token itself is returned unchanged.

    Raises:
        HTTPException: 401 whenever one of those checks fails.
    """
    def unauthorized(detail: str) -> HTTPException:
        # Every failure of this endpoint is reported as a 401.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail
        )

    # Decode the refresh token
    payload = decode_token(data.refresh_token)
    if not (payload and payload.get("type") == "refresh"):
        raise unauthorized("Refresh token invalide")

    user_id = payload.get("sub")
    presented_hash = hash_token(data.refresh_token)

    # Look the token up in the database
    token_query = select(RefreshToken).where(
        RefreshToken.user_id == user_id,
        RefreshToken.token_hash == presented_hash,
        RefreshToken.is_revoked == False
    )
    stored_token = (await session.execute(token_query)).scalar_one_or_none()
    if not stored_token:
        raise unauthorized("Refresh token révoqué ou introuvable")

    # Check the expiry
    if stored_token.expires_at < datetime.now():
        raise unauthorized("Refresh token expiré")

    # Load the user
    user_query = select(User).where(User.id == user_id)
    user = (await session.execute(user_query)).scalar_one_or_none()
    if not user or not user.is_active:
        raise unauthorized("Utilisateur introuvable ou désactivé")

    # Build the new access token
    claims = {
        "sub": user.id,
        "email": user.email,
        "role": user.role
    }
    new_access_token = create_access_token(claims)

    logger.info(f"🔄 Token rafraîchi: {user.email}")
    return TokenResponse(
        access_token=new_access_token,
        refresh_token=data.refresh_token,  # the refresh token stays the same
        expires_in=1800
    )
@router.post("/forgot-password")
async def forgot_password(
    data: ForgotPasswordRequest,
    request: Request,
    session: AsyncSession = Depends(get_session)
):
    """
    🔑 Request a password reset.

    When the email belongs to a user, a reset token valid for one hour is
    stored and emailed. The response is the same whether or not the email
    is known.
    """
    # Same answer in every case, so the endpoint does not reveal whether
    # the user exists.
    generic_response = {
        "success": True,
        "message": "Si cet email existe, un lien de réinitialisation a été envoyé."
    }

    lookup = await session.execute(
        select(User).where(User.email == data.email.lower())
    )
    account = lookup.scalar_one_or_none()
    if not account:
        return generic_response

    # Generate and store the reset token
    reset_token = generate_reset_token()
    account.reset_token = reset_token
    account.reset_token_expires = datetime.now() + timedelta(hours=1)
    await session.commit()

    # Prefer the configured frontend URL; fall back to this API's base URL
    if hasattr(settings, 'frontend_url'):
        frontend_url = settings.frontend_url
    else:
        frontend_url = str(request.base_url).rstrip('/')

    # Send the email
    AuthEmailService.send_password_reset_email(
        account.email,
        reset_token,
        frontend_url
    )

    logger.info(f"📧 Reset password demandé: {account.email}")
    return generic_response
@router.post("/reset-password")
async def reset_password(
    data: ResetPasswordRequest,
    session: AsyncSession = Depends(get_session)
):
    """
    🔐 Reset the password using a reset token.

    On success the new password hash is stored, the reset token is cleared,
    the failed-login counter and lock are reset, every active refresh token
    of the user is revoked, and a notification email is sent.

    Raises:
        HTTPException: 400 when the token matches no user, has expired, or
            the new password is rejected by ``validate_password_strength``.
    """
    result = await session.execute(
        select(User).where(User.reset_token == data.token)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token de réinitialisation invalide"
        )

    # Check the expiry
    if user.reset_token_expires < datetime.now():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token expiré. Demandez un nouveau lien de réinitialisation."
        )

    # Validate the new password
    is_valid, error_msg = validate_password_strength(data.new_password)
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=error_msg
        )

    # Update the account
    user.hashed_password = hash_password(data.new_password)
    user.reset_token = None
    user.reset_token_expires = None
    user.failed_login_attempts = 0
    user.locked_until = None

    # Revoke every refresh token that is still active: a reset is often
    # triggered by a suspected compromise, and sessions opened with the old
    # password must not survive it.
    active_tokens = await session.execute(
        select(RefreshToken).where(
            RefreshToken.user_id == user.id,
            RefreshToken.is_revoked == False  # noqa: E712 - SQL expression
        )
    )
    revoked_at = datetime.now()
    for token_record in active_tokens.scalars().all():
        token_record.is_revoked = True
        token_record.revoked_at = revoked_at

    await session.commit()

    # Send the notification
    AuthEmailService.send_password_changed_notification(user.email)
    logger.info(f"🔐 Mot de passe réinitialisé: {user.email}")
    return {
        "success": True,
        "message": "Mot de passe réinitialisé avec succès. Vous pouvez maintenant vous connecter."
    }
@router.post("/logout")
async def logout(
    data: RefreshTokenRequest,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user)
):
    """
    🚪 Log out by revoking the supplied refresh token.

    The token is looked up by its hash among the current user's tokens.
    The endpoint answers with success even when no matching token exists.
    """
    presented_hash = hash_token(data.refresh_token)
    token_query = select(RefreshToken).where(
        RefreshToken.user_id == user.id,
        RefreshToken.token_hash == presented_hash
    )
    stored_token = (await session.execute(token_query)).scalar_one_or_none()

    if stored_token:
        # Mark the token as revoked and remember when it happened
        stored_token.is_revoked = True
        stored_token.revoked_at = datetime.now()
        await session.commit()

    logger.info(f"👋 Déconnexion: {user.email}")
    response = {
        "success": True,
        "message": "Déconnexion réussie"
    }
    return response
@router.get("/me")
async def get_current_user_info(user: User = Depends(get_current_user)):
    """
    👤 Return the profile of the authenticated user.

    Dates are serialised with ``isoformat()``; ``last_login`` is ``None``
    when the user has no recorded login.
    """
    last_login = user.last_login
    profile = {
        "id": user.id,
        "email": user.email,
        "nom": user.nom,
        "prenom": user.prenom,
        "role": user.role,
        "is_verified": user.is_verified,
        "created_at": user.created_at.isoformat(),
    }
    profile["last_login"] = last_login.isoformat() if last_login else None
    return profile