Compare commits

..

No commits in common. "437ecd0ed3f6053f7a301ac9366ecc9fa3d7d67e" and "c1f4c66e8c3cdcefd0e4d3ee3ca2b2a3080cf51a" have entirely different histories.

5 changed files with 300 additions and 740 deletions

409
api.py
View file

@ -1,10 +1,7 @@
from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body, Request from fastapi import FastAPI, HTTPException, Path, Query, Depends, status, Body
from fastapi.responses import JSONResponse
from fastapi.openapi.utils import get_openapi from fastapi.openapi.utils import get_openapi
from fastapi.responses import StreamingResponse, HTMLResponse, Response from fastapi.responses import StreamingResponse, HTMLResponse, Response
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from pydantic import BaseModel, Field, EmailStr from pydantic import BaseModel, Field, EmailStr
from typing import List, Optional from typing import List, Optional
from datetime import datetime, date from datetime import datetime, date
@ -98,6 +95,7 @@ from utils.generic_functions import (
universign_envoyer, universign_envoyer,
) )
from middleware.security import SwaggerAuthMiddleware, ApiKeyMiddlewareHTTP from middleware.security import SwaggerAuthMiddleware, ApiKeyMiddlewareHTTP
from core.dependencies import get_current_user from core.dependencies import get_current_user
from config.cors_config import setup_cors from config.cors_config import setup_cors
@ -124,12 +122,12 @@ logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Lifecycle de l'application"""
await init_db() await init_db()
logger.info("Base de données initialisée") logger.info("Base de données initialisée")
email_queue.session_factory = async_session_factory email_queue.session_factory = async_session_factory
email_queue.sage_client = sage_client email_queue.sage_client = sage_client
logger.info("sage_client injecté dans email_queue") logger.info("sage_client injecté dans email_queue")
email_queue.start(num_workers=settings.max_email_workers) email_queue.start(num_workers=settings.max_email_workers)
@ -138,12 +136,18 @@ async def lifespan(app: FastAPI):
sync_service = UniversignSyncService( sync_service = UniversignSyncService(
api_url=settings.universign_api_url, api_key=settings.universign_api_key api_url=settings.universign_api_url, api_key=settings.universign_api_key
) )
sync_service.configure( sync_service.configure(
sage_client=sage_client, email_queue=email_queue, settings=settings sage_client=sage_client, email_queue=email_queue, settings=settings
) )
scheduler = UniversignSyncScheduler(sync_service=sync_service, interval_minutes=5) scheduler = UniversignSyncScheduler(
sync_service=sync_service,
interval_minutes=5,
)
sync_task = asyncio.create_task(scheduler.start(async_session_factory)) sync_task = asyncio.create_task(scheduler.start(async_session_factory))
logger.info("Synchronisation Universign démarrée (5min)") logger.info("Synchronisation Universign démarrée (5min)")
yield yield
@ -155,337 +159,61 @@ async def lifespan(app: FastAPI):
app = FastAPI( app = FastAPI(
title="Sage Gateways API", title="Sage Gateways",
version="3.0.0", version="3.0.0",
description="API multi-tenant pour Sage 100c avec authentification hybride", description="Configuration multi-tenant des connexions Sage Gateway",
lifespan=lifespan, lifespan=lifespan,
openapi_tags=TAGS_METADATA, openapi_tags=TAGS_METADATA,
docs_url=None,
redoc_url=None,
openapi_url=None,
) )
""" app.add_middleware(
def get_swagger_user_from_state(request: Request) -> Optional[dict]: CORSMiddleware,
return getattr(request.state, "swagger_user", None) allow_origins=settings.cors_origins,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
allow_credentials=True,
) """
def get_schemas_for_tags(allowed_tags: List[str], all_schemas: dict) -> dict: def custom_openapi():
if not allowed_tags: # Admin = tous les schémas if app.openapi_schema:
return all_schemas return app.openapi_schema
base_schemas = { openapi_schema = get_openapi(
"HTTPValidationError",
"ValidationError",
"HTTPException",
"TokenResponse",
"Login",
"RegisterRequest",
}
tag_schemas_map = {
"Clients": {
"ClientDetails",
"ClientCreate",
"ClientUpdate",
"TiersDetails",
"Contact",
"ContactCreate",
"ContactUpdate",
},
"Articles": {
"Article",
"ArticleCreate",
"ArticleUpdate",
"Familles",
"FamilleCreate",
},
"Devis": {
"Devis",
"DevisRequest",
"DevisUpdate",
"LigneDocument",
"EmailEnvoi",
"RelanceDevis",
},
"Commandes": {"CommandeCreate", "CommandeUpdate", "LigneDocument"},
"Factures": {
"FactureCreate",
"FactureUpdate",
"ReglementFactureCreate",
"ReglementMultipleCreate",
},
"Livraisons": {"LivraisonCreate", "LivraisonUpdate"},
"Avoirs": {"AvoirCreate", "AvoirUpdate"},
"Fournisseurs": {
"FournisseurDetails",
"FournisseurCreate",
"FournisseurUpdate",
},
"Collaborateurs": {
"CollaborateurDetails",
"CollaborateurCreate",
"CollaborateurUpdate",
},
"Stock": {"EntreeStock", "SortieStock", "MouvementStock"},
"Emails": {"EmailEnvoi", "EmailBatch", "TemplateEmail", "TemplatePreview"},
"API Keys Management": {
"ApiKeyCreate",
"ApiKeyResponse",
"ApiKeyCreatedResponse",
"ApiKeyList",
},
}
allowed_schemas = base_schemas.copy()
for tag in allowed_tags:
if tag in tag_schemas_map:
allowed_schemas.update(tag_schemas_map[tag])
filtered_schemas = {}
for schema_name, schema_def in all_schemas.items():
if schema_name in allowed_schemas:
filtered_schemas[schema_name] = schema_def
if "$ref" in str(schema_def):
_add_referenced_schemas(schema_def, all_schemas, filtered_schemas)
return filtered_schemas
def _add_referenced_schemas(
schema_def: dict, all_schemas: dict, filtered_schemas: dict
):
"""Ajoute récursivement les schémas référencés"""
if isinstance(schema_def, dict):
for key, value in schema_def.items():
if key == "$ref" and isinstance(value, str):
ref_name = value.split("/")[-1]
if ref_name in all_schemas and ref_name not in filtered_schemas:
filtered_schemas[ref_name] = all_schemas[ref_name]
_add_referenced_schemas(
all_schemas[ref_name], all_schemas, filtered_schemas
)
elif isinstance(value, (dict, list)):
_add_referenced_schemas(value, all_schemas, filtered_schemas)
elif isinstance(schema_def, list):
for item in schema_def:
_add_referenced_schemas(item, all_schemas, filtered_schemas)
def get_auth_schemes_for_user(swagger_user: dict) -> dict:
allowed_tags = swagger_user.get("allowed_tags")
if not allowed_tags:
return {
"HTTPBearer": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)",
},
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)",
},
}
schemes = {}
if "Authentication" in allowed_tags:
schemes["HTTPBearer"] = {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)",
}
if "API Keys Management" in allowed_tags or len(allowed_tags) > 3:
schemes["ApiKeyAuth"] = {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)",
}
if not schemes:
schemes["HTTPBearer"] = {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification requise",
}
return schemes
def generate_filtered_openapi_schema(
app: FastAPI, allowed_tags: Optional[List[str]] = None, swagger_user: dict = None
) -> dict:
base_schema = get_openapi(
title=app.title, title=app.title,
version=app.version, version=app.version,
description=app.description, description=app.description,
routes=app.routes, routes=app.routes,
) )
if swagger_user: openapi_schema["components"]["securitySchemes"] = {
auth_schemes = get_auth_schemes_for_user(swagger_user) "HTTPBearer": {
else: "type": "http",
auth_schemes = { "scheme": "bearer",
"HTTPBearer": { "bearerFormat": "JWT",
"type": "http", "description": "Authentification JWT pour utilisateurs (POST /auth/login)",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification JWT",
},
"ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "Clé API",
},
}
base_schema["components"]["securitySchemes"] = auth_schemes
security_requirements = []
if "HTTPBearer" in auth_schemes:
security_requirements.append({"HTTPBearer": []})
if "ApiKeyAuth" in auth_schemes:
security_requirements.append({"ApiKeyAuth": []})
base_schema["security"] = security_requirements if security_requirements else []
if not allowed_tags:
logger.info(" Schéma OpenAPI complet (admin)")
return base_schema
filtered_paths = {}
for path, path_item in base_schema.get("paths", {}).items():
filtered_operations = {}
for method, operation in path_item.items():
if method not in [
"get",
"post",
"put",
"delete",
"patch",
"options",
"head",
]:
continue
operation_tags = operation.get("tags", [])
if any(tag in allowed_tags for tag in operation_tags):
filtered_operations[method] = operation
if filtered_operations:
filtered_paths[path] = filtered_operations
base_schema["paths"] = filtered_paths
if "tags" in base_schema:
base_schema["tags"] = [
tag_obj
for tag_obj in base_schema["tags"]
if tag_obj.get("name") in allowed_tags
]
if "components" in base_schema and "schemas" in base_schema["components"]:
all_schemas = base_schema["components"]["schemas"]
filtered_schemas = get_schemas_for_tags(allowed_tags, all_schemas)
base_schema["components"]["schemas"] = filtered_schemas
logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, "
f"{len(filtered_schemas)} schémas, tags: {allowed_tags}"
)
else:
logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}"
)
return base_schema
@app.get("/openapi.json", include_in_schema=False)
async def custom_openapi_endpoint(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
username = swagger_user.get("username", "unknown")
allowed_tags = swagger_user.get("allowed_tags")
logger.info(f" OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}")
schema = generate_filtered_openapi_schema(app, allowed_tags, swagger_user)
return JSONResponse(content=schema)
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
return get_swagger_ui_html(
openapi_url="/openapi.json",
title=f"{app.title} - Documentation",
swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
swagger_ui_parameters={
"persistAuthorization": True,
"displayRequestDuration": True,
"filter": True,
"tryItOutEnabled": True,
"docExpansion": "list", # Meilleure UX
}, },
) "ApiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)",
},
}
openapi_schema["security"] = [{"HTTPBearer": []}, {"ApiKeyAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
@app.get("/redoc", include_in_schema=False) app.openapi = custom_openapi
async def custom_redoc(request: Request):
swagger_user = get_swagger_user_from_state(request)
if not swagger_user:
return JSONResponse(
status_code=401,
content={"detail": "Authentification Swagger requise"},
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
)
return get_redoc_html(
openapi_url="/openapi.json",
title=f"{app.title} - Documentation",
redoc_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
)
setup_cors(app, mode="open") setup_cors(app, mode="open")
app.add_middleware(SwaggerAuthMiddleware) app.add_middleware(SwaggerAuthMiddleware)
app.add_middleware(ApiKeyMiddlewareHTTP) app.add_middleware(ApiKeyMiddlewareHTTP)
app.include_router(api_keys_router) app.include_router(api_keys_router)
app.include_router(auth_router) app.include_router(auth_router)
app.include_router(sage_gateway_router) app.include_router(sage_gateway_router)
@ -3513,7 +3241,7 @@ async def get_reglement_detail(rg_no):
return sage_client.get_reglement_detail(rg_no) return sage_client.get_reglement_detail(rg_no)
""" @app.get("/health", tags=["System"]) @app.get("/health", tags=["System"])
async def health_check( async def health_check(
sage: SageGatewayClient = Depends(get_sage_client_for_user), sage: SageGatewayClient = Depends(get_sage_client_for_user),
): ):
@ -3529,64 +3257,29 @@ async def health_check(
"queue_size": email_queue.queue.qsize(), "queue_size": email_queue.queue.qsize(),
}, },
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
} """ }
@app.get("/", tags=["System"]) @app.get("/", tags=["System"])
async def root(): async def root():
"""
Point d'entrée de l'API
"""
return { return {
"api": "Sage 100c Dataven API", "api": "Sage 100c Dataven - VPS Linux",
"version": "3.0.0", "version": "3.0.0",
"status": "operational", "documentation": "/docs (authentification requise)",
"documentation": { "health": "/health",
"swagger": "/docs",
"redoc": "/redoc",
"openapi": "/openapi.json",
},
"authentication": { "authentication": {
"methods": [ "methods": [
{ {
"type": "JWT (Bearer Token)", "type": "JWT",
"header": "Authorization: Bearer <token>", "header": "Authorization: Bearer <token>",
"obtain_token": "POST /auth/login", "endpoint": "/api/auth/login",
"description": "Pour les utilisateurs finaux",
}, },
{ {
"type": "API Key", "type": "API Key",
"header": "X-API-Key: sdk_live_xxx", "header": "X-API-Key: sdk_live_xxx",
"manage_keys": "GET /api-keys", "endpoint": "/api/api-keys",
"description": "Pour les intégrations externes",
}, },
], ]
"note": "Les routes acceptent JWT OU API Key (au choix)",
},
"swagger_access": {
"authentication": "HTTP Basic Auth (voir /scripts/manage_security.py)",
"filtering": "Les routes visibles dépendent des tags autorisés de l'utilisateur",
},
}
@app.get("/health", tags=["System"])
async def health_check():
"""
Vérification de santé de l'API (sans authentification)
"""
return {
"status": "healthy",
"timestamp": "2025-01-21T00:00:00Z",
"services": {
"api": "operational",
"database": "connected",
"email_queue": {
"running": email_queue.running,
"workers": len(email_queue.workers)
if hasattr(email_queue, "workers")
else 0,
},
}, },
} }

View file

@ -152,7 +152,7 @@ templates_signature_email = {
</table> </table>
<p style="color: #718096; font-size: 13px; line-height: 1.5; margin: 0;"> <p style="color: #718096; font-size: 13px; line-height: 1.5; margin: 0;">
<strong> Signature électronique sécurisée</strong><br> <strong>🔒 Signature électronique sécurisée</strong><br>
Votre signature est protégée par notre partenaire de confiance <strong>Universign</strong>, Votre signature est protégée par notre partenaire de confiance <strong>Universign</strong>,
certifié eIDAS et conforme au RGPD. Votre identité sera vérifiée et le document sera certifié eIDAS et conforme au RGPD. Votre identité sera vérifiée et le document sera
horodaté de manière infalsifiable. horodaté de manière infalsifiable.

View file

@ -1,6 +1,4 @@
from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text from sqlalchemy import Column, String, Boolean, DateTime, Integer, Text
from typing import Optional, List
import json
from datetime import datetime from datetime import datetime
import uuid import uuid
@ -51,23 +49,8 @@ class SwaggerUser(Base):
is_active = Column(Boolean, default=True, nullable=False) is_active = Column(Boolean, default=True, nullable=False)
allowed_tags = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.now, nullable=False) created_at = Column(DateTime, default=datetime.now, nullable=False)
last_login = Column(DateTime, nullable=True) last_login = Column(DateTime, nullable=True)
@property
def allowed_tags_list(self) -> Optional[List[str]]:
if self.allowed_tags:
try:
return json.loads(self.allowed_tags)
except json.JSONDecodeError:
return None
return None
@allowed_tags_list.setter
def allowed_tags_list(self, tags: Optional[List[str]]):
self.allowed_tags = json.dumps(tags) if tags is not None else None
def __repr__(self): def __repr__(self):
return f"<SwaggerUser(username='{self.username}', active={self.is_active})>" return f"<SwaggerUser(username='{self.username}', active={self.is_active})>"

View file

@ -2,13 +2,12 @@ from fastapi import Request, status
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Receive, Send from starlette.types import ASGIApp
from sqlalchemy import select from sqlalchemy import select
from typing import Callable, Optional from typing import Callable
from datetime import datetime from datetime import datetime
import logging import logging
import base64 import base64
import json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,7 +20,7 @@ class SwaggerAuthMiddleware:
def __init__(self, app: ASGIApp): def __init__(self, app: ASGIApp):
self.app = app self.app = app
async def __call__(self, scope, receive: Receive, send: Send): async def __call__(self, scope, receive, send):
if scope["type"] != "http": if scope["type"] != "http":
await self.app(scope, receive, send) await self.app(scope, receive, send)
return return
@ -51,9 +50,7 @@ class SwaggerAuthMiddleware:
credentials = HTTPBasicCredentials(username=username, password=password) credentials = HTTPBasicCredentials(username=username, password=password)
swagger_user = await self._verify_credentials(credentials) if not await self._verify_credentials(credentials):
if not swagger_user:
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Identifiants invalides"}, content={"detail": "Identifiants invalides"},
@ -62,17 +59,8 @@ class SwaggerAuthMiddleware:
await response(scope, receive, send) await response(scope, receive, send)
return return
if "state" not in scope:
scope["state"] = {}
scope["state"]["swagger_user"] = swagger_user
logger.info(
f"✓ Swagger auth: {swagger_user['username']} - tags: {swagger_user.get('allowed_tags', 'ALL')}"
)
except Exception as e: except Exception as e:
logger.error(f" Erreur parsing auth header: {e}") logger.error(f"Erreur parsing auth header: {e}")
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Format d'authentification invalide"}, content={"detail": "Format d'authentification invalide"},
@ -83,9 +71,8 @@ class SwaggerAuthMiddleware:
await self.app(scope, receive, send) await self.app(scope, receive, send)
async def _verify_credentials( async def _verify_credentials(self, credentials: HTTPBasicCredentials) -> bool:
self, credentials: HTTPBasicCredentials """Vérifie les identifiants dans la base de données"""
) -> Optional[dict]:
from database.db_config import async_session_factory from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser from database.models.api_key import SwaggerUser
from security.auth import verify_password from security.auth import verify_password
@ -105,22 +92,15 @@ class SwaggerAuthMiddleware:
): ):
swagger_user.last_login = datetime.now() swagger_user.last_login = datetime.now()
await session.commit() await session.commit()
logger.info(f"✓ Accès Swagger autorisé: {credentials.username}") logger.info(f"✓ Accès Swagger autorisé: {credentials.username}")
return True
return {
"id": swagger_user.id,
"username": swagger_user.username,
"allowed_tags": swagger_user.allowed_tags_list,
"is_active": swagger_user.is_active,
}
logger.warning(f"✗ Accès Swagger refusé: {credentials.username}") logger.warning(f"✗ Accès Swagger refusé: {credentials.username}")
return None return False
except Exception as e: except Exception as e:
logger.error(f" Erreur vérification credentials: {e}", exc_info=True) logger.error(f"Erreur vérification credentials: {e}")
return None return False
class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware): class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
@ -136,7 +116,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
] ]
def _is_excluded_path(self, path: str) -> bool: def _is_excluded_path(self, path: str) -> bool:
"""Vérifie si le chemin est exclu de l'authentification API Key""" """Vérifie si le chemin est exclu de l'authentification"""
if path == "/": if path == "/":
return True return True
@ -159,7 +139,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
api_key_header = request.headers.get("X-API-Key") api_key_header = request.headers.get("X-API-Key")
if api_key_header: if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}") logger.debug(f"🔑 API Key détectée pour {method} {path}")
return await self._handle_api_key_auth( return await self._handle_api_key_auth(
request, api_key_header, path, method, call_next request, api_key_header, path, method, call_next
) )
@ -169,7 +149,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
if token.startswith("sdk_live_"): if token.startswith("sdk_live_"):
logger.warning( logger.warning(
"⚠️ API Key envoyée dans Authorization au lieu de X-API-Key" " API Key envoyée dans Authorization au lieu de X-API-Key"
) )
return await self._handle_api_key_auth( return await self._handle_api_key_auth(
request, token, path, method, call_next request, token, path, method, call_next
@ -179,7 +159,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
request.state.authenticated_via = "jwt" request.state.authenticated_via = "jwt"
return await call_next(request) return await call_next(request)
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI") logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request) return await call_next(request)
async def _handle_api_key_auth( async def _handle_api_key_auth(
@ -190,6 +170,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
method: str, method: str,
call_next: Callable, call_next: Callable,
): ):
"""Gère l'authentification par API Key avec vérification STRICTE"""
try: try:
from database.db_config import async_session_factory from database.db_config import async_session_factory
from services.api_key import ApiKeyService from services.api_key import ApiKeyService
@ -211,7 +192,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
is_allowed, rate_info = await service.check_rate_limit(api_key_obj) is_allowed, rate_info = await service.check_rate_limit(api_key_obj)
if not is_allowed: if not is_allowed:
logger.warning(f"⏱️ Rate limit: {api_key_obj.name}") logger.warning(f" Rate limit: {api_key_obj.name}")
return JSONResponse( return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS, status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={"detail": "Rate limit dépassé"}, content={"detail": "Rate limit dépassé"},
@ -224,6 +205,8 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
has_access = await service.check_endpoint_access(api_key_obj, path) has_access = await service.check_endpoint_access(api_key_obj, path)
if not has_access: if not has_access:
import json
allowed = ( allowed = (
json.loads(api_key_obj.allowed_endpoints) json.loads(api_key_obj.allowed_endpoints)
if api_key_obj.allowed_endpoints if api_key_obj.allowed_endpoints
@ -231,7 +214,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
) )
logger.warning( logger.warning(
f"🚫 ACCÈS REFUSÉ: {api_key_obj.name}\n" f" ACCÈS REFUSÉ: {api_key_obj.name}\n"
f" Endpoint demandé: {path}\n" f" Endpoint demandé: {path}\n"
f" Endpoints autorisés: {allowed}" f" Endpoints autorisés: {allowed}"
) )
@ -243,7 +226,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
"endpoint_requested": path, "endpoint_requested": path,
"api_key_name": api_key_obj.name, "api_key_name": api_key_obj.name,
"allowed_endpoints": allowed, "allowed_endpoints": allowed,
"hint": "Cette clé API n'a pas accès à cet endpoint.", "hint": "Cette clé API n'a pas accès à cet endpoint. Contactez l'administrateur.",
}, },
) )
@ -255,7 +238,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
return await call_next(request) return await call_next(request)
except Exception as e: except Exception as e:
logger.error(f"💥 Erreur validation API Key: {e}", exc_info=True) logger.error(f" Erreur validation API Key: {e}", exc_info=True)
return JSONResponse( return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": f"Erreur interne: {str(e)}"}, content={"detail": f"Erreur interne: {str(e)}"},
@ -265,7 +248,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
ApiKeyMiddleware = ApiKeyMiddlewareHTTP ApiKeyMiddleware = ApiKeyMiddlewareHTTP
def get_api_key_from_request(request: Request) -> Optional: def get_api_key_from_request(request: Request):
"""Récupère l'objet ApiKey depuis la requête si présent""" """Récupère l'objet ApiKey depuis la requête si présent"""
return getattr(request.state, "api_key", None) return getattr(request.state, "api_key", None)
@ -275,16 +258,10 @@ def get_auth_method(request: Request) -> str:
return getattr(request.state, "authenticated_via", "none") return getattr(request.state, "authenticated_via", "none")
def get_swagger_user_from_request(request: Request) -> Optional[dict]:
"""Récupère l'utilisateur Swagger depuis la requête"""
return getattr(request.state, "swagger_user", None)
__all__ = [ __all__ = [
"SwaggerAuthMiddleware", "SwaggerAuthMiddleware",
"ApiKeyMiddlewareHTTP", "ApiKeyMiddlewareHTTP",
"ApiKeyMiddleware", "ApiKeyMiddleware",
"get_api_key_from_request", "get_api_key_from_request",
"get_auth_method", "get_auth_method",
"get_swagger_user_from_request",
] ]

View file

@ -1,123 +1,65 @@
#!/usr/bin/env python3
"""
Script de gestion avancée des utilisateurs Swagger et API Keys
avec configuration des schémas d'authentification
"""
import sys import sys
import os import os
from pathlib import Path from pathlib import Path
import asyncio
import argparse
import logging
from datetime import datetime
from typing import Optional, List
import json
from sqlalchemy import select
_current_file = Path(__file__).resolve() _current_file = Path(__file__).resolve()
_script_dir = _current_file.parent _script_dir = _current_file.parent
_app_dir = _script_dir.parent _app_dir = _script_dir.parent
print(f"DEBUG: Script path: {_current_file}")
print(f"DEBUG: App dir: {_app_dir}")
print(f"DEBUG: Current working dir: {os.getcwd()}")
if str(_app_dir) in sys.path: if str(_app_dir) in sys.path:
sys.path.remove(str(_app_dir)) sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir)) sys.path.insert(0, str(_app_dir))
os.chdir(str(_app_dir)) os.chdir(str(_app_dir))
print(f"DEBUG: sys.path[0]: {sys.path[0]}")
print(f"DEBUG: New working dir: {os.getcwd()}")
_test_imports = [
"database",
"database.db_config",
"database.models",
"services",
"security",
]
print("\nDEBUG: Vérification des imports...")
for module in _test_imports:
try:
__import__(module)
print(f" {module}")
except ImportError as e:
print(f" {module}: {e}")
import asyncio
import argparse
import logging
from datetime import datetime
from sqlalchemy import select
try: try:
from database.db_config import async_session_factory from database.db_config import async_session_factory
from database.models.user import User
from database.models.api_key import SwaggerUser, ApiKey from database.models.api_key import SwaggerUser, ApiKey
from services.api_key import ApiKeyService from services.api_key import ApiKeyService
from security.auth import hash_password from security.auth import hash_password
except ImportError as e: except ImportError as e:
print(f"\n ERREUR D'IMPORT: {e}") print(f"\n ERREUR D'IMPORT: {e}")
print(" Vérifiez que vous êtes dans /app") print(f" Vérifiez que vous êtes dans /app")
print(f" Commande correcte: cd /app && python scripts/manage_security.py ...")
sys.exit(1) sys.exit(1)
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
AVAILABLE_TAGS = { async def add_swagger_user(username: str, password: str, full_name: str = None):
"Authentication": "🔐 Authentification et gestion des comptes", """Ajouter un utilisateur Swagger"""
"API Keys Management": " Gestion des clés API",
"Clients": "👥 Gestion des clients",
"Fournisseurs": "🏭 Gestion des fournisseurs",
"Prospects": "🎯 Gestion des prospects",
"Tiers": "📋 Gestion générale des tiers",
"Contacts": "📞 Contacts des tiers",
"Articles": "📦 Catalogue articles",
"Familles": "🏷️ Familles d'articles",
"Stock": "📊 Mouvements de stock",
"Devis": "📄 Devis",
"Commandes": "🛒 Commandes",
"Livraisons": "🚚 Bons de livraison",
"Factures": "💰 Factures",
"Avoirs": "↩️ Avoirs",
"Règlements": "💳 Règlements et encaissements",
"Workflows": "🔄 Transformations de documents",
"Documents": "📑 Gestion documents (PDF)",
"Emails": "📧 Envoi d'emails",
"Validation": " Validations métier",
"Collaborateurs": "👔 Collaborateurs internes",
"Société": "🏢 Informations société",
"Référentiels": " Données de référence",
"System": "⚙️ Système et santé",
"Admin": "🛠️ Administration",
"Debug": "🐛 Debug et diagnostics",
}
PRESET_PROFILES = {
"commercial": [
"Clients",
"Contacts",
"Devis",
"Commandes",
"Factures",
"Articles",
"Documents",
"Emails",
],
"comptable": [
"Clients",
"Fournisseurs",
"Factures",
"Avoirs",
"Règlements",
"Documents",
"Emails",
],
"logistique": [
"Articles",
"Stock",
"Commandes",
"Livraisons",
"Fournisseurs",
"Documents",
],
"readonly": ["Clients", "Articles", "Devis", "Commandes", "Factures", "Documents"],
"developer": [
"Authentication",
"API Keys Management",
"System",
"Clients",
"Articles",
"Devis",
"Commandes",
"Factures",
],
}
async def add_swagger_user(
username: str,
password: str,
full_name: str = None,
tags: Optional[List[str]] = None,
preset: Optional[str] = None,
):
"""Ajouter un utilisateur Swagger avec configuration avancée"""
async with async_session_factory() as session: async with async_session_factory() as session:
result = await session.execute( result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username) select(SwaggerUser).where(SwaggerUser.username == username)
@ -128,21 +70,11 @@ async def add_swagger_user(
logger.error(f" L'utilisateur '{username}' existe déjà") logger.error(f" L'utilisateur '{username}' existe déjà")
return return
if preset:
if preset not in PRESET_PROFILES:
logger.error(
f" Preset '{preset}' inconnu. Disponibles: {list(PRESET_PROFILES.keys())}"
)
return
tags = PRESET_PROFILES[preset]
logger.info(f"📋 Application du preset '{preset}': {len(tags)} tags")
swagger_user = SwaggerUser( swagger_user = SwaggerUser(
username=username, username=username,
hashed_password=hash_password(password), hashed_password=hash_password(password),
full_name=full_name or username, full_name=full_name or username,
is_active=True, is_active=True,
allowed_tags=json.dumps(tags) if tags else None,
) )
session.add(swagger_user) session.add(swagger_user)
@ -151,17 +83,9 @@ async def add_swagger_user(
logger.info(f" Utilisateur Swagger créé: {username}") logger.info(f" Utilisateur Swagger créé: {username}")
logger.info(f" Nom complet: {swagger_user.full_name}") logger.info(f" Nom complet: {swagger_user.full_name}")
if tags:
logger.info(f" 🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
else:
logger.info(" 👑 Accès ADMIN COMPLET (tous les tags)")
async def list_swagger_users(): async def list_swagger_users():
"""Lister tous les utilisateurs Swagger avec détails""" """Lister tous les utilisateurs Swagger"""
async with async_session_factory() as session: async with async_session_factory() as session:
result = await session.execute(select(SwaggerUser)) result = await session.execute(select(SwaggerUser))
users = result.scalars().all() users = result.scalars().all()
@ -170,139 +94,17 @@ async def list_swagger_users():
logger.info("🔭 Aucun utilisateur Swagger") logger.info("🔭 Aucun utilisateur Swagger")
return return
logger.info(f"\n👥 {len(users)} utilisateur(s) Swagger:\n") logger.info(f"👥 {len(users)} utilisateur(s) Swagger:\n")
logger.info("=" * 80)
for user in users: for user in users:
status = " ACTIF" if user.is_active else " NON ACTIF" status = "" if user.is_active else ""
logger.info(f"\n{status} {user.username}") logger.info(f" {status} {user.username}")
logger.info(f"📛 Nom: {user.full_name}") logger.info(f" Nom: {user.full_name}")
logger.info(f"🆔 ID: {user.id}") logger.info(f" Créé: {user.created_at}")
logger.info(f"📅 Créé: {user.created_at}") logger.info(f" Dernière connexion: {user.last_login or 'Jamais'}\n")
logger.info(f"🕐 Dernière connexion: {user.last_login or 'Jamais'}")
if user.allowed_tags:
try:
tags = json.loads(user.allowed_tags)
if tags:
logger.info(f"🏷️ Tags autorisés ({len(tags)}):")
for tag in tags:
desc = AVAILABLE_TAGS.get(tag, "")
logger.info(f"{tag} {desc}")
auth_schemes = []
if "Authentication" in tags:
auth_schemes.append("JWT (Bearer)")
if "API Keys Management" in tags or len(tags) > 3:
auth_schemes.append("X-API-Key")
if not auth_schemes:
auth_schemes.append("JWT (Bearer)")
logger.info(
f"🔐 Authentification autorisée: {', '.join(auth_schemes)}"
)
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info("🔐 Authentification: JWT + X-API-Key (tout)")
except json.JSONDecodeError:
logger.info("⚠️ Tags: Erreur format")
else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info("🔐 Authentification: JWT + X-API-Key (tout)")
logger.info("\n" + "=" * 80)
async def update_swagger_user(
username: str,
add_tags: Optional[List[str]] = None,
remove_tags: Optional[List[str]] = None,
set_tags: Optional[List[str]] = None,
preset: Optional[str] = None,
active: Optional[bool] = None,
):
"""Mettre à jour un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
return
modified = False
if preset:
if preset not in PRESET_PROFILES:
logger.error(f" Preset '{preset}' inconnu")
return
user.allowed_tags = json.dumps(PRESET_PROFILES[preset])
logger.info(f"📋 Preset '{preset}' appliqué")
modified = True
elif set_tags is not None:
user.allowed_tags = json.dumps(set_tags) if set_tags else None
logger.info(f"🔄 Tags remplacés: {len(set_tags) if set_tags else 0}")
modified = True
elif add_tags or remove_tags:
current_tags = []
if user.allowed_tags:
try:
current_tags = json.loads(user.allowed_tags)
except json.JSONDecodeError:
current_tags = []
if add_tags:
for tag in add_tags:
if tag not in current_tags:
current_tags.append(tag)
logger.info(f" Tag ajouté: {tag}")
modified = True
if remove_tags:
for tag in remove_tags:
if tag in current_tags:
current_tags.remove(tag)
logger.info(f" Tag retiré: {tag}")
modified = True
user.allowed_tags = json.dumps(current_tags) if current_tags else None
if active is not None:
user.is_active = active
logger.info(f"🔄 Statut: {'ACTIF' if active else 'INACTIF'}")
modified = True
if modified:
await session.commit()
logger.info(f" Utilisateur '{username}' mis à jour")
else:
logger.info(" Aucune modification effectuée")
async def list_available_tags():
"""Liste tous les tags disponibles avec description"""
logger.info("\n TAGS DISPONIBLES:\n")
logger.info("=" * 80)
for tag, desc in AVAILABLE_TAGS.items():
logger.info(f" {desc}")
logger.info(f" Nom: {tag}\n")
logger.info("=" * 80)
logger.info("\n📦 PRESETS DISPONIBLES:\n")
for preset_name, tags in PRESET_PROFILES.items():
logger.info(f" {preset_name}:")
logger.info(f" {', '.join(tags)}\n")
logger.info("=" * 80)
async def delete_swagger_user(username: str): async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with async_session_factory() as session: async with async_session_factory() as session:
result = await session.execute( result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username) select(SwaggerUser).where(SwaggerUser.username == username)
@ -318,39 +120,157 @@ async def delete_swagger_user(username: str):
logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}") logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
async def create_api_key(
name: str,
description: str = None,
expires_in_days: int = 365,
rate_limit: int = 60,
endpoints: list = None,
):
"""Créer une clé API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
api_key_obj, api_key_plain = await service.create_api_key(
name=name,
description=description,
created_by="cli",
expires_in_days=expires_in_days,
rate_limit_per_minute=rate_limit,
allowed_endpoints=endpoints,
)
logger.info("=" * 70)
logger.info("🔑 Clé API créée avec succès")
logger.info("=" * 70)
logger.info(f" ID: {api_key_obj.id}")
logger.info(f" Nom: {api_key_obj.name}")
logger.info(f" Clé: {api_key_plain}")
logger.info(f" Préfixe: {api_key_obj.key_prefix}")
logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
logger.info(f" Expire le: {api_key_obj.expires_at}")
if api_key_obj.allowed_endpoints:
import json
try:
endpoints_list = json.loads(api_key_obj.allowed_endpoints)
logger.info(f" Endpoints: {', '.join(endpoints_list)}")
except:
logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
else:
logger.info(" Endpoints: Tous (aucune restriction)")
logger.info("=" * 70)
logger.info(" SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !")
logger.info("=" * 70)
async def list_api_keys():
"""Lister toutes les clés API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
logger.info("🔭 Aucune clé API")
return
logger.info(f"🔑 {len(keys)} clé(s) API:\n")
for key in keys:
is_valid = key.is_active and (
not key.expires_at or key.expires_at > datetime.now()
)
status = "" if is_valid else ""
logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if key.allowed_endpoints:
import json
try:
endpoints = json.loads(key.allowed_endpoints)
display = ", ".join(endpoints[:4])
if len(endpoints) > 4:
display += f"... (+{len(endpoints) - 4})"
logger.info(f" Endpoints: {display}")
except:
pass
else:
logger.info(" Endpoints: Tous")
logger.info("")
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with async_session_factory() as session:
result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
key = result.scalar_one_or_none()
if not key:
logger.error(f" Clé API '{key_id}' introuvable")
return
key.is_active = False
key.revoked_at = datetime.now()
await session.commit()
logger.info(f"🗑️ Clé API révoquée: {key.name}")
logger.info(f" ID: {key.id}")
async def verify_api_key(api_key: str):
"""Vérifier une clé API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
key = await service.verify_api_key(api_key)
if not key:
logger.error(" Clé API invalide ou expirée")
return
logger.info("=" * 60)
logger.info(" Clé API valide")
logger.info("=" * 60)
logger.info(f" Nom: {key.name}")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes totales: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
if key.allowed_endpoints:
import json
try:
endpoints = json.loads(key.allowed_endpoints)
logger.info(f" Endpoints autorisés: {endpoints}")
except:
pass
else:
logger.info(" Endpoints autorisés: Tous")
logger.info("=" * 60)
async def main(): async def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Gestion avancée des utilisateurs Swagger et clés API", description="Gestion des utilisateurs Swagger et clés API",
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
EXEMPLES D'UTILISATION: Exemples:
python scripts/manage_security.py swagger add admin MyP@ssw0rd
1. Créer un utilisateur avec preset: python scripts/manage_security.py swagger list
python scripts/manage_security.py swagger add commercial Pass123! --preset commercial python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
2. Créer un admin complet: python scripts/manage_security.py apikey list
python scripts/manage_security.py swagger add admin AdminPass python scripts/manage_security.py apikey verify sdk_live_xxxxx
3. Créer avec tags spécifiques:
python scripts/manage_security.py swagger add client Pass123! --tags Clients Devis Factures
4. Mettre à jour un utilisateur (ajouter des tags):
python scripts/manage_security.py swagger update client --add-tags Commandes Livraisons
5. Changer complètement les tags:
python scripts/manage_security.py swagger update client --set-tags Clients Articles
6. Appliquer un preset:
python scripts/manage_security.py swagger update client --preset comptable
7. Lister les tags disponibles:
python scripts/manage_security.py swagger tags
8. Désactiver temporairement:
python scripts/manage_security.py swagger update client --inactive
""", """,
) )
subparsers = parser.add_subparsers(dest="command", help="Commandes") subparsers = parser.add_subparsers(dest="command", help="Commandes")
swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger") swagger_parser = subparsers.add_parser("swagger", help="Gestion Swagger")
@ -359,38 +279,30 @@ python scripts/manage_security.py swagger update client --inactive
add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur") add_p = swagger_sub.add_parser("add", help="Ajouter utilisateur")
add_p.add_argument("username", help="Nom d'utilisateur") add_p.add_argument("username", help="Nom d'utilisateur")
add_p.add_argument("password", help="Mot de passe") add_p.add_argument("password", help="Mot de passe")
add_p.add_argument("--full-name", help="Nom complet", default=None) add_p.add_argument("--full-name", help="Nom complet")
add_p.add_argument(
"--tags",
nargs="*",
help="Tags autorisés. Vide = admin complet",
default=None,
)
add_p.add_argument(
"--preset",
choices=list(PRESET_PROFILES.keys()),
help="Appliquer un preset de tags",
)
update_p = swagger_sub.add_parser("update", help="Mettre à jour utilisateur")
update_p.add_argument("username", help="Nom d'utilisateur")
update_p.add_argument("--add-tags", nargs="+", help="Ajouter des tags")
update_p.add_argument("--remove-tags", nargs="+", help="Retirer des tags")
update_p.add_argument("--set-tags", nargs="*", help="Définir les tags (remplace)")
update_p.add_argument(
"--preset", choices=list(PRESET_PROFILES.keys()), help="Appliquer preset"
)
update_p.add_argument("--active", action="store_true", help="Activer l'utilisateur")
update_p.add_argument(
"--inactive", action="store_true", help="Désactiver l'utilisateur"
)
swagger_sub.add_parser("list", help="Lister utilisateurs") swagger_sub.add_parser("list", help="Lister utilisateurs")
del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur") del_p = swagger_sub.add_parser("delete", help="Supprimer utilisateur")
del_p.add_argument("username", help="Nom d'utilisateur") del_p.add_argument("username", help="Nom d'utilisateur")
swagger_sub.add_parser("tags", help="Lister les tags disponibles") apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_sub = apikey_parser.add_subparsers(dest="apikey_command")
create_p = apikey_sub.add_parser("create", help="Créer clé API")
create_p.add_argument("name", help="Nom de la clé")
create_p.add_argument("--description", help="Description")
create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)")
create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min")
create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
apikey_sub.add_parser("list", help="Lister clés")
rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé")
rev_p.add_argument("key_id", help="ID de la clé")
ver_p = apikey_sub.add_parser("verify", help="Vérifier clé")
ver_p.add_argument("api_key", help="Clé API complète")
args = parser.parse_args() args = parser.parse_args()
@ -400,37 +312,32 @@ python scripts/manage_security.py swagger update client --inactive
if args.command == "swagger": if args.command == "swagger":
if args.swagger_command == "add": if args.swagger_command == "add":
await add_swagger_user( await add_swagger_user(args.username, args.password, args.full_name)
args.username,
args.password,
args.full_name,
args.tags,
args.preset,
)
elif args.swagger_command == "update":
active = None
if args.active:
active = True
elif args.inactive:
active = False
await update_swagger_user(
args.username,
add_tags=args.add_tags,
remove_tags=args.remove_tags,
set_tags=args.set_tags,
preset=args.preset,
active=active,
)
elif args.swagger_command == "list": elif args.swagger_command == "list":
await list_swagger_users() await list_swagger_users()
elif args.swagger_command == "delete": elif args.swagger_command == "delete":
await delete_swagger_user(args.username) await delete_swagger_user(args.username)
elif args.swagger_command == "tags":
await list_available_tags()
else: else:
swagger_parser.print_help() swagger_parser.print_help()
elif args.command == "apikey":
if args.apikey_command == "create":
await create_api_key(
name=args.name,
description=args.description,
expires_in_days=args.days,
rate_limit=args.rate_limit,
endpoints=args.endpoints,
)
elif args.apikey_command == "list":
await list_api_keys()
elif args.apikey_command == "revoke":
await revoke_api_key(args.key_id)
elif args.apikey_command == "verify":
await verify_api_key(args.api_key)
else:
apikey_parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":
try: try: