import requests import argparse import sys from typing import Tuple class SecurityTester: def __init__(self, base_url: str): self.base_url = base_url.rstrip("/") self.results = {"passed": 0, "failed": 0, "tests": []} def log_test(self, name: str, passed: bool, details: str = ""): """Enregistrer le résultat d'un test""" status = " PASS" if passed else " FAIL" print(f"{status} - {name}") if details: print(f" {details}") self.results["tests"].append( {"name": name, "passed": passed, "details": details} ) if passed: self.results["passed"] += 1 else: self.results["failed"] += 1 def test_swagger_without_auth(self) -> bool: """Test 1: Swagger UI devrait demander une authentification""" print("\n Test 1: Protection Swagger UI") try: response = requests.get(f"{self.base_url}/docs", timeout=5) if response.status_code == 401: self.log_test( "Swagger protégé", True, "Code 401 retourné sans authentification", ) return True else: self.log_test( "Swagger protégé", False, f"Code {response.status_code} au lieu de 401", ) return False except Exception as e: self.log_test("Swagger protégé", False, f"Erreur: {str(e)}") return False def test_swagger_with_auth(self, username: str, password: str) -> bool: """Test 2: Swagger UI accessible avec credentials valides""" print("\n Test 2: Accès Swagger avec authentification") try: response = requests.get( f"{self.base_url}/docs", auth=(username, password), timeout=5 ) if response.status_code == 200: self.log_test( "Accès Swagger avec auth", True, f"Authentifié comme {username}", ) return True else: self.log_test( "Accès Swagger avec auth", False, f"Code {response.status_code}, credentials invalides?", ) return False except Exception as e: self.log_test("Accès Swagger avec auth", False, f"Erreur: {str(e)}") return False def test_api_without_auth(self) -> bool: """Test 3: Endpoints API devraient demander une authentification""" print("\n Test 3: Protection des endpoints API") test_endpoints = ["/api/v1/clients", "/api/v1/documents"] all_protected = True for endpoint in test_endpoints: try: response = requests.get(f"{self.base_url}{endpoint}", timeout=5) if response.status_code == 401: print(f" {endpoint} protégé (401)") else: print( f" {endpoint} accessible sans auth (code {response.status_code})" ) all_protected = False except Exception as e: print(f" {endpoint} erreur: {str(e)}") all_protected = False self.log_test("Endpoints API protégés", all_protected) return all_protected def test_health_endpoint_public(self) -> bool: """Test 4: Endpoint /health devrait être accessible sans auth""" print("\n Test 4: Endpoint /health public") try: response = requests.get(f"{self.base_url}/health", timeout=5) if response.status_code == 200: self.log_test("/health accessible", True, "Endpoint public fonctionne") return True else: self.log_test( "/health accessible", False, f"Code {response.status_code} inattendu", ) return False except Exception as e: self.log_test("/health accessible", False, f"Erreur: {str(e)}") return False def test_api_key_creation(self, username: str, password: str) -> Tuple[bool, str]: """Test 5: Créer une clé API via l'endpoint""" print("\n Test 5: Création d'une clé API") try: login_response = requests.post( f"{self.base_url}/api/v1/auth/login", json={"email": username, "password": password}, timeout=5, ) if login_response.status_code != 200: self.log_test( "Création clé API", False, "Impossible de se connecter pour obtenir un JWT", ) return False, "" jwt_token = login_response.json().get("access_token") create_response = requests.post( f"{self.base_url}/api/v1/api-keys", headers={"Authorization": f"Bearer {jwt_token}"}, json={ "name": "Test API Key", "description": "Clé de test automatisé", "rate_limit_per_minute": 60, "expires_in_days": 30, }, timeout=5, ) if create_response.status_code == 201: api_key = create_response.json().get("api_key") self.log_test("Création clé API", True, f"Clé créée: {api_key[:20]}...") return True, api_key else: self.log_test( "Création clé API", False, f"Code {create_response.status_code}", ) return False, "" except Exception as e: self.log_test("Création clé API", False, f"Erreur: {str(e)}") return False, "" def test_api_key_usage(self, api_key: str) -> bool: """Test 6: Utiliser une clé API pour accéder à un endpoint""" print("\n Test 6: Utilisation d'une clé API") if not api_key: self.log_test("Utilisation clé API", False, "Pas de clé disponible") return False try: response = requests.get( f"{self.base_url}/api/v1/clients", headers={"X-API-Key": api_key}, timeout=5, ) if response.status_code == 200: self.log_test("Utilisation clé API", True, "Clé acceptée") return True else: self.log_test( "Utilisation clé API", False, f"Code {response.status_code}, clé refusée?", ) return False except Exception as e: self.log_test("Utilisation clé API", False, f"Erreur: {str(e)}") return False def test_invalid_api_key(self) -> bool: """Test 7: Une clé invalide devrait être refusée""" print("\n Test 7: Rejet de clé API invalide") invalid_key = "sdk_live_invalid_key_12345" try: response = requests.get( f"{self.base_url}/api/v1/clients", headers={"X-API-Key": invalid_key}, timeout=5, ) if response.status_code == 401: self.log_test("Clé invalide rejetée", True, "Code 401 comme attendu") return True else: self.log_test( "Clé invalide rejetée", False, f"Code {response.status_code} au lieu de 401", ) return False except Exception as e: self.log_test("Clé invalide rejetée", False, f"Erreur: {str(e)}") return False def test_rate_limiting(self, api_key: str) -> bool: """Test 8: Rate limiting (optionnel, peut prendre du temps)""" print("\n Test 8: Rate limiting (test simple)") if not api_key: self.log_test("Rate limiting", False, "Pas de clé disponible") return False print(" Envoi de 70 requêtes rapides...") rate_limited = False for i in range(70): try: response = requests.get( f"{self.base_url}/health", headers={"X-API-Key": api_key}, timeout=1, ) if response.status_code == 429: rate_limited = True print(f" Rate limit atteint à la requête {i + 1}") break except Exception: pass if rate_limited: self.log_test("Rate limiting", True, "Rate limit détecté") return True else: self.log_test( "Rate limiting", True, "Aucun rate limit détecté (peut être normal si pas implémenté)", ) return True def print_summary(self): """Afficher le résumé des tests""" print("\n" + "=" * 60) print(" RÉSUMÉ DES TESTS") print("=" * 60) total = self.results["passed"] + self.results["failed"] success_rate = (self.results["passed"] / total * 100) if total > 0 else 0 print(f"\nTotal: {total} tests") print(f" Réussis: {self.results['passed']}") print(f" Échoués: {self.results['failed']}") print(f"Taux de réussite: {success_rate:.1f}%\n") if self.results["failed"] == 0: print("🎉 Tous les tests sont passés ! Sécurité OK.") return 0 else: print(" Certains tests ont échoué. Vérifiez la configuration.") return 1 def main(): parser = argparse.ArgumentParser( description="Test automatisé de la sécurité de l'API" ) parser.add_argument( "--url", required=True, help="URL de base de l'API (ex: http://localhost:8000)", ) parser.add_argument( "--swagger-user", required=True, help="Utilisateur Swagger pour les tests" ) parser.add_argument( "--swagger-pass", required=True, help="Mot de passe Swagger pour les tests" ) parser.add_argument( "--skip-rate-limit", action="store_true", help="Sauter le test de rate limiting (long)", ) args = parser.parse_args() print(" Démarrage des tests de sécurité") print(f" URL cible: {args.url}\n") tester = SecurityTester(args.url) tester.test_swagger_without_auth() tester.test_swagger_with_auth(args.swagger_user, args.swagger_pass) tester.test_api_without_auth() tester.test_health_endpoint_public() success, api_key = tester.test_api_key_creation( args.swagger_user, args.swagger_pass ) if success and api_key: tester.test_api_key_usage(api_key) tester.test_invalid_api_key() if not args.skip_rate_limit: tester.test_rate_limiting(api_key) else: print("\n Test de rate limiting sauté") else: print("\n Tests avec clé API sautés (création échouée)") exit_code = tester.print_summary() sys.exit(exit_code) if __name__ == "__main__": main()