Added authentication logics

This commit is contained in:
Fanilo-Nantenaina 2025-12-02 09:09:29 +03:00
parent 3f8238f674
commit b2cfb31e40
10 changed files with 1314 additions and 3 deletions

7
api.py
View file

@ -14,6 +14,10 @@ import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from routes.auth import router as auth_router
from core.dependencies import get_current_user, require_role
# Configuration logging
logging.basicConfig(
level=logging.INFO,
@ -303,6 +307,9 @@ app.add_middleware(
)
app.include_router(auth_router)
# =====================================================
# ENDPOINTS - US-A1 (CRÉATION RAPIDE DEVIS)
# =====================================================

View file

@ -7,6 +7,12 @@ class Settings(BaseSettings):
env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"
)
# === JWT & Auth ===
jwt_secret: str
jwt_algorithm: str
access_token_expire_minutes: int
refresh_token_expire_days: int
SAGE_TYPE_DEVIS: int = 0
SAGE_TYPE_BON_COMMANDE: int = 10
SAGE_TYPE_PREPARATION: int = 20
@ -18,6 +24,7 @@ class Settings(BaseSettings):
# === Sage Gateway (Windows) ===
sage_gateway_url: str
sage_gateway_token: str
client_url: str = "http://localhost:3000"
# === Base de données ===
database_url: str = "sqlite+aiosqlite:///./sage_dataven.db"
@ -28,6 +35,7 @@ class Settings(BaseSettings):
smtp_user: str
smtp_password: str
smtp_from: str
smtp_use_tls: bool = True
# === Universign ===
universign_api_key: str

122
core/dependencies.py Normal file
View file

@ -0,0 +1,122 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_session, User
from security.auth import decode_token
from typing import Optional
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
session: AsyncSession = Depends(get_session)
) -> User:
"""
Dépendance FastAPI pour extraire l'utilisateur du JWT
Usage dans un endpoint:
@app.get("/protected")
async def protected_route(user: User = Depends(get_current_user)):
return {"user_id": user.id}
"""
token = credentials.credentials
# Décoder le token
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
# Vérifier le type
if payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
# Extraire user_id
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
# Charger l'utilisateur
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
# Vérifications de sécurité
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception."
)
# Vérifier si le compte est verrouillé
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées"
)
return user
async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session)
) -> Optional[User]:
"""
Version optionnelle - ne lève pas d'erreur si pas de token
Utile pour des endpoints publics avec contenu enrichi si authentifié
"""
if not credentials:
return None
try:
return await get_current_user(credentials, session)
except HTTPException:
return None
def require_role(*allowed_roles: str):
"""
Décorateur pour restreindre l'accès par rôle
Usage:
@app.get("/admin/users")
async def list_users(user: User = Depends(require_role("admin"))):
...
"""
async def role_checker(user: User = Depends(get_current_user)) -> User:
if user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}"
)
return user
return role_checker

109
create_admin.py Normal file
View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de création du premier utilisateur administrateur
Usage:
python create_admin.py
"""
import asyncio
import sys
from pathlib import Path
import uuid
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent))
from database import async_session_factory, User
from security.auth import hash_password, validate_password_strength
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def create_admin():
"""Crée un utilisateur admin"""
print("\n" + "="*60)
print("🔐 Création d'un compte administrateur")
print("="*60 + "\n")
# Saisie des informations
email = input("Email de l'admin: ").strip().lower()
if not email or '@' not in email:
print("❌ Email invalide")
return False
prenom = input("Prénom: ").strip()
nom = input("Nom: ").strip()
if not prenom or not nom:
print("❌ Prénom et nom requis")
return False
# Mot de passe avec validation
while True:
password = input("Mot de passe (min 8 car., 1 maj, 1 min, 1 chiffre, 1 spécial): ")
is_valid, error_msg = validate_password_strength(password)
if is_valid:
confirm = input("Confirmez le mot de passe: ")
if password == confirm:
break
else:
print("❌ Les mots de passe ne correspondent pas\n")
else:
print(f"{error_msg}\n")
# Vérifier si l'email existe déjà
async with async_session_factory() as session:
from sqlalchemy import select
result = await session.execute(
select(User).where(User.email == email)
)
existing = result.scalar_one_or_none()
if existing:
print(f"\n❌ Un utilisateur avec l'email {email} existe déjà")
return False
# Créer l'admin
admin = User(
id=str(uuid.uuid4()),
email=email,
hashed_password=hash_password(password),
nom=nom,
prenom=prenom,
role="admin",
is_verified=True, # Admin vérifié par défaut
is_active=True,
created_at=datetime.now()
)
session.add(admin)
await session.commit()
print("\n✅ Administrateur créé avec succès!")
print(f"📧 Email: {email}")
print(f"👤 Nom: {prenom} {nom}")
print(f"🔑 Rôle: admin")
print(f"🆔 ID: {admin.id}")
print("\n💡 Vous pouvez maintenant vous connecter à l'API\n")
return True
if __name__ == "__main__":
try:
result = asyncio.run(create_admin())
sys.exit(0 if result else 1)
except KeyboardInterrupt:
print("\n\n❌ Création annulée")
sys.exit(1)
except Exception as e:
print(f"\n❌ Erreur: {e}")
logger.exception("Détails:")
sys.exit(1)

View file

@ -14,7 +14,11 @@ from database.models import (
CacheMetadata,
AuditLog,
StatutEmail,
StatutSignature
StatutSignature,
# Nouveaux modèles auth
User,
RefreshToken,
LoginAttempt,
)
__all__ = [
@ -25,7 +29,7 @@ __all__ = [
'get_session',
'close_db',
# Models
# Models existants
'Base',
'EmailLog',
'SignatureLog',
@ -36,4 +40,9 @@ __all__ = [
# Enums
'StatutEmail',
'StatutSignature',
# Modèles auth
'User',
'RefreshToken',
'LoginAttempt',
]

View file

@ -202,3 +202,89 @@ class AuditLog(Base):
def __repr__(self):
return f"<AuditLog {self.action} on {self.ressource_type}/{self.ressource_id}>"
# Ajouter ces modèles à la fin de database/models.py
class User(Base):
"""
Utilisateurs de l'API avec validation email
"""
__tablename__ = "users"
id = Column(String(36), primary_key=True)
email = Column(String(255), unique=True, nullable=False, index=True)
hashed_password = Column(String(255), nullable=False)
# Profil
nom = Column(String(100), nullable=False)
prenom = Column(String(100), nullable=False)
role = Column(String(50), default="user") # user, admin, commercial
# Validation email
is_verified = Column(Boolean, default=False)
verification_token = Column(String(255), nullable=True, unique=True, index=True)
verification_token_expires = Column(DateTime, nullable=True)
# Sécurité
is_active = Column(Boolean, default=True)
failed_login_attempts = Column(Integer, default=0)
locked_until = Column(DateTime, nullable=True)
# Mot de passe oublié
reset_token = Column(String(255), nullable=True, unique=True, index=True)
reset_token_expires = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.now, nullable=False)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
last_login = Column(DateTime, nullable=True)
def __repr__(self):
return f"<User {self.email} verified={self.is_verified}>"
class RefreshToken(Base):
"""
Tokens de rafraîchissement JWT
"""
__tablename__ = "refresh_tokens"
id = Column(String(36), primary_key=True)
user_id = Column(String(36), nullable=False, index=True)
token_hash = Column(String(255), nullable=False, unique=True, index=True)
# Métadonnées
device_info = Column(String(500), nullable=True)
ip_address = Column(String(45), nullable=True)
# Expiration
expires_at = Column(DateTime, nullable=False)
created_at = Column(DateTime, default=datetime.now, nullable=False)
# Révocation
is_revoked = Column(Boolean, default=False)
revoked_at = Column(DateTime, nullable=True)
def __repr__(self):
return f"<RefreshToken user={self.user_id} revoked={self.is_revoked}>"
class LoginAttempt(Base):
"""
Journal des tentatives de connexion (détection bruteforce)
"""
__tablename__ = "login_attempts"
id = Column(Integer, primary_key=True, autoincrement=True)
email = Column(String(255), nullable=False, index=True)
ip_address = Column(String(45), nullable=False, index=True)
user_agent = Column(String(500), nullable=True)
success = Column(Boolean, default=False)
failure_reason = Column(String(255), nullable=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False, index=True)
def __repr__(self):
return f"<LoginAttempt {self.email} success={self.success}>"

View file

@ -5,10 +5,15 @@ pydantic-settings
reportlab
requests
msal
python-multipart
email-validator
python-dotenv
python-jose[cryptography]
passlib[bcrypt]
bcrypt==4.2.0
sqlalchemy
aiosqlite
tenacity

608
routes/auth.py Normal file
View file

@ -0,0 +1,608 @@
# auth/routes.py
from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, timedelta
from typing import Optional
import uuid
from database import get_session, User, RefreshToken, LoginAttempt
from security.auth import (
hash_password,
verify_password,
validate_password_strength,
create_access_token,
create_refresh_token,
decode_token,
generate_verification_token,
generate_reset_token,
hash_token
)
from auth.email_service import AuthEmailService
from auth.dependencies import get_current_user
from config import settings
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
# === MODÈLES PYDANTIC ===
class RegisterRequest(BaseModel):
email: EmailStr
password: str = Field(..., min_length=8)
nom: str = Field(..., min_length=2, max_length=100)
prenom: str = Field(..., min_length=2, max_length=100)
class LoginRequest(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int = 1800 # 30 minutes en secondes
class RefreshTokenRequest(BaseModel):
refresh_token: str
class ForgotPasswordRequest(BaseModel):
email: EmailStr
class ResetPasswordRequest(BaseModel):
token: str
new_password: str = Field(..., min_length=8)
class VerifyEmailRequest(BaseModel):
token: str
class ResendVerificationRequest(BaseModel):
email: EmailStr
# === UTILITAIRES ===
async def log_login_attempt(
session: AsyncSession,
email: str,
ip: str,
user_agent: str,
success: bool,
failure_reason: Optional[str] = None
):
"""Enregistre une tentative de connexion"""
attempt = LoginAttempt(
email=email,
ip_address=ip,
user_agent=user_agent,
success=success,
failure_reason=failure_reason,
timestamp=datetime.now()
)
session.add(attempt)
await session.commit()
async def check_rate_limit(session: AsyncSession, email: str, ip: str) -> tuple[bool, str]:
"""
Vérifie si l'utilisateur/IP est rate limité
Returns:
(is_allowed, error_message)
"""
# Vérifier les tentatives échouées des 15 dernières minutes
time_window = datetime.now() - timedelta(minutes=15)
result = await session.execute(
select(LoginAttempt)
.where(
LoginAttempt.email == email,
LoginAttempt.success == False,
LoginAttempt.timestamp >= time_window
)
)
failed_attempts = result.scalars().all()
if len(failed_attempts) >= 5:
return False, "Trop de tentatives échouées. Réessayez dans 15 minutes."
return True, ""
# === ENDPOINTS ===
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
data: RegisterRequest,
request: Request,
session: AsyncSession = Depends(get_session)
):
"""
📝 Inscription d'un nouvel utilisateur
- Valide le mot de passe
- Crée le compte (non vérifié)
- Envoie email de vérification
"""
# Vérifier si l'email existe déjà
result = await session.execute(
select(User).where(User.email == data.email)
)
existing_user = result.scalar_one_or_none()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cet email est déjà utilisé"
)
# Valider le mot de passe
is_valid, error_msg = validate_password_strength(data.password)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=error_msg
)
# Générer token de vérification
verification_token = generate_verification_token()
# Créer l'utilisateur
new_user = User(
id=str(uuid.uuid4()),
email=data.email.lower(),
hashed_password=hash_password(data.password),
nom=data.nom,
prenom=data.prenom,
is_verified=False,
verification_token=verification_token,
verification_token_expires=datetime.now() + timedelta(hours=24),
created_at=datetime.now()
)
session.add(new_user)
await session.commit()
# Envoyer email de vérification
base_url = str(request.base_url).rstrip('/')
email_sent = AuthEmailService.send_verification_email(
data.email,
verification_token,
base_url
)
if not email_sent:
logger.warning(f"Échec envoi email vérification pour {data.email}")
logger.info(f"✅ Nouvel utilisateur inscrit: {data.email} (ID: {new_user.id})")
return {
"success": True,
"message": "Inscription réussie ! Consultez votre email pour vérifier votre compte.",
"user_id": new_user.id,
"email": data.email
}
@router.post("/verify-email")
async def verify_email(
data: VerifyEmailRequest,
session: AsyncSession = Depends(get_session)
):
"""
Vérification de l'email via token
"""
result = await session.execute(
select(User).where(User.verification_token == data.token)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Token de vérification invalide"
)
# Vérifier l'expiration
if user.verification_token_expires < datetime.now():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Token expiré. Demandez un nouvel email de vérification."
)
# Activer le compte
user.is_verified = True
user.verification_token = None
user.verification_token_expires = None
await session.commit()
logger.info(f"✅ Email vérifié: {user.email}")
return {
"success": True,
"message": "Email vérifié avec succès ! Vous pouvez maintenant vous connecter."
}
@router.post("/resend-verification")
async def resend_verification(
data: ResendVerificationRequest,
request: Request,
session: AsyncSession = Depends(get_session)
):
"""
🔄 Renvoyer l'email de vérification
"""
result = await session.execute(
select(User).where(User.email == data.email.lower())
)
user = result.scalar_one_or_none()
if not user:
# Ne pas révéler si l'utilisateur existe
return {
"success": True,
"message": "Si cet email existe, un nouveau lien de vérification a été envoyé."
}
if user.is_verified:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ce compte est déjà vérifié"
)
# Générer nouveau token
verification_token = generate_verification_token()
user.verification_token = verification_token
user.verification_token_expires = datetime.now() + timedelta(hours=24)
await session.commit()
# Envoyer email
base_url = str(request.base_url).rstrip('/')
AuthEmailService.send_verification_email(
user.email,
verification_token,
base_url
)
return {
"success": True,
"message": "Un nouveau lien de vérification a été envoyé."
}
@router.post("/login", response_model=TokenResponse)
async def login(
data: LoginRequest,
request: Request,
session: AsyncSession = Depends(get_session)
):
"""
🔐 Connexion utilisateur
Retourne access_token (30min) et refresh_token (7 jours)
"""
ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get("user-agent", "unknown")
# Rate limiting
is_allowed, error_msg = await check_rate_limit(session, data.email.lower(), ip)
if not is_allowed:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=error_msg
)
# Charger l'utilisateur
result = await session.execute(
select(User).where(User.email == data.email.lower())
)
user = result.scalar_one_or_none()
# Vérifications
if not user or not verify_password(data.password, user.hashed_password):
await log_login_attempt(session, data.email.lower(), ip, user_agent, False, "Identifiants incorrects")
# Incrémenter compteur échecs
if user:
user.failed_login_attempts += 1
# Verrouiller après 5 échecs
if user.failed_login_attempts >= 5:
user.locked_until = datetime.now() + timedelta(minutes=15)
await session.commit()
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte verrouillé suite à trop de tentatives. Réessayez dans 15 minutes."
)
await session.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email ou mot de passe incorrect"
)
# Vérifier statut compte
if not user.is_active:
await log_login_attempt(session, data.email.lower(), ip, user_agent, False, "Compte désactivé")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte désactivé"
)
if not user.is_verified:
await log_login_attempt(session, data.email.lower(), ip, user_agent, False, "Email non vérifié")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception."
)
# Vérifier verrouillage
if user.locked_until and user.locked_until > datetime.now():
await log_login_attempt(session, data.email.lower(), ip, user_agent, False, "Compte verrouillé")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé"
)
# ✅ CONNEXION RÉUSSIE
# Réinitialiser compteur échecs
user.failed_login_attempts = 0
user.locked_until = None
user.last_login = datetime.now()
# Créer tokens
access_token = create_access_token({"sub": user.id, "email": user.email, "role": user.role})
refresh_token_jwt = create_refresh_token(user.id)
# Stocker refresh token en DB (hashé)
refresh_token_record = RefreshToken(
id=str(uuid.uuid4()),
user_id=user.id,
token_hash=hash_token(refresh_token_jwt),
device_info=user_agent[:500],
ip_address=ip,
expires_at=datetime.now() + timedelta(days=7),
created_at=datetime.now()
)
session.add(refresh_token_record)
await session.commit()
# Logger succès
await log_login_attempt(session, data.email.lower(), ip, user_agent, True)
logger.info(f"✅ Connexion réussie: {user.email}")
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token_jwt,
expires_in=1800 # 30 minutes
)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_access_token(
data: RefreshTokenRequest,
session: AsyncSession = Depends(get_session)
):
"""
🔄 Renouvellement du access_token via refresh_token
"""
# Décoder le refresh token
payload = decode_token(data.refresh_token)
if not payload or payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token invalide"
)
user_id = payload.get("sub")
token_hash = hash_token(data.refresh_token)
# Vérifier en DB
result = await session.execute(
select(RefreshToken).where(
RefreshToken.user_id == user_id,
RefreshToken.token_hash == token_hash,
RefreshToken.is_revoked == False
)
)
token_record = result.scalar_one_or_none()
if not token_record:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token révoqué ou introuvable"
)
# Vérifier expiration
if token_record.expires_at < datetime.now():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token expiré"
)
# Charger utilisateur
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable ou désactivé"
)
# Générer nouveau access token
new_access_token = create_access_token({
"sub": user.id,
"email": user.email,
"role": user.role
})
logger.info(f"🔄 Token rafraîchi: {user.email}")
return TokenResponse(
access_token=new_access_token,
refresh_token=data.refresh_token, # Refresh token reste le même
expires_in=1800
)
@router.post("/forgot-password")
async def forgot_password(
data: ForgotPasswordRequest,
request: Request,
session: AsyncSession = Depends(get_session)
):
"""
🔑 Demande de réinitialisation de mot de passe
"""
result = await session.execute(
select(User).where(User.email == data.email.lower())
)
user = result.scalar_one_or_none()
# Ne pas révéler si l'utilisateur existe
if not user:
return {
"success": True,
"message": "Si cet email existe, un lien de réinitialisation a été envoyé."
}
# Générer token de reset
reset_token = generate_reset_token()
user.reset_token = reset_token
user.reset_token_expires = datetime.now() + timedelta(hours=1)
await session.commit()
# Envoyer email
frontend_url = settings.frontend_url if hasattr(settings, 'frontend_url') else str(request.base_url).rstrip('/')
AuthEmailService.send_password_reset_email(
user.email,
reset_token,
frontend_url
)
logger.info(f"📧 Reset password demandé: {user.email}")
return {
"success": True,
"message": "Si cet email existe, un lien de réinitialisation a été envoyé."
}
@router.post("/reset-password")
async def reset_password(
data: ResetPasswordRequest,
session: AsyncSession = Depends(get_session)
):
"""
🔐 Réinitialisation du mot de passe avec token
"""
result = await session.execute(
select(User).where(User.reset_token == data.token)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Token de réinitialisation invalide"
)
# Vérifier expiration
if user.reset_token_expires < datetime.now():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Token expiré. Demandez un nouveau lien de réinitialisation."
)
# Valider nouveau mot de passe
is_valid, error_msg = validate_password_strength(data.new_password)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=error_msg
)
# Mettre à jour
user.hashed_password = hash_password(data.new_password)
user.reset_token = None
user.reset_token_expires = None
user.failed_login_attempts = 0
user.locked_until = None
await session.commit()
# Envoyer notification
AuthEmailService.send_password_changed_notification(user.email)
logger.info(f"🔐 Mot de passe réinitialisé: {user.email}")
return {
"success": True,
"message": "Mot de passe réinitialisé avec succès. Vous pouvez maintenant vous connecter."
}
@router.post("/logout")
async def logout(
data: RefreshTokenRequest,
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user)
):
"""
🚪 Déconnexion (révocation du refresh token)
"""
token_hash = hash_token(data.refresh_token)
result = await session.execute(
select(RefreshToken).where(
RefreshToken.user_id == user.id,
RefreshToken.token_hash == token_hash
)
)
token_record = result.scalar_one_or_none()
if token_record:
token_record.is_revoked = True
token_record.revoked_at = datetime.now()
await session.commit()
logger.info(f"👋 Déconnexion: {user.email}")
return {
"success": True,
"message": "Déconnexion réussie"
}
@router.get("/me")
async def get_current_user_info(user: User = Depends(get_current_user)):
"""
👤 Récupération du profil utilisateur
"""
return {
"id": user.id,
"email": user.email,
"nom": user.nom,
"prenom": user.prenom,
"role": user.role,
"is_verified": user.is_verified,
"created_at": user.created_at.isoformat(),
"last_login": user.last_login.isoformat() if user.last_login else None
}

131
security/auth.py Normal file
View file

@ -0,0 +1,131 @@
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional, Dict
import jwt
import secrets
import hashlib
# Configuration
SECRET_KEY = "VOTRE_SECRET_KEY_A_METTRE_EN_.ENV" # À remplacer par settings.jwt_secret
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# === Hachage de mots de passe ===
def hash_password(password: str) -> str:
"""Hash un mot de passe avec bcrypt"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Vérifie un mot de passe contre son hash"""
return pwd_context.verify(plain_password, hashed_password)
# === Génération de tokens aléatoires ===
def generate_verification_token() -> str:
"""Génère un token de vérification email sécurisé"""
return secrets.token_urlsafe(32)
def generate_reset_token() -> str:
"""Génère un token de réinitialisation mot de passe"""
return secrets.token_urlsafe(32)
def hash_token(token: str) -> str:
"""Hash un refresh token pour stockage en DB"""
return hashlib.sha256(token.encode()).hexdigest()
# === JWT Access Token ===
def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Crée un JWT access token
Args:
data: Payload (doit contenir 'sub' = user_id)
expires_delta: Durée de validité personnalisée
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_refresh_token(user_id: str) -> str:
"""
Crée un refresh token (JWT long terme)
Returns:
Token JWT non hashé (à hasher avant stockage DB)
"""
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode = {
"sub": user_id,
"exp": expire,
"iat": datetime.utcnow(),
"type": "refresh",
"jti": secrets.token_urlsafe(16) # Unique ID
}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> Optional[Dict]:
"""
Décode et valide un JWT
Returns:
Payload si valide, None sinon
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.JWTError:
return None
# === Validation mot de passe ===
def validate_password_strength(password: str) -> tuple[bool, str]:
"""
Valide la robustesse d'un mot de passe
Returns:
(is_valid, error_message)
"""
if len(password) < 8:
return False, "Le mot de passe doit contenir au moins 8 caractères"
if not any(c.isupper() for c in password):
return False, "Le mot de passe doit contenir au moins une majuscule"
if not any(c.islower() for c in password):
return False, "Le mot de passe doit contenir au moins une minuscule"
if not any(c.isdigit() for c in password):
return False, "Le mot de passe doit contenir au moins un chiffre"
special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"
if not any(c in special_chars for c in password):
return False, "Le mot de passe doit contenir au moins un caractère spécial"
return True, ""

226
services/email_service.py Normal file
View file

@ -0,0 +1,226 @@
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from config import settings
import logging
logger = logging.getLogger(__name__)
class AuthEmailService:
"""Service d'envoi d'emails pour l'authentification"""
@staticmethod
def _send_email(to: str, subject: str, html_body: str) -> bool:
"""Envoi SMTP générique"""
try:
msg = MIMEMultipart()
msg['From'] = settings.smtp_from
msg['To'] = to
msg['Subject'] = subject
msg.attach(MIMEText(html_body, 'html'))
with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=30) as server:
if settings.smtp_use_tls:
server.starttls()
if settings.smtp_user and settings.smtp_password:
server.login(settings.smtp_user, settings.smtp_password)
server.send_message(msg)
logger.info(f"✅ Email envoyé: {subject}{to}")
return True
except Exception as e:
logger.error(f"❌ Erreur envoi email: {e}")
return False
@staticmethod
def send_verification_email(email: str, token: str, base_url: str) -> bool:
"""
Envoie l'email de vérification avec lien de confirmation
Args:
email: Email du destinataire
token: Token de vérification
base_url: URL de base de l'API (ex: https://api.votredomaine.com)
"""
verification_link = f"{base_url}/auth/verify-email?token={token}"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: #4F46E5; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }}
.content {{ background: #f9fafb; padding: 30px; border-radius: 0 0 8px 8px; }}
.button {{
display: inline-block;
background: #4F46E5;
color: white;
padding: 12px 30px;
text-decoration: none;
border-radius: 6px;
margin: 20px 0;
}}
.footer {{ text-align: center; margin-top: 20px; font-size: 12px; color: #6b7280; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎉 Bienvenue sur Sage Dataven</h1>
</div>
<div class="content">
<h2>Vérifiez votre adresse email</h2>
<p>Merci de vous être inscrit ! Pour activer votre compte, veuillez cliquer sur le bouton ci-dessous :</p>
<div style="text-align: center;">
<a href="{verification_link}" class="button">Vérifier mon email</a>
</div>
<p style="margin-top: 30px;">Ou copiez ce lien dans votre navigateur :</p>
<p style="word-break: break-all; background: #e5e7eb; padding: 10px; border-radius: 4px;">
{verification_link}
</p>
<p style="margin-top: 30px; color: #ef4444;">
Ce lien expire dans <strong>24 heures</strong>
</p>
<p style="margin-top: 30px; font-size: 14px; color: #6b7280;">
Si vous n'avez pas créé de compte, ignorez cet email.
</p>
</div>
<div class="footer">
<p>© 2024 Sage Dataven - API de gestion commerciale</p>
</div>
</div>
</body>
</html>
"""
return AuthEmailService._send_email(
email,
"🔐 Vérifiez votre adresse email - Sage Dataven",
html_body
)
@staticmethod
def send_password_reset_email(email: str, token: str, base_url: str) -> bool:
"""
Envoie l'email de réinitialisation de mot de passe
Args:
email: Email du destinataire
token: Token de reset
base_url: URL de base du frontend (ex: https://app.votredomaine.com)
"""
reset_link = f"{base_url}/reset-password?token={token}"
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; color: #333; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: #EF4444; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }}
.content {{ background: #f9fafb; padding: 30px; border-radius: 0 0 8px 8px; }}
.button {{
display: inline-block;
background: #EF4444;
color: white;
padding: 12px 30px;
text-decoration: none;
border-radius: 6px;
margin: 20px 0;
}}
.footer {{ text-align: center; margin-top: 20px; font-size: 12px; color: #6b7280; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🔑 Réinitialisation de mot de passe</h1>
</div>
<div class="content">
<h2>Demande de réinitialisation</h2>
<p>Vous avez demandé à réinitialiser votre mot de passe. Cliquez sur le bouton ci-dessous pour créer un nouveau mot de passe :</p>
<div style="text-align: center;">
<a href="{reset_link}" class="button">Réinitialiser mon mot de passe</a>
</div>
<p style="margin-top: 30px;">Ou copiez ce lien dans votre navigateur :</p>
<p style="word-break: break-all; background: #e5e7eb; padding: 10px; border-radius: 4px;">
{reset_link}
</p>
<p style="margin-top: 30px; color: #ef4444;">
Ce lien expire dans <strong>1 heure</strong>
</p>
<p style="margin-top: 30px; font-size: 14px; color: #6b7280;">
Si vous n'avez pas demandé cette réinitialisation, ignorez cet email. Votre mot de passe actuel reste inchangé.
</p>
</div>
<div class="footer">
<p>© 2024 Sage Dataven - API de gestion commerciale</p>
</div>
</div>
</body>
</html>
"""
return AuthEmailService._send_email(
email,
"🔐 Réinitialisation de votre mot de passe - Sage Dataven",
html_body
)
@staticmethod
def send_password_changed_notification(email: str) -> bool:
"""Notification après changement de mot de passe réussi"""
html_body = """
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; }
.container { max-width: 600px; margin: 0 auto; padding: 20px; }
.header { background: #10B981; color: white; padding: 20px; text-align: center; border-radius: 8px 8px 0 0; }
.content { background: #f9fafb; padding: 30px; border-radius: 0 0 8px 8px; }
.footer { text-align: center; margin-top: 20px; font-size: 12px; color: #6b7280; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1> Mot de passe modifié</h1>
</div>
<div class="content">
<h2>Votre mot de passe a été changé avec succès</h2>
<p>Ce message confirme que le mot de passe de votre compte Sage Dataven a été modifié.</p>
<p style="margin-top: 30px; padding: 15px; background: #FEF3C7; border-left: 4px solid #F59E0B; border-radius: 4px;">
Si vous n'êtes pas à l'origine de ce changement, contactez immédiatement notre support.
</p>
</div>
<div class="footer">
<p>© 2024 Sage Dataven - API de gestion commerciale</p>
</div>
</div>
</body>
</html>
"""
return AuthEmailService._send_email(
email,
"✅ Votre mot de passe a été modifié - Sage Dataven",
html_body
)