Compare commits

...

57 commits

Author SHA1 Message Date
Fanilo-Nantenaina
3b5c183b47 refactor(security): remove emojis from logs and improve script debugging 2026-01-21 16:53:38 +03:00
Fanilo-Nantenaina
d25c2cffa9 refactor(security): clean up authentication middleware and api docs 2026-01-21 16:23:34 +03:00
Fanilo-Nantenaina
1c6c45465f fix(security): improve api key and jwt validation handling 2026-01-21 14:00:57 +03:00
Fanilo-Nantenaina
574d82f3c4 fix(security): improve auth handling and openapi schema generation 2026-01-21 13:50:44 +03:00
Fanilo-Nantenaina
a6a623d1ab refactor(api): simplify auth scheme handling and improve schema filtering 2026-01-21 13:41:37 +03:00
Fanilo-Nantenaina
437ecd0ed3 Merge branch 'feat/controlled_swagger_access' into main_2 2026-01-21 13:26:49 +03:00
Fanilo-Nantenaina
c057a085ed refactor(security): improve swagger user management and logging 2026-01-21 13:21:16 +03:00
Fanilo-Nantenaina
23a94f5558 Merge branch 'feat/controlled_swagger_access' into main_2 2026-01-21 13:01:38 +03:00
Fanilo-Nantenaina
797aed0240 feat(security): enhance swagger auth with user context and filtered docs 2026-01-21 12:56:02 +03:00
Fanilo-Nantenaina
5f40c677a8 refactor(manage_security): simplify delete_swagger_user function 2026-01-21 12:21:10 +03:00
Fanilo-Nantenaina
8a22e285df refactor(security): improve logging format and argument handling 2026-01-21 12:12:35 +03:00
Fanilo-Nantenaina
92597a1143 feat(api): add tag-based OpenAPI schema filtering for Swagger users 2026-01-21 12:05:06 +03:00
Fanilo-Nantenaina
c1f4c66e8c refactor(api): remove user dependency from all endpoints 2026-01-20 19:28:27 +03:00
Fanilo-Nantenaina
43da1b09ed Merge branch 'develop' into develop_like 2026-01-20 19:26:42 +03:00
Fanilo-Nantenaina
6d5f8594d0 chore: ignore python clean scripts in gitignore 2026-01-20 19:26:28 +03:00
Fanilo-Nantenaina
a7457c3979 fix(security): improve auth handling and logging in middleware 2026-01-20 19:14:00 +03:00
Fanilo-Nantenaina
5eec115d1d Merge branch 'main' into develop 2026-01-20 16:29:35 +03:00
Fanilo-Nantenaina
d89c9fd35b chore: ignore clean scripts in gitignore 2026-01-20 16:29:10 +03:00
Fanilo-Nantenaina
211dd4fd23 fix(security): improve api key authentication and error handling 2026-01-20 16:18:04 +03:00
Fanilo-Nantenaina
67ef83c4e3 refactor(security): improve authentication logging and endpoint checks 2026-01-20 16:01:54 +03:00
Fanilo-Nantenaina
82d1d92e58 refactor(scripts): improve import handling and path management 2026-01-20 15:35:06 +03:00
Fanilo-Nantenaina
28c8fb3008 refactor(security): improve user management and session handling 2026-01-20 15:27:26 +03:00
Fanilo-Nantenaina
f8cec7ebc5 refactor(security): improve security scripts and api documentation 2026-01-20 15:13:19 +03:00
Fanilo-Nantenaina
1a08894b47 refactor(scripts): improve logging format and endpoint handling in security management 2026-01-20 15:01:57 +03:00
Fanilo-Nantenaina
3cdb490ee5 refactor(security): improve middleware structure and configuration handling 2026-01-20 14:47:07 +03:00
Fanilo-Nantenaina
c84e4ddc20 refactor(auth): simplify authentication logic and improve error handling 2026-01-20 14:25:06 +03:00
Fanilo-Nantenaina
41ca202d4b refactor(security): move security config to environment variables and improve error handling 2026-01-20 14:19:48 +03:00
Fanilo-Nantenaina
918f5d3f19 docs(api): fix incorrect comment syntax in openapi configuration 2026-01-20 14:06:10 +03:00
Fanilo-Nantenaina
fa95d0d117 refactor(api): replace get_current_user with get_sage_client_for_user in dependencies 2026-01-20 13:56:16 +03:00
Fanilo-Nantenaina
a1150390f4 Merge branch 'fix/security' into main_2 2026-01-20 13:54:42 +03:00
Fanilo-Nantenaina
0001dbe634 docs(api): add comments for security schemas and openapi setup 2026-01-20 13:54:36 +03:00
Fanilo-Nantenaina
5b584bf969 refactor(security): improve auth middleware and logging 2026-01-20 13:51:09 +03:00
Fanilo-Nantenaina
022149c237 refactor(api): replace get_sage_client_for_user with get_current_user for dependency injection 2026-01-20 13:46:27 +03:00
Fanilo-Nantenaina
72d1ac58d1 refactor(security): reorganize imports and improve logging message 2026-01-20 12:40:56 +03:00
Fanilo-Nantenaina
cce1cdf76a refactor(scripts): improve manage_security.py organization and error handling 2026-01-20 12:32:49 +03:00
Fanilo-Nantenaina
e51a5e0a0b refactor(database): update import to use direct get_session import 2026-01-20 12:19:51 +03:00
Fanilo-Nantenaina
dd65ae4d96 style: remove emoji from log messages 2026-01-20 12:17:52 +03:00
Fanilo-Nantenaina
cc0062b3bc refactor(security): improve security management script with better logging and structure 2026-01-20 12:11:20 +03:00
Fanilo-Nantenaina
9bd0f62459 feat(api): add authentication to all endpoints and update OpenAPI schema 2026-01-20 11:49:57 +03:00
Fanilo-Nantenaina
e0f08fd83a refactor(dependencies): rename require_role_hybrid to require_role for consistency 2026-01-20 11:29:12 +03:00
Fanilo-Nantenaina
f59e56490c feat(api): add api keys router to middleware stack 2026-01-20 11:24:42 +03:00
Fanilo-Nantenaina
2aafd525cd refactor(api): update middleware and cors configuration 2026-01-20 11:23:10 +03:00
Fanilo-Nantenaina
17a4251eea Merge branch 'feat/secureing_API' 2026-01-20 11:12:59 +03:00
Fanilo-Nantenaina
abc9ff820a feat(security): implement api key management and authentication system 2026-01-20 11:11:32 +03:00
Fanilo-Nantenaina
b85bd26dbe Merge branches 'develop' and 'main' of https://git.dataven.fr/fanilo/Sage100-vps 2026-01-20 11:02:28 +03:00
Fanilo-Nantenaina
4b686c4544 Merge branch 'feat/get_all_reglements' into develop_like 2026-01-17 12:53:01 +03:00
Fanilo-Nantenaina
9f12727bd3 refactor(routes): remove authentication dependency from universign router 2026-01-16 13:26:24 +03:00
Fanilo-Nantenaina
18603ded6e refactor(auth): reorganize imports and remove unused dependencies - Added some missing auth 2026-01-16 13:22:13 +03:00
Fanilo-Nantenaina
18d72b3bf9 Secured all routes 2026-01-16 12:47:56 +03:00
Fanilo-Nantenaina
fdf359738b Merge branch 'develop' 2026-01-16 12:35:17 +03:00
Fanilo-Nantenaina
ba9e474109 Merge branch 'main' of https://git.dataven.fr/fanilo/backend_vps 2026-01-16 12:34:47 +03:00
Fanilo-Nantenaina
c5c17fdd9b fix(auth): increase failed login attempt threshold from 5 to 15 2026-01-13 10:42:58 +03:00
Fanilo-Nantenaina
c389129ae7 Updated using the correct redirection structure 2026-01-12 19:04:09 +03:00
Fanilo-Nantenaina
6b6246b6e5 Testing multi-sage users 2026-01-12 18:56:37 +03:00
b3419eafaa Deleted cached dev database 2026-01-08 17:29:16 +00:00
Fanilo-Nantenaina
795b848dff chore: update gitignore and add status enums 2026-01-08 16:59:08 +03:00
Fanilo-Nantenaina
e990cbdc08 MERGING branch develop INTO MAIN 2026-01-08 16:58:43 +03:00
15 changed files with 1755 additions and 1067 deletions

1251
api.py

File diff suppressed because it is too large Load diff

View file

@ -2,13 +2,11 @@ from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from jwt.exceptions import InvalidTokenError
from database import get_session, User
from security.auth import decode_token
from typing import Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
security = HTTPBearer(auto_error=False)
@ -18,62 +16,6 @@ async def get_current_user_hybrid(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> User:
if credentials and credentials.credentials:
token = credentials.credentials
payload = decode_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré",
headers={"WWW-Authenticate": "Bearer"},
)
if payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Type de token incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token malformé",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Compte désactivé"
)
if not user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email non vérifié. Consultez votre boîte de réception.",
)
if user.locked_until and user.locked_until > datetime.now():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Compte temporairement verrouillé suite à trop de tentatives échouées",
)
logger.debug(f" Authentifié via JWT: {user.email}")
return user
api_key_obj = getattr(request.state, "api_key", None)
if api_key_obj:
@ -84,69 +26,79 @@ async def get_current_user_hybrid(
user = result.scalar_one_or_none()
if user:
logger.debug(
f" Authentifié via API Key ({api_key_obj.name}) → User: {user.email}"
)
user._is_api_key_user = True
user._api_key_obj = api_key_obj
return user
from database import User as UserModel
virtual_user = UserModel(
virtual_user = User(
id=f"api_key_{api_key_obj.id}",
email=f"{api_key_obj.name.lower().replace(' ', '_')}@api-key.local",
nom="API Key",
prenom=api_key_obj.name,
email=f"api_key_{api_key_obj.id}@virtual.local",
nom=api_key_obj.name,
prenom="API",
hashed_password="",
role="api_client",
is_verified=True,
is_active=True,
is_verified=True,
)
virtual_user._is_api_key_user = True
virtual_user._api_key_obj = api_key_obj
logger.debug(f" Authentifié via API Key: {api_key_obj.name} (user virtuel)")
return virtual_user
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (JWT ou API Key)",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
async def get_current_user_optional_hybrid(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
session: AsyncSession = Depends(get_session),
) -> Optional[User]:
"""Version optionnelle de get_current_user_hybrid (ne lève pas d'erreur)"""
try:
return await get_current_user_hybrid(request, credentials, session)
except HTTPException:
return None
payload = decode_token(token)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide: user_id manquant",
headers={"WWW-Authenticate": "Bearer"},
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur introuvable",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Utilisateur inactif",
)
return user
except InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token invalide: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
def require_role_hybrid(*allowed_roles: str):
async def role_checker(
request: Request, user: User = Depends(get_current_user_hybrid)
) -> User:
is_api_key_user = getattr(user, "_is_api_key_user", False)
if is_api_key_user:
if "api_client" not in allowed_roles and "*" not in allowed_roles:
async def role_checker(user: User = Depends(get_current_user_hybrid)) -> User:
if user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}. API Keys non autorisées.",
detail=f"Accès interdit. Rôles autorisés: {', '.join(allowed_roles)}",
)
logger.debug(" API Key autorisée pour cette route")
return user
if user.role not in allowed_roles and "*" not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Accès refusé. Rôles requis: {', '.join(allowed_roles)}",
)
return user
return role_checker
@ -158,9 +110,9 @@ def is_api_key_user(user: User) -> bool:
def get_api_key_from_user(user: User):
"""Récupère l'objet API Key depuis un user virtuel"""
"""Récupère l'objet ApiKey depuis un utilisateur (si applicable)"""
return getattr(user, "_api_key_obj", None)
get_current_user = get_current_user_hybrid
get_current_user_optional = get_current_user_optional_hybrid
require_role = require_role_hybrid

View file

@ -152,7 +152,7 @@ templates_signature_email = {
</table>
<p style="color: #718096; font-size: 13px; line-height: 1.5; margin: 0;">
<strong>🔒 Signature électronique sécurisée</strong><br>
<strong> Signature électronique sécurisée</strong><br>
Votre signature est protégée par notre partenaire de confiance <strong>Universign</strong>,
certifié eIDAS et conforme au RGPD. Votre identité sera vérifiée et le document sera
horodaté de manière infalsifiable.

View file

@ -1,14 +1,14 @@
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool
from sqlalchemy import event, text
import logging
from config.config import settings
from database.models.generic_model import Base
logger = logging.getLogger(__name__)
DATABASE_URL = os.getenv("DATABASE_URL")
DATABASE_URL = settings.database_url
def _configure_sqlite_connection(dbapi_connection, connection_record):

View file

@ -1,4 +1,6 @@
from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text
from typing import Optional, List
import json
from datetime import datetime
import uuid
@ -49,8 +51,23 @@ class SwaggerUser(Base):
is_active = Column(Boolean, default=True, nullable=False)
allowed_tags = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.now, nullable=False)
last_login = Column(DateTime, nullable=True)
@property
def allowed_tags_list(self) -> Optional[List[str]]:
if self.allowed_tags:
try:
return json.loads(self.allowed_tags)
except json.JSONDecodeError:
return None
return None
@allowed_tags_list.setter
def allowed_tags_list(self, tags: Optional[List[str]]):
self.allowed_tags = json.dumps(tags) if tags is not None else None
def __repr__(self):
return f"<SwaggerUser(username='{self.username}', active={self.is_active})>"

View file

@ -1,52 +1,27 @@
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Receive, Send
from sqlalchemy import select
from typing import Optional
from typing import Callable, Optional
from datetime import datetime
import logging
from database import get_session
from database.models.api_key import SwaggerUser
from security.auth import verify_password
import base64
import json
logger = logging.getLogger(__name__)
security = HTTPBasic()
async def verify_swagger_credentials(credentials: HTTPBasicCredentials) -> bool:
username = credentials.username
password = credentials.password
try:
async for session in get_session():
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
swagger_user = result.scalar_one_or_none()
if swagger_user and swagger_user.is_active:
if verify_password(password, swagger_user.hashed_password):
swagger_user.last_login = datetime.now()
await session.commit()
logger.info(f" Accès Swagger autorisé (DB): {username}")
return True
logger.warning(f" Tentative d'accès Swagger refusée: {username}")
return False
except Exception as e:
logger.error(f" Erreur vérification Swagger credentials: {e}")
return False
class SwaggerAuthMiddleware:
def __init__(self, app):
PROTECTED_PATHS = ["/docs", "/redoc", "/openapi.json"]
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope, receive, send):
async def __call__(self, scope, receive: Receive, send: Send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
@ -54,34 +29,31 @@ class SwaggerAuthMiddleware:
request = Request(scope, receive=receive)
path = request.url.path
protected_paths = ["/docs", "/redoc", "/openapi.json"]
if not any(path.startswith(p) for p in self.PROTECTED_PATHS):
await self.app(scope, receive, send)
return
if any(path.startswith(protected_path) for protected_path in protected_paths):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Basic "):
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Authentification requise pour accéder à la documentation"
},
content={"detail": "Authentification requise pour la documentation"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
await response(scope, receive, send)
return
try:
import base64
encoded_credentials = auth_header.split(" ")[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode(
"utf-8"
)
decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8")
username, password = decoded_credentials.split(":", 1)
credentials = HTTPBasicCredentials(username=username, password=password)
if not await verify_swagger_credentials(credentials):
swagger_user = await self._verify_credentials(credentials)
if not swagger_user:
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Identifiants invalides"},
@ -90,6 +62,15 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send)
return
if "state" not in scope:
scope["state"] = {}
scope["state"]["swagger_user"] = swagger_user
logger.info(
f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}"
)
except Exception as e:
logger.error(f" Erreur parsing auth header: {e}")
response = JSONResponse(
@ -102,74 +83,141 @@ class SwaggerAuthMiddleware:
await self.app(scope, receive, send)
async def _verify_credentials(
self, credentials: HTTPBasicCredentials
) -> Optional[dict]:
from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser
from security.auth import verify_password
class ApiKeyMiddleware:
def __init__(self, app):
self.app = app
try:
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(
SwaggerUser.username == credentials.username
)
)
swagger_user = result.scalar_one_or_none()
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
if swagger_user and swagger_user.is_active:
if verify_password(
credentials.password, swagger_user.hashed_password
):
swagger_user.last_login = datetime.now()
await session.commit()
request = Request(scope, receive=receive)
path = request.url.path
logger.info(f"✓ Accès Swagger autorisé: {credentials.username}")
excluded_paths = [
return {
"id": swagger_user.id,
"username": swagger_user.username,
"allowed_tags": swagger_user.allowed_tags_list,
"is_active": swagger_user.is_active,
}
logger.warning(f"✗ Accès Swagger refusé: {credentials.username}")
return None
except Exception as e:
logger.error(f" Erreur vérification credentials: {e}", exc_info=True)
return None
class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
EXCLUDED_PATHS = [
"/docs",
"/redoc",
"/openapi.json",
"/health",
"/",
"/auth/login",
"/auth/register",
"/auth/verify-email",
"/auth/reset-password",
"/auth/request-reset",
"/auth/refresh",
"/health",
"/auth",
"/api-keys/verify",
"/universign/webhook",
]
if any(path.startswith(excluded_path) for excluded_path in excluded_paths):
await self.app(scope, receive, send)
return
def _is_excluded_path(self, path: str) -> bool:
"""Vérifie si le chemin est exclu de l'authentification API Key"""
if path == "/":
return True
for excluded in self.EXCLUDED_PATHS:
if excluded == "/":
continue
if path == excluded or path.startswith(excluded + "/"):
return True
return False
async def dispatch(self, request: Request, call_next: Callable):
path = request.url.path
method = request.method
if self._is_excluded_path(path):
return await call_next(request)
auth_header = request.headers.get("Authorization")
has_jwt = auth_header and auth_header.startswith("Bearer ")
api_key_header = request.headers.get("X-API-Key")
api_key = request.headers.get("X-API-Key")
has_api_key = api_key is not None
if api_key_header:
api_key_header = api_key_header.strip()
if not api_key_header or api_key_header == "":
api_key_header = None
if has_jwt:
logger.debug(f" JWT détecté pour {path}")
await self.app(scope, receive, send)
return
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1].strip()
elif has_api_key:
logger.debug(f" API Key détectée pour {path}")
if token.startswith("sdk_live_"):
logger.warning(
" API Key envoyée dans Authorization au lieu de X-API-Key"
)
return await self._handle_api_key_auth(
request, token, path, method, call_next
)
logger.debug(f"JWT détecté pour {method} {path} → délégation à FastAPI")
request.state.authenticated_via = "jwt"
return await call_next(request)
if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}")
return await self._handle_api_key_auth(
request, api_key_header, path, method, call_next
)
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request)
async def _handle_api_key_auth(
self,
request: Request,
api_key: str,
path: str,
method: str,
call_next: Callable,
):
try:
from database.db_config import async_session_factory
from services.api_key import ApiKeyService
try:
async for session in get_session():
api_key_service = ApiKeyService(session)
api_key_obj = await api_key_service.verify_api_key(api_key)
async with async_session_factory() as session:
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
if not api_key_obj:
response = JSONResponse(
logger.warning(f"🔒 Clé API invalide: {method} {path}")
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Clé API invalide ou expirée",
"hint": "Utilisez X-API-Key: sdk_live_xxx ou Authorization: Bearer <jwt>",
"hint": "Vérifiez votre clé X-API-Key",
},
)
await response(scope, receive, send)
return
is_allowed, rate_info = await api_key_service.check_rate_limit(
api_key_obj
)
is_allowed, rate_info = await service.check_rate_limit(api_key_obj)
if not is_allowed:
response = JSONResponse(
logger.warning(f"⏱️ Rate limit: {api_key_obj.name}")
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"detail": "Rate limit dépassé"},
headers={
@ -177,54 +225,49 @@ class ApiKeyMiddleware:
"X-RateLimit-Remaining": "0",
},
)
await response(scope, receive, send)
return
has_access = await api_key_service.check_endpoint_access(
api_key_obj, path
)
has_access = await service.check_endpoint_access(api_key_obj, path)
if not has_access:
response = JSONResponse(
allowed = (
json.loads(api_key_obj.allowed_endpoints)
if api_key_obj.allowed_endpoints
else ["Tous"]
)
logger.warning(
f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n"
f" Endpoint demandé: {path}\n"
f" Endpoints autorisés: {allowed}"
)
return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
"detail": "Accès non autorisé à cet endpoint",
"endpoint": path,
"api_key": api_key_obj.key_prefix + "...",
"endpoint_requested": path,
"api_key_name": api_key_obj.name,
"allowed_endpoints": allowed,
"hint": "Cette clé API n'a pas accès à cet endpoint.",
},
)
await response(scope, receive, send)
return
request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key"
logger.info(f" API Key valide: {api_key_obj.name} {path}")
logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name} {method} {path}")
await self.app(scope, receive, send)
return
return await call_next(request)
except Exception as e:
logger.error(f" Erreur validation API Key: {e}", exc_info=True)
response = JSONResponse(
logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"detail": "Erreur interne lors de la validation de la clé"
},
content={"detail": f"Erreur interne: {str(e)}"},
)
await response(scope, receive, send)
return
else:
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"detail": "Authentification requise",
"hint": "Utilisez soit 'X-API-Key: sdk_live_xxx' soit 'Authorization: Bearer <jwt>'",
},
headers={"WWW-Authenticate": 'Bearer realm="API", charset="UTF-8"'},
)
await response(scope, receive, send)
return
ApiKeyMiddleware = ApiKeyMiddlewareHTTP
def get_api_key_from_request(request: Request) -> Optional:
@ -233,4 +276,20 @@ def get_api_key_from_request(request: Request) -> Optional:
def get_auth_method(request: Request) -> str:
"""Retourne la méthode d'authentification utilisée"""
return getattr(request.state, "authenticated_via", "none")
def get_swagger_user_from_request(request: Request) -> Optional[dict]:
"""Récupère l'utilisateur Swagger depuis la requête"""
return getattr(request.state, "swagger_user", None)
__all__ = [
"SwaggerAuthMiddleware",
"ApiKeyMiddlewareHTTP",
"ApiKeyMiddleware",
"get_api_key_from_request",
"get_auth_method",
"get_swagger_user_from_request",
]

View file

@ -70,7 +70,7 @@ async def get_api_key(
session: AsyncSession = Depends(get_session),
user: User = Depends(get_current_user),
):
""" Récupérer une clé API par son ID"""
"""Récupérer une clé API par son ID"""
service = ApiKeyService(session)
api_key_obj = await service.get_by_id(key_id)

View file

@ -7,6 +7,7 @@ from typing import Optional
import uuid
from database import get_session, User, RefreshToken, LoginAttempt
from core.dependencies import get_current_user
from security.auth import (
hash_password,
verify_password,
@ -19,7 +20,6 @@ from security.auth import (
hash_token,
)
from services.email_service import AuthEmailService
from core.dependencies import get_current_user
from config.config import settings
import logging
@ -101,7 +101,7 @@ async def check_rate_limit(
)
failed_attempts = result.scalars().all()
if len(failed_attempts) >= 5:
if len(failed_attempts) >= 15:
return False, "Trop de tentatives échouées. Réessayez dans 15 minutes."
return True, ""
@ -286,7 +286,7 @@ async def login(
if user:
user.failed_login_attempts += 1
if user.failed_login_attempts >= 5:
if user.failed_login_attempts >= 15:
user.locked_until = datetime.now() + timedelta(minutes=15)
await session.commit()
raise HTTPException(
@ -510,7 +510,7 @@ async def logout(
token_record.revoked_at = datetime.now()
await session.commit()
logger.info(f"👋 Déconnexion: {user.email}")
logger.info(f" Déconnexion: {user.email}")
return {"success": True, "message": "Déconnexion réussie"}

View file

@ -1,11 +1,11 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from typing import List, Optional
from datetime import datetime, timedelta
from datetime import datetime
import logging
from core.dependencies import get_current_user
from data.data import templates_signature_email
from email_queue import email_queue
from database import UniversignSignerStatus, UniversignTransactionStatus, get_session
@ -32,7 +32,10 @@ from schemas import (
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/universign", tags=["Universign"])
router = APIRouter(
prefix="/universign",
tags=["Universign"],
)
sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key
@ -494,14 +497,11 @@ async def sync_all_transactions(
return {"success": True, "stats": stats, "timestamp": datetime.now().isoformat()}
@router.post("/webhook")
@router.post("/webhook/")
@router.post("/webhook", dependencies=[])
@router.post("/webhook/", dependencies=[])
async def webhook_universign(
request: Request, session: AsyncSession = Depends(get_session)
):
"""
CORRECTION : Extraction correcte du transaction_id selon la structure réelle d'Universign
"""
try:
payload = await request.json()
@ -1081,159 +1081,6 @@ async def trouver_transactions_inconsistantes(
raise HTTPException(500, str(e))
@router.post("/admin/nettoyer-transactions-erreur", tags=["Admin"])
async def nettoyer_transactions_erreur(
age_jours: int = Query(
7, description="Supprimer les transactions en erreur de plus de X jours"
),
session: AsyncSession = Depends(get_session),
):
try:
date_limite = datetime.now() - timedelta(days=age_jours)
query = select(UniversignTransaction).where(
and_(
UniversignTransaction.local_status == LocalDocumentStatus.ERROR,
UniversignTransaction.created_at < date_limite,
)
)
result = await session.execute(query)
transactions = result.scalars().all()
supprimees = []
for tx in transactions:
supprimees.append(
{
"transaction_id": tx.transaction_id,
"document_id": tx.sage_document_id,
"date_creation": tx.created_at.isoformat(),
"erreur": tx.sync_error,
}
)
await session.delete(tx)
await session.commit()
return {
"success": True,
"transactions_supprimees": len(supprimees),
"age_limite_jours": age_jours,
"details": supprimees,
}
except Exception as e:
logger.error(f"Erreur nettoyage: {e}")
raise HTTPException(500, str(e))
@router.get("/debug/webhook-payload/{transaction_id}", tags=["Debug"])
async def voir_dernier_webhook(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
tx = result.scalar_one_or_none()
if not tx:
raise HTTPException(404, "Transaction introuvable")
logs_query = (
select(UniversignSyncLog)
.where(
and_(
UniversignSyncLog.transaction_id == tx.id,
UniversignSyncLog.sync_type.like("webhook:%"),
)
)
.order_by(UniversignSyncLog.sync_timestamp.desc())
.limit(1)
)
logs_result = await session.execute(logs_query)
last_webhook_log = logs_result.scalar_one_or_none()
if not last_webhook_log:
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_payload": None,
"message": "Aucun webhook reçu pour cette transaction",
}
return {
"transaction_id": transaction_id,
"webhook_recu": tx.webhook_received,
"dernier_webhook": {
"timestamp": last_webhook_log.sync_timestamp.isoformat(),
"type": last_webhook_log.sync_type,
"success": last_webhook_log.success,
"payload": json.loads(last_webhook_log.changes_detected)
if last_webhook_log.changes_detected
else None,
},
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur debug webhook: {e}")
raise HTTPException(500, str(e))
@router.get(
"/transactions/{transaction_id}/document/download", tags=["Documents Signés"]
)
async def telecharger_document_signe(
transaction_id: str, session: AsyncSession = Depends(get_session)
):
try:
query = select(UniversignTransaction).where(
UniversignTransaction.transaction_id == transaction_id
)
result = await session.execute(query)
transaction = result.scalar_one_or_none()
if not transaction:
raise HTTPException(404, f"Transaction {transaction_id} introuvable")
if not transaction.signed_document_path:
raise HTTPException(
404,
"Document signé non disponible localement. "
"Utilisez POST /admin/download-missing-documents pour le récupérer.",
)
file_path = Path(transaction.signed_document_path)
if not file_path.exists():
logger.warning(f"Fichier perdu : {file_path}")
raise HTTPException(
404,
"Fichier introuvable sur le serveur. "
"Utilisez POST /admin/download-missing-documents pour le récupérer.",
)
download_name = (
f"{transaction.sage_document_id}_"
f"{transaction.sage_document_type.name}_"
f"signe.pdf"
)
return FileResponse(
path=str(file_path), media_type="application/pdf", filename=download_name
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur téléchargement document : {e}", exc_info=True)
raise HTTPException(500, str(e))
@router.get("/transactions/{transaction_id}/document/info", tags=["Documents Signés"])
async def info_document_signe(
transaction_id: str, session: AsyncSession = Depends(get_session)

View file

@ -1,86 +1,340 @@
import asyncio
import sys
import os
from pathlib import Path
import asyncio
import argparse
sys.path.insert(0, str(Path(__file__).parent.parent))
from database import get_session
from database.models.api_key import SwaggerUser
from services.api_key import ApiKeyService
from security.auth import hash_password
from sqlalchemy import select
import logging
from datetime import datetime
from typing import Optional, List
import json
from sqlalchemy import select
logging.basicConfig(level=logging.INFO)
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
_app_dir = _script_dir.parent
print(f"DEBUG: Script path: {_current_file}")
print(f"DEBUG: App dir: {_app_dir}")
print(f"DEBUG: Current working dir: {os.getcwd()}")
if str(_app_dir) in sys.path:
sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir))
os.chdir(str(_app_dir))
print(f"DEBUG: sys.path[0]: {sys.path[0]}")
print(f"DEBUG: New working dir: {os.getcwd()}")
_test_imports = [
"database",
"database.db_config",
"database.models",
"services",
"security",
]
print("\nDEBUG: Vérification des imports...")
for module in _test_imports:
try:
__import__(module)
print(f"{module}")
except ImportError as e:
print(f"{module}: {e}")
try:
from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser, ApiKey
from services.api_key import ApiKeyService
from security.auth import hash_password
except ImportError as e:
print(f"\n ERREUR D'IMPORT: {e}")
print(" Vérifiez que vous êtes dans /app")
print(" Commande correcte: cd /app && python scripts/manage_security.py ...")
sys.exit(1)
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
async def add_swagger_user(username: str, password: str, full_name: str = None):
"""Ajouter un utilisateur Swagger"""
async with get_session() as session:
AVAILABLE_TAGS = {
"Authentication": " Authentification et gestion des comptes",
"API Keys Management": "🔑 Gestion des clés API",
"Clients": "👥 Gestion des clients",
"Fournisseurs": "🏭 Gestion des fournisseurs",
"Prospects": "🎯 Gestion des prospects",
"Tiers": "📋 Gestion générale des tiers",
"Contacts": "📞 Contacts des tiers",
"Articles": "📦 Catalogue articles",
"Familles": "🏷️ Familles d'articles",
"Stock": "📊 Mouvements de stock",
"Devis": "📄 Devis",
"Commandes": "🛒 Commandes",
"Livraisons": "🚚 Bons de livraison",
"Factures": "💰 Factures",
"Avoirs": "↩️ Avoirs",
"Règlements": "💳 Règlements et encaissements",
"Workflows": " Transformations de documents",
"Documents": "📑 Gestion documents (PDF)",
"Emails": "📧 Envoi d'emails",
"Validation": " Validations métier",
"Collaborateurs": "👔 Collaborateurs internes",
"Société": "🏢 Informations société",
"Référentiels": "📚 Données de référence",
"System": "⚙️ Système et santé",
"Admin": "🛠️ Administration",
"Debug": "🐛 Debug et diagnostics",
}
PRESET_PROFILES = {
"commercial": [
"Clients",
"Contacts",
"Devis",
"Commandes",
"Factures",
"Articles",
"Documents",
"Emails",
],
"comptable": [
"Clients",
"Fournisseurs",
"Factures",
"Avoirs",
"Règlements",
"Documents",
"Emails",
],
"logistique": [
"Articles",
"Stock",
"Commandes",
"Livraisons",
"Fournisseurs",
"Documents",
],
"readonly": ["Clients", "Articles", "Devis", "Commandes", "Factures", "Documents"],
"developer": [
"Authentication",
"API Keys Management",
"System",
"Clients",
"Articles",
"Devis",
"Commandes",
"Factures",
],
}
async def add_swagger_user(
username: str,
password: str,
full_name: str = None,
tags: Optional[List[str]] = None,
preset: Optional[str] = None,
):
"""Ajouter un utilisateur Swagger avec configuration avancée"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
existing = result.scalar_one_or_none()
if existing:
logger.error(f" L'utilisateur {username} existe déjà")
logger.error(f" L'utilisateur '{username}' existe déjà")
return
user = SwaggerUser(
if preset:
if preset not in PRESET_PROFILES:
logger.error(
f" Preset '{preset}' inconnu. Disponibles: {list(PRESET_PROFILES.keys())}"
)
return
tags = PRESET_PROFILES[preset]
logger.info(f"📋 Application du preset '{preset}': {len(tags)} tags")
swagger_user = SwaggerUser(
username=username,
hashed_password=hash_password(password),
full_name=full_name or username,
is_active=True,
allowed_tags=json.dumps(tags) if tags else None,
)
session.add(user)
session.add(swagger_user)
await session.commit()
logger.info(f" Utilisateur Swagger créé: {username}")
print("\n Utilisateur créé avec succès")
print(f" Username: {username}")
print(" Accès: https://votre-serveur/docs")
logger.info(f" Nom complet: {swagger_user.full_name}")
if tags:
logger.info(f" 🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
else:
logger.info(" 👑 Accès ADMIN COMPLET (tous les tags)")
async def list_swagger_users():
"""Lister les utilisateurs Swagger"""
async with get_session() as session:
"""Lister tous les utilisateurs Swagger avec détails"""
async with async_session_factory() as session:
result = await session.execute(select(SwaggerUser))
users = result.scalars().all()
if not users:
print("Aucun utilisateur Swagger trouvé")
logger.info("🔭 Aucun utilisateur Swagger")
return
print(f"\n {len(users)} utilisateur(s) Swagger:\n")
logger.info(f"\n👥 {len(users)} utilisateur(s) Swagger:\n")
logger.info("=" * 80)
for user in users:
status = " Actif" if user.is_active else " Inactif"
print(f"{user.username:<20} {status}")
if user.full_name:
print(f" Nom: {user.full_name}")
if user.last_login:
print(f" Dernière connexion: {user.last_login}")
print()
status = " ACTIF" if user.is_active else " NON ACTIF"
logger.info(f"\n{status} {user.username}")
logger.info(f"📛 Nom: {user.full_name}")
logger.info(f"🆔 ID: {user.id}")
logger.info(f"📅 Créé: {user.created_at}")
logger.info(f"🕐 Dernière connexion: {user.last_login or 'Jamais'}")
if user.allowed_tags:
try:
tags = json.loads(user.allowed_tags)
if tags:
logger.info(f"🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
auth_schemes = []
if "Authentication" in tags:
auth_schemes.append("JWT (Bearer)")
if "API Keys Management" in tags or len(tags) > 3:
auth_schemes.append("X-API-Key")
if not auth_schemes:
auth_schemes.append("JWT (Bearer)")
logger.info(
f" Authentification autorisée: {', '.join(auth_schemes)}"
)
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info(" Authentification: JWT + X-API-Key (tout)")
except json.JSONDecodeError:
logger.info(" Tags: Erreur format")
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info(" Authentification: JWT + X-API-Key (tout)")
logger.info("\n" + "=" * 80)
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with get_session() as session:
async def update_swagger_user(
username: str,
add_tags: Optional[List[str]] = None,
remove_tags: Optional[List[str]] = None,
set_tags: Optional[List[str]] = None,
preset: Optional[str] = None,
active: Optional[bool] = None,
):
"""Mettre à jour un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur {username} introuvable")
logger.error(f" Utilisateur '{username}' introuvable")
return
modified = False
if preset:
if preset not in PRESET_PROFILES:
logger.error(f" Preset '{preset}' inconnu")
return
user.allowed_tags = json.dumps(PRESET_PROFILES[preset])
logger.info(f"📋 Preset '{preset}' appliqué")
modified = True
elif set_tags is not None:
user.allowed_tags = json.dumps(set_tags) if set_tags else None
logger.info(f" Tags remplacés: {len(set_tags) if set_tags else 0}")
modified = True
elif add_tags or remove_tags:
current_tags = []
if user.allowed_tags:
try:
current_tags = json.loads(user.allowed_tags)
except json.JSONDecodeError:
current_tags = []
if add_tags:
for tag in add_tags:
if tag not in current_tags:
current_tags.append(tag)
logger.info(f" Tag ajouté: {tag}")
modified = True
if remove_tags:
for tag in remove_tags:
if tag in current_tags:
current_tags.remove(tag)
logger.info(f" Tag retiré: {tag}")
modified = True
user.allowed_tags = json.dumps(current_tags) if current_tags else None
if active is not None:
user.is_active = active
logger.info(f" Statut: {'ACTIF' if active else 'INACTIF'}")
modified = True
if modified:
await session.commit()
logger.info(f" Utilisateur '{username}' mis à jour")
else:
logger.info(" Aucune modification effectuée")
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
return
await session.delete(user)
await session.commit()
logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
logger.info(f"🗑️ Utilisateur supprimé: {username}")
async def list_available_tags():
"""Liste tous les tags disponibles avec description"""
logger.info("\n🏷️ TAGS DISPONIBLES:\n")
logger.info("=" * 80)
for tag, desc in AVAILABLE_TAGS.items():
logger.info(f" {desc}")
logger.info(f" Nom: {tag}\n")
logger.info("=" * 80)
logger.info("\n📦 PRESETS DISPONIBLES:\n")
for preset_name, tags in PRESET_PROFILES.items():
logger.info(f" {preset_name}:")
logger.info(f" {', '.join(tags)}\n")
logger.info("=" * 80)
async def create_api_key(
@ -91,137 +345,239 @@ async def create_api_key(
endpoints: list = None,
):
"""Créer une clé API"""
async with get_session() as session:
async with async_session_factory() as session:
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
name=name,
description=description,
created_by="CLI",
created_by="cli",
expires_in_days=expires_in_days,
rate_limit_per_minute=rate_limit,
allowed_endpoints=endpoints,
)
print("\n Clé API créée avec succès\n")
print(f" ID: {api_key_obj.id}")
print(f" Nom: {name}")
print(f" Clé: {api_key_plain}")
print(f" Préfixe: {api_key_obj.key_prefix}")
print(f" Rate limit: {rate_limit} req/min")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}")
print("\n IMPORTANT: Sauvegardez cette clé, elle ne sera plus affichée !\n")
logger.info("=" * 70)
logger.info("🔑 Clé API créée avec succès")
logger.info("=" * 70)
logger.info(f" ID: {api_key_obj.id}")
logger.info(f" Nom: {api_key_obj.name}")
logger.info(f" Clé: {api_key_plain}")
logger.info(f" Préfixe: {api_key_obj.key_prefix}")
logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
logger.info(f" Expire le: {api_key_obj.expires_at}")
if api_key_obj.allowed_endpoints:
try:
endpoints_list = json.loads(api_key_obj.allowed_endpoints)
logger.info(f" Endpoints: {', '.join(endpoints_list)}")
except Exception:
logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
else:
logger.info(" Endpoints: Tous (aucune restriction)")
logger.info("=" * 70)
logger.info(" SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !")
logger.info("=" * 70)
async def list_api_keys():
"""Lister les clés API"""
async with get_session() as session:
"""Lister toutes les clés API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
print("Aucune clé API trouvée")
logger.info("🔭 Aucune clé API")
return
print(f"\n {len(keys)} clé(s) API:\n")
for key in keys:
status = "" if key.is_active else ""
expired = (
"⏰ Expirée"
if key.expires_at and key.expires_at < datetime.now()
else ""
)
logger.info(f"🔑 {len(keys)} clé(s) API:\n")
print(f" {status} {key.name:<30} ({key.key_prefix}...)")
print(f" ID: {key.id}")
print(f" Requêtes: {key.total_requests}")
print(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if expired:
print(f" {expired}")
print()
for key in keys:
is_valid = key.is_active and (
not key.expires_at or key.expires_at > datetime.now()
)
status = "" if is_valid else ""
logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
display = ", ".join(endpoints[:4])
if len(endpoints) > 4:
display += f"... (+{len(endpoints) - 4})"
logger.info(f" Endpoints: {display}")
except Exception:
pass
else:
logger.info(" Endpoints: Tous")
logger.info("")
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with get_session() as session:
service = ApiKeyService(session)
async with async_session_factory() as session:
result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
key = result.scalar_one_or_none()
api_key = await service.get_by_id(key_id)
if not api_key:
logger.error(f" Clé {key_id} introuvable")
if not key:
logger.error(f" Clé API '{key_id}' introuvable")
return
success = await service.revoke_api_key(key_id)
key.is_active = False
key.revoked_at = datetime.now()
await session.commit()
if success:
logger.info(f" Clé révoquée: {api_key.name}")
print(f"\n Clé '{api_key.name}' révoquée avec succès")
else:
logger.error(" Erreur lors de la révocation")
logger.info(f"🗑️ Clé API révoquée: {key.name}")
logger.info(f" ID: {key.id}")
async def verify_api_key_cmd(api_key: str):
async def verify_api_key(api_key: str):
"""Vérifier une clé API"""
async with get_session() as session:
async with async_session_factory() as session:
service = ApiKeyService(session)
api_key_obj = await service.verify_api_key(api_key)
key = await service.verify_api_key(api_key)
if api_key_obj:
print("\n Clé API valide\n")
print(f" Nom: {api_key_obj.name}")
print(f" ID: {api_key_obj.id}")
print(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
print(f" Requêtes: {api_key_obj.total_requests}")
print(f" Expire le: {api_key_obj.expires_at or 'Jamais'}\n")
if not key:
logger.error(" Clé API invalide ou expirée")
return
logger.info("=" * 60)
logger.info(" Clé API valide")
logger.info("=" * 60)
logger.info(f" Nom: {key.name}")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes totales: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
logger.info(f" Endpoints autorisés: {endpoints}")
except Exception:
pass
else:
print("\n Clé API invalide, expirée ou révoquée\n")
logger.info(" Endpoints autorisés: Tous")
logger.info("=" * 60)
async def main():
parser = argparse.ArgumentParser(
description="Gestion de la sécurité Sage Dataven API"
description="Gestion avancée des utilisateurs Swagger et clés API",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
EXEMPLES D'UTILISATION:
=== UTILISATEURS SWAGGER ===
1. Créer un utilisateur avec preset:
python scripts/manage_security.py swagger add commercial Pass123! --preset commercial
2. Créer un admin complet:
python scripts/manage_security.py swagger add admin AdminPass
3. Créer avec tags spécifiques:
python scripts/manage_security.py swagger add client Pass123! --tags Clients Devis Factures
4. Mettre à jour un utilisateur (ajouter des tags):
python scripts/manage_security.py swagger update client --add-tags Commandes Livraisons
5. Changer complètement les tags:
python scripts/manage_security.py swagger update client --set-tags Clients Articles
6. Appliquer un preset:
python scripts/manage_security.py swagger update client --preset comptable
7. Lister les tags disponibles:
python scripts/manage_security.py swagger tags
8. Désactiver temporairement:
python scripts/manage_security.py swagger update client --inactive
=== CLÉS API ===
9. Créer une clé API:
python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
10. Créer avec endpoints restreints:
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
11. Lister les clés:
python scripts/manage_security.py apikey list
12. Vérifier une clé:
python scripts/manage_security.py apikey verify sdk_live_xxxxx
13. Révoquer une clé:
python scripts/manage_security.py apikey revoke <key_id>
""",
)
subparsers = parser.add_subparsers(dest="command", help="Commandes disponibles")
subparsers = parser.add_subparsers(dest="command", help="Commandes")
swagger_parser = subparsers.add_parser(
"swagger", help="Gestion utilisateurs Swagger"
swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
swagger_sub = swagger_parser.add_subparsers(dest="swagger_command")
add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
add_p.add_argument("username", help="Nom d'utilisateur")
add_p.add_argument("password", help="Mot de passe")
add_p.add_argument("--full-name", help="Nom complet", default=None)
add_p.add_argument(
"--tags",
nargs="*",
help="Tags autorisés. Vide = admin complet",
default=None,
)
swagger_subparsers = swagger_parser.add_subparsers(dest="action")
swagger_add = swagger_subparsers.add_parser("add", help="Ajouter un utilisateur")
swagger_add.add_argument("username", help="Nom d'utilisateur")
swagger_add.add_argument("password", help="Mot de passe")
swagger_add.add_argument("--full-name", help="Nom complet")
swagger_subparsers.add_parser("list", help="Lister les utilisateurs")
swagger_delete = swagger_subparsers.add_parser(
"delete", help="Supprimer un utilisateur"
add_p.add_argument(
"--preset",
choices=list(PRESET_PROFILES.keys()),
help="Appliquer un preset de tags",
)
swagger_delete.add_argument("username", help="Nom d'utilisateur")
update_p = swagger_sub.add_parser("update", help="Mettre à jour utilisateur")
update_p.add_argument("username", help="Nom d'utilisateur")
update_p.add_argument("--add-tags", nargs="+", help="Ajouter des tags")
update_p.add_argument("--remove-tags", nargs="+", help="Retirer des tags")
update_p.add_argument("--set-tags", nargs="*", help="Définir les tags (remplace)")
update_p.add_argument(
"--preset", choices=list(PRESET_PROFILES.keys()), help="Appliquer preset"
)
update_p.add_argument("--active", action="store_true", help="Activer l'utilisateur")
update_p.add_argument(
"--inactive", action="store_true", help="Désactiver l'utilisateur"
)
swagger_sub.add_parser("list", help="Lister utilisateurs")
del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
del_p.add_argument("username", help="Nom d'utilisateur")
swagger_sub.add_parser("tags", help="Lister les tags disponibles")
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_subparsers = apikey_parser.add_subparsers(dest="action")
apikey_sub = apikey_parser.add_subparsers(dest="apikey_command")
apikey_create = apikey_subparsers.add_parser("create", help="Créer une clé API")
apikey_create.add_argument("name", help="Nom de la clé")
apikey_create.add_argument("--description", help="Description")
apikey_create.add_argument(
"--days", type=int, default=365, help="Expiration en jours"
)
apikey_create.add_argument(
"--rate-limit", type=int, default=60, help="Limite req/min"
)
apikey_create.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
create_p = apikey_sub.add_parser("create", help="Créer clé API")
create_p.add_argument("name", help="Nom de la clé")
create_p.add_argument("--description", help="Description")
create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)")
create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min")
create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
apikey_subparsers.add_parser("list", help="Lister les clés")
apikey_sub.add_parser("list", help="Lister clés")
apikey_revoke = apikey_subparsers.add_parser("revoke", help="Révoquer une clé")
apikey_revoke.add_argument("key_id", help="ID de la clé")
rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé")
rev_p.add_argument("key_id", help="ID de la clé")
apikey_verify = apikey_subparsers.add_parser("verify", help="Vérifier une clé")
apikey_verify.add_argument("api_key", help="Clé API à vérifier")
ver_p = apikey_sub.add_parser("verify", help="Vérifier clé")
ver_p.add_argument("api_key", help="Clé API complète")
args = parser.parse_args()
@ -230,35 +586,66 @@ async def main():
return
if args.command == "swagger":
if args.action == "add":
await add_swagger_user(args.username, args.password, args.full_name)
elif args.action == "list":
if args.swagger_command == "add":
await add_swagger_user(
args.username,
args.password,
args.full_name,
args.tags,
args.preset,
)
elif args.swagger_command == "update":
active = None
if args.active:
active = True
elif args.inactive:
active = False
await update_swagger_user(
args.username,
add_tags=args.add_tags,
remove_tags=args.remove_tags,
set_tags=args.set_tags,
preset=args.preset,
active=active,
)
elif args.swagger_command == "list":
await list_swagger_users()
elif args.action == "delete":
elif args.swagger_command == "delete":
await delete_swagger_user(args.username)
elif args.swagger_command == "tags":
await list_available_tags()
else:
swagger_parser.print_help()
elif args.command == "apikey":
if args.action == "create":
if args.apikey_command == "create":
await create_api_key(
args.name,
args.description,
args.days,
args.rate_limit,
args.endpoints,
name=args.name,
description=args.description,
expires_in_days=args.days,
rate_limit=args.rate_limit,
endpoints=args.endpoints,
)
elif args.action == "list":
elif args.apikey_command == "list":
await list_api_keys()
elif args.action == "revoke":
elif args.apikey_command == "revoke":
await revoke_api_key(args.key_id)
elif args.action == "verify":
await verify_api_key_cmd(args.api_key)
elif args.apikey_command == "verify":
await verify_api_key(args.api_key)
else:
apikey_parser.print_help()
if __name__ == "__main__":
from datetime import datetime
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n Interrupted")
sys.exit(0)
except Exception as e:
logger.error(f" Erreur: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View file

@ -5,10 +5,12 @@ import jwt
import secrets
import hashlib
SECRET_KEY = "VOTRE_SECRET_KEY_A_METTRE_EN_.ENV"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 10080
REFRESH_TOKEN_EXPIRE_DAYS = 7
from config.config import settings
SECRET_KEY = settings.jwt_secret
ALGORITHM = settings.jwt_algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
REFRESH_TOKEN_EXPIRE_DAYS = settings.refresh_token_expire_days
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@ -67,9 +69,13 @@ def decode_token(token: str) -> Optional[Dict]:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.JWTError:
return None
raise jwt.InvalidTokenError("Token expiré")
except jwt.DecodeError:
raise jwt.InvalidTokenError("Token invalide (format incorrect)")
except jwt.InvalidTokenError as e:
raise jwt.InvalidTokenError(f"Token invalide: {str(e)}")
except Exception as e:
raise jwt.InvalidTokenError(f"Erreur lors du décodage du token: {str(e)}")
def validate_password_strength(password: str) -> tuple[bool, str]:

View file

@ -134,7 +134,7 @@ class ApiKeyService:
api_key_obj.revoked_at = datetime.now()
await self.session.commit()
logger.info(f" Clé API révoquée: {api_key_obj.name}")
logger.info(f"🗑️ Clé API révoquée: {api_key_obj.name}")
return True
async def get_by_id(self, key_id: str) -> Optional[ApiKey]:
@ -150,24 +150,42 @@ class ApiKeyService:
}
async def check_endpoint_access(self, api_key_obj: ApiKey, endpoint: str) -> bool:
"""Vérifie si la clé a accès à un endpoint spécifique"""
if not api_key_obj.allowed_endpoints:
logger.debug(
f"🔓 API Key {api_key_obj.name}: Aucune restriction d'endpoint"
)
return True
try:
allowed = json.loads(api_key_obj.allowed_endpoints)
for pattern in allowed:
if pattern == "*":
return True
if pattern.endswith("*"):
prefix = pattern[:-1]
if endpoint.startswith(prefix):
return True
if pattern == endpoint:
if "*" in allowed or "/*" in allowed:
logger.debug(f"🔓 API Key {api_key_obj.name}: Accès global autorisé")
return True
for pattern in allowed:
if pattern == endpoint:
logger.debug(f" Match exact: {pattern} == {endpoint}")
return True
if pattern.endswith("/*"):
base = pattern[:-2] # "/clients/*" → "/clients"
if endpoint == base or endpoint.startswith(base + "/"):
logger.debug(f" Match wildcard: {pattern}{endpoint}")
return True
elif pattern.endswith("*"):
base = pattern[:-1] # "/clients*" → "/clients"
if endpoint.startswith(base):
logger.debug(f" Match prefix: {pattern}{endpoint}")
return True
logger.warning(
f" API Key {api_key_obj.name}: Accès refusé à {endpoint}\n"
f" Endpoints autorisés: {allowed}"
)
return False
except json.JSONDecodeError:
logger.error(f" Erreur parsing allowed_endpoints pour {api_key_obj.id}")
return False

View file

@ -6,7 +6,7 @@ import httpx
from datetime import datetime
from typing import Optional, Tuple, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import false, select, true, update, and_
from sqlalchemy import false, select, update, and_
import logging
from config.config import settings

15
tools/cleaner.py Normal file
View file

@ -0,0 +1,15 @@
from pathlib import Path
def supprimer_commentaires_ligne(fichier):
path = Path(fichier)
lignes = path.read_text(encoding="utf-8").splitlines()
lignes_sans_commentaires = [line for line in lignes if not line.lstrip().startswith("#")]
path.write_text("\n".join(lignes_sans_commentaires), encoding="utf-8")
if __name__ == "__main__":
base_dir = Path(__file__).resolve().parent.parent
fichier_api = base_dir / "data/data.py"
supprimer_commentaires_ligne(fichier_api)

View file

@ -0,0 +1,52 @@
import ast
import os
import textwrap
SOURCE_FILE = "main.py"
MODELS_DIR = "../models"
os.makedirs(MODELS_DIR, exist_ok=True)
with open(SOURCE_FILE, "r", encoding="utf-8") as f:
source_code = f.read()
tree = ast.parse(source_code)
pydantic_classes = []
other_nodes = []
for node in tree.body:
if isinstance(node, ast.ClassDef):
if any(
isinstance(base, ast.Name) and base.id == "BaseModel" for base in node.bases
):
pydantic_classes.append(node)
continue
other_nodes.append(node)
imports = """
from pydantic import BaseModel, Field
from typing import Optional, List
"""
for cls in pydantic_classes:
class_name = cls.name
file_name = f"{class_name.lower()}.py"
file_path = os.path.join(MODELS_DIR, file_name)
class_code = ast.get_source_segment(source_code, cls)
class_code = textwrap.dedent(class_code)
with open(file_path, "w", encoding="utf-8") as f:
f.write(imports.strip() + "\n\n")
f.write(class_code)
print(f"✅ Modèle extrait : {class_name}{file_path}")
new_tree = ast.Module(body=other_nodes, type_ignores=[])
new_source = ast.unparse(new_tree)
with open(SOURCE_FILE, "w", encoding="utf-8") as f:
f.write(new_source)
print("\n🎉 Extraction terminée")