diff --git a/api.py b/api.py index 930aa00..0014cc7 100644 --- a/api.py +++ b/api.py @@ -1,7 +1,10 @@ -from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body +from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body, Request +from fastapi.responses import JSONResponse from fastapi.openapi.utils import get_openapi from fastapi.responses import StreamingResponse, HTMLResponse, Response from fastapi.encoders import jsonable_encoder +from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html + from pydantic import BaseModel, Field, EmailStr from typing import List, Optional from datetime import datetime, date @@ -95,7 +98,6 @@ from utils.generic_functions import ( universign_envoyer, ) - from middleware.security import SwaggerAuthMiddleware, ApiKeyMiddlewareHTTP from core.dependencies import get_current_user from config.cors_config import setup_cors @@ -122,12 +124,12 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): + """Lifecycle de l'application""" await init_db() logger.info("Base de données initialisée") email_queue.session_factory = async_session_factory email_queue.sage_client = sage_client - logger.info("sage_client injecté dans email_queue") email_queue.start(num_workers=settings.max_email_workers) @@ -136,18 +138,12 @@ async def lifespan(app: FastAPI): sync_service = UniversignSyncService( api_url=settings.universign_api_url, api_key=settings.universign_api_key ) - sync_service.configure( sage_client=sage_client, email_queue=email_queue, settings=settings ) - scheduler = UniversignSyncScheduler( - sync_service=sync_service, - interval_minutes=5, - ) - + scheduler = UniversignSyncScheduler(sync_service=sync_service, interval_minutes=5) sync_task = asyncio.create_task(scheduler.start(async_session_factory)) - logger.info("Synchronisation Universign démarrée (5min)") yield @@ -159,34 +155,32 @@ async def lifespan(app: FastAPI): app = FastAPI( - title="Sage Gateways", + title="Sage Gateways API", version="3.0.0", - description="Configuration multi-tenant des connexions Sage Gateway", + description="API multi-tenant pour Sage 100c avec authentification hybride", lifespan=lifespan, openapi_tags=TAGS_METADATA, + docs_url=None, + redoc_url=None, + openapi_url=None, ) -""" app.add_middleware( - CORSMiddleware, - allow_origins=settings.cors_origins, - allow_methods=["GET", "POST", "PUT", "DELETE"], - allow_headers=["*"], - allow_credentials=True, -) """ + +def get_swagger_user_from_state(request: Request) -> Optional[dict]: + return getattr(request.state, "swagger_user", None) -def custom_openapi(): - if app.openapi_schema: - return app.openapi_schema - - openapi_schema = get_openapi( +def generate_filtered_openapi_schema( + app: FastAPI, allowed_tags: Optional[List[str]] = None +) -> dict: + base_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, ) - openapi_schema["components"]["securitySchemes"] = { + base_schema["components"]["securitySchemes"] = { "HTTPBearer": { "type": "http", "scheme": "bearer", @@ -201,19 +195,121 @@ def custom_openapi(): }, } - openapi_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}] + base_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}] - app.openapi_schema = openapi_schema - return app.openapi_schema + if not allowed_tags: + logger.info("📚 Schéma OpenAPI complet (admin)") + return base_schema + + filtered_paths = {} + + for path, path_item in base_schema.get("paths", {}).items(): + filtered_operations = {} + + for method, operation in path_item.items(): + if method not in [ + "get", + "post", + "put", + "delete", + "patch", + "options", + "head", + ]: + continue + + operation_tags = operation.get("tags", []) + + if any(tag in allowed_tags for tag in operation_tags): + filtered_operations[method] = operation + + if filtered_operations: + filtered_paths[path] = filtered_operations + + base_schema["paths"] = filtered_paths + + if "tags" in base_schema: + base_schema["tags"] = [ + tag_obj + for tag_obj in base_schema["tags"] + if tag_obj.get("name") in allowed_tags + ] + + logger.info(f"🔒 Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}") + + return base_schema -app.openapi = custom_openapi +@app.get("/openapi.json", include_in_schema=False) +async def custom_openapi_endpoint(request: Request): + swagger_user = get_swagger_user_from_state(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + username = swagger_user.get("username", "unknown") + allowed_tags = swagger_user.get("allowed_tags") + + logger.info(f"📖 OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}") + + schema = generate_filtered_openapi_schema(app, allowed_tags) + + return JSONResponse(content=schema) + + +@app.get("/docs", include_in_schema=False) +async def custom_swagger_ui(request: Request): + swagger_user = get_swagger_user_from_state(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + return get_swagger_ui_html( + openapi_url="/openapi.json", + title=f"{app.title} - Documentation", + swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", + swagger_ui_parameters={ + "persistAuthorization": True, + "displayRequestDuration": True, + "filter": True, + "tryItOutEnabled": True, + }, + ) + + +@app.get("/redoc", include_in_schema=False) +async def custom_redoc(request: Request): + swagger_user = get_swagger_user_from_state(request) + + if not swagger_user: + return JSONResponse( + status_code=401, + content={"detail": "Authentification Swagger requise"}, + headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, + ) + + return get_redoc_html( + openapi_url="/openapi.json", + title=f"{app.title} - Documentation", + redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", + ) setup_cors(app, mode="open") + app.add_middleware(SwaggerAuthMiddleware) + app.add_middleware(ApiKeyMiddlewareHTTP) + app.include_router(api_keys_router) app.include_router(auth_router) app.include_router(sage_gateway_router) @@ -3241,7 +3337,7 @@ async def get_reglement_detail(rg_no): return sage_client.get_reglement_detail(rg_no) -@app.get("/health", tags=["System"]) +""" @app.get("/health", tags=["System"]) async def health_check( sage: SageGatewayClient = Depends(get_sage_client_for_user), ): @@ -3257,29 +3353,64 @@ async def health_check( "queue_size": email_queue.queue.qsize(), }, "timestamp": datetime.now().isoformat(), - } + } """ @app.get("/", tags=["System"]) async def root(): + """ + Point d'entrée de l'API + """ return { - "api": "Sage 100c Dataven - VPS Linux", + "api": "Sage 100c Dataven API", "version": "3.0.0", - "documentation": "/docs (authentification requise)", - "health": "/health", + "status": "operational", + "documentation": { + "swagger": "/docs", + "redoc": "/redoc", + "openapi": "/openapi.json", + }, "authentication": { "methods": [ { - "type": "JWT", + "type": "JWT (Bearer Token)", "header": "Authorization: Bearer ", - "endpoint": "/api/auth/login", + "obtain_token": "POST /auth/login", + "description": "Pour les utilisateurs finaux", }, { "type": "API Key", "header": "X-API-Key: sdk_live_xxx", - "endpoint": "/api/api-keys", + "manage_keys": "GET /api-keys", + "description": "Pour les intégrations externes", }, - ] + ], + "note": "Les routes acceptent JWT OU API Key (au choix)", + }, + "swagger_access": { + "authentication": "HTTP Basic Auth (voir /scripts/manage_security.py)", + "filtering": "Les routes visibles dépendent des tags autorisés de l'utilisateur", + }, + } + + +@app.get("/health", tags=["System"]) +async def health_check(): + """ + Vérification de santé de l'API (sans authentification) + """ + return { + "status": "healthy", + "timestamp": "2025-01-21T00:00:00Z", + "services": { + "api": "operational", + "database": "connected", + "email_queue": { + "running": email_queue.running, + "workers": len(email_queue.workers) + if hasattr(email_queue, "workers") + else 0, + }, }, } diff --git a/database/models/api_key.py b/database/models/api_key.py index 0d246ab..b4ffbb5 100644 --- a/database/models/api_key.py +++ b/database/models/api_key.py @@ -1,4 +1,6 @@ from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text +from typing import Optional, List +import json from datetime import datetime import uuid @@ -49,8 +51,23 @@ class SwaggerUser(Base): is_active = Column(Boolean, default=True, nullable=False) + allowed_tags = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.now, nullable=False) last_login = Column(DateTime, nullable=True) + @property + def allowed_tags_list(self) -> Optional[List[str]]: + if self.allowed_tags: + try: + return json.loads(self.allowed_tags) + except json.JSONDecodeError: + return None + return None + + @allowed_tags_list.setter + def allowed_tags_list(self, tags: Optional[List[str]]): + self.allowed_tags = json.dumps(tags) if tags is not None else None + def __repr__(self): return f"" diff --git a/middleware/security.py b/middleware/security.py index 88df4f6..010d4d8 100644 --- a/middleware/security.py +++ b/middleware/security.py @@ -2,12 +2,13 @@ from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from starlette.middleware.base import BaseHTTPMiddleware -from starlette.types import ASGIApp +from starlette.types import ASGIApp, Receive, Send from sqlalchemy import select -from typing import Callable +from typing import Callable, Optional from datetime import datetime import logging import base64 +import json logger = logging.getLogger(__name__) @@ -20,7 +21,7 @@ class SwaggerAuthMiddleware: def __init__(self, app: ASGIApp): self.app = app - async def __call__(self, scope, receive, send): + async def __call__(self, scope, receive: Receive, send: Send): if scope["type"] != "http": await self.app(scope, receive, send) return @@ -50,7 +51,9 @@ class SwaggerAuthMiddleware: credentials = HTTPBasicCredentials(username=username, password=password) - if not await self._verify_credentials(credentials): + swagger_user = await self._verify_credentials(credentials) + + if not swagger_user: response = JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Identifiants invalides"}, @@ -59,6 +62,14 @@ class SwaggerAuthMiddleware: await response(scope, receive, send) return + if "state" not in scope: + scope["state"] = {} + scope["state"]["swagger_user"] = swagger_user + + logger.info( + f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}" + ) + except Exception as e: logger.error(f"Erreur parsing auth header: {e}") response = JSONResponse( @@ -71,8 +82,9 @@ class SwaggerAuthMiddleware: await self.app(scope, receive, send) - async def _verify_credentials(self, credentials: HTTPBasicCredentials) -> bool: - """Vérifie les identifiants dans la base de données""" + async def _verify_credentials( + self, credentials: HTTPBasicCredentials + ) -> Optional[dict]: from database.db_config import async_session_factory from database.models.api_key import SwaggerUser from security.auth import verify_password @@ -92,15 +104,22 @@ class SwaggerAuthMiddleware: ): swagger_user.last_login = datetime.now() await session.commit() + logger.info(f"✓ Accès Swagger autorisé: {credentials.username}") - return True + + return { + "id": swagger_user.id, + "username": swagger_user.username, + "allowed_tags": swagger_user.allowed_tags_list, # None = admin complet + "is_active": swagger_user.is_active, + } logger.warning(f"✗ Accès Swagger refusé: {credentials.username}") - return False + return None except Exception as e: - logger.error(f"Erreur vérification credentials: {e}") - return False + logger.error(f"Erreur vérification credentials: {e}", exc_info=True) + return None class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): @@ -116,7 +135,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ] def _is_excluded_path(self, path: str) -> bool: - """Vérifie si le chemin est exclu de l'authentification""" + """Vérifie si le chemin est exclu de l'authentification API Key""" if path == "/": return True @@ -149,7 +168,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): if token.startswith("sdk_live_"): logger.warning( - " API Key envoyée dans Authorization au lieu de X-API-Key" + "⚠ API Key envoyée dans Authorization au lieu de X-API-Key" ) return await self._handle_api_key_auth( request, token, path, method, call_next @@ -159,7 +178,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): request.state.authenticated_via = "jwt" return await call_next(request) - logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI") + logger.debug(f"❓ Aucune auth pour {method} {path} → délégation à FastAPI") return await call_next(request) async def _handle_api_key_auth( @@ -170,7 +189,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): method: str, call_next: Callable, ): - """Gère l'authentification par API Key avec vérification STRICTE""" try: from database.db_config import async_session_factory from services.api_key import ApiKeyService @@ -181,7 +199,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): api_key_obj = await service.verify_api_key(api_key) if not api_key_obj: - logger.warning(f" Clé API invalide: {method} {path}") + logger.warning(f"❌ Clé API invalide: {method} {path}") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={ @@ -192,7 +210,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): is_allowed, rate_info = await service.check_rate_limit(api_key_obj) if not is_allowed: - logger.warning(f" Rate limit: {api_key_obj.name}") + logger.warning(f"⏱ Rate limit: {api_key_obj.name}") return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": "Rate limit dépassé"}, @@ -205,8 +223,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): has_access = await service.check_endpoint_access(api_key_obj, path) if not has_access: - import json - allowed = ( json.loads(api_key_obj.allowed_endpoints) if api_key_obj.allowed_endpoints @@ -214,7 +230,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ) logger.warning( - f" ACCÈS REFUSÉ: {api_key_obj.name}\n" + f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n" f" Endpoint demandé: {path}\n" f" Endpoints autorisés: {allowed}" ) @@ -233,12 +249,12 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): request.state.api_key = api_key_obj request.state.authenticated_via = "api_key" - logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name} → {method} {path}") + logger.info(f"✅ ACCÈS AUTORISÉ: {api_key_obj.name} → {method} {path}") return await call_next(request) except Exception as e: - logger.error(f" Erreur validation API Key: {e}", exc_info=True) + logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True) return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": f"Erreur interne: {str(e)}"}, @@ -248,7 +264,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): ApiKeyMiddleware = ApiKeyMiddlewareHTTP -def get_api_key_from_request(request: Request): +def get_api_key_from_request(request: Request) -> Optional: """Récupère l'objet ApiKey depuis la requête si présent""" return getattr(request.state, "api_key", None) @@ -258,10 +274,16 @@ def get_auth_method(request: Request) -> str: return getattr(request.state, "authenticated_via", "none") +def get_swagger_user_from_request(request: Request) -> Optional[dict]: + """Récupère l'utilisateur Swagger depuis la requête""" + return getattr(request.state, "swagger_user", None) + + __all__ = [ "SwaggerAuthMiddleware", "ApiKeyMiddlewareHTTP", "ApiKeyMiddleware", "get_api_key_from_request", "get_auth_method", + "get_swagger_user_from_request", ] diff --git a/scripts/manage_security.py b/scripts/manage_security.py index 1e5cab9..c90b310 100644 --- a/scripts/manage_security.py +++ b/scripts/manage_security.py @@ -1,6 +1,13 @@ import sys import os from pathlib import Path +import asyncio +import argparse +import logging +from datetime import datetime +from typing import Optional, List +import json +from sqlalchemy import select _current_file = Path(__file__).resolve() _script_dir = _current_file.parent @@ -35,30 +42,28 @@ for module in _test_imports: except ImportError as e: print(f" {module}: {e}") -import asyncio -import argparse -import logging -from datetime import datetime - -from sqlalchemy import select try: from database.db_config import async_session_factory - from database.models.user import User from database.models.api_key import SwaggerUser, ApiKey from services.api_key import ApiKeyService from security.auth import hash_password except ImportError as e: print(f"\n ERREUR D'IMPORT: {e}") - print(f" Vérifiez que vous êtes dans /app") - print(f" Commande correcte: cd /app && python scripts/manage_security.py ...") + print(" Vérifiez que vous êtes dans /app") + print(" Commande correcte: cd /app && python scripts/manage_security.py ...") sys.exit(1) logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logger = logging.getLogger(__name__) -async def add_swagger_user(username: str, password: str, full_name: str = None): +async def add_swagger_user( + username: str, + password: str, + full_name: str = None, + tags: Optional[List[str]] = None, +): """Ajouter un utilisateur Swagger""" async with async_session_factory() as session: result = await session.execute( @@ -75,6 +80,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None): hashed_password=hash_password(password), full_name=full_name or username, is_active=True, + allowed_tags=json.dumps(tags) if tags else None, ) session.add(swagger_user) @@ -94,17 +100,30 @@ async def list_swagger_users(): logger.info("🔭 Aucun utilisateur Swagger") return - logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n") + logger.info("👥 {} utilisateur(s) Swagger:\n".format(len(users))) for user in users: - status = "" if user.is_active else "" - logger.info(f" {status} {user.username}") - logger.info(f" Nom: {user.full_name}") - logger.info(f" Créé: {user.created_at}") - logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n") + status = "ACTIF" if user.is_active else "NON ACTIF" + logger.info(" {} {}".format(status, user.username)) + logger.info("Nom: {}".format(user.full_name)) + logger.info("Créé: {}".format(user.created_at)) + logger.info(" Dernière connexion: {}".format(user.last_login or "Jamais")) + + if user.allowed_tags: + try: + tags = json.loads(user.allowed_tags) + if tags: + logger.info("Tags autorisés: {}".format(", ".join(tags))) + else: + logger.info("Tags autorisés: Tous (admin)") + except json.JSONDecodeError: + logger.info("Tags: Erreur format") + else: + logger.info("Tags autorisés: Tous (admin)") + + logger.info("") async def delete_swagger_user(username: str): - """Supprimer un utilisateur Swagger""" async with async_session_factory() as session: result = await session.execute( select(SwaggerUser).where(SwaggerUser.username == username) @@ -112,7 +131,7 @@ async def delete_swagger_user(username: str): user = result.scalar_one_or_none() if not user: - logger.error(f" Utilisateur '{username}' introuvable") + logger.error(f"❌ Utilisateur '{username}' introuvable") return await session.delete(user) @@ -143,21 +162,23 @@ async def create_api_key( logger.info("=" * 70) logger.info("🔑 Clé API créée avec succès") logger.info("=" * 70) - logger.info(f" ID: {api_key_obj.id}") - logger.info(f" Nom: {api_key_obj.name}") - logger.info(f" Clé: {api_key_plain}") - logger.info(f" Préfixe: {api_key_obj.key_prefix}") - logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min") - logger.info(f" Expire le: {api_key_obj.expires_at}") + logger.info(" ID: {}".format(api_key_obj.id)) + logger.info(" Nom: {}".format(api_key_obj.name)) + logger.info(" Clé: {}".format(api_key_plain)) + logger.info(" Préfixe: {}".format(api_key_obj.key_prefix)) + logger.info( + " Rate limit: {} req/min".format(api_key_obj.rate_limit_per_minute) + ) + logger.info(" Expire le: {}".format(api_key_obj.expires_at)) if api_key_obj.allowed_endpoints: import json try: endpoints_list = json.loads(api_key_obj.allowed_endpoints) - logger.info(f" Endpoints: {', '.join(endpoints_list)}") - except: - logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}") + logger.info(" Endpoints: {}".format(", ".join(endpoints_list))) + except Exception: + logger.info(" Endpoints: {}".format(api_key_obj.allowed_endpoints)) else: logger.info(" Endpoints: Tous (aucune restriction)") @@ -176,7 +197,7 @@ async def list_api_keys(): logger.info("🔭 Aucune clé API") return - logger.info(f"🔑 {len(keys)} clé(s) API:\n") + logger.info("🔑 {} clé(s) API:\n".format(len(keys))) for key in keys: is_valid = key.is_active and ( @@ -185,11 +206,11 @@ async def list_api_keys(): status = "" if is_valid else "" logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)") - logger.info(f" ID: {key.id}") - logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") - logger.info(f" Requêtes: {key.total_requests}") - logger.info(f" Expire: {key.expires_at or 'Jamais'}") - logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") + logger.info(f" ID: {key.id}") + logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min") + logger.info(f" Requêtes: {key.total_requests}") + logger.info(f" Expire: {key.expires_at or 'Jamais'}") + logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}") if key.allowed_endpoints: import json @@ -199,11 +220,11 @@ async def list_api_keys(): display = ", ".join(endpoints[:4]) if len(endpoints) > 4: display += f"... (+{len(endpoints) - 4})" - logger.info(f" Endpoints: {display}") - except: + logger.info(f" Endpoints: {display}") + except Exception: pass else: - logger.info(" Endpoints: Tous") + logger.info("Endpoints: Tous") logger.info("") @@ -250,7 +271,7 @@ async def verify_api_key(api_key: str): try: endpoints = json.loads(key.allowed_endpoints) logger.info(f" Endpoints autorisés: {endpoints}") - except: + except Exception: pass else: logger.info(" Endpoints autorisés: Tous") @@ -263,12 +284,14 @@ async def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemples: - python scripts/manage_security.py swagger add admin MyP@ssw0rd - python scripts/manage_security.py swagger list - python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 - python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" - python scripts/manage_security.py apikey list - python scripts/manage_security.py apikey verify sdk_live_xxxxx + python scripts/manage_security.py swagger add admin MyP@ssw0rd + python scripts/manage_security.py swagger list + python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100 + python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*" + python scripts/manage_security.py apikey list + python scripts/manage_security.py apikey verify sdk_live_xxxxx + python scripts/manage_security.py swagger add client_user Secret123 --full-name "Client Tech IT" --tags Authentication Clients Devis Factures + python scripts/manage_security.py swagger add admin_user AdminPass --tags # vide = tout voir """, ) subparsers = parser.add_subparsers(dest="command", help="Commandes") @@ -279,7 +302,13 @@ Exemples: add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur") add_p.add_argument("username", help="Nom d'utilisateur") add_p.add_argument("password", help="Mot de passe") - add_p.add_argument("--full-name", help="Nom complet") + add_p.add_argument("--full-name", help="Nom complet", default=None) + add_p.add_argument( + "--tags", + nargs="*", + help="Tags autorisés (Clients Devis etc). Vide ou omis = admin complet", + default=None, + ) swagger_sub.add_parser("list", help="Lister utilisateurs") @@ -312,7 +341,8 @@ Exemples: if args.command == "swagger": if args.swagger_command == "add": - await add_swagger_user(args.username, args.password, args.full_name) + tags = args.tags if args.tags else None + await add_swagger_user(args.username, args.password, args.full_name, tags) elif args.swagger_command == "list": await list_swagger_users() elif args.swagger_command == "delete":