Merge branch 'feat/controlled_swagger_access' into main_2

This commit is contained in:
Fanilo-Nantenaina 2026-01-21 13:01:38 +03:00
commit 23a94f5558
4 changed files with 305 additions and 105 deletions

207
api.py
View file

@ -1,7 +1,10 @@
from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body
from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body, Request
from fastapi.responses import JSONResponse
from fastapi.openapi.utils import get_openapi
from fastapi.responses import StreamingResponse, HTMLResponse, Response
from fastapi.encoders import jsonable_encoder
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from pydantic import BaseModel, Field, EmailStr
from typing import List, Optional
from datetime import datetime, date
@ -95,7 +98,6 @@ from utils.generic_functions import (
universign_envoyer,
)
from middleware.security import SwaggerAuthMiddleware, ApiKeyMiddlewareHTTP
from core.dependencies import get_current_user
from config.cors_config import setup_cors
@ -122,12 +124,12 @@ logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle de l'application"""
await init_db()
logger.info("Base de données initialisée")
email_queue.session_factory = async_session_factory
email_queue.sage_client = sage_client
logger.info("sage_client injecté dans email_queue")
email_queue.start(num_workers=settings.max_email_workers)
@ -136,18 +138,12 @@ async def lifespan(app: FastAPI):
sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key
)
sync_service.configure(
sage_client=sage_client, email_queue=email_queue, settings=settings
)
scheduler = UniversignSyncScheduler(
sync_service=sync_service,
interval_minutes=5,
)
scheduler = UniversignSyncScheduler(sync_service=sync_service, interval_minutes=5)
sync_task = asyncio.create_task(scheduler.start(async_session_factory))
logger.info("Synchronisation Universign démarrée (5min)")
yield
@ -159,34 +155,32 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="Sage Gateways",
title="Sage Gateways API",
version="3.0.0",
description="Configuration multi-tenant des connexions Sage Gateway",
description="API multi-tenant pour Sage 100c avec authentification hybride",
lifespan=lifespan,
openapi_tags=TAGS_METADATA,
docs_url=None,
redoc_url=None,
openapi_url=None,
)
""" app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
allow_credentials=True,
) """
def get_swagger_user_from_state(request: Request) -> Optional[dict]:
return getattr(request.state, "swagger_user", None)
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
def generate_filtered_openapi_schema(
app: FastAPI, allowed_tags: Optional[List[str]] = None
) -> dict:
base_schema = get_openapi(
title=app.title,
version=app.version,
description=app.description,
routes=app.routes,
)
openapi_schema["components"]["securitySchemes"] = {
base_schema["components"]["securitySchemes"] = {
"HTTPBearer": {
"type": "http",
"scheme": "bearer",
@ -201,19 +195,121 @@ def custom_openapi():
},
}
openapi_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}]
base_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
if not allowed_tags:
logger.info("📚 Schéma OpenAPI complet (admin)")
return base_schema
filtered_paths = {}
for path, path_item in base_schema.get("paths", {}).items():
filtered_operations = {}
for method, operation in path_item.items():
if method not in [
"get",
"post",
"put",
"delete",
"patch",
"options",
"head",
]:
continue
operation_tags = operation.get("tags", [])
if any(tag in allowed_tags for tag in operation_tags):
filtered_operations[method] = operation
if filtered_operations:
filtered_paths[path] = filtered_operations
base_schema["paths"] = filtered_paths
if "tags" in base_schema:
base_schema["tags"] = [
tag_obj
for tag_obj in base_schema["tags"]
if tag_obj.get("name") in allowed_tags
]
logger.info(f"🔒 Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}")
return base_schema
app.openapi = custom_openapi
@app.get("/openapi.json", include_in_schema=False)
async def custom_openapi_endpoint(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
username = swagger_user.get("username", "unknown")
allowed_tags = swagger_user.get("allowed_tags")
logger.info(f"📖 OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}")
schema = generate_filtered_openapi_schema(app, allowed_tags)
return JSONResponse(content=schema)
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
return get_swagger_ui_html(
openapi_url="/openapi.json",
title=f"{app.title} - Documentation",
swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
swagger_ui_parameters={
"persistAuthorization": True,
"displayRequestDuration": True,
"filter": True,
"tryItOutEnabled": True,
},
)
@app.get("/redoc", include_in_schema=False)
async def custom_redoc(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
return get_redoc_html(
openapi_url="/openapi.json",
title=f"{app.title} - Documentation",
redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
)
setup_cors(app, mode="open")
app.add_middleware(SwaggerAuthMiddleware)
app.add_middleware(ApiKeyMiddlewareHTTP)
app.include_router(api_keys_router)
app.include_router(auth_router)
app.include_router(sage_gateway_router)
@ -3241,7 +3337,7 @@ async def get_reglement_detail(rg_no):
return sage_client.get_reglement_detail(rg_no)
@app.get("/health", tags=["System"])
""" @app.get("/health", tags=["System"])
async def health_check(
sage: SageGatewayClient = Depends(get_sage_client_for_user),
):
@ -3257,29 +3353,64 @@ async def health_check(
"queue_size": email_queue.queue.qsize(),
},
"timestamp": datetime.now().isoformat(),
}
} """
@app.get("/", tags=["System"])
async def root():
"""
Point d'entrée de l'API
"""
return {
"api": "Sage 100c Dataven - VPS Linux",
"api": "Sage 100c Dataven API",
"version": "3.0.0",
"documentation": "/docs (authentification requise)",
"health": "/health",
"status": "operational",
"documentation": {
"swagger": "/docs",
"redoc": "/redoc",
"openapi": "/openapi.json",
},
"authentication": {
"methods": [
{
"type": "JWT",
"type": "JWT (Bearer Token)",
"header": "Authorization: Bearer <token>",
"endpoint": "/api/auth/login",
"obtain_token": "POST /auth/login",
"description": "Pour les utilisateurs finaux",
},
{
"type": "API Key",
"header": "X-API-Key: sdk_live_xxx",
"endpoint": "/api/api-keys",
"manage_keys": "GET /api-keys",
"description": "Pour les intégrations externes",
},
]
],
"note": "Les routes acceptent JWT OU API Key (au choix)",
},
"swagger_access": {
"authentication": "HTTP Basic Auth (voir /scripts/manage_security.py)",
"filtering": "Les routes visibles dépendent des tags autorisés de l'utilisateur",
},
}
@app.get("/health", tags=["System"])
async def health_check():
"""
Vérification de santé de l'API (sans authentification)
"""
return {
"status": "healthy",
"timestamp": "2025-01-21T00:00:00Z",
"services": {
"api": "operational",
"database": "connected",
"email_queue": {
"running": email_queue.running,
"workers": len(email_queue.workers)
if hasattr(email_queue, "workers")
else 0,
},
},
}

View file

@ -1,4 +1,6 @@
from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text
from typing import Optional, List
import json
from datetime import datetime
import uuid
@ -49,8 +51,23 @@ class SwaggerUser(Base):
is_active = Column(Boolean, default=True, nullable=False)
allowed_tags = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.now, nullable=False)
last_login = Column(DateTime, nullable=True)
@property
def allowed_tags_list(self) -> Optional[List[str]]:
if self.allowed_tags:
try:
return json.loads(self.allowed_tags)
except json.JSONDecodeError:
return None
return None
@allowed_tags_list.setter
def allowed_tags_list(self, tags: Optional[List[str]]):
self.allowed_tags = json.dumps(tags) if tags is not None else None
def __repr__(self):
return f"<SwaggerUser(username='{self.username}', active={self.is_active})>"

View file

@ -2,12 +2,13 @@ from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from starlette.types import ASGIApp, Receive, Send
from sqlalchemy import select
from typing import Callable
from typing import Callable, Optional
from datetime import datetime
import logging
import base64
import json
logger = logging.getLogger(__name__)
@ -20,7 +21,7 @@ class SwaggerAuthMiddleware:
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope, receive, send):
async def __call__(self, scope, receive: Receive, send: Send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
@ -50,7 +51,9 @@ class SwaggerAuthMiddleware:
credentials = HTTPBasicCredentials(username=username, password=password)
if not await self._verify_credentials(credentials):
swagger_user = await self._verify_credentials(credentials)
if not swagger_user:
response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Identifiants invalides"},
@ -59,6 +62,14 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send)
return
if "state" not in scope:
scope["state"] = {}
scope["state"]["swagger_user"] = swagger_user
logger.info(
f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}"
)
except Exception as e:
logger.error(f"Erreur parsing auth header: {e}")
response = JSONResponse(
@ -71,8 +82,9 @@ class SwaggerAuthMiddleware:
await self.app(scope, receive, send)
async def _verify_credentials(self, credentials: HTTPBasicCredentials) -> bool:
"""Vérifie les identifiants dans la base de données"""
async def _verify_credentials(
self, credentials: HTTPBasicCredentials
) -> Optional[dict]:
from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser
from security.auth import verify_password
@ -92,15 +104,22 @@ class SwaggerAuthMiddleware:
):
swagger_user.last_login = datetime.now()
await session.commit()
logger.info(f"✓ Accès Swagger autorisé: {credentials.username}")
return True
return {
"id": swagger_user.id,
"username": swagger_user.username,
"allowed_tags": swagger_user.allowed_tags_list, # None = admin complet
"is_active": swagger_user.is_active,
}
logger.warning(f"✗ Accès Swagger refusé: {credentials.username}")
return False
return None
except Exception as e:
logger.error(f"Erreur vérification credentials: {e}")
return False
logger.error(f"Erreur vérification credentials: {e}", exc_info=True)
return None
class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
@ -116,7 +135,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
]
def _is_excluded_path(self, path: str) -> bool:
"""Vérifie si le chemin est exclu de l'authentification"""
"""Vérifie si le chemin est exclu de l'authentification API Key"""
if path == "/":
return True
@ -149,7 +168,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
if token.startswith("sdk_live_"):
logger.warning(
" API Key envoyée dans Authorization au lieu de X-API-Key"
" API Key envoyée dans Authorization au lieu de X-API-Key"
)
return await self._handle_api_key_auth(
request, token, path, method, call_next
@ -159,7 +178,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
request.state.authenticated_via = "jwt"
return await call_next(request)
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request)
async def _handle_api_key_auth(
@ -170,7 +189,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
method: str,
call_next: Callable,
):
"""Gère l'authentification par API Key avec vérification STRICTE"""
try:
from database.db_config import async_session_factory
from services.api_key import ApiKeyService
@ -181,7 +199,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
api_key_obj = await service.verify_api_key(api_key)
if not api_key_obj:
logger.warning(f" Clé API invalide: {method} {path}")
logger.warning(f" Clé API invalide: {method} {path}")
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
@ -192,7 +210,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
is_allowed, rate_info = await service.check_rate_limit(api_key_obj)
if not is_allowed:
logger.warning(f" Rate limit: {api_key_obj.name}")
logger.warning(f" Rate limit: {api_key_obj.name}")
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"detail": "Rate limit dépassé"},
@ -205,8 +223,6 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
has_access = await service.check_endpoint_access(api_key_obj, path)
if not has_access:
import json
allowed = (
json.loads(api_key_obj.allowed_endpoints)
if api_key_obj.allowed_endpoints
@ -214,7 +230,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
)
logger.warning(
f" ACCÈS REFUSÉ: {api_key_obj.name}\n"
f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n"
f" Endpoint demandé: {path}\n"
f" Endpoints autorisés: {allowed}"
)
@ -233,12 +249,12 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
request.state.api_key = api_key_obj
request.state.authenticated_via = "api_key"
logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name}{method} {path}")
logger.info(f" ACCÈS AUTORISÉ: {api_key_obj.name}{method} {path}")
return await call_next(request)
except Exception as e:
logger.error(f" Erreur validation API Key: {e}", exc_info=True)
logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": f"Erreur interne: {str(e)}"},
@ -248,7 +264,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
ApiKeyMiddleware = ApiKeyMiddlewareHTTP
def get_api_key_from_request(request: Request):
def get_api_key_from_request(request: Request) -> Optional:
"""Récupère l'objet ApiKey depuis la requête si présent"""
return getattr(request.state, "api_key", None)
@ -258,10 +274,16 @@ def get_auth_method(request: Request) -> str:
return getattr(request.state, "authenticated_via", "none")
def get_swagger_user_from_request(request: Request) -> Optional[dict]:
"""Récupère l'utilisateur Swagger depuis la requête"""
return getattr(request.state, "swagger_user", None)
__all__ = [
"SwaggerAuthMiddleware",
"ApiKeyMiddlewareHTTP",
"ApiKeyMiddleware",
"get_api_key_from_request",
"get_auth_method",
"get_swagger_user_from_request",
]

View file

@ -1,6 +1,13 @@
import sys
import os
from pathlib import Path
import asyncio
import argparse
import logging
from datetime import datetime
from typing import Optional, List
import json
from sqlalchemy import select
_current_file = Path(__file__).resolve()
_script_dir = _current_file.parent
@ -35,30 +42,28 @@ for module in _test_imports:
except ImportError as e:
print(f" {module}: {e}")
import asyncio
import argparse
import logging
from datetime import datetime
from sqlalchemy import select
try:
from database.db_config import async_session_factory
from database.models.user import User
from database.models.api_key import SwaggerUser, ApiKey
from services.api_key import ApiKeyService
from security.auth import hash_password
except ImportError as e:
print(f"\n ERREUR D'IMPORT: {e}")
print(f" Vérifiez que vous êtes dans /app")
print(f" Commande correcte: cd /app && python scripts/manage_security.py ...")
print(" Vérifiez que vous êtes dans /app")
print(" Commande correcte: cd /app && python scripts/manage_security.py ...")
sys.exit(1)
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
async def add_swagger_user(username: str, password: str, full_name: str = None):
async def add_swagger_user(
username: str,
password: str,
full_name: str = None,
tags: Optional[List[str]] = None,
):
"""Ajouter un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
@ -75,6 +80,7 @@ async def add_swagger_user(username: str, password: str, full_name: str = None):
hashed_password=hash_password(password),
full_name=full_name or username,
is_active=True,
allowed_tags=json.dumps(tags) if tags else None,
)
session.add(swagger_user)
@ -94,17 +100,30 @@ async def list_swagger_users():
logger.info("🔭 Aucun utilisateur Swagger")
return
logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n")
logger.info("👥 {} utilisateur(s) Swagger:\n".format(len(users)))
for user in users:
status = "" if user.is_active else ""
logger.info(f" {status} {user.username}")
logger.info(f" Nom: {user.full_name}")
logger.info(f" Créé: {user.created_at}")
logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n")
status = "ACTIF" if user.is_active else "NON ACTIF"
logger.info(" {} {}".format(status, user.username))
logger.info("Nom: {}".format(user.full_name))
logger.info("Créé: {}".format(user.created_at))
logger.info(" Dernière connexion: {}".format(user.last_login or "Jamais"))
if user.allowed_tags:
try:
tags = json.loads(user.allowed_tags)
if tags:
logger.info("Tags autorisés: {}".format(", ".join(tags)))
else:
logger.info("Tags autorisés: Tous (admin)")
except json.JSONDecodeError:
logger.info("Tags: Erreur format")
else:
logger.info("Tags autorisés: Tous (admin)")
logger.info("")
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
@ -112,7 +131,7 @@ async def delete_swagger_user(username: str):
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
logger.error(f" Utilisateur '{username}' introuvable")
return
await session.delete(user)
@ -143,21 +162,23 @@ async def create_api_key(
logger.info("=" * 70)
logger.info("🔑 Clé API créée avec succès")
logger.info("=" * 70)
logger.info(f" ID: {api_key_obj.id}")
logger.info(f" Nom: {api_key_obj.name}")
logger.info(f" Clé: {api_key_plain}")
logger.info(f" Préfixe: {api_key_obj.key_prefix}")
logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
logger.info(f" Expire le: {api_key_obj.expires_at}")
logger.info(" ID: {}".format(api_key_obj.id))
logger.info(" Nom: {}".format(api_key_obj.name))
logger.info(" Clé: {}".format(api_key_plain))
logger.info(" Préfixe: {}".format(api_key_obj.key_prefix))
logger.info(
" Rate limit: {} req/min".format(api_key_obj.rate_limit_per_minute)
)
logger.info(" Expire le: {}".format(api_key_obj.expires_at))
if api_key_obj.allowed_endpoints:
import json
try:
endpoints_list = json.loads(api_key_obj.allowed_endpoints)
logger.info(f" Endpoints: {', '.join(endpoints_list)}")
except:
logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
logger.info(" Endpoints: {}".format(", ".join(endpoints_list)))
except Exception:
logger.info(" Endpoints: {}".format(api_key_obj.allowed_endpoints))
else:
logger.info(" Endpoints: Tous (aucune restriction)")
@ -176,7 +197,7 @@ async def list_api_keys():
logger.info("🔭 Aucune clé API")
return
logger.info(f"🔑 {len(keys)} clé(s) API:\n")
logger.info("🔑 {} clé(s) API:\n".format(len(keys)))
for key in keys:
is_valid = key.is_active and (
@ -185,11 +206,11 @@ async def list_api_keys():
status = "" if is_valid else ""
logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if key.allowed_endpoints:
import json
@ -199,11 +220,11 @@ async def list_api_keys():
display = ", ".join(endpoints[:4])
if len(endpoints) > 4:
display += f"... (+{len(endpoints) - 4})"
logger.info(f" Endpoints: {display}")
except:
logger.info(f" Endpoints: {display}")
except Exception:
pass
else:
logger.info(" Endpoints: Tous")
logger.info("Endpoints: Tous")
logger.info("")
@ -250,7 +271,7 @@ async def verify_api_key(api_key: str):
try:
endpoints = json.loads(key.allowed_endpoints)
logger.info(f" Endpoints autorisés: {endpoints}")
except:
except Exception:
pass
else:
logger.info(" Endpoints autorisés: Tous")
@ -263,12 +284,14 @@ async def main():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exemples:
python scripts/manage_security.py swagger add admin MyP@ssw0rd
python scripts/manage_security.py swagger list
python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
python scripts/manage_security.py apikey list
python scripts/manage_security.py apikey verify sdk_live_xxxxx
python scripts/manage_security.py swagger add admin MyP@ssw0rd
python scripts/manage_security.py swagger list
python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
python scripts/manage_security.py apikey list
python scripts/manage_security.py apikey verify sdk_live_xxxxx
python scripts/manage_security.py swagger add client_user Secret123 --full-name "Client Tech IT" --tags Authentication Clients Devis Factures
python scripts/manage_security.py swagger add admin_user AdminPass --tags # vide = tout voir
""",
)
subparsers = parser.add_subparsers(dest="command", help="Commandes")
@ -279,7 +302,13 @@ Exemples:
add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
add_p.add_argument("username", help="Nom d'utilisateur")
add_p.add_argument("password", help="Mot de passe")
add_p.add_argument("--full-name", help="Nom complet")
add_p.add_argument("--full-name", help="Nom complet", default=None)
add_p.add_argument(
"--tags",
nargs="*",
help="Tags autorisés (Clients Devis etc). Vide ou omis = admin complet",
default=None,
)
swagger_sub.add_parser("list", help="Lister utilisateurs")
@ -312,7 +341,8 @@ Exemples:
if args.command == "swagger":
if args.swagger_command == "add":
await add_swagger_user(args.username, args.password, args.full_name)
tags = args.tags if args.tags else None
await add_swagger_user(args.username, args.password, args.full_name, tags)
elif args.swagger_command == "list":
await list_swagger_users()
elif args.swagger_command == "delete":