354 lines
11 KiB
Python
354 lines
11 KiB
Python
import requests
|
|
import argparse
|
|
import sys
|
|
from typing import Tuple
|
|
|
|
|
|
class SecurityTester:
    """Run HTTP security smoke tests against an API and tally the results.

    Every ``test_*`` method performs one check, records its outcome through
    :meth:`log_test` and returns whether the check passed. Outcomes are
    accumulated in ``self.results``, a dict with the keys ``"passed"`` and
    ``"failed"`` (counters) and ``"tests"`` (one dict per recorded test).
    """

    def __init__(self, base_url: str):
        """Store the base URL without any trailing slash and reset results."""
        self.base_url = base_url.rstrip("/")
        self.results = {"passed": 0, "failed": 0, "tests": []}

    def log_test(self, name: str, passed: bool, details: str = ""):
        """Print and record the outcome of one test.

        Args:
            name: Label of the test, printed and stored as-is.
            passed: Whether the test succeeded.
            details: Optional extra line printed under the status line.
        """
        status = " PASS" if passed else " FAIL"
        print(f"{status} - {name}")
        if details:
            print(f" {details}")

        self.results["tests"].append(
            {"name": name, "passed": passed, "details": details}
        )
        self.results["passed" if passed else "failed"] += 1

    def _check_status(
        self,
        name: str,
        path: str,
        expected: int,
        ok_details: str,
        ko_template: str,
        **request_kwargs,
    ) -> bool:
        """GET ``base_url + path`` and log whether the status code matches.

        Args:
            name: Label passed to :meth:`log_test`.
            path: Path appended to ``self.base_url``.
            expected: Status code that makes the test pass.
            ok_details: Details logged on success (used verbatim).
            ko_template: Details logged on mismatch; ``{code}`` is replaced
                with the status code actually received.
            **request_kwargs: Extra keyword arguments for ``requests.get``
                (for example ``auth`` or ``headers``).

        Returns:
            True when the expected status code was received, False on a
            mismatch or when the request raised.
        """
        try:
            response = requests.get(
                f"{self.base_url}{path}", timeout=5, **request_kwargs
            )
        except Exception as e:
            # Any failure to obtain a response counts as a failed test.
            self.log_test(name, False, f"Erreur: {str(e)}")
            return False

        if response.status_code == expected:
            self.log_test(name, True, ok_details)
            return True

        self.log_test(name, False, ko_template.format(code=response.status_code))
        return False

    def test_swagger_without_auth(self) -> bool:
        """Test 1: /docs must answer 401 when no credentials are sent."""
        print("\n Test 1: Protection Swagger UI")
        return self._check_status(
            "Swagger protégé",
            "/docs",
            401,
            "Code 401 retourné sans authentification",
            "Code {code} au lieu de 401",
        )

    def test_swagger_with_auth(self, username: str, password: str) -> bool:
        """Test 2: /docs must answer 200 when HTTP basic credentials are sent."""
        print("\n Test 2: Accès Swagger avec authentification")
        return self._check_status(
            "Accès Swagger avec auth",
            "/docs",
            200,
            f"Authentifié comme {username}",
            "Code {code}, credentials invalides?",
            auth=(username, password),
        )

    def test_api_without_auth(self) -> bool:
        """Test 3: the listed API endpoints must answer 401 without auth.

        Returns:
            True only if every endpoint answered 401; a request error on any
            endpoint counts as "not protected".
        """
        print("\n Test 3: Protection des endpoints API")

        test_endpoints = ["/api/v1/clients", "/api/v1/documents"]

        all_protected = True
        for endpoint in test_endpoints:
            try:
                response = requests.get(f"{self.base_url}{endpoint}", timeout=5)
            except Exception as e:
                print(f" {endpoint} erreur: {str(e)}")
                all_protected = False
                continue

            if response.status_code == 401:
                print(f" {endpoint} protégé (401)")
            else:
                print(
                    f" {endpoint} accessible sans auth (code {response.status_code})"
                )
                all_protected = False

        self.log_test("Endpoints API protégés", all_protected)
        return all_protected

    def test_health_endpoint_public(self) -> bool:
        """Test 4: /health must answer 200 without authentication."""
        print("\n Test 4: Endpoint /health public")
        return self._check_status(
            "/health accessible",
            "/health",
            200,
            "Endpoint public fonctionne",
            "Code {code} inattendu",
        )

    def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]:
        """Test 5: log in, then create an API key with the returned token.

        Args:
            username: Sent as the ``email`` field of the login payload.
            password: Sent as the ``password`` field of the login payload.

        Returns:
            ``(True, api_key)`` on success, ``(False, "")`` on any failure
            (login refused, missing token, unexpected status code, missing
            key in the response, or request error).
        """
        print("\n Test 5: Création d'une clé API")
        name = "Création clé API"

        try:
            login_response = requests.post(
                f"{self.base_url}/api/v1/auth/login",
                json={"email": username, "password": password},
                timeout=5,
            )

            if login_response.status_code != 200:
                self.log_test(
                    name,
                    False,
                    "Impossible de se connecter pour obtenir un JWT",
                )
                return False, ""

            jwt_token = login_response.json().get("access_token")
            if not jwt_token:
                # Without this guard the next request would send "Bearer None".
                self.log_test(name, False, "Réponse de login sans access_token")
                return False, ""

            create_response = requests.post(
                f"{self.base_url}/api/v1/api-keys",
                headers={"Authorization": f"Bearer {jwt_token}"},
                json={
                    "name": "Test API Key",
                    "description": "Clé de test automatisé",
                    "rate_limit_per_minute": 60,
                    "expires_in_days": 30,
                },
                timeout=5,
            )

            if create_response.status_code != 201:
                self.log_test(name, False, f"Code {create_response.status_code}")
                return False, ""

            api_key = create_response.json().get("api_key")
            if not isinstance(api_key, str) or not api_key:
                # A 201 without a usable key is a failure, not a slicing error.
                self.log_test(name, False, "Réponse 201 sans clé API exploitable")
                return False, ""

            self.log_test(name, True, f"Clé créée: {api_key[:20]}...")
            return True, api_key

        except Exception as e:
            self.log_test(name, False, f"Erreur: {str(e)}")
            return False, ""

    def test_api_key_usage(self, api_key: str) -> bool:
        """Test 6: /api/v1/clients must answer 200 with the given API key.

        An empty ``api_key`` is logged as a failure without any request.
        """
        print("\n Test 6: Utilisation d'une clé API")

        if not api_key:
            self.log_test("Utilisation clé API", False, "Pas de clé disponible")
            return False

        return self._check_status(
            "Utilisation clé API",
            "/api/v1/clients",
            200,
            "Clé acceptée",
            "Code {code}, clé refusée?",
            headers={"X-API-Key": api_key},
        )

    def test_invalid_api_key(self) -> bool:
        """Test 7: /api/v1/clients must answer 401 for a made-up API key."""
        print("\n Test 7: Rejet de clé API invalide")

        invalid_key = "sdk_live_invalid_key_12345"

        return self._check_status(
            "Clé invalide rejetée",
            "/api/v1/clients",
            401,
            "Code 401 comme attendu",
            "Code {code} au lieu de 401",
            headers={"X-API-Key": invalid_key},
        )

    def test_rate_limiting(self, api_key: str) -> bool:
        """Test 8: send up to 70 quick requests and look for a 429 answer.

        The absence of a 429 is still logged as a pass (rate limiting may not
        be implemented). The test fails when no key is given or when not a
        single request could be completed, because nothing was verified then.

        Returns:
            True if a 429 was seen or if requests completed without one;
            False if ``api_key`` is empty or every request raised.
        """
        print("\n Test 8: Rate limiting (test simple)")

        if not api_key:
            self.log_test("Rate limiting", False, "Pas de clé disponible")
            return False

        print(" Envoi de 70 requêtes rapides...")

        rate_limited = False
        completed = 0
        last_error = None
        for i in range(70):
            try:
                response = requests.get(
                    f"{self.base_url}/health",
                    headers={"X-API-Key": api_key},
                    timeout=1,
                )
            except Exception as e:
                # Individual failures are tolerated, but remembered so that a
                # run where nothing got through is not reported as a success.
                last_error = e
                continue

            completed += 1
            if response.status_code == 429:
                rate_limited = True
                print(f" Rate limit atteint à la requête {i + 1}")
                break

        if completed == 0:
            self.log_test("Rate limiting", False, f"Erreur: {str(last_error)}")
            return False

        if rate_limited:
            self.log_test("Rate limiting", True, "Rate limit détecté")
        else:
            self.log_test(
                "Rate limiting",
                True,
                "Aucun rate limit détecté (peut être normal si pas implémenté)",
            )
        return True

    def print_summary(self) -> int:
        """Print the totals and return a process exit code.

        Returns:
            0 when no test failed, 1 otherwise.
        """
        print("\n" + "=" * 60)
        print(" RÉSUMÉ DES TESTS")
        print("=" * 60)

        total = self.results["passed"] + self.results["failed"]
        success_rate = (self.results["passed"] / total * 100) if total > 0 else 0

        print(f"\nTotal: {total} tests")
        print(f" Réussis: {self.results['passed']}")
        print(f" Échoués: {self.results['failed']}")
        print(f"Taux de réussite: {success_rate:.1f}%\n")

        if self.results["failed"] == 0:
            print("🎉 Tous les tests sont passés ! Sécurité OK.")
            return 0

        print(" Certains tests ont échoué. Vérifiez la configuration.")
        return 1
|
|
|
|
|
|
def main():
    """Parse the command line, run the security tests and exit.

    The process exits with the code returned by
    ``SecurityTester.print_summary`` (0 when no test failed, 1 otherwise).
    Tests that need a freshly created API key are skipped when key creation
    fails; the invalid-key rejection test needs no such key and always runs.
    """
    parser = argparse.ArgumentParser(
        description="Test automatisé de la sécurité de l'API"
    )
    parser.add_argument(
        "--url",
        required=True,
        help="URL de base de l'API (ex: http://localhost:8000)",
    )
    parser.add_argument(
        "--swagger-user", required=True, help="Utilisateur Swagger pour les tests"
    )
    parser.add_argument(
        "--swagger-pass", required=True, help="Mot de passe Swagger pour les tests"
    )
    parser.add_argument(
        "--skip-rate-limit",
        action="store_true",
        help="Sauter le test de rate limiting (long)",
    )

    args = parser.parse_args()

    print(" Démarrage des tests de sécurité")
    print(f" URL cible: {args.url}\n")

    tester = SecurityTester(args.url)

    tester.test_swagger_without_auth()
    tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass)
    tester.test_api_without_auth()
    tester.test_health_endpoint_public()

    success, api_key = tester.test_api_key_creation(
        args.swagger_user, args.swagger_pass
    )
    key_available = bool(success and api_key)

    if key_available:
        tester.test_api_key_usage(api_key)
    else:
        print("\n Tests avec clé API sautés (création échouée)")

    # Uses a hard-coded bogus key, so it does not depend on key creation.
    tester.test_invalid_api_key()

    if key_available:
        if not args.skip_rate_limit:
            tester.test_rate_limiting(api_key)
        else:
            print("\n Test de rate limiting sauté")

    exit_code = tester.print_summary()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|