refactor(api): remove debug endpoints before production release

This commit is contained in:
Fanilo-Nantenaina 2025-12-09 15:55:47 +03:00
parent 1c53135b62
commit 44354ec9bd

318
api.py
View file

@ -3793,324 +3793,6 @@ async def statistiques_utilisateurs(session: AsyncSession = Depends(get_session)
raise HTTPException(500, str(e)) raise HTTPException(500, str(e))
@app.get("/debug/users/{user_id}", response_model=UserResponse, tags=["Debug"])
async def lire_utilisateur_debug(
user_id: str, session: AsyncSession = Depends(get_session)
):
"""
👤 **ROUTE DEBUG** - Détails d'un utilisateur par ID
Non protégée - à sécuriser en production
"""
from database import User
from sqlalchemy import select
try:
query = select(User).where(User.id == user_id)
result = await session.execute(query)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(404, f"Utilisateur {user_id} introuvable")
return UserResponse(
id=user.id,
email=user.email,
nom=user.nom,
prenom=user.prenom,
role=user.role,
is_verified=user.is_verified,
is_active=user.is_active,
created_at=user.created_at.isoformat() if user.created_at else "",
last_login=user.last_login.isoformat() if user.last_login else None,
failed_login_attempts=user.failed_login_attempts or 0,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Erreur lecture utilisateur: {e}")
raise HTTPException(500, str(e))
@app.get("/debug/database/check", tags=["Debug"])
async def verifier_integrite_database(session: AsyncSession = Depends(get_session)):
"""
🔍 Vérification de l'intégrité de la base de données
Retourne des statistiques détaillées sur toutes les tables
"""
from database import User, RefreshToken, LoginAttempt, EmailLog, SignatureLog
from sqlalchemy import func, text
try:
diagnostics = {}
# === TABLE USERS ===
# Compter tous les users
total_users = await session.execute(select(func.count(User.id)))
diagnostics["users"] = {"total": total_users.scalar(), "details": []}
# Lister tous les users avec détails
all_users = await session.execute(select(User))
users_list = all_users.scalars().all()
for u in users_list:
diagnostics["users"]["details"].append(
{
"id": u.id,
"email": u.email,
"nom": f"{u.prenom} {u.nom}",
"role": u.role,
"is_active": u.is_active,
"is_verified": u.is_verified,
"created_at": u.created_at.isoformat() if u.created_at else None,
"has_reset_token": u.reset_token is not None,
"has_verification_token": u.verification_token is not None,
}
)
# === TABLE REFRESH_TOKENS ===
total_tokens = await session.execute(select(func.count(RefreshToken.id)))
diagnostics["refresh_tokens"] = {"total": total_tokens.scalar()}
# === TABLE LOGIN_ATTEMPTS ===
total_attempts = await session.execute(select(func.count(LoginAttempt.id)))
diagnostics["login_attempts"] = {"total": total_attempts.scalar()}
# === TABLE EMAIL_LOGS ===
total_emails = await session.execute(select(func.count(EmailLog.id)))
diagnostics["email_logs"] = {"total": total_emails.scalar()}
# === TABLE SIGNATURE_LOGS ===
total_signatures = await session.execute(select(func.count(SignatureLog.id)))
diagnostics["signature_logs"] = {"total": total_signatures.scalar()}
# === VÉRIFIER LES FICHIERS SQLITE ===
import os
db_file = "sage_dataven.db"
diagnostics["database_file"] = {
"exists": os.path.exists(db_file),
"size_bytes": os.path.getsize(db_file) if os.path.exists(db_file) else 0,
"path": os.path.abspath(db_file),
}
# === TESTER UNE REQUÊTE RAW SQL ===
try:
raw_count = await session.execute(text("SELECT COUNT(*) FROM users"))
diagnostics["raw_sql_check"] = {
"users_count": raw_count.scalar(),
"status": "✅ Connexion DB OK",
}
except Exception as e:
diagnostics["raw_sql_check"] = {"status": "❌ Erreur", "error": str(e)}
return {
"success": True,
"timestamp": datetime.now().isoformat(),
"diagnostics": diagnostics,
}
except Exception as e:
logger.error(f"❌ Erreur diagnostic DB: {e}", exc_info=True)
raise HTTPException(500, f"Erreur diagnostic: {str(e)}")
@app.post("/debug/database/test-user-persistence", tags=["Debug"])
async def tester_persistance_utilisateur(session: AsyncSession = Depends(get_session)):
"""
🧪 Test de création/lecture/modification d'un utilisateur de test
Crée un utilisateur de test, le modifie, et vérifie la persistance
"""
import uuid
from database import User
from security.auth import hash_password
try:
test_email = f"test_{uuid.uuid4().hex[:8]}@example.com"
# === ÉTAPE 1: CRÉATION ===
test_user = User(
id=str(uuid.uuid4()),
email=test_email,
hashed_password=hash_password("TestPassword123!"),
nom="Test",
prenom="User",
role="user",
is_verified=True,
is_active=True,
created_at=datetime.now(),
)
session.add(test_user)
await session.flush()
user_id = test_user.id
await session.commit()
logger.info(f"✅ ÉTAPE 1: User créé - {user_id}")
# === ÉTAPE 2: LECTURE ===
result = await session.execute(select(User).where(User.id == user_id))
loaded_user = result.scalar_one_or_none()
if not loaded_user:
return {
"success": False,
"error": "❌ User introuvable après création !",
"step": "LECTURE",
}
logger.info(f"✅ ÉTAPE 2: User chargé - {loaded_user.email}")
# === ÉTAPE 3: MODIFICATION (simulate reset password) ===
loaded_user.hashed_password = hash_password("NewPassword456!")
loaded_user.reset_token = None
loaded_user.reset_token_expires = None
session.add(loaded_user)
await session.flush()
await session.commit()
await session.refresh(loaded_user)
logger.info(f"✅ ÉTAPE 3: User modifié")
# === ÉTAPE 4: RE-LECTURE ===
result2 = await session.execute(select(User).where(User.id == user_id))
reloaded_user = result2.scalar_one_or_none()
if not reloaded_user:
return {
"success": False,
"error": "❌ User DISPARU après modification !",
"step": "RE-LECTURE",
"user_id": user_id,
}
logger.info(f"✅ ÉTAPE 4: User re-chargé - {reloaded_user.email}")
# === ÉTAPE 5: SUPPRESSION DU TEST ===
await session.delete(reloaded_user)
await session.commit()
logger.info(f"✅ ÉTAPE 5: User test supprimé")
return {
"success": True,
"message": "✅ Tous les tests de persistance sont OK",
"test_user_id": user_id,
"test_email": test_email,
"steps_completed": [
"1. Création",
"2. Lecture",
"3. Modification (reset password simulé)",
"4. Re-lecture (vérification persistance)",
"5. Suppression (cleanup)",
],
}
except Exception as e:
logger.error(f"❌ Erreur test persistance: {e}", exc_info=True)
# Rollback en cas d'erreur
await session.rollback()
return {
"success": False,
"error": str(e),
"traceback": str(e.__class__.__name__),
}
@app.get("/debug/fournisseurs/cache", tags=["Debug"])
async def debug_cache_fournisseurs():
"""
🔍 Debug : État du cache côté VPS Linux
"""
try:
# Appeler la gateway Windows pour récupérer l'info cache
cache_info = sage_client.get_cache_info()
# Tenter de lister les fournisseurs
try:
fournisseurs = sage_client.lister_fournisseurs(filtre="")
nb_fournisseurs = len(fournisseurs) if fournisseurs else 0
exemple = fournisseurs[:3] if fournisseurs else []
except Exception as e:
nb_fournisseurs = -1
exemple = []
error = str(e)
return {
"success": True,
"cache_info_windows": cache_info,
"test_liste_fournisseurs": {
"nb_fournisseurs": nb_fournisseurs,
"exemples": exemple,
"erreur": error if nb_fournisseurs == -1 else None,
},
"diagnostic": {
"gateway_accessible": cache_info is not None,
"cache_fournisseurs_existe": (
"fournisseurs" in cache_info if cache_info else False
),
"probleme_probable": (
"Cache fournisseurs non initialisé côté Windows"
if cache_info and "fournisseurs" not in cache_info
else (
"OK"
if nb_fournisseurs > 0
else "Erreur lors de la récupération"
)
),
},
}
except Exception as e:
logger.error(f"❌ Erreur debug cache: {e}", exc_info=True)
raise HTTPException(500, str(e))
@app.post("/debug/fournisseurs/force-refresh", tags=["Debug"])
async def force_refresh_fournisseurs():
"""
🔄 Force le refresh du cache fournisseurs côté Windows
"""
try:
# Appeler la gateway Windows pour forcer le refresh
resultat = sage_client.refresh_cache()
# Attendre 2 secondes
import time
time.sleep(2)
# Récupérer le cache info après refresh
cache_info = sage_client.get_cache_info()
# Tester la liste
fournisseurs = sage_client.lister_fournisseurs(filtre="")
nb_fournisseurs = len(fournisseurs) if fournisseurs else 0
return {
"success": True,
"refresh_result": resultat,
"cache_apres_refresh": cache_info,
"nb_fournisseurs_maintenant": nb_fournisseurs,
"exemples": fournisseurs[:3] if fournisseurs else [],
"message": (
f"✅ Refresh OK : {nb_fournisseurs} fournisseurs disponibles"
if nb_fournisseurs > 0
else "❌ Problème : aucun fournisseur après refresh"
),
}
except Exception as e:
logger.error(f"❌ Erreur force refresh: {e}", exc_info=True)
raise HTTPException(500, str(e))
# ===================================================== # =====================================================
# LANCEMENT # LANCEMENT
# ===================================================== # =====================================================