Compare commits

..

No commits in common. "main" and "fix/security" have entirely different histories.

9 changed files with 641 additions and 1294 deletions

732
api.py

File diff suppressed because it is too large Load diff

View file

@ -2,11 +2,13 @@ from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from jwt.exceptions import InvalidTokenError
from database import get_session, User
from security.auth import decode_token
from typing import Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False)
@ -16,6 +18,62 @@ async def get_current_user_hybrid(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> User:
if credentials and credentials.credentials:
token = credentials.credentials
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
if payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception.",
)
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
)
logger.debug(f" Authentifié via JWT: {user.email}")
return user
api_key_obj = getattr(request.state, "api_key", None)
if api_key_obj:
@ -26,79 +84,69 @@ async def get_current_user_hybrid(
user = result.scalar_one_or_none()
if user:
user._is_api_key_user = True
user._api_key_obj = api_key_obj
logger.debug(
f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}"
)
return user
virtual_user = User(
from database import User as UserModel
virtual_user = UserModel(
id=f"api_key_{api_key_obj.id}",
email=f"api_key_{api_key_obj.id}@virtual.local",
nom=api_key_obj.name,
prenom="API",
hashed_password="",
email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local",
nom="API Key",
prenom=api_key_obj.name,
role="api_client",
is_active=True,
is_verified=True,
is_active=True,
)
virtual_user._is_api_key_user = True
virtual_user._api_key_obj = api_key_obj
logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)")
return virtual_user
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (JWT ou API Key)",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
async def get_current_user_optional_hybrid(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> Optional[User]:
"""Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)"""
try:
payload = decode_token(token)
user_id: str = payload.get("sub")
return await get_current_user_hybrid(request, credentials, session)
except HTTPException:
return None
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide: user_id manquant",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
def require_role(*allowed_roles: str):
async def role_checker(
request: Request, user: User = Depends(get_current_user_hybrid)
) -> User:
is_api_key_user = getattr(user, "_is_api_key_user", False)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
if is_api_key_user:
if "api_client" not in allowed_roles and "*" not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Utilisateur inactif",
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.",
)
logger.debug(" API Key autorisée pour cette route")
return user
except InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token invalide: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
def require_role_hybrid(*allowed_roles: str):
async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User:
if user.role not in allowed_roles:
if user.role not in allowed_roles and "*" not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}",
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
)
return user
return role_checker
@ -110,9 +158,9 @@ def is_api_key_user(user: User) -> bool:
def get_api_key_from_user(user: User):
"""Récupère l'objet ApiKey depuis un utilisateur (si applicable)"""
"""Récupère l'objet API Key depuis un user virtuel"""
return getattr(user, "_api_key_obj", None)
get_current_user = get_current_user_hybrid
require_role = require_role_hybrid
get_current_user_optional = get_current_user_optional_hybrid

View file

@ -152,7 +152,7 @@ templates_signature_email = {
</table>
<p style="color: #718096; font-size: 13px; line-height: 1.5; margin: 0;">
<strong> Signature électronique sécurisée</strong><br>
<strong>🔒 Signature électronique sécurisée</strong><br>
Votre signature est protégée par notre partenaire de confiance <strong>Universign</strong>,
certifié eIDAS et conforme au RGPD. Votre identité sera vérifiée et le document sera
horodaté de manière infalsifiable.

View file

@ -1,14 +1,14 @@
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool
from sqlalchemy import event, text
import logging
from config.config import settings
from database.models.generic_model import Base
logger = logging.getLogger(__name__)
DATABASE_URL = settings.database_url
DATABASE_URL = os.getenv("DATABASE_URL")
def _configure_sqlite_connection(dbapi_connection, connection_record):

View file

@ -1,6 +1,4 @@
from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text
from typing import Optional, List
import json
from datetime import datetime
import uuid
@ -51,23 +49,8 @@ class SwaggerUser(Base):
is_active = Column(Boolean, default=True, nullable=False)
allowed_tags = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.now, nullable=False)
last_login = Column(DateTime, nullable=True)
@property
def allowed_tags_list(self) -> Optional[List[str]]:
if self.allowed_tags:
try:
return json.loads(self.allowed_tags)
except json.JSONDecodeError:
return None
return None
@allowed_tags_list.setter
def allowed_tags_list(self, tags: Optional[List[str]]):
self.allowed_tags = json.dumps(tags) if tags is not None else None
def __repr__(self):
return f"<SwaggerUser(username='{self.username}', active={self.is_active})>"

View file

@ -1,27 +1,53 @@
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Receive, Send
from sqlalchemy import select
from typing import Callable, Optional
from typing import Optional
from datetime import datetime
import logging
import base64
import json
from database import get_session
from database.models.api_key import SwaggerUser
from security.auth import verify_password
logger = logging.getLogger(__name__)
security = HTTPBasic()
class SwaggerAuthMiddleware:
PROTECTED_PATHS = ["/docs", "/redoc", "/openapi.json"]
async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
username = credentials.username
password = credentials.password
def __init__(self, app: ASGIApp):
try:
async for session in get_session():
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
swagger_user = result.scalar_one_or_none()
if swagger_user and swagger_user.is_active:
if verify_password(password, swagger_user.hashed_password):
swagger_user.last_login = datetime.now()
await session.commit()
logger.info(f" Accès Swagger autorisé (DB): {username}")
return True
logger.warning(f"Tentative d'accès Swagger refusée: {username}")
return False
except Exception as e:
logger.error(f" Erreur vérification Swagger credentials: {e}")
return False
class SwaggerAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive: Receive, send: Send):
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
@ -29,31 +55,34 @@ class SwaggerAuthMiddleware:
request = Request(scope, receive=receive)
path = request.url.path
if not any(path.startswith(p) for p in self.PROTECTED_PATHS):
await self.app(scope, receive, send)
return
protected_paths = ["/docs", "/redoc"]
if any(path.startswith(protected_path) for protected_path in protected_paths):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Authentification requise pour la documentation"},
content={
"detail": "Authentification requise pour accéder à la documentation"
},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
await response(scope, receive, send)
return
try:
import base64
encoded_credentials = auth_header.split(" ")[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8")
decoded_credentials = base64.b64decode(encoded_credentials).decode(
"utf-8"
)
username, password = decoded_credentials.split(":", 1)
credentials = HTTPBasicCredentials(username=username, password=password)
swagger_user = await self._verify_credentials(credentials)
if not swagger_user:
if not await verify_swagger_credentials(credentials):
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Identifiants invalides"},
@ -62,15 +91,6 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send)
return
if "state" not in scope:
scope["state"] = {}
scope["state"]["swagger_user"] = swagger_user
logger.info(
f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}"
)
except Exception as e:
logger.error(f" Erreur parsing auth header: {e}")
response = JSONResponse(
@ -83,141 +103,78 @@ class SwaggerAuthMiddleware:
await self.app(scope, receive, send)
async def _verify_credentials(
self, credentials: HTTPBasicCredentials
) -> Optional[dict]:
from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser
from security.auth import verify_password
try:
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(
SwaggerUser.username == credentials.username
)
)
swagger_user = result.scalar_one_or_none()
class ApiKeyMiddleware:
if swagger_user and swagger_user.is_active:
if verify_password(
credentials.password, swagger_user.hashed_password
):
swagger_user.last_login = datetime.now()
await session.commit()
def __init__(self, app):
self.app = app
logger.info(f"✓ Accès Swagger autorisé: {credentials.username}")
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
return {
"id": swagger_user.id,
"username": swagger_user.username,
"allowed_tags": swagger_user.allowed_tags_list,
"is_active": swagger_user.is_active,
}
request = Request(scope, receive=receive)
path = request.url.path
logger.warning(f"✗ Accès Swagger refusé: {credentials.username}")
return None
except Exception as e:
logger.error(f" Erreur vérification credentials: {e}", exc_info=True)
return None
class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
EXCLUDED_PATHS = [
public_exact_paths = [
"/",
"/health",
"/docs",
"/redoc",
"/openapi.json",
"/",
"/health",
"/auth",
"/api-keys/verify",
"/universign/webhook",
]
def _is_excluded_path(self, path: str) -> bool:
"""Vérifie si le chemin est exclu de l'authentification API Key"""
if path == "/":
return True
public_path_prefixes = [
"/api/v1/auth/",
]
for excluded in self.EXCLUDED_PATHS:
if excluded == "/":
continue
if path == excluded or path.startswith(excluded + "/"):
return True
is_public = path in public_exact_paths or any(
path.startswith(prefix) for prefix in public_path_prefixes
)
return False
async def dispatch(self, request: Request, call_next: Callable):
path = request.url.path
method = request.method
if self._is_excluded_path(path):
return await call_next(request)
if is_public:
logger.debug(f"Chemin public: {path}")
await self.app(scope, receive, send)
return
auth_header = request.headers.get("Authorization")
api_key_header = request.headers.get("X-API-Key")
has_jwt = auth_header and auth_header.startswith("Bearer ")
if api_key_header:
api_key_header = api_key_header.strip()
if not api_key_header or api_key_header == "":
api_key_header = None
api_key = request.headers.get("X-API-Key")
has_api_key = api_key is not None
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1].strip()
if has_jwt:
logger.debug(f"🔑 JWT détecté pour {path}")
await self.app(scope, receive, send)
return
if token.startswith("sdk_live_"):
logger.warning(
" API Key envoyée dans Authorization au lieu de X-API-Key"
)
return await self._handle_api_key_auth(
request, token, path, method, call_next
)
elif has_api_key:
logger.debug(f"🔑 API Key détectée pour {path}")
logger.debug(f"JWT détecté pour {method} {path} → délégation à FastAPI")
request.state.authenticated_via = "jwt"
return await call_next(request)
if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}")
return await self._handle_api_key_auth(
request, api_key_header, path, method, call_next
)
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request)
async def _handle_api_key_auth(
self,
request: Request,
api_key: str,
path: str,
method: str,
call_next: Callable,
):
try:
from database.db_config import async_session_factory
from services.api_key import ApiKeyService
async with async_session_factory() as session:
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
try:
async for session in get_session():
api_key_service = ApiKeyService(session)
api_key_obj = await api_key_service.verify_api_key(api_key)
if not api_key_obj:
logger.warning(f"🔒 Clé API invalide: {method} {path}")
return JSONResponse(
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Clé API invalide ou expirée",
"hint": "Vérifiez votre clé X-API-Key",
"hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer <jwt>",
},
)
await response(scope, receive, send)
return
is_allowed, rate_info = await service.check_rate_limit(api_key_obj)
is_allowed, rate_info = await api_key_service.check_rate_limit(
api_key_obj
)
if not is_allowed:
logger.warning(f"⏱️ Rate limit: {api_key_obj.name}")
return JSONResponse(
response = JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"detail": "Rate limit dépassé"},
headers={
@ -225,49 +182,55 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
"X-RateLimit-Remaining": "0",
},
)
await response(scope, receive, send)
return
has_access = await service.check_endpoint_access(api_key_obj, path)
has_access = await api_key_service.check_endpoint_access(
api_key_obj, path
)
if not has_access:
allowed = (
json.loads(api_key_obj.allowed_endpoints)
if api_key_obj.allowed_endpoints
else ["Tous"]
)
logger.warning(
f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n"
f" Endpoint demandé: {path}\n"
f" Endpoints autorisés: {allowed}"
)
return JSONResponse(
response = JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
"detail": "Accès non autorisé à cet endpoint",
"endpoint_requested": path,
"api_key_name": api_key_obj.name,
"allowed_endpoints": allowed,
"hint": "Cette clé API n'a pas accès à cet endpoint.",
"endpoint": path,
"api_key": api_key_obj.key_prefix + "...",
},
)
await response(scope, receive, send)
return
request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key"
logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name} {method} {path}")
logger.info(f" API Key valide: {api_key_obj.name} {path}")
return await call_next(request)
await self.app(scope, receive, send)
return
except Exception as e:
logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True)
return JSONResponse(
logger.error(f" Erreur validation API Key: {e}", exc_info=True)
response = JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": f"Erreur interne: {str(e)}"},
content={
"detail": "Erreur interne lors de la validation de la clé"
},
)
await response(scope, receive, send)
return
ApiKeyMiddleware = ApiKeyMiddlewareHTTP
else:
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Authentification requise (JWT ou API Key)",
"hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer <jwt>'",
"endpoint": path,
},
headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'},
)
await response(scope, receive, send)
return
def get_api_key_from_request(request: Request) -> Optional:
@ -276,20 +239,5 @@ def get_api_key_from_request(request: Request) -> Optional:
def get_auth_method(request: Request) -> str:
"""Retourne la méthode d'authentification utilisée"""
return getattr(request.state, "authenticated_via", "none")
def get_swagger_user_from_request(request: Request) -> Optional[dict]:
"""Récupère l'utilisateur Swagger depuis la requête"""
return getattr(request.state, "swagger_user", None)
__all__ = [
"SwaggerAuthMiddleware",
"ApiKeyMiddlewareHTTP",
"ApiKeyMiddleware",
"get_api_key_from_request",
"get_auth_method",
"get_swagger_user_from_request",
]

View file

@ -1,142 +1,28 @@
import sys
import os
from pathlib import Path
import asyncio
import argparse
import logging
from datetime import datetime
from typing import Optional, List
import json
import sys
from pathlib import Path
from database import get_session
from database.models.api_key import SwaggerUser, ApiKey
from services.api_key import ApiKeyService
from security.auth import hash_password
from sqlalchemy import select
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
_app_dir = _script_dir.parent
import argparse
from datetime import datetime
import logging
print(f"DEBUG: Script path: {_current_file}")
print(f"DEBUG: App dir: {_app_dir}")
print(f"DEBUG: Current working dir: {os.getcwd()}")
if str(_app_dir) in sys.path:
sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir))
os.chdir(str(_app_dir))
print(f"DEBUG: sys.path[0]: {sys.path[0]}")
print(f"DEBUG: New working dir: {os.getcwd()}")
_test_imports = [
"database",
"database.db_config",
"database.models",
"services",
"security",
]
print("\nDEBUG: Vérification des imports...")
for module in _test_imports:
try:
__import__(module)
print(f"{module}")
except ImportError as e:
print(f"{module}: {e}")
try:
from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser, ApiKey
from services.api_key import ApiKeyService
from security.auth import hash_password
except ImportError as e:
print(f"\n ERREUR D'IMPORT: {e}")
print(" Vérifiez que vous êtes dans /app")
print(" Commande correcte: cd /app && python scripts/manage_security.py ...")
sys.exit(1)
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
sys.path.insert(0, str(parent_dir))
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
AVAILABLE_TAGS = {
"Authentication": " Authentification et gestion des comptes",
"API Keys Management": "🔑 Gestion des clés API",
"Clients": "👥 Gestion des clients",
"Fournisseurs": "🏭 Gestion des fournisseurs",
"Prospects": "🎯 Gestion des prospects",
"Tiers": "📋 Gestion générale des tiers",
"Contacts": "📞 Contacts des tiers",
"Articles": "📦 Catalogue articles",
"Familles": "🏷️ Familles d'articles",
"Stock": "📊 Mouvements de stock",
"Devis": "📄 Devis",
"Commandes": "🛒 Commandes",
"Livraisons": "🚚 Bons de livraison",
"Factures": "💰 Factures",
"Avoirs": "↩️ Avoirs",
"Règlements": "💳 Règlements et encaissements",
"Workflows": " Transformations de documents",
"Documents": "📑 Gestion documents (PDF)",
"Emails": "📧 Envoi d'emails",
"Validation": " Validations métier",
"Collaborateurs": "👔 Collaborateurs internes",
"Société": "🏢 Informations société",
"Référentiels": "📚 Données de référence",
"System": "⚙️ Système et santé",
"Admin": "🛠️ Administration",
"Debug": "🐛 Debug et diagnostics",
}
async def add_swagger_user(username: str, password: str, full_name: str = None):
"""Ajouter un utilisateur Swagger"""
PRESET_PROFILES = {
"commercial": [
"Clients",
"Contacts",
"Devis",
"Commandes",
"Factures",
"Articles",
"Documents",
"Emails",
],
"comptable": [
"Clients",
"Fournisseurs",
"Factures",
"Avoirs",
"Règlements",
"Documents",
"Emails",
],
"logistique": [
"Articles",
"Stock",
"Commandes",
"Livraisons",
"Fournisseurs",
"Documents",
],
"readonly": ["Clients", "Articles", "Devis", "Commandes", "Factures", "Documents"],
"developer": [
"Authentication",
"API Keys Management",
"System",
"Clients",
"Articles",
"Devis",
"Commandes",
"Factures",
],
}
async def add_swagger_user(
username: str,
password: str,
full_name: str = None,
tags: Optional[List[str]] = None,
preset: Optional[str] = None,
):
"""Ajouter un utilisateur Swagger avec configuration avancée"""
async with async_session_factory() as session:
async for session in get_session():
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
@ -146,21 +32,11 @@ async def add_swagger_user(
logger.error(f" L'utilisateur '{username}' existe déjà")
return
if preset:
if preset not in PRESET_PROFILES:
logger.error(
f" Preset '{preset}' inconnu. Disponibles: {list(PRESET_PROFILES.keys())}"
)
return
tags = PRESET_PROFILES[preset]
logger.info(f"📋 Application du preset '{preset}': {len(tags)} tags")
swagger_user = SwaggerUser(
username=username,
hashed_password=hash_password(password),
full_name=full_name or username,
is_active=True,
allowed_tags=json.dumps(tags) if tags else None,
)
session.add(swagger_user)
@ -168,142 +44,39 @@ async def add_swagger_user(
logger.info(f" Utilisateur Swagger créé: {username}")
logger.info(f" Nom complet: {swagger_user.full_name}")
logger.info(f" Actif: {swagger_user.is_active}")
if tags:
logger.info(f" 🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
else:
logger.info(" 👑 Accès ADMIN COMPLET (tous les tags)")
break
async def list_swagger_users():
"""Lister tous les utilisateurs Swagger avec détails"""
async with async_session_factory() as session:
"""Lister tous les utilisateurs Swagger"""
async for session in get_session():
result = await session.execute(select(SwaggerUser))
users = result.scalars().all()
if not users:
logger.info("🔭 Aucun utilisateur Swagger")
return
logger.info(" Aucun utilisateur Swagger")
break
logger.info(f"\n👥 {len(users)} utilisateur(s) Swagger:\n")
logger.info("=" * 80)
logger.info(f" {len(users)} utilisateur(s) Swagger:\n")
for user in users:
status = " ACTIF" if user.is_active else " NON ACTIF"
logger.info(f"\n{status} {user.username}")
logger.info(f"📛 Nom: {user.full_name}")
logger.info(f"🆔 ID: {user.id}")
logger.info(f"📅 Créé: {user.created_at}")
logger.info(f"🕐 Dernière connexion: {user.last_login or 'Jamais'}")
status = "" if user.is_active else ""
logger.info(f" {status} {user.username}")
logger.info(f" Nom: {user.full_name}")
logger.info(f" Créé: {user.created_at}")
logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}")
logger.info("")
if user.allowed_tags:
try:
tags = json.loads(user.allowed_tags)
if tags:
logger.info(f"🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
auth_schemes = []
if "Authentication" in tags:
auth_schemes.append("JWT (Bearer)")
if "API Keys Management" in tags or len(tags) > 3:
auth_schemes.append("X-API-Key")
if not auth_schemes:
auth_schemes.append("JWT (Bearer)")
logger.info(
f" Authentification autorisée: {', '.join(auth_schemes)}"
)
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info(" Authentification: JWT + X-API-Key (tout)")
except json.JSONDecodeError:
logger.info(" Tags: Erreur format")
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info(" Authentification: JWT + X-API-Key (tout)")
logger.info("\n" + "=" * 80)
async def update_swagger_user(
username: str,
add_tags: Optional[List[str]] = None,
remove_tags: Optional[List[str]] = None,
set_tags: Optional[List[str]] = None,
preset: Optional[str] = None,
active: Optional[bool] = None,
):
"""Mettre à jour un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
return
modified = False
if preset:
if preset not in PRESET_PROFILES:
logger.error(f" Preset '{preset}' inconnu")
return
user.allowed_tags = json.dumps(PRESET_PROFILES[preset])
logger.info(f"📋 Preset '{preset}' appliqué")
modified = True
elif set_tags is not None:
user.allowed_tags = json.dumps(set_tags) if set_tags else None
logger.info(f" Tags remplacés: {len(set_tags) if set_tags else 0}")
modified = True
elif add_tags or remove_tags:
current_tags = []
if user.allowed_tags:
try:
current_tags = json.loads(user.allowed_tags)
except json.JSONDecodeError:
current_tags = []
if add_tags:
for tag in add_tags:
if tag not in current_tags:
current_tags.append(tag)
logger.info(f" Tag ajouté: {tag}")
modified = True
if remove_tags:
for tag in remove_tags:
if tag in current_tags:
current_tags.remove(tag)
logger.info(f" Tag retiré: {tag}")
modified = True
user.allowed_tags = json.dumps(current_tags) if current_tags else None
if active is not None:
user.is_active = active
logger.info(f" Statut: {'ACTIF' if active else 'INACTIF'}")
modified = True
if modified:
await session.commit()
logger.info(f" Utilisateur '{username}' mis à jour")
else:
logger.info(" Aucune modification effectuée")
break
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with async_session_factory() as session:
async for session in get_session():
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
@ -311,30 +84,13 @@ async def delete_swagger_user(username: str):
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
return
break
await session.delete(user)
await session.commit()
logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
async def list_available_tags():
"""Liste tous les tags disponibles avec description"""
logger.info("\n🏷️ TAGS DISPONIBLES:\n")
logger.info("=" * 80)
for tag, desc in AVAILABLE_TAGS.items():
logger.info(f" {desc}")
logger.info(f" Nom: {tag}\n")
logger.info("=" * 80)
logger.info("\n📦 PRESETS DISPONIBLES:\n")
for preset_name, tags in PRESET_PROFILES.items():
logger.info(f" {preset_name}:")
logger.info(f" {', '.join(tags)}\n")
logger.info("=" * 80)
logger.info(f" Utilisateur Swagger supprimé: {username}")
break
async def create_api_key(
@ -345,7 +101,8 @@ async def create_api_key(
endpoints: list = None,
):
"""Créer une clé API"""
async with async_session_factory() as session:
async for session in get_session():
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
@ -357,96 +114,104 @@ async def create_api_key(
allowed_endpoints=endpoints,
)
logger.info("=" * 70)
logger.info("🔑 Clé API créée avec succès")
logger.info("=" * 70)
logger.info("=" * 60)
logger.info(" Clé API créée avec succès")
logger.info("=" * 60)
logger.info(f" ID: {api_key_obj.id}")
logger.info(f" Nom: {api_key_obj.name}")
logger.info(f" Clé: {api_key_plain}")
logger.info(f" Préfixe: {api_key_obj.key_prefix}")
logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
logger.info(f" Créée le: {api_key_obj.created_at}")
logger.info(f" Expire le: {api_key_obj.expires_at}")
if api_key_obj.allowed_endpoints:
try:
endpoints_list = json.loads(api_key_obj.allowed_endpoints)
logger.info(f" Endpoints: {', '.join(endpoints_list)}")
except Exception:
logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
logger.info(
f" Endpoints autorisés: {', '.join(api_key_obj.allowed_endpoints)}"
)
else:
logger.info(" Endpoints: Tous (aucune restriction)")
logger.info(" Endpoints autorisés: Tous")
logger.info("=" * 70)
logger.info(" SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !")
logger.info("=" * 70)
logger.info("=" * 60)
logger.info(" IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !")
logger.info("=" * 60)
break
async def list_api_keys():
"""Lister toutes les clés API"""
async with async_session_factory() as session:
async for session in get_session():
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
logger.info("🔭 Aucune clé API")
return
logger.info(" Aucune clé API")
break
logger.info(f"🔑 {len(keys)} clé(s) API:\n")
logger.info(f" {len(keys)} clé(s) API:\n")
for key in keys:
is_valid = key.is_active and (
not key.expires_at or key.expires_at > datetime.now()
status = (
""
if key.is_active
and (not key.expires_at or key.expires_at > datetime.now())
else ""
)
status = "" if is_valid else ""
logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Créée le: {key.created_at}")
logger.info(f" Expire le: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
display = ", ".join(endpoints[:4])
if len(endpoints) > 4:
display += f"... (+{len(endpoints) - 4})"
logger.info(f" Endpoints: {display}")
except Exception:
pass
else:
logger.info(" Endpoints: Tous")
logger.info(
f" Endpoints: {', '.join(key.allowed_endpoints[:3])}..."
)
logger.info("")
break
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with async_session_factory() as session:
async for session in get_session():
service = ApiKeyService(session)
result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
key = result.scalar_one_or_none()
if not key:
logger.error(f" Clé API '{key_id}' introuvable")
return
break
key.is_active = False
key.revoked_at = datetime.now()
await session.commit()
logger.info(f"🗑️ Clé API révoquée: {key.name}")
logger.info(f" Clé API révoquée: {key.name}")
logger.info(f" ID: {key.id}")
logger.info(f" Préfixe: {key.key_prefix}")
break
async def verify_api_key(api_key: str):
"""Vérifier une clé API"""
async with async_session_factory() as session:
async for session in get_session():
service = ApiKeyService(session)
key = await service.verify_api_key(api_key)
if not key:
logger.error(" Clé API invalide ou expirée")
return
break
logger.info("=" * 60)
logger.info(" Clé API valide")
@ -455,129 +220,61 @@ async def verify_api_key(api_key: str):
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes totales: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
logger.info(f" Endpoints autorisés: {endpoints}")
except Exception:
pass
else:
logger.info(" Endpoints autorisés: Tous")
logger.info(f" Expire le: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
logger.info("=" * 60)
break
async def main():
parser = argparse.ArgumentParser(
description="Gestion avancée des utilisateurs Swagger et clés API",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
EXEMPLES D'UTILISATION:
description="Gestion des utilisateurs Swagger et clés API"
)
subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")
=== UTILISATEURS SWAGGER ===
swagger_parser = subparsers.add_parser(
"swagger", help="Gestion des utilisateurs Swagger"
)
swagger_subparsers = swagger_parser.add_subparsers(dest="swagger_command")
1. Créer un utilisateur avec preset:
python scripts/manage_security.py swagger add commercial Pass123! --preset commercial
add_parser = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
add_parser.add_argument("username", help="Nom d'utilisateur")
add_parser.add_argument("password", help="Mot de passe")
add_parser.add_argument("--full-name", help="Nom complet (optionnel)")
2. Créer un admin complet:
python scripts/manage_security.py swagger add admin AdminPass
swagger_subparsers.add_parser("list", help="Lister les utilisateurs")
3. Créer avec tags spécifiques:
python scripts/manage_security.py swagger add client Pass123! --tags Clients Devis Factures
delete_parser = swagger_subparsers.add_parser(
"delete", help="Supprimer un utilisateur"
)
delete_parser.add_argument("username", help="Nom d'utilisateur")
4. Mettre à jour un utilisateur (ajouter des tags):
python scripts/manage_security.py swagger update client --add-tags Commandes Livraisons
apikey_parser = subparsers.add_parser("apikey", help="Gestion des clés API")
apikey_subparsers = apikey_parser.add_subparsers(dest="apikey_command")
5. Changer complètement les tags:
python scripts/manage_security.py swagger update client --set-tags Clients Articles
6. Appliquer un preset:
python scripts/manage_security.py swagger update client --preset comptable
7. Lister les tags disponibles:
python scripts/manage_security.py swagger tags
8. Désactiver temporairement:
python scripts/manage_security.py swagger update client --inactive
=== CLÉS API ===
9. Créer une clé API:
python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
10. Créer avec endpoints restreints:
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
11. Lister les clés:
python scripts/manage_security.py apikey list
12. Vérifier une clé:
python scripts/manage_security.py apikey verify sdk_live_xxxxx
13. Révoquer une clé:
python scripts/manage_security.py apikey revoke <key_id>
""",
create_parser = apikey_subparsers.add_parser("create", help="Créer une clé API")
create_parser.add_argument("name", help="Nom de la clé")
create_parser.add_argument("--description", help="Description (optionnel)")
create_parser.add_argument(
"--days", type=int, default=365, help="Jours avant expiration (défaut: 365)"
)
create_parser.add_argument(
"--rate-limit", type=int, default=60, help="Requêtes par minute (défaut: 60)"
)
create_parser.add_argument(
"--endpoints",
nargs="+",
help="Endpoints autorisés (ex: /clients /articles)",
)
subparsers = parser.add_subparsers(dest="command", help="Commandes")
apikey_subparsers.add_parser("list", help="Lister les clés API")
swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
swagger_sub = swagger_parser.add_subparsers(dest="swagger_command")
revoke_parser = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
revoke_parser.add_argument("key_id", help="ID de la clé")
add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
add_p.add_argument("username", help="Nom d'utilisateur")
add_p.add_argument("password", help="Mot de passe")
add_p.add_argument("--full-name", help="Nom complet", default=None)
add_p.add_argument(
"--tags",
nargs="*",
help="Tags autorisés. Vide = admin complet",
default=None,
)
add_p.add_argument(
"--preset",
choices=list(PRESET_PROFILES.keys()),
help="Appliquer un preset de tags",
)
update_p = swagger_sub.add_parser("update", help="Mettre à jour utilisateur")
update_p.add_argument("username", help="Nom d'utilisateur")
update_p.add_argument("--add-tags", nargs="+", help="Ajouter des tags")
update_p.add_argument("--remove-tags", nargs="+", help="Retirer des tags")
update_p.add_argument("--set-tags", nargs="*", help="Définir les tags (remplace)")
update_p.add_argument(
"--preset", choices=list(PRESET_PROFILES.keys()), help="Appliquer preset"
)
update_p.add_argument("--active", action="store_true", help="Activer l'utilisateur")
update_p.add_argument(
"--inactive", action="store_true", help="Désactiver l'utilisateur"
)
swagger_sub.add_parser("list", help="Lister utilisateurs")
del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
del_p.add_argument("username", help="Nom d'utilisateur")
swagger_sub.add_parser("tags", help="Lister les tags disponibles")
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_sub = apikey_parser.add_subparsers(dest="apikey_command")
create_p = apikey_sub.add_parser("create", help="Créer clé API")
create_p.add_argument("name", help="Nom de la clé")
create_p.add_argument("--description", help="Description")
create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)")
create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min")
create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
apikey_sub.add_parser("list", help="Lister clés")
rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé")
rev_p.add_argument("key_id", help="ID de la clé")
ver_p = apikey_sub.add_parser("verify", help="Vérifier clé")
ver_p.add_argument("api_key", help="Clé API complète")
verify_parser = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
verify_parser.add_argument("api_key", help="Clé API complète")
args = parser.parse_args()
@ -587,34 +284,11 @@ python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limi
if args.command == "swagger":
if args.swagger_command == "add":
await add_swagger_user(
args.username,
args.password,
args.full_name,
args.tags,
args.preset,
)
elif args.swagger_command == "update":
active = None
if args.active:
active = True
elif args.inactive:
active = False
await update_swagger_user(
args.username,
add_tags=args.add_tags,
remove_tags=args.remove_tags,
set_tags=args.set_tags,
preset=args.preset,
active=active,
)
await add_swagger_user(args.username, args.password, args.full_name)
elif args.swagger_command == "list":
await list_swagger_users()
elif args.swagger_command == "delete":
await delete_swagger_user(args.username)
elif args.swagger_command == "tags":
await list_available_tags()
else:
swagger_parser.print_help()
@ -641,7 +315,7 @@ if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n Interrupted")
logger.info("\n Interrupted")
sys.exit(0)
except Exception as e:
logger.error(f" Erreur: {e}")

View file

@ -5,12 +5,10 @@ import jwt
import secrets
import hashlib
from config.config import settings
SECRET_KEY = settings.jwt_secret
ALGORITHM = settings.jwt_algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
REFRESH_TOKEN_EXPIRE_DAYS = settings.refresh_token_expire_days
SECRET_KEY = "VOTRE_SECRET_KEY_A_METTRE_EN_.ENV"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 10080
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@ -69,13 +67,9 @@ def decode_token(token: str) -> Optional[Dict]:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise jwt.InvalidTokenError("Token expiré")
except jwt.DecodeError:
raise jwt.InvalidTokenError("Token invalide (format incorrect)")
except jwt.InvalidTokenError as e:
raise jwt.InvalidTokenError(f"Token invalide: {str(e)}")
except Exception as e:
raise jwt.InvalidTokenError(f"Erreur lors du décodage du token: {str(e)}")
return None
except jwt.JWTError:
return None
def validate_password_strength(password: str) -> tuple[bool, str]:

View file

@ -134,7 +134,7 @@ class ApiKeyService:
api_key_obj.revoked_at = datetime.now()
await self.session.commit()
logger.info(f"🗑️ Clé API révoquée: {api_key_obj.name}")
logger.info(f" Clé API révoquée: {api_key_obj.name}")
return True
async def get_by_id(self, key_id: str) -> Optional[ApiKey]:
@ -150,42 +150,24 @@ class ApiKeyService:
}
async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool:
"""Vérifie si la clé a accès à un endpoint spécifique"""
if not api_key_obj.allowed_endpoints:
logger.debug(
f"🔓 API Key {api_key_obj.name}: Aucune restriction d'endpoint"
)
return True
try:
allowed = json.loads(api_key_obj.allowed_endpoints)
if "*" in allowed or "/*" in allowed:
logger.debug(f"🔓 API Key {api_key_obj.name}: Accès global autorisé")
return True
for pattern in allowed:
if pattern == "*":
return True
if pattern.endswith("*"):
prefix = pattern[:-1]
if endpoint.startswith(prefix):
return True
if pattern == endpoint:
logger.debug(f" Match exact: {pattern} == {endpoint}")
return True
if pattern.endswith("/*"):
base = pattern[:-2] # "/clients/*" → "/clients"
if endpoint == base or endpoint.startswith(base + "/"):
logger.debug(f" Match wildcard: {pattern}{endpoint}")
return True
elif pattern.endswith("*"):
base = pattern[:-1] # "/clients*" → "/clients"
if endpoint.startswith(base):
logger.debug(f" Match prefix: {pattern}{endpoint}")
return True
logger.warning(
f" API Key {api_key_obj.name}: Accès refusé à {endpoint}\n"
f" Endpoints autorisés: {allowed}"
)
return False
except json.JSONDecodeError:
logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}")
return False