Sage100-vps/routes/auth.py
2026-01-08 16:58:43 +03:00

529 lines
16 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, timedelta
from typing import Optional
import uuid
from database import get_session, User, RefreshToken, LoginAttempt
from security.auth import (
hash_password,
verify_password,
validate_password_strength,
create_access_token,
create_refresh_token,
decode_token,
generate_verification_token,
generate_reset_token,
hash_token,
)
from services.email_service import AuthEmailService
from core.dependencies import get_current_user
from config.config import settings
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for every endpoint declared in this file: paths are prefixed with
# "/auth" and tagged "Authentication".
router = APIRouter(prefix="/auth", tags=["Authentication"])
class RegisterRequest(BaseModel):
    """Request body accepted by the ``/auth/register`` endpoint."""

    # Validated as an e-mail address by pydantic's ``EmailStr``.
    email: EmailStr
    # Required, at least 8 characters. The register endpoint additionally
    # runs ``validate_password_strength`` on it.
    password: str = Field(..., min_length=8)
    # Required ("nom"), 2 to 100 characters.
    nom: str = Field(..., min_length=2, max_length=100)
    # Required ("prenom"), 2 to 100 characters.
    prenom: str = Field(..., min_length=2, max_length=100)
class Login(BaseModel):
    """Credentials submitted to the ``/auth/login`` endpoint."""

    # Validated as an e-mail address; the login endpoint lower-cases it
    # before using it in queries.
    email: EmailStr
    # Plain-text password; no length constraint is applied at this level.
    password: str
class TokenResponse(BaseModel):
    """Response model of the ``/auth/login`` and ``/auth/refresh`` endpoints."""

    # Token produced by ``create_access_token``.
    access_token: str
    # Refresh token string handed back to the client.
    refresh_token: str
    # Constant scheme label returned to the client.
    token_type: str = "bearer"
    # NOTE(review): presumably the access-token lifetime in seconds
    # (86400 = 24 h) - confirm it matches what ``create_access_token`` encodes.
    expires_in: int = 86400
class RefreshTokenRequest(BaseModel):
    """Request body carrying a refresh token (used by ``/auth/refresh`` and ``/auth/logout``)."""

    # The refresh token string previously returned to the client.
    refresh_token: str
class ForgotPassword(BaseModel):
    """Request body accepted by the ``/auth/forgot-password`` endpoint."""

    # Address of the account whose password should be reset.
    email: EmailStr
class ResetPassword(BaseModel):
    """Request body accepted by the ``/auth/reset-password`` endpoint."""

    # Reset token; the endpoint matches it against ``User.reset_token``.
    token: str
    # Required, at least 8 characters. The endpoint additionally runs
    # ``validate_password_strength`` on it.
    new_password: str = Field(..., min_length=8)
class VerifyEmail(BaseModel):
    """Request body accepted by ``POST /auth/verify-email``."""

    # Verification token; matched against ``User.verification_token``.
    token: str
class ResendVerification(BaseModel):
    """Request body accepted by the ``/auth/resend-verification`` endpoint."""

    # Address of the account that needs a new verification link.
    email: EmailStr
async def log_login_attempt(
    session: AsyncSession,
    email: str,
    ip: str,
    user_agent: str,
    success: bool,
    failure_reason: Optional[str] = None,
):
    """Record one login attempt as a ``LoginAttempt`` row and commit it.

    Args:
        session: Database session the row is added to. It is committed
            here, so anything else pending on the session is committed too.
        email: E-mail address the attempt was made with.
        ip: Client IP address, stored as ``ip_address``.
        user_agent: User-Agent string of the client.
        success: Whether the attempt succeeded.
        failure_reason: Optional explanation stored with a failed attempt.
    """
    row_values = {
        "email": email,
        "ip_address": ip,
        "user_agent": user_agent,
        "success": success,
        "failure_reason": failure_reason,
        # Stamped with the server's local (naive) clock.
        "timestamp": datetime.now(),
    }
    session.add(LoginAttempt(**row_values))
    await session.commit()
async def check_rate_limit(
    session: AsyncSession, email: str, ip: str
) -> tuple[bool, str]:
    """Tell whether a login for ``email`` is currently allowed.

    A login is refused once 5 or more *failed* attempts have been recorded
    for the address during the last 15 minutes.

    Args:
        session: Database session used to query ``LoginAttempt``.
        email: E-mail address being checked (callers pass it lower-cased).
        ip: Client IP address. Accepted for interface compatibility; the
            query below does not filter on it.

    Returns:
        ``(True, "")`` when the login may proceed, otherwise ``(False, msg)``
        where ``msg`` is the user-facing explanation.
    """
    window_start = datetime.now() - timedelta(minutes=15)
    result = await session.execute(
        select(LoginAttempt).where(
            LoginAttempt.email == email,
            # Only failed attempts count toward the limit. The previous
            # bare ``LoginAttempt.success`` matched the successful ones, so
            # failures were never throttled and frequent successful logins
            # could lock a legitimate user out.
            LoginAttempt.success.is_(False),
            LoginAttempt.timestamp >= window_start,
        )
    )
    failed_attempts = result.scalars().all()
    if len(failed_attempts) >= 5:
        return False, "Trop de tentatives échouées. Réessayez dans 15 minutes."
    return True, ""
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
    data: RegisterRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Create an unverified account and e-mail a verification link.

    Args:
        data: Registration payload (e-mail, password, nom, prenom).
        request: Incoming request; its ``base_url`` is handed to the mailer
            as the base for the verification link.
        session: Database session.

    Returns:
        A dict with ``success``, ``message``, ``user_id`` and ``email``.

    Raises:
        HTTPException: 400 when the e-mail is already registered or the
            password is rejected by ``validate_password_strength``.
    """
    # Accounts are stored, and looked up by the other endpoints, with a
    # lower-cased e-mail. The duplicate check must use the same form:
    # comparing the raw input let "User@x.com" slip past an existing
    # "user@x.com".
    normalized_email = data.email.lower()
    result = await session.execute(
        select(User).where(User.email == normalized_email)
    )
    existing_user = result.scalar_one_or_none()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Cet email est déjà utilisé"
        )

    is_valid, error_msg = validate_password_strength(data.password)
    if not is_valid:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    verification_token = generate_verification_token()
    # Keep the id in a local so it can be logged and returned after the
    # commit without reading it back from the ORM instance.
    user_id = str(uuid.uuid4())
    new_user = User(
        id=user_id,
        email=normalized_email,
        hashed_password=hash_password(data.password),
        nom=data.nom,
        prenom=data.prenom,
        is_verified=False,
        verification_token=verification_token,
        # The verification link is valid for 24 hours.
        verification_token_expires=datetime.now() + timedelta(hours=24),
        created_at=datetime.now(),
    )
    session.add(new_user)
    await session.commit()

    base_url = str(request.base_url).rstrip("/")
    email_sent = AuthEmailService.send_verification_email(
        data.email, verification_token, base_url
    )
    # A mail failure is only logged: the account already exists and a new
    # link can be requested through /auth/resend-verification.
    if not email_sent:
        logger.warning(f"Échec envoi email vérification pour {data.email}")

    logger.info(f" Nouvel utilisateur inscrit: {data.email} (ID: {user_id})")
    return {
        "success": True,
        "message": "Inscription réussie ! Consultez votre email pour vérifier votre compte.",
        "user_id": user_id,
        "email": data.email,
    }
@router.get("/verify-email")
async def verify_email_get(token: str, session: AsyncSession = Depends(get_session)):
    """Confirm an e-mail address from the ``token`` query parameter.

    Failures are reported in the JSON body (``success: False``) rather than
    by raising an HTTP error.

    Returns:
        A dict with ``success`` and ``message``; ``expired: True`` is added
        when the token is past its expiry, and ``email`` on success.
    """
    query = select(User).where(User.verification_token == token)
    account = (await session.execute(query)).scalar_one_or_none()

    # No account holds this token (the token is cleared once verified).
    if not account:
        return dict(
            success=False,
            message="Token de vérification invalide ou déjà utilisé.",
        )

    token_expired = account.verification_token_expires < datetime.now()
    if token_expired:
        return dict(
            success=False,
            message="Token expiré. Veuillez demander un nouvel email de vérification.",
            expired=True,
        )

    # Mark the account verified and clear the token so it cannot be reused.
    account.is_verified = True
    account.verification_token = None
    account.verification_token_expires = None
    await session.commit()

    logger.info(f" Email vérifié: {account.email}")
    return dict(
        success=True,
        message=" Email vérifié avec succès ! Vous pouvez maintenant vous connecter.",
        email=account.email,
    )
@router.post("/verify-email")
async def verify_email_post(
    data: VerifyEmail, session: AsyncSession = Depends(get_session)
):
    """Confirm an e-mail address from a token sent in the request body.

    Returns:
        A dict with ``success`` and ``message``.

    Raises:
        HTTPException: 400 when no account holds the token, or when the
            token is past its expiry.
    """
    query = select(User).where(User.verification_token == data.token)
    account = (await session.execute(query)).scalar_one_or_none()

    if not account:
        raise HTTPException(
            detail="Token de vérification invalide",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    token_expired = account.verification_token_expires < datetime.now()
    if token_expired:
        raise HTTPException(
            detail="Token expiré. Demandez un nouvel email de vérification.",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    # Mark the account verified and clear the token so it cannot be reused.
    account.is_verified = True
    account.verification_token = None
    account.verification_token_expires = None
    await session.commit()

    logger.info(f" Email vérifié: {account.email}")
    return dict(
        success=True,
        message="Email vérifié avec succès ! Vous pouvez maintenant vous connecter.",
    )
@router.post("/resend-verification")
async def resend_verification(
    data: ResendVerification,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Issue a new verification token and e-mail it to the account.

    An unknown address gets a generic success reply, so that case does not
    reveal whether the address is registered.

    Returns:
        A dict with ``success`` and ``message``.

    Raises:
        HTTPException: 400 when the account is already verified.
    """
    query = select(User).where(User.email == data.email.lower())
    account = (await session.execute(query)).scalar_one_or_none()

    if not account:
        return dict(
            success=True,
            message="Si cet email existe, un nouveau lien de vérification a été envoyé.",
        )

    if account.is_verified:
        raise HTTPException(
            detail="Ce compte est déjà vérifié", status_code=status.HTTP_400_BAD_REQUEST
        )

    # Replace any previous token; the new one is valid for 24 hours.
    fresh_token = generate_verification_token()
    account.verification_token = fresh_token
    account.verification_token_expires = datetime.now() + timedelta(hours=24)
    await session.commit()

    link_base = str(request.base_url).rstrip("/")
    # The mailer's return value is not checked here.
    AuthEmailService.send_verification_email(account.email, fresh_token, link_base)
    return dict(success=True, message="Un nouveau lien de vérification a été envoyé.")
@router.post("/login", response_model=TokenResponse)
async def login(
    data: Login, request: Request, session: AsyncSession = Depends(get_session)
):
    """Authenticate a user and issue an access/refresh token pair.

    Steps, in order: rate-limit check, user lookup by lower-cased e-mail,
    password verification, then the active / verified / locked checks. On
    success the failure counters are reset, a ``RefreshToken`` row holding
    the hash of the refresh token is stored, and the attempt is logged.

    Raises:
        HTTPException: 429 when ``check_rate_limit`` refuses the attempt;
            401 for an unknown e-mail or a wrong password; 403 when the
            account becomes or is locked, is deactivated, or is not yet
            verified.
    """
    # Fall back to "unknown" when the client address or header is missing.
    ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "unknown")
    is_allowed, error_msg = await check_rate_limit(session, data.email.lower(), ip)
    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
        )
    result = await session.execute(select(User).where(User.email == data.email.lower()))
    user = result.scalar_one_or_none()
    # Unknown e-mail and wrong password share one branch and one 401 message.
    if not user or not verify_password(data.password, user.hashed_password):
        # log_login_attempt commits the session before the counter below is
        # touched.
        # NOTE(review): if the session expires attributes on commit, reading
        # user.failed_login_attempts afterwards triggers a reload - confirm
        # the session factory's expire_on_commit setting.
        await log_login_attempt(
            session,
            data.email.lower(),
            ip,
            user_agent,
            False,
            "Identifiants incorrects",
        )
        if user:
            user.failed_login_attempts += 1
            # From the fifth consecutive failure on, lock for 15 minutes.
            if user.failed_login_attempts >= 5:
                user.locked_until = datetime.now() + timedelta(minutes=15)
                await session.commit()
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Compte verrouillé suite à trop de tentatives. Réessayez dans 15 minutes.",
                )
            await session.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email ou mot de passe incorrect",
        )
    if not user.is_active:
        await log_login_attempt(
            session, data.email.lower(), ip, user_agent, False, "Compte désactivé"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
        )
    if not user.is_verified:
        await log_login_attempt(
            session, data.email.lower(), ip, user_agent, False, "Email non vérifié"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email non vérifié. Consultez votre boîte de réception.",
        )
    # The lock is only checked once the password has been verified; a wrong
    # password on a locked account is handled by the branch above instead.
    if user.locked_until and user.locked_until > datetime.now():
        await log_login_attempt(
            session, data.email.lower(), ip, user_agent, False, "Compte verrouillé"
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte temporairement verrouillé",
        )
    # Successful authentication: clear the failure state.
    user.failed_login_attempts = 0
    user.locked_until = None
    user.last_login = datetime.now()
    access_token = create_access_token(
        {"sub": user.id, "email": user.email, "role": user.role}
    )
    refresh_token_jwt = create_refresh_token(user.id)
    # Only the hash of the refresh token is stored, not the token itself.
    refresh_token_record = RefreshToken(
        id=str(uuid.uuid4()),
        user_id=user.id,
        token_hash=hash_token(refresh_token_jwt),
        # User-Agent truncated to 500 characters.
        device_info=user_agent[:500],
        ip_address=ip,
        # NOTE(review): presumably must match the lifetime encoded by
        # create_refresh_token - confirm.
        expires_at=datetime.now() + timedelta(days=7),
        created_at=datetime.now(),
    )
    session.add(refresh_token_record)
    await session.commit()
    await log_login_attempt(session, data.email.lower(), ip, user_agent, True)
    logger.info(f" Connexion réussie: {user.email}")
    # NOTE(review): expires_in is hard-coded; presumably the access-token
    # lifetime in seconds - confirm against create_access_token.
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token_jwt,
        expires_in=86400,
    )
@router.post("/refresh", response_model=TokenResponse)
async def refresh_access_token(
    data: RefreshTokenRequest, session: AsyncSession = Depends(get_session)
):
    """Exchange a valid, non-revoked refresh token for a new access token.

    The refresh token itself is not rotated: the same string is returned
    alongside the new access token.

    Args:
        data: Payload carrying the refresh token.
        session: Database session.

    Returns:
        A ``TokenResponse`` with the new access token.

    Raises:
        HTTPException: 401 when the token cannot be decoded or is not of
            type ``refresh``, has no matching non-revoked record, has
            expired, or belongs to a missing or inactive user.
    """
    payload = decode_token(data.refresh_token)
    if not payload or payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Refresh token invalide"
        )

    user_id = payload.get("sub")
    # Tokens are stored hashed, so look the record up by hash.
    token_hash = hash_token(data.refresh_token)
    result = await session.execute(
        select(RefreshToken).where(
            RefreshToken.user_id == user_id,
            RefreshToken.token_hash == token_hash,
            # Must be a SQL expression. The previous
            # ``not RefreshToken.is_revoked`` was evaluated by Python into a
            # plain bool, so the "not revoked" condition never reached the
            # database as intended.
            RefreshToken.is_revoked.is_(False),
        )
    )
    token_record = result.scalar_one_or_none()
    if not token_record:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Refresh token révoqué ou introuvable",
        )
    if token_record.expires_at < datetime.now():
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Refresh token expiré"
        )

    result = await session.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Utilisateur introuvable ou désactivé",
        )

    new_access_token = create_access_token(
        {"sub": user.id, "email": user.email, "role": user.role}
    )
    logger.info(f" Token rafraîchi: {user.email}")
    return TokenResponse(
        access_token=new_access_token,
        refresh_token=data.refresh_token,
        expires_in=86400,
    )
@router.post("/forgot-password")
async def forgot_password(
    data: ForgotPassword,
    request: Request,
    session: AsyncSession = Depends(get_session),
):
    """Start the password-reset flow for an e-mail address.

    The reply is the same whether or not the address is registered. For a
    known account a reset token valid for one hour is stored and e-mailed.

    Returns:
        A dict with ``success`` and a generic ``message``.
    """
    generic_reply = {
        "success": True,
        "message": "Si cet email existe, un lien de réinitialisation a été envoyé.",
    }

    query = select(User).where(User.email == data.email.lower())
    account = (await session.execute(query)).scalar_one_or_none()
    if not account:
        return generic_reply

    token = generate_reset_token()
    account.reset_token = token
    account.reset_token_expires = datetime.now() + timedelta(hours=1)
    await session.commit()

    # Use the configured frontend URL when the settings object has one;
    # otherwise fall back to this API's own base URL.
    if hasattr(settings, "frontend_url"):
        link_base = settings.frontend_url
    else:
        link_base = str(request.base_url).rstrip("/")

    AuthEmailService.send_password_reset_email(account.email, token, link_base)
    logger.info(f" Reset password demandé: {account.email}")
    return generic_reply
@router.post("/reset-password")
async def reset_password(
    data: ResetPassword, session: AsyncSession = Depends(get_session)
):
    """Set a new password using a reset token.

    On success the reset token is cleared, the failed-login counter and any
    lock are reset, and a notification e-mail is sent.

    Returns:
        A dict with ``success`` and ``message``.

    Raises:
        HTTPException: 400 when no account holds the token, the token is
            past its expiry, or the new password is rejected by
            ``validate_password_strength``.
    """
    query = select(User).where(User.reset_token == data.token)
    account = (await session.execute(query)).scalar_one_or_none()

    if not account:
        raise HTTPException(
            detail="Token de réinitialisation invalide",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    token_expired = account.reset_token_expires < datetime.now()
    if token_expired:
        raise HTTPException(
            detail="Token expiré. Demandez un nouveau lien de réinitialisation.",
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    password_ok, problem = validate_password_strength(data.new_password)
    if not password_ok:
        raise HTTPException(detail=problem, status_code=status.HTTP_400_BAD_REQUEST)

    account.hashed_password = hash_password(data.new_password)
    # The token is single-use: clear it together with its expiry.
    account.reset_token = None
    account.reset_token_expires = None
    # A successful reset also lifts any login lock.
    account.failed_login_attempts = 0
    account.locked_until = None
    await session.commit()

    AuthEmailService.send_password_changed_notification(account.email)
    logger.info(f" Mot de passe réinitialisé: {account.email}")
    return dict(
        success=True,
        message="Mot de passe réinitialisé avec succès. Vous pouvez maintenant vous connecter.",
    )
@router.post("/logout")
async def logout(
    data: RefreshTokenRequest,
    session: AsyncSession = Depends(get_session),
    user: User = Depends(get_current_user),
):
    """Revoke the caller's refresh token.

    The stored record is looked up by the token's hash and the current
    user's id. When none matches, nothing is changed and the reply is still
    a success.

    Returns:
        A dict with ``success`` and ``message``.
    """
    hashed = hash_token(data.refresh_token)
    query = select(RefreshToken).where(
        RefreshToken.user_id == user.id, RefreshToken.token_hash == hashed
    )
    stored_token = (await session.execute(query)).scalar_one_or_none()

    if stored_token:
        stored_token.is_revoked = True
        stored_token.revoked_at = datetime.now()
        await session.commit()

    logger.info(f"👋 Déconnexion: {user.email}")
    return dict(success=True, message="Déconnexion réussie")
@router.get("/me")
async def get_current_user_info(user: User = Depends(get_current_user)):
    """Return the profile of the authenticated user.

    Returns:
        A dict with ``id``, ``email``, ``nom``, ``prenom``, ``role``,
        ``is_verified``, ``created_at`` (ISO 8601 string) and ``last_login``
        (ISO 8601 string, or ``None`` when it is not set).
    """
    # Attributes copied through unchanged, in response order.
    plain_fields = ("id", "email", "nom", "prenom", "role", "is_verified")
    profile = {field: getattr(user, field) for field in plain_fields}

    # Timestamps are serialised to ISO 8601 strings.
    profile["created_at"] = user.created_at.isoformat()
    if user.last_login:
        profile["last_login"] = user.last_login.isoformat()
    else:
        profile["last_login"] = None
    return profile