Compare commits

...

5 commits

3 changed files with 338 additions and 74 deletions

128
api.py
View file

@ -283,13 +283,15 @@ def get_auth_schemes_for_user(swagger_user: dict) -> dict:
"type": "http", "type": "http",
"scheme": "bearer", "scheme": "bearer",
"bearerFormat": "JWT", "bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)", "description": "Authentification JWT pour utilisateurs (POST /auth/login). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
}, },
"ApiKeyAuth": { "ApiKeyAuth": {
"type": "apiKey", "type": "apiKey",
"in": "header", "in": "header",
"name": "X-API-Key", "name": "X-API-Key",
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)", "description": " Clé API pour intégrations externes (format: sdk_live_xxx). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
}, },
} }
@ -300,24 +302,17 @@ def get_auth_schemes_for_user(swagger_user: dict) -> dict:
"type": "http", "type": "http",
"scheme": "bearer", "scheme": "bearer",
"bearerFormat": "JWT", "bearerFormat": "JWT",
"description": "Authentification JWT pour utilisateurs (POST /auth/login)", "description": "Authentification JWT pour utilisateurs (POST /auth/login). "
"Utilisez SOIT JWT SOIT API Key, pas les deux.",
} }
if "API Keys Management" in allowed_tags or len(allowed_tags) > 3: schemes["ApiKeyAuth"] = {
schemes["ApiKeyAuth"] = { "type": "apiKey",
"type": "apiKey", "in": "header",
"in": "header", "name": "X-API-Key",
"name": "X-API-Key", "description": " Clé API pour intégrations externes (format: sdk_live_xxx). "
"description": "Clé API pour intégrations externes (format: sdk_live_xxx)", "Utilisez SOIT JWT SOIT API Key, pas les deux.",
} }
if not schemes:
schemes["HTTPBearer"] = {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Authentification requise",
}
return schemes return schemes
@ -352,16 +347,10 @@ def generate_filtered_openapi_schema(
base_schema["components"]["securitySchemes"] = auth_schemes base_schema["components"]["securitySchemes"] = auth_schemes
security_requirements = [] base_schema["security"] = []
if "HTTPBearer" in auth_schemes:
security_requirements.append({"HTTPBearer": []})
if "ApiKeyAuth" in auth_schemes:
security_requirements.append({"ApiKeyAuth": []})
base_schema["security"] = security_requirements if security_requirements else []
if not allowed_tags: if not allowed_tags:
logger.info(" Schéma OpenAPI complet (admin)") logger.info("⚙️ Schéma OpenAPI complet (admin)")
return base_schema return base_schema
filtered_paths = {} filtered_paths = {}
@ -384,6 +373,13 @@ def generate_filtered_openapi_schema(
operation_tags = operation.get("tags", []) operation_tags = operation.get("tags", [])
if any(tag in allowed_tags for tag in operation_tags): if any(tag in allowed_tags for tag in operation_tags):
operation_security = []
if "HTTPBearer" in auth_schemes:
operation_security.append({"HTTPBearer": []})
if "ApiKeyAuth" in auth_schemes:
operation_security.append({"ApiKeyAuth": []})
operation["security"] = operation_security
filtered_operations[method] = operation filtered_operations[method] = operation
if filtered_operations: if filtered_operations:
@ -400,16 +396,47 @@ def generate_filtered_openapi_schema(
if "components" in base_schema and "schemas" in base_schema["components"]: if "components" in base_schema and "schemas" in base_schema["components"]:
all_schemas = base_schema["components"]["schemas"] all_schemas = base_schema["components"]["schemas"]
filtered_schemas = get_schemas_for_tags(allowed_tags, all_schemas)
referenced_schemas = set()
def extract_schema_refs(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key == "$ref" and isinstance(value, str):
schema_name = value.split("/")[-1]
referenced_schemas.add(schema_name)
else:
extract_schema_refs(value)
elif isinstance(obj, list):
for item in obj:
extract_schema_refs(item)
extract_schema_refs(filtered_paths)
def add_dependencies(schema_name):
if schema_name not in all_schemas:
return
schema_def = all_schemas[schema_name]
extract_schema_refs(schema_def)
initial_schemas = referenced_schemas.copy()
for schema_name in initial_schemas:
add_dependencies(schema_name)
filtered_schemas = {}
for schema_name in referenced_schemas:
if schema_name in all_schemas:
filtered_schemas[schema_name] = all_schemas[schema_name]
base_schema["components"]["schemas"] = filtered_schemas base_schema["components"]["schemas"] = filtered_schemas
logger.info( logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, " f"🔍 Schéma filtré: {len(filtered_paths)} paths, "
f"{len(filtered_schemas)} schémas, tags: {allowed_tags}" f"{len(filtered_schemas)} schémas, tags: {allowed_tags}"
) )
else: else:
logger.info( logger.info(
f" Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}" f"🔍 Schéma filtré: {len(filtered_paths)} paths, tags: {allowed_tags}"
) )
return base_schema return base_schema
@ -429,10 +456,23 @@ async def custom_openapi_endpoint(request: Request):
username = swagger_user.get("username", "unknown") username = swagger_user.get("username", "unknown")
allowed_tags = swagger_user.get("allowed_tags") allowed_tags = swagger_user.get("allowed_tags")
logger.info(f" OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}") logger.info(f"📖 OpenAPI demandé par: {username}, tags: {allowed_tags or 'ALL'}")
schema = generate_filtered_openapi_schema(app, allowed_tags, swagger_user) schema = generate_filtered_openapi_schema(app, allowed_tags, swagger_user)
scheme = request.url.scheme
forwarded_proto = request.headers.get("X-Forwarded-Proto")
if forwarded_proto:
scheme = forwarded_proto
base_url = str(request.base_url).rstrip("/")
if scheme == "https" and base_url.startswith("http://"):
base_url = base_url.replace("http://", "https://", 1)
schema["servers"] = [{"url": base_url}]
return JSONResponse(content=schema) return JSONResponse(content=schema)
@ -447,17 +487,31 @@ async def custom_swagger_ui(request: Request):
headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'}, headers={"WWW-Authenticate": 'Basic realm="Swagger UI"'},
) )
allowed_tags = swagger_user.get("allowed_tags")
is_restricted = allowed_tags is not None and len(allowed_tags) > 0
swagger_params = {
"persistAuthorization": True,
"displayRequestDuration": True,
"filter": True,
"tryItOutEnabled": True,
"docExpansion": "list",
}
if is_restricted:
swagger_params["preAuthorizeApiKey"] = {
"ApiKeyAuth": {
"name": "X-API-Key",
"schema": {"type": "apiKey", "in": "header", "name": "X-API-Key"},
"value": "",
}
}
return get_swagger_ui_html( return get_swagger_ui_html(
openapi_url="/openapi.json", openapi_url="/openapi.json",
title=f"{app.title} - Documentation", title=f"{app.title} - Documentation",
swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png", swagger_favicon_url="https://fastapi.tiangolo.com/img/favicon.png",
swagger_ui_parameters={ swagger_ui_parameters=swagger_params,
"persistAuthorization": True,
"displayRequestDuration": True,
"filter": True,
"tryItOutEnabled": True,
"docExpansion": "list", # Meilleure UX
},
) )

View file

@ -159,27 +159,32 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
api_key_header = request.headers.get("X-API-Key") api_key_header = request.headers.get("X-API-Key")
if api_key_header: if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}") api_key_header = api_key_header.strip()
return await self._handle_api_key_auth( if not api_key_header or api_key_header == "":
request, api_key_header, path, method, call_next api_key_header = None
)
if auth_header and auth_header.startswith("Bearer "): if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1] token = auth_header.split(" ", 1)[1].strip()
if token.startswith("sdk_live_"): if token.startswith("sdk_live_"):
logger.warning( logger.warning(
"⚠️ API Key envoyée dans Authorization au lieu de X-API-Key" " API Key envoyée dans Authorization au lieu de X-API-Key"
) )
return await self._handle_api_key_auth( return await self._handle_api_key_auth(
request, token, path, method, call_next request, token, path, method, call_next
) )
logger.debug(f"🎫 JWT détecté pour {method} {path} → délégation à FastAPI") logger.debug(f"JWT détecté pour {method} {path} → délégation à FastAPI")
request.state.authenticated_via = "jwt" request.state.authenticated_via = "jwt"
return await call_next(request) return await call_next(request)
logger.debug(f"❓ Aucune auth pour {method} {path} → délégation à FastAPI") if api_key_header:
logger.debug(f" API Key détectée pour {method} {path}")
return await self._handle_api_key_auth(
request, api_key_header, path, method, call_next
)
logger.debug(f" Aucune auth pour {method} {path} → délégation à FastAPI")
return await call_next(request) return await call_next(request)
async def _handle_api_key_auth( async def _handle_api_key_auth(
@ -200,7 +205,7 @@ class ApiKeyMiddlewareHTTP(BaseHTTPMiddleware):
api_key_obj = await service.verify_api_key(api_key) api_key_obj = await service.verify_api_key(api_key)
if not api_key_obj: if not api_key_obj:
logger.warning(f" Clé API invalide: {method} {path}") logger.warning(f"🔒 Clé API invalide: {method} {path}")
return JSONResponse( return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
content={ content={

View file

@ -1,9 +1,3 @@
#!/usr/bin/env python3
"""
Script de gestion avancée des utilisateurs Swagger et API Keys
avec configuration des schémas d'authentification
"""
import sys import sys
import os import os
from pathlib import Path from pathlib import Path
@ -19,12 +13,35 @@ _current_file = Path(__file__).resolve()
_script_dir = _current_file.parent _script_dir = _current_file.parent
_app_dir = _script_dir.parent _app_dir = _script_dir.parent
print(f"DEBUG: Script path: {_current_file}")
print(f"DEBUG: App dir: {_app_dir}")
print(f"DEBUG: Current working dir: {os.getcwd()}")
if str(_app_dir) in sys.path: if str(_app_dir) in sys.path:
sys.path.remove(str(_app_dir)) sys.path.remove(str(_app_dir))
sys.path.insert(0, str(_app_dir)) sys.path.insert(0, str(_app_dir))
os.chdir(str(_app_dir)) os.chdir(str(_app_dir))
print(f"DEBUG: sys.path[0]: {sys.path[0]}")
print(f"DEBUG: New working dir: {os.getcwd()}")
_test_imports = [
"database",
"database.db_config",
"database.models",
"services",
"security",
]
print("\nDEBUG: Vérification des imports...")
for module in _test_imports:
try:
__import__(module)
print(f"{module}")
except ImportError as e:
print(f"{module}: {e}")
try: try:
from database.db_config import async_session_factory from database.db_config import async_session_factory
from database.models.api_key import SwaggerUser, ApiKey from database.models.api_key import SwaggerUser, ApiKey
@ -33,6 +50,7 @@ try:
except ImportError as e: except ImportError as e:
print(f"\n ERREUR D'IMPORT: {e}") print(f"\n ERREUR D'IMPORT: {e}")
print(" Vérifiez que vous êtes dans /app") print(" Vérifiez que vous êtes dans /app")
print(" Commande correcte: cd /app && python scripts/manage_security.py ...")
sys.exit(1) sys.exit(1)
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
@ -40,8 +58,8 @@ logger = logging.getLogger(__name__)
AVAILABLE_TAGS = { AVAILABLE_TAGS = {
"Authentication": "🔐 Authentification et gestion des comptes", "Authentication": " Authentification et gestion des comptes",
"API Keys Management": " Gestion des clés API", "API Keys Management": "🔑 Gestion des clés API",
"Clients": "👥 Gestion des clients", "Clients": "👥 Gestion des clients",
"Fournisseurs": "🏭 Gestion des fournisseurs", "Fournisseurs": "🏭 Gestion des fournisseurs",
"Prospects": "🎯 Gestion des prospects", "Prospects": "🎯 Gestion des prospects",
@ -56,13 +74,13 @@ AVAILABLE_TAGS = {
"Factures": "💰 Factures", "Factures": "💰 Factures",
"Avoirs": "↩️ Avoirs", "Avoirs": "↩️ Avoirs",
"Règlements": "💳 Règlements et encaissements", "Règlements": "💳 Règlements et encaissements",
"Workflows": "🔄 Transformations de documents", "Workflows": " Transformations de documents",
"Documents": "📑 Gestion documents (PDF)", "Documents": "📑 Gestion documents (PDF)",
"Emails": "📧 Envoi d'emails", "Emails": "📧 Envoi d'emails",
"Validation": " Validations métier", "Validation": " Validations métier",
"Collaborateurs": "👔 Collaborateurs internes", "Collaborateurs": "👔 Collaborateurs internes",
"Société": "🏢 Informations société", "Société": "🏢 Informations société",
"Référentiels": " Données de référence", "Référentiels": "📚 Données de référence",
"System": "⚙️ Système et santé", "System": "⚙️ Système et santé",
"Admin": "🛠️ Administration", "Admin": "🛠️ Administration",
"Debug": "🐛 Debug et diagnostics", "Debug": "🐛 Debug et diagnostics",
@ -199,16 +217,16 @@ async def list_swagger_users():
auth_schemes.append("JWT (Bearer)") auth_schemes.append("JWT (Bearer)")
logger.info( logger.info(
f"🔐 Authentification autorisée: {', '.join(auth_schemes)}" f" Authentification autorisée: {', '.join(auth_schemes)}"
) )
else: else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)") logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info("🔐 Authentification: JWT + X-API-Key (tout)") logger.info(" Authentification: JWT + X-API-Key (tout)")
except json.JSONDecodeError: except json.JSONDecodeError:
logger.info("⚠️ Tags: Erreur format") logger.info(" Tags: Erreur format")
else: else:
logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)") logger.info("👑 Tags autorisés: ADMIN COMPLET (tous)")
logger.info("🔐 Authentification: JWT + X-API-Key (tout)") logger.info(" Authentification: JWT + X-API-Key (tout)")
logger.info("\n" + "=" * 80) logger.info("\n" + "=" * 80)
@ -244,7 +262,7 @@ async def update_swagger_user(
elif set_tags is not None: elif set_tags is not None:
user.allowed_tags = json.dumps(set_tags) if set_tags else None user.allowed_tags = json.dumps(set_tags) if set_tags else None
logger.info(f"🔄 Tags remplacés: {len(set_tags) if set_tags else 0}") logger.info(f" Tags remplacés: {len(set_tags) if set_tags else 0}")
modified = True modified = True
elif add_tags or remove_tags: elif add_tags or remove_tags:
@ -273,19 +291,36 @@ async def update_swagger_user(
if active is not None: if active is not None:
user.is_active = active user.is_active = active
logger.info(f"🔄 Statut: {'ACTIF' if active else 'INACTIF'}") logger.info(f" Statut: {'ACTIF' if active else 'INACTIF'}")
modified = True modified = True
if modified: if modified:
await session.commit() await session.commit()
logger.info(f" Utilisateur '{username}' mis à jour") logger.info(f" Utilisateur '{username}' mis à jour")
else: else:
logger.info(" Aucune modification effectuée") logger.info(" Aucune modification effectuée")
async def delete_swagger_user(username: str):
"""Supprimer un utilisateur Swagger"""
async with async_session_factory() as session:
result = await session.execute(
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user:
logger.error(f" Utilisateur '{username}' introuvable")
return
await session.delete(user)
await session.commit()
logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
async def list_available_tags(): async def list_available_tags():
"""Liste tous les tags disponibles avec description""" """Liste tous les tags disponibles avec description"""
logger.info("\n TAGS DISPONIBLES:\n") logger.info("\n🏷️ TAGS DISPONIBLES:\n")
logger.info("=" * 80) logger.info("=" * 80)
for tag, desc in AVAILABLE_TAGS.items(): for tag, desc in AVAILABLE_TAGS.items():
@ -302,20 +337,135 @@ async def list_available_tags():
logger.info("=" * 80) logger.info("=" * 80)
async def delete_swagger_user(username: str): async def create_api_key(
name: str,
description: str = None,
expires_in_days: int = 365,
rate_limit: int = 60,
endpoints: list = None,
):
"""Créer une clé API"""
async with async_session_factory() as session: async with async_session_factory() as session:
result = await session.execute( service = ApiKeyService(session)
select(SwaggerUser).where(SwaggerUser.username == username)
)
user = result.scalar_one_or_none()
if not user: api_key_obj, api_key_plain = await service.create_api_key(
logger.error(f" Utilisateur '{username}' introuvable") name=name,
description=description,
created_by="cli",
expires_in_days=expires_in_days,
rate_limit_per_minute=rate_limit,
allowed_endpoints=endpoints,
)
logger.info("=" * 70)
logger.info("🔑 Clé API créée avec succès")
logger.info("=" * 70)
logger.info(f" ID: {api_key_obj.id}")
logger.info(f" Nom: {api_key_obj.name}")
logger.info(f" Clé: {api_key_plain}")
logger.info(f" Préfixe: {api_key_obj.key_prefix}")
logger.info(f" Rate limit: {api_key_obj.rate_limit_per_minute} req/min")
logger.info(f" Expire le: {api_key_obj.expires_at}")
if api_key_obj.allowed_endpoints:
try:
endpoints_list = json.loads(api_key_obj.allowed_endpoints)
logger.info(f" Endpoints: {', '.join(endpoints_list)}")
except Exception:
logger.info(f" Endpoints: {api_key_obj.allowed_endpoints}")
else:
logger.info(" Endpoints: Tous (aucune restriction)")
logger.info("=" * 70)
logger.info(" SAUVEGARDEZ CETTE CLÉ - Elle ne sera plus affichée !")
logger.info("=" * 70)
async def list_api_keys():
"""Lister toutes les clés API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
keys = await service.list_api_keys()
if not keys:
logger.info("🔭 Aucune clé API")
return return
await session.delete(user) logger.info(f"🔑 {len(keys)} clé(s) API:\n")
for key in keys:
is_valid = key.is_active and (
not key.expires_at or key.expires_at > datetime.now()
)
status = "" if is_valid else ""
logger.info(f" {status} {key.name:<30} ({key.key_prefix}...)")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
logger.info(f" Dernière utilisation: {key.last_used_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
display = ", ".join(endpoints[:4])
if len(endpoints) > 4:
display += f"... (+{len(endpoints) - 4})"
logger.info(f" Endpoints: {display}")
except Exception:
pass
else:
logger.info(" Endpoints: Tous")
logger.info("")
async def revoke_api_key(key_id: str):
"""Révoquer une clé API"""
async with async_session_factory() as session:
result = await session.execute(select(ApiKey).where(ApiKey.id == key_id))
key = result.scalar_one_or_none()
if not key:
logger.error(f" Clé API '{key_id}' introuvable")
return
key.is_active = False
key.revoked_at = datetime.now()
await session.commit() await session.commit()
logger.info(f"🗑️ Utilisateur Swagger supprimé: {username}")
logger.info(f"🗑️ Clé API révoquée: {key.name}")
logger.info(f" ID: {key.id}")
async def verify_api_key(api_key: str):
"""Vérifier une clé API"""
async with async_session_factory() as session:
service = ApiKeyService(session)
key = await service.verify_api_key(api_key)
if not key:
logger.error(" Clé API invalide ou expirée")
return
logger.info("=" * 60)
logger.info(" Clé API valide")
logger.info("=" * 60)
logger.info(f" Nom: {key.name}")
logger.info(f" ID: {key.id}")
logger.info(f" Rate limit: {key.rate_limit_per_minute} req/min")
logger.info(f" Requêtes totales: {key.total_requests}")
logger.info(f" Expire: {key.expires_at or 'Jamais'}")
if key.allowed_endpoints:
try:
endpoints = json.loads(key.allowed_endpoints)
logger.info(f" Endpoints autorisés: {endpoints}")
except Exception:
pass
else:
logger.info(" Endpoints autorisés: Tous")
logger.info("=" * 60)
async def main(): async def main():
@ -325,6 +475,8 @@ async def main():
epilog=""" epilog="""
EXEMPLES D'UTILISATION: EXEMPLES D'UTILISATION:
=== UTILISATEURS SWAGGER ===
1. Créer un utilisateur avec preset: 1. Créer un utilisateur avec preset:
python scripts/manage_security.py swagger add commercial Pass123! --preset commercial python scripts/manage_security.py swagger add commercial Pass123! --preset commercial
@ -348,6 +500,23 @@ python scripts/manage_security.py swagger tags
8. Désactiver temporairement: 8. Désactiver temporairement:
python scripts/manage_security.py swagger update client --inactive python scripts/manage_security.py swagger update client --inactive
=== CLÉS API ===
9. Créer une clé API:
python scripts/manage_security.py apikey create "Mon App" --days 365 --rate-limit 100
10. Créer avec endpoints restreints:
python scripts/manage_security.py apikey create "SDK-ReadOnly" --endpoints "/clients" "/clients/*" "/devis" "/devis/*"
11. Lister les clés:
python scripts/manage_security.py apikey list
12. Vérifier une clé:
python scripts/manage_security.py apikey verify sdk_live_xxxxx
13. Révoquer une clé:
python scripts/manage_security.py apikey revoke <key_id>
""", """,
) )
@ -392,6 +561,24 @@ python scripts/manage_security.py swagger update client --inactive
swagger_sub.add_parser("tags", help="Lister les tags disponibles") swagger_sub.add_parser("tags", help="Lister les tags disponibles")
apikey_parser = subparsers.add_parser("apikey", help="Gestion clés API")
apikey_sub = apikey_parser.add_subparsers(dest="apikey_command")
create_p = apikey_sub.add_parser("create", help="Créer clé API")
create_p.add_argument("name", help="Nom de la clé")
create_p.add_argument("--description", help="Description")
create_p.add_argument("--days", type=int, default=365, help="Expiration (jours)")
create_p.add_argument("--rate-limit", type=int, default=60, help="Req/min")
create_p.add_argument("--endpoints", nargs="+", help="Endpoints autorisés")
apikey_sub.add_parser("list", help="Lister clés")
rev_p = apikey_sub.add_parser("revoke", help="Révoquer clé")
rev_p.add_argument("key_id", help="ID de la clé")
ver_p = apikey_sub.add_parser("verify", help="Vérifier clé")
ver_p.add_argument("api_key", help="Clé API complète")
args = parser.parse_args() args = parser.parse_args()
if not args.command: if not args.command:
@ -431,12 +618,30 @@ python scripts/manage_security.py swagger update client --inactive
else: else:
swagger_parser.print_help() swagger_parser.print_help()
elif args.command == "apikey":
if args.apikey_command == "create":
await create_api_key(
name=args.name,
description=args.description,
expires_in_days=args.days,
rate_limit=args.rate_limit,
endpoints=args.endpoints,
)
elif args.apikey_command == "list":
await list_api_keys()
elif args.apikey_command == "revoke":
await revoke_api_key(args.key_id)
elif args.apikey_command == "verify":
await verify_api_key(args.api_key)
else:
apikey_parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":
try: try:
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n Interrupted") print("\n Interrupted")
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
logger.error(f" Erreur: {e}") logger.error(f" Erreur: {e}")